나의 잡다한 노트 및 메모

Airflow Executor의 역할, 종류, 동작 흐름 본문

Airflow

Airflow Executor의 역할, 종류, 동작 흐름

peanutwalnut 2025. 11. 10. 20:39

1. Executor의 역할

Airflow 아키텍처를 아주 크게 보면:

  • Webserver: UI
  • Scheduler: 언제 어떤 Task를 실행할지 결정
  • Metadata DB: DAG/Task 상태 저장
  • Executor: Task 실행을 위한 “실행 명령”을 담당
  • Worker(있을 수도, 없을 수도): 실제로 Task를 수행하는 프로세스/컨테이너

여기서 Executor는 Scheduler와 Worker 사이의 브리지 역할을 합니다.

조금 더 구체적으로:

  1. Scheduler가 TaskInstance를 “scheduled/queued” 상태로 만든다.
  2. Executor는 이 정보를 보고:
    • 어떤 Task를 지금 실행할지 결정 (슬롯, pool, concurrency 고려)
    • 어떤 방식으로 실행할지 결정 (로컬 프로세스, Celery 큐, K8s Pod 등)
  3. 실행 명령을 발행하고, 그에 대한 상태를 주기적으로 수집해서 DB에 반영합니다.

즉 Executor는:

  • 실행할 Task를 큐잉하고
  • 적절한 작업자(Worker or 프로세스 or Pod) 에게 일을 던져주고
  • 그 결과를 다시 끊임없이 폴링/콜백 해서 Task 상태를 업데이트합니다.

 

Airflow에 존재하는 주요 Executor 종류

버전에 따라 다양하지만, 실무에서 주로 만나게 되는 것들 기준으로 정리하면:

2.1 SequentialExecutor

  • 특징:
    • 동시에 하나의 Task만 실행 (병렬 X)
    • DB 연결도 단일, SQLite 같은 간단한 백엔드와 주로 사용
  • 용도:
    • 로컬 개발/테스트 용
    • Prod에서는 사실상 사용하지 않음
  • 동작 방식:
    • Scheduler 프로세스와 같은 환경에서 직접 Task를 순차적으로 실행
    • “테스트 모드”용이라고 보면 편합니다.

2.2 LocalExecutor

  • 특징:
    • 로컬 머신(또는 한 서버) 안에서 병렬 실행 지원
    • parallelism, dag_concurrency, max_active_runs_per_dag 등의 설정에 따라 동시에 여러 Task를 실행
  • 동작 방식:
    • Scheduler/Executor가 Task를 큐에 넣으면
    • 같은 머신 안에서 멀티 프로세스/멀티 쓰레드 형태로 Task를 실행
    • 별도의 Worker Pool(예: Celery Worker)이 필요 없음
  • 장점:
    • 설치가 단순 (DB + Scheduler + LocalExecutor 구성)
    • 작은 규모의 서비스나 POC에서 사용하기 좋음
  • 단점:
    • 스케일 아웃이 어렵다 (노드 1개에 종속)
    • 머신 리소스(CPU/RAM) 한계를 쉽게 만남

2.3 CeleryExecutor

  • 특징:
    • Task 실행을 Celery 기반 분산 Worker 들에 넘김
    • Message Broker(Redis/RabbitMQ 등) 필요
  • 구성 요소:
    • Airflow Scheduler + CeleryExecutor
    • Celery Broker (Redis/RabbitMQ)
    • Celery Worker 여러 개 (여러 서버에 분산 가능)
  • 동작 방식(요약):
    • Executor가 “이 Task 실행해” → Celery 큐에 메시지 push
    • Celery Worker가 큐에서 메시지를 가져와 Task 실행
    • 실행 완료 후 결과/상태를 다시 DB에 반영 (Worker → DB or Scheduler)
  • 장점:
    • 수평 확장이 쉬움 (Worker 서버 수를 늘리면 됨)
    • 비교적 전통적인 분산 구조라 이해/운영이 쉬움
  • 단점:
    • Celery/Broker/Result backend 등 구성 요소가 늘어남 → 운영 복잡도 증가
    • 대규모 환경에서는 Broker 튜닝, Worker 관리 필요

2.4 KubernetesExecutor

  • 특징:
    • 각 Task를 Kubernetes Pod 단위로 실행
    • “Task = 1 Pod”라고 보면 됨
  • 구성:
    • Airflow Scheduler + KubernetesExecutor
    • K8s Cluster
  • 동작 방식(요약):
    • Executor가 Task를 큐잉
    • 실행해야 할 시점이 되면 해당 Task 전용 Pod를 생성
    • Pod 내부에서 airflow tasks run ... 를 실행
    • 완료 후 Pod는 종료/삭제 (정책에 따라)
  • 장점:
    • 확장성이 매우 좋음 (K8s의 오토스케일링 활용)
    • Task마다 독립적인 리소스 제한/이미지/환경 구성 가능
  • 단점:
    • K8s에 대한 운영/권한/네트워킹 지식 필요
    • Pod 생성 오버헤드 존재 (짧은 Task가 매우 많으면 비효율)

 

3. 실제 DAG가 돌아갈 때 Executor 동작 흐름

이제 “DAG가 한 번 도는 순간”을 시간 순서대로, Executor 관점에서 자세히 볼게요.
(Executor 종류에 따라 세부 구현은 다르지만, 추상 흐름은 동일합니다.)

3.1 DAG 파싱 및 스케줄링 준비

  1. Scheduler가 DAG 파일들을 주기적으로 스캔/파싱
    • .py DAG 파일 읽어서 DAG 객체 생성
  2. 스케줄러가 “지금 실행할 타이밍인지(cron, timetable)” 체크
  3. 실행해야 할 시점이면 DagRun 객체 생성 (state = running)

이 시점까지는 Executor가 직접 개입하진 않고, Scheduler의 역할입니다.


3.2 TaskInstance 생성 및 “Queued” 상태 진입

  1. DagRun이 생성되면, DAG 정의에 따라 각 Task에 대한 TaskInstance(TI)들이 생성
  2. 스케줄러는:
    • upstream dependency (depends_on_past, trigger_rule 등) 체크
    • SLA, pool, concurrency, priority 등을 고려
  3. 실행 가능한 TaskInstance는 state = scheduled/queued 로 설정

여기서부터 Executor가 본격적으로 움직입니다.


3.3 Executor가 Task를 “실행 큐”에 넣는 단계

Executor의 핵심 인터페이스(개념적으로):

  • executor.queue_task_instance(task_instance)
  • executor.heartbeat()
    (내부에서 큐된 Task 상태 관리, 워커 상태 체크 등)

흐름:

  1. 스케줄러는 실행 가능한 TaskInstance를 찾고
  2. Executor의 queue 메서드를 호출해 “실행 요청” 을 전달
  3. Executor는 내부적으로:
    • 실행 대기 중인 Task 목록 관리
    • 시스템 전역 parallelism / pool 등 체크
    • 실행 가능하면 실제 실행 로직을 돌립니다.

Executor 종류에 따라 여기서 동작 차이가 납니다.


3.4 Executor별 실제 실행 방식

3.4.1 SequentialExecutor / LocalExecutor

  • SequentialExecutor:
    • 큐에 Task가 들어오면, 현재 실행 중인 게 없을 때
    • 해당 Task에 대해 바로 subprocess.Popen(["airflow", "tasks", "run", ...]) 같은 식으로 동기/직접 실행
    • 한 번에 하나만 돌기 때문에 병렬성 없음
  • LocalExecutor:
    • 내부에 worker process pool (예: multiprocessing.Pool) 같은 구조를 가지고 있음
    • Executor가 queue에 Task를 올리면:
      • 빈 슬롯이 있는 worker 프로세스에게 Task를 할당
      • 각 워커는 별도의 프로세스로 airflow tasks run ... 실행
    • 주기적으로 워커 상태를 체크해:
      • 성공/실패 여부를 가져와 DB에 반영

요약:

“한 서버 안에서 여러 프로세스를 띄워서 Task를 돌린다.”


3.4.2 CeleryExecutor

CeleryExecutor의 흐름은 다음과 같습니다.

  1. Queueing
    • Executor가 TaskInstance를 큐잉할 때:
      • Celery의 task 메시지로 변환
      • Broker(예: Redis)에 celery queue에 메시지를 push
  2. Worker에서 실행
    • Celery Worker 프로세스(별도 서버 가능)가 Broker를 폴링
    • 메시지를 가져오면:
      • 내부에서 airflow tasks run DAG_ID TASK_ID EXECUTION_DATE ... 명령을 실행
      • 실제 PythonOperator, BashOperator 등 비즈니스 로직 수행
  3. 상태 업데이트
    • Celery Worker는 Task 종료 후:
      • 성공/실패 결과를 반환
    • Executor는 주기적으로 result backend(또는 Celery 상태)를 확인하여
      • 해당 TaskInstance의 상태를 success/failed 등으로 Metadata DB에 반영

요약:

“Executor는 Celery 큐에 던지고, Worker들은 큐에서 뽑아서 실행, 결과를 다시 DB에 반영”


3.4.3 KubernetesExecutor

KubernetesExecutor는 다음과 같이 동작합니다.

  1. Queueing
    • Executor가 TaskInstance를 큐잉하면
    • 실행할 차례가 되었을 때, 해당 Task를 위한 K8s Pod spec을 생성
      • 이미지, env, resources(cpu/mem limit), volumeMount 등 설정
    • Kubernetes API를 호출해서 Pod 생성
  2. Pod 내부 실행
    • 생성된 Pod는 entrypoint로 airflow tasks run ... 을 실행
    • Pod는 Task 수행 (Operator 내부 로직 실행)
  3. 상태 수집
    • Executor는 K8s API를 통해 Pod 상태를 주기적으로 watch
      • Running, Succeeded, Failed 등
    • Pod가 Succeeded → TaskInstance 성공
    • Pod가 Failed → TaskInstance 실패
    • 이 정보를 Metadata DB에 반영
  4. 정리(Cleanup)
    • 정책에 따라 실행이 끝난 Pod를 즉시 삭제하거나, 일정 시간 유지 후 삭제

요약:

“Task 실행 단위가 K8s Pod이며, Executor는 Pod를 생성/관리하면서 Task 상태를 DB에 반영”


3.5 Task 종료 후 후속 처리

Task가 성공/실패로 끝나면:

  1. TaskInstance 상태가 DB에 업데이트 (success / failed / up_for_retry 등)
  2. Scheduler는 다음 heartbeat 때:
    • 이 Task를 의존하는 downstream Task들의 상태를 보고
    • 이제 실행 가능해진 Task들을 다시 Executor에게 큐잉
  3. 전체 DAG의 모든 필수 Task들이 종료되면:
    • DagRun 상태도 success / failed 등으로 마무리

이 전체 사이클 동안 Executor는:

  • 슬롯 관리 (parallelism)
  • 큐 관리
  • 실행 환경/리소스 할당(Local, Celery Worker, K8s Pod 등)
  • 상태 수집 및 반영
    역할을 계속 수행합니다.