Learning Platform
Глоссарий Troubleshooting
Урок 17.04 · 24 мин
Продвинутый
Integration Testsairflow tasks testairflow dags testpytest-airflowtestcontainers

Integration tests — airflow tasks test, airflow dags test, ephemeral environments

Unit tests проверяют логику в изоляции с mocked dependencies. Но они не ловят integration bugs: «работает в unit test, ломается на real Airflow». Integration tests — следующий уровень: запускают operator или весь DAG на real Airflow с real DB, real Connection, но в ephemeral environment, который создаётся и удаляется per-test.

В Airflow 2.x две built-in команды для этого: airflow tasks test (один task) и airflow dags test (полный DAGRun). Плюс библиотеки вроде pytest-airflow для pytest integration. Этот урок — как организовать integration testing layer в CI/CD.


Pytest fixtures — scope hierarchy и cleanup

Что отличает integration от unit test

Unit testIntegration test
Mocked Hooks, ConnectionsReal Connection из DB (SQLite OK)
Mocked contextReal Context от scheduler
< 100ms1-30 секунд
No Airflow initairflow db init обязательно
Один operator/functionОдин task или весь DAG
Logic correctnessReal execution path

Integration tests проверяют:

  • Что real Hook правильно interacts с real Connection
  • Что XCom правильно передаётся между tasks
  • Что DAG schedule logic работает (data_interval, etc)
  • Что retry/trigger_rules ведут себя как ожидается
Integration test tooling: tasks test vs dags test vs pytest-airflow
airflow tasks testCLI команда для одного task в isolation. Loads DAG из dag_folder, создаёт fake DagRun + TaskInstance in-memory, не пишет state в DB (если без --mark-success). Вызывает operator.execute(context) с real Context. Streams logs к stdout. НЕ использует executor — inline execution. НЕ trigger downstream.
airflow dags testCLI команда для весь DagRun. Создаёт реальный DagRun в DB. Запускает tasks в правильном порядке dependencies. XCom передаётся через DB. Trigger rules, retries, branching работают. SequentialExecutor inline — НЕ Celery/K8s. Самый powerful tool — full DagRun локально без scheduler.
pytest-airflow pluginCommunity plugin для tighter pytest integration. Fixture airflow_dag даёт inited Airflow + parsed DAG. Берёт boilerplate (init DB, set env vars, create test environment). Менее mature чем raw airflow dags test, но удобнее для test suite organization.
что они проверяют
Hook ↔ Connectiontasks test использует real Connection из metadata DB. Operator делает Hook(conn_id='X') → лезет в DB → получает credentials → connects к real (или mocked через moto/wiremock) service. Проверяет full path который unit test обходит mocking-ом.
XCom between tasksdags test пишет XCom values в DB и читает в downstream tasks. Тестирует serialization, custom XCom backend (S3, etc), правильность task_ids в xcom_pull. В unit test это всё mocked — integration ловит реальные bugs.
data_interval semanticsairflow dags test --data-interval-start/-end явно задаёт interval. Проверяет что DAG правильно использует data_interval_start/end (не datetime.now). Критично для custom Timetable, backfill-safety, partition-based queries.
DB backend choice
SQLite (fast)0s setup (in-memory или файл). SequentialExecutor only — SQLite не поддерживает LocalExecutor concurrent writes. Different DB engine от production — schema поведение может отличаться. Хорошо для local dev, fast CI smoke tests.
Postgres testcontainerstestcontainers-python запускает ephemeral Postgres в Docker per test session. 5-10s setup. Same DB engine как production. Real concurrency. LocalExecutor работает. Container умирает в конце pytest run. Production-like testing — recommended для critical tests.

airflow tasks test — один task в isolation

CLI команда для запуска одного task без scheduler:

airflow tasks test <dag_id> <task_id> <execution_date>

# Пример
airflow tasks test orders_etl extract_orders 2026-05-12

# Output: logs task execution к stdout, не сохраняет state в DB

Что делает:

  1. Loads DAG из dag_folder
  2. Creates fake DagRun + TaskInstance в memory (не пишет в DB)
  3. Resolves dependencies (xcom_pull работает только если upstream был раньше run)
  4. Calls operator.execute(context) с real context
  5. Streams logs в stdout

Что НЕ делает:

  • Не использует executor (Celery/K8s) — все запускается inline
  • Не trigger downstream tasks
  • Не пишет в task_instance table state (если не указан --mark-success)

Test wrapper в pytest:

# tests/integration/test_orders_etl.py
import subprocess

def test_extract_orders_via_cli():
    """Integration test через airflow tasks test."""
    result = subprocess.run(
        ["airflow", "tasks", "test", "orders_etl", "extract_orders", "2026-05-12"],
        capture_output=True, text=True, timeout=60,
    )
    assert result.returncode == 0, f"Task failed:\n{result.stderr}"
    assert "Extracted 100 orders" in result.stdout

airflow dags test — весь DagRun

Запускает весь DAG в одном процессе, без scheduler/executor:

airflow dags test <dag_id> <execution_date>

# Пример
airflow dags test orders_etl 2026-05-12

# Output: tasks выполняются sequentially, logs к stdout

В отличие от tasks test, dags test:

  • Создаёт реальный DagRun в DB
  • Запускает tasks в правильном порядке dependencies
  • XCom передаётся между tasks через DB
  • Trigger rules, retries, branching — работают
  • НЕ использует executor — sequential execution в одном процессе
def test_full_orders_etl_run():
    """End-to-end test полного DagRun."""
    result = subprocess.run(
        ["airflow", "dags", "test", "orders_etl", "2026-05-12"],
        capture_output=True, text=True, timeout=300,
    )
    assert result.returncode == 0
    # Verify DAG completed successfully — query DB
    from airflow.models import DagRun
    from airflow.utils.state import State
    dr = DagRun.find(dag_id="orders_etl", execution_date="2026-05-12")[0]
    assert dr.state == State.SUCCESS
NOTE

airflow dags test — самый powerful tool для integration testing. Один command воспроизводит full DagRun локально, без scheduler. Используется в Astronomer Astro для astro dev test.


SQLite vs Postgres для test DB

Для local CI tests есть выбор:

SQLitePostgres (testcontainers)
Setup time0s (in-memory или файл)5-10s (docker pull + start)
Resemble productionDifferent DBSame DB engine
Concurrency supportLimited (один writer)Real concurrency
Used forLocal dev, fast CIIntegration tests близкие к prod

SQLite для скорости:

# В CI
export AIRFLOW_HOME=/tmp/airflow
export AIRFLOW__CORE__EXECUTOR=SequentialExecutor  # SQLite не поддерживает LocalExecutor
export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=sqlite:///$AIRFLOW_HOME/airflow.db
airflow db init
airflow dags test orders_etl 2026-05-12

Postgres через testcontainers — для production-like testing:

# tests/integration/conftest.py
import pytest
from testcontainers.postgres import PostgresContainer

@pytest.fixture(scope="session")
def postgres_container():
    """Postgres container для integration tests."""
    with PostgresContainer("postgres:15") as postgres:
        yield postgres

@pytest.fixture(scope="session", autouse=True)
def airflow_with_postgres(postgres_container, monkeypatch_session):
    """Initialize Airflow с Postgres backend."""
    conn_url = postgres_container.get_connection_url()
    # postgresql+psycopg2://test:test@localhost:5432/test
    monkeypatch_session.setenv("AIRFLOW__DATABASE__SQL_ALCHEMY_CONN", conn_url.replace("psycopg2", ""))
    monkeypatch_session.setenv("AIRFLOW__CORE__EXECUTOR", "LocalExecutor")
    monkeypatch_session.setenv("AIRFLOW_HOME", "/tmp/airflow-test")

    subprocess.run(["airflow", "db", "init"], check=True)
    yield

testcontainers-python — библиотека для запуска ephemeral Postgres/Redis/MySQL в Docker для test session. Container умирает в конце pytest run.


Создание Connection в integration test

DAG в production использует PostgresHook(postgres_conn_id="orders_db"). В integration test нужно создать эту Connection:

# tests/integration/conftest.py
import json
import pytest
from airflow.models import Connection
from airflow.utils.db import provide_session

@pytest.fixture
def orders_db_connection(postgres_container):
    """Create Connection 'orders_db' pointing to postgres_container."""
    @provide_session
    def _create_conn(session=None):
        conn = Connection(
            conn_id="orders_db"
            conn_type="postgres"
            host=postgres_container.get_container_host_ip(),
            port=int(postgres_container.get_exposed_port(5432)),
            login=postgres_container.username,
            password=postgres_container.password,
            schema=postgres_container.dbname,
        )
        # Удалить existing если есть
        existing = session.query(Connection).filter_by(conn_id="orders_db").first()
        if existing:
            session.delete(existing)
        session.add(conn)
        session.commit()
        return conn

    return _create_conn()

# Использование в test
def test_extract_orders(orders_db_connection, airflow_with_postgres):
    """Operator может connect к real Postgres через connection."""
    # orders_db_connection fixture создал Connection в Airflow DB
    # PostgresHook(postgres_conn_id="orders_db") теперь работает
    result = subprocess.run(
        ["airflow", "tasks", "test", "orders_etl", "extract_orders", "2026-05-12"],
        capture_output=True, text=True,
    )
    assert result.returncode == 0

pytest-airflow plugin

pytest-airflow — community plugin для tighter integration:

pip install pytest-airflow
# tests/integration/test_with_plugin.py
import pytest

@pytest.mark.airflow(dag_folder="dags/")
def test_dag_runs(airflow_dag):
    """Fixture airflow_dag даёт inited Airflow + parsed DAG."""
    dag = airflow_dag("orders_etl")
    dagrun = dag.test(execution_date="2026-05-12")
    assert dagrun.state == "success"

@pytest.mark.airflow(dag_folder="dags/", executor="LocalExecutor")
def test_dag_with_local_executor(airflow_dag):
    dag = airflow_dag("orders_etl")
    # ...

Plugin берёт boilerplate (init DB, set env vars, create test environment). Минусы — менее mature чем raw airflow dags test.


Ephemeral test environment в CI

GitHub Actions workflow для full integration testing:

# .github/workflows/integration-tests.yml
name: Integration Tests

on:
  push:
    branches: [main]
  pull_request:

jobs:
  integration:
    runs-on: ubuntu-latest

    services:
      postgres:
        image: postgres:15
        env:
          POSTGRES_USER: airflow
          POSTGRES_PASSWORD: airflow
          POSTGRES_DB: airflow_test
        ports:
          - 5432:5432
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5

    env:
      AIRFLOW_HOME: /tmp/airflow
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql://airflow:airflow@localhost:5432/airflow_test
      AIRFLOW__CORE__LOAD_EXAMPLES: "False"
      AIRFLOW__CORE__UNIT_TEST_MODE: "False"

    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install Airflow + providers
        run: |
          pip install "apache-airflow==2.10.5" \
            --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.10.5/constraints-3.11.txt"
          pip install -r requirements.txt
          pip install pytest pytest-airflow testcontainers

      - name: Initialize Airflow DB
        run: airflow db init

      - name: Create test Connections
        run: |
          airflow connections add 'orders_db' \
            --conn-type postgres \
            --conn-host localhost \
            --conn-port 5432 \
            --conn-login airflow \
            --conn-password airflow \
            --conn-schema airflow_test
          airflow connections add 'aws_default' \
            --conn-type aws \
            --conn-extra '{"aws_access_key_id": "test", "aws_secret_access_key": "test"}'

      - name: Run unit tests
        run: pytest tests/unit/ -v

      - name: Run integration tests
        run: pytest tests/integration/ -v --timeout=300

      - name: Run full dags test
        run: |
          for dag in $(airflow dags list -o json | jq -r '.[].dag_id'); do
            airflow dags test "$dag" 2026-05-12 || exit 1
          done

Mocking external services в integration tests

Real external services (Snowflake, S3, third-party APIs) — слишком expensive для CI. Используйте:

# Moto для AWS mocking
from moto import mock_aws

@mock_aws
def test_s3_operator_with_moto():
    """S3 operations через moto (in-memory S3)."""
    import boto3
    s3 = boto3.client("s3", region_name="us-east-1")
    s3.create_bucket(Bucket="test-bucket")
    s3.put_object(Bucket="test-bucket", Key="data.csv", Body=b"test")

    # Run Airflow operator — он будет use mocked S3
    op = S3FilesCountOperator(task_id="x", bucket="test-bucket", prefix="")
    assert op.execute(MagicMock()) == 1

# Wiremock для HTTP API mocking
@pytest.fixture
def wiremock_server():
    from testcontainers.wiremock import WireMockContainer
    with WireMockContainer("wiremock/wiremock:3.0.1") as wm:
        wm.create_stub({
            "request": {"method": "GET", "url": "/api/orders"},
            "response": {"status": 200, "jsonBody": {"orders": []}},
        })
        yield wm

moto — drop-in mock для всего AWS SDK. wiremock — HTTP API mock через container. testcontainers-python объединяет всё.


Тестирование data_interval

airflow dags test --conf принимает execution_date, но data_interval автоматически вычисляется:

# Чтобы test data_interval semantics
def test_data_interval():
    result = subprocess.run(
        ["airflow", "dags", "test", "orders_etl", "2026-05-12",
         "--data-interval-start", "2026-05-12T00:00:00+00:00",
         "--data-interval-end", "2026-05-13T00:00:00+00:00"],
        capture_output=True, text=True,
    )
    # ...

Это критично для DAGs с custom Timetable (модуль 02) — проверить что они правильно обрабатывают data_interval.


Production gotchas

Integration tests slow — выделить отдельный CI job. Unit tests должны быть <5 min, integration — может быть до 30 min. Запускайте integration tests на push в main, не на каждый PR commit. Используйте pytest -m integration для разделения.

airflow dags test пишет в DB — нужен clean state. Между test runs delete DagRuns:

@pytest.fixture(autouse=True)
def clean_dagruns():
    yield
    from airflow.models import DagRun
    from airflow.utils.db import provide_session
    @provide_session
    def _clean(session=None):
        session.query(DagRun).delete()
        session.commit()
    _clean()

airflow dags test НЕ запускает executor. Если operator проверяет self.executor_config — он не сработает в dags test (executor — SequentialExecutor inline). Используйте separate test через docker-compose Airflow для full executor test.

Connection passwords в env vars leak в CI logs. Используйте GitHub Secrets или OIDC для real credentials. В integration test используйте fake credentials через moto/wiremock — не real.

Postgres service in GitHub Actions может быть тяжёлым. Если slow CI — используйте SQLite + SequentialExecutor для quick integration tests, Postgres только для marathon test suite (раз в день).

AIRFLOW__CORE__UNIT_TEST_MODE отключает heartbeat. Это OK для unit, но для integration оставьте UNIT_TEST_MODE=False — heartbeat behavior может быть критичен.


Проверка знанийKnowledge check
Команда хочет настроить полный CI testing pipeline для Airflow. Что должно быть в каком CI job — какой минимальный workflow, который ловит большинство production-багов до merge?
ОтветAnswer
Минимальный 3-stage workflow: **Stage 1 (PR-блокирующий, < 30s)** — DAG validity tests (test_no_import_errors через DagBag, structural tests, tags/owner checks). Запускается на каждый PR commit. Покрывает 70% багов 'DAG не появился в UI'. **Stage 2 (PR-блокирующий, < 5 min)** — Unit tests с mocked Hooks (pytest tests/unit/ -v). Проверяет business logic operators. Использует SQLite + UNIT_TEST_MODE=True для скорости. **Stage 3 (post-merge, < 30 min)** — Integration tests через airflow dags test полностью на каждом merged DAG. Запускается на push в main, не блокирует PR (slow). Использует Postgres container (testcontainers) + moto/wiremock для external services. Опционально: **Stage 4 (nightly)** — E2E tests на staging Airflow с real connections к sample data, 5-10 critical DAGs полным циклом. Tooling: GitHub Actions с matrix для Python 3.11/3.12, services: postgres, env vars через secrets, constraints file для pinned dependencies. Key configs: AIRFLOW__CORE__LOAD_EXAMPLES=False, AIRFLOW__CORE__EXECUTOR=LocalExecutor (Postgres) или SequentialExecutor (SQLite). pytest markers: -m unit / -m integration / -m e2e. После настройки, среднее время от bug commit до catch — < 1 минута (stage 1) vs дни без tests. ROI огромный — один день setup экономит недели incident response.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Чем `airflow tasks test` отличается от `airflow dags test`?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 7