Andrew Brown 🇨🇦

Event Tracking and Analytics via Ruby on Rails, DynamoDB (with Streams), Kinesis Firehose and Athena and CloudWatch Dashboard!

P.S. This is not an end-to-end guide. I documented my journey and decided to publish what I had time to write up rather than lock this knowledge away in our private knowledge base. I then happened to put a tech talk together, so between the video and the content below I hope this helps you build your own event tracking and analytics on AWS.

AWS SDK Initializer

Since we only need DynamoDB, add this to your Gemfile:

gem 'aws-sdk-dynamodb'

To make it easier to work with the SDK I have an initializer at RAILS_ROOT/config/initializers/aws.rb

You will notice I am setting the credentials explicitly. The SDK is supposed to pick up these environment variables implicitly, but in practice it did not when I wrote this. You may not need to be as verbose as I am here.

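# The environment variable names here are assumed (the standard AWS ones).
creds = Aws::Credentials.new(ENV['AWS_ACCESS_KEY_ID'], ENV['AWS_SECRET_ACCESS_KEY'])
Aws.config.update credentials: creds

module DynamoDB
  def self.resource
    @@dynamodb ||= Aws::DynamoDB::Resource.new(
      region: 'us-east-1'
    )
  end
end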

I should probably be storing the region as an environment variable in Figaro.

When we want to use DynamoDB all we have to do is the following:

  # ...

Primary and Sort

Highly unique IDs such as user IDs make good primary (partition) keys since they distribute better across partitions.

Dates make very good sort keys. When queried, your items are returned in ascending order of the sort key. Explore the DynamoDB table explorer so you have an idea of the limitations on how you can filter.

Notice that for the primary key we can only use = whereas for the sort key we have many options.

There are more advanced filter options in the documentation, if you can make sense of them.
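To make the difference between the two keys concrete, here is a minimal sketch of query parameters that use = on the primary key and BETWEEN on the sort key. The helper name events_between is hypothetical. The table and attribute names are the ones used elsewhere in this post, and the hash is only the shape I would expect to hand to the SDK's query call.

```ruby
# Hypothetical helper: builds parameters for a DynamoDB query.
# The primary (partition) key only supports equality; the sort key
# supports range operators such as BETWEEN.
def events_between(user_id, from, to)
  {
    table_name: 'exampro-events',
    key_condition_expression: 'user_id = :user_id AND event_at BETWEEN :from AND :to',
    expression_attribute_values: {
      ':user_id' => user_id,
      ':from'    => from,
      ':to'      => to
    }
  }
end

params = events_between(1, '2019-06-23T00:00:00Z', '2019-06-30T00:00:00Z')
puts params[:key_condition_expression]
```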


First I define how I want to use my tracker before writing the module.

Putting data, which writes to DynamoDB:

    Tracker::Put.event({
        user_id: user.id, # assumed: the ID of the user who just logged in
        event_type: 'login',
        user_agent: request.user_agent,
        ip_address: request.remote_ip
    })

Getting Data

    @recent_activity = Tracker::Get.recent_activity

The put should probably go in an ActiveJob. With these event calls littered throughout your application, each one blocks the request while it waits on DynamoDB, and your users experience that as latency. The SDK call is synchronous, so it waits for a response even though we don't need one.
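As a rough illustration of moving the write off the request path, here is a sketch that uses a plain Ruby Queue and a worker thread as a stand-in for a background job. This is not ActiveJob, and the names EVENT_QUEUE, WRITTEN and track are hypothetical. In a Rails app you would more likely enqueue a job that calls the tracker.

```ruby
# A stdlib stand-in for a background job: the caller pushes the event
# onto a queue and returns immediately; a worker thread does the write.
EVENT_QUEUE = Queue.new
WRITTEN = [] # stands in for the DynamoDB table in this sketch

worker = Thread.new do
  # Queue#pop blocks until an item is available; nil is our stop signal.
  while (event = EVENT_QUEUE.pop)
    WRITTEN << event # in the real app this would be the put to DynamoDB
  end
end

def track(event)
  EVENT_QUEUE << event # returns immediately, no network call here
end

track({ event_type: 'login', ip_address: '203.0.113.7' })
track({ event_type: 'login', ip_address: '203.0.113.8' })

EVENT_QUEUE << nil # tell the worker to stop
worker.join
puts WRITTEN.size
```

The request thread only pays for a queue push, and the slow call happens elsewhere.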

I created a new module in my lib directory, e.g. RAILS_ROOT/lib/tracker.rb

# This module is responsible for writing event data
# to various DynamoDB tables and fetching that data
# for display.

module Tracker
  class Entity
    include ActiveModel::Validations

    def initialize(opts={})
      opts.each { |k,v| instance_variable_set("@#{k}", v) }
    end

    attr_accessor :user_id,
                  :event_id,
                  :event_type,
                  :user_agent,
                  :ip_address
    attr_writer   :event_at

    validates :user_id    , presence: true, numericality: { only_integer: true }
    # 'login' is the only event type shown in this post; list your own here.
    validates :event_type , presence: true, inclusion: { in: %w(login) }
    validates :user_agent , presence: true
    validates :ip_address , presence: true
    validates :event_at   , presence: true

    # Defaults to the current time as an ISO 8601 string.
    def event_at
      @event_at || Time.now.iso8601
    end
  end ## Entity

  class Put
    def self.event attrs={}
      entity = Entity.new(attrs)
      unless entity.valid?
        raise ArgumentError, "Tracker Entity invalid: #{entity.errors.full_messages.join(', ')}"
      end

      DynamoDB.resource.client.put_item({
        item: {
          'user_id'    => entity.user_id,
          'ip_address' => entity.ip_address,
          'user_agent' => entity.user_agent,
          'event_id'   => entity.event_id,
          'event_type' => entity.event_type,
          'event_at'   => entity.event_at # sort key
        },
        # We don't care about returning consumed capacity.
        # We can handle losing event tracking data and
        # don't need to be alerted.
        return_consumed_capacity: 'NONE',
        table_name: 'exampro-events'
      })
    end
  end ## Put

  class Get
    def self.recent_activity user_id
      result = DynamoDB.resource.client.query({
        expression_attribute_values: {
          ":user_id"    => user_id
        },
        key_condition_expression: "user_id = :user_id",
        limit: 50,
        projection_expression: 'ip_address,event_type,event_at,user_agent', # select statement
        scan_index_forward: false, # descending order
        table_name: 'exampro-events'
      }).items
      result.each do |t|
        t['event_at']   = DateTime.parse(t['event_at'])
        unless t['user_agent'].blank?
          t['user_agent'] = DeviceDetector.new(t['user_agent'])
        end
      end
    end

    def self.logins user_id, event_type
      result = DynamoDB.resource.client.query({
        expression_attribute_values: {
          ":user_id"    => user_id,
          ":event_type" => event_type
        },
        key_condition_expression: "user_id = :user_id",
        filter_expression: "event_type = :event_type",
        # Note: limit is applied before filter_expression, so fewer
        # than 10 matching items may come back.
        limit: 10,
        projection_expression: 'ip_address,event_type,event_at,user_agent', # select statement
        scan_index_forward: false, # descending order
        table_name: 'exampro-events'
      }).items
      result.each do |t|
        t['event_at']   = DateTime.parse(t['event_at'])
        unless t['user_agent'].blank?
          t['user_agent'] = DeviceDetector.new(t['user_agent'])
        end
      end
    end
  end ## Get
end ## Tracker


I have this Entity class. Its purpose is to validate the format of arguments. I would probably enrich this further in the future with a metadata attribute.


I have a Put class for writing to DynamoDB. Currently it has only one method, but I may add more in the future.


I have another class called Get which queries data from DynamoDB.

DateTime as String

Another thing to note is that I am converting the time to a string, because DynamoDB does not have a DateTime data type.

This StackOverflow answer does a good job of explaining what to consider when choosing a format for your dates.

I care about readability, so ISO 8601 is a good format.
I don't care about using TTL (Time to Live) since I don't need to expire records to prune the table.
You can have DynamoDB stream only TTL events, which is interesting.
What matters most is that when filtering by date I can use BETWEEN to select a range between two dates.
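A small sketch of why ISO 8601 strings work as a sort key: as long as every value uses the same format and time zone (UTC here, which is my assumption), sorting the strings gives chronological order, and a string range check behaves like BETWEEN. The sample timestamps are made up for illustration.

```ruby
require 'time'

times = [
  Time.utc(2019, 6, 29, 16, 46, 23),
  Time.utc(2019, 6, 23, 16, 46, 24),
  Time.utc(2019, 6, 24, 16, 46, 25)
]
strings = times.map(&:iso8601)

# Sorting the strings gives the same order as sorting the times.
sorted_strings = strings.sort
chronological  = times.sort.map(&:iso8601)

# A string range check behaves like BETWEEN on the sort key.
from = '2019-06-24T00:00:00Z'
to   = '2019-06-30T00:00:00Z'
in_range = strings.select { |s| s.between?(from, to) }

puts sorted_strings == chronological
puts in_range.size
```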


We are using scan_index_forward: false to change the sort to be DESC instead of ASC.


We only want specific attributes returned from the database, so that is the purpose of:
projection_expression: 'ip_address,event_type,event_at,user_agent'


We are using return_consumed_capacity: 'NONE' because I don't care about getting capacity details back in the response. If there were a capacity issue, I have an alarm on which I would take action. Since this is event data, I don't care if some event tracking is dropped.


We are passing our user_agent through the DeviceDetector gem, e.g. DeviceDetector.new(t['user_agent'])

This is so that in the dashboard for our app I can get human-readable values, such as whether they are on a phone or desktop, Windows or Mac, or using a specific web browser.


Enabling DynamoDB Streams

We are going to need to turn on DynamoDB streams.

To have streams trigger a Lambda, we add an existing function under the Triggers tab. You may need to click More to find this Triggers tab.

When a record is inserted into DynamoDB, Streams will allow us to pass the puts in batches to a Lambda function.

We only want New Images. The new image is the item as it appears after the write, and the old image is the item as it was before the write, so a newly inserted record has no old image.

We will leave the batch size at 100. This doesn't mean Streams will wait until it has 100 records to send, only that it can send up to 100 at a time.

We can see our Lambda is attached. If an error occurs in this Lambda, it's sometimes smart to check here to find out at a glance if the Lambda is failing.

Here we can see the records in our DynamoDB table

We need to create a policy which allows Lambda to accept data from a specific DynamoDB Stream.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": ["dynamodb:GetShardIterator", "dynamodb:DescribeStream", "dynamodb:GetRecords"],
            "Resource": "arn:aws:dynamodb:us-east-1:ACCOUNT-ID:table/exampro-events/stream/2019-06-30T11:17:05.770"
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": "dynamodb:ListStreams",
            "Resource": "*"
        }
    ]
}

We need to allow our lambda function to stream data to our Kinesis Firehose

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": ["firehose:PutRecordBatch"],
            "Resource": "arn:aws:firehose:us-east-1:ACCOUNT-ID:deliverystream/exampro-events"
        }
    ]
}

Then I attach these two new policies to a role which is then attached to my Lambda function.

Lambda that streams data from DynamoDB to Firehose

Since DynamoDB Streams can deliver data in batches, we are going to use put_record_batch.

We need to supply the delivery_stream_name. This should probably live in an environment variable instead of being hardcoded as I do here.

Even though we are never going to update DynamoDB records, we only publish INSERT events to the stream.

require 'json'
require 'aws-sdk-firehose'

def lambda_handler(event:, context:)
  records = []
  event['Records'].each do |t|
    # Only forward newly inserted items.
    if t['eventName'] == 'INSERT'
      records.push({data: {
        user_id:  t['dynamodb']['NewImage']['user_id']['N'],
        event_at: t['dynamodb']['NewImage']['event_at']['S'],
        event_id: t['dynamodb']['NewImage']['event_id']['N'],
        event_type: t['dynamodb']['NewImage']['event_type']['S'],
        ip_address: t['dynamodb']['NewImage']['ip_address']['S'],
        user_agent: t['dynamodb']['NewImage']['user_agent']['S']
      }.to_json + "\n" })
    end
  end
  json = {records_size: records.size}.to_json
  puts json

  # put_record_batch needs at least one record.
  unless records.empty?
    firehose = Aws::Firehose::Resource.new
    resp = firehose.client.put_record_batch({
      delivery_stream_name: "exampro-events", # required
      records: records
    })
    json = {failed_put_count: resp.failed_put_count}.to_json
    puts json
  end

  return true
end

JSON records on newline

You will notice I am adding a newline at the end of our JSON string.

.to_json + "\n"

This is very important because when Athena reads our JSON files it expects each JSON record to be on its own line. If they are all on one line it will read only one record.
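Here is a small, self-contained sketch of the same idea outside of Lambda. The sample events are made up. It shows that newline-delimited records can be read back one line at a time, while records joined without a newline no longer form a single valid JSON document.

```ruby
require 'json'

events = [
  { user_id: 1, event_type: 'login' },
  { user_id: 2, event_type: 'login' }
]

# One JSON document per line, as in the Lambda above.
payload = events.map { |e| e.to_json + "\n" }.join

# Reading it back line by line recovers every record.
parsed = payload.each_line.map { |line| JSON.parse(line) }

# Without the newlines the objects run together and are no longer
# a single valid JSON document.
run_together = events.map(&:to_json).join
begin
  JSON.parse(run_together)
  single_doc = true
rescue JSON::ParserError
  single_doc = false
end

puts parsed.size
puts single_doc
```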

JSON Log Events

Notice that I am converting my hash to JSON and then using puts to log it. This is how you log JSON events so that we can use a Metric Filter on them later. You cannot just puts a hash; you have to convert it to JSON first.

  json = {records_size: records.size}.to_json
  puts json


If you're wondering why I'm not using the KPL (Kinesis Producer Library): I could have, but I would have had to use a Java Lambda and its configuration is more complicated. The KPL is more efficient, but for our use case we don't need it. You can read more about the KPL in the documentation.

Metric Filter

Based on the Filter and Pattern Syntax documentation, under Publishing Numerical Values Found in Log Entries, we can select an attribute of a JSON log event and publish it as a metric.

So for the metric filter, we want to match JSON log events with a records_size attribute greater than 0:

{ $.records_size > 0 }

For the metric value, we supply the attribute we want it to collect, which here is $.records_size
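CloudWatch evaluates the filter on its side, so there is nothing to run locally. Purely as an illustration of what the pattern selects, here is a rough Ruby emulation over some made-up log lines. The helper name records_size and the sample lines are hypothetical.

```ruby
require 'json'

log_lines = [
  '{"records_size":3}',
  'START RequestId: abc Version: $LATEST',
  '{"records_size":0}',
  '{"failed_put_count":0}',
  '{"records_size":5}'
]

# Returns the records_size attribute of a JSON log line, or nil when
# the line is not JSON or has no such attribute.
def records_size(line)
  event = JSON.parse(line)
  event.is_a?(Hash) ? event['records_size'] : nil
rescue JSON::ParserError
  nil
end

# Keep only events where records_size > 0 and collect that value,
# which is roughly what the filter pattern and metric value do.
values = log_lines.map { |line| records_size(line) }
                  .select { |v| v.is_a?(Numeric) && v > 0 }

puts values.inspect
puts values.sum
```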


Define Metric Filter

View created metric filter

You cannot add a Metric Filter to your CloudWatch Dashboard until data has been published to it.

How to find a metric filter after it's created

If you ever need to find this metric filter, it shows up under Logs as a column in the logs table.

Kinesis Firehose


Kinesis Firehose is incredibly affordable at $0.029/GB, so 500 GB comes to about $14.50 USD. Other Kinesis services can have a very expensive base cost.
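The arithmetic is simple enough to keep in a one-line helper. The rate below is the one quoted in this post and may well have changed, and the name firehose_cost is hypothetical.

```ruby
PRICE_PER_GB = 0.029 # USD per GB, the rate quoted in this post; check current pricing

# Estimated ingestion cost in USD, rounded to cents.
def firehose_cost(gigabytes)
  (gigabytes * PRICE_PER_GB).round(2)
end

puts firehose_cost(500)
```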

But what about Kinesis Data Analytics?

You will see there is another AWS Kinesis service called Kinesis Data Analytics, and you may think you need this expensive service based on its name.

Kinesis Data Analytics lets you run SQL queries on incoming streaming data. I am thinking that Kinesis Data Analytics might be faster at proactively producing real-time analytics because it crunches data as it comes in.

Using Firehose we just dump our data to S3. When someone needs to see an up-to-date dashboard we can just query Athena with a Lambda function, dump the results back into DynamoDB or maybe into a JSON file, and then display that to the user. We can decide to generate new analytics only if the last compiled version is out of date by, say, 5 minutes.
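The "only regenerate if out of date" rule could be sketched like this. The helper name stale? and the constant are hypothetical, and where the compiled-at timestamp is stored (DynamoDB, a JSON file) is left open.

```ruby
MAX_AGE_SECONDS = 5 * 60 # the "say, 5 minutes" threshold from the text

# Hypothetical helper: decide whether cached analytics need a rebuild.
# compiled_at is the Time the last results were generated, or nil if
# nothing has been compiled yet.
def stale?(compiled_at, now = Time.now)
  compiled_at.nil? || (now - compiled_at) > MAX_AGE_SECONDS
end

now = Time.utc(2019, 6, 30, 12, 0, 0)
puts stale?(now - 60, now)  # compiled one minute ago
puts stale?(now - 600, now) # compiled ten minutes ago
puts stale?(nil, now)       # never compiled
```

Only when stale? is true would the Lambda run the Athena query again.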

Creating Firehose

The dashboard is a bit confusing, so here is where I created my Firehose stream.

We could transform our data via Kinesis, but for us this is not necessary since we already apply our transformation beforehand in the Lambda. If you have data coming from multiple sources you may want this transformation Lambda to normalize the data and guarantee it is consistent. Since we only ingest data from one Lambda function this is a minimal risk for us.

I have this option set to disabled, but I wanted to show you that the data can be converted into Parquet files using Glue, which are much more performant when using Athena. This is not a pain point for us currently, so we are going to leave the data as is, which is JSON. Also, I didn't feel like calculating the cost of Glue here at scale.

I had read somewhere in the docs that compression was needed for encryption in a specific use case. When I had a Glue crawler create the table from Snappy-compressed data it produced a bizarre schema, so I rolled back on this and just encrypted using KMS.

Since I am storing IP addresses I consider this sensitive data. We run Macie, though I am uncertain whether it would alert on this if unencrypted.

The reason we collect IP addresses for our click event data is to detect abnormal user behaviour, such as account sharing or scraping.


We need a database and table.

Database and Table via Glue Catalog and Glue Crawler

This is one way to create your database and table. I am not going to recommend this way, but I am showing you that it can be done. So, create a database.

We will also need a table. We could easily define the columns manually, but if we already have data in our S3 bucket we can just use a crawler once to determine the schema for us. You choose your data store, which is the S3 bucket, and it does the rest.

If we check our table it should have determined our schema.

Database and Athena via SQL

When using Glue via the automatic crawler, it guessed the wrong column types and did not partition based on date. We can instead create what we need directly in Athena.

Create our database

CREATE DATABASE exampro_events
LOCATION 's3://exampro-events/';

And now create the table

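-- The table name "events" is assumed here for illustration.
CREATE EXTERNAL TABLE IF NOT EXISTS exampro_events.events (
  user_id    INT,
  event_at   STRING,
  event_id   INT,
  event_type STRING,
  user_agent STRING,
  ip_address STRING
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES ('paths' = 'user_id,event_at,event_id,event_type,user_agent,ip_address')
LOCATION 's3://exampro-events/';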

Ensure the location ends with a forward slash or you'll get an error about the path.

ROW FORMAT SERDE tells it the data will be in JSON format.

A SerDe (Serializer/Deserializer) is a way in which Athena interacts with data in various formats.

Notice that for event_at I set it as STRING instead of TIMESTAMP. ISO 8601 is not the format Athena expects for a TIMESTAMP, and we could change all our code to comply. Since Athena has the SQL function from_iso8601_timestamp, though, I'm not concerned unless I run into performance problems or limitations on the ability to query.

Athena expects this format: 2008-09-15 03:04:05.324
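If I ever did want to write timestamps in that format from the Ruby side, the conversion could look like the sketch below. The helper name athena_timestamp is hypothetical, and I am assuming the value should be in UTC.

```ruby
require 'time'

# Converts an ISO 8601 string into the "YYYY-MM-DD HH:MM:SS.mmm"
# layout shown above, normalised to UTC.
def athena_timestamp(iso8601_string)
  Time.iso8601(iso8601_string).utc.strftime('%Y-%m-%d %H:%M:%S.%L')
end

puts athena_timestamp('2019-06-29T16:46:23Z')
```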


You can partition your tables on things such as date, e.g. the year 2020. This might be something I want to do in the future, but for the time being I am ignoring partitions.

Querying in Athena

To get started, click on the ellipsis beside the table and choose Preview Table. It will create the query and show you some data, saving you the trouble of typing it all yourself.

Writing Athena queries can be a painful experience even with prior SQL knowledge. Read the docs to help you learn the SQL syntax.
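The aggregation I am ultimately after is logins grouped per day. As a reference for what that query should return, here is the same computation sketched in plain Ruby over a few sample event_at values. This is not Athena SQL, and the sample data is only illustrative.

```ruby
require 'time'

event_ats = [
  '2019-06-29T16:46:23Z',
  '2019-06-23T16:46:24Z',
  '2019-06-29T16:46:24Z',
  '2019-06-24T16:46:25Z'
]

# Truncate each timestamp to its UTC date, count per date, and
# order the result by date.
per_day = event_ats
  .map { |s| Time.iso8601(s).utc.strftime('%Y-%m-%d') }
  .tally
  .sort
  .to_h

per_day.each { |day, count| puts "#{day}: #{count}" }
```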

CloudWatch Dashboard

If something goes wrong we want to have a CloudWatch Dashboard to gain some insight.

We are going to add a widget

Here we can see our custom metric. If you don't see it here, it's because no data has been collected yet, so ensure data is being logged and your metric filter pattern is correct.

So there is our record-size. The other filter is just an old test one.

So here is my line graph. I don't know how useful it is, but it gets something in here. Remember to save the dashboard!

In DynamoDB there is a metric that could be useful to compare against the number of records filtered in our Lambda.

I added a few more widgets.
We can see how many records are streaming, how many records the Lambda passes to Firehose, how many incoming records were received, and how many were delivered to S3. Athena is still missing. We will get there.

Fake Data via Rake Command

I wanted some login data for the past 7 days so I can compose my Athena query to group logins per day for the week.

Rake commands are great for this. Also, I suppose you could test your read/write capacity using this method.

require 'faker'
namespace :track do
  namespace :put do
    task :login  => :environment do
      50.times do
        ip_address = Faker::Internet.public_ip_v4_address
        user_agent = Faker::Internet.user_agent
        event_at   = rand(1..7).days.ago.iso8601

        Tracker::Put.event({
          user_id: 1,
          event_type: 'login',
          user_agent: user_agent,
          ip_address: ip_address,
          event_at: event_at
        })

        puts "#{ip_address} - #{user_agent} - #{event_at}"
        sleep 0.2 # sleep 1/5th of a second
      end # x.times
    end #login
  end #put

  namespace :get do
    task :logins  => :environment do
      # logins takes both a user_id and an event_type
      results = Tracker::Get.logins 1, 'login'
      puts results
    end #logins
  end #get
end # track

So here I am running my rake command to create logins:

~/Sites/exampro-projects/exampro[master]: rake track:put:login
- Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.75.14 (KHTML, like Gecko) Version/7.0.3 Safari/7046A194A - 2019-06-29T16:46:23Z
- Mozilla/5.0 (compatible; MSIE 9.0; AOL 9.7; AOLBuild 4343.19; Windows NT 6.1; WOW64; Trident/5.0; FunWebProducts) - 2019-06-23T16:46:24Z
- Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; AS; rv:11.0) like Gecko - 2019-06-29T16:46:24Z
- Mozilla/5.0 (compatible; MSIE 9.0; AOL 9.7; AOLBuild 4343.19; Windows NT 6.1; WOW64; Trident/5.0; FunWebProducts) - 2019-06-29T16:46:24Z
- Mozilla/5.0 (Windows NT x.y; Win64; x64; rv:10.0) Gecko/20100101 Firefox/10.0 - 2019-06-29T16:46:25Z
- Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; AS; rv:11.0) like Gecko - 2019-06-24T16:46:25Z
- Mozilla/5.0 (compatible; MSIE 9.0; AOL 9.7; AOLBuild 4343.19; Windows NT 6.1; WOW64; Trident/5.0; FunWebProducts) - 2019-06-23T16:46:25Z
- Mozilla/5.0 (Windows NT x.y; Win64; x64; rv:10.0) Gecko/20100101 Firefox/10.0 - 2019-06-29T16:46:25Z
- Mozilla/5.0 (compatible; MSIE 9.0; AOL 9.7; AOLBuild 4343.19; Windows NT 6.1; WOW64; Trident/5.0; FunWebProducts
