DEV Community

Tanakrit Seangnet
Tanakrit Seangnet

Posted on • Updated on

การใช้งาน Apache Airflow บน Docker เบื้องต้น

Alt Text

Apache Airflow

Apache Airflow เป็น Open Source ที่เข้ามาจัดการ Task งานต่างๆ โดยต้องเขียนเป็น Python Code โดยแต่ละ Task สามารถดู Workflow การทำงานได้อย่างละเอียด

เราสามารถตั้งเวลาการทำงานคล้าย Crontab(Cron Jobs) บน Linux ได้ เช่น เมื่อรัน Job A ตอนเวลา 01.00 น.ทุกวัน รัน Job B ตอน 08.00 น. ในวันอาทิตย์ โดย Job ใน Airflow จะเรียกว่า DAGs (Directed Acyclic Graphs) มี Web UI ให้เรา Monitor Task Failured ที่เกิดขึ้นจากการทำงานในแต่ละ Task เราสามารถตั้งค่า Alert หากมี Job Failures เกิดขึ้น

Web UI

Web UI ของ Apache Airflow ที่ช่วยให้จัดการ DAG ต่างๆ
Screenshot (455)Screenshot (456)


เริ่มการติดตั้ง

Step 1: git clone

git clone https://gitlab.com/tanakrit1/apache-airflow-with-docker-compose.git
Enter fullscreen mode Exit fullscreen mode

config ต่างๆที่อยู่ภายใน docker-compose.yml
Screenshot (457)

อ่าน docker-compose เพิ่มเติม
  • airflow-scheduler - The scheduler monitors all tasks and DAGs, then triggers the task instances once their dependencies are complete.
  • airflow-webserver - The webserver available at http://localhost:8080.
  • airflow-worker - The worker that executes the tasks given by the scheduler.
  • airflow-init - The initialization service.
  • flower - The flower app for monitoring the environment. It is available at http://localhost:8080.
  • postgres - The database.
  • redis - The redis - broker that forwards messages from scheduler to worker.

โฟลเดอร์จะถูกเชื่อมต่อระหว่าง host กับ container

  • ./dags - you can put your DAG files here.
  • ./logs - contains logs from task execution and scheduler.
  • ./plugins - you can put your custom plugins here.

Step 2: ก่อนเริ่ม Start Airflow เป็นครั้งแรก

เราต้องเรียกใช้การสร้างฐานข้อมูลและสร้างบัญชีผู้ใช้แรก ในการดำเนินการให้เรียกใช้

docker-compose up airflow-init
Enter fullscreen mode Exit fullscreen mode

หลังจากการเริ่มต้นเสร็จสมบูรณ์จะเห็นข้อความดังต่อไปนี้
Screenshot (458)

ชื่อบัญชีผู้ใช้ที่ถูกสร้างขึ้นคือ airflow และรหัสผ่าน airflow

Step 3: เริ่มต้นการใช้งาน Airflow

ตอนนี้เราสามารถ Start Service ทั้งหมดด้วยคำสั่ง

docker-compose up
Enter fullscreen mode Exit fullscreen mode

เราสามารถล็อกอินเข้าใช้งาน web ui ของ airflow ได้ที่ http://localhost:8080.
Screenshot (459)


มาลองใช้งานกัน

Workflow ที่เราจะเขียนกันคือ

  • ดึงค่า covid-19 ประจำวันจาก API ของกรมควบคุมโรค
  • insert ข้อมูลลง postgres database
  • ส่งอีเมลเตือนการทำงาน (ใช้ Gmail SMTP Server)
CREATE SEQUENCE id;
CREATE TABLE public.daily_covid19_reports (
    id int4 NOT NULL DEFAULT nextval('id'::regclass),
    confirmed int4 NULL,
    recovered int4 NULL,
    hospitalized int4 NULL,
    deaths int4 NULL,
    new_confirmed int4 NULL,
    new_recovered int4 NULL,
    new_hospitalized int4 NULL,
    new_deaths int4 NULL,
    update_date timestamp NULL,
    "source" varchar(100) NULL,
    dev_by varchar(100) NULL,
    server_by varchar(100) NULL,
    CONSTRAINT daily_covid19_reports_pkey PRIMARY KEY (id)
);
Enter fullscreen mode Exit fullscreen mode
โค้ดทั้งหมดอยู่ใน ./dags/covid-daily.py แล้ว

Step 4: สร้าง DAG object ด้วย python เอาไว้ใน ./dags

from airflow import DAG

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2021, 2, 17),
    'schedule_interval': None,
}
with DAG('covid19_daily',
         schedule_interval='@daily',
         default_args=default_args,
         description='A simple data pipeline for COVID-19 report',
         catchup=False) as dag:
Enter fullscreen mode Exit fullscreen mode

schedule_interval คือการสั่งรันเป็นเวลา สามารถใช้เครื่องมือกำหนดเวลา ได้ที่นี้

Step 5: ดึงข้อมูลจาก API

import json
import requests

def get_covid19_report_today():
    url = 'https://covid19.th-stat.com/api/open/today'
    response = requests.get(url)
    data = response.json()

    with open('data.json', 'w') as f:
        json.dump(data, f)

    return data
Enter fullscreen mode Exit fullscreen mode

Step 6: insert ข้อมูลลง postgres database

สร้าง Connection ด้วย Web UI ของ airflow ไปที่ Admin > Connections
ระบุ conn id:airflow_db
Screenshot (466)
ใช้ PostgresHook ในการเชื่อมต่อกับฐานข้อมูล

from airflow.hooks.postgres_hook import PostgresHook
from airflow.utils.dates import days_ago
from datetime import datetime

def insert_data():
    with open('data.json') as f:
        data = json.load(f)

    print(data) 

    mysql_hook = PostgresHook(postgres_conn_id='airflow_db')

    insert = """
        INSERT INTO public.daily_covid19_reports
            (confirmed, recovered, hospitalized, deaths, new_confirmed, new_recovered, new_hospitalized, new_deaths, update_date, "source", dev_by, server_by)
        VALUES(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
    """

    mysql_hook.run(insert, parameters=(data['Confirmed'],
                                       data['Recovered'],
                                       data['Hospitalized'],
                                       data['Deaths'],
                                       data['NewConfirmed'],
                                       data['NewRecovered'],
                                       data['NewHospitalized'],
                                       data['NewDeaths'],
                                       datetime.strptime(data['UpdateDate'], '%d/%m/%Y %H:%M'),
                                       data['Source'],
                                       data['DevBy'],
                                       data['SeverBy']))
Enter fullscreen mode Exit fullscreen mode

Step 7: สร้าง Task ด้วย Operator และเชื่อมต่อ Task

ใช้ PythonOperator เพื่อไปใช้ function ที่เขียนขึ้น

from airflow.operators.python_operator import PythonOperator

    t1 = PythonOperator(
        task_id='get_covid19_report_today',
        python_callable=get_covid19_report_today
    )

    t2 = PythonOperator(
        task_id='insert_data',
        python_callable=insert_data
    )

    t1 >> t2
Enter fullscreen mode Exit fullscreen mode

Step 8: สร้าง Task สำหรับส่งอีเมลด้วย EmailOperator

Setting up SMTP Server สำหรับส่งเมลด้วย Gmail แก้ไขภายในไฟล์ airflow.cfg โปรเจคนี้ได้เชื่อมต่อไฟล์นี้ระหว่าง host กับ container ให้แล้วอยู่ใน .\worker แก้ไขแล้วให้ restart docker-compose

[smtp]
smtp_host = smtp.gmail.com
smtp_starttls = True
smtp_ssl = False
smtp_user = YOUR_EMAIL_ADDRESS
smtp_password = 16_DIGIT_APP_PASSWORD
smtp_port = 587
smtp_mail_from = YOUR_EMAIL_ADDRESS
Enter fullscreen mode Exit fullscreen mode

เพิ่ม Task ภายในไฟล์ Python

from airflow.operators.email_operator import EmailOperator

    t3 = EmailOperator(
        task_id='send_email',
        to=['your@gmail.com'],
        subject='Your covid19 report today is ready',
        html_content='Please check your dashboard. :)'
    )

    t1 >> t2 >> t3
Enter fullscreen mode Exit fullscreen mode

ดูเพิ่มเติมได้ที่

Top comments (4)

Collapse
 
modtanoii profile image
modtanoii

ใช้ version ไหนครับ ลองใช้ airflow 2.2.2 แล้วไม่สามารถ insert ข้อมูลลง postgresdb ได้

Collapse
 
tanakritseangnet profile image
Tanakrit Seangnet

ตอนนั้นที่เขียนใช้ 2.0.1 ครับ มี log ให้ดูไหม ไม่แน่ใจแก้ปัญหาได้รึยังครับ

Collapse
 
somprasongd profile image
Somprasong Damyos

เยี่ยมเลยครับ 👍

Some comments may only be visible to logged-in visitors. Sign in to view all comments.