Jaine Conceição

Ingest data into Elasticsearch with AWS Lambda and SNS using LocalStack

In this post I will explain and show the implementation of an architecture that ingests data into Elasticsearch with Lambda and SNS, using LocalStack. In simple terms, the architecture looks like this:

Architecture

The idea is to monitor the business events of an application. Whenever the application wants to record a business event, it publishes the event as a message to an SNS topic. A Lambda function then receives the message and sends it to Elasticsearch. Finally, we can see the event through the Kibana interface.
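
A business event here is just a small JSON document describing something that happened in the application. As a purely illustrative example (the field names below only need to match what the Lambda indexes later on), an event could look like this:

{
    "name": "user_signed_up",
    "properties": {
        "nickname": "test-user1",
        "job": "Software engineer"
    },
    "created_at": "2022-01-01T12:00:00Z"
}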

LocalStack

LocalStack is a framework that we can use to emulate an AWS environment locally. With LocalStack we can set up SNS topics and Lambda functions without touching a real AWS account.

SNS

SNS is an AWS pub/sub service: we create SNS topics and publish messages to them, and once SNS receives a message it pushes a notification to every subscriber of the topic. Subscribers can be SQS queues (the AWS queue service), Lambda functions, HTTP endpoints and Amazon Kinesis Data Firehose.

Lambda

Lambda is AWS's serverless compute service: we just upload the code and AWS takes care of all the compute resources needed to run it.

Elasticsearch

Elasticsearch is a powerful engine that stores any kind of data and lets us search and analyze it with high performance and scalability.

Kibana

Kibana provides a graphical interface to Elasticsearch and lets us visualize the data with charts and graphs.

Architecture implementation

First of all we will configure a docker-compose.yml file so we can run LocalStack, Elasticsearch and Kibana locally:

version: '3.5'
services:
  localstack:
    container_name: localstack
    image: localstack/localstack:latest
    networks:
      - default
    ports:
      - '4566:4566'
    environment:
      - EDGE_PORT=4566
      - SERVICES=sns,lambda

  elasticsearch:
    container_name: elasticsearch
    image: docker.elastic.co/elasticsearch/elasticsearch:7.7.0
    environment:
      discovery.type: "single-node"
      ES_JAVA_OPTS: "-Xms2g -Xmx2g"
      xpack.monitoring.enabled: "true"
    ports:
      - "9200:9200"
    networks:
      - default

  kibana:
    container_name: kibana
    image: docker.elastic.co/kibana/kibana:7.7.0
    ports:
      - "5601:5601"
    networks:
      - default

networks:
  default:
    driver: bridge

Now start the containers defined in docker-compose.yml:

docker-compose up -d
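
Once the containers are running, you can confirm that Elasticsearch is reachable (it may take a few seconds to start):

curl http://localhost:9200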

Install the AWS CLI so we can communicate with the LocalStack container more easily, and then configure it by running:

aws configure

You can use dummy credentials for configuration because LocalStack doesn't really validate them.
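
For example, the following dummy values are enough; only the region has to match the ARNs used later in this post:

AWS Access Key ID [None]: test
AWS Secret Access Key [None]: test
Default region name [None]: us-east-1
Default output format [None]: json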

Creating the SNS topic

Now create an SNS topic to which the business events will be sent. We will name our topic events-topic:

aws --no-cli-pager --endpoint-url=http://0.0.0.0:4566 sns create-topic --name events-topic
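
LocalStack should answer with the ARN of the new topic, something like:

{
    "TopicArn": "arn:aws:sns:us-east-1:000000000000:events-topic"
}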

Creating the Lambda function

Before creating the Lambda in LocalStack, we need to implement the code. Here I will be using Node.js:

1) Choose a directory and initialize a Node project:

npm init

2) Install the elasticsearch library inside your project:

npm i elasticsearch

3) Create a directory (I called it elastic) and add the elastic.js file inside:

var elasticsearch = require('elasticsearch');

// Client pointing at the Elasticsearch container from docker-compose.yml
const elasticClient = new elasticsearch.Client({
    host: 'http://elasticsearch:9200'
});

var indexName = 'business_events';

// Creates the business_events index; errors (e.g. index already exists) are only logged
function initIndex() {
    return elasticClient.indices.create({
        index: indexName
    }).catch(function (error) {
        console.log('Error creating elasticsearch index', error.message);
    });
}
exports.initIndex = initIndex;

// Indexes one business event document
function addDocument(message) {
    console.log('Adding document to elastic');

    return elasticClient.index({
        index: indexName,
        type: 'document',
        body: {
            event_name: message.name || null,
            properties: message.properties || null,
            created_at: message.created_at || null,
        }
    });
}
exports.addDocument = addDocument;

This file implements the two main functions we need to manipulate Elasticsearch. First we connect to the local Elasticsearch at http://elasticsearch:9200, then we define initIndex(), responsible for creating the Elasticsearch index (business_events). Finally we define addDocument(), which sends the data to Elasticsearch.

4) Create the index.js file at the project root:

var elasticsearch = require('./elastic/elastic');

// Make sure the index exists before any message is handled
elasticsearch.initIndex();

// Invoked by SNS every time a new message is published to the topic
exports.handler = async function (event) {
    var message = event.Records[0].Sns.Message;

    // Wait for the document to be indexed before finishing the invocation
    await elasticsearch.addDocument(JSON.parse(message));

    return 'Success';
};

Here we implement the handler() function, which will be invoked every time a new message arrives on the SNS topic. It just retrieves the message from the SNS event and adds it to Elasticsearch.
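
For reference, the event object that SNS hands to the Lambda wraps our message in a Records array, which is why the handler reads event.Records[0].Sns.Message. Trimmed down to the fields we care about, it looks roughly like this:

{
    "Records": [
        {
            "EventSource": "aws:sns",
            "Sns": {
                "TopicArn": "arn:aws:sns:us-east-1:000000000000:events-topic",
                "Message": "{\"name\":\"test\", \"properties\": {...}}",
                "Timestamp": "2022-01-01T12:00:00.000Z"
            }
        }
    ]
}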

5) In your project directory, select all the files, including node_modules (the elasticsearch dependency must be bundled with the function), and zip them into a file named function.zip, as shown below.
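
On Linux or macOS, for example, the archive can be built like this:

zip -r function.zip index.js elastic node_modules package.json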

Now the code is done and you can find the complete implementation on my GitHub: lambda-sns-elasticsearch.

6) The next step is to create a Lambda function in LocalStack and upload the zipped code to it:

aws --no-cli-pager --endpoint-url=http://0.0.0.0:4566 lambda create-function --function-name events-lambda \
          --zip-file fileb://function.zip --handler index.handler --runtime nodejs12.x --role arn:aws:iam::000000000000:role/lambda-role
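
You can check that the function was registered by listing the functions known to LocalStack:

aws --no-cli-pager --endpoint-url=http://0.0.0.0:4566 lambda list-functions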

Subscribing the Lambda to the SNS topic

Finally, subscribe the Lambda to the SNS topic we created:

aws --no-cli-pager --endpoint-url=http://0.0.0.0:4566 sns subscribe --topic-arn arn:aws:sns:us-east-1:000000000000:events-topic --protocol lambda \
          --notification-endpoint arn:aws:lambda:us-east-1:000000000000:function:events-lambda
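
If you want to double-check the subscription, list the subscriptions of the topic:

aws --no-cli-pager --endpoint-url=http://0.0.0.0:4566 sns list-subscriptions-by-topic --topic-arn arn:aws:sns:us-east-1:000000000000:events-topic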

From now on, every message you send to the SNS topic will be received by the Lambda function, which will send it to Elasticsearch. We can see the messages by accessing Kibana at http://localhost:5601/

Testing

Let's test it by sending a message to the SNS topic via the command line:

aws --no-cli-pager sns publish --endpoint-url=http://0.0.0.0:4566 --topic-arn arn:aws:sns:us-east-1:000000000000:events-topic --message '{"name":"test", "user_id":1, "properties": {"nickname": "test-user1", "job": "Software engineer"}}'
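
Besides Kibana, you can also query Elasticsearch directly to confirm that the document was indexed:

curl 'http://localhost:9200/business_events/_search?pretty'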

Now access Kibana and make sure that you have configured an index pattern for business_events:

Kibana index pattern configuration

Then go to the Discover section and your message should appear like this:

The test message in Kibana's Discover section

Instead of sending messages via the command line, you can make your services send messages to the SNS topic using the AWS SDK.
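
As a minimal sketch, assuming the AWS SDK for JavaScript v2 is installed in the service (npm i aws-sdk), publishing an event to our topic could look like this (the endpoint points at LocalStack; against real AWS you would simply omit it):

var AWS = require('aws-sdk');

// SNS client pointed at LocalStack; credentials are the dummy ones from `aws configure`
var sns = new AWS.SNS({
    endpoint: 'http://localhost:4566',
    region: 'us-east-1'
});

var event = {
    name: 'user_signed_up',
    properties: { nickname: 'test-user1', job: 'Software engineer' },
    created_at: new Date().toISOString()
};

// Publish the business event as a JSON string
sns.publish({
    TopicArn: 'arn:aws:sns:us-east-1:000000000000:events-topic',
    Message: JSON.stringify(event)
}, function (err, data) {
    if (err) console.log('Error publishing event', err);
    else console.log('Event published, MessageId:', data.MessageId);
});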

Thank you for reading :)
