In the following lines I write up everything I learned about data pipelines in the Udacity online class. It gives a general overview of data pipelines, introduces the core concepts of Airflow and provides some links to code examples on GitHub.
WHAT - A series of steps
A data pipeline is a series of steps in which data is processed, mostly ETL or ELT.
Data pipelines provide a set of logical guidelines and a common set of terminology.
The conceptual framework of data pipelines will help you better organize and execute everyday data engineering tasks.
Examples of use cases are automated marketing emails, real-time pricing or targeted advertising based on browsing history.
WHY - Data Quality
We want to provide high quality data.
There can be different requirements for how to measure data quality, based on the use case. For example:
- Data must be a certain size
- Data must be accurate to some margin of error
- Data must arrive within a given timeframe from the start of the execution
- Pipelines must run on a particular schedule
- Data must not contain any sensitive information
Data Validation
is the process of ensuring that data is present, correct and meaningful. Ensuring the quality of your data through automated validation checks is a critical step when working with data.
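As a minimal illustration (the function, field names and thresholds below are made up, not from the course), a validation check can simply raise an error and stop the pipeline run when its expectations are not met:

```python
# Hypothetical sketch of an automated validation check.
def validate_records(records, min_rows=1, required_fields=("id", "created_at")):
    """Raise if the loaded data is empty or missing required fields."""
    if len(records) < min_rows:
        raise ValueError(f"Expected at least {min_rows} rows, got {len(records)}")
    for row in records:
        missing = [f for f in required_fields if row.get(f) is None]
        if missing:
            raise ValueError(f"Row {row} is missing required fields: {missing}")

# Fails fast if the upstream step delivered no or incomplete data.
validate_records([{"id": 1, "created_at": "2022-01-22"}])
```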
Data Lineage
of a dataset describes the discrete steps involved in the creation, movement and calculation of a dataset. It is important for the following points:
Gain Confidence
Being able to describe the data lineage of a dataset or analysis builds confidence in our data consumers, such as Engineers, Analysts, Data Scientists and Stakeholders.
If the data lineage is unclear, it is very likely that our data consumers will not trust or want to use the data.
Defining Metrics
If we can surface data lineage, everyone in the company is able to agree on the definition of how a particular metric is calculated.
Debugging
If each step of the data movement and transformation process is well described, it's easy to find problems if they occur.
Airflow DAGs are a natural representation for the movement and transformation of data. The following components can be used to track data lineage: the rendered code tab for a task, the graph view of a DAG and the historical runs under the tree view.
Schedules
allow us to make assumptions about the scope of the data. The scope of a pipeline run can be defined as the period from the end of the last execution up to the time of the current execution.
Schedules improve data quality by limiting our analysis to data that is relevant for a given time period. If we use schedules appropriately, they are also a form of data partitioning, which can increase the speed of our pipeline runs.
With the help of schedules we can also leverage already completed work. For example, we would only need to aggregate the current month and add it to the existing totals, instead of aggregating data for all time.
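A rough sketch of that idea (the running totals and field names are hypothetical): each run aggregates only its own month and merges the result into what has already been computed.

```python
# Hypothetical sketch: aggregate only the current month and add it to the
# stored totals instead of re-aggregating the whole history on every run.
def update_monthly_totals(existing_totals, current_month_rows, month):
    month_sum = sum(row["amount"] for row in current_month_rows)
    existing_totals[month] = existing_totals.get(month, 0) + month_sum
    return existing_totals

totals = {"2021-12": 1500}
totals = update_monthly_totals(totals, [{"amount": 100}, {"amount": 250}], "2022-01")
print(totals)  # {'2021-12': 1500, '2022-01': 350}
```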
How to schedule
If we answer the below questions, we can find an appropriate schedule for our pipelines.
- What is the average size of the data for a time period? The more data we have, the more often the pipeline needs to be scheduled.
- How frequently is data arriving and how often do we need to perform analysis? If the company needs data on a daily basis, that is the driving factor in determining the schedule.
- What is the frequency of related datasets? A rule of thumb is that the frequency of a pipeline's schedule should be determined by the dataset in our pipeline that requires the most frequent analysis.
Data Partitioning
This is the process of isolating the data to be analyzed by one or more attributes, for example by time (schedule partitioning), by grouping conceptually related data (logical partitioning), by data size (size partitioning) or by location.
This leads to faster and more reliable pipelines, because smaller datasets, smaller time periods and related concepts are easier to debug than large amounts of data and unrelated concepts. There will also be fewer dependencies.
Tasks operating on partitioned data can be parallelized more easily.
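To make schedule partitioning concrete, here is a small sketch (rows and field names are made up): each run only touches the rows that fall inside its own time window, which keeps runs small and independent.

```python
from datetime import datetime

# Hypothetical sketch of schedule partitioning: filter the data down to the
# time window of the current run before processing it.
def partition_by_window(rows, window_start, window_end):
    return [r for r in rows if window_start <= r["created_at"] < window_end]

rows = [
    {"id": 1, "created_at": datetime(2022, 1, 21, 8, 0)},
    {"id": 2, "created_at": datetime(2022, 1, 22, 9, 30)},
]
daily_slice = partition_by_window(rows, datetime(2022, 1, 22), datetime(2022, 1, 23))
print(len(daily_slice))  # 1
```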
HOW does a pipeline work
DAGs - Directed Acyclic Graphs
A DAG is a collection of nodes and edges that describe the order of operations for a data pipeline.
NODE
A node is a step in a data pipeline process.
EDGE
The dependencies or relationships between nodes.
GRAPH
A graph describes the entities (nodes) and the relationships (edges) between them.
In the real world it is possible to model a data pipeline that is not a DAG, meaning it contains a cycle within the process. But the majority of pipelines can be described as a DAG, which makes the code more understandable and maintainable.
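To make the node/edge/cycle idea concrete, here is a tiny sketch using Python's standard-library graphlib rather than Airflow; the step names are made up:

```python
from graphlib import TopologicalSorter, CycleError  # Python 3.9+

# A DAG as plain data: each node maps to the set of nodes it depends on.
dag = {
    "load": {"extract"},    # edge: extract -> load
    "transform": {"load"},  # edge: load -> transform
}
print(list(TopologicalSorter(dag).static_order()))  # ['extract', 'load', 'transform']

# Adding a cycle means it is no longer a DAG and no valid order exists.
cyclic = {"a": {"b"}, "b": {"a"}}
try:
    list(TopologicalSorter(cyclic).static_order())
except CycleError as err:
    print("not a DAG:", err)
```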
Apache Airflow
is an open-source, DAG-based, schedulable data-pipeline tool that can run in mission-critical environments.
It is not a data processing framework; it is a tool that coordinates the movement of data between other data stores and data processing tools.
Airflow allows users to write DAGs in Python that run on a schedule and/or from an external trigger.
The advantages of defining pipelines in code are that they are:
- maintainable
- versionable
- testable
- collaborative
Airflow is simple to maintain and can run data analysis itself or trigger external tools (Redshift, Spark, etc.). It also provides a web-based UI for users to visualize and interact with their data pipelines.
Components of Airflow
- A Scheduler for orchestrating the execution of jobs on a trigger or schedule.
- A Work Queue which holds the state of the running DAGs and Tasks.
- Worker Processes that execute the operations defined in each DAG.
- A Database which saves credentials, connections, history and configuration.
- A Web Interface that provides a control dashboard for users and maintainers.
How it works
The scheduler starts a DAG based on time or external triggers.
If a DAG is started, the scheduler looks at the steps within the DAG and determines which steps can run by looking at their dependencies.
The scheduler places runnable steps in the queue.
Workers pick up those tasks and run them.
Once the worker has finished running a step, the final status of the task is recorded and additional tasks are placed by the scheduler until all tasks are complete.
Once all tasks have been completed, the DAG is complete.
Creating a DAG
To create a DAG you need a name, a description, a start date and an interval.
```python
from datetime import datetime
from airflow import DAG

my_first_dag = DAG(
    'my_first',
    description='Says hello world',
    start_date=datetime(2022, 1, 22),
    schedule_interval='@daily')
```
If the start date is in the past, Airflow will run your DAG as many times as there are schedule intervals between the start date and the current date. This is called backfill. If a company has established years of data that may need to be retroactively analyzed, this is useful.
Schedule intervals are optional and can be defined with cron strings or Airflow presets, like `@once`, `@hourly`, `@daily`, `@weekly`, `@monthly`, `@yearly` or `None`.
The end date is optional; if it is not specified, the DAG will run until it is disabled or deleted. An end date might be useful to mark an end of life or to handle data that is bounded by two points in time.
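A sketch combining both options (the DAG id, dates and cron string are made up): a cron schedule instead of a preset, plus an end date that bounds the runs.

```python
from datetime import datetime
from airflow import DAG

# Runs every day at 03:00 for January 2022 only; if the start date is in the
# past, the missing intervals are backfilled first.
bounded_dag = DAG(
    'bounded_example',
    description='Daily run at 03:00, bounded to January 2022',
    start_date=datetime(2022, 1, 1),
    end_date=datetime(2022, 1, 31),
    schedule_interval='0 3 * * *')
```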
Operators
define the atomic steps of work that make up a DAG. Instantiated operators are referred to as Tasks.
Airflow comes with many operators that can perform common operations, like `S3ToRedshiftOperator` or `SimpleHttpOperator`.
Task dependencies can be described programmatically using `a >> b` or `a.set_downstream(b)`, meaning a comes before b, or `a << b` or `a.set_upstream(b)`, meaning a comes after b.
```python
from airflow.operators.python_operator import PythonOperator

def hello_world():
    print('Hello World')

def second_step():
    print('Second Step')

my_first_sample_task = PythonOperator(
    task_id='hello_world',
    python_callable=hello_world,
    dag=my_first_dag)

second_step_task = PythonOperator(
    task_id='second_step',
    python_callable=second_step,
    dag=my_first_dag)

# hello_world runs before second_step
my_first_sample_task >> second_step_task
```
Task Boundaries
DAG tasks should be
- atomic and have a single, well-defined purpose. The more work a task performs, the less clear its purpose becomes. Atomic tasks are easier to maintain and understand, and they run fast.
Write programs that do one thing and do it well.
- Ken Thompson’s Unix Philosophy
- maximize parallelism. If a task is scoped properly, we can minimize dependencies and enable parallelism, which can speed up the execution of DAGs, as sketched below.
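A small sketch of that (reusing my_first_dag from above; the task names are made up): the two cleaning tasks have no dependency on each other, so Airflow can run them in parallel before the report task.

```python
from airflow.operators.python_operator import PythonOperator

# Two independent, atomic tasks followed by a task that depends on both.
clean_orders = PythonOperator(task_id='clean_orders',
                              python_callable=lambda: print('clean orders'),
                              dag=my_first_dag)
clean_users = PythonOperator(task_id='clean_users',
                             python_callable=lambda: print('clean users'),
                             dag=my_first_dag)
build_report = PythonOperator(task_id='build_report',
                              python_callable=lambda: print('build report'),
                              dag=my_first_dag)

# Both cleaning tasks run (possibly in parallel) before build_report.
[clean_orders, clean_users] >> build_report
```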
We can also create custom operators as plugins. One example of a custom operator could be a data quality check that is needed frequently.
To create a custom operator we have to:
- Identify operators that perform similar functions and can be consolidated
- Define a new operator in the plugins folder
- Replace the original operators with your new custom one, re-parameterize, and instantiate them.
You can find a sample for custom operators here.
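As a hedged sketch of what such a plugin operator might look like (the class name, parameters and SQL are illustrative, not the exact code from the linked sample; depending on your Airflow version you may also need the apply_defaults decorator on __init__):

```python
from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import BaseOperator

class HasRowsOperator(BaseOperator):
    """Reusable data quality check: fail the task if the given table is empty."""

    def __init__(self, table='', postgres_conn_id='', *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.table = table
        self.postgres_conn_id = postgres_conn_id

    def execute(self, context):
        hook = PostgresHook(self.postgres_conn_id)
        records = hook.get_records(f'SELECT COUNT(*) FROM {self.table}')
        if not records or records[0][0] < 1:
            raise ValueError(f'Data quality check failed: {self.table} has no rows')
        self.log.info('Data quality check passed: %s has %s rows', self.table, records[0][0])
```

Once defined, the operator can replace every hand-written row-count check in your DAGs and is parameterized only by the table and connection id.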
SubDAGs
Commonly repeated series of tasks within DAGs can be captured as reusable SubDAGs. An example would be the "S3ToRedshiftSubDag".
Advantages
- decrease the amount of code we need to write and maintain to create a new DAG
- easier to understand the high level goals of a DAG
- bug fixes, speedups, and other enhancements can be made more quickly and distributed to all DAGs that use that SubDAG
Disadvantages
- limited visibility within the Airflow UI
- harder to understand because of the abstraction level
If you want, you can also nest SubDAGs, but keep in mind that this makes the pipeline much harder to understand and maintain.
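A hedged sketch of the SubDAG pattern (the factory and task names are illustrative, and the SubDagOperator import path differs between Airflow versions): a factory function builds the repeated series of tasks, and the parent DAG embeds it through a SubDagOperator whose task id matches the child DAG id.

```python
from airflow import DAG
from airflow.operators.subdag_operator import SubDagOperator

def get_s3_to_redshift_dag(parent_dag_id, child_dag_id, **kwargs):
    # The SubDAG id must be '<parent_dag_id>.<child_dag_id>'.
    subdag = DAG(f'{parent_dag_id}.{child_dag_id}', **kwargs)
    # ... add the copy and data quality tasks to `subdag` here ...
    return subdag

copy_trips_task = SubDagOperator(
    task_id='copy_trips',
    subdag=get_s3_to_redshift_dag('my_first', 'copy_trips',
                                  start_date=my_first_dag.start_date,
                                  schedule_interval=my_first_dag.schedule_interval),
    dag=my_first_dag)
```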
Hooks
Connections can be accessed in code via hooks. Hooks provide a reusable interface to external systems and databases. Airflow comes with many hooks, like `HttpHook`, `PostgresHook`, `SlackHook` etc. We don't have to worry about how and where to store connection strings and secrets. You can store those in the Airflow user interface under Admin - Connections (and plain configuration values under Admin - Variables).
```python
from airflow import DAG
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.python_operator import PythonOperator

def load():
    # Create a PostgresHook using the 'demo' connection
    db_hook = PostgresHook('demo')
    df = db_hook.get_pandas_df('SELECT * FROM my_sample')
    print(f'your sample has {len(df)} records')

load_task = PythonOperator(task_id='load_sample_data', python_callable=load, ...)
```
As with operators, we can also create custom hooks.
Before creating a new plugin, you might want to check Airflow contrib to see whether a plugin for your needs has already been created by community members. If not, you can build one and contribute it to the community.
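If no existing hook fits, a custom hook can be as small as the following hedged sketch (the service name and connection id are made up); it reuses Airflow's connection store instead of hard-coding credentials:

```python
from airflow.hooks.base_hook import BaseHook

class MyServiceHook(BaseHook):
    """Hypothetical hook for some external service."""

    def __init__(self, my_service_conn_id='my_service_default'):
        self.conn_id = my_service_conn_id

    def get_conn(self):
        # Look up host and credentials stored under Admin - Connections.
        connection = self.get_connection(self.conn_id)
        return {'host': connection.host,
                'login': connection.login,
                'password': connection.password}
```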
Runtime variables
Another feature is that Airflow provides runtime variables that can be used. One example is the {{ execution_date }}
, the execution date.
```python
def hello_date(*args, **kwargs):
    print(f"Hello {kwargs['execution_date']}")

divvy_dag = DAG(...)

task = PythonOperator(
    task_id='hello_date',
    python_callable=hello_date,
    provide_context=True,  # makes runtime variables like execution_date available in kwargs
    dag=divvy_dag)
```
Monitoring
DAGs can be configured to have an SLA (Service Level Agreement), which is defined as a time by which a DAG must complete.
We can email a list of missed SLAs or view them in the Airflow UI. Missed SLAs can also be early indicators of performance problems or indicate that we need to scale up the size of our Airflow cluster.
If you are working on a time-sensitive application, an SLA is crucial.
Airflow can be configured to send emails on DAG and task state changes. These state changes may include successes, failures, or retries. Failure emails can allow you to easily trigger alerts. It is common for alerting systems to accept emails as a source of alerts.
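A sketch of how these settings can look in a DAG (the values and address are illustrative, and sending mails additionally requires SMTP settings in your Airflow configuration): the SLA and the email flags are passed to all tasks via default_args.

```python
from datetime import datetime, timedelta
from airflow import DAG

default_args = {
    'sla': timedelta(hours=1),             # tasks should finish within one hour
    'email': ['data-team@example.com'],    # illustrative address
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

monitored_dag = DAG(
    'monitored_example',
    default_args=default_args,
    start_date=datetime(2022, 1, 22),
    schedule_interval='@daily')
```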
Metrics
Airflow comes out of the box with the ability to send system metrics using a metrics aggregator called `statsd`. Statsd can be coupled with metrics visualization tools like Grafana to provide you and your team high-level insights into the overall performance of your DAGs, jobs, and tasks. These systems can be integrated into your alerting system. These Airflow system-level metrics allow you and your team to stay ahead of issues before they even occur by watching long-term trends.
You can find code samples for all of the above-mentioned topics here.