DEV Community

Renato Byrro

Crash Course on Fan-out & Fan-in with AWS Lambda

Lambda is the beloved serverless computing service from AWS. It allows for massive parallelization with very simplified infrastructure management, which makes it a great candidate tool for implementing a fan-out / fan-in (a.k.a. pull-push) architecture.

Fan-out is basically splitting a larger task (or a batch of tasks) into smaller sub-tasks, deferring the processing of those to another function, which runs in parallel to finish the entire process in a rapid and performant way.

Fan-in is applied when we need to collect results from sub-tasks that were previously subject to fan-out in order to consolidate results. It can also be useful, for instance, when the application needs to wait for all sub-tasks to finish before moving on with the overall process.

Fan-out can be applied on its own, but fan-in depends on a previous fan-out process (there’s no point in consolidating something that hasn’t been split before).
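As a minimal local sketch of the two ideas, here is a thread pool standing in for parallel Lambda invocations. The function names and the word-count task are illustrative assumptions, not part of any AWS API:

```python
from concurrent.futures import ThreadPoolExecutor


def split(task, chunk_size):
    """Fan-out step: break one large task into smaller sub-tasks."""
    return [task[i:i + chunk_size] for i in range(0, len(task), chunk_size)]


def process(sub_task):
    """Stand-in for the work done by one parallel worker: count words."""
    return sum(len(line.split()) for line in sub_task)


def fan_out_fan_in(task, chunk_size=2):
    sub_tasks = split(task, chunk_size)
    # Fan-out: every sub-task is handed to a worker running in parallel
    with ThreadPoolExecutor() as pool:
        partial_results = list(pool.map(process, sub_tasks))
    # Fan-in: all workers are done, so the partial results can be consolidated
    return sum(partial_results)


lines = ['foo bar', 'hello world!', 'the quick brown fox']
total_words = fan_out_fan_in(lines)
```

With Lambda, the `process` step runs in separate function instances instead of threads, which is why the rest of this article needs a shared store to do the fan-in.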

Fan-out Diagram

Fan-in Diagram

Simple Fan-out from Lambda to Lambda

Lambda could be used not only as the sub-task processor but also as the "ventilator" (in the diagram above). We would have two Lambda functions at a minimum:

  1. Ventilator: receives a large task, breaks it down into sub-tasks and invokes the sub-task processors;
  2. Processor: receives a sub-task and performs whatever is needed.

In this case, we will need the Ventilator Lambda to invoke one or more instances of the Processor Lambda programmatically.

AWS has SDKs for all major runtimes and programming languages: NodeJS, Python, Java, you name it. In this article, we will be using Python and its respective AWS SDK (boto3) because it has quite a simple and easy to read syntax, making examples somewhat agnostic to the environment.

There are two methods for invoking a Lambda function: Invoke and InvokeAsync. With Invoke, the Ventilator waits for each Processor to respond with its results before closing the HTTP connection. InvokeAsync, on the other hand, only hands the request over to the Processor Lambda: the API acknowledges it right away and terminates the connection, and the sub-task is processed afterwards. Note that AWS has deprecated InvokeAsync; the same asynchronous behavior is available through Invoke with the InvocationType parameter set to 'Event'.

This will have implications for the fan-in strategy, as we’ll discuss in the next topic.

Concurrency, Concurrency, Concurrency

There is a problem with the AWS SDK in this case, though: it uses blocking HTTP requests. If we loop through a list of sub-tasks to fan-out, each request will block the next ones from going out to the Processor. The second sub-task will only go out after the first one is done, and so on, which completely defeats the parallelization advantage of the entire architecture.

What we need is to parallelize the Processor invocation requests. Using the InvokeAsync endpoint won't make the SDK send requests in parallel. We published an implementation of non-blocking HTTP requests for the AWS Lambda API endpoints using the Python asyncio and aiohttp libraries. It’s open sourced under the MIT license, so feel free to use it in your projects.

Here's how we would use this non-blocking HTTP request method:

from async_lambda import invoke_all

requests = [
    {
        'function_name': 'lambda-function',
        'payload': {...},
    },
    ...
]

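# Note: `async` became a reserved keyword in Python 3.7, so this call only
# parses on Python 3.6 or earlier; on newer versions, check the library's
# documentation for the current name of this argument.
results = invoke_all(
    requests=requests,
    region='us-east-1',
    async=True,
)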
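If adding a dependency is not an option, a similar effect can be approximated with the standard library by issuing the blocking SDK calls from a thread pool. The sketch below is an alternative under stated assumptions, not the library's implementation: `fan_out` and `max_workers` are hypothetical names, and `lambda_client` is expected to be a `boto3.client('lambda')` instance.

```python
import json
from concurrent.futures import ThreadPoolExecutor


def fan_out(lambda_client, function_name, payloads, max_workers=20):
    """Invoke `function_name` once per payload, issuing requests in parallel."""

    def invoke(payload):
        # InvocationType='Event' queues the invocation and returns right away
        response = lambda_client.invoke(
            FunctionName=function_name,
            InvocationType='Event',
            Payload=json.dumps(payload).encode('utf-8'),
        )
        return response['StatusCode']  # 202 when the event was accepted

    # Each blocking HTTP request runs in its own thread, so one slow
    # request no longer holds back the ones queued after it
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(invoke, payloads))
```

Threads are a reasonable fit here because the work is I/O-bound; for very large batches, the asyncio approach above should carry less overhead per request.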

Step by step fan-in

If the sub-tasks were fanned out to Lambda using the Invoke endpoint (synchronous requests), it's easy to consume and consolidate results: just wait for all Processor Lambdas to respond and run the consolidation logic.

In asynchronous invocations, which are quite common, the fan-in process requires a bit more structure. Below is a step-by-step guide to do this using DynamoDB.

1. Create a processing batch object

The first thing is creating a processing batch as an umbrella to identify and keep track of all sub-tasks in DynamoDB.

We will need a unique identifier for this processing batch, and it must stay the same when the same invocation is retried (idempotency). This is important because, if something fails along the way, Lambda will automatically retry the request and we want to avoid unwanted side-effects from this behavior (I recently wrote about this).

Two possible unique identifiers associated with a Lambda invocation are:

Invocation request ID:

def handler(event, context):
    unique_id = context.aws_request_id

Event payload hash:

import hashlib
import json

def handler(event, context):
    # sort_keys=True keeps the hash stable regardless of key order
    unique_id = md5_hash(json.dumps(event, sort_keys=True))

def md5_hash(string, encoding='utf-8'):
    md5 = hashlib.md5()
    md5.update(string.encode(encoding))
    return md5.hexdigest()
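One caveat with the payload hash: two events with the same content but a different key order serialize differently unless keys are sorted, so the hash is only stable with `sort_keys=True`. A quick check, in which `event_hash` is a hypothetical helper used for illustration:

```python
import hashlib
import json


def event_hash(event, sort_keys=True):
    """Hash an event payload; sorting keys makes the hash order-independent."""
    serialized = json.dumps(event, sort_keys=sort_keys)
    return hashlib.md5(serialized.encode('utf-8')).hexdigest()


first = {'customer': 'acme', 'task': 'foo bar'}
second = {'task': 'foo bar', 'customer': 'acme'}  # Same content, other order

same_when_sorted = event_hash(first) == event_hash(second)
same_when_unsorted = (
    event_hash(first, sort_keys=False) == event_hash(second, sort_keys=False)
)
```

Here `same_when_sorted` holds while `same_when_unsorted` does not, which is why the sorted form is the safer choice for an idempotency key.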

Now let's store this batch in DynamoDB for later tracking, along with the number of sub-tasks generated:

import time
import boto3

# Preparing the list of sub-tasks is beyond the scope of this tutorial
sub_tasks = [...]

client = boto3.client('dynamodb')

client.put_item(
    TableName='fan-out-tasks',
    Item={
        'unique_id': {
            'S': f'BATCH-{unique_id}',  # Defined in the previous step
        },
        'total_tasks': {
            'N': str(len(sub_tasks)),  # DynamoDB expects numbers as strings
        },
        'finished_tasks': {
            'N': '0',
        },
        'status': {
            'S': 'pending',
        },
        'created_at': {
            'N': str(int(time.time())),
        },
        'finished_at': {
            'NULL': True,
        },
    },
    # This condition prevents running the same batch multiple times
    ConditionExpression='attribute_not_exists(unique_id)',
)
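When Lambda retries the Ventilator, the condition above makes `put_item` fail instead of overwriting the batch. A minimal sketch of handling that case follows. `create_batch_once` is a hypothetical helper, `client` is assumed to be a `boto3.client('dynamodb')` instance, and the item is trimmed to the attributes needed for the illustration:

```python
def create_batch_once(client, batch_id, total_tasks, table_name='fan-out-tasks'):
    """Return True if the batch was created, False if it already existed."""
    try:
        client.put_item(
            TableName=table_name,
            Item={
                'unique_id': {'S': f'BATCH-{batch_id}'},
                'total_tasks': {'N': str(total_tasks)},
                'finished_tasks': {'N': '0'},
            },
            ConditionExpression='attribute_not_exists(unique_id)',
        )
    except client.exceptions.ConditionalCheckFailedException:
        # A previous attempt already registered this batch: skip the fan-out
        return False
    return True
```

The Ventilator can then fan out only when the function returns `True`, so a retried invocation does not dispatch the same sub-tasks twice.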

2. Store a reference for each sub-task in DynamoDB

We will be tracking the job progress by using the finished_tasks attribute in the batch object stored in DynamoDB. Every time a sub-task is finished, it will increment this counter. When finished_tasks == total_tasks, it means we’re done processing everything.

The problem is that incrementing a counter in DynamoDB is not idempotent. In the event of a failure, Lambda will retry the same sub-task invocation, which will get processed more than once, potentially incrementing the counter multiple times. This would break our logic.

To avoid this issue and provide idempotency to the Processor Lambda, we will store a reference to each sub-task in DynamoDB, along with its status.

import time
import boto3

sub_tasks = [
    {'task': 'foo bar'},
    {'task': 'hello world!'},
    {'task': 'the quick brown fox jumped over the lazy dog'},
]

client = boto3.client('dynamodb')

# Beware that Dynamo accepts up to 25 items in batch write; split and
# request multiple times, if you have more sub-tasks
response = client.batch_write_item(
    RequestItems={
        'fan-out-tasks': [
            {
                'PutRequest': {
                    'Item': {
                        'unique_id': {'S': f'SUBTASK-123ABC-{index}'},
                        'status': {'S': 'pending'},
                        'created_at': {'N': str(int(time.time()))},
                        'finished_at': {'NULL': True},
                    },
                },
            }
            for index, task in enumerate(sub_tasks)
        ]
    }
)
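The 25-item limit mentioned in the comment above can be handled by slicing the request list before writing. In this sketch, `chunked` and `write_all` are hypothetical helpers and `client` is assumed to be a `boto3.client('dynamodb')` instance. A production version would also retry any `UnprocessedItems` returned by DynamoDB:

```python
def chunked(items, size=25):
    """Yield consecutive slices of `items` holding at most `size` elements."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def write_all(client, put_requests, table_name='fan-out-tasks'):
    """Send the put requests in as many batch writes as needed."""
    responses = []
    for chunk in chunked(put_requests):
        responses.append(
            client.batch_write_item(RequestItems={table_name: chunk})
        )
    return responses
```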

Once a sub-task is finished, we can make a transactional write containing two updates:

  1. Increment the finished_tasks counter in the batch object
  2. Update the status attribute in the sub-task object to 'finished', providing the condition that the current status is 'pending'

Upon getting invoked, the Processor Lambda must then check whether the sub-task status in DynamoDB is 'pending' before moving forward with processing.

The conditional update will also prevent the counter from being incremented more than once in the - unlikely, but not impossible - event of the same sub-task being processed multiple times concurrently.

In massively scaled and distributed systems like AWS Lambda, if things can go wrong, they will go wrong at some point. Better be safe than sorry.
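To see why the conditional status update keeps the counter correct, here is a plain-Python model of the transaction (no DynamoDB involved; the `store` dictionary and the function name are illustrative assumptions). Both updates are applied together or not at all, so a retried sub-task cannot increment the counter twice:

```python
def finish_sub_task(store, batch_key, sub_task_key):
    """Model of the two-update transaction; returns True if it was applied."""
    # Condition check: only a 'pending' sub-task may be marked as finished
    if store[sub_task_key]['status'] != 'pending':
        return False  # Transaction cancelled, nothing is written
    store[sub_task_key]['status'] = 'finished'
    store[batch_key]['finished_tasks'] += 1
    return True


store = {
    'BATCH-123ABC': {'total_tasks': 2, 'finished_tasks': 0},
    'SUBTASK-123ABC-0': {'status': 'pending'},
    'SUBTASK-123ABC-1': {'status': 'pending'},
}

first_attempt = finish_sub_task(store, 'BATCH-123ABC', 'SUBTASK-123ABC-0')
retry = finish_sub_task(store, 'BATCH-123ABC', 'SUBTASK-123ABC-0')
```

The first call is applied and the retry is rejected, leaving `finished_tasks` at 1 instead of 2.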

3. Fan-out sub-tasks

Finally! We can now fan-out the sub-tasks. We should provide the processing batch ID along with the sub-task payload so that each Processor Lambda is aware of its sub-task owner:

from async_lambda import invoke_all

sub_tasks = [
    {'task': 'foo bar'},
    {'task': 'hello world!'},
    {'task': 'the quick brown fox jumped over the lazy dog'},
]

requests = [
    {
        'function_name': 'processor-lambda-name',
        'payload': {
            'batch_id': f'BATCH-{unique_id}',  # Batch key stored in step 1
            'index': index,
            'task': task,
        },
    }
    for index, task in enumerate(sub_tasks)
]

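# Note: `async` is a reserved keyword from Python 3.7 on, so this call only
# parses on Python 3.6 or earlier; check the library docs on newer versions
invoke_all(requests=requests, region='us-east-1', async=True)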

4. Process the sub-tasks

When the Processor Lambda is invoked, and before attempting to process the sub-task, it should retrieve the DynamoDB reference and double-check its status. Unless it is 'pending', it should not proceed.

import boto3
from boto3.dynamodb.conditions import Key

# Get the sub-task index from the Lambda event payload
sub_task_index = event.get('index')

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('fan-out-tasks')

response = table.query(
    KeyConditionExpression=Key('unique_id').eq(f'SUBTASK-123ABC-{sub_task_index}'),
    ConsistentRead=True,  # Read the latest status, not a possibly stale copy
)

item = response['Items'][0]

# Refuse to process a sub-task that was already handled
status = item.get('status')
if status != 'pending':
    raise ValueError(f'Expected sub-task status "pending", got "{status}"')

After the sub-task is finished, the Processor Lambda should update the batch counter in DynamoDB as well as the sub-task reference using a transactional query. We are also storing the result of the task processing so that all results can be consolidated at the end:

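import json
import boto3

# Extract sub-task index from invocation payload
sub_task_index = event.get('index')

client = boto3.client('dynamodb')

response = client.transact_write_items(
    TransactItems=[
        {
            'Update': {
                'TableName': 'fan-out-tasks',
                'Key': {
                    'unique_id': {'S': 'BATCH-123ABC'},
                },
                'UpdateExpression':
                    'SET finished_tasks = finished_tasks + :increment',
                'ExpressionAttributeValues': {
                    ':increment': {'N': '1'},
                },
            },
        },
        {
            'Update': {
                'TableName': 'fan-out-tasks',
                'Key': {
                    'unique_id': {'S': f'SUBTASK-123ABC-{sub_task_index}'},
                },
                # "status" is a DynamoDB reserved word, hence the #status alias
                'UpdateExpression':
                    'SET #status = :new_status, task_result = :task_result',
                'ConditionExpression': '#status = :pending',
                'ExpressionAttributeNames': {'#status': 'status'},
                'ExpressionAttributeValues': {
                    ':new_status': {'S': 'finished'},
                    ':pending': {'S': 'pending'},
                    # Assumption: `result` holds the JSON-serializable output
                    # of the processing; `task_result` is an illustrative name
                    ':task_result': {'S': json.dumps(result)},
                },
            },
        },
    ]
)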
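If the same sub-task is delivered twice, the condition on the second update fails and DynamoDB cancels the whole transaction. Below is a hedged sketch of telling that case apart from other failures. `mark_finished` is a hypothetical wrapper, `client` is assumed to be a `boto3.client('dynamodb')` instance, and the `CancellationReasons` lookup reflects how boto3 reports cancelled transactions to the best of my knowledge, so verify it against the SDK documentation:

```python
def mark_finished(client, transact_items):
    """Run the transaction; return False if the sub-task was already done."""
    try:
        client.transact_write_items(TransactItems=transact_items)
    except client.exceptions.TransactionCanceledException as error:
        reasons = error.response.get('CancellationReasons', [])
        codes = [reason.get('Code') for reason in reasons]
        if 'ConditionalCheckFailed' in codes:
            # Status was no longer 'pending': a duplicate delivery, safe to skip
            return False
        raise  # Any other cancellation is a real error
    return True
```

Returning instead of raising on a duplicate matters here, because an unhandled exception would make Lambda retry an asynchronous invocation yet again.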

5. The last one wraps everything up

If everything goes as expected, at some point all sub-tasks will be finished. Hopefully!

We need a process in place to identify when it's all ready and start the consolidation of the results. We will defer that to the Processor Lambda as well. At the end of the processing, this Lambda will check the batch object in DynamoDB. If total_tasks == finished_tasks, it will trigger the consolidation of results.

The consolidation could take place in the Processor Lambda, but it would be better to separate concerns and create a third Lambda for that. Let's call it Consolidator Lambda. The Processor should then invoke the Consolidator, passing the batch unique ID:

import json
import boto3
from boto3.dynamodb.conditions import Key

# Get the Batch ID from the Lambda event payload
batch_id = event.get('batch_id')

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('fan-out-tasks')

response = table.query(
    KeyConditionExpression=Key('unique_id').eq(batch_id),
    ConsistentRead=True,  # Make sure we see the latest counter value
)

batch = response['Items'][0]

if batch['total_tasks'] == batch['finished_tasks']:
    lambda_client = boto3.client('lambda')
    lambda_client.invoke(
        FunctionName='consolidator-lambda',
        InvocationType='Event',  # Asynchronous, will not wait for consolidator
        # The payload must be serialized to JSON before it is sent
        Payload=json.dumps({
            'batch_id': batch['unique_id'],
            'action': 'consolidate_results',
        }).encode('utf-8'),
    )

Upon getting invoked, the Consolidator Lambda will get all sub-tasks from DynamoDB and do its magic.
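A rough sketch of what the Consolidator could look like follows. It assumes the key naming used earlier (`SUBTASK-<batch>-<index>`), a hypothetical `task_result` attribute holding each sub-task's JSON-encoded output, and a `table` object created with `boto3.resource('dynamodb').Table('fan-out-tasks')`. The merge step is just a list, since the real consolidation logic depends on the application:

```python
import json


def sub_task_keys(batch_suffix, total_tasks):
    """Rebuild the key of every sub-task that belongs to the batch."""
    return [f'SUBTASK-{batch_suffix}-{index}' for index in range(total_tasks)]


def consolidate(table, batch_suffix, total_tasks):
    """Fetch each sub-task item and collect the stored results in order."""
    results = []
    for key in sub_task_keys(batch_suffix, total_tasks):
        response = table.get_item(Key={'unique_id': key}, ConsistentRead=True)
        item = response['Item']
        results.append(json.loads(item['task_result']))
    return results
```

Because the Processor may trigger the Consolidator more than once in rare races, it is worth keeping this function free of side effects, or guarding it with a conditional update on the batch status.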

6. Get visibility into what is going on under the hood

AWS Lambda is a beast in itself. Its massive scalability and idiosyncrasies (timeout and memory limits, for instance, can cause trouble), coupled with the pile of things that can go wrong in any application, make it hard to monitor and maintain the health of a serverless application.

To get that visibility, first create a free account on Dashbird (requires no credit card). Then, integrate Dashbird with your AWS account.

And voilà, Dashbird will automatically start monitoring your AWS Lambda functions. I strongly advise setting up alerting channels (email and/or Slack) so that you get notified when things go south.

Alternative Architectures

Fan-out with Kinesis or SQS

Take our use case: Dashbird monitors logs generated by other people’s Lambda executions. We parse those logs to identify all sorts of things that help our customers better maintain their application health: cold starts, errors, timeouts, execution anomalies, etc. Our service has been growing steadily, now processing logs for 350,000+ Lambda functions, which requires an architecture that can smoothly scale up and down according to demand.

Fan-out with Kinesis

Since these logs are stored in CloudWatch Log Streams, we subscribe a Kinesis Data Stream to them, which is responsible for fanning out the logs to a Lambda function that implements our logic for processing Lambda logs.

That is one way of creating a serverless and easy-to-manage fan-out pattern on top of AWS infrastructure. Our actual implementation has more complexities, but in general terms, that is what is going on in the diagram above:

  1. CloudWatch will notify Kinesis whenever there are new logs generated
  2. Kinesis will batch and prepare logs to be consumed
  3. Lambda platform will constantly poll Kinesis checking for new logs
  4. New instances of the Lambda function will be invoked, scaling up and down according to demand, receiving the logs as input for parsing

Here is a good step-by-step guide from AWS explaining each topic of this implementation.
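For reference, this is roughly what the consuming function has to do with each record. Kinesis delivers the data base64-encoded, and CloudWatch Logs subscriptions gzip-compress the JSON payload before sending it. The `parse_log` function is a placeholder assumption for the actual parsing logic:

```python
import base64
import gzip
import json


def decode_record(record):
    """Decode one Kinesis record carrying CloudWatch Logs subscription data."""
    compressed = base64.b64decode(record['kinesis']['data'])
    return json.loads(gzip.decompress(compressed))


def parse_log(message):
    """Placeholder: flag log lines that report a Lambda timeout."""
    return {'timeout': 'Task timed out' in message}


def handler(event, context):
    parsed = []
    for record in event['Records']:
        payload = decode_record(record)
        # Control messages carry no logs; only DATA_MESSAGE payloads do
        if payload.get('messageType') != 'DATA_MESSAGE':
            continue
        for log_event in payload['logEvents']:
            parsed.append(parse_log(log_event['message']))
    return parsed
```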

A similar architecture could be achieved by using SQS instead of Kinesis Data Streams. AWS has a great article about it here.

Let's complicate it a bit

In the use case outlined above, it wasn’t hard to fan-out because each log requires the same parsing logic. There’s no difference between a cold start or a timeout error from one customer or another. But there are differences between runtimes: NodeJS errors will be different than Python or .NET ones, for example.

Even more complicated: what if we wanted to let users customize the way their logs are parsed? That’s precisely what a service like Logbird is supposed to do. You could enter, for example, multiple RegEx patterns and determine what should happen in case any of them matches a particular log line. For instance, you could set up a Slack channel notification in case it identifies UserTier=BigBucksCustomer and a CreditCardExpired error in the logs, so that your support team can react accordingly.

Ok, but… how do we make Lambda functions aware of the way they should parse each log coming from Kinesis? We can’t have a purely stateless execution anymore since the parsing logic will vary depending on who owns the log streams.

Customizing the fan-out approach

Let’s say we store users’ patterns in DynamoDB. The same Lambda function could retrieve all patterns and parse logs from any and all customers. The problem is that it would be difficult to scale this approach. What if there are one thousand customers, each with 50 patterns? How about 10 or 100 thousand customers? That probably won’t scale smoothly. So, how do we apply fan-out in this scenario?

Well, we could think of a few approaches. Of course, it would be naive to pull up all patterns every time. Since each Lambda invocation is provided with just a few logs, it only needs the patterns of a few customers as well. So the DynamoDB query should certainly filter only for patterns related to the log owners.

But this could still fail on some occasions. What if there is an issue with one customer's pattern that causes Lambda to terminate execution early? That would prevent the other customers’ logs from being parsed. Or maybe one invocation happens to receive logs from a bunch of customers that have too many patterns each. That may overload our function's capacity.

So, maybe it would be a good idea to apply a two-level fan-out strategy. After receiving logs from Kinesis, our Lambda function could group logs by owner, then invoke another function to take care of each customer's logs separately.
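The first level of that strategy might look like the sketch below. It assumes each log entry is a dictionary with an `owner` field identifying the customer, and `customer-processor-lambda` is a made-up function name. The resulting list has the same shape as the `requests` passed to `invoke_all` earlier:

```python
from collections import defaultdict


def group_by_owner(logs):
    """Bucket log entries by the customer that owns them."""
    groups = defaultdict(list)
    for log in logs:
        groups[log['owner']].append(log)
    return dict(groups)


def build_requests(logs, function_name='customer-processor-lambda'):
    """Create one second-level invocation request per customer."""
    return [
        {
            'function_name': function_name,
            'payload': {'owner': owner, 'logs': owner_logs},
        }
        for owner, owner_logs in group_by_owner(logs).items()
    ]
```

Each second-level function then only needs to load the patterns of a single customer, and a failure in one customer's patterns no longer blocks the logs of the others.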

Fan-out with Kinesis - two-level Lambdas

Conclusions

The fan-out / fan-in pattern empowers developers to build scalable and flexible applications on top of AWS Lambda infrastructure. The vast number of integrations and event source triggers between Lambda and the rest of the AWS catalog also provides many options to adapt to each context and demand.

There are many other options we encourage you to explore. AWS Lambda is able to capture data changes in an Aurora SQL database, for example, based on custom triggers, effectively allowing us to build a fan-out architecture using a SQL database as a starting point, which can be very useful in some cases (see an implementation guide here).

AWS serverless offerings and their potential are constantly evolving, so keep an eye on them. Join our newsletter for free to receive the latest updates in the serverless arena (we hate spam as much as you do!).