В прошлом уроке мы выяснили, зачем нужна оркестрация и почему DAG — правильная абстракция. Теперь конкретно про Apache Airflow — самый распространённый оркестратор в мире DE на 2026 год.
Airflow родился в Airbnb в 2014 году, в 2019 стал top-level Apache. Сегодня используется в десятках тысяч компаний. Это, наверное, единственный инструмент, который встретишь почти в любой DE-вакансии.
Урок концептуальный. Мы НЕ погружаемся в производственные нюансы, Helm-чарты, Kubernetes-deployment, security, scaling, авторизацию. Для глубины — наш airflow-course на платформе.
Что такое Airflow на одной странице
Airflow — это:
- DAG-движок. Ты описываешь DAG-ы в Python-файлах. Airflow читает их, строит граф, выполняет по расписанию.
- Scheduler. Демон, который читает DAG-файлы, определяет, какие задачи должны запуститься сейчас, и отправляет их на исполнение.
- Executor. Компонент, который физически запускает задачи (локально, на Celery-воркерах, в Kubernetes-подах).
- Metadata DB. PostgreSQL/MySQL хранит состояние всех DAG-ов, runs, tasks, XCom, переменных, connection-ов.
- Web UI. Веб-интерфейс для просмотра DAG-ов, runs, логов, ручного запуска и backfill.
DAG-файлы лежат в FS, scheduler их читает, отправляет задачи в очередь, executor запускает их физически. Web UI показывает состояние.
Запуск Airflow в проде — это запустить scheduler, web server, executor (с пулом воркеров) и подключить их к общей metadata DB. В тестовой среде всё работает в одном Docker-контейнере.
DAG как Python-объект
В Airflow DAG — это Python-объект, который ты создаёшь декларативно. Файл dags/daily_summary.py:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
with DAG(
dag_id='daily_summary',
description='Ежедневный пайплайн сводной витрины',
schedule_interval='0 4 * * *',
start_date=datetime(2026, 5, 1),
catchup=False,
default_args={
'owner': 'data-team',
'retries': 2,
'retry_delay': timedelta(minutes=5),
'email_on_failure': True,
},
) as dag:
def load_orders_fn(**context):
execution_date = context['execution_date']
# тут код выгрузки заказов за дату execution_date
pass
load_orders = PythonOperator(
task_id='load_orders',
python_callable=load_orders_fn,
)
# ... другие задачи
Airflow scheduler парсит этот файл, создаёт объект dag, регистрирует его. Каждый день в 04:00 запускается новый DAG run — экземпляр выполнения DAG за конкретную дату.
Ключевые параметры:
- dag_id — уникальный ID DAG-а.
- schedule_interval — cron-выражение или специальные значения (
@daily,@hourly). - start_date — с какой даты DAG начинает работать.
- catchup — должен ли Airflow пройти все пропущенные даты от start_date до сегодня.
- default_args — параметры по умолчанию для всех задач: retries, retry_delay, email_on_failure.
Task и Operator
Task — это единица работы в DAG. Реализуется через Operator — класс, который умеет выполнять определённый тип работы.
Самые частые Operators:
PythonOperator — запускает Python-функцию:
def process_data(**context):
# любой Python-код
df = pd.read_csv("source.csv")
df.to_parquet("dest.parquet")
task = PythonOperator(
task_id='process_data',
python_callable=process_data,
)
BashOperator — запускает bash-команду:
task = BashOperator(
task_id='run_spark_job',
bash_command='spark-submit /opt/jobs/transform.py',
)
SQLOperator / PostgresOperator / SnowflakeOperator — запускает SQL-запрос в базе:
task = SnowflakeOperator(
task_id='create_summary',
snowflake_conn_id='my_snowflake',
sql="""
CREATE OR REPLACE TABLE summary AS
SELECT customer_id, COUNT(*) FROM orders GROUP BY 1
""",
)
SensorOperator — ждёт определённого условия (файл появился в S3, партиция готова, статус API):
task = S3KeySensor(
task_id='wait_for_file',
bucket_key='s3://my-bucket/data/dt={ds}/_SUCCESS',
timeout=3600,
poke_interval=60,
)
KubernetesPodOperator — запускает Kubernetes-под с произвольным контейнером:
task = KubernetesPodOperator(
task_id='run_in_pod',
image='my-org/etl-image:1.0',
cmds=['python', '/app/run.py'],
)
В Airflow есть сотни operators в разных провайдерах (Snowflake, BigQuery, dbt, AWS, GCP, Slack). Хорошее правило: сначала ищи готовый operator, и только если его нет — пиши через PythonOperator.
Зависимости между задачами
Зависимости задаются операторами >> (после) и << (до):
load_orders >> build_summary
load_customers >> build_summary
load_products >> build_summary
# Эквивалентно:
[load_orders, load_customers, load_products] >> build_summary
# Эквивалентно через метод:
build_summary.set_upstream([load_orders, load_customers, load_products])
После выполнения этих строк Airflow знает граф: build_summary стартует после успеха всех трёх loaders.
Airflow строит граф во время парсинга DAG-файла. Это значит, что зависимости задаются статически (Python-код выполняется один раз при загрузке DAG-а), а не динамически в момент выполнения. Это и сила, и ограничение.
Scheduler и Executor
Scheduler — это бесконечно работающий процесс. Каждые 5-30 секунд он:
- Читает DAG-файлы из dags/ folder.
- Для каждого DAG смотрит расписание и определяет, должен ли запуститься новый DAG run.
- Для активных DAG runs смотрит состояние задач: какие готовы к запуску (upstream завершены, retries не исчерпаны)?
- Готовые задачи отправляются в очередь executor-а.
Executor — это компонент, который физически запускает задачи. Варианты:
LocalExecutor. Запускает задачи в подпроцессах на той же машине, что и scheduler. Подходит для маленьких deploy.
CeleryExecutor. Распределённый executor поверх Celery. Воркеры подключаются к Redis/RabbitMQ-очереди и выполняют задачи параллельно. Подходит для production.
KubernetesExecutor. Каждая задача запускается в отдельном поде Kubernetes. Динамически масштабируется. Самый популярный production-вариант 2026 года.
CeleryKubernetesExecutor. Гибрид — лёгкие задачи через Celery, тяжёлые в K8s-подах.
Выбор executor — это вопрос масштаба и операционных предпочтений. В тестах подходит Local, в production — Celery или Kubernetes.
XCom: обмен данными между задачами
XCom (cross-communication) — механизм передачи небольших данных между задачами одного DAG. Каждая задача может push значение и downstream-задачи могут pull это значение.
def push_value(**context):
return {'order_count': 12345, 'last_id': 999}
def use_value(**context):
ti = context['ti']
value = ti.xcom_pull(task_ids='push_value')
print(value['order_count'])
push_task = PythonOperator(task_id='push_value', python_callable=push_value)
use_task = PythonOperator(task_id='use_value', python_callable=use_value)
push_task >> use_task
Важно: XCom не для больших данных. Значения хранятся в metadata DB, лимит обычно 48 КБ. Для больших данных передавай через storage (S3, DWH-таблицу), а через XCom — только ссылку.
Variables и Connections
Variables — глобальные настройки, доступные всем DAG-ам:
from airflow.models import Variable
api_key = Variable.get("salesforce_api_key")
Connections — конфигурации для подключения к внешним системам (DWH, API, S3). Хранятся в metadata DB с зашифрованными секретами:
# В UI настроена connection "snowflake_prod"
SnowflakeOperator(snowflake_conn_id='snowflake_prod', ...)
Это типичное место для хранения паролей и токенов — секреты не должны быть в коде DAG.
SLA и алерты
Каждой задаче или DAG-у можно задать SLA — допустимое время выполнения:
PythonOperator(
task_id='heavy_aggregation',
python_callable=compute_aggregation,
sla=timedelta(hours=2),
)
Если задача не уложилась в 2 часа — Airflow зашлёт SLA-miss alert. Это критично для бизнес-обещаний типа «отчёт CFO готов к 09:00».
Что осталось за кадром
Глубокий Airflow — это:
- Production deployment на Kubernetes с Helm.
- Custom operators и hooks для специфичных систем.
- Dynamic DAG generation через factory-функции.
- TaskFlow API (Airflow 2.0+) — декораторное API для DAG-ов.
- Datasets — data-aware scheduling по событиям обновления datasets.
- Branching и trigger rules — условная логика в DAG.
- Pools и priority weights — управление параллелизмом.
- Backfill и catchup nuances.
- Monitoring через Prometheus.
В нашем airflow-course на платформе мы разбираем это с практическими примерами и hands-on lab-ом. Если работаешь с Airflow в проде — обязательно туда.
Airflow TaskFlow API: современный декораторный стиль DAG Airflow KubernetesExecutor: каждая задача в отдельном подеНа собеседовании DE Airflow спрашивают чаще всего из всех инструментов оркестрации. Нужно понимать как минимум: что такое DAG, Operator, Scheduler, Executor, XCom, retries, sensors, backfill. Это базовый словарь, без которого сложно работать на проекте.
Попробуй сам
Открой документацию Airflow и пройди quickstart: подними локальный Airflow через docker-compose, создай минимальный DAG из двух задач, запусти его и посмотри в UI. Это занимает 30 минут, но даёт фундаментальное понимание того, как работает оркестратор. Без этого опыта рассуждать про Airflow абстрактно сложно.