나의 잡다한 노트 및 메모
Airflow Executor의 역할, 종류, 동작 흐름 본문
1. Executor의 역할
Airflow 아키텍처를 아주 크게 보면:
- Webserver: UI
- Scheduler: 언제 어떤 Task를 실행할지 결정
- Metadata DB: DAG/Task 상태 저장
- Executor: Task 실행을 위한 “실행 명령”을 담당
- Worker(있을 수도, 없을 수도): 실제로 Task를 수행하는 프로세스/컨테이너
여기서 Executor는 Scheduler와 Worker 사이의 브리지 역할을 합니다.
조금 더 구체적으로:
- Scheduler가 TaskInstance를 “scheduled/queued” 상태로 만든다.
- Executor는 이 정보를 보고:
- 어떤 Task를 지금 실행할지 결정 (슬롯, pool, concurrency 고려)
- 어떤 방식으로 실행할지 결정 (로컬 프로세스, Celery 큐, K8s Pod 등)
- 실행 명령을 발행하고, 그에 대한 상태를 주기적으로 수집해서 DB에 반영합니다.
즉 Executor는:
- 실행할 Task를 큐잉하고
- 적절한 작업자(Worker or 프로세스 or Pod) 에게 일을 던져주고
- 그 결과를 다시 끊임없이 폴링/콜백 해서 Task 상태를 업데이트합니다.
Airflow에 존재하는 주요 Executor 종류
버전에 따라 다양하지만, 실무에서 주로 만나게 되는 것들 기준으로 정리하면:
2.1 SequentialExecutor
- 특징:
- 동시에 하나의 Task만 실행 (병렬 X)
- DB 연결도 단일, SQLite 같은 간단한 백엔드와 주로 사용
- 용도:
- 로컬 개발/테스트 용
- Prod에서는 사실상 사용하지 않음
- 동작 방식:
- Scheduler 프로세스와 같은 환경에서 직접 Task를 순차적으로 실행
- “테스트 모드”용이라고 보면 편합니다.
2.2 LocalExecutor
- 특징:
- 로컬 머신(또는 한 서버) 안에서 병렬 실행 지원
- parallelism, dag_concurrency, max_active_runs_per_dag 등의 설정에 따라 동시에 여러 Task를 실행
- 동작 방식:
- Scheduler/Executor가 Task를 큐에 넣으면
- 같은 머신 안에서 멀티 프로세스/멀티 쓰레드 형태로 Task를 실행
- 별도의 Worker Pool(예: Celery Worker)이 필요 없음
- 장점:
- 설치가 단순 (DB + Scheduler + LocalExecutor 구성)
- 작은 규모의 서비스나 POC에서 사용하기 좋음
- 단점:
- 스케일 아웃이 어렵다 (노드 1개에 종속)
- 머신 리소스(CPU/RAM) 한계를 쉽게 만남
2.3 CeleryExecutor
- 특징:
- Task 실행을 Celery 기반 분산 Worker 들에 넘김
- Message Broker(Redis/RabbitMQ 등) 필요
- 구성 요소:
- Airflow Scheduler + CeleryExecutor
- Celery Broker (Redis/RabbitMQ)
- Celery Worker 여러 개 (여러 서버에 분산 가능)
- 동작 방식(요약):
- Executor가 “이 Task 실행해” → Celery 큐에 메시지 push
- Celery Worker가 큐에서 메시지를 가져와 Task 실행
- 실행 완료 후 결과/상태를 다시 DB에 반영 (Worker → DB or Scheduler)
- 장점:
- 수평 확장이 쉬움 (Worker 서버 수를 늘리면 됨)
- 비교적 전통적인 분산 구조라 이해/운영이 쉬움
- 단점:
- Celery/Broker/Result backend 등 구성 요소가 늘어남 → 운영 복잡도 증가
- 대규모 환경에서는 Broker 튜닝, Worker 관리 필요
2.4 KubernetesExecutor
- 특징:
- 각 Task를 Kubernetes Pod 단위로 실행
- “Task = 1 Pod”라고 보면 됨
- 구성:
- Airflow Scheduler + KubernetesExecutor
- K8s Cluster
- 동작 방식(요약):
- Executor가 Task를 큐잉
- 실행해야 할 시점이 되면 해당 Task 전용 Pod를 생성
- Pod 내부에서 airflow tasks run ... 를 실행
- 완료 후 Pod는 종료/삭제 (정책에 따라)
- 장점:
- 확장성이 매우 좋음 (K8s의 오토스케일링 활용)
- Task마다 독립적인 리소스 제한/이미지/환경 구성 가능
- 단점:
- K8s에 대한 운영/권한/네트워킹 지식 필요
- Pod 생성 오버헤드 존재 (짧은 Task가 매우 많으면 비효율)
3. 실제 DAG가 돌아갈 때 Executor 동작 흐름
이제 “DAG가 한 번 도는 순간”을 시간 순서대로, Executor 관점에서 자세히 볼게요.
(Executor 종류에 따라 세부 구현은 다르지만, 추상 흐름은 동일합니다.)
3.1 DAG 파싱 및 스케줄링 준비
- Scheduler가 DAG 파일들을 주기적으로 스캔/파싱
- .py DAG 파일 읽어서 DAG 객체 생성
- 스케줄러가 “지금 실행할 타이밍인지(cron, timetable)” 체크
- 실행해야 할 시점이면 DagRun 객체 생성 (state = running)
이 시점까지는 Executor가 직접 개입하진 않고, Scheduler의 역할입니다.
3.2 TaskInstance 생성 및 “Queued” 상태 진입
- DagRun이 생성되면, DAG 정의에 따라 각 Task에 대한 TaskInstance(TI)들이 생성
- 스케줄러는:
- upstream dependency (depends_on_past, trigger_rule 등) 체크
- SLA, pool, concurrency, priority 등을 고려
- 실행 가능한 TaskInstance는 state = scheduled/queued 로 설정
여기서부터 Executor가 본격적으로 움직입니다.
3.3 Executor가 Task를 “실행 큐”에 넣는 단계
Executor의 핵심 인터페이스(개념적으로):
- executor.queue_task_instance(task_instance)
- executor.heartbeat()
(내부에서 큐된 Task 상태 관리, 워커 상태 체크 등)
흐름:
- 스케줄러는 실행 가능한 TaskInstance를 찾고
- Executor의 queue 메서드를 호출해 “실행 요청” 을 전달
- Executor는 내부적으로:
- 실행 대기 중인 Task 목록 관리
- 시스템 전역 parallelism / pool 등 체크
- 실행 가능하면 실제 실행 로직을 돌립니다.
Executor 종류에 따라 여기서 동작 차이가 납니다.
3.4 Executor별 실제 실행 방식
3.4.1 SequentialExecutor / LocalExecutor
- SequentialExecutor:
- 큐에 Task가 들어오면, 현재 실행 중인 게 없을 때
- 해당 Task에 대해 바로 subprocess.Popen(["airflow", "tasks", "run", ...]) 같은 식으로 동기/직접 실행
- 한 번에 하나만 돌기 때문에 병렬성 없음
- LocalExecutor:
- 내부에 worker process pool (예: multiprocessing.Pool) 같은 구조를 가지고 있음
- Executor가 queue에 Task를 올리면:
- 빈 슬롯이 있는 worker 프로세스에게 Task를 할당
- 각 워커는 별도의 프로세스로 airflow tasks run ... 실행
- 주기적으로 워커 상태를 체크해:
- 성공/실패 여부를 가져와 DB에 반영
요약:
“한 서버 안에서 여러 프로세스를 띄워서 Task를 돌린다.”
3.4.2 CeleryExecutor
CeleryExecutor의 흐름은 다음과 같습니다.
- Queueing
- Executor가 TaskInstance를 큐잉할 때:
- Celery의 task 메시지로 변환
- Broker(예: Redis)에 celery queue에 메시지를 push
- Executor가 TaskInstance를 큐잉할 때:
- Worker에서 실행
- Celery Worker 프로세스(별도 서버 가능)가 Broker를 폴링
- 메시지를 가져오면:
- 내부에서 airflow tasks run DAG_ID TASK_ID EXECUTION_DATE ... 명령을 실행
- 실제 PythonOperator, BashOperator 등 비즈니스 로직 수행
- 상태 업데이트
- Celery Worker는 Task 종료 후:
- 성공/실패 결과를 반환
- Executor는 주기적으로 result backend(또는 Celery 상태)를 확인하여
- 해당 TaskInstance의 상태를 success/failed 등으로 Metadata DB에 반영
- Celery Worker는 Task 종료 후:
요약:
“Executor는 Celery 큐에 던지고, Worker들은 큐에서 뽑아서 실행, 결과를 다시 DB에 반영”
3.4.3 KubernetesExecutor
KubernetesExecutor는 다음과 같이 동작합니다.
- Queueing
- Executor가 TaskInstance를 큐잉하면
- 실행할 차례가 되었을 때, 해당 Task를 위한 K8s Pod spec을 생성
- 이미지, env, resources(cpu/mem limit), volumeMount 등 설정
- Kubernetes API를 호출해서 Pod 생성
- Pod 내부 실행
- 생성된 Pod는 entrypoint로 airflow tasks run ... 을 실행
- Pod는 Task 수행 (Operator 내부 로직 실행)
- 상태 수집
- Executor는 K8s API를 통해 Pod 상태를 주기적으로 watch
- Running, Succeeded, Failed 등
- Pod가 Succeeded → TaskInstance 성공
- Pod가 Failed → TaskInstance 실패
- 이 정보를 Metadata DB에 반영
- Executor는 K8s API를 통해 Pod 상태를 주기적으로 watch
- 정리(Cleanup)
- 정책에 따라 실행이 끝난 Pod를 즉시 삭제하거나, 일정 시간 유지 후 삭제
요약:
“Task 실행 단위가 K8s Pod이며, Executor는 Pod를 생성/관리하면서 Task 상태를 DB에 반영”
3.5 Task 종료 후 후속 처리
Task가 성공/실패로 끝나면:
- TaskInstance 상태가 DB에 업데이트 (success / failed / up_for_retry 등)
- Scheduler는 다음 heartbeat 때:
- 이 Task를 의존하는 downstream Task들의 상태를 보고
- 이제 실행 가능해진 Task들을 다시 Executor에게 큐잉
- 전체 DAG의 모든 필수 Task들이 종료되면:
- DagRun 상태도 success / failed 등으로 마무리
이 전체 사이클 동안 Executor는:
- 슬롯 관리 (parallelism)
- 큐 관리
- 실행 환경/리소스 할당(Local, Celery Worker, K8s Pod 등)
- 상태 수집 및 반영
역할을 계속 수행합니다.