DEV Community

Tanakrit Seangnet
Tanakrit Seangnet

Posted on

Line Notify in Airflow

workflow

วันนี้จะมาสร้าง workflow ง่ายๆ ใช้งานกัน

  • ดึงข้อมูล Covid-19 จาก API ของกรมควบคุมโรค
  • ส่งแจ้งเตือนข้อมูลเข้า Line notify ทุกๆ 8 โมงเช้า

ถ้ายังไม่รู้จักการใช้ Airflow ให้เข้าไปอ่านได้ที่นี้

เริ่มต้น

ต้องเป็นเพื่อนกับ Line Notify ก่อน ด้วยการค้นหาเพื่อนชื่อ Line Notify แล้วแอดเป็นเพื่อน

Screenshot (495)

หลังจากนั้น ให้ไปที่ https://notify-bot.line.me/th/ แล้วเข้าสู่ระบบ

Login บัญชี LINE ด้วยอีเมล และ รหัสผ่าน

Screenshot (489)

หลังจาก Login สำเร็จ ให้เลือกเมนูด้านขวาบนตรงชื่อ account ของเรา และเลือกที่หน้าของฉัน

Screenshot (490)

เลื่อนลงมาด้านล่างให้กดปุ่ม “Generate token”

จากนั้นให้ใส่

  • ชื่อของ Token (จะแสดงตอนแจ้งเตือนข้อความ)
  • เลือกห้องแชทที่ต้องการส่งข้อความแจ้งเตือน จากนั้นกดปุ่มออก Token เพื่อรับ Token key

Screenshot (493)

ให้คัดลอก Token key ไว้ อย่าเพิ่งปิด pop up ไม่อย่างนั้นต้องออก Token ใหม่

Screenshot (494)

ในขณะเดียวกัน line notify ก็จะแจ้งเตือนเรา

Screenshot (496)

หลังจากเราเตรียม Token เสร็จแล้วก็เริ่มเขียนโค้ดได้เลย

ดึงข้อมูลจาก API

import json
import requests

def get_covid19_report_today():
    url = 'https://covid19.th-stat.com/api/open/today'
    response = requests.get(url)
    data = response.json()

    with open('data.json', 'w') as f:
        json.dump(data, f)

    return data
Enter fullscreen mode Exit fullscreen mode

ได้ข้อมูลจาก API แล้วก็ส่งข้อมูลเข้า Line notify ผ่าน API กันเลย
เอา Token มาใส่แทน your_token ได้เลย

def send_line_notify():
    url = 'https://notify-api.line.me/api/notify'
    token = 'your_token'
    headers = {
        'content-type':
        'application/x-www-form-urlencoded',
        'Authorization': 'Bearer '+token
    }

    with open('data.json') as f:
        data = json.load(f)

    msg = "Covid-19 report today \n"
    for (k,v) in data.items():
        msg += str(k) + ":" + str(v) + "\n"

    r = requests.post(url, headers=headers, data={'message': msg})
    print(r.text)
Enter fullscreen mode Exit fullscreen mode

อย่าลืมสร้าง DAG object ด้วยนะ

schedule_interval กำหนดเป็น 0 8 * * * เราต้องการให้ทำงานทุกๆ 8 โมงของทุกวัน

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2021, 2, 28, 0, 0, 0),
}
with DAG('line-notify',
         schedule_interval='0 8 * * *',
         default_args=default_args,
         description='A simple data pipeline for line-notify',
         catchup=False) as dag:

    t1 = PythonOperator(
        task_id='get_covid19_report_today',
        python_callable=get_covid19_report_today
    )

    t2 = PythonOperator(
        task_id='send_line_notify',
        python_callable=send_line_notify
    )

    t1 >> t2
Enter fullscreen mode Exit fullscreen mode

ภายในไฟล์ python ที่เราจะได้

import requests
import json
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def get_covid19_report_today():
    url = 'https://covid19.th-stat.com/api/open/today'
    response = requests.get(url)
    data = response.json()

    with open('data.json', 'w') as f:
        json.dump(data, f)

    return data


def send_line_notify():
    url = 'https://notify-api.line.me/api/notify'
    token = 'your_token'
    headers = {
        'content-type':
        'application/x-www-form-urlencoded',
        'Authorization': 'Bearer '+token
    }

    with open('data.json') as f:
        data = json.load(f)

    msg = "Covid-19 report today \n"
    for (k,v) in data.items():
        msg += str(k) + ":" + str(v) + "\n"

    r = requests.post(url, headers=headers, data={'message': msg})
    print(r.text)


default_args = {
    'owner': 'airflow',
    'start_date': datetime(2021, 2, 28, 0, 0, 0),
}
with DAG('line-notify',
         schedule_interval='0 8 * * *',
         default_args=default_args,
         description='A simple data pipeline for line-notify',
         catchup=False) as dag:

    t1 = PythonOperator(
        task_id='get_covid19_report_today',
        python_callable=get_covid19_report_today
    )

    t2 = PythonOperator(
        task_id='send_line_notify',
        python_callable=send_line_notify
    )

    t1 >> t2
Enter fullscreen mode Exit fullscreen mode

แจ้งเตือนที่เราจะได้

image

Top comments (0)