Apache Airflow
Apache Airflow เป็น Open Source ที่เข้ามาจัดการ Task งานต่างๆ โดยต้องเขียนเป็น Python Code โดยแต่ละ Task สามารถดู Workflow การทำงานได้อย่างละเอียด
เราสามารถตั้งเวลาการทำงานคล้าย Crontab(Cron Jobs) บน Linux ได้ เช่น เมื่อรัน Job A ตอนเวลา 01.00 น.ทุกวัน รัน Job B ตอน 08.00 น. ในวันอาทิตย์ โดย Job ใน Airflow จะเรียกว่า DAGs (Directed Acyclic Graphs) มี Web UI ให้เรา Monitor Task Failured ที่เกิดขึ้นจากการทำงานในแต่ละ Task เราสามารถตั้งค่า Alert หากมี Job Failures เกิดขึ้น
Web UI
Web UI ของ Apache Airflow ที่ช่วยให้จัดการ DAG ต่างๆ
เริ่มการติดตั้ง
Step 1: git clone
git clone https://gitlab.com/tanakrit1/apache-airflow-with-docker-compose.git
config ต่างๆที่อยู่ภายใน docker-compose.yml
อ่าน docker-compose เพิ่มเติม
-
airflow-scheduler
- The scheduler monitors all tasks and DAGs, then triggers the task instances once their dependencies are complete. -
airflow-webserver
- The webserver available at http://localhost:8080. -
airflow-worker
- The worker that executes the tasks given by the scheduler. -
airflow-init
- The initialization service. -
flower
- The flower app for monitoring the environment. It is available at http://localhost:8080. -
postgres
- The database. -
redis
- The redis - broker that forwards messages from scheduler to worker.
โฟลเดอร์จะถูกเชื่อมต่อระหว่าง host กับ container
-
./dags
- you can put your DAG files here. -
./logs
- contains logs from task execution and scheduler. -
./plugins
- you can put your custom plugins here.
Step 2: ก่อนเริ่ม Start Airflow เป็นครั้งแรก
เราต้องเรียกใช้การสร้างฐานข้อมูลและสร้างบัญชีผู้ใช้แรก ในการดำเนินการให้เรียกใช้
docker-compose up airflow-init
หลังจากการเริ่มต้นเสร็จสมบูรณ์จะเห็นข้อความดังต่อไปนี้
ชื่อบัญชีผู้ใช้ที่ถูกสร้างขึ้นคือ airflow
และรหัสผ่าน airflow
Step 3: เริ่มต้นการใช้งาน Airflow
ตอนนี้เราสามารถ Start Service ทั้งหมดด้วยคำสั่ง
docker-compose up
เราสามารถล็อกอินเข้าใช้งาน web ui ของ airflow ได้ที่ http://localhost:8080.
มาลองใช้งานกัน
Workflow ที่เราจะเขียนกันคือ
- ดึงค่า covid-19 ประจำวันจาก API ของกรมควบคุมโรค
- insert ข้อมูลลง postgres database
- ส่งอีเมลเตือนการทำงาน (ใช้ Gmail SMTP Server)
CREATE SEQUENCE id;
CREATE TABLE public.daily_covid19_reports (
id int4 NOT NULL DEFAULT nextval('id'::regclass),
confirmed int4 NULL,
recovered int4 NULL,
hospitalized int4 NULL,
deaths int4 NULL,
new_confirmed int4 NULL,
new_recovered int4 NULL,
new_hospitalized int4 NULL,
new_deaths int4 NULL,
update_date timestamp NULL,
"source" varchar(100) NULL,
dev_by varchar(100) NULL,
server_by varchar(100) NULL,
CONSTRAINT daily_covid19_reports_pkey PRIMARY KEY (id)
);
โค้ดทั้งหมดอยู่ใน ./dags/covid-daily.py แล้ว
Step 4: สร้าง DAG object ด้วย python เอาไว้ใน ./dags
from airflow import DAG
default_args = {
'owner': 'airflow',
'start_date': datetime(2021, 2, 17),
'schedule_interval': None,
}
with DAG('covid19_daily',
schedule_interval='@daily',
default_args=default_args,
description='A simple data pipeline for COVID-19 report',
catchup=False) as dag:
schedule_interval คือการสั่งรันเป็นเวลา สามารถใช้เครื่องมือกำหนดเวลา ได้ที่นี้
Step 5: ดึงข้อมูลจาก API
import json
import requests
def get_covid19_report_today():
url = 'https://covid19.th-stat.com/api/open/today'
response = requests.get(url)
data = response.json()
with open('data.json', 'w') as f:
json.dump(data, f)
return data
Step 6: insert ข้อมูลลง postgres database
สร้าง Connection ด้วย Web UI ของ airflow ไปที่ Admin > Connections
ระบุ conn id:airflow_db
ใช้ PostgresHook ในการเชื่อมต่อกับฐานข้อมูล
from airflow.hooks.postgres_hook import PostgresHook
from airflow.utils.dates import days_ago
from datetime import datetime
def insert_data():
with open('data.json') as f:
data = json.load(f)
print(data)
mysql_hook = PostgresHook(postgres_conn_id='airflow_db')
insert = """
INSERT INTO public.daily_covid19_reports
(confirmed, recovered, hospitalized, deaths, new_confirmed, new_recovered, new_hospitalized, new_deaths, update_date, "source", dev_by, server_by)
VALUES(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
"""
mysql_hook.run(insert, parameters=(data['Confirmed'],
data['Recovered'],
data['Hospitalized'],
data['Deaths'],
data['NewConfirmed'],
data['NewRecovered'],
data['NewHospitalized'],
data['NewDeaths'],
datetime.strptime(data['UpdateDate'], '%d/%m/%Y %H:%M'),
data['Source'],
data['DevBy'],
data['SeverBy']))
Step 7: สร้าง Task ด้วย Operator และเชื่อมต่อ Task
ใช้ PythonOperator เพื่อไปใช้ function ที่เขียนขึ้น
from airflow.operators.python_operator import PythonOperator
t1 = PythonOperator(
task_id='get_covid19_report_today',
python_callable=get_covid19_report_today
)
t2 = PythonOperator(
task_id='insert_data',
python_callable=insert_data
)
t1 >> t2
Step 8: สร้าง Task สำหรับส่งอีเมลด้วย EmailOperator
Setting up SMTP Server สำหรับส่งเมลด้วย Gmail แก้ไขภายในไฟล์ airflow.cfg
โปรเจคนี้ได้เชื่อมต่อไฟล์นี้ระหว่าง host กับ container ให้แล้วอยู่ใน .\worker
แก้ไขแล้วให้ restart docker-compose
[smtp]
smtp_host = smtp.gmail.com
smtp_starttls = True
smtp_ssl = False
smtp_user = YOUR_EMAIL_ADDRESS
smtp_password = 16_DIGIT_APP_PASSWORD
smtp_port = 587
smtp_mail_from = YOUR_EMAIL_ADDRESS
เพิ่ม Task ภายในไฟล์ Python
from airflow.operators.email_operator import EmailOperator
t3 = EmailOperator(
task_id='send_email',
to=['your@gmail.com'],
subject='Your covid19 report today is ready',
html_content='Please check your dashboard. :)'
)
t1 >> t2 >> t3
ดูเพิ่มเติมได้ที่
- https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html
- https://towardsdatascience.com/getting-started-with-apache-airflow-df1aa77d7b1b
- https://towardsdatascience.com/getting-started-with-airflow-using-docker-cd8b44dbff98
- https://itorn.net/l-ngaich-apache-airflow/
- https://www.bluebirz.net/th/619/try-apache-airflow/
- https://link.medium.com/Ytno47gW7db
Top comments (4)
ใช้ version ไหนครับ ลองใช้ airflow 2.2.2 แล้วไม่สามารถ insert ข้อมูลลง postgresdb ได้
ตอนนั้นที่เขียนใช้ 2.0.1 ครับ มี log ให้ดูไหม ไม่แน่ใจแก้ปัญหาได้รึยังครับ
เยี่ยมเลยครับ 👍
Some comments may only be visible to logged-in visitors. Sign in to view all comments.