Rubens Barbosa

ELT Data Pipeline with Kubernetes CronJob, Azure Data Lake, Azure Databricks (Part 1)

Hey world, the concept of ETL is far from new, but it is still widely used in the industry today. ETL stands for Extract, Transform, and Load. Okay, but what does that mean? The easiest way to understand how ETL works is to look at what happens in each step of the process. Let's dive into it.

Extract

During the extraction, raw data is moved from a structured or unstructured data pool to a staging data repository.

Transform

The data source might have a different structure than the target destination, so we transform the data from the source schema to the destination schema.

Load

In this phase, we'll then load the transformed data into the data warehouse.

A disadvantage of the ETL approach is that the transformation stage can take a long time. An alternative approach is extract, load, and transform (ELT). In ELT, the data is immediately extracted and loaded into a large data repository, such as Azure Data Lake Storage. We can begin transforming the data as soon as the load is complete.
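To make the ordering concrete, here is a minimal, purely illustrative Python sketch of the ELT flow with hypothetical in-memory data (not the project code): raw records are extracted, loaded as-is into a store, and only transformed afterwards on top of the stored copy.

# Illustrative ELT sketch: extract -> load raw data -> transform afterwards
def extract():
    # pretend these raw records came from a source API
    return [{"id": 1, "value": "10"}, {"id": 2, "value": "20"}]

def load(records, store):
    # land the raw records untouched (a data lake in a real pipeline)
    store.extend(records)

def transform(store):
    # the transformation runs after the load, on the stored raw copy
    return [{**record, "value": int(record["value"])} for record in store]

lake = []
load(extract(), lake)
print(transform(lake))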

Hands on

In this first part I will show how to create an ELT pipeline. We'll extract Covid-19 data from a public API called IntegraSUS and load it into Azure Data Lake Storage. The ELT will be containerized and its image pushed to Azure Container Registry (ACR), and we will use Azure Kubernetes Service (AKS) to schedule our job on a K8s cluster to run daily.

In the second part of this project, we will integrate Azure Data Lake with Apache Spark on Azure Databricks to perform a small transformation on top of the files sent to the Data Lake, and then we will store the result of the transformation in a Data Warehouse.

We will learn how to:

  • Create a Python ELT and load data into Azure Data Lake;
  • Create an Azure Container Registry and push images into it;
  • Create an Azure Kubernetes Service cluster;
  • Deploy a CronJob to the Azure Kubernetes cluster;
  • Integrate Azure Data Lake with Apache Spark on Databricks;
  • Transform data using PySpark on Azure Databricks;
  • Load the transformed data into a Data Warehouse.

The project code is available in the GitHub repository.

1. Create a Python ELT

In the extraction phase we will get Covid-19 data for Fortaleza/Ceará/Brazil from a public API and store it in a JSON file. After that, we will load the file into Azure Data Lake. You can see the project code below.

#!/usr/local/bin/python
import os
import sys
import yaml
import json
import logging
import requests
from datetime import datetime, timedelta
from azure.storage.filedatalake import DataLakeServiceClient

logging.basicConfig(stream=sys.stdout,
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")

logger = logging.getLogger(__name__)

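# reference date for the API query: two days ago (month and day are not zero-padded here)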
date = datetime.today() - timedelta(days=2)
previous_date = f"{date.year}-{date.month}-{date.day}"


def extract():
    logger.info('EXTRACTING...')
    #extract data from API
    url = 'https://indicadores.integrasus.saude.ce.gov.br/api/casos-coronavirus?dataInicio='+ previous_date +'&dataFim=' + previous_date
    req = requests.get(url)
    data = req.json()

    if data:
        with open(f'covid-data-{previous_date}.json', 'w', encoding='utf-8') as file:
            json.dump(data, file, ensure_ascii=False, indent=4)
    else:
        logger.info('THERE IS NO DATA')

def initialize_storage_account(storage_account_name, storage_account_key):
    try:
        global service_client
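        # build a client for the storage account's ADLS Gen2 (dfs.core.windows.net) endpoint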
        service_client = DataLakeServiceClient(account_url="https://"+ storage_account_name +".dfs.core.windows.net", credential=storage_account_key)

    except Exception as e:
        logger.info('EXCEPTION...')
        logger.info(e)

def create_file_system(container_name):
    try:
        global file_system_client
        logger.info(f'CREATING A CONTAINER NAMED {container_name}')
        file_system_client = service_client.create_file_system(file_system=container_name)

    except Exception as e:
        logger.info('EXCEPTION...')
        logger.info(e)

def create_directory():
    try:
        logger.info('CREATING A DIRECTORY NAMED DIRECTORY-COVID19')
        file_system_client.create_directory("directory-covid19")

    except Exception as e:
        logger.info('EXCEPTION...')
        logger.info(e)

def upload_file_to_container_datalake(local_file, container_name):
    try:
        logger.info('UPLOADING FILE TO AZURE DATA LAKE STORAGE...')
        file_system_client = service_client.get_file_system_client(file_system=container_name)
        directory_client = file_system_client.get_directory_client("directory-covid19")

        file_client = directory_client.get_file_client(f"covid-{previous_date}.json")

        with open(local_file, "rb") as data:
            file_client.upload_data(data, overwrite=True)
            logger.info('UPLOADED TO AZURE DATA LAKE')

    except Exception as e:
        logger.info('EXCEPTION...')
        logger.info(e)

def load_config():
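    # config.yaml is expected to provide the keys read in __main__:
    #   AZURE_DL_STORAGE_ACCOUNT_NAME, AZURE_DL_ACCOUNT_KEY, AZURE_DL_CONTAINER_NAME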
    directory = os.path.dirname(os.path.abspath(__file__))
    with open(directory + "/config.yaml", "r") as yamlfile:
        return yaml.load(yamlfile, Loader=yaml.FullLoader)


if __name__ == "__main__":
    extract()
    config = load_config()
    initialize_storage_account(config["AZURE_DL_STORAGE_ACCOUNT_NAME"], config["AZURE_DL_ACCOUNT_KEY"])
    create_file_system(config["AZURE_DL_CONTAINER_NAME"])
    create_directory()
    upload_file_to_container_datalake(f"covid-data-{previous_date}.json", config["AZURE_DL_CONTAINER_NAME"])

2. Create a Docker image for our Python ELT

We are going to build a Docker image for our ELT job and run it inside a container. So, let's create a Dockerfile, which describes how the image is built; requirements.txt should list at least requests, PyYAML, and azure-storage-file-datalake, which the script imports. You can see the list of instructions below.

FROM python:3.9.12-buster

WORKDIR /usr/src/app

COPY requirements.txt /usr/src/app
RUN pip install -r requirements.txt

COPY config.yaml /usr/src/app

COPY el2datalake.py /usr/src/app
RUN chmod a+x el2datalake.py

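# el2datalake.py starts with a python shebang and was made executable above, so it can run directly as the container command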
CMD ["./el2datalake.py"]

We can build the Docker image using the docker build command.

$ docker build -t el2datalakejob .

Now we can run our ELT inside a container. Note that the image already includes config.yaml with the storage credentials.

$ docker run -it el2datalakejob:latest

3. Push docker images to Azure Container Registry

Azure Container Registry handles private Docker container images and allows us to build, store, and manage container images. We are going to deploy an ACR instance and push a Docker image to it.

To create any resource on Azure, we first need a resource group. We create a new resource group with the az group create command.

$ az group create --name myResourceGroup --location westeurope

Once we have a resource group, we can create an Azure Container Registry with the az acr create command.

$ az acr create \
  --resource-group myResourceGroup \
  --name azcrjobs \
  --sku Basic \
  --location westeurope

Let's log in to the Azure Container Registry.

$ az acr login --name azcrjobs

Let's tag the image with the login server azcrjobs.azurecr.io.

$ docker tag el2datalakejob \
azcrjobs.azurecr.io/el2datalakejob:v1

Push the Docker image to ACR

$ docker push azcrjobs.azurecr.io/el2datalakejob:v1

Now that we have our ELT image on Azure Container Registry, let's move on to the next step.

4. Create and Deploy CronJobs on Azure Kubernetes Service

Azure Kubernetes Service (AKS) is a fully managed Kubernetes service that lets us deploy and manage containerized applications more easily. Let's create an AKS cluster with the az aks create command.

$ az aks create \
  --resource-group myResourceGroup \
  --name az-aks-jobs \
  --node-count 1 \
  --attach-acr azcrjobs \
  --location westeurope

To connect to the cluster from the local machine we use the Kubernetes client kubectl. Open a terminal and get the cluster credentials.

$ az aks get-credentials --resource-group myResourceGroup --name az-aks-jobs

Let's check the node available on AKS.

$ kubectl get nodes

We start by creating a manifest file for our ELT CronJob.

apiVersion: batch/v1
kind: CronJob
metadata:
  creationTimestamp: null
  name: k8sjob
spec:
  jobTemplate:
    metadata:
      creationTimestamp: null
      name: k8sjob
    spec:
      template:
        metadata:
          creationTimestamp: null
        spec:
          containers:
          - image: azcrjobs.azurecr.io/el2datalakejob:v1
            imagePullPolicy: IfNotPresent
            name: k8sjob
            resources: {}
          restartPolicy: OnFailure
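  # run every day at 23:55 (crontab fields: minute hour day-of-month month day-of-week)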
  schedule: '55 23 * * *'
status: {}

In the manifest above we defined the crontab expression used as the schedule for our job; '55 23 * * *' means it runs every day at 23:55. We also set the name of the Docker image to be pulled from the container registry attached to the cluster.

To deploy our job, we will use the kubectl apply command.

kubectl apply -f job.yml

We can view some details about the job with

kubectl get cronjobs

To retrieve the CronJob logs from Kubernetes, we can use the kubectl logs command, but first we must get the pod name.

$ kubectl get pods
NAME                       READY   STATUS      RESTARTS   AGE
k8sjob-27513350--1-xnj8x   0/1     Completed   0          4m2s

Retrieve cron job logs from Kubernetes

$ kubectl logs k8sjob-27513350--1-xnj8x

Conclusion

Finally, the first stage of our project is complete: we now have Covid-19 data on Azure Data Lake. In the next part, we will read these files from Azure Data Lake, do some processing of the data using Apache Spark on Azure Databricks, and make the result of that processing available in a Data Warehouse.
