DEV Community

Ng'ang'a Njongo
Building Your First Airflow DAG: Extracting Stock Data with Massive

When stepping into the world of data engineering, Apache Airflow is likely one of the first tools you will encounter. It is the industry standard for programmatically authoring, scheduling, and monitoring workflows.

Understanding the New Airflow 3.1.0 Syntax

Before building our first DAG, it's important to know what has changed in Airflow 3.1.0.

Originally, Airflow users imported DAGs and tasks from airflow.models and airflow.decorators. Airflow 3.0 and later versions introduced the airflow.sdk package as the new public interface for DAG authoring.

This means you will now use:

from airflow.sdk import dag, task

This ensures your DAGs remain compatible with future Airflow upgrades.

The Pipeline Overview

Our data pipeline consists of two main tasks:
1. Extract: Fetch the daily open, high, low, close, and volume data for a list of stock tickers from the Massive API.
2. Transform and Load: Convert the raw data into a structured format, connect to a PostgreSQL database, create the target table if it does not exist, and insert the records.
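Before wiring these steps into Airflow, it can help to see the flow as plain Python. The sketch below mirrors the two-task shape we are about to build; the record values are made up for illustration:

```python
# A plain-Python sketch of the pipeline, before any Airflow decorators.
# The record shape mirrors what the real extract task will return;
# the values here are illustrative only.
def extract():
    return [{"ticker": "AAPL", "open": 262.2, "high": 265.0,
             "low": 261.1, "close": 264.4, "volume": 41_000_000.0,
             "ticker_date": "2025-11-20"}]

def transform_load(records):
    # In the real task this becomes a DataFrame insert into Postgres;
    # here we simply count the rows we would load.
    return len(records)

loaded = transform_load(extract())
```

Airflow's @task decorator will turn each of these functions into a scheduled, retryable unit, and passing extract's return value into transform_load becomes the task dependency.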

Step 1: Setting Up Dependencies and Configuration

First, we import the necessary libraries. We need the RESTClient from the massive Python package to interact with the Massive API. We also import pandas for data manipulation, psycopg2 for PostgreSQL database connectivity, and the new dag and task decorators from airflow.sdk.

from massive import RESTClient 
from datetime import datetime, timedelta
import pandas as pd
import psycopg2 
from psycopg2 import sql
from psycopg2 import extras
from airflow.sdk import dag, task

# Database Configuration:
DB_HOST = 'my_host'
DB_NAME = 'my_db'
DB_USER = 'my_user'
DB_PASSWORD = 'my_password'
DB_PORT = '1234'
TARGET_TABLE = 'my_table'

@dag(
    dag_id="nganga_massive_tickers",
    start_date=datetime(2026,4,24),
    schedule=timedelta(days=1),
    catchup=False
)

Step 2: Extracting Data from Massive

The first task in our DAG is to extract the stock data. We define a function extract_tickers and decorate it with @task to register it as an Airflow task.

def nganga_massive_tickers():

    #Extract from Massive
    @task()
    def extract_tickers():

        #Extract from Massive
        client = RESTClient("my_api_key")
        symbols = ["AAPL","GOOGL","TSLA","NFLX","AMZN"]
        data_list = []
        yesterday = datetime.today() - timedelta(days=2)
        yesterday_formatted = yesterday.strftime('%Y-%m-%d')

        for ticker in symbols:
            try:
                request = client.get_daily_open_close_agg(ticker, yesterday_formatted, adjusted="true")
                data_list.append({
                    "ticker": ticker,
                    "open": request.open,
                    "high": request.high,
                    "low": request.low,
                    "close": request.close,
                    "volume": request.volume,
                    "ticker_date": yesterday_formatted
                })

            except Exception as e:
                print(f"Error fetching {ticker}: {e}")

        return data_list

In this function, we create a RESTClient with our API key and loop through a list of popular stock tickers. For each ticker, we call the get_daily_open_close_agg function to get the stock data for a specific date. The results are appended to a list of dictionaries, which the task returns.
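One detail worth noticing is the date handling: the task requests data from two days back, not literally "yesterday", so the variable name in the DAG is slightly loose. A minimal standalone version of that date logic:

```python
from datetime import datetime, timedelta

# Two days back rather than one, so the requested trading day has
# fully closed by the time the DAG runs (the DAG's variable is named
# "yesterday", but it actually points two days back).
target_day = datetime.today() - timedelta(days=2)
formatted = target_day.strftime('%Y-%m-%d')   # e.g. '2026-04-22'
```

The formatted string is what get_daily_open_close_agg expects as its date argument.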

Step 3: Transforming and Loading the Data

The second task takes the raw data extracted in the previous step, transforms it, and loads it into a PostgreSQL database hosted on Aiven.

#Transform and load Tickers
    @task()
    def transform_load_tickers(raw_tickers):

        #Transform data_list to a DataFrame
        data_df = pd.DataFrame(raw_tickers)

        data_df["volume"] = data_df["volume"].round(2)

        #Connect to Postgres
        try:
            conn = psycopg2.connect(
                    host = DB_HOST,
                    dbname = DB_NAME,
                    user = DB_USER,
                    password = DB_PASSWORD,
                    port = DB_PORT
                    )
            cur = conn.cursor()
            print(f"Successfully connected to database: {DB_NAME} with user: {DB_USER}")

        except psycopg2.Error as e:
            print(f"Error in connection: {e}")
            raise  #fail the task fast instead of continuing without a connection

        #Create table on Postgres

        create_table = sql.SQL("""
        CREATE TABLE IF NOT EXISTS {table} (                       
            ticker VARCHAR(10), 
            open NUMERIC(10,2), 
            high NUMERIC(10,2), 
            low NUMERIC(10,2), 
            close NUMERIC(10,2),
            volume NUMERIC(10,2),
            ticker_date DATE, 
            load_date TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP); 
        """).format(table = sql.Identifier(TARGET_TABLE))

        try:
            cur.execute(create_table)
            conn.commit()
            print(f"Table {TARGET_TABLE} created or already exists")

        except psycopg2.Error as e:
            print(f"Error creating table: {e}")


        #Load data to Postgres

        columns_to_load = ['ticker', 'open', 'high', 'low', 'close', 'volume','ticker_date']
        data_df = data_df[columns_to_load]

        values_to_load = [tuple(row) for row in data_df.values]

        insert_query = sql.SQL("""
                    INSERT INTO {table} ({columns})
                    VALUES %s
        """).format(table = sql.Identifier(TARGET_TABLE),
                    columns = sql.SQL(', ').join(map(sql.Identifier,columns_to_load)))

        try:
            extras.execute_values(cur, insert_query, values_to_load)
            conn.commit()
            print(f"Successfully inserted {len(values_to_load)} rows in table: {TARGET_TABLE}")

        except psycopg2.Error as e:
            conn.rollback()
            print(f"Error inserting rows: {e}")

        finally:
            cur.close()
            conn.close()
    #Define Task Dependencies
    extracted = extract_tickers()

    loaded = transform_load_tickers(extracted)
dag = nganga_massive_tickers()

Here, we convert the list of dictionaries into a Pandas DataFrame. We then connect to the PostgreSQL database using psycopg2.
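The list-of-dicts-to-DataFrame conversion is worth seeing in isolation: each dictionary becomes one row, and the keys become the column names. A small sketch with made-up ticker values:

```python
import pandas as pd

# Hypothetical records shaped like the extract task's output;
# the numbers are illustrative only.
records = [
    {"ticker": "AAPL", "open": 262.2, "high": 265.0, "low": 261.1,
     "close": 264.4, "volume": 41_000_000.0, "ticker_date": "2025-11-20"},
    {"ticker": "TSLA", "open": 410.0, "high": 415.5, "low": 405.2,
     "close": 412.3, "volume": 55_000_000.0, "ticker_date": "2025-11-20"},
]

# Each dict becomes one row; dict keys become column names.
df = pd.DataFrame(records)
```

Because every record uses the same keys, the column order is stable, which is what lets us select columns_to_load by name later.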

We execute a CREATE TABLE IF NOT EXISTS statement to ensure our destination table is ready. Finally, we use psycopg2.extras.execute_values to insert the DataFrame rows into the database.
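The reason execute_values is preferred over a row-by-row loop is that it expands the single VALUES %s placeholder into one multi-row statement, so the whole batch goes to the server in a single round trip. A rough pure-Python illustration of that expansion (the real driver also handles quoting and type adaptation, which this sketch does not):

```python
# Illustration only, not psycopg2 internals: execute_values takes a query
# containing a single "VALUES %s" and expands it into one multi-row INSERT.
rows = [
    ("AAPL", 262.2, 265.0, 261.1, 264.4, 41_000_000.0, "2025-11-20"),
    ("TSLA", 410.0, 415.5, 405.2, 412.3, 55_000_000.0, "2025-11-20"),
]

# Build one "(%s, %s, ...)" group per row, then join them.
placeholders = ", ".join(
    "(" + ", ".join(["%s"] * len(row)) + ")" for row in rows
)
expanded = (
    "INSERT INTO my_table (ticker, open, high, low, close, volume, ticker_date) "
    f"VALUES {placeholders}"
)
```

Sending one statement for the whole batch is considerably faster than executing one INSERT per ticker, which matters as the symbol list grows.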

Step 4: Defining the DAG and Task Dependencies

The final step, as shown above, ties everything together into a DAG and defines the execution order of our tasks.

    #Define Task Dependencies
    extracted = extract_tickers()

    loaded = transform_load_tickers(extracted)
dag = nganga_massive_tickers()

Conclusion

Congratulations! You have just built a modern data pipeline using Apache Airflow 3.1.0. By leveraging the new airflow.sdk, your DAG is future-proofed against core architectural changes. You also learned how to interact with the Massive API to pull financial data and load it into a relational database.
