Introduction
Rust is well suited for creating low-latency lambda functions. It also pairs well with OpenSearch, letting you build extremely performant integrations.
I plan to create a serverless application that uses OpenSearch and to document my learnings along the way. I will split the write-up into a few blog posts to keep things short and sweet.
Project structure
The code for this blog post is here (branch 1-local-setup).
Dependencies
Infrastructure will be handled with AWS CDK.
For lambda-related tasks, I am going to use the great cargo lambda.
Architecture
To be honest, I haven't decided yet how I want to use the created lambdas. It is not crucial for the local setup, though.
Run OpenSearch locally and connect with the Rust client
Folder structure
I would like to start development locally because there is a lot I can build and test before going to the cloud. I love having a short feedback loop.
With that in mind, I start by creating the root project with AWS CDK because, eventually, I want to use it as my IaC.
cdk init --language typescript
Now I define a new folder, functions, with subfolders query and opensearch_service. functions is the root folder for my lambda functions.
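At this point, the repository layout looks roughly like this (bin/ and lib/ are generated by cdk init):
.
├── bin/                      # CDK app entry point
├── lib/                      # CDK stack definitions
└── functions/
    ├── opensearch_service/   # shared OpenSearch library (created below)
    └── query/                # lambda function (created below)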
Run OpenSearch locally
A local instance of OpenSearch is pretty handy. There are different ways to achieve it, but for me the easiest way to run OpenSearch locally was to use the docker-compose.yml from the official site.
I just copied the docker-compose file to the root folder of my project and ran docker compose up. I also needed to apply some changes to the system configuration, but it wasn't hard at all.
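In my case, the tweak was the one the OpenSearch Docker docs call out for Linux hosts - raising the kernel's mmap count limit:
# Required by OpenSearch when running in Docker; the docs recommend this value.
sudo sysctl -w vm.max_map_count=262144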
Once all containers are running, I can go to http://localhost:5601/app/home#/ and start playing with the OpenSearch dashboard (user: admin, password: admin).
That's nice!
I need data to test my functions, so I go to Add sample data and pick Sample Flights Data to be loaded.
Now I go to Dev Tools and confirm that the data was loaded.
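A one-liner in Dev Tools is enough for the check (the index name shows up again later in the lambda code):
GET opensearch_dashboards_sample_data_flights/_count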
My local OpenSearch instance is ready.
Rust OpenSearchService
I plan to put the functionality related to OpenSearch in a separate library, so I can reuse it in different functions later on.
I use cargo's workspace feature: a single workspace can hold a few crates (e.g. different lambda functions, common libraries, etc.).
In the functions/ folder I put a Cargo.toml file with the following content:
[workspace]
members = [
    "query",
    "opensearch_service",
]
I go to the functions/opensearch_service folder and run cargo init --lib. Here I will place the functionality I want to share between different functions. For now, I need a client that connects to OpenSearch and performs a dummy query.
I will use the following dependencies: opensearch, serde, serde_json, anyhow. For me, using cargo add is the most convenient way to add them.
cargo add serde -F derive
cargo add opensearch -F aws-auth
cargo add anyhow serde_json
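After those commands, the [dependencies] section of the crate's Cargo.toml ends up looking roughly like this (exact versions depend on when you run cargo add, so yours may differ):
[dependencies]
anyhow = "1"
opensearch = { version = "2", features = ["aws-auth"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"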
The OpenSearchService at this point is simple:
use opensearch::OpenSearch;

pub struct OpenSearchService {
    client: OpenSearch,
}
I implement a function for creating a client with a local connection:
use opensearch::http::transport::{SingleNodeConnectionPool, TransportBuilder};
use url::Url; // Url comes from the url crate, added as an extra dependency

impl OpenSearchService {
    pub fn local_client() -> Self {
        // The local docker-compose setup serves HTTPS with a self-signed
        // certificate and uses basic auth (admin/admin by default).
        let url = Url::parse("https://localhost:9200").unwrap();
        let conn_pool = SingleNodeConnectionPool::new(url);
        let credentials =
            opensearch::auth::Credentials::Basic("admin".to_string(), "admin".to_string());
        // Accept the self-signed certificate - fine locally, never in production.
        let cert_validation = opensearch::cert::CertificateValidation::None;
        let transport = TransportBuilder::new(conn_pool)
            .cert_validation(cert_validation)
            .auth(credentials)
            .build()
            .unwrap();
        let client = OpenSearch::new(transport);
        Self { client }
    }
To be honest, I struggled a lot with setting up this connection. It turned out that I needed to use a custom URL (the client defaults to http://) and configure the certificate validation flow.
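A quick way to see both quirks outside of Rust is curl against the local node; -k corresponds to the disabled certificate validation:
# Note https:// and -k (skip certificate verification), matching the client config.
curl -k -u admin:admin https://localhost:9200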
Once the client is ready, I build the function that runs an empty must query and parses the results:
use opensearch::SearchParts;
use serde::de::DeserializeOwned;
use serde_json::{json, Value};

// impl OpenSearchService {
    // ...
    pub async fn query_all_docs<T>(&self, index: &str, limit: i64) -> anyhow::Result<Vec<T>>
    where
        T: DeserializeOwned,
    {
        let response = self
            .client
            .search(SearchParts::Index(&[index]))
            .size(limit)
            .from(0)
            // An empty "must" clause matches all documents.
            .body(json!({
                "query": {
                    "bool": {
                        "must": []
                    }
                }
            }))
            .send()
            .await?;
        let response_body = response.json::<Value>().await?;
        // Each hit keeps the original document under "_source";
        // the unwraps panic if the response shape is unexpected.
        let result = response_body["hits"]["hits"]
            .as_array()
            .unwrap()
            .iter()
            .map(|raw_value| serde_json::from_value::<T>(raw_value["_source"].clone()).unwrap())
            .collect::<Vec<_>>();
        Ok(result)
    }
The query function is generic, so it can be used with different types in a type-safe manner. A thing to remember is that we need a bound on the generic type to make sure it can be deserialized: where T: DeserializeOwned. If it is missing, the compiler, in cooperation with Stack Overflow, will help us close the gap.
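To make the bound concrete, here is a hypothetical call site; the Flight struct below is just an illustration, the real type comes in the next section:
// Any type that derives Deserialize satisfies the DeserializeOwned bound.
#[derive(serde::Deserialize)]
struct Flight {
    #[serde(rename = "FlightNum")]
    flight_num: String,
}

// let flights: Vec<Flight> =
//     os_client.query_all_docs("opensearch_dashboards_sample_data_flights", 10).await?;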
Ok, the initial implementation of the OpenSearchService is in place; let's create a lambda function.
Rust query lambda function
I go to functions/query and run cargo lambda init. When prompted, I select "no" for the question about creating an HTTP handler, and I leave the event type empty.
In Cargo.toml, the just-created library can be added using its path:
# ...
[dependencies]
opensearch_service = { path = "../opensearch_service"}
#...
I will also use serde and serde_json.
To be able to query my sample dataset, I need to define a type for it. I create a FlightData struct and copy the fields from the OpenSearch dashboard.
In cases like this, I usually just paste the JSON as a comment and start creating the struct - Code Whisperer is clever enough to figure out what I need.
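Abridged, the pasted comment looks something like this (the keys are the PascalCase names from the sample index; values elided):
// {
//     "FlightNum": "...",
//     "DestCountry": "...",
//     "OriginWeather": "...",
//     "AvgTicketPrice": 0.0,
//     ...
// }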
Finally, the structure looks like this:
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "PascalCase")]
pub struct FlightData {
    flight_num: String,
    dest_country: String,
    origin_weather: String,
    origin_city_name: String,
    avg_ticket_price: f32,
    distance_miles: f32,
    flight_delay: bool,
    dest_weather: String,
    dest: String,
    flight_delay_type: String,
    origin_country: String,
    #[serde(rename = "dayOfWeek")]
    day_of_week: u8,
    distance_kilometers: f32,
    #[serde(rename = "timestamp")]
    timestamp: String,
    dest_location: Location,
    #[serde(rename = "DestAirportID")]
    dest_airport_id: String,
    carrier: String,
    cancelled: bool,
    flight_time_min: f32,
    origin: String,
    origin_location: Location,
    dest_region: String,
    #[serde(rename = "OriginAirportID")]
    origin_airport_id: String,
    origin_region: String,
    dest_city_name: String,
    flight_time_hour: f32,
    flight_delay_min: i64,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct Location {
    lat: String,
    lon: String,
}
For initial testing, the request and response types can be simple:
#[derive(Deserialize)]
struct Request {
    limit: i64,
}
#[derive(Serialize)]
struct Response {
    flights: Vec<FlightData>,
}
async fn function_handler(event: LambdaEvent<Request>) -> Result<Response, Error> {
// ...
Once the types are in place, the lambda itself is pretty straightforward:
use lambda_runtime::{run, service_fn, tracing, Error, LambdaEvent};
use opensearch_service::OpenSearchService;
async fn function_handler(os_client: &OpenSearchService, event: LambdaEvent<Request>) -> Result<Response, Error> {
    let limit = event.payload.limit;
    let index = "opensearch_dashboards_sample_data_flights";
    let result = os_client.query_all_docs::<FlightData>(index, limit).await?;
    // Prepare the response
    let resp = Response {
        flights: result,
    };
    Ok(resp)
}
#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing::init_default_subscriber();
    let os_client = opensearch_service::OpenSearchService::local_client();
    run(service_fn(|event: LambdaEvent<Request>| {
        function_handler(&os_client, event)
    })).await
}
//...
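One design note: the client is created once in main and borrowed by the handler closure, so the connection setup cost is paid on a cold start only, not on every invocation.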
Testing lambda locally
Once everything is prepared, I run the function to see if it works. I create a dummy test event in the functions/query/events/flights.json file:
{
    "limit": 5
}
Testing a lambda locally with cargo lambda is a pleasant experience. I open two terminal windows next to each other, run cargo lambda watch in the right one, and cargo lambda invoke -F events/flights.json in the left.
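For reference, the two commands side by side, both run from the functions/query folder:
# Terminal 1: rebuild on save and serve the function locally.
cargo lambda watch

# Terminal 2: send the sample event to the running function.
cargo lambda invoke -F events/flights.json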
As you might imagine, I needed a few iterations before I got the FlightData deserialization defined properly. With cargo lambda, iterating is super smooth - in watch mode, the lambda is rebuilt automatically on save and can be invoked right away.
Summary
In this post, I prepared a local environment for building a lambda function that consumes data from OpenSearch. This is a nice starting point.
As the next steps, I will implement more real-life queries, write an indexer function to populate the data and, finally, deploy the whole solution to AWS. Bear with me if you are interested in this topic.