DEV Community

HJ
HJ

Posted on

k8s Executor 적용기(2)

공식 문서

https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/operators.html

구성 방법


Airflow 공식 문서의 가장 큰 문제는 helm등 차트로만 설치를 가이드하고 있다는 점인데, 회사 특성상 오프라인 폐쇄망에 배포해야 될 수도 있기 때문에 helm으로 우선 설치하고 오브젝트를 뜯어 보는 식으로 진행을 했다. (힘들다)

Airflow의 구성 요소는 webserver, scheduler, trigger, worker인데, 우린 딱히 trigger를 쓸 일이 없어서 구성을 안 해도 무방하다.

webserver는 GUI를 제공하고, scheduler는 task를 분배하고, worker는 일을 한다. tigger는 특정 조건을 달성하는 task를 실행해 주는 역할을 하는데, 일반적으로는 플로우를 통해서 scheduling을 하니까 쓸 데가 많진 않다.

대략적인 flow는 다음과 같았다.

DB 초기화

PostgreSQL

airflow의 백엔드 DB로는 PostgreSQL을 사용했고, k8s에 올라가는만큼 초기화를 위한 Job이 하나 필요했다. 해당 Job은 한번만 실행되고 db migrate, init 등을 실행해 airflow를 사용할 수 있음.

helm이나 pip로 구성할땐 몰랐는데, airflow는 admin user를 따로 만들어 줘야 했다. 해당 커맨드를 통해 만들 수 있다.

airflow users create --username admin --firstname Admin --lastname User --role Admin --email lol@lol.com --password admin
Enter fullscreen mode Exit fullscreen mode

Scheduler

scheduler

스케줄러는 airflow scheduler 등으로 올라가는데, 몇가지 구성 요소가 필요했다.

당연히 PVC는 dag, log 등으로 분리하는거고, airflow.cfg를 configmap으로 동일하게 넣어 줘야 하므로 필수라고 할 수 있다.

k8s executor만의 특이한 점이라면 pod_template_file.yaml인데, 이 yaml 파일으로 worker가 만들어지고 실행된다. 얘는 다른 것들과는 다르게 core Executor가 local Executor로 설정된다.

  pod_template_file.yaml: |
    apiVersion: v1
    kind: Pod
    metadata:
      name: placeholder-name
      labels:
        tier: airflow
        component: worker
        release: airflow
    spec:
      containers:
        - envFrom: []
          env:
            - name: AIRFLOW__CORE__EXECUTOR
              value: LocalExecutor
            - name: AIRFLOW_HOME
              value: /opt/airflow
            - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
              valueFrom:
                secretKeyRef:
                  name: airflow-metadata
                  key: connection
            - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
              valueFrom:
                secretKeyRef:
                  name: airflow-metadata
                  key: connection
            - name: AIRFLOW_CONN_AIRFLOW_DB
              valueFrom:
                secretKeyRef:
                  name: airflow-metadata
                  key: connection
            # Dynamically created environment variables
            # Dynamically created secret envs
            # Extra env
          image: apache/airflow:2.8.3
...

Enter fullscreen mode Exit fullscreen mode

container env 스펙을 보면 AIRFLOW_CORE_EXECUTOR가 Local Executor이다.

즉 이런 구조로 돌아가는 녀석이라는 거다.

scheduler flow

Task를 관리하는 scheduler는 worker를 pod로 따로 띄우고, Worker는 Image를 불러와 Pod로 작업을 따로 수행하게 하고(Local Executor), Pod가 실행을 완료하면 worker 또한 작업을 종료하고 다시 죽는다.

worker flow

결과적으로 scheduler만 남게 되고, 이 작업은 병렬로 수행되므로 작업이 몰릴때는 많은 pod을 소환[?]해서 큰 케파를 유지하고 깔끔한 아키텍쳐를 유지 가능하게 된다.

새로운 고민

1. Code Entrypoint


말하기 앞서 k8s executor의 dag를 보면… 사실상의 Local executor처럼 동작하는 것을 볼 수 있다.

cmds를 통해 커맨드라인을 실행하고, 이 실행은 image를 불러와서 실행시키는 형식이다.

first_task = KubernetesPodOperator(
    task_id="airflow_task",
    image="<harbor>/airflow_src/<blabla>:latest",
    cmds=["python", "blabla.py"],
    arguments=["--execution_date", "execution_date"],
    provide_context=True,
    dag=dag,
)
Enter fullscreen mode Exit fullscreen mode

여기서 가장 큰 문제가 발생하는데… image의 진입점 문제이다.

Image flow

k8s executor dag는 특정 ‘이미지’를 불러와서 실행한다.

dag 하나에 최소한 하나의 이미지가 있어야 된다는 점인데, 이 부분이 가장 큰 문제가 된다.

사내에서는 harbor로 커스텀 이미지를 관리하고 있는데, python 이미지를 alphine 기준 최소한으로 잡았을 때도 어지간해선 100M 정도 사이즈가 나온다.

즉..pipeline당 100M, 10개면 1G, 100개면 10G…

이게 무한정 늘어난다고 생각해 봤을때 이미지서버에 대한 부하가 걱정될 수 밖에 없다.

처음부터 논의했던 문제긴 한데, 그래서 진입점을 어떻게 하느냐에 대한 의논은 좀 했었다.

이런 식으로 구성하면 용량 문제는 어느정도 해결은 할 수 있을 것이다.

image category flow

최대한 공통점을 가지는 파이프라인을 묶어서 이미지를 빌드하고, Image를 최소화한 다음에 cmd를 통해 호출하는 진입점을 다르게 가는 것이다.

이렇게 할 경우 총 이미지 용량은 줄어들게 된다. 물론 깔끔하진 않다….

나 같은 경우에는 이런 구조를 싫어해서, 파이프라인 갯수가 감당이 안 될 정도가 아닌 이상 버텨 보기로 하고 일단 뭉개고 있는 중[?]. 사실 하드웨어가 제일 싸니까~

2. Pod capacity


이게..생각보다 Pod가 많이 뜬다.

그리고 생각보다 메모리를 많이 먹는다.

Fail flow

테스트시 제일 많이 발생한 문제인데, Pod이 memory가 부족하거나 하는 경우 제대로 된 작업을 하지 못하게 되고, worker는 pod는 띄웠는데 일이 안 끝나니까 그냥 Fail 처리해 버린다.

어지간하면 Fail retry로 수행되기야 하겠지만, Airflow의 특성상 특정 시기에 작업이 몰릴 수도 있으니 작업을 분배하거나 node를 분배하는 등의 설정이 따로 필요할 수 있다.

cfg를 통한 동시성 설정이나 동시수행 등의 설정을 바꿔서 분배하면 좀 나아질 듯.

# DAG
start_task = KubernetesPodOperator(
    ...
    startup_timeout_seconds=30000 # pod이 끝날때까지 3만초 기다림
    ...
)

# cfg
[core]
parallelism = 8
[scheduler]
dag_concurrentcy = 8
max_active_tasks_per_dag = 4
Enter fullscreen mode Exit fullscreen mode

소회

  • 사실 대부분 내 욕심으로 만들어진 아키텍쳐이다. 아직 플랫폼 초기니까…가능하겠지.
  • 생각만큼 k8s의 장점이 장점으로 와닿지 않는다. 작은 구조는 작은 구조대로 문제가 생기는데, 이것이 대규모화 됐을 때의 문제 스케일이 훨씬 클 것 같아서 그게 가장 걱정이다.
  • 구조가 직관적이고 쓸데 없는 구조가 없이 깔끔하다는 점은 좋지만, 그거 외에 단점이 이것저것 너무 많아서 누구보고 쓰라고 하면 딱히 추천하고 싶진 않다. MQ 하나 관리하지 않는다고 생기는 단점이 큰 것 같기도 하고.
  • 쓰진 않았지만 사실 가장 귀찮은 건 인자값 전달이었는데, 다른 operator와는 달리 변환이 필요하거나 하기도 해서 좀 불편하다.(cmds를 사용하니…). 쉽게 전달하기 위해 xcom이라는 게 있다던데 훑어보니 이게 더 불편해 보여서 아예 시도도 안함.
  • 그럼에도 CI/CD가 잘 되어 있거나 하는 환경에선 메리트가 있을 것이다. MSA 구조 등엔 도움될 부분이 있을 것임.
  • 특히 배포가 쉽다는 점에선 장점이 있을 것이다. (사실 이것도 MQ쓴다고 배포가 어려울 것은 없다만)

Top comments (0)