In the previous part, we prepared a local environment for testing the lambda function integration with OpenSearch. The code for the first part is here.
The code for this part is on the branch 2-implement-logic.
Prerequisites
I assume that the OpenSearch cluster runs locally. Please check Part 1 if you want to follow the same steps as I did.
Goals
With the lambda function and OpenSearch running locally, I can now iterate over the code and add business logic.
I would like my lambda to receive an event with a list of fields to use in the OpenSearch query. Later, I could connect the function to the REST API Gateway or use it as a data source for AppSync.
My next steps are the following:
- prepare OpenSearch query
- update lambda input type
- test lambda locally
OpenSearch query
In my case, a query to OpenSearch looks more or less like this:
{
  "size": 2,
  "query": {
    "bool": {
      "must": [
        {
          "range": {
            "AvgTicketPrice": {
              "gte": 500,
              "lte": 900
            }
          }
        },
        {
          "match": {
            "DestWeather": "Sunny"
          }
        }
      ]
    }
  }
}
I have a match clause and a range clause. For the sake of my example, that is enough.
My idea is to create a struct that reflects the structure of the query and to implement a builder pattern, so I can construct the query with ease.
Let's go to functions/opensearch_service/src/lib.rs and start creating types. Going from the top, I create the query->bool->must structure:
#[derive(Debug, Serialize, Deserialize)]
pub struct OpenSearchQuery {
    query: BoolQuery,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct BoolQuery {
    bool: MustQuery,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct MustQuery {
    must: Vec<QueryStatement>,
}
Thanks to serde and serde_json, it's enough to annotate the structs with Serialize and Deserialize to have both operations covered.
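To convince myself that both directions work, I can add a quick round-trip check at the bottom of lib.rs (a minimal sketch of my own; it compiles once all the types below are in place and is not part of the final code):

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn query_round_trips_through_json() {
        // Build an empty bool/must query by hand
        let query = OpenSearchQuery {
            query: BoolQuery {
                bool: MustQuery { must: vec![] },
            },
        };
        // Serialize it with serde_json...
        let json = serde_json::to_string(&query).unwrap();
        assert_eq!(json, r#"{"query":{"bool":{"must":[]}}}"#);
        // ...and deserialize it back into the struct
        let _parsed: OpenSearchQuery = serde_json::from_str(&json).unwrap();
    }
}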
In my case, QueryStatement can have one of two variants: match or range. Rust provides a native way to define such discriminated unions ("OR types"):
#[derive(Debug, Serialize, Deserialize)]
#[serde(untagged)]
enum QueryStatement {
    MatchStatement(MatchStatement),
    RangeStatement(RangeStatement),
}
Enums can be represented in JSON in different ways. I let serde know that I want to keep them untagged. This way they "disappear" in the JSON, leaving only the properties of the enumerated types.
#[derive(Debug, Serialize, Deserialize)]
pub struct MatchStatement {
    #[serde(rename = "match")]
    match_statement: Value,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct RangeStatement {
    range: Value,
}
The word "match" is a reserved keyword in Rust. I use match_statement as the struct property and let serde rename it to "match" in the JSON representation.
Value is a type provided by serde_json. It can represent any value available in JSON. I could probably spend more time making my domain a bit more precise; however, I decided to leave it as is. The main reason is that I will use a builder to construct the query, so I don't expect any ambiguity in the final object.
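Before moving on to the builder, here is roughly what a single statement serializes to, so the untagged enum and the rename are easier to picture (an illustrative snippet of my own, running inside the same module since the fields are private):

let statement = QueryStatement::MatchStatement(MatchStatement {
    match_statement: json!({ "DestWeather": "Sunny" }),
});
// The enum variant leaves no trace in the JSON; only the inner struct's
// renamed "match" property remains:
// {"match":{"DestWeather":"Sunny"}}
println!("{}", serde_json::to_string(&statement).unwrap());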
pub struct OpenSearchQueryBuilder {
    query: OpenSearchQuery,
}

impl OpenSearchQueryBuilder {
    pub fn new() -> Self {
        Self {
            query: OpenSearchQuery {
                query: BoolQuery {
                    bool: MustQuery { must: vec![] },
                },
            },
        }
    }

    pub fn with_must_match(mut self, field: &str, value: String) -> Self {
        if value.is_empty() {
            return self;
        }
        self.query
            .query
            .bool
            .must
            .push(QueryStatement::MatchStatement(MatchStatement {
                match_statement: json!({
                    field: value
                }),
            }));
        self
    }

    pub fn with_must_range(mut self, field: &str, from: Option<f64>, to: Option<f64>) -> Self {
        let range = json!({
            field: {
                "gte": from,
                "lte": to
            }
        });
        self.query
            .query
            .bool
            .must
            .push(QueryStatement::RangeStatement(RangeStatement { range }));
        self
    }

    pub fn build(self) -> OpenSearchQuery {
        self.query
    }
}
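With the builder in place, recreating the query from the beginning of this post looks like this (a usage sketch of my own, not code from the repository):

let query = OpenSearchQueryBuilder::new()
    .with_must_range("AvgTicketPrice", Some(500.0), Some(900.0))
    .with_must_match("DestWeather", "Sunny".to_string())
    .build();
// Serializes to the same shape as the handwritten JSON:
// {"query":{"bool":{"must":[
//   {"range":{"AvgTicketPrice":{"gte":500.0,"lte":900.0}}},
//   {"match":{"DestWeather":"Sunny"}}
// ]}}}
println!("{}", serde_json::to_string_pretty(&query).unwrap());

Note that the size parameter from the original example is not part of the builder; as you will see below, it is passed to the OpenSearch client separately as limit and offset.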
We are almost done here. I updated my first iteration of the query_all_docs function so it can be used with the created OpenSearchQuery type:
pub async fn query<T>(
    &self,
    index: &str,
    limit: i64,
    offset: i64,
    query: OpenSearchQuery,
) -> anyhow::Result<Vec<T>>
where
    T: DeserializeOwned,
{
    let query_json = json!(query);
    println!("query: {}", query_json);
    let response = self
        .client
        .search(SearchParts::Index(&[index]))
        .size(limit)
        .from(offset)
        .body(query_json)
        .send()
        .await?;
    let response_body = response.json::<Value>().await?;
    let result = response_body["hits"]["hits"]
        .as_array()
        .unwrap()
        .iter()
        .map(|raw_value| serde_json::from_value::<T>(raw_value["_source"].clone()).unwrap())
        .collect::<Vec<_>>();
    Ok(result)
}
Looks good. Now let's update lambda function code.
Lambda function
Let's open functions/query/src/main.rs. First of all, I update the Request type.
#[derive(Deserialize, Clone, Copy)]
struct Pagination {
    limit: Option<i64>,
    offset: Option<i64>,
}

#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct Request {
    destination_city_name: Option<String>,
    origin_city_name: Option<String>,
    destination_weather: Option<String>,
    origin_weather: Option<String>,
    max_avg_ticket_price: Option<f64>,
    min_avg_ticket_price: Option<f64>,
    pagination: Option<Pagination>,
}
I can query by destination city and weather, and the same for the origin city. There is also a way to define average ticket price limits. Finally, we have some basic pagination.
The main point is that all properties are optional, so the caller has a lot of flexibility in querying data.
The function handler stays straightforward; the only new action is building the query:
async fn function_handler(os_client: &OpenSearchService, event: LambdaEvent<Request>) -> Result<Response, Error> {
    let request_body = event.payload;
    let index = "opensearch_dashboards_sample_data_flights";
    let limit = request_body.pagination.and_then(|p| p.limit).unwrap_or(10);
    let offset = request_body.pagination.and_then(|p| p.offset).unwrap_or(0);

    let query = OpenSearchQueryBuilder::new()
        .with_must_match("OriginWeather", request_body.origin_weather.unwrap_or("".to_string()))
        .with_must_match("DestWeather", request_body.destination_weather.unwrap_or("".to_string()))
        .with_must_match("DestCityName", request_body.destination_city_name.unwrap_or("".to_string()))
        .with_must_match("OriginCityName", request_body.origin_city_name.unwrap_or("".to_string()))
        .with_must_range("AvgTicketPrice", request_body.min_avg_ticket_price, request_body.max_avg_ticket_price)
        .build();

    let query_result = os_client.query::<FlightData>(index, limit, offset, query).await?;

    // Prepare the response
    let resp = Response {
        flights: query_result,
    };

    Ok(resp)
}
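The handler uses two types that I haven't shown here: FlightData, into which each _source document is deserialized, and Response, which wraps the results. A minimal sketch of how they could look, with field names taken from the sample flights index used above (the real structs may carry more fields):

#[derive(Serialize, Deserialize)]
struct FlightData {
    // Only a handful of fields from the sample documents; serde ignores the rest
    #[serde(rename = "AvgTicketPrice")]
    avg_ticket_price: f64,
    #[serde(rename = "DestCityName")]
    destination_city_name: String,
    #[serde(rename = "OriginCityName")]
    origin_city_name: String,
    #[serde(rename = "DestWeather")]
    destination_weather: String,
    #[serde(rename = "OriginWeather")]
    origin_weather: String,
}

#[derive(Serialize)]
struct Response {
    flights: Vec<FlightData>,
}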
Testing locally
Using cargo lambda, I run the function:
cd functions/query
cargo lambda watch
Now I can invoke it with the provided input. I update my events/flights.json:
{
  "destinationCityName": "London",
  "maxAvgTicketPrice": 600,
  "minAvgTicketPrice": 400,
  "pagination": {
    "limit": 1,
    "offset": 10
  }
}
And, in the second terminal window, I run:
cargo lambda invoke -F events/flights.json
It works.
Now I can update test events, or create more events, to test my query.
Summary
In this part, we implemented an OpenSearchQuery with a builder to be used by the lambda function. We also updated the lambda function itself, so now it prepares queries based on the incoming Request.
For now, we didn't need to touch the AWS cloud. The whole integration was created locally thanks to the OpenSearch docker compose setup and cargo lambda.
I believe that the ability to work locally with the short feedback loop helps boost the developer's performance and feels really nice.
Next steps
In the next part, I plan to define the IaC and deploy the solution to AWS.
Stay tuned!
