# Airflow Quick Start With docker-compose on AWS EC2

## Abstract

For a quick setup to start learning Apache Airflow, we will deploy Airflow using docker-compose, running on an AWS EC2 instance.

## Table Of Contents

- [Introduction](#Introduction)
- [Additional PIP requirements](#Additional-PIP-requirements)
- [How to build a customized airflow image](#How-to-build-customize-airflow-image)
- [Persistent airflow log, dags, and plugins](#Persistent-airflow-log,-dags,-and-plugins)
- [Using git-sync to keep DAGs up to date](#Using-git-sync-to-up-to-date-DAGs)
- [How to run](#How-to-run)
- [Add airflow connectors](#Add-airflow-connectors)
- [Understand airflow parameters in airflow.models](#Understand-airflow-parameters-in-airflow.models)
## πŸš€ Introduction <a name="Introduction"></a>

The docker-compose.yaml contains several service definitions:
- airflow-scheduler - The scheduler monitors all tasks and DAGs, then triggers the task instances once their dependencies are complete.
- airflow-webserver - The webserver, available at http://localhost:8080.
- airflow-worker - The worker that executes the tasks given by the scheduler.
- airflow-init - The initialization service.
- flower - The flower app for monitoring the environment. It is available at http://localhost:5555.
- postgres - The database.
- redis - The broker that forwards messages from the scheduler to the workers.

Some directories in the container are mounted, which means that their contents are synchronized between the services and persisted on the host (see the snippet after this list).

  • ./dags - you can put your DAG files here.
  • ./logs - contains logs from task execution and scheduler.
  • ./plugins - you can put your custom plugins here.
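For reference, in the stock docker-compose.yaml from the Airflow docs these mounts are declared under the common section roughly like this (paths relative to the directory holding the compose file):

```yaml
volumes:
  - ./dags:/opt/airflow/dags
  - ./logs:/opt/airflow/logs
  - ./plugins:/opt/airflow/plugins
```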

## πŸš€ Additional PIP requirements <a name="Additional-PIP-requirements"></a>

  • The airflow image already contains most of the PIP packages needed for operation, but we still need to install extra packages such as `clickhouse-driver`, `pandahouse`, and `apache-airflow-providers-slack`.

  • Starting from Airflow 2.1.1, the image supports the `_PIP_ADDITIONAL_REQUIREMENTS` environment variable to install additional requirements when the containers start:



```yaml
_PIP_ADDITIONAL_REQUIREMENTS: 'pandahouse==0.2.7 clickhouse-driver==0.2.1 apache-airflow-providers-slack'
```
  • It's not recommended to use this approach in production; instead, we should build our own image containing all the necessary pip packages and push it to AWS ECR.

## πŸš€ How to build a customized airflow image <a name="How-to-build-customize-airflow-image"></a>

- Build our own image from a `requirements.txt` and a `Dockerfile`.

- `requirements.txt`

```
pandahouse
clickhouse-driver
```

- `Dockerfile`

```Dockerfile
FROM apache/airflow:2.1.2-python3.9
COPY requirements.txt .
RUN pip install -r requirements.txt
```

- Build the image: `docker build -t my-airflow .`
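If the image is then pushed to AWS ECR as recommended above, the flow looks roughly like this; the account id `123456789012`, region `ap-southeast-1`, and repository name `my-airflow` are placeholders, and the repository must already exist (or be created with `aws ecr create-repository`):

```bash
# Log docker in to the ECR registry (placeholder account/region)
aws ecr get-login-password --region ap-southeast-1 | \
  docker login --username AWS --password-stdin 123456789012.dkr.ecr.ap-southeast-1.amazonaws.com

# Tag the locally built image and push it
docker tag my-airflow:latest 123456789012.dkr.ecr.ap-southeast-1.amazonaws.com/my-airflow:2.1.2
docker push 123456789012.dkr.ecr.ap-southeast-1.amazonaws.com/my-airflow:2.1.2
```

The `image:` key under `x-airflow-common` in docker-compose.yaml can then point at the pushed tag instead of `apache/airflow:2.1.2-python3.9`.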

## πŸš€ Persistent airflow log, dags, and plugins <a name="Persistent-airflow-log,-dags,-and-plugins"></a>
Not only are these folders persisted, but they are also shared between the scheduler, worker, and webserver:

```yaml
volumes:
  - /mnt/airflow/dags:/opt/airflow/dags
  - /mnt/airflow/logs:/opt/airflow/logs
  - /mnt/airflow/plugins:/opt/airflow/plugins
  - /mnt/airflow/data:/opt/airflow/data
```
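These host paths must exist and be writable by the airflow user inside the containers before the stack starts. A minimal preparation sketch, assuming the UID from the `.env` file created in the "How to run" section below:

```bash
# Create the host directories that back the container mounts
sudo mkdir -p /mnt/airflow/{dags,logs,plugins,data}

# Make them writable by the airflow user in the containers (UID from .env, GID 0)
sudo chown -R "$(id -u):0" /mnt/airflow
```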


## πŸš€ Using git-sync to keep DAGs up to date <a name="Using-git-sync-to-up-to-date-DAGs"></a>
- The git-sync service polls the registered repository (every `GIT_SYNC_WAIT` seconds, 60s in the config below) and clones new commits into `/dags`.
- We use the HTTPS method with an access token to give the container permission to pull the repository:

```yaml
af-gitsync:
  container_name: af-gitsync
  image: k8s.gcr.io/git-sync/git-sync:v3.2.2
  environment:
    - GIT_SYNC_REV=HEAD
    - GIT_SYNC_DEPTH=1
    - GIT_SYNC_USERNAME=airflow
    - GIT_SYNC_MAX_FAILURES=0
    - GIT_KNOWN_HOSTS=false
    - GIT_SYNC_DEST=repo
    - GIT_SYNC_REPO=https://cloudopz.co/devops/airflow-dags.git
    - GIT_SYNC_WAIT=60
    - GIT_SYNC_TIMEOUT=120
    - GIT_SYNC_ADD_USER=true
    - GIT_SYNC_PASSWORD=
    - GIT_SYNC_ROOT=/dags
    - GIT_SYNC_BRANCH=master
  volumes:
    - /mnt/airflow/dags:/dags
```
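`GIT_SYNC_PASSWORD` is left empty above; rather than hard-coding the access token in docker-compose.yaml, one option is to keep it in the `.env` file and let docker-compose substitute it. A minimal sketch, assuming a `GIT_SYNC_PASSWORD=<access-token>` line in `.env`:

```yaml
    environment:
      - GIT_SYNC_PASSWORD=${GIT_SYNC_PASSWORD}   # value substituted from .env at compose time
```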


## πŸš€ How to run <a name="How-to-run"></a>
1. **Initializing Environment**
```bash
mkdir -p ./dags ./logs ./plugins
echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env
```


2. **Prepare `docker-compose.yaml`**
<details>
  <summary>docker-compose.yaml</summary>


```yaml
version: '3.5'
x-airflow-common:
  &airflow-common
  image: apache/airflow:2.1.2-python3.9
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@af-pg/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@af-pg/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@af-redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    AIRFLOW_CONN_RDB_CONN: 'postgresql://dbapplication_user:dbapplication_user@rdb:5432/postgres'
    _PIP_ADDITIONAL_REQUIREMENTS: 'pandahouse==0.2.7 clickhouse-driver==0.2.1 apache-airflow-providers-slack'
  volumes:
    - /mnt/airflow/dags:/opt/airflow/dags
    - /mnt/airflow/logs:/opt/airflow/logs
    - /mnt/airflow/plugins:/opt/airflow/plugins
    - /mnt/airflow/data:/opt/airflow/data
  user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
  depends_on:
    - af-redis
    - af-pg

services:
  af-pg:
    image: postgres:13
    container_name: af-pg
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5
    restart: always

  af-redis:
    container_name: af-redis
    image: redis:latest
    ports:
      - 6379:6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always

  af-websrv:
    container_name: af-websrv
    <<: *airflow-common
    command: webserver
    ports:
      - 28080:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

  af-sch:
    container_name: af-sch
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

  af-w:
    container_name: af-w
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

  af-int:
    container_name: af-int
    <<: *airflow-common
    command: version
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'

  af-flower:
    container_name: af-flower
    <<: *airflow-common
    command: celery flower
    ports:
      - 5555:5555
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

  af-gitsync:
    container_name: af-gitsync
    image: k8s.gcr.io/git-sync/git-sync:v3.2.2
    environment:
      - GIT_SYNC_REV=HEAD
      - GIT_SYNC_DEPTH=1
      - GIT_SYNC_USERNAME=airflow
      - GIT_SYNC_MAX_FAILURES=0
      - GIT_KNOWN_HOSTS=false
      - GIT_SYNC_DEST=repo
      - GIT_SYNC_REPO=https://cloudopz.co/devops/airflow-dags.git
      - GIT_SYNC_WAIT=60
      - GIT_SYNC_TIMEOUT=120
      - GIT_SYNC_ADD_USER=true
      - GIT_SYNC_PASSWORD=
      - GIT_SYNC_ROOT=/dags
      - GIT_SYNC_BRANCH=master
    volumes:
      - /mnt/airflow/dags:/dags

volumes:
  postgres-db-volume:
```


</details>

3. **Run the init service to set up the airflow database** (the service is named `af-int` in the compose file above)

```bash
docker-compose up af-int
```


- **Bring airflow up**

```bash
docker-compose up -d
```


- Read more about [Extension fields](https://docs.docker.com/compose/compose-file/compose-file-v3/#extension-fields) to understand the docker-compose.yaml contents; a short illustration follows.
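In short, `x-airflow-common` is an extension field: the `&airflow-common` anchor captures the shared configuration and `<<: *airflow-common` merges it into each airflow service. A minimal sketch of the mechanism (the service and key names here are illustrative, not part of the stack above):

```yaml
x-common: &common                  # extension field captured by an anchor
  image: apache/airflow:2.1.2-python3.9
  environment: &common-env
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'

services:
  demo-webserver:
    <<: *common                    # merge every key from the anchor
    command: webserver
  demo-init:
    <<: *common
    environment:
      <<: *common-env              # merge the shared env, then extend it
      _AIRFLOW_DB_UPGRADE: 'true'
```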

## πŸš€ Add airflow connectors <a name="Add-airflow-connectors"></a>
- Add a slack connection:

```bash
airflow connections add 'airflow-slack' \
    --conn-type 'http' \
    --conn-password '/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX' \
    --conn-host 'https://hooks.slack.com/services'
```
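With `apache-airflow-providers-slack` installed (see the PIP requirements above), this connection can be used from a task. A minimal sketch for the provider versions current at the time of writing; the DAG id and message are illustrative:

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator

with DAG(
    dag_id="slack_notify_demo",          # illustrative DAG id
    start_date=datetime(2021, 7, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    notify = SlackWebhookOperator(
        task_id="notify_slack",
        http_conn_id="airflow-slack",    # the connection added above
        message="Airflow on EC2 is up and running",
    )
```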


- We can also add the connection in the UI:
<p align="left">
  <a href="https://dev.to/vumdao">
    <img alt="Airflow Quick Start With docker-compose on AWS EC2" src="https://github.com/vumdao/airflow-docker/blob/master/slack-connection.png?raw=true" width="800" />
  </a>
</p>

- We can also add connections through environment variables, e.g. in the `.env` file used by docker-compose; a usage sketch follows the snippet below:
```yaml
AIRFLOW_CONN_MY_POSTGRES_CONN: 'postgresql://airflow:airflow@postgres-pg:5432/airflow'
AIRFLOW_CONN_AIRFLOW_SLACK_CONN: 'https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX'
```
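Airflow maps an `AIRFLOW_CONN_<CONN_ID>` variable to a connection whose id is the `<CONN_ID>` part in lower case, so the first entry above is reachable as `my_postgres_conn`. A minimal usage sketch, assuming `apache-airflow-providers-postgres` (bundled with the official image); the query is illustrative:

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook

# 'my_postgres_conn' comes from AIRFLOW_CONN_MY_POSTGRES_CONN defined above
hook = PostgresHook(postgres_conn_id="my_postgres_conn")

# Illustrative query against the database the connection URI points at
rows = hook.get_records("SELECT 1")
print(rows)
```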


## πŸš€ Understand airflow parameters in airflow.models <a name="Understand-airflow-parameters-in-airflow.models"></a>
- [airflow.models](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/index.html)

- `Context`
    - `on_failure_callback` (TaskStateChangeCallback) – a function to be called when a task instance of this task fails. A context dictionary is passed as a single parameter to this function. The context contains references to objects related to the task instance and is documented under the macros section of the API.
    - `on_execute_callback` (TaskStateChangeCallback) – much like the on_failure_callback except that it is executed right before the task is executed.
    - `on_retry_callback` (TaskStateChangeCallback) – much like the on_failure_callback except that it is executed when retries occur.
    - `on_success_callback` (TaskStateChangeCallback) – much like the on_failure_callback except that it is executed when the task succeeds.
    - `trigger_rule` (str) – defines the rule by which dependencies are applied for the task to get triggered. Options are: { all_success | all_failed | all_done | one_success | one_failed | none_failed | none_failed_or_skipped | none_skipped | dummy} default is all_success. Options can be set as string or using the constants defined in the static class airflow.utils.TriggerRule
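To make the callbacks and `trigger_rule` concrete, here is a minimal sketch; the DAG id, task names, and the deliberately failing task are illustrative only:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule


def notify_failure(context):
    # The context dictionary gives access to the failing task instance
    ti = context["task_instance"]
    print(f"Task {ti.task_id} in DAG {ti.dag_id} failed")


def do_work():
    raise ValueError("boom")  # force a failure so the callback fires


def cleanup():
    print("cleaning up regardless of upstream state")


with DAG(
    dag_id="callback_trigger_rule_demo",   # illustrative DAG id
    start_date=datetime(2021, 7, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    work = PythonOperator(
        task_id="work",
        python_callable=do_work,
        on_failure_callback=notify_failure,  # receives the context dict on failure
    )

    # Runs even though 'work' fails, thanks to the all_done trigger rule
    clean = PythonOperator(
        task_id="cleanup",
        python_callable=cleanup,
        trigger_rule=TriggerRule.ALL_DONE,
    )

    work >> clean
```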

---






Top comments (2)

**Muhammad Hanif Mohamad Musa:** Is there any AWS service that is the same as Cloud Composer?

**Rohan Salunke:** Haven't used it, but I believe the following should work:
aws.amazon.com/managed-workflows-f...