HyperRedStart

A First Look at Apache Airflow

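Airflow is an Apache top-level project originally contributed by Airbnb. It orchestrates workflows defined as directed acyclic graphs (DAGs), runs them on a schedule, and can be deployed on cloud platforms such as GCP, AWS, and Azure. Schedules can be managed through a web UI. Its selling points are flexibility, high extensibility, and the Jinja template engine for designing workflows elegantly!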
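
To make the "directed acyclic graph" idea concrete, here is a minimal sketch in plain Python (no Airflow required). It uses the standard library's `graphlib` (Python 3.9+) to order a hypothetical four-task workflow so that each task runs only after the tasks it depends on. The task names are made up for illustration, and this is not how Airflow's scheduler is implemented.

```python
from graphlib import CycleError, TopologicalSorter  # standard library, Python 3.9+


def execution_order(dependencies):
    """Return one valid run order for a {task: {upstream tasks}} mapping."""
    return list(TopologicalSorter(dependencies).static_order())


def has_cycle(dependencies):
    """Return True if the dependencies loop back on themselves (not a DAG)."""
    try:
        execution_order(dependencies)
    except CycleError:
        return True
    return False


# Hypothetical workflow: each task maps to the set of tasks it waits for.
workflow = {
    "extract": set(),
    "transform": {"extract"},
    "validate": {"extract"},
    "load": {"transform", "validate"},
}

order = execution_order(workflow)
print(order)  # "extract" comes first and "load" comes last
```

A workflow with a loop, such as `{"a": {"b"}, "b": {"a"}}`, has no valid order, which is why workflows of this kind must be acyclic.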

1. Set up the Airflow environment with Docker

Download the official docker-compose file:

curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.2.4/docker-compose.yaml'

Create the required directories and set the Airflow user ID:

mkdir -p ./dags ./logs ./plugins
echo -e "AIRFLOW_UID=$(id -u)" > .env
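
If you prefer to script this step, the same preparation can be sketched in Python. This is a rough equivalent of the two shell commands above, assuming a Unix-like host where `os.getuid()` is available. The function name `prepare_airflow_dirs` is hypothetical.

```python
import os
from pathlib import Path


def prepare_airflow_dirs(base_dir="."):
    """Create dags/, logs/ and plugins/ and write AIRFLOW_UID to .env."""
    base = Path(base_dir)
    for name in ("dags", "logs", "plugins"):
        # exist_ok mirrors `mkdir -p`: no error if the directory already exists
        (base / name).mkdir(parents=True, exist_ok=True)
    env_file = base / ".env"
    # Same content as: echo -e "AIRFLOW_UID=$(id -u)" > .env
    env_file.write_text(f"AIRFLOW_UID={os.getuid()}\n")
    return env_file
```

Calling `prepare_airflow_dirs(".")` from the folder that holds `docker-compose.yaml` leaves the same layout as the shell version.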

2. Initialize the database

docker-compose up airflow-init

3. Start the Airflow services

docker-compose up -d

Confirm that all of our containers are healthy, for example with `docker ps`!
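
The health check can also be scripted. The sketch below reads `docker ps` output in `name<TAB>status` form and lists containers whose status text does not contain `(healthy)`. The helper names are hypothetical, and a container that defines no health check at all would be listed too, so treat the result as a hint rather than a verdict.

```python
import subprocess


def parse_status(output):
    """Turn 'name<TAB>status' lines into a {name: status} dictionary."""
    status = {}
    for line in output.splitlines():
        if not line.strip():
            continue
        name, _, state = line.partition("\t")
        status[name] = state
    return status


def not_healthy(status):
    """Return the names whose status text does not report '(healthy)'."""
    return sorted(name for name, state in status.items() if "(healthy)" not in state)


def running_containers():
    """Ask the Docker CLI for running containers (requires docker on PATH)."""
    result = subprocess.run(
        ["docker", "ps", "--format", "{{.Names}}\t{{.Status}}"],
        capture_output=True,
        text=True,
        check=True,
    )
    return parse_status(result.stdout)
```

With the services running, `not_healthy(running_containers())` should return an empty list once every container with a health check has finished starting.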

4. Download the airflow command-line helper script

curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.1.1/airflow.sh'
chmod +x airflow.sh

5. Write a demo DAG

Place our first DAG file in the dags directory that is mounted into the container:
dags/test_app_v1.py

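from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator

# default_args are forwarded to every task in the DAG
default_args = {
  'owner' : '',
  'start_date' : datetime(2022, 1, 1, 0, 0),
  'retries': 2,                          # retry a failed task up to 2 more times
  'retry_delay': timedelta(minutes = 1)  # wait 1 minute between attempts
}


def fn_one():
  print('execute jobs')

# schedule_interval is a DAG argument; inside default_args it does not set
# the DAG schedule. timedelta(days=1) runs the DAG once a day.
with DAG('test_app_v1', default_args=default_args,
         schedule_interval=timedelta(days=1)) as dag:
  task = PythonOperator(
    task_id = 'one',
    python_callable=fn_one
  )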
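
To show what `retries` and `retry_delay` mean in practice, here is a plain-Python sketch of the counting: a task with `retries=2` can be attempted up to three times in total, with a pause between attempts. This only illustrates the semantics. Airflow itself reschedules failed task instances through its scheduler, and `run_with_retries` and `flaky_job` are hypothetical names.

```python
import time


def run_with_retries(task, retries=2, retry_delay_seconds=60, sleep=time.sleep):
    """Call task(); on failure retry up to `retries` more times.

    Returns (result, number_of_attempts). Re-raises the last error if
    every attempt fails.
    """
    attempt = 0
    while True:
        attempt += 1
        try:
            return task(), attempt
        except Exception:
            if attempt > retries:
                raise  # initial attempt plus all retries are used up
            sleep(retry_delay_seconds)


# Hypothetical job that fails twice before succeeding.
calls = {"count": 0}


def flaky_job():
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("temporary failure")
    return "execute jobs"


# Pass a no-op sleep so the demo does not actually wait.
result, attempts = run_with_retries(flaky_job, sleep=lambda seconds: None)
print(result, attempts)
```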

6. Run the DAG file through airflow.sh

# Open a shell in a container that has Python and Airflow installed
./airflow.sh bash
# Run the DAG file to confirm it parses without errors
python dags/test_app_v1.py

7. View the new test_app_v1 DAG in the web UI

Airflow Web Server: http://localhost:8080
Username / password: airflow / airflow

After the script runs without errors, the DAG shows up on the Airflow home page once the scheduler has scanned the dags folder!

Check the run status for errors or failures.

8. REST API

The Airflow REST API can replace the work we do in the web UI, such as listing, triggering, and deleting DAGs.

https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html

curl -X GET 'http://10.1.200.67:8080/api/v1/dags' --user "airflow:airflow"
{
  "dag_id": "test_app_v1",
  "description": null,
  "file_token": "",
  "fileloc": "/opt/airflow/dags/test_app_v1.py",
  "is_active": true,
  "is_paused": false,
  "is_subdag": false,
  "owners": [],
  "root_dag_id": null,
  "schedule_interval": {
    "__type": "TimeDelta",
    "days": 1,
    "microseconds": 0,
    "seconds": 0
  },
  "tags": []
}
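
The same calls can be made from Python with only the standard library. The sketch below builds an authenticated request for listing DAGs and another for triggering a run of `test_app_v1`, and converts a `TimeDelta`-typed `schedule_interval` object like the one above into a `timedelta`. The base URL is an assumption (the web server address from step 7), the helper names are hypothetical, and `call_api` needs a running Airflow instance.

```python
import base64
import json
import urllib.request
from datetime import timedelta

BASE_URL = "http://localhost:8080/api/v1"  # assumption: adjust to your host
USERNAME, PASSWORD = "airflow", "airflow"  # default login from step 7


def build_request(path, method="GET", payload=None):
    """Build a request with a Basic Auth header, like curl --user."""
    token = base64.b64encode(f"{USERNAME}:{PASSWORD}".encode()).decode()
    headers = {
        "Authorization": f"Basic {token}",
        "Content-Type": "application/json",
    }
    data = json.dumps(payload).encode() if payload is not None else None
    return urllib.request.Request(
        BASE_URL + path, data=data, headers=headers, method=method
    )


def call_api(request):
    """Send the request and decode the JSON body (needs a live server)."""
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode())


def interval_to_timedelta(schedule_interval):
    """Convert a TimeDelta-typed schedule_interval object to a timedelta."""
    if not schedule_interval or schedule_interval.get("__type") != "TimeDelta":
        return None  # other schedule types are not handled in this sketch
    return timedelta(
        days=schedule_interval.get("days", 0),
        seconds=schedule_interval.get("seconds", 0),
        microseconds=schedule_interval.get("microseconds", 0),
    )


# Requests are only constructed here; nothing is sent yet.
list_dags = build_request("/dags")
trigger_run = build_request(
    "/dags/test_app_v1/dagRuns", method="POST", payload={"conf": {}}
)
```

With Airflow running, `call_api(list_dags)` returns the DAG listing and `call_api(trigger_run)` starts a new run of the demo DAG.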


Conclusion

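Many jobs that need scheduling can run with Airflow's help. With Python syntax you can quickly write crawlers, data transformations, event triggers, and similar jobs, and the web UI makes it convenient to monitor our DAGs, which removes many of the difficulties we face when designing schedules!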
