Процесс-level архитектура Airflow 2.x
Apache Airflow 2.x состоит из небольшого числа специализированных процессов, каждый со своей чёткой ответственностью. Webserver рендерит UI и обслуживает REST API. Scheduler — главный мозг — парсит DAGs, создаёт DagRuns, кидает tasks в очередь. Workers выполняют tasks. Triggerer держит асинхронные deferrable triggers. Все они общаются через единую metadata database — PostgreSQL.
Этот урок — карта местности. Дальнейшие модули препарируют каждый компонент в детали; здесь мы видим общую картину.
Пять компонентов 2.x
Каждый компонент имеет конкретную ответственность:
| Компонент | Что делает | Когда запускается |
|---|---|---|
| Webserver | Flask UI + REST API v1, рендеринг graph view | Всегда (1 или N replicas) |
| Scheduler | Main loop: create DagRuns, schedule TI, enqueue to executor | Всегда (HA = 2-4 instances с 2.0) |
| Triggerer | Async loop для deferrable operators | С 2.2+ (опционально, рекомендуется) |
| Workers | Executor-specific: Celery prefork, K8s pods | По executor type |
| Database | PostgreSQL 12+ с pgbouncer | Всегда |
DagFileProcessor — внутри scheduler в 2.x
Одна из частых вопросов — где парсятся DAG-файлы? В Airflow 2.x:
- Default:
DagFileProcessorManagerзапускается внутри scheduler process черезmultiprocessing.Process. Дочерние процессы (parsing_processes, default 2) парсят.pyфайлы и сериализуют в БД (serialized_dagtable). - Standalone option: можно запустить отдельный процесс
airflow dag-processor(опциональная production-оптимизация — изоляция parsing от scheduler loop).
В 2.x это внутри scheduler by default. В 3.0 standalone DAG Processor стал mandatory (AIP-66) — это одно из ключевых архитектурных изменений 3.x. Мы вернёмся к этому в финальном модуле upgrade path.
# В коде SchedulerJobRunner — упрощённо
class SchedulerJob:
def __init__(self):
if self.use_standalone_dag_processor:
# External airflow dag-processor process
pass
else:
# In-process DagFileProcessorManager
self.dag_processor_manager = DagFileProcessorManager(
parsing_processes=conf.getint('scheduler', 'parsing_processes')
)
Webserver в 2.x — Flask + Flask-AppBuilder
airflow-webserver — это:
- Flask application с Gunicorn в качестве WSGI server
- Flask-AppBuilder (FAB) — обеспечивает RBAC, security, admin views
- Jinja templates — server-rendered UI (graph view, gantt, calendar, audit log)
- REST API v1 через
flask-restful— stable since 2.0
Конфиг важных параметров:
[webserver]
web_server_port = 8080
workers = 4 # Gunicorn worker processes
worker_class = sync # sync / gevent / eventlet
worker_refresh_batch_size = 1 # restart N workers
worker_refresh_interval = 6000 # каждые 100 минут (DagBag cache bloat mitigation)
Production gotcha: webserver кэширует DagBag в memory каждого worker-а. При большом числе DAGs (>5k) memory растёт. Митигация: worker_refresh_interval для periodic restart workers.
В Airflow 3.0 Flask webserver полностью заменён на FastAPI-based API Server с React UI. Это одно из крупнейших архитектурных изменений 3.x. В 2.x мы остаёмся с проверенной Flask-стороной.
Workers и DB access в 2.x
Это важный архитектурный момент. В 2.x workers имеют прямой доступ к metadata DB через SQLAlchemy. Когда вы пишете внутри task code:
@task
def my_task():
val = Variable.get('my_var') # ← SQL query напрямую к Postgres
Connection.get('my_conn') # ← то же
xcom_push(key='r', value=v) # ← INSERT в xcom table
ti.heartbeat() # ← UPDATE в task_instance
Это удобно (быстро, синхронно), но создаёт три проблемы:
- Security: компрометация worker (через malicious package, supply chain attack) даёт DB access — атакующий может читать все Connections (после decrypt с Fernet key), все Variables, всю таблицу logs.
- Schema coupling: любое изменение schema metadata DB требует обновления всех workers синхронно. Невозможно делать rolling DB upgrades.
- Network requirement: worker должен иметь direct network access к DB. Не работает за NAT, в edge сценариях.
В Airflow 3.x появилась Task SDK boundary (AIP-72) — workers общаются только через REST API к API Server. В 2.x мы живём с direct DB access как trade-off.
Production mitigation в 2.x:
- Использовать Vault / AWS Secrets Manager как Secrets Backend → connection passwords не лежат в DB
- Network segmentation: workers в отдельном subnet с ограниченным DB access
- Regular Fernet key rotation
- Audit logs for DB access
Минимальный production setup vs full HA
Минимальный (dev/staging):
1 Webserver (1 replica)
1 Scheduler (включая DagFileProcessor pool)
1 Triggerer
1-2 Workers
1 PostgreSQL
1 Redis (если Celery)
Full production HA в 2.x:
2-3 Webservers (Load balancer + N replicas)
2-4 Schedulers (HA через row-level locks с 2.0)
1-2 standalone DAG Processors (для big DAG count)
2-3 Triggerers (asyncio scale → не больше нужно)
N Workers (autoscaled — Celery + K8s)
PostgreSQL RDS Multi-AZ + PgBouncer
Redis cluster (если Celery)
В модуле 15 (Production Deployment) мы детально разберём sizing, PgBouncer setup, network policy.
Что общее в metadata DB
Все компоненты Airflow — stateless в смысле, что они не хранят локальное состояние (кроме кэшей). Всё persistent state живёт в metadata database.
Канал общения 1: Metadata DB (для всего в 2.x)
Scheduler, Webserver, Triggerer, и Workers общаются через БД. Они:
- Конкурируют за tasks через row-level locks
- Видят heartbeat друг друга через таблицу
job - Координируют critical section через
slot_poollocks - Workers читают/пишут XCom, Variables, Connections напрямую
Канал общения 2: Executor-specific message bus
- Celery: Redis/RabbitMQ broker. Scheduler → broker → workers. Через broker идут commands и heartbeats.
- Kubernetes: K8s API. Scheduler создаёт pods, watch endpoint для events.
- Local: multiprocessing.Queue внутри scheduler process.
Канал общения 3: Filesystem (DAGs)
DAG-файлы хранятся в локальной папке (по умолчанию /opt/airflow/dags/). Все компоненты, которым нужен DAG code (scheduler, workers, webserver), должны видеть эту папку. На Kubernetes это типично решается через gitSync sidecar — каждый pod имеет init container, который git pull-ит DAGs из репо.
После parse DAG попадает в БД как serialized_dag row. Дальше scheduler и UI рендерят из serialized_dag, не парся .py заново.
В Airflow 3.x вместо gitSync sidecar появились DAG Bundles (AIP-66) — pluggable backends (git/S3/GCS), которые DAG Processor читает централизованно. Меньше движущихся частей в каждом pod. Это улучшение появилось в 3.x; в 2.x мы используем gitSync (или volume mount).
Что дальше в модуле
В следующих уроках модуля 01:
- 02-scheduler-deep — глубоко разберём SchedulerJob: что делает каждый тик, как DagFileProcessor работает.
- 03-webserver-rbac — Webserver internals, FAB security model, gunicorn tuning.
- 04-celery-vs-k8s-overview — обзор двух главных executor-ов перед deep dive в модуле 05.
- 05-metadata-db-schema — структура metadata DB, ключевые таблицы, production SQL queries.
- 06-job-submission-lifecycle — пошаговый walkthrough: trigger → scheduler picks → executor → worker → callback complete.