DEV Community

loading...

AWS Lambda - Step Functions - RDS - S3 - Aurora - Part 1

diegonalvarez profile image Diego Updated on ・4 min read

This is the first post about integrate Step Functions to an specific workflow. It's a public repo with one of my first python code, learning about that too.

How it works

A explanation on how it works can founded here:

Requirementes

  • Chalice 1.1.0
  • Python 2.7

Json For Unique Machine

{
  "Comment": "State machine to populate databases for reports."
  "StartAt": "MySql To S3 Files"
  "States": {
    "MySql To S3 Files": {
      "Type": "Task"
      "Resource": "ARN",
      "ResultPath": "$.guid",
      "Next": "S3 Data To Mysql",
      "Retry": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "IntervalSeconds": 1,
          "MaxAttempts": 3,
          "BackoffRate": 2
        }
      ]
    },
    "S3 Data To Mysql": {
      "Type": "Task",
      "Resource": "ARN",
      "InputPath": "$.guid",
      "End": true,
      "Retry": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "IntervalSeconds": 1,
          "MaxAttempts": 3,
          "BackoffRate": 2
        }
      ]
    }
  }
}

Json For Parallel Machines

{
  "Comment": "State machine to populate databases for reports.",
  "StartAt": "MySql To S3 Files",
  "States": {
    "MySql To S3 Files": {
      "Type": "Task",
      "Resource": "lambda-arn",
      "ResultPath": "$.guid",
      "Next": "InsertDataToMysql",
      "Retry": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "IntervalSeconds": 1,
          "MaxAttempts": 3,
          "BackoffRate": 2
        }
      ]
    },

Objective:

The objective for this post, it's moving data from a MySQL database to S3, and from there to another database.
The case that we are using is from Amazon RDS to Amazon Aurora, it can be another database of course, the idea is connect our new dataset with Bussiness Inteligences tools without affecting the original dataset and with a custom structure.

The first tought was use AWS Data Pipeline, but there are certains behavior that we need to custom.

The idea is have three or more post to get the objective, this is:

  1. An admin database to handle the params, in our case we use:
    • Companies
    • Tables
  2. Moving the data from RDS to S3
  3. Moving the data from S3 to Aurora
  4. Schedule to run the functions
  5. An script to auto-create the information, only setting a customer(company in our use case)
  6. Script to truncate tables, change names and detect that the new data is available.
  7. Lambdas working in parallel with step functions and params for big tables.

In this post i'm going trough the points 1-2-3 and 4.

Admin Database

I'm going to put the structure that i'm using for the two main tables and how we use both:

Companies

I called the table companies, but you can change to customer or whatever you need (it's necessary to change the code to). The idea is that we have multiple companies, but only a few need Bussiness Inteligence, so for that companies we set-up a small configuration to get the info in our main database.

Name Description
company_id Unique identifier of the element
status The status represent if it need the report
alias Used to create csv and the bucket of the company
CREATE TABLE `companies` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `company_id` int(11) DEFAULT NULL,
  `status` int(11) DEFAULT '0',
  `alias` varchar(60) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=latin1;

Tables

This table is designed to get the queries related to what information it's needed to be migrated.

Name Description
name Name of the table i.e. users
status Logical status if it's necessary get the information
query_select SQL to get the information
params_select Params for the Select Query
query_insert SQL to insert the information
params_insert Params for the Insert Query
CREATE TABLE `tables` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT '',
  `status` int(11) DEFAULT '0',
  `query_select` text,
  `params_select` text,
  `query_insert` text NOT NULL,
  `params_insert` text NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=latin1;

This is an real example, in the future version I wanna do a better approach to handle params.
The related insert is going to be a detect the table "users" and get the data accordingly the value of query_select using the params_query values, with this information the CSV is generated and inserted in S3.

When the function that insert the data run, it's going to execute the insert in the field query_insert with the params of params_insert. Here you can put any information and play with the structure of the dataset with your business needs.

INSERT INTO `tables` (`id`, `name`, `status`, `query_select`, `params_select`, `query_insert`, `params_insert`)
VALUES
    (1, 'users', 1, 'SELECT id, username FROM users WHERE company = %s', 'company[1]', 'REPLACE INTO users(id, username) VALUES ', '\'(\'+row[0] +\',\"\'+ row[1]+\'\")\'');

Thursday 06 December of 2018 Changes:

I added the feature to get from the database config fields. The first field that we need to customize is the date, so I created two tables:

CREATE TABLE `configurations` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(192) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=latin1;

Here I the first record have the value "date Y-M-D", to identify the config added in the next table:

CREATE TABLE `companies_configurations` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `retail_id` int(11) DEFAULT NULL,
  `configuration_id` int(11) DEFAULT NULL,
  `configuration_value` text,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=latin1;

An example of insert to get the last 60 days in the query it's:

INSERT INTO `companies_configurations` (`id`, `retail_id`, `configuration_id`, `configuration_value`)
VALUES
    (1, 1, 1, '60');

Config File

A config file is in .chalice/.config.sample.json

Rename the file to config.json and complete the params. All the params explain for themselves, don't forget to add the IAM Role.

Lambdas

There are two main functions, used as lambdas in this repo:

  1. mysql_csv_to_s3
  2. s3_to_mysql

mysql_csv_to_s3

This Lambda take the information from tables, execute the select query and insert the data into S3.

s3_to_mysql

Here, the data is collected from S3 and with the customs query, do the inserts.

Step Function [State Machine Definition]

At a final step, it's create a new State Machine from AWS Step Function and add the following json. You need to modify the ARN for each new lambda, and runned.

{
  "Comment": "State machine to populate databases for reports.",
  "StartAt": "MySql To S3 Files",
  "States": {
    "MySql To S3 Files": {
      "Type": "Task",
      "Resource": "ARN",
      "ResultPath": "$.guid",
      "Next": "S3 Data To Mysql",
      "Retry": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "IntervalSeconds": 1,
          "MaxAttempts": 3,
          "BackoffRate": 2
        }
      ]
    },
    "S3 Data To Mysql": {
      "Type": "Task",
      "Resource": "ARN",
      "InputPath": "$.guid",
      "End": true,
      "Retry": [
        {
          "ErrorEquals": [
            "States.ALL"
          ],
          "IntervalSeconds": 1,
          "MaxAttempts": 3,
          "BackoffRate": 2
        }
      ]
    }
  }
}

Cloudwatch Rule

Finally, you need to create a rule in cloudwatch related to the Machine State to generate a Schedule, it simply, here a documentation that helps

Hope the get some feedback and i'll be updating the posts with the second part.

Discussion (0)

pic
Editor guide