Automating Cryptocurrency Data with Python, Apache Airflow and PostgreSQL

Introduction
As part of mastering Apache Airflow, here is a write-up of a project where I extracted BTC price data from Polygon.io. In this article, we'll walk through how to use Python to automatically retrieve cryptocurrency price data from an API, process it, and store it in a PostgreSQL database for later analysis.

Problem definition
Often, we need to access accurate and up-to-date cryptocurrency prices for specific dates, especially for backtesting strategies, conducting financial analysis, or building data-driven applications. Manually collecting this data from different platforms can be time-consuming and error-prone.

Solution
Automating this process allows us to consistently pull fresh data and integrate it into our systems for real-time or historical analysis.
I used Python to query the Polygon.io API and fetch historical cryptocurrency data for a specific date, transformed the data, and stored it in a PostgreSQL database. The process is automated to run regularly, with environment variables and a secure configuration keeping the credentials out of the code.
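The secure configuration comes down to keeping the database credentials in a .env file that is loaded at runtime and never committed to version control. Here is a hedged example with placeholder values; the keys match the os.getenv calls used later in the load step.

dbname=crypto
user=postgres
password=your_password
host=localhost
port=5432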

Step by step

1. Importing Necessary Libraries

import requests
import json
from dotenv import load_dotenv
import os
from sqlalchemy import create_engine
import pandas as pd
from datetime import datetime

2. Extraction

url = 'https://api.polygon.io/v1/open-close/crypto/BTC/USD/2025-03-31?adjusted=true&apiKey=YOUR_API_KEY'  # replace YOUR_API_KEY with your Polygon.io key

response = requests.get(url)
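To avoid hardcoding the API key in the URL, the same request can be built with the key read from the environment. This is a small illustrative sketch; the POLYGON_API_KEY variable and target_date are my own naming, not part of the original script.

import os
import requests
from dotenv import load_dotenv

load_dotenv()

target_date = '2025-03-31'
url = f'https://api.polygon.io/v1/open-close/crypto/BTC/USD/{target_date}'
# requests encodes the query string, so the key never has to live in the source file
response = requests.get(url, params={'adjusted': 'true', 'apiKey': os.getenv('POLYGON_API_KEY')})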

3. Parsing the JSON Data

if response.status_code == 200:
    data = response.json()
    open_price = data.get('open')
    close_price = data.get('close')
    date = data.get('day')
    symbol = data.get('symbol')
else:
    raise Exception(f"Request failed with status code {response.status_code}")

4. Preparing the Data for Insertion into PostgreSQL

data_df = {
  'symbol': symbol,
  'open_price': open_price,
  'close_price': close_price,
  'date': date
}

df = pd.DataFrame(data_df, index=[0])
df['date'] = pd.to_datetime(df['date']).dt.strftime('%Y-%m-%d')

5. Storing the Data in PostgreSQL


load_dotenv()

dbname = os.getenv('dbname')
user = os.getenv('user')
password = os.getenv('password')
host = os.getenv('host')
port = os.getenv('port')

engine = create_engine(f'postgresql://{user}:{password}@{host}:{port}/{dbname}')


try:
    df.to_sql("crypto_prices", con=engine, if_exists="append", index=False, schema="dataengineering")
    print(f"Successfully loaded crypto data for {df[date]}")
except Exception as e:
    print(f"Error: {e}")

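One detail worth noting: to_sql can create the crypto_prices table on the fly, but the target schema has to exist first. Here is a minimal sketch of ensuring that, reusing the engine from above; the CREATE SCHEMA statement is my addition and not part of the original script.

from sqlalchemy import text

# create the target schema once if it is not already there
with engine.begin() as conn:
    conn.execute(text('CREATE SCHEMA IF NOT EXISTS dataengineering'))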

Creating a DAG
I created a DAG for orchestration and automation and scheduled it to run daily. Here is the step-by-step guide on how I did it.

1. Importing Required Libraries

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

2. Defining Default Arguments

default_args = {
    "owner": "milcah",
    "depends_on_past": False,
    "start_date": datetime(2025, 3, 31),
    "email": ["milcahredempter03@gmail.com"],
    "email_on_failure": False,
    "email_on_retry": True,
    "retries": 2,
    "retry_delay": timedelta(minutes=2)
}

3. Creating the DAG

with DAG(
    'btc_data',
    default_args=default_args,
    schedule_interval='@daily',
) as dag:

4. Defining the Tasks

Task 1: Activate Virtual Environment

# each BashOperator runs in its own shell, so this activation does not
# persist into the next task on its own
activate_venv = BashOperator(
    task_id='activate_virtual_env',
    bash_command='source /home/user/project/cbk_daily/venv/bin/activate',
)

Task 2: Execute Python Script

execute_file = BashOperator(
    task_id='execute_python_file',
    # calling the venv's python interpreter directly is a more robust alternative
    bash_command='python /home/user/project/crypto_price/app_data.py',
)

5. Defining Task Order

activate_venv >> execute_file
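Putting the snippets together, the full DAG file looks roughly like this. The catchup=False flag is my own addition (an assumption about the desired behaviour) so Airflow does not backfill a run for every day between the start_date and today, and default_args is trimmed to the essentials.

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    "owner": "milcah",
    "depends_on_past": False,
    "start_date": datetime(2025, 3, 31),
    "retries": 2,
    "retry_delay": timedelta(minutes=2),
}

with DAG(
    'btc_data',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,  # assumption: skip backfill runs between start_date and today
) as dag:

    # each BashOperator runs in its own shell
    activate_venv = BashOperator(
        task_id='activate_virtual_env',
        bash_command='source /home/user/project/cbk_daily/venv/bin/activate',
    )

    execute_file = BashOperator(
        task_id='execute_python_file',
        bash_command='python /home/user/project/crypto_price/app_data.py',
    )

    activate_venv >> execute_file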

After creating the DAG, I accessed the Airflow UI, triggered it, and then opened DBeaver, where I queried the table.
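The same check can also be done from Python with pandas, reusing the SQLAlchemy engine from the load step; the query below is purely illustrative.

verify_df = pd.read_sql(
    'SELECT symbol, open_price, close_price, date '
    'FROM dataengineering.crypto_prices '
    'ORDER BY date DESC LIMIT 5',
    con=engine,
)
print(verify_df)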


Conclusion
The project adopts Apache Airflow best practices, such as keeping the DAGs simple, hiding sensitive data, and defining sensible default arguments. Most of the installation was also done in a virtual environment to avoid conflicting dependencies.
