When stepping into the world of data engineering, Apache Airflow is likely one of the first tools you will encounter. It is the industry standard for programmatically authoring, scheduling, and monitoring workflows.
Understanding the New Airflow 3.1.0 Syntax
Before building our first DAG, it's important to know what has changed in Airflow 3.1.0.
Previously, Airflow users imported DAGs and tasks from airflow.models and airflow.decorators. Airflow 3.0 and later versions introduce airflow.sdk as the public interface for authoring DAGs.
This means you will now use:
from airflow.sdk import dag, task
Using these imports helps keep your DAGs compatible with future Airflow upgrades.
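For comparison, the older import paths that many tutorials still show came from Airflow's internal modules; the sketch below contrasts them with the new public interface:
# Airflow 2.x style imports (internal modules):
#   from airflow.models import DAG
#   from airflow.decorators import dag, task

# Airflow 3.x style import (public Task SDK):
from airflow.sdk import dag, task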
The Pipeline Overview
Our data pipeline consists of two main tasks:
1. Extract: Fetch the daily open, high, low, close, and volume data for a list of stock tickers from the Massive API.
2. Transform and Load: Convert the raw data into a structured format, connect to a PostgreSQL database, create the target table if it does not exist, and insert the records. A minimal skeleton of this two-task structure is sketched just below.
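At a high level, the finished DAG will have the shape below. This is only a sketch: the function names mirror the tasks we build in the following steps, and the schedule and start date are placeholders.
from datetime import datetime
from airflow.sdk import dag, task

@dag(start_date=datetime(2026, 4, 24), schedule="@daily", catchup=False)
def stock_pipeline_skeleton():

    @task()
    def extract_tickers():
        # Fetch raw rows from the API; the return value is passed downstream via XCom
        return []

    @task()
    def transform_load_tickers(raw_tickers):
        # Clean the rows and write them to PostgreSQL
        pass

    transform_load_tickers(extract_tickers())

stock_pipeline_skeleton()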
Step 1: Setting Up Dependencies and Configuration
First, we import the necessary libraries. We need the RESTClient from the massive Python package to interact with the Massive API. We also import pandas for data manipulation, psycopg2 for PostgreSQL database connectivity, and the new dag and task decorators from airflow.sdk.
from massive import RESTClient
from datetime import datetime, timedelta
import pandas as pd
import psycopg2
from psycopg2 import sql
from psycopg2 import extras
from airflow.sdk import dag, task
# Database Configuration:
DB_HOST = 'my_host'
DB_NAME = 'my_db'
DB_USER = 'my_user'
DB_PASSWORD = 'my_password'
DB_PORT = '1234'
TARGET_TABLE = 'my_table'
@dag(
    dag_id="nganga_massive_tickers",
    start_date=datetime(2026, 4, 24),
    schedule=timedelta(days=1),
    catchup=False
)
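A quick aside on the configuration above: hard-coding credentials is fine for a local demo, but in a real deployment you would typically read them from environment variables or an Airflow Connection. A minimal sketch, assuming hypothetical variable names such as STOCKS_DB_HOST are exported wherever Airflow runs:
import os

# Hypothetical environment variable names; adapt them to your deployment
DB_HOST = os.environ.get("STOCKS_DB_HOST", "localhost")
DB_NAME = os.environ.get("STOCKS_DB_NAME", "my_db")
DB_USER = os.environ.get("STOCKS_DB_USER", "my_user")
DB_PASSWORD = os.environ["STOCKS_DB_PASSWORD"]  # fail fast if the secret is missing
DB_PORT = os.environ.get("STOCKS_DB_PORT", "5432")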
Step 2: Extracting Data from Massive
The first task in our DAG is to extract the stock data. We define a function extract_tickers and decorate it with @task to register it as an Airflow task.
def nganga_massive_tickers():

    # Extract from Massive
    @task()
    def extract_tickers():
        client = RESTClient("my_api_key")
        symbols = ["AAPL", "GOOGL", "TSLA", "NFLX", "AMZN"]
        data_list = []
        # Target date: two days before today, formatted as YYYY-MM-DD
        yesterday = datetime.today() - timedelta(days=2)
        yesterday_formatted = yesterday.strftime('%Y-%m-%d')
        for ticker in symbols:
            try:
                request = client.get_daily_open_close_agg(ticker, yesterday_formatted, adjusted="true")
                data_list.append({
                    "ticker": ticker,
                    "open": request.open,
                    "high": request.high,
                    "low": request.low,
                    "close": request.close,
                    "volume": request.volume,
                    "ticker_date": yesterday_formatted
                })
            except Exception as e:
                print(f"Error fetching {ticker}: {e}")
        return data_list
In this function, we create a RESTClient with our API key and loop through a chosen list of popular stocks. For each ticker, we call the get_daily_open_close_agg function to get the stock data for a specific date. The results are appended to a list of dictionaries, which the task returns; Airflow passes this return value to the next task via XCom.
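Since this task calls an external API, transient failures are possible. If you would rather have Airflow retry the extract automatically instead of failing the run, you can pass retry settings straight to the decorator; a small sketch (the retry count and delay are arbitrary choices):
@task(retries=3, retry_delay=timedelta(minutes=5))
def extract_tickers():
    ...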
Step 3: Transforming and Loading the Data
The second task takes the raw data extracted in the previous step, transforms it, and loads it into a PostgreSQL database hosted on Aiven.
    # Transform and load tickers
    @task()
    def transform_load_tickers(raw_tickers):
        # Transform the list of dicts into a DataFrame
        data_df = pd.DataFrame(raw_tickers)
        data_df["volume"] = data_df["volume"].round(2)

        # Connect to Postgres
        try:
            conn = psycopg2.connect(
                host=DB_HOST,
                dbname=DB_NAME,
                user=DB_USER,
                password=DB_PASSWORD,
                port=DB_PORT
            )
            cur = conn.cursor()
            print(f"Successfully connected to database: {DB_NAME} with user: {DB_USER}")
        except psycopg2.Error as e:
            print(f"Error in connection: {e}")
            raise

        # Create the target table on Postgres if it does not exist
        create_table = sql.SQL("""
            CREATE TABLE IF NOT EXISTS {table} (
                ticker VARCHAR(10),
                open NUMERIC(10,2),
                high NUMERIC(10,2),
                low NUMERIC(10,2),
                close NUMERIC(10,2),
                volume NUMERIC(10,2),
                ticker_date DATE,
                load_date TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP);
        """).format(table=sql.Identifier(TARGET_TABLE))
        try:
            cur.execute(create_table)
            conn.commit()
            print(f"Table {TARGET_TABLE} created (or it already exists)")
        except psycopg2.Error as e:
            print(f"Error creating table: {e}")

        # Load the data into Postgres
        columns_to_load = ['ticker', 'open', 'high', 'low', 'close', 'volume', 'ticker_date']
        data_df = data_df[columns_to_load]
        values_to_load = [tuple(row) for row in data_df.values]
        insert_query = sql.SQL("""
            INSERT INTO {table} ({columns})
            VALUES %s
        """).format(table=sql.Identifier(TARGET_TABLE),
                    columns=sql.SQL(', ').join(map(sql.Identifier, columns_to_load)))
        try:
            extras.execute_values(cur, insert_query, values_to_load)
            conn.commit()
            print(f"Successfully inserted {len(values_to_load)} rows into table: {TARGET_TABLE}")
        except psycopg2.Error as e:
            conn.rollback()
            print(f"Error inserting rows: {e}")
        finally:
            cur.close()
            conn.close()

    # Define task dependencies
    extracted = extract_tickers()
    loaded = transform_load_tickers(extracted)

dag = nganga_massive_tickers()
Here, we convert the list of dictionaries into a Pandas DataFrame. We then connect to the PostgreSQL database using psycopg2.
We execute a CREATE TABLE IF NOT EXISTS statement to ensure our destination table is ready. Finally, we use psycopg2.extras.execute_values to insert the DataFrame rows into the database.
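One thing to be aware of: as written, re-running the DAG for the same date inserts duplicate rows. If you want the load to be idempotent, one option is to add a unique constraint on (ticker, ticker_date) and let PostgreSQL skip conflicting rows. A sketch under that assumption (the constraint itself is not part of the original table definition):
# Assumes a unique constraint exists, for example:
#   ALTER TABLE my_table ADD CONSTRAINT uq_ticker_date UNIQUE (ticker, ticker_date);
insert_query = sql.SQL("""
    INSERT INTO {table} ({columns})
    VALUES %s
    ON CONFLICT (ticker, ticker_date) DO NOTHING
""").format(
    table=sql.Identifier(TARGET_TABLE),
    columns=sql.SQL(', ').join(map(sql.Identifier, columns_to_load))
)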
Step 4: Defining the DAG and Task Dependencies
The final step, as shown above, ties everything together into a DAG and defines the execution order of our tasks: passing the output of extract_tickers() into transform_load_tickers() is what creates the dependency between the two tasks.
    # Define task dependencies
    extracted = extract_tickers()
    loaded = transform_load_tickers(extracted)

dag = nganga_massive_tickers()
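To try the pipeline locally before deploying it, you can run the whole DAG in a single process with the dag.test() helper, which executes the tasks without a scheduler. This assumes the Massive API key and the database are reachable from your machine:
if __name__ == "__main__":
    dag.test()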
Conclusion
Congratulations! You have just built a modern data pipeline using Apache Airflow 3.1.0. By leveraging the new airflow.sdk, your DAG is future-proofed against core architectural changes. You also learned how to interact with the Massive API to pull financial data and load it into a relational database.