DEV Community

Cover image for Mind Boggling Speed when Caching with Momento and Rust
Benjamen Pyle for AWS Community Builders

Posted on • Originally published at binaryheap.com

Mind Boggling Speed when Caching with Momento and Rust

Summer is here and in the northern hemisphere, temperatures are heating up. Living in North Texas, you get used to the heat and humidity but somehow it still always seems to sneak up on me. As I start this new season (which happens to be my favorite) I wanted to reflect a touch and remember the summer of 2023. That summer, I looked at 6 different aspects of serverless development from the perspective of things I wish I had known when I was getting started. Fast forward to this summer when I started with Does Serverless Still Matter?. What a year it's been for sure. And as I look forward to the next few hot months, I'm going to explore my current focus which is highly performant serverless patterns. And to kick things off, let's get started with caching with Momento and
Rust.

Architecture

I always like to start by describing what it is that I'm going to be building throughout the article. When designing for highly performant Lambda-based solutions, I like to keep things as simple as possible. Since all of these transitions require HTTP requests, latency only grows as more requests enter the mix. Additionally, by choosing Rust as the language for the Lambda Function, I can be assured that I'm getting the best compute performance that is possible.

Caching with Momento and Rust

Project Setup

As I mentioned above, I'm going to be using Rust to build out my Lambda Function. And as I explore caching with Momento and Rust, I'll be using Momento's SDK for Rust. In addition to Rust, I'm building the infrastructure with SAM instead of my usual CDK. I tend to go back and forth. When working in purely serverless setups, I tend to favor SAM for its simplicity. But when I've got more complexity, I lean towards CDK.

SAM Template

The architecture diagram above highlights a few pieces of AWS infrastructure. The template below sets up those necessary pieces for getting started as we dive deeper into caching with Momento and Rust.

Pay close attention to the Rust Lambda Function piece which requires the naming of the handler to be bootstrap. Also to note is that the path in the CodUri points to where the Cargo.toml manifest file is for the Lambda Function handler.

Resources:
  KinesisStream:
    Type: AWS::Kinesis::Stream
    Properties:
      RetentionPeriodHours: 24
      StreamModeDetails:
        StreamMode: ON_DEMAND

  DynamoDBTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: Locations
      AttributeDefinitions:
        - AttributeName: location
          AttributeType: S
      KeySchema:
        - AttributeName: location
          KeyType: HASH
      BillingMode: PAY_PER_REQUEST

  RustConsumerFunction:
    Type: AWS::Serverless::Function 
    Metadata:
      BuildMethod: rust-cargolambda 
    Properties:
      FunctionName: kinesis-consumer-model-one-rust
      Environment:
        Variables:
          RUST_LOG: kinesis_consumer=debug
      CodeUri: ./kinesis-consumer-model-one-rust/rust_app # Points to dir of Cargo.toml
      Handler: bootstrap # Do not change, as this is the default executable name produced by Cargo Lambda
      Runtime: provided.al2023
      Architectures:
        - arm64
      Policies:
        - AmazonDynamoDBFullAccess
        - Version: "2012-10-17" 
          Statement:
            - Effect: Allow
              Action:
                - ssm:*
              Resource: "*"
      Events:
        Stream:
          Type: Kinesis
          Properties:
            Stream: !GetAtt KinesisStream.Arn
            StartingPosition: LATEST
            BatchSize: 10
Enter fullscreen mode Exit fullscreen mode

Momento SDK

Diving into the Momento piece of the caching with Momento and Rust, I need to first establish an account, a cache, and an API key. Instead of demonstrating that here, I'll refer you to wonderful documentation that will guide you through that process.

With an API key and cache all configured, I'm going to store that key in an AWS SSM parameter. That can be demonstrated through this code. Feel free to change this if you are following along, but if you don't want to make any adjustments, you'll need this value in SSM

let parameter = client
    .get_parameter()
    .name("/keys/momento-pct-key")
    .send()
    .await?;
Enter fullscreen mode Exit fullscreen mode

Caching with Momento and Rust

First off, the Momento SDK is still less than v1.0 so I'd expect some changes along the way. But in that same thought, it's well-polished for being so new. It has a very AWS SDK feel to it which I LOVE. It's one of the things that I appreciate about working with AWS and the Momento Rust SDK has that same vibe.

I first need to establish a connection or client into the Momento API.

// create a new Momento client
let cache_client = match CacheClient::builder()
    .default_ttl(Duration::from_secs(10))
    .configuration(configurations::Laptop::latest())
    .credential_provider(CredentialProvider::from_string(api_key).unwrap())
    .build()
{
    Ok(c) => c,
    Err(_) => panic!("error with momento client"),
};
Enter fullscreen mode Exit fullscreen mode

With the client established, I can then make requests against the control plane and data plane APIs. For the balance of the article, I'll be using the data plane API to make gets and sets.

Gets

Issuing a get on a cache dictionary is straightforward.

// use the client to execute a Get
match cache_client
    .get("sample-a".to_string(), location.clone())
    .await
{
    Ok(r) => match r {
        // match on OK or Error
        GetResponse::Hit { value } => {
            // A Cache Hit
            tracing::info!("Cache HIT");
            let cached: String = value.try_into().expect("Should have been a string");
            let model = serde_json::from_str(cached.as_ref()).unwrap();
            Ok(Some(model))
        }
        GetResponse::Miss => {
            // A Cache Miss
            tracing::info!("Cache MISS, going to DDB");
            // Code ommitted but included in the main repository ...
        }
    },
    Err(e) => {
        tracing::error!("(Error)={:?}", e);
        Ok(None)
    }
}
Enter fullscreen mode Exit fullscreen mode

As shown above, the get operation will return a Result with the inner value being an Enum that holds information about whether the request was a Hit or a Miss. What I like about this is that the Hit also includes the value retrieved. This is a nice touch as then deserializing into my CacheModel is as simple as executing serde_json::from_str. Again, really nice feature.

Sets

Caching with Momento and Rust was easy and clean with gets, and sets work the same way. Think of it as almost the reverse of the get. Instead of deserializing, I now serialize. Instead of querying, I'm now writing.

let s = serde_json::to_string(cache_model).unwrap();
match cache_client
    .set("sample-a".to_string(), cache_model.location.clone(), s)
    .await
{
    Ok(_) => Ok(()),
    Err(e) => {
        tracing::error!("(Error)={:?}", e);
        Ok(())
    }
}
Enter fullscreen mode Exit fullscreen mode

Final Momento SDK Thoughts

Consider me impressed at my first go with the SDK. The code worked the very first time without having to dive into documentation. The SDK API is based on the common Builder Pattern which makes the configuration of a request simple and readable. There is a common error enum that I then can easily work around with thiserror to take advantage of the Rust ? operator. And lastly, it is highly performant. And that brings me back to this summer exploration. I've executed roughly 65K requests through Kinesis to be processed through my Lambda Function which also makes 65K Momento requests. I consistently saw Momento return me either a hit with the value or a miss at an average of 1.8ms.

Momento Performance

Running the Sample

Let's dive into how to run this sample and see what happens when I do. Caching with Momento and Rust is such a powerful pattern but sometimes a picture can tell more than words. I've written about Rust's performance with Lambda before so you either agree with that data or you don't. I've never steered away from the fact that if you want the maximum amount of speed you can get, then maybe you shouldn't be running in the cloud, using HTTP, and a host of other decisions. If that's the camp you fall in, then 7ms is going to seem slow to you. But for most of us who enjoy the speed and scale of the cloud without the overhead of management and the ability to iterate quickly at a low cost, then 7ms is much better than what you are going to get with another runtime and setup.

Rust's performance shines when paired with Kinesis and Momento.

Rust Performance

The Producer

In the repository's root directory, there is a producer directory that holds a Rust program which will load as many Kinesis records as you want. It will run several threads to loop for a specified duration and write those values into Kinesis. This is a test harness so to speak.

The main function has the below code to handle the threads. I can configure how many, but by default, I'm just going to kick off 1.

// THREAD_COUNT defaults to 1 but can be changed to support multiple threads that'll execute
// the thread_runner function as many times as defined in the RECORD_COUNT
let thread_count_var: Result<String, VarError> = std::env::var("THREAD_COUNT");
let thread_count: i32 = thread_count_var
    .as_deref()
    .unwrap_or("1")
    .parse()
    .expect("THREAD_COUNT must be an int");
while loop_counter < thread_count {
    // create as many threads as defined
    let cloned_client = client.clone();
    let handle = tokio::spawn(async {
        thread_runner(cloned_client).await;
    });
    handles.push(handle);
    loop_counter += 1;
}

while let Some(h) = handles.pop() {
    h.await.unwrap();
}
Enter fullscreen mode Exit fullscreen mode

It then contains a thread_runner function that will loop some number of times (defaults to 10) and write a record into Kinesis. The record has a location field which is selected from an array at random.

async fn thread_runner(client: Client) {
    // record count default to 10
    let record_count_var: Result<String, VarError> = std::env::var("RECORD_COUNT");
    let record_count: i32 = record_count_var
        .as_deref()
        .unwrap_or("10")
        .parse()
        .expect("RECORD_COUNT must be an int");

    // this is where it publishes.
    // RUN the SAM code in the publisher and take the Stream Name and put that in an environment
    // variable to make this work
    let kinesis_stream =
        std::env::var("KINESIS_STREAM_NAME").expect("KINESIS_STREAM_NAME is required");
    let mut i = 0;
    while i < record_count {
        let model_one = ModelOne::new(String::from("Model One"));

        // create a new model in the loop and push into kinesis
        let model_one_json = serde_json::to_string(&model_one);
        let model_one_blob = Blob::new(model_one_json.unwrap());
        let key = model_one.get_id();

        let result = client
            .put_record()
            .data(model_one_blob)
            .partition_key(key)
            .stream_name(kinesis_stream.to_string())
            .send()
            .await;

        match result {
            Ok(_) => {
                println!("Success!");
            }
            Err(e) => {
                println!("Error putting");
                println!("{:?}", e);
            }
        }

        i += 1;
    }
    }
Enter fullscreen mode Exit fullscreen mode

I can then run this program by doing the following.

cd publisher
cargo build
export KINESIS_STREAM_NAME=<the name of the stream>
cargo run
Enter fullscreen mode Exit fullscreen mode

You'll see Success printed into the terminal output and records will start showing up in the Lambda Function.

The Consumer

I'm getting to the end of this sample so let's dive into the consumer. There is a single Lambda Function that brings together caching with Momento and Rust by hooking up to the Kinesis stream and processing the records.

The function handler takes a KinesisEvent, loops the records, and then works with the cache.


async fn function_handler(
    cache_client: &CacheClient,
    ddb_client: &aws_sdk_dynamodb::Client,
    event: LambdaEvent<KinesisEvent>,
) -> Result<(), Error> {
    info!("Starting the loop ...");

    // loop the kinesis records
    for e in event.payload.records {
        // convert the data into a ModelOne
        // ModelOne implements the From trait
        let mut model_one: ModelOne = e.into();
        info!("(ModelOne BEFORE)={:?}", model_one);

        // grab the item from storage
        let result = fetch_item(ddb_client, cache_client, model_one.read_location.clone()).await;
        match result {
            Ok(r) => {
                model_one.location = r;
                info!("(ModelOne AFTER)={:?}", model_one);
            }
            Err(e) => {
                error!("(Err)={:?}", e);
            }
        }
    }

    Ok(())
}
Enter fullscreen mode Exit fullscreen mode

The main operation inside of the loop is the fetch_item. I've written a good bit about Rust and DynamoDB so I'm not going to highlight the code below, but the way it works is if the item isn't found in the fetch to Momento, it then goes to DynamoDB to grab the record and then execute the set operation that I showed above. The key to making this work in this sample is to have the records in DynamoDB so that I have something to set.

My ModelOne struct has a location field which is one of the three values. ['Car', 'House', 'Diner']. Insert the following records into the Locations table created by the SAM infrastructure template.

{
    "location": "Car",
    "description": "Car description",
    "notes": "Car notes"
}
{
    "location": "Diner",
    "description": "Diner description",
    "notes": "Diner notes"
}
{
    "location": "House",
    "description": "House description",
    "notes": "House notes"
}
Enter fullscreen mode Exit fullscreen mode

And that'll do it. When you run the producer above, you'll see a host of output into CloudWatch that highlights the Hits, Misses, DynamoDB queries, and the printing out of a large number of ModelOne structs.

Wrapping Up

I wrote a few blocks above that 7ms might not be the speed you are looking for, but I'd present you with another opinion. With serverless, I don't stress over the infrastructure, the durability, reliability, or the fact that I might need 10x more capacity today than I needed yesterday. Yes, that comes at a premium but as builders, we need to know how tools and know when they are right and when they are wrong. Serverless to me is still the right solution more than it is the wrong one. And paired with Momento and Rust, I can get a highly performant and extremely scalable solution with very little investment. That will stretch a long way for so many that are shipping value.

To demonstrate that, here's a comparison of when the record was written to Kinesis and when it was read and processed. I'm more than happy with 16ms from write to read. That'll take care of the performance criteria I have in so many requirements.

Record write read

This is just the first of many scenarios I plan to look at this summer. High performance and serverless aren't at odds. They go hand in hand. And by using the right tools, you can even further enhance your user's experience. Because speed does just that. Enhance user experience. I hope you've enjoyed Caching with Momento and Rust.

And as always, here is the GitHub repository I've been working through

Thanks for reading and happy building!

Top comments (0)