Learning Platform
Глоссарий Troubleshooting
Урок 13.02 · 22 мин
Начальный
airflowdagoperatorschedulerexecutorxcom

В прошлом уроке мы выяснили, зачем нужна оркестрация и почему DAG — правильная абстракция. Теперь конкретно про Apache Airflow — самый распространённый оркестратор в мире DE на 2026 год.

Airflow родился в Airbnb в 2014 году, в 2019 стал top-level Apache. Сегодня используется в десятках тысяч компаний. Это, наверное, единственный инструмент, который встретишь почти в любой DE-вакансии.

Урок концептуальный. Мы НЕ погружаемся в производственные нюансы, Helm-чарты, Kubernetes-deployment, security, scaling, авторизацию. Для глубины — наш airflow-course на платформе.

Что такое Airflow на одной странице

Airflow — это:

  • DAG-движок. Ты описываешь DAG-ы в Python-файлах. Airflow читает их, строит граф, выполняет по расписанию.
  • Scheduler. Демон, который читает DAG-файлы, определяет, какие задачи должны запуститься сейчас, и отправляет их на исполнение.
  • Executor. Компонент, который физически запускает задачи (локально, на Celery-воркерах, в Kubernetes-подах).
  • Metadata DB. PostgreSQL/MySQL хранит состояние всех DAG-ов, runs, tasks, XCom, переменных, connection-ов.
  • Web UI. Веб-интерфейс для просмотра DAG-ов, runs, логов, ручного запуска и backfill.
Архитектура Airflow

DAG-файлы лежат в FS, scheduler их читает, отправляет задачи в очередь, executor запускает их физически. Web UI показывает состояние.

DAG files.py файлыPython-файлы, описывающие DAG-и. Лежат в общей файловой системе или в Git, синхронизируются на все узлы Airflow.
SchedulerпланировщикДемон, читающий DAG-и каждые N секунд. Определяет, какие задачи должны запуститься, ставит их в очередь executor-а.
Queuetask queueОчередь задач. У разных executors — разные реализации очереди: для Local это просто in-memory, для Celery — Redis/RabbitMQ.
ExecutorLocal / Celery / K8sКомпонент, физически запускающий задачи. LocalExecutor — на одной машине, Celery — на пуле воркеров, KubernetesExecutor — каждая задача в отдельном поде.
Workerstask runnersФизические воркеры, которые выполняют код задачи. Запускают Python-функции, Bash-команды, SQL-запросы, Spark-job-ы.
Metadata DBPostgres / MySQLСостояние DAG-ов, runs, tasks, XCom, variables, connections. Без этой БД Airflow не работает.
Web serverUIВеб-интерфейс. Показывает DAG-и, runs, логи, метрики. Позволяет ручной запуск и backfill.
DEuserДата-инженер использует UI для отладки, мониторинга, ручного запуска.

Запуск Airflow в проде — это запустить scheduler, web server, executor (с пулом воркеров) и подключить их к общей metadata DB. В тестовой среде всё работает в одном Docker-контейнере.

DAG как Python-объект

В Airflow DAG — это Python-объект, который ты создаёшь декларативно. Файл dags/daily_summary.py:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id='daily_summary',
    description='Ежедневный пайплайн сводной витрины',
    schedule_interval='0 4 * * *',
    start_date=datetime(2026, 5, 1),
    catchup=False,
    default_args={
        'owner': 'data-team',
        'retries': 2,
        'retry_delay': timedelta(minutes=5),
        'email_on_failure': True,
    },
) as dag:

    def load_orders_fn(**context):
        execution_date = context['execution_date']
        # тут код выгрузки заказов за дату execution_date
        pass

    load_orders = PythonOperator(
        task_id='load_orders',
        python_callable=load_orders_fn,
    )

    # ... другие задачи

Airflow scheduler парсит этот файл, создаёт объект dag, регистрирует его. Каждый день в 04:00 запускается новый DAG run — экземпляр выполнения DAG за конкретную дату.

Ключевые параметры:

  • dag_id — уникальный ID DAG-а.
  • schedule_interval — cron-выражение или специальные значения (@daily, @hourly).
  • start_date — с какой даты DAG начинает работать.
  • catchup — должен ли Airflow пройти все пропущенные даты от start_date до сегодня.
  • default_args — параметры по умолчанию для всех задач: retries, retry_delay, email_on_failure.

Task и Operator

Task — это единица работы в DAG. Реализуется через Operator — класс, который умеет выполнять определённый тип работы.

Самые частые Operators:

PythonOperator — запускает Python-функцию:

def process_data(**context):
    # любой Python-код
    df = pd.read_csv("source.csv")
    df.to_parquet("dest.parquet")

task = PythonOperator(
    task_id='process_data',
    python_callable=process_data,
)

BashOperator — запускает bash-команду:

task = BashOperator(
    task_id='run_spark_job',
    bash_command='spark-submit /opt/jobs/transform.py',
)

SQLOperator / PostgresOperator / SnowflakeOperator — запускает SQL-запрос в базе:

task = SnowflakeOperator(
    task_id='create_summary',
    snowflake_conn_id='my_snowflake',
    sql="""
        CREATE OR REPLACE TABLE summary AS
        SELECT customer_id, COUNT(*) FROM orders GROUP BY 1
    """,
)

SensorOperator — ждёт определённого условия (файл появился в S3, партиция готова, статус API):

task = S3KeySensor(
    task_id='wait_for_file',
    bucket_key='s3://my-bucket/data/dt={ds}/_SUCCESS',
    timeout=3600,
    poke_interval=60,
)

KubernetesPodOperator — запускает Kubernetes-под с произвольным контейнером:

task = KubernetesPodOperator(
    task_id='run_in_pod',
    image='my-org/etl-image:1.0',
    cmds=['python', '/app/run.py'],
)

В Airflow есть сотни operators в разных провайдерах (Snowflake, BigQuery, dbt, AWS, GCP, Slack). Хорошее правило: сначала ищи готовый operator, и только если его нет — пиши через PythonOperator.

Зависимости между задачами

Зависимости задаются операторами >> (после) и << (до):

load_orders >> build_summary
load_customers >> build_summary
load_products >> build_summary

# Эквивалентно:
[load_orders, load_customers, load_products] >> build_summary

# Эквивалентно через метод:
build_summary.set_upstream([load_orders, load_customers, load_products])

После выполнения этих строк Airflow знает граф: build_summary стартует после успеха всех трёх loaders.

NOTE

Airflow строит граф во время парсинга DAG-файла. Это значит, что зависимости задаются статически (Python-код выполняется один раз при загрузке DAG-а), а не динамически в момент выполнения. Это и сила, и ограничение.

Scheduler и Executor

Scheduler — это бесконечно работающий процесс. Каждые 5-30 секунд он:

  1. Читает DAG-файлы из dags/ folder.
  2. Для каждого DAG смотрит расписание и определяет, должен ли запуститься новый DAG run.
  3. Для активных DAG runs смотрит состояние задач: какие готовы к запуску (upstream завершены, retries не исчерпаны)?
  4. Готовые задачи отправляются в очередь executor-а.

Executor — это компонент, который физически запускает задачи. Варианты:

LocalExecutor. Запускает задачи в подпроцессах на той же машине, что и scheduler. Подходит для маленьких deploy.

CeleryExecutor. Распределённый executor поверх Celery. Воркеры подключаются к Redis/RabbitMQ-очереди и выполняют задачи параллельно. Подходит для production.

KubernetesExecutor. Каждая задача запускается в отдельном поде Kubernetes. Динамически масштабируется. Самый популярный production-вариант 2026 года.

CeleryKubernetesExecutor. Гибрид — лёгкие задачи через Celery, тяжёлые в K8s-подах.

Выбор executor — это вопрос масштаба и операционных предпочтений. В тестах подходит Local, в production — Celery или Kubernetes.

XCom: обмен данными между задачами

XCom (cross-communication) — механизм передачи небольших данных между задачами одного DAG. Каждая задача может push значение и downstream-задачи могут pull это значение.

def push_value(**context):
    return {'order_count': 12345, 'last_id': 999}

def use_value(**context):
    ti = context['ti']
    value = ti.xcom_pull(task_ids='push_value')
    print(value['order_count'])

push_task = PythonOperator(task_id='push_value', python_callable=push_value)
use_task = PythonOperator(task_id='use_value', python_callable=use_value)

push_task >> use_task

Важно: XCom не для больших данных. Значения хранятся в metadata DB, лимит обычно 48 КБ. Для больших данных передавай через storage (S3, DWH-таблицу), а через XCom — только ссылку.

Variables и Connections

Variables — глобальные настройки, доступные всем DAG-ам:

from airflow.models import Variable
api_key = Variable.get("salesforce_api_key")

Connections — конфигурации для подключения к внешним системам (DWH, API, S3). Хранятся в metadata DB с зашифрованными секретами:

# В UI настроена connection "snowflake_prod"
SnowflakeOperator(snowflake_conn_id='snowflake_prod', ...)

Это типичное место для хранения паролей и токенов — секреты не должны быть в коде DAG.

SLA и алерты

Каждой задаче или DAG-у можно задать SLA — допустимое время выполнения:

PythonOperator(
    task_id='heavy_aggregation',
    python_callable=compute_aggregation,
    sla=timedelta(hours=2),
)

Если задача не уложилась в 2 часа — Airflow зашлёт SLA-miss alert. Это критично для бизнес-обещаний типа «отчёт CFO готов к 09:00».

Что осталось за кадром

Глубокий Airflow — это:

  • Production deployment на Kubernetes с Helm.
  • Custom operators и hooks для специфичных систем.
  • Dynamic DAG generation через factory-функции.
  • TaskFlow API (Airflow 2.0+) — декораторное API для DAG-ов.
  • Datasets — data-aware scheduling по событиям обновления datasets.
  • Branching и trigger rules — условная логика в DAG.
  • Pools и priority weights — управление параллелизмом.
  • Backfill и catchup nuances.
  • Monitoring через Prometheus.

В нашем airflow-course на платформе мы разбираем это с практическими примерами и hands-on lab-ом. Если работаешь с Airflow в проде — обязательно туда.

Airflow TaskFlow API: современный декораторный стиль DAG Airflow KubernetesExecutor: каждая задача в отдельном поде
TIP

На собеседовании DE Airflow спрашивают чаще всего из всех инструментов оркестрации. Нужно понимать как минимум: что такое DAG, Operator, Scheduler, Executor, XCom, retries, sensors, backfill. Это базовый словарь, без которого сложно работать на проекте.

Попробуй сам

Открой документацию Airflow и пройди quickstart: подними локальный Airflow через docker-compose, создай минимальный DAG из двух задач, запусти его и посмотри в UI. Это занимает 30 минут, но даёт фундаментальное понимание того, как работает оркестратор. Без этого опыта рассуждать про Airflow абстрактно сложно.

Проверка знанийKnowledge check
Какие компоненты составляют production-deploy Airflow и за что отвечает каждый?
ОтветAnswer
Production Airflow состоит из пяти ключевых компонентов. Первое — DAG-файлы (Python) в общей файловой системе или Git, описывающие пайплайны. Второе — Scheduler: демон, который каждые 5-30 секунд читает DAG-и, определяет, какие задачи должны запуститься, и отправляет их в очередь. Третье — Executor (LocalExecutor для теста, CeleryExecutor или KubernetesExecutor для production): компонент, физически запускающий задачи на воркерах. Четвёртое — Metadata DB (Postgres/MySQL): хранит состояние всех DAG-ов, runs, tasks, XCom, variables, connections. Пятое — Web server (UI): веб-интерфейс для просмотра состояния и ручного управления. Все компоненты общаются через metadata DB.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. За что отвечает Scheduler в Airflow?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4