DEV Community

Cover image for Data Engineering 101: Writing Your First Pipeline
SeattleDataGuy
SeattleDataGuy

Posted on

Data Engineering 101: Writing Your First Pipeline

One of the main roles of a data engineer can be summed up as getting data from point A to point B.

We often need to pull data out of one system and insert it into another. This could be for various purposes. This includes analytics, integrations, and machine learning.

But in order to get that data moving, we need to use what are known as ETLs/Data pipelines.

These are processes that pipe data from one data system to another.

One question we need to answer as data engineers is how often do we need this data to be updated. This is where the question about batch vs. stream comes into play. These are the two main types of ETLs/ELTs that exist.


Batch vs. Stream

For a very long time, almost every data pipeline was what we consider a batch pipeline. This means that the pipeline usually runs once per day, hour, week, etc. There's some specific time interval, but the data is not live.

Batch jobs refers to the data being loading in chunks or batches rather than right away. Thus the term batch jobs as the data is loaded in batches.

Compare this to streaming data where as soon as a new row is added into the application database it's passed along into the analytical system.

This is usually done using various forms of Pub/Sub or event bus type models. All these systems allow for transactional data to be passed along almost as soon as the transaction occurs.

Some might ask why we don't just use streaming for everything. Isn't it better to have live data all the time?

In some regard this is true. But oftentimes creating streaming systems is technically more challenging, and maintaining it is also difficult.

Whereas while batch jobs run at normal intervals could fail, they don't need to be fixed right away because they often have a few hours or days before they run again.

In comparison, a streaming system is live all the time. Failures and bugs need to be fixed as soon as possible.

For now, we're going to focus on developing what are traditionally more batch jobs.

Besides picking your overall paradigm for your ETL, you will need to decide on your ETL tool.


ETL Tool Options

If you just want to get to the coding section, feel free to skip to the section below.

But we can't get too far in developing data pipelines without referencing a few options your data team has to work with.

There are plenty of data pipeline and workflow automation tools.

Let's break them down into two specific options.

Drag and drop vs. frameworks.

Drag and drop options offer you the ability to know almost nothing about code --- this would be like SSIS and Informatica.

These are great for people who require almost no custom code to be implemented.

Although many of these tools offer custom code to be added, it kind of defeats the purpose.

If your team is able to write code, we find it more beneficial to write pipelines using frameworks as they often allow for better tuning. Although Informatica is pretty powerful and does a lot of heavy lifting as long as you can foot the bill.

Even so, many people rely on code-based frameworks for their ETLs (some companies like Airbnb and Spotify have developed their own).

These frameworks are often implemented in Python and are called Airflow and Luigi. Both of these frameworks can be used as workflows and offer various benefits.

But for now, let's look at what it's like building a basic pipeline in Airflow and Luigi.

In later posts, we will talk more about design. But for now, we're just demoing how to write ETL pipelines.


Pipelines in Airflow

In order to make pipelines in Airflow, there are several specific configurations that you need to set up.

There is a set of arguments you want to set, and then you will also need to call out the actual DAG you are creating with those default args.

See the config below.

default_args = {
'owner': 'default_user',
'start_date': airflow.utils.dates.days_ago(1),
'depends_on_past': True,
  #With this set to true, the pipeline won't run if the previous day failed
'email': ['info@example.com'],
'email_on_failure': True,
 #upon failure this pipeline will send an email to your email set above
'email_on_retry': False,
'retries': 5,
'retry_delay': timedelta(minutes=30),
}


dag = DAG(
'basic_dag_1',
default_args=default_args,
schedule_interval=timedelta(days=1),
)

This is just the base of your DAG.

You can set things like how often you run the actual data pipeline --- like if you want to run your schedule daily, then use the following code parameters. For example, you can useschedule_interval='@daily'. Or you can use cron instead, like this: schedule_interval='0 0 * * *'.

Once you have set up your baseline configuration, then you can start to put together the operators for Airflow.

Operators are essentially the isolated tasks you want to be done. This could be extracting data, moving a file, running some data transformation, etc.

For example, if you look below we are using several operators. These include the PythonOperator and BashOperator.

This allows you to run commands in Python or bash and create dependencies between said tasks.

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

default_args = {
'owner': 'default_user',
'start_date': airflow.utils.dates.days_ago(1),
'depends_on_past': True,
  #With this set to true, the pipeline won't run if the previous day failed
'email': ['info@example.com'],
'email_on_failure': True,
 #upon failure this pipeline will send an email to your email set above
'email_on_retry': False,
'retries': 5,
'retry_delay': timedelta(minutes=30),
}

dag = DAG(
'basic_dag_1',
default_args=default_args,
schedule_interval=timedelta(days=1),
)

def my_func():
    print('Hello from my_func')


bashtask = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag,
)

dummy_task  = DummyOperator(task_id='dummy_task', retries=3)

python_task = PythonOperator(task_id='python_task', python_callable=my_func)

dummy_task.set_downstream(bashtask)
python_task.set_downstream(bashtask)

We do go a little more in-depth on Airflow pipelines here. But this is the general gist of it.

You can continue to create more tasks or develop abstractions to help manage the complexity of the pipeline. But the usage above of the Airflow operators is a great introduction.

Now onto Luigi.


Pipelines in Luigi

Luigi is another workflow framework that can be used to develop pipelines. In some ways, we find it simpler, and in other ways, it can quickly become more complex.

The reason we personally find Luigi simpler is because it breaks the main tasks into three main steps.

These can be seen in what Luigi defines as a "Task."

Task

Within a Luigi Task, the class three functions that are the most utilized are requires(), run(), and output().

What do each of these functions do in Luigi?

The requires() is similar to the dependencies in airflow.

You are essentially referencing a previous task class, a file output, or other output.

For example:


import luigi

class BasicTask(luigi.Task):

  def requires(self): 
    [FileExistsTask(self.input_filepath)]

In this case, the requires function is waiting for a file to land.

But it could also wait for a task to finish or some other output.
Not every task needs a requires function. But it can be used to reference a previous task that needs to be finished in order for the current task to start.

But tasks do need the run() function. The run() function is essentially the actual task itself. What do you want to get done?
For example:

class LoadTask(luigi.Task):

file_path = luigi.Parameter()
    def run(self):
        cnx = mysql.connector.connect(user='joe', database='test')
        stmt = "insert into basic_date(col1,col2,col3)  select distinct col1, col2, col3 from table1" 
        curs=cnx.cursor()
        curs.execute(stmt)
        curs.commit()
        curs.close()
        with self.output().open('w') as out_file:
            print >> out_file, strDate1, strDate2
    def output(self):
            return luigi.file.LocalTarget(path='/tmp/123')

The output of a task is a target, which can be a file on the local filesystem, a file on Amazon's S3, some piece of data in a database, etc.

You can see the slight difference between the two pipeline frameworks. Airflow is wrapped up in one specific operator whereas Luigi is developed as a larger class.

At the end of the day, this slight difference can lead to a lot of design changes in your pipeline.


So Which Do You Pick?

Personally, we enjoy Airflow due to a larger community. However, in many ways, Luigi can have a slightly lower bar to entry as far as figuring it out. There aren't a lot of different operators that can be used. Instead, you decide what each task really does.

This can allow a little more freedom but also a lot more thinking through for design and development.

So in the end, you will have to pick what you want to deal with. Regardless of the framework you pick, there will always be bugs in your code.

Good luck, and thanks for reading!

Data Engineering 101: An Introduction To Data Engineering

What Are The Different Kinds Of Cloud Computing

4 Simple Python Ideas To Automate Your Workflow

4 Must Have Skills For Data Scientists

SQL Best Practices --- Designing An ETL Video

5 Great Libraries To Manage Big Data With Python

Joining Data in DynamoDB and S3 for Live Ad Hoc Analysis

Top comments (2)

Collapse
 
stanbright profile image
Stan Bright

Thanks for sharing your experience, mate. I will get your post featured on the Awesome Python Weekly. Keep up the good work!

Collapse
 
seattledataguy profile image
SeattleDataGuy

Thank you!