In this post I will explain and show the implementation of an architecture that ingest data into Elasticsearch with Lambda and SNS using LocalStack. In simple terms, the architecture looks like this:
The ideia is to monitor the business events of an application. So once the application wants to send a business event, this event is sent as a message to a SNS topic. After that a lambda function receives the message and send it to elasticsearch. Finally we can see the event using Kibana interface.
LocalStack
LocalStack is a framework originally developed by AWS that we can use to simulate AWS environment locally. With LocalStack we can set up SNS topics and Lambda functions.
SNS
SNS is an AWS service that works like a pub/sub, so we create SNS topics and send messages to these topics, once these messages are received by SNS, it pushes notifications to subscribers. SNS subscribers can be SQS (AWS queue service), Lambda functions, HTTP endpoints and Amazon Kinesis.
Lambda
Lambda is a serverless service that we can run on AWS environment, that is, we upload the code and AWS takes care of all compute resources needed.
Elasticsearch
Elasticsearch is a powerful engine that store any kind of data with the purpose of analyzing and searching with high performance and escalability.
Kibana
Kibana provides a graphic interface to elasticsearch and lets us visualize the data with charts and graphs.
Architecture implementation
First of all we will configure a docker-compose.yml
file so we can run LocalStack, Elasticsearch and Kibana locally:
version: '3.5'
services:
localstack:
container_name: localstack
image: localstack/localstack:latest
networks:
- default
ports:
- '4566:4566'
environment:
- EDGE_PORT=4566
- SERVICES=sns,lambda
elasticsearch:
container_name: elasticsearch
image: docker.elastic.co/elasticsearch/elasticsearch:7.7.0
environment:
discovery.type: "single-node"
ES_JAVA_OPTS: "-Xms2g -Xmx2g"
xpack.monitoring.enabled: "true"
ports:
- "9200:9200"
networks:
- default
kibana:
container_name: kibana
image: docker.elastic.co/kibana/kibana:7.7.0
ports:
- "5601:5601"
networks:
- default
networks:
default:
driver: bridge
Now run the docker-compose.yml
file:
docker-compose up -d
Install the AWS CLI, so we can communicate with the LocalStack container easier and then configure AWS running:
aws configure
You can use dummy credentials for configuration because LocalStack doesn't really validate them.
Creating the SNS topic
Now create a SNS topic to send the business events. We will name our topic events-topic
:
aws --no-cli-pager --endpoint-url=http://0.0.0.0:4566 sns create-topic --name events-topic
Creating the Lambda function
Before creating the lambda in LocalStack, we need to implement the code. Here I will be using node.js
:
1) Choose a directory and initiate a node project:
npm init
2) Install the elasticsearch library inside your project:
npm i elasticsearch
3) Create a directory (I called it elastic) and add the elastic.js
file inside:
var elasticsearch = require('elasticsearch');
const elasticClient = elasticsearch.Client({
host: 'http://elasticsearch:9200'
});
var indexName = 'business_events';
function initIndex() {
try {
return elasticClient.indices.create({
index: indexName
});
} catch (error) {
console.log('Error creating elasticsearch index')
}
}
exports.initIndex = initIndex;
function addDocument(message) {
console.log('Adding document to elastic');
return elasticClient.index({
index: indexName,
type: "document",
body: {
event_name: message.name ?? null,
properties: message.properties ?? null,
created_at: message.created_at ?? null,
}
});
}
exports.addDocument = addDocument;
This file implements the two main functions to manipulate elasticsearch. First we connect to the local elasticsearch http://elasticsearch:9200
, then we create the function initIndex()
responsible to create the elasticsearch index (business_events
). Ultimately we create the addDocument()
function to send the data to elastic.
4) Create the index.js
file at the project root:
var elasticsearch = require('./elastic/elastic');
elasticsearch.initIndex();
exports.handler = function(event) {
var message = event.Records[0].Sns.Message;
elasticsearch.addDocument(JSON.parse(message));
return 'Success';
};
Here we implement the main function handler()
which will be invoked by SNS every time a new message arrives. This function just retrieves the message from SNS topic and add it to elasticsearch.
5) Access your code directory, select all files and zip them with the name: function.zip
.
Now the code is done and you can find the complete implementation in my github: lambda-sns-elasticsearch.
6) The next step is to create a lambda function in LocalStack and upload the compressed code to it:
aws --no-cli-pager --endpoint-url=http://0.0.0.0:4566 lambda create-function --function-name events-lambda \
--zip-file fileb://function.zip --handler index.handler --runtime nodejs12.x --role _
Subscribing lambda to SNS topic
Finally subscribe the lambda to the sns topic we created:
aws --no-cli-pager --endpoint-url=http://0.0.0.0:4566 sns subscribe --topic-arn arn:aws:sns:us-east-1:000000000000:events-topic --protocol lambda \
--notification-endpoint arn:aws:lambda:us-east-1:000000000000:function:events-lambda
From now on every message you send to the SNS topic, will be received by the lambda function which will send it to elasticsearch. We can see the messages accessing kibana: http://localhost:5601/
Testing
Let's make a test sending a message to the SNS topic via command line:
aws --no-cli-pager sns publish --endpoint-url=http://0.0.0.0:4566 --topic-arn arn:aws:sns:us-east-1:000000000000:events-topic --message '{"name":"test", "user_id":1, "properties": {"nickname": "test-user1", "job": "Software engineer"}}'
Now access Kibana and make sure that you have configured the index pattern:
Then go to Discover section and your message should appear like that:
Instead of sending messages via command line, you can set your services to send messages to a SNS topic using AWS SDK.
Thank you for reading :)
Top comments (0)