AWS Community Builders

Building an AWS Lambda extension with Rust

Duarte Nunes ・ 9 min read

AWS Lambda extensions were recently announced. An extension is a long-running process that is executed alongside your Lambda function and helps with use cases like:

  • capturing telemetry;
  • doing work outside the invocation path, like refreshing configuration settings, secrets, or feature flags;
  • providing a language-agnostic way to implement some common behavior;
  • etc.

As an example, imagine a framework that provides utilities for writing integration tests as Lambda functions in arbitrary languages. Those utilities can help with sending captured production traffic to a particular Lambda version, or with comparing invocation responses against historical data.

We'll be building something much simpler though.

Before we delve into the code, a brief overview of how Lambda extensions behave.

Anatomy of an extension

An extension is a program distributed as a Lambda layer that extracts to a special directory named "extensions". Programs in that directory are executed by Lambda and have three distinct phases:

  1. The init phase, covering initialization logic and extension registration;
  2. The invoke phase, in which the extension polls for incoming invocation events;
  3. The shutdown phase, for cleanup logic.
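To make the three phases concrete, here is a toy model of the lifecycle with no network calls at all. It is only a sketch: `Event` and `run_extension` are hypothetical names invented for illustration, and the closure stands in for the blocking "give me the next event" request an actual extension would make.

```rust
/// Hypothetical stand-in for the events Lambda hands to an extension.
#[derive(Debug)]
enum Event {
    Invoke { request_id: String },
    Shutdown { reason: String },
}

/// Drives the invoke and shutdown phases over an event source.
/// Returns the request IDs seen before the shutdown event arrived.
fn run_extension(mut next_event: impl FnMut() -> Event) -> Vec<String> {
    // Init phase: set up state (registration would happen here).
    let mut seen = Vec::new();
    loop {
        // Invoke phase: asking for the next event signals readiness.
        match next_event() {
            Event::Invoke { request_id } => seen.push(request_id),
            // Shutdown phase: run cleanup logic, then exit the loop.
            Event::Shutdown { reason } => {
                println!("shutting down: {}", reason);
                return seen;
            }
        }
    }
}
```

Feeding it a fixed sequence of events (two invokes followed by a shutdown, say) returns the two request IDs, which mirrors the shape of the real event loop shown later in the post.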

An extension interacts with the Lambda service through the Extensions API, an HTTP API similar to the Runtime API, which allows custom runtimes to receive invocation events from Lambda. (An example of such a runtime is the one we'll be using, the aws-lambda-rust-runtime.)

The documentation for Lambda extensions contains the following helpful diagram:

Sequence diagram

You'll note that the diagram includes two extensions of different types. There can be up to 10 extensions for a function, and an extension can be internal or external. Internal extensions run as separate threads within the runtime process, which starts and stops them. As such, they don't handle the shutdown event. External extensions - which is what we'll be building - run as independent processes in the execution environment. This is what allows them to be written in a different language than the function.

The documentation suggests that external extensions be implemented in a compiled language so they are compatible with all runtimes. Let's start! 🦀

A Rusty extension

Our extension needs to make HTTP requests, parse JSON, and deal with errors. These are the crates we're using for those tasks:

[dependencies]
anyhow = "1.0"
serde = "1.0"
serde_json = "1.0"
reqwest = { version = "0.10.8", default-features = false, features = ["blocking", "json"] }

Our entry point will look like this:

use anyhow::Result;
use reqwest::blocking::Client;
use std::time;

fn main() -> Result<()> {
    // No timeout: the request for the next event blocks until there is one.
    let client = Client::builder().timeout(None).build()?;
    let r = register(&client)?;
    loop {
        std::thread::sleep(time::Duration::from_secs(1));
        println!("Waiting for event...");
        match next_event(&client, &r.extension_id) {
            // process event, that is, print some stuff
        }
    }
}

The entry point consists of initialization logic and a main event loop (this is similar to how we structure a Lambda function, with initialization logic living outside of the event handler). In this example, we just initialize the HTTP client and register the extension. If you're wondering whether it's a good idea to use an infinite timeout for our HTTP calls, the answer is yes: the extension can be suspended for an arbitrary period of time until there is an event to return, which is also why we're using blocking I/O for our interaction with the Extensions API.

The init phase completes when we enter the event loop and request an event for the first time; the Lambda service now knows we're ready to process events. Events are only allowed to come in when the Lambda function is done initializing and all extensions reach this point. This means that the initialization we do in our extensions directly impacts cold start times. (On the topic of cold starts, I recommend this video by Marc Brooker on virtualization technology underpinning Lambda!)

Notice that we're adding a 1 second sleep, as it'll make it easier to point out some behavior when we look at logs.

Registering the extension is a simple HTTP POST:

#[derive(Debug)]
struct RegisterResponse {
    pub extension_id: String,
}

fn register(client: &reqwest::blocking::Client) -> Result<RegisterResponse> {
    let mut map = HashMap::new();
    map.insert("events", vec!["INVOKE", "SHUTDOWN"]);
    let url = format!("{}/register", base_url()?);
    let res = client
        .post(&url)
        .header(EXTENSION_NAME_HEADER, EXTENSION_NAME)
        .json(&map)
        .send()?;

    ensure!(
        res.status() == StatusCode::OK,
        "Unable to register extension",
    );

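    // Return an error instead of panicking if the ID header is missing.
    let ext_id = res
        .headers()
        .get(EXTENSION_ID_HEADER)
        .ok_or_else(|| anyhow::anyhow!("Missing extension ID header"))?
        .to_str()?;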

    Ok(RegisterResponse {
        extension_id: ext_id.into(),
    })
}

The most important thing about this operation is that the extension name must match the filename of the binary, or else we'll get back a 403 response.

The base_url() function returns the endpoint for the Extensions API, http://$AWS_LAMBDA_RUNTIME_API/2020-01-01/extension, where AWS_LAMBDA_RUNTIME_API is an environment variable set by Lambda.

As an external extension, we register for the shutdown event. We get back the extension ID, which we pass back in a header on subsequent API calls.
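The post doesn't show base_url() or the constants used by register(), so here is a minimal sketch of what they might look like. The two header names are the ones defined by the Lambda Extensions API, and "sidecar" is the extension name that appears in the logs further down. The helper `extension_api_url` is a hypothetical name added so the URL construction can be exercised without the environment variable, and the error type is an assumption.

```rust
use std::env;

// Header names defined by the Lambda Extensions API.
const EXTENSION_NAME_HEADER: &str = "Lambda-Extension-Name";
const EXTENSION_ID_HEADER: &str = "Lambda-Extension-Identifier";
// Must equal the filename of the binary inside the "extensions" folder.
const EXTENSION_NAME: &str = "sidecar";

/// Builds the Extensions API endpoint from a "host:port" string.
/// (Hypothetical helper, split out to keep the logic free of env access.)
fn extension_api_url(host_and_port: &str) -> String {
    format!("http://{}/2020-01-01/extension", host_and_port)
}

/// Reads the host and port that Lambda exposes to the execution environment.
fn base_url() -> Result<String, env::VarError> {
    Ok(extension_api_url(&env::var("AWS_LAMBDA_RUNTIME_API")?))
}
```

Because `env::VarError` implements the standard error trait, `base_url()?` works unchanged inside functions that return `anyhow::Result`, as register() does.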

Requesting the next event is also a simple HTTP operation:

fn next_event(client: &reqwest::blocking::Client, ext_id: &str) -> Result<NextEventResponse> {
    let url = format!("{}/event/next", base_url()?);
    Ok(client
        .get(&url)
        .header(EXTENSION_ID_HEADER, ext_id)
        .send()?
        .json()?)
}

Where NextEventResponse is defined as:

#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct Tracing {
    pub r#type: String,
    pub value: String,
}

#[derive(Debug, Deserialize)]
#[serde(rename_all = "UPPERCASE", tag = "eventType")]
enum NextEventResponse {
    #[serde(rename_all = "camelCase")]
    Invoke {
        deadline_ms: u64,
        request_id: String,
        invoked_function_arn: String,
        tracing: Tracing,
    },
    #[serde(rename_all = "camelCase")]
    Shutdown {
        shutdown_reason: String,
        deadline_ms: u64,
    },
}

As I'm sure you've noticed, the event's payload is conspicuously missing from these definitions. That's because the invocation event sent to each extension contains only metadata. The only way for the extension to get the payload is by communicating with the function, which we'll cover in the next section. But first, let's look at CloudWatch logs for an invocation of a simple test Lambda that's deployed using this extension:

timestamp message
17:26:52.325-03:00 START RequestId: d11df8ca-1955-4797-a19d-5de248d0dc86 Version: $LATEST
17:26:52.325-03:00 Waiting for event...
17:26:52.325-03:00 EXTENSION Name: sidecar State: Ready Events: [INVOKE,SHUTDOWN]
17:26:52.327-03:00 Invoke event d11df8ca-1955-4797-a19d-5de248d0dc86; deadline: 1464625
17:26:52.344-03:00 Hello, world!
17:26:53.327-03:00 Waiting for event... (1 second after)
17:26:53.327-03:00 END RequestId: d11df8ca-1955-4797-a19d-5de248d0dc86
17:26:53.327-03:00 REPORT RequestId: d11df8ca-1955-4797-a19d-5de248d0dc86 Duration: 1001.13 ms Billed Duration: 2100 ms Memory Size: 128 MB Max Memory Used: 34 MB Init Duration: 1039.65 ms
17:37:51.351-03:00 Exiting: spindown

The first line is logged by the Lambda service right before the extension logs that it's waiting for an event. The initial 1 second delay isn't visible in the timestamps, but it is there: notice the REPORT line, where the init duration is 1039.65 ms. Indeed, Lambda waits for extensions to finish initializing before handling incoming requests.

The function logs "Hello, world!", which is all that it does. After 1 second, the extension signals it's ready for more events. Notice that it's only after this delay that the Lambda service prints the "END" log entry. Lambda waits until all extensions have finished processing an event before sending out the Lambda function's response. Extensions signal that they have finished processing an event by requesting the next one.

An important thing to keep in mind is that an extension can add latency both to cold starts and to every request.

Roughly 10 minutes after that single event is processed, the Lambda service sends the extension a shutdown event with the "spindown" reason. During shutdown, the extension has two seconds to execute any cleanup logic.

What we have now isn't terribly useful. Let's add a little bit more code.

Extension <-> Function IPC

For some use cases it's useful for the extension to communicate with the function, but there's no built-in or established way to do that. Fortunately, the extension and the function share memory and /tmp disk storage, so there are plenty of ways to fashion an IPC mechanism for this purpose.

We could create a FIFO, which is a simple approach for unidirectional communication (and a pair of FIFOs could be used for full-duplex communication). We could use a domain socket, or, similarly, the extension could start an HTTP server. This would fit well as a simple but flexible way for the integration test Lambda to communicate with the helpful, but imaginary extension we gave as an example in the beginning of the post.
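As a rough illustration of the domain socket option (which is not what this post goes on to implement), the sketch below has one side accept a connection and read a message while the other side connects and writes it. The function names and the idea of sending a single message per connection are assumptions made for the example; only the standard library is used.

```rust
use std::io::{Read, Write};
use std::os::unix::net::{UnixListener, UnixStream};
use std::path::Path;

/// Extension side: accept one connection and read the message until EOF.
fn receive_one(listener: &UnixListener) -> std::io::Result<String> {
    let (mut stream, _addr) = listener.accept()?;
    let mut msg = String::new();
    stream.read_to_string(&mut msg)?;
    Ok(msg)
}

/// Function side: connect, send the message, and close the connection.
/// Dropping the stream on return closes it, so the reader sees EOF.
fn send(path: &Path, msg: &str) -> std::io::Result<()> {
    let mut stream = UnixStream::connect(path)?;
    stream.write_all(msg.as_bytes())
}
```

The extension would bind a `UnixListener` to a path under /tmp during its init phase, and the function would call `send` with that same path. Closing the connection doubles as the "I'm done writing" signal, which is something the file-based approach has to handle separately.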

Yet another alternative is to use the filesystem, which is better suited for unidirectional communication. The extension can expect the function to write a file named after the request ID after each invocation, containing some result. This is exactly what we're going to implement.

The extension must know when the file is created and when the function is finished writing to it. It must also tolerate failures and deal with the absence of the file. We could use inotify or some sophisticated approach to deal with all potential corner cases, but we'll instead assume the system tolerates a 10 minute delay between the function ending and the results becoming available to the extension, and just process the previous event's result when the next one arrives.
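For reference, one common way to sidestep the "is the writer finished?" question, which this post does not use, is to write to a temporary name and rename the file into place once it is complete. A rename within the same filesystem is atomic on POSIX systems, so a reader sees either no file or the whole file. The sketch below assumes a hypothetical helper name and a `.tmp` naming scheme chosen only for illustration.

```rust
use std::fs;
use std::io::Write;
use std::path::Path;

/// Writes `contents` to `dir/<req_id>` via a temporary file and a rename,
/// so a concurrent reader never observes a partially written result.
fn write_result_atomically(dir: &Path, req_id: &str, contents: &[u8]) -> std::io::Result<()> {
    let tmp = dir.join(format!(".{}.tmp", req_id));
    let dst = dir.join(req_id);
    let mut f = fs::File::create(&tmp)?;
    f.write_all(contents)?;
    // Flush data to disk before the file becomes visible under its final name.
    f.sync_all()?;
    fs::rename(&tmp, &dst)
}
```

This only covers partial writes. The extension would still need a way to learn that the file has appeared, which is why the post falls back to processing each result when the next event arrives.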

Our extension entry point becomes:

fn main() -> Result<()> {
    let client = Client::builder().timeout(None).build()?;
    let r = register(&client)?;
    let mut prev_request: Option<String> = None;
    loop {
        std::thread::sleep(time::Duration::from_secs(1));
        println!("Waiting for event...");
        let evt = next_event(&client, &r.extension_id);
        // Process the previous invocation's result, if there was one.
        if let Some(req_id) = prev_request.take() {
            process_result(req_id);
        }
        match evt {
            Ok(evt) => match evt {
                NextEventResponse::Invoke {
                    request_id,
                    deadline_ms,
                    ..
                } => {
                    println!("Invoke event {}; deadline: {}", request_id, deadline_ms);
                    prev_request = Some(request_id);
                }
                NextEventResponse::Shutdown {
                    shutdown_reason, ..
                } => {
                    println!("Exiting: {}", shutdown_reason);
                    return Ok(());
                }
            },
            Err(err) => {
                eprintln!("Error: {:?}", err);
                println!("Exiting");
                return Err(err);
            }
        }
    }
}

The relevant addition is the prev_request variable, which holds the ID of the previous event. process_result() is defined as:

#[derive(Deserialize)]
struct InvocationResult {
    payload: Value,
}

fn read_result(req_id: String) -> Result<InvocationResult> {
    let filename = format!("/tmp/{}", req_id);
    let f = fs::File::open(filename)?;
    let reader = BufReader::new(f);
    let res = serde_json::from_reader(reader)?;
    Ok(res)
}

fn process_result(req_id: String) {
    match read_result(req_id) {
        Ok(InvocationResult { payload }) => println!("Payload: {}", payload),
        Err(e) => eprintln!("Error processing invocation result: {:?}", e),
    }
}

Basically, it reads a file at /tmp/{id} containing a JSON object whose "payload" key maps to the invocation's payload. The Lambda function is what writes these files:

fn handler(v: Value, ctx: Context) -> Result<(), HandlerError> {
    println!("Hello, world!");
    let wrapped = json!({ "payload": v });
    match File::create(format!("/tmp/{}", ctx.aws_request_id)) {
        Ok(mut file) => match file.write_all(wrapped.to_string().as_bytes()) {
            Ok(_) => Ok(()),
            Err(e) => Err(e.to_string().as_str().into()),
        },
        Err(e) => Err(e.to_string().as_str().into()),
    }
}

When we invoke this function with

aws lambda invoke --function-name rust-test-function --cli-binary-format raw-in-base64-out --payload '{"hello": "world"}' response.json

we can see the following in the logs roughly 10 minutes later, when the shutdown event triggers processing of the last result:

timestamp message
18:27:32.101-03:00 Payload: {"hello":"world"}
18:27:32.101-03:00 Exiting: spindown

That's it: the Lambda function wrote a file with the payload, which the extension read and logged to CloudWatch.

Building and deploying

Since we're writing native code, we have to worry about which platform to target when compiling, and which libraries to link against. Fortunately, there's the lambci/lambda:build-provided.al2 Docker image, which we use to ensure we link against the exact library versions that exist in the AWS Lambda environment. As for compiling, we're using rustup to fetch and install the appropriate Rust toolchain for the platform. Pretty simple.

We're using the AWS Cloud Development Kit to deploy the stack:

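// Assumed imports (CDK v1 module paths, current when this post was written);
// in CDK v2 the same classes come from "aws-cdk-lib" and "aws-cdk-lib/aws-lambda".
import { App, Duration, RemovalPolicy, Stack } from "@aws-cdk/core"
import { Code, Function as Lambda, LayerVersion, Runtime } from "@aws-cdk/aws-lambda"

const app = new App()
const s = new Stack(app, "Test", {
    env: {
        account: process.env.CDK_DEFAULT_ACCOUNT,
        region: process.env.CDK_DEFAULT_REGION,
    },
})

// The function is scoped to the stack `s` (there is no `this` at the top level).
const l = new Lambda(s, "Function", {
    functionName: "rust-test-function",
    runtime: Runtime.PROVIDED_AL2,
    handler: "doesnt.matter",
    code: Code.fromAsset(app.node.tryGetContext("lambda")),
    currentVersionOptions: {
        removalPolicy: RemovalPolicy.DESTROY,
        retryAttempts: 2,
    },
    timeout: Duration.minutes(1),
})

// addLayers is variadic, so the layer is passed directly rather than in an array.
l.addLayers(
    new LayerVersion(s, "Sidecar", {
        code: Code.fromAsset(app.node.tryGetContext("sidecar")),
    }),
)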

The Test stack is deployed with:

cdk deploy --context lambda=./lambda.zip --context sidecar=./sidecar.zip Test

Here, ./lambda.zip is an archive containing the function's binary as a single file named "bootstrap", which is a requirement when using a custom runtime. The extension is packaged in ./sidecar.zip, which contains a binary named "sidecar" (the same name we give the Lambda service when registering the extension) inside the "extensions" folder.

Final thoughts

We've shown how easy it is to build a Lambda extension: it's a single binary that can be written in any (compiled) language, which uses a simple HTTP-based protocol to communicate with the Lambda service. Surely the Extensions API will become richer and more flexible, which will unlock more use cases for extensions. For example, to implement a good and efficient IPC mechanism it would be really helpful to know when a function has finished processing an event.

As with all things serverless, latency and cold starts are a concern, especially if you plan to write extensions for a wider audience.

Tangentially, we showed how easy and cool it is to use Rust to write Lambda functions, and how effortless it is to deploy using the CDK.

Discussion

Charlie Ede

Hey Duarte great post!
I have implemented something similar myself, but I can't manage to get my extension to start running. It would be great to see the source code, to check I have my file structure right.

Duarte Nunes Author

Thanks! :)

Here's a gist with the extension code: gist.github.com/duarten/855f8e18e3...

When you package the Lambda layer, the extension executable must have the same name as the one passed to the Register API, and it must be in the "extensions" folder. For example:

❯ unzip -l target/lambda/release/sidecar.zip
Archive:  target/lambda/release/sidecar.zip
  Length      Date    Time    Name
---------  ---------- -----   ----
  7778472  11-04-2020 17:18   extensions/sidecar
---------                     -------
  7778472                     1 file

I can provide the CDK code as a gist too, but it's pretty much the same as what's in the post :)

Ricardo Sueiras

Great post, is there more to come?

Duarte Nunes Author

Huh, I just noticed dev.to ate the final sections of the article, together with a bunch of edits, when I published it. I guess that's what you were referring to. I just rewrote them, so hopefully now the post makes a bit more sense :)

Duarte Nunes Author

Thanks! Eventually there will be, yeah :)