DEV Community

OpenSearch with AWS Lambda Rust (part 2) - business logic

In the previous part we prepared a local environment for testing lambda function integration with OpenSearch. The code for the first part is here

The code for this part is on branch 2-implement-logic

Prerequisites

I assume, that the OpenSearch cluster runs locally. Please check the Part 1 if you want to follow the same steps as I did.

Goals

With the lambda function and OpenSearch running locally, I can now iterate over the code and add business logic.

I would like my lambda to receive an event with a list of fields to use in the OpenSearch query. Later, I could connect the function to the REST API Gateway or use it as a data source for AppSync.

My next steps are the following:

  • prepare OpenSearch query
  • update lambda input type
  • test lambda locally

OpenSearch query

In my case, a query to OpenSearch looks more or less like this:

{
  "size": 2,
  "query": {
    "bool": {
      "must": [
        {
          "range": {
            "AvgTicketPrice": {
              "gte": 500,
              "lte": 900
            }
          }
        },
        {
          "match": {
            "DestWeather": "Sunny"
          }
        }
      ]
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

I have some matches and range. For the sake of my example, it is enough.

My idea is to create a struct that reflects the structure of the query and to implement a builder pattern, so I can construct the query with ease.

Let's go to the functions/opensearch_service/src/lib.rs and start creating types. Going from the top I create uery->bool->must structure

#[derive(Debug, Serialize, Deserialize)]
pub struct OpenSearchQuery {
    query: BoolQuery,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct BoolQuery {
    bool: MustQuery,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct MustQuery {
    must: Vec<QueryStatement>,
}
Enter fullscreen mode Exit fullscreen mode

Thanks to serde and serde_json it's enough to annotate structs with Serialize and Deserialize to have both operations covered.

In my case QueryStatement might have two variants: match or range. Rust provides a native way to define discriminated unions ("OR types"):

#[derive(Debug, Serialize, Deserialize)]
#[serde(untagged)]
enum QueryStatement {
    MatchStatement(MatchStatement),
    RangeStatement(RangeStatement),
}
Enter fullscreen mode Exit fullscreen mode

Enums can be represented in JSON in different ways. I let serde know that I want to keep them untagged. This way they "disappear" in the JSON leaving only properties of enumerated types.

#[derive(Debug, Serialize, Deserialize)]
pub struct MatchStatement {
    #[serde(rename = "match")]
    match_statement: Value,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct RangeStatement {
    range: Value,
}
Enter fullscreen mode Exit fullscreen mode

The word "match" is a reserved keyword. I use match_statement as a struct property and rename it only during serialization.

Value is a type provided by serde_json. It might represent various types available in JSON. I probably could spend more time on making my domain a bit more precise, however, I decided to leave it as this. The main reason is that I will use a builder to construct the query, so I don't expect any ambiguity in the final object.

pub struct OpenSearchQueryBuilder {
    query: OpenSearchQuery,
}

impl OpenSearchQueryBuilder {
    pub fn new() -> Self {
        Self {
            query: OpenSearchQuery {
                query: BoolQuery {
                    bool: MustQuery { must: vec![] },
                },
            },
        }
    }

    pub fn with_must_match(mut self, field: &str, value: String) -> Self {
        if value.is_empty() {
            return self;
        }
        self.query
            .query
            .bool
            .must
            .push(QueryStatement::MatchStatement(MatchStatement {
                match_statement: json!({
                    field: value
                }),
            }));
        self
    }

    pub fn with_must_range(mut self, field: &str, from: Option<f64>, to: Option<f64>) -> Self {
        let range = json!({
            field: {
                "gte": from,
                "lte": to
            }
        });

        self.query
            .query
            .bool
            .must
            .push(QueryStatement::RangeStatement(RangeStatement { range }));
        self
    }

    pub fn build(self) -> OpenSearchQuery {
        self.query
    }
}
Enter fullscreen mode Exit fullscreen mode

We are almost done here. I updated my first iteration of query_all_docs funtion so it can be used with created OpenSearchQuery type

pub async fn query<T>(
        &self,
        index: &str,
        limit: i64,
        offset: i64,
        query: OpenSearchQuery,
    ) -> anyhow::Result<Vec<T>>
    where
        T: DeserializeOwned,
    {
        let query_json = json!(query);

        println!("query: {}", query_json);

        let response = self
            .client
            .search(SearchParts::Index(&[index]))
            .size(limit)
            .from(offset)
            .body(query_json)
            .send()
            .await?;

        let response_body = response.json::<Value>().await?;

        let result = response_body["hits"]["hits"]
            .as_array()
            .unwrap()
            .iter()
            .map(|raw_value| serde_json::from_value::<T>(raw_value["_source"].clone()).unwrap())
            .collect::<Vec<_>>();

        Ok(result)
    }
Enter fullscreen mode Exit fullscreen mode

Looks good. Now let's update lambda function code.

Lambda function

In the functions/query/src/main.rs

First of all, I update the Request type.

#[derive(Deserialize, Clone, Copy)]
struct Pagination {
    limit: Option<i64>,
    offset: Option<i64>,
}

#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct Request {
    destination_city_name: Option<String>,
    origin_city_name: Option<String>,
    destination_weather: Option<String>,
    origin_weather: Option<String>,
    max_avg_ticket_price: Option<f64>,
    min_avg_ticket_price: Option<f64>,
    pagination: Option<Pagination>,
}
Enter fullscreen mode Exit fullscreen mode

I can query by destination city and weather, the same for the origin city. There is a way to define average ticket price limits. Finally, we have some basic pagination.

The main point is that all properties are optional, so the caller has a lot of flexibility in querying data.

The function handler stays straightforward - the only new action is building the query

async fn function_handler(os_client: &OpenSearchService, event: LambdaEvent<Request>) -> Result<Response, Error> {

    let request_body = event.payload;

    let index = "opensearch_dashboards_sample_data_flights";

    let limit = request_body.pagination.and_then(|p| p.limit).unwrap_or(10);

    let offset = request_body.pagination.and_then(|p| p.offset).unwrap_or(0);

    let query = OpenSearchQueryBuilder::new()
        .with_must_match("OriginWeather", request_body.origin_weather.unwrap_or("".to_string()))
        .with_must_match("DestWeather", request_body.destination_weather.unwrap_or("".to_string()))
        .with_must_match("DestCityName", request_body.destination_city_name.unwrap_or("".to_string()))
        .with_must_match("OriginCityName", request_body.origin_city_name.unwrap_or("".to_string()))
        .with_must_range("AvgTicketPrice", request_body.min_avg_ticket_price, request_body.max_avg_ticket_price)
        .build();

    let query_result = os_client.query::<FlightData>(index, limit, offset, query).await?;

    // Prepare the response
    let resp = Response {
        flights: query_result,
    };

    Ok(resp)
}
Enter fullscreen mode Exit fullscreen mode

Testing locally

Using cargo lambda I run the function

cd functions/query
cargo lambda watch
Enter fullscreen mode Exit fullscreen mode

Now I can invoke it with the provided input. I update my events/flights.json

{
    "destinationCityName": "London",
    "maxAvgTicketPrice": 600,
    "minAvgTicketPrice": 400,
    "pagination": {
        "limit": 1,
        "offset": 10
    }
}
Enter fullscreen mode Exit fullscreen mode

And, in the second terminal window, I run

cargo lambda invoke -F events/flights.json
Enter fullscreen mode Exit fullscreen mode

Image description

🎉 🎉 🎉

It works.

Now I can update test events, or create more events, to test my query.

Summary

In this part, we implemented an OpenSearchQuery with a builder to be used by the lambda function. We also updated the lambda function itself, so now it prepares queries based on incoming Request.

For now, we didn't need to touch the AWS cloud. The whole integration was created locally thanks to OpenSearch docker compose and cargo lambda.

I believe that the ability to work locally with the short feedback loop helps boost the developer's performance and feels really nice.

Next steps

In the next part, I plan to define IaC and deploy the solution to the AWS.

Stay tuned!

Top comments (0)