Akmal Chaudhri for SingleStore

Using Apache Airflow with SingleStoreDB

Abstract

Apache Airflow manages workflows for data engineering pipelines. It works easily with SingleStoreDB and, in this article, we'll walk through the installation process and the creation of a simple workflow that uses several tables in a SingleStoreDB database.

Introduction

SingleStoreDB is MySQL wire-compatible, so we'll adapt a simple Apache Airflow MySQL example and show it working with SingleStoreDB.

Create a SingleStoreDB Cloud account

A previous article showed the steps required to create a free SingleStoreDB Cloud account. We'll use Airflow Demo Group as our Workspace Group Name and airflow-demo as our Workspace Name. We'll make a note of our password and host name.
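
Optionally, we can verify the credentials before continuing. A quick check, assuming the MySQL command-line client is installed locally, and with <host> standing in for the host name from our account:

mysql -h <host> -P 3306 -u admin -p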

Create the database and tables

Our example is derived from a simple example described in the article Scheduling a SQL script, using Apache Airflow, with an example.

Using the SQL Editor in SingleStoreDB, we'll create a database and several tables:

CREATE DATABASE airflow_demo;

USE airflow_demo;

CREATE TABLE event (
    event_id INT,
    spend_amt FLOAT,
    user_id INT,
    date DATE
);

CREATE TABLE event_stats (
    date DATE,
    user_id INT,
    total_spend_amt FLOAT
);

We'll populate the event table, as follows:

INSERT INTO event VALUES
(1,  34.36, 2,   CURRENT_DATE()),
(2,  94.92, 2,   CURRENT_DATE()),
(3,  70.76, 9,   CURRENT_DATE()),
(4,  34.26, 7,   CURRENT_DATE()),
(5,  58.36, 1,   CURRENT_DATE()),
(6,  39.64, 2,   CURRENT_DATE()),
(7,  64.83, 10,  CURRENT_DATE()),
(8,  39.33, 1,   CURRENT_DATE()),
(9,  100,   -99, CURRENT_DATE()),
(9,  69.06, 10,  ADDDATE(CURRENT_DATE(), 1)),
(10, 63.79, 3,   ADDDATE(CURRENT_DATE(), 1)),
(11, 40.87, 3,   ADDDATE(CURRENT_DATE(), 1)),
(12, 32.76, 10,  ADDDATE(CURRENT_DATE(), 1)),
(13, 11.84, 3,   ADDDATE(CURRENT_DATE(), 1)),
(14, 88.07, 2,   ADDDATE(CURRENT_DATE(), 1)),
(15, 100,   -99, ADDDATE(CURRENT_DATE(), 1));

At the time of writing, this produces an event table with the following values:

+----------+-----------+---------+------------+
| event_id | spend_amt | user_id | date       |
+----------+-----------+---------+------------+
|        1 |     34.36 |       2 | 2022-10-15 |
|        2 |     94.92 |       2 | 2022-10-15 |
|        3 |     70.76 |       9 | 2022-10-15 |
|        4 |     34.26 |       7 | 2022-10-15 |
|        5 |     58.36 |       1 | 2022-10-15 |
|        6 |     39.64 |       2 | 2022-10-15 |
|        7 |     64.83 |      10 | 2022-10-15 |
|        8 |     39.33 |       1 | 2022-10-15 |
|        9 |       100 |     -99 | 2022-10-15 |
|        9 |     69.06 |      10 | 2022-10-16 |
|       10 |     63.79 |       3 | 2022-10-16 |
|       11 |     40.87 |       3 | 2022-10-16 |
|       12 |     32.76 |      10 | 2022-10-16 |
|       13 |     11.84 |       3 | 2022-10-16 |
|       14 |     88.07 |       2 | 2022-10-16 |
|       15 |       100 |     -99 | 2022-10-16 |
+----------+-----------+---------+------------+

Install Apache Airflow

We'll use a Virtual Machine running Ubuntu 20.04.3 as our test environment. An alternative would be to use venv. Many of the following commands require sudo to complete successfully.
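
If we choose the venv route instead, a minimal sketch would be as follows (the environment name airflow-env is arbitrary):

python3 -m venv airflow-env
source airflow-env/bin/activate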

First, we'll upgrade pip:

pip install --upgrade pip

Next, we'll make a directory and sub-directory in our home folder:

mkdir airflow

mkdir airflow/dags

Since Airflow's MySQL support builds against the MySQL client library, we'll need to install the following:

sudo apt install libmysqlclient-dev

Next, we'll slightly modify the Installation from PyPI instructions, as follows:

AIRFLOW_VERSION=2.4.1

PYTHON_VERSION="$(python3 --version | cut -d " " -f 2 | cut -d "." -f 1-2)"

CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-no-providers-${PYTHON_VERSION}.txt"

pip install "apache-airflow[mysql]==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}" --ignore-installed PyYAML
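
We can confirm that the installation succeeded:

airflow version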

Now, we'll run the following:

airflow db init

If we look in the directory /root/airflow (Airflow's home directory, since the commands were run with sudo), we'll see several files:

airflow.cfg  airflow.db  logs  webserver_config.py

Let's modify the airflow.cfg file. We'll change the location of the dags folder:

[core]
# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository. This path must be absolute.
dags_folder = /path/to/airflow/dags

We'll replace /path/to with the absolute path to the dags folder we created earlier.
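
For instance, assuming the dags folder was created under /home/ubuntu (adjust the path for your machine), the change could be scripted as:

sudo sed -i 's|^dags_folder = .*|dags_folder = /home/ubuntu/airflow/dags|' /root/airflow/airflow.cfg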

We can now launch airflow:

airflow standalone

The last set of messages displayed by the above command should be similar to the following:

standalone | 
standalone | Airflow is ready
standalone | Login with username: admin  password: kRgbcnSCXEQ8SYQ6
standalone | Airflow Standalone is for development purposes only. Do not use this in production!
standalone |

Make a note of the password generated in your environment.
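
If the password scrolls out of view, Airflow standalone also writes it to a file in its home directory:

sudo cat /root/airflow/standalone_admin_password.txt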

Launching a web browser and entering http://localhost:8080 will show a login screen similar to Figure 1.

Figure 1. Sign In screen.

We'll use the credentials that we received earlier. After logging in, we should see a screen similar to Figure 2.

Figure 2. DAGs.

Create SingleStoreDB connection

In Apache Airflow, selecting Admin > Connections, we'll see a large list of connections. If we scroll down, we'll find mysql_default. Using the pencil icon, we'll edit the connection. Here is what we need to enter:

  • Connection Id: mysql_default
  • Connection Type: MySQL
  • Host: <host>
  • Schema: airflow_demo
  • Login: admin
  • Password: <password>
  • Port: 3306

We'll replace the <host> and <password> with the values from our SingleStoreDB Cloud account.

We can test the connection using the Test button. The response should be:

Connection successfully tested

We'll then use the Save button to save the connection.
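
Alternatively, the connection can be managed with the Airflow CLI. Since mysql_default already exists, a sketch would be to delete it first and then recreate it, using the same placeholder host and password:

airflow connections delete 'mysql_default'

airflow connections add 'mysql_default' \
    --conn-type 'mysql' \
    --conn-host '<host>' \
    --conn-schema 'airflow_demo' \
    --conn-login 'admin' \
    --conn-password '<password>' \
    --conn-port 3306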

Create DAG and SQL files

Let's create a new Python file called airflow_demo_dag.py with the following code, derived from the article mentioned earlier:

from datetime import date

from airflow import DAG
from airflow.providers.mysql.operators.mysql import MySqlOperator

# Default arguments applied to every task in the DAG.
default_arg = {
        'owner' : 'airflow',
        'start_date' : str(date.today())
}

# Run once a day at midnight.
dag = DAG(
        'simple-s2-dag',
        default_args = default_arg,
        schedule_interval = '0 0 * * *'
)

# Execute the templated SQL script against the mysql_default
# connection, which points at SingleStoreDB.
s2_task = MySqlOperator(
        dag = dag,
        mysql_conn_id = 'mysql_default',
        task_id = 's2_task',
        autocommit = True,
        sql = 'airflow_demo.sql',
        params = { 'test_user_id' : -99 }
)

s2_task

and a SQL file called airflow_demo.sql with the following code:

USE airflow_demo;

DROP TABLE IF EXISTS event_stats_staging;

-- Aggregate per-user spend for the run date ({{ ds }} is the
-- DAG run's logical date), excluding the test user.
CREATE TABLE event_stats_staging
AS SELECT date, user_id, SUM(spend_amt) total_spend_amt
   FROM event
   WHERE date = '{{ ds }}' AND user_id <> {{ params.test_user_id }}
   GROUP BY date, user_id;

-- Merge the staging results into event_stats.
UPSERT INTO event_stats
(date, user_id, total_spend_amt)
SELECT date, user_id, total_spend_amt
FROM event_stats_staging;

DROP TABLE event_stats_staging;

The DAG is scheduled to run at midnight (the cron expression 0 0 * * *). At run time, Airflow renders the Jinja template: {{ ds }} becomes the run's logical date and {{ params.test_user_id }} becomes -99, so the test user's rows are excluded. The SQL code then aggregates the data in the event table and stores the result in the event_stats table. For example, the first run of the code would produce the following:

+------------+---------+-----------------+
| date       | user_id | total_spend_amt |
+------------+---------+-----------------+
| 2022-10-15 |       1 |           97.69 |
| 2022-10-15 |       2 |          168.92 |
| 2022-10-15 |       7 |           34.26 |
| 2022-10-15 |       9 |           70.76 |
| 2022-10-15 |      10 |           64.83 |
+------------+---------+-----------------+

The second run of the code would produce the following:

+------------+---------+-----------------+
| date       | user_id | total_spend_amt |
+------------+---------+-----------------+
| 2022-10-15 |       1 |           97.69 |
| 2022-10-15 |       2 |          168.92 |
| 2022-10-15 |       7 |           34.26 |
| 2022-10-15 |       9 |           70.76 |
| 2022-10-15 |      10 |           64.83 |
| 2022-10-16 |       2 |           88.07 |
| 2022-10-16 |       3 |           116.5 |
| 2022-10-16 |      10 |          101.82 |
+------------+---------+-----------------+

We'll save our two files in the directory ~/airflow/dags.
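
With the files in place, we can check that Airflow parses the DAG cleanly and preview the SQL it will render from the template; the date below is just an example:

airflow dags list-import-errors

airflow tasks render simple-s2-dag s2_task 2022-10-15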

Run the code

In Apache Airflow, we should see a new DAG called simple-s2-dag in the list of DAGs. If it is not visible, it may be necessary to rerun the command:

airflow standalone

We can toggle the button to the left of the name to enable the DAG. On the extreme right-hand side, an arrow button (▶) allows us to run the DAG immediately. Doing so should be successful. Clicking on the DAG name will provide additional details, as shown in Figure 3.

Figure 3. simple-s2-dag.
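
As an alternative to the UI, the DAG can also be triggered and its runs listed from the command line:

airflow dags trigger simple-s2-dag

airflow dags list-runs -d simple-s2-dag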

From SingleStoreDB, we can check the event_stats table:

SELECT * FROM event_stats ORDER BY user_id;

The result should be similar to the following:

+------------+---------+-----------------+
| date       | user_id | total_spend_amt |
+------------+---------+-----------------+
| 2022-10-15 |       1 |           97.69 |
| 2022-10-15 |       2 |          168.92 |
| 2022-10-15 |       7 |           34.26 |
| 2022-10-15 |       9 |           70.76 |
| 2022-10-15 |      10 |           64.83 |
+------------+---------+-----------------+
5 rows in set (0.02 sec)

Summary

In this article, we have used a straightforward example to demonstrate Apache Airflow with SingleStoreDB. Since SingleStoreDB is MySQL wire-compatible, many existing tools and techniques can be used out of the box. This makes it easy for MySQL and MariaDB developers to transition to SingleStoreDB.
