DEV Community

Cover image for Standing on the shoulders of giants. Part one: Airflow
biellls
biellls

Posted on

Standing on the shoulders of giants. Part one: Airflow

Airflow advanced the state of the art in ETL tools by providing an extremely flexible and reliable framework. It is easy to monitor your jobs and you can extend it with plugins to do anything that python can do. It also helped introduce the concept of functional batch data pipelines. By removing state from pipelines and enforcing strict boundaries on partitions of time you can more easily reprocess a partition of data without affecting the rest of it. If you're unfamiliar with that concept, this article by Airflow's creator Maxime Beauchemin is worth a read.

With Typhoon we aim to build on this concept to provide a framework with a stronger focus on software engineering principles. We will illustrate it in the following sections

Built for developers

Typhoon was built from the ground up to provide a great experience for developers. Besides providing great Intellisense it helps you implement software best practices.

Testable

Airflow is notoriously hard to test. Operators force coupling between logic, execution context and framework. Let's look at a really simple example:

class ExchangeRates(BaseOperator):
    def __init__(self, base: str, symbols: Optional[List[str]] = None):
        self.base = base
        self.symbols = symbols
        self.http_conn_id = http_conn_id

    def execute(context):
        params = {
            'start_at': context['execution_date'],
            'end_at': context['next_execution_date'],
        }
        full_endpoint = f'{ENDPOINT}/history'
        print(f'Calling endpoint {full_endpoint} for dates between {start_at}, {end_at}')
        if base:
            params['base'] = base
        if symbols:
            params['symbols'] = symbols
        hook = HttpsHook('get', http_conn_id=self.http_conn_id)
        response = hook.run(full_endpoint, params=params)
        context['task_instance'].xcom_push('response', response.json())
Enter fullscreen mode Exit fullscreen mode

All your logic is in the execute function of your opertor, so in order to run a test you need to import airflow and create an instance of the operator. Not only that, but we need to provide a context similar to the one that airflow would provide. Finally, you would need to mock xcom and see that it's called with the value you expect it is. This is only a simple example but it can get much more complex once there is a source and a destination in the same component, magic macro rendering and more. Just in case this doesn't sound complex enough, notice we create a hook from its connection id. Yeah, you'll need to mock the airflow database too or spin up a temporary one. Good luck with that.

In contrast, the logic for typhoon tasks lives inside regular python functions. They don't make use of the framework unless they use a hook and even then it can be instantiated without a metadata database.

 def get_exchange_rates(
        hook: HTTPHook
        start_at:datetime,
        end_at: datetime,
        base: Optional[str] = None,
        symbols: Optional[List[str]] = None,
) -> dict:
    params = {
        'start_at': start_at,
        'end_at': end_at,
    }
    full_endpoint = f'{ENDPOINT}/history'
    print(f'Calling endpoint {full_endpoint} for dates between {start_at}, {end_at}')
    if base:
        params['base'] = base
    if symbols:
        params['symbols'] = symbols
    response = requests.get(full_endpoint, params=params)
    return response.json()
Enter fullscreen mode Exit fullscreen mode

Testing this is as easy as:

def test_xr_get_history():
    symbols = ['EUR', 'PHP', 'HKD']
    start_at = date(2020, 1, 2)
    end_at = date(2020, 1, 3)
    hook = HTTPSHook(ConnParams(conn_type='https_hook', extra={'method': 'get'}))
    response = exchange_rates_api.get_history(
        hook=hook,
        start_at=start_at,
        end_at=end_at,
        base='USD',
        symbols=symbols,
    )
    print(response)
    assert set(response.keys()) == {'rates', 'start_at', 'end_at', 'base'}
    assert set(response['rates'].keys()) == {start_at.isoformat(), end_at.isoformat()}
    for k, v in response['rates'].items():
        assert set(v.keys()) == set(symbols)
Enter fullscreen mode Exit fullscreen mode

We don't need to import the framework, mock anything or have a database running. We just give it some input and test the output. This takes the functional aspect in functional data pipelines even further.

Composable

Composability is one of the principles of good software engineering because it enables you to reuse existing functions or objects in order to achieve new behaviour. Airflow gets in the way of that by coupling context, as we explained in the previous section, but also by encouraging task isolation. Tasks can't pass data between them, only some metadata through XCom and even that is discouraged. That means that you can't have an FTPExtractOperator and an S3LoadOperator, you need an FTPToS3Operator and every other possible combination of sources and destinations. This does not compose well as you end up with a lot of repeated code across different operators just because you can't easily reuse the logic.

In typhoon tasks can pass any data between them without any performance penalty. You can have a function that extracts data from a source and another one that loads into a destination. You can reuse those functions in any other DAG that uses that source or destination.

name: example
schedule_interval: rate(1 day)

tasks: 
    extract_files:
        component: typhoon.get_data_from_files
        args:
            hook: !Hook my_ftp
            pattern: /base/path/*.csv

    load_files:
        input: extract_files
        function: typhoon.filesystem.write_data
        args:
            hook: !Hook my_s3
            data: !Py $BATCH.data
            path: !MultiStep
                - !Py typhoon.files.name($BATCH.path)
                - !Py f'/some/path/{$1}'
Enter fullscreen mode Exit fullscreen mode

Extensible

There are several ways in which the framework facilitates extension.

  • Just python. One of typhoon's goals is to be easily extensible with regular python code. You can create python functions and call them in your DAGs.
  • Interfaces. Hooks are grouped into interfaces in a lot of cases where it makes sense to make them interchangeable. This means you can easily switch a hook that writes to files in your OS for local development into an S3 hook for the integration tests and production. More importantly, since a lot of functions take a hook of a specific interface, if you create a new hook that conforms to that interface it will automatically be compatible with all those functions.
  • Natively support additional connection types. When you create a new kind of hook and give it a conn_type, this will be used to discriminate the class when a hook instance is created from a connection defined in the metadata.

Quick feedback

Typhoon aims to provide a lightning fast feedback loop on all steps of the DAG creation process. From debug hooks that print whatever is passed to them, to interchangeable hooks so you can easily develop, test and deploy against different targets, to being able to run the whole DAG from the command line instead of needing to schedule it or run independent tasks.

Debugging

Typhoon is designed from the ground up to be easy to debug and it achieves this by compiling to regular python that can be executed locally and debugged from your favorite IDE.

Try it out!

If you're curious on what the future of data pipelines could look like check out Typhoon at https://github.com/typhoon-data-org/typhoon-orchestrator.

Top comments (0)