DEV Community

szymon-szym
OpenSearch with AWS Lambda Rust (part 1) - set up the local environment

Introduction

Rust is well suited for creating low-latency lambda functions. It works well with OpenSearch and lets you build extremely performant integrations.

I plan to create a serverless application that uses OpenSearch and to document my learnings along the way. I will split the work into a few blog posts to keep things short and sweet.

Project structure

The code for this blog post is here (branch 1-local-setup)

Dependencies

Infrastructure will be handled with AWS CDK.
For lambda-related tasks, I am going to use great cargo lambda

Architecture

To be honest, I haven't decided yet how I want to use the created lambdas. That is not crucial for the local setup, though.

Run OpenSearch locally and connect with the Rust client

Folders structure

I would like to start development locally because there is a lot I can build and test before going to the cloud. I love a short feedback loop.

With that in mind, I start by creating the root project with AWS CDK because, eventually, I want to use it as my IaC.



cdk init --language typescript



Now I define a new folder functions with subfolders query and opensearch_service. functions is the root folder for my lambda functions.

Run OpenSearch locally

A local instance of OpenSearch is pretty handy. There are different ways to achieve it, but for me the easiest way to run OpenSearch locally was to use the docker-compose.yml from the official site.

I just copied the docker-compose file to the root folder of my project and ran docker compose up. I also needed to apply some changes to the system configuration, but it wasn't hard at all.
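If you are on Linux, the system change is most likely the one the official OpenSearch docker docs call out: raising the vm.max_map_count kernel setting. A sketch (adjust for your OS):

```shell
# OpenSearch containers need a higher memory-map limit than most defaults
sudo sysctl -w vm.max_map_count=262144

# make the setting persistent across reboots
echo "vm.max_map_count=262144" | sudo tee -a /etc/sysctl.conf
```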

Once all containers are running, I can go to http://localhost:5601/app/home#/ and start playing with the OpenSearch dashboard (user: admin, password: admin).


That's nice!

I need data to test my functions, so I go to Add sample data and pick Sample Flights Data to be loaded.


Now I go to Dev Tools and confirm that the data was loaded.
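For example, a quick count request in Dev Tools confirms that the index is populated (the index name comes from the sample data set):

```
GET opensearch_dashboards_sample_data_flights/_count
```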


My local OpenSearch instance is ready.

Rust OpenSearchService

I plan to put OpenSearch-related functionality in a separate library, so I can reuse it in different functions later on.
I use a Cargo workspace for this: a single workspace can contain several crates (e.g. different lambda functions, common libraries, etc.).
In the functions/ folder I put a Cargo.toml file with the following content:



[workspace]

members = [
    "query",
    "opensearch_service",
]



I go to the functions/opensearch_service folder and run cargo init --lib. Here I will place the functionality I want to share between functions. For now, I need a client that connects to OpenSearch and performs a dummy query.

I will use the following dependencies: opensearch, serde, serde_json, anyhow. For me, using cargo add is the most convenient way.



cargo add serde -F derive
cargo add opensearch -F aws-auth
cargo add anyhow serde_json



The OpenSearchService at this point is simple:



use opensearch::OpenSearch;

pub struct OpenSearchService {
    client: OpenSearch,
}



I implement a function for creating a client with a local connection:



// ...
// needs: use opensearch::http::transport::{SingleNodeConnectionPool, TransportBuilder};
// needs: use url::Url;
impl OpenSearchService {

    pub fn local_client() -> Self {
        // the local cluster serves HTTPS with a self-signed certificate,
        // so basic auth is used and certificate validation is disabled
        let url = Url::parse("https://localhost:9200").unwrap();
        let conn_pool = SingleNodeConnectionPool::new(url);
        let credentials =
            opensearch::auth::Credentials::Basic("admin".to_string(), "admin".to_string());
        let cert_validation = opensearch::cert::CertificateValidation::None;
        let transport = TransportBuilder::new(conn_pool)
            .cert_validation(cert_validation)
            .auth(credentials)
            .build()
            .unwrap();
        let client = OpenSearch::new(transport);

        Self { client }
    }
    // ...



To be honest, I struggled a lot with setting up this connection. It turned out that I needed to use a custom URL (the client defaults to http://, while the local cluster serves https://) and to disable certificate validation because of the self-signed certificate.

Once the client is ready, I build a function that runs an empty must query (which matches all documents) and parses the results:



// ...
// impl OpenSearchService {
// needs: use opensearch::SearchParts;
// needs: use serde::de::DeserializeOwned;
// needs: use serde_json::{json, Value};
// ...
    pub async fn query_all_docs<T>(&self, index: &str, limit: i64) -> anyhow::Result<Vec<T>>
    where
        T: DeserializeOwned,
    {
        let response = self.client
            .search(SearchParts::Index(&[index]))
            .size(limit)
            .from(0)
            .body(json!({
                "query": {
                    "bool": {
                        "must": []
                    }
                }
            }))
            .send()
            .await?;

        let response_body = response.json::<Value>().await?;

        // return an error instead of panicking if the response shape is unexpected
        let hits = response_body["hits"]["hits"]
            .as_array()
            .ok_or_else(|| anyhow::anyhow!("unexpected response: missing hits"))?;

        let result = hits
            .iter()
            .map(|raw_value| serde_json::from_value::<T>(raw_value["_source"].clone()))
            .collect::<Result<Vec<_>, _>>()?;

        Ok(result)
    }



The query function is generic, so it can be used for different types in a type-safe manner. One thing to remember is to add a bound so the compiler knows the generic type can be deserialized: where T: DeserializeOwned. If it is missing, the compiler, in cooperation with Stack Overflow, will help close the gap.

Ok, the initial implementation of the OpenSearchService is in place, let's create a lambda function.

Rust query lambda function

I go to functions/query and run cargo lambda init. When prompted, I select "no" for the question about creating an HTTP handler, and I leave the event type empty.

In the Cargo.toml, the just-created library can be added using a path dependency:



# ...
[dependencies]

opensearch_service = { path = "../opensearch_service"}
#...



I will also use serde and serde_json.

To be able to query my sample dataset, I need to define a type for it. I create a FlightData struct and copy the fields from the OpenSearch dashboard.
In cases like this, I usually paste the JSON as a comment and start writing the struct - CodeWhisperer is clever enough to figure out what I need.

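For reference, a single document's _source mixes PascalCase and camelCase keys (abridged here, values are illustrative), which is why the struct needs rename_all plus a few per-field renames:

```
{
  "FlightNum": "ABC123",
  "DestCountry": "AU",
  "dayOfWeek": 0,
  "timestamp": "2024-01-01T00:00:00",
  "DestLocation": { "lat": "-33.94", "lon": "151.17" }
}
```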

Finally, the structure looks like this:



#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "PascalCase")]
pub struct FlightData {
    flight_num: String,
    dest_country: String,
    origin_weather: String,
    origin_city_name: String,
    avg_ticket_price: f32,
    distance_miles: f32,
    flight_delay: bool,
    dest_weather: String,
    dest: String,
    flight_delay_type: String,
    origin_country: String,
    #[serde(rename = "dayOfWeek")]
    day_of_week: u8,
    distance_kilometers: f32,
    #[serde(rename = "timestamp")]
    timestamp: String,
    dest_location: Location,
    #[serde(rename = "DestAirportID")]
    dest_airport_id: String,
    carrier: String,
    cancelled: bool,
    flight_time_min: f32,
    origin: String,
    origin_location: Location,
    dest_region: String,
    #[serde(rename = "OriginAirportID")]
    origin_airport_id: String,
    origin_region: String,
    dest_city_name: String,
    flight_time_hour: f32,
    flight_delay_min: i64,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct Location {
    lat: String,
    lon: String,
}



For initial testing, the request and response can be simple:



#[derive(Deserialize)]
struct Request {
    limit: i64,
}

#[derive(Serialize)]
struct Response {
    flights: Vec<FlightData>,
}

async fn function_handler(event: LambdaEvent<Request>) -> Result<Response, Error> {
// ...



Once the types are in place, the lambda itself is pretty straightforward:



// ...
async fn function_handler(os_client: &OpenSearchService, event: LambdaEvent<Request>) -> Result<Response, Error> {

    let limit = event.payload.limit;

    let index = "opensearch_dashboards_sample_data_flights";

    let result = os_client.query_all_docs::<FlightData>(index, limit).await?;
    // Prepare the response
    let resp = Response {
        flights: result,
    };

    Ok(resp)
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing::init_default_subscriber();

    // create the client once and share it across invocations
    let os_client = opensearch_service::OpenSearchService::local_client();

    run(service_fn(|event: LambdaEvent<Request>| {
        function_handler(&os_client, event)
    }))
    .await
}
//...



Testing lambda locally

Once everything is prepared, I run the function to see if it works. I create a dummy test event in the functions/query/events/flights.json file:



{
    "limit": 5
}



Testing lambda locally with cargo lambda is a pleasant experience. I open two terminal windows and place them one next to the other. I run cargo lambda watch in the right one, and cargo lambda invoke -F events/flights.json in the left.


As you might imagine, I needed a few iterations before I got the FlightData deserialization right. With cargo lambda, iterating is super smooth - in watch mode the lambda is rebuilt automatically on save and can be invoked right away.

Summary

In this post, I prepared a local environment for building a lambda function that consumes data from OpenSearch. This is a nice starting point.

As next steps, I will implement more realistic queries, write an indexer function to populate the data, and finally deploy the whole solution to AWS. Bear with me if you are interested in this topic.
