DEV Community

CincyBC

Scraper Function to Airflow DAG

We have a self-created Python pipeline, and we can set up a cron job to run the script daily or whenever we want. Back when I started engineering pipelines at work, I was using Django/Celery/Redis, but at home I was using cron. I had some, but limited, visibility into the jobs (mostly through logs) and decided I needed a better tool; that's where Airflow came in.

Each workflow is captured in a Directed Acyclic Graph (DAG) of tasks that run in the order you lay out, with no loops back to an earlier task. Some people have called Airflow "cron on steroids," but it's really much more than that, as you'll see in a moment. Still, at its core, Airflow is a tool for running workflows on a cron-based schedule (there are aliases like @daily for every day at midnight, i.e. 0 0 * * *, and @hourly for 0 * * * *).
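To make the aliases concrete, here is a small plain-Python sketch (no Airflow needed) that maps the common cron presets to their expressions and splits an expression into its five fields. The helper names `to_cron` and `cron_fields` are made up for this illustration and are not part of Airflow.

```python
# Cron presets and the expressions they stand for
CRON_PRESETS = {
    "@hourly": "0 * * * *",
    "@daily": "0 0 * * *",
    "@weekly": "0 0 * * 0",
    "@monthly": "0 0 1 * *",
    "@yearly": "0 0 1 1 *",
}

# Order of the five fields in a standard cron expression
FIELD_NAMES = ("minute", "hour", "day_of_month", "month", "day_of_week")


def to_cron(schedule):
    """Return the cron expression for a preset, or the input unchanged."""
    return CRON_PRESETS.get(schedule, schedule)


def cron_fields(schedule):
    """Split a preset or cron expression into its five named fields."""
    parts = to_cron(schedule).split()
    if len(parts) != len(FIELD_NAMES):
        raise ValueError(f"expected 5 cron fields, got {len(parts)}")
    return dict(zip(FIELD_NAMES, parts))


print(to_cron("@daily"))
print(cron_fields("15 8 * * *"))
```

Reading the fields by name makes it easier to check that an expression like 15 8 * * * really means minute 15 of hour 8.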

The classic, verbose way to set up a DAG is like this:

# From the Airflow tutorial
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.python import PythonOperator
# pendulum provides the timezone-aware start_date used below
import pendulum

with DAG(
    "tutorial_dag",
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={"retries": 2},
    description="DAG tutorial",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:
    pass  # tasks would be defined here

This will create a new workflow with the ID "tutorial_dag" in the UI. The tooltip and the description at the top of the DAG view will say "DAG tutorial." If a task fails, it'll be retried up to 2 times before showing up as failed (something cron doesn't do!). The schedule (named schedule_interval before Airflow 2.4) is set to None, so this workflow won't run on any schedule and can only be triggered manually. You could change that parameter to @hourly if you wanted it to run every hour, or to 15 8 * * * if you wanted it to run every day at 8:15 UTC. The start_date is important because your workflow won't run for any date before it (set to 2021-01-01 here), and if you set catchup=True, Airflow will create a run for every scheduled interval between your start_date and now that hasn't run yet; so be careful! tags are something you can filter on in the UI to find your DAGs quicker.
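To get a feel for why catchup=True deserves care, here is a rough back-of-the-envelope sketch in plain Python. It assumes a simplified model in which one run is created per fully elapsed, fixed-length interval since start_date; Airflow's real scheduling logic is more involved. The function name `count_catchup_runs` and the "today" date are made up for the example.

```python
from datetime import datetime, timedelta, timezone


def count_catchup_runs(start_date, now, interval):
    """Count the complete schedule intervals between start_date and now."""
    if now <= start_date:
        return 0
    # timedelta // timedelta gives the whole number of elapsed intervals
    return (now - start_date) // interval


start = datetime(2021, 1, 1, tzinfo=timezone.utc)
now = datetime(2021, 1, 11, tzinfo=timezone.utc)  # hypothetical "today"

daily_runs = count_catchup_runs(start, now, timedelta(days=1))
hourly_runs = count_catchup_runs(start, now, timedelta(hours=1))
print(daily_runs, hourly_runs)
```

Even over ten days, an hourly schedule backfills hundreds of runs under this model, so a start_date years in the past combined with catchup=True can queue up a lot of work.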

That's it! Of course, there are lots more knobs you can turn, but when you're just getting started you really just need dag_id, start_date, and schedule (schedule_interval on older Airflow versions).

How do you pass in a task? That's also super easy. Using the simple scraping script I wrote first, you would just do the following:

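from airflow import DAG
from airflow.decorators import task
from bs4 import BeautifulSoup
import pendulum

# Assumption: `args` is not defined in the original snippet; this value mirrors
# the default_args of the tutorial DAG above
args = {"retries": 2}
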
with DAG(
    dag_id='Functional_Sprott_Scraper',
    schedule_interval='5 20 * * 1-6',
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    default_args=args,
    render_template_as_native_obj=True,
    tags=['price', 'scraper']
) as dag:

    def web_call(url):  # Extract
        import requests
        r = requests.get(url)
        # Raise on a 4xx/5xx response so the task fails (and Airflow can retry it)
        # instead of passing a status code on to the parser
        r.raise_for_status()
        return BeautifulSoup(r.content, "html.parser")

    def get_fund_values(soup, index, class_name):  # Transform
        fund_values = soup.find_all('div', class_=class_name)
        value = fund_values[index].contents
        return str(value[0]).strip().replace('$US', '').replace(',', '')

    def write_json(data, filename='data.json'):  # Load
        import json
        with open(filename, 'w') as f:
            json.dump(data, f, indent=4)

    @task()
    def execute_scraper():
        soup = web_call(
            url='https://sprott.com/investment-strategies/physical-commodity-funds/uranium/')
        data = {}
        data['shareprice'] = get_fund_values(soup, 4, 'fundHeader_value')
        data['u3o8_stock'] = get_fund_values(soup, 6, 'fundHeader_value')
        write_json(data)

    # Calling the decorated function is what registers the task with the DAG
    execute_scraper()

Here is our DAG!

Functional_Sprott_Scraper DAG in Airflow
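If you want to check the Transform and Load steps without hitting the website or starting Airflow, a sketch like the following works with the standard library alone. The raw strings are invented sample values, not real fund data, and `clean_value` is a hypothetical helper that repeats only the string cleanup done inside get_fund_values.

```python
import json
import os
import tempfile


def clean_value(raw):
    """Strip whitespace, the '$US' prefix, and thousands separators."""
    return str(raw).strip().replace('$US', '').replace(',', '')


def write_json(data, filename='data.json'):
    """Write the scraped values to a JSON file."""
    with open(filename, 'w') as f:
        json.dump(data, f, indent=4)


# Invented sample inputs standing in for the scraped div contents
raw_values = {"shareprice": " $US1,234.56 ", "u3o8_stock": "61,000,000"}
data = {key: clean_value(value) for key, value in raw_values.items()}

# Write to a temporary directory so the example leaves no file behind in the project
path = os.path.join(tempfile.mkdtemp(), "data.json")
write_json(data, path)

with open(path) as f:
    loaded = json.load(f)
print(loaded)
```

Note that the cleaned values stay strings, just as in the scraper; converting them to numbers would be a separate step.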

The @task decorator is short for this:

    def execute_scraper():
        soup = web_call(
            url='https://sprott.com/investment-strategies/physical-commodity-funds/uranium/')
        data = {}
        data['shareprice'] = get_fund_values(soup, 4, 'fundHeader_value')
        data['u3o8_stock'] = get_fund_values(soup, 6, 'fundHeader_value')
        write_json(data)

    scrape_task = PythonOperator(task_id='scrape_task', python_callable=execute_scraper)

Most people nowadays just write Python functions to execute in Airflow, so the @task decorator was added as a shorthand for calling the most basic operator, the PythonOperator.
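If decorators are new to you, the toy model below shows the general idea of how a decorator can turn a plain function into an operator-like object. It is only an illustration in plain Python and not how Airflow implements @task; `ToyOperator` and `toy_task` are names invented for this sketch.

```python
class ToyOperator:
    """Stand-in for an operator: it holds a task_id and a callable to run."""

    def __init__(self, task_id, python_callable):
        self.task_id = task_id
        self.python_callable = python_callable

    def execute(self):
        return self.python_callable()


def toy_task(func):
    """Decorator: calling the decorated function builds a ToyOperator."""
    def make_operator():
        # Use the function's own name as the task_id
        return ToyOperator(task_id=func.__name__, python_callable=func)
    return make_operator


@toy_task
def say_hello():
    return "hello"


op = say_hello()  # builds the operator; the function body has not run yet
print(op.task_id, op.execute())
```

The point to take away is that calling the decorated function creates the task rather than running the function body right away, which is why the decorated function has to be called inside the DAG.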

I'm going to detour and talk about Operators next time since they define your tasks. There are also things called hooks and sensors I'll briefly go into.

As always, the code is on GitHub here.
