workflow
วันนี้จะมาสร้าง workflow ง่ายๆ ใช้งานกัน
- ดึงข้อมูล Covid-19 จาก API ของกรมควบคุมโรค
- ส่งแจ้งเตือนข้อมูลเข้า Line notify ทุกๆ 8 โมงเช้า
ถ้ายังไม่รู้จักการใช้ Airflow ให้เข้าไปอ่านได้ที่นี้
เริ่มต้น
ต้องเป็นเพื่อนกับ Line Notify ก่อน ด้วยการค้นหาเพื่อนชื่อ Line Notify แล้วแอดเป็นเพื่อน
หลังจากนั้น ให้ไปที่ https://notify-bot.line.me/th/ แล้วเข้าสู่ระบบ
Login บัญชี LINE ด้วยอีเมล และ รหัสผ่าน
หลังจาก Login สำเร็จ ให้เลือกเมนูด้านขวาบนตรงชื่อ account ของเรา และเลือกที่หน้าของฉัน
เลื่อนลงมาด้านล่างให้กดปุ่ม “Generate token”
จากนั้นให้ใส่
- ชื่อของ Token (จะแสดงตอนแจ้งเตือนข้อความ)
- เลือกห้องแชทที่ต้องการส่งข้อความแจ้งเตือน จากนั้นกดปุ่มออก Token เพื่อรับ Token key
ให้คัดลอก Token key ไว้ อย่าเพิ่งปิด pop up ไม่อย่างนั้นต้องออก Token ใหม่
ในขณะเดียวกัน line notify ก็จะแจ้งเตือนเรา
หลังจากเราเตรียม Token เสร็จแล้วก็เริ่มเขียนโค้ดได้เลย
ดึงข้อมูลจาก API
import json
import requests
def get_covid19_report_today():
url = 'https://covid19.th-stat.com/api/open/today'
response = requests.get(url)
data = response.json()
with open('data.json', 'w') as f:
json.dump(data, f)
return data
ได้ข้อมูลจาก API แล้วก็ส่งข้อมูลเข้า Line notify ผ่าน API กันเลย
เอา Token มาใส่แทน your_token ได้เลย
def send_line_notify():
url = 'https://notify-api.line.me/api/notify'
token = 'your_token'
headers = {
'content-type':
'application/x-www-form-urlencoded',
'Authorization': 'Bearer '+token
}
with open('data.json') as f:
data = json.load(f)
msg = "Covid-19 report today \n"
for (k,v) in data.items():
msg += str(k) + ":" + str(v) + "\n"
r = requests.post(url, headers=headers, data={'message': msg})
print(r.text)
อย่าลืมสร้าง DAG object ด้วยนะ
schedule_interval กำหนดเป็น 0 8 * * *
เราต้องการให้ทำงานทุกๆ 8 โมงของทุกวัน
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
default_args = {
'owner': 'airflow',
'start_date': datetime(2021, 2, 28, 0, 0, 0),
}
with DAG('line-notify',
schedule_interval='0 8 * * *',
default_args=default_args,
description='A simple data pipeline for line-notify',
catchup=False) as dag:
t1 = PythonOperator(
task_id='get_covid19_report_today',
python_callable=get_covid19_report_today
)
t2 = PythonOperator(
task_id='send_line_notify',
python_callable=send_line_notify
)
t1 >> t2
ภายในไฟล์ python ที่เราจะได้
import requests
import json
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
def get_covid19_report_today():
url = 'https://covid19.th-stat.com/api/open/today'
response = requests.get(url)
data = response.json()
with open('data.json', 'w') as f:
json.dump(data, f)
return data
def send_line_notify():
url = 'https://notify-api.line.me/api/notify'
token = 'your_token'
headers = {
'content-type':
'application/x-www-form-urlencoded',
'Authorization': 'Bearer '+token
}
with open('data.json') as f:
data = json.load(f)
msg = "Covid-19 report today \n"
for (k,v) in data.items():
msg += str(k) + ":" + str(v) + "\n"
r = requests.post(url, headers=headers, data={'message': msg})
print(r.text)
default_args = {
'owner': 'airflow',
'start_date': datetime(2021, 2, 28, 0, 0, 0),
}
with DAG('line-notify',
schedule_interval='0 8 * * *',
default_args=default_args,
description='A simple data pipeline for line-notify',
catchup=False) as dag:
t1 = PythonOperator(
task_id='get_covid19_report_today',
python_callable=get_covid19_report_today
)
t2 = PythonOperator(
task_id='send_line_notify',
python_callable=send_line_notify
)
t1 >> t2
แจ้งเตือนที่เราจะได้
Top comments (0)