Integration tests — airflow tasks test, airflow dags test, ephemeral environments
Unit tests проверяют логику в изоляции с mocked dependencies. Но они не ловят integration bugs: «работает в unit test, ломается на real Airflow». Integration tests — следующий уровень: запускают operator или весь DAG на real Airflow с real DB, real Connection, но в ephemeral environment, который создаётся и удаляется per-test.
В Airflow 2.x две built-in команды для этого: airflow tasks test (один task) и airflow dags test (полный DAGRun). Плюс библиотеки вроде pytest-airflow для pytest integration. Этот урок — как организовать integration testing layer в CI/CD.
Pytest fixtures — scope hierarchy и cleanup
Что отличает integration от unit test
| Unit test | Integration test |
|---|---|
| Mocked Hooks, Connections | Real Connection из DB (SQLite OK) |
| Mocked context | Real Context от scheduler |
< 100ms | 1-30 секунд |
| No Airflow init | airflow db init обязательно |
| Один operator/function | Один task или весь DAG |
| Logic correctness | Real execution path |
Integration tests проверяют:
- Что real Hook правильно interacts с real Connection
- Что XCom правильно передаётся между tasks
- Что DAG schedule logic работает (data_interval, etc)
- Что retry/trigger_rules ведут себя как ожидается
airflow tasks test — один task в isolation
CLI команда для запуска одного task без scheduler:
airflow tasks test <dag_id> <task_id> <execution_date>
# Пример
airflow tasks test orders_etl extract_orders 2026-05-12
# Output: logs task execution к stdout, не сохраняет state в DB
Что делает:
- Loads DAG из dag_folder
- Creates fake DagRun + TaskInstance в memory (не пишет в DB)
- Resolves dependencies (xcom_pull работает только если upstream был раньше run)
- Calls
operator.execute(context)с real context - Streams logs в stdout
Что НЕ делает:
- Не использует executor (Celery/K8s) — все запускается inline
- Не trigger downstream tasks
- Не пишет в
task_instancetable state (если не указан--mark-success)
Test wrapper в pytest:
# tests/integration/test_orders_etl.py
import subprocess
def test_extract_orders_via_cli():
"""Integration test через airflow tasks test."""
result = subprocess.run(
["airflow", "tasks", "test", "orders_etl", "extract_orders", "2026-05-12"],
capture_output=True, text=True, timeout=60,
)
assert result.returncode == 0, f"Task failed:\n{result.stderr}"
assert "Extracted 100 orders" in result.stdout
airflow dags test — весь DagRun
Запускает весь DAG в одном процессе, без scheduler/executor:
airflow dags test <dag_id> <execution_date>
# Пример
airflow dags test orders_etl 2026-05-12
# Output: tasks выполняются sequentially, logs к stdout
В отличие от tasks test, dags test:
- Создаёт реальный DagRun в DB
- Запускает tasks в правильном порядке dependencies
- XCom передаётся между tasks через DB
- Trigger rules, retries, branching — работают
- НЕ использует executor — sequential execution в одном процессе
def test_full_orders_etl_run():
"""End-to-end test полного DagRun."""
result = subprocess.run(
["airflow", "dags", "test", "orders_etl", "2026-05-12"],
capture_output=True, text=True, timeout=300,
)
assert result.returncode == 0
# Verify DAG completed successfully — query DB
from airflow.models import DagRun
from airflow.utils.state import State
dr = DagRun.find(dag_id="orders_etl", execution_date="2026-05-12")[0]
assert dr.state == State.SUCCESS
airflow dags test — самый powerful tool для integration testing. Один command воспроизводит full DagRun локально, без scheduler. Используется в Astronomer Astro для astro dev test.
SQLite vs Postgres для test DB
Для local CI tests есть выбор:
| SQLite | Postgres (testcontainers) | |
|---|---|---|
| Setup time | 0s (in-memory или файл) | 5-10s (docker pull + start) |
| Resemble production | Different DB | Same DB engine |
| Concurrency support | Limited (один writer) | Real concurrency |
| Used for | Local dev, fast CI | Integration tests близкие к prod |
SQLite для скорости:
# В CI
export AIRFLOW_HOME=/tmp/airflow
export AIRFLOW__CORE__EXECUTOR=SequentialExecutor # SQLite не поддерживает LocalExecutor
export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=sqlite:///$AIRFLOW_HOME/airflow.db
airflow db init
airflow dags test orders_etl 2026-05-12
Postgres через testcontainers — для production-like testing:
# tests/integration/conftest.py
import pytest
from testcontainers.postgres import PostgresContainer
@pytest.fixture(scope="session")
def postgres_container():
"""Postgres container для integration tests."""
with PostgresContainer("postgres:15") as postgres:
yield postgres
@pytest.fixture(scope="session", autouse=True)
def airflow_with_postgres(postgres_container, monkeypatch_session):
"""Initialize Airflow с Postgres backend."""
conn_url = postgres_container.get_connection_url()
# postgresql+psycopg2://test:test@localhost:5432/test
monkeypatch_session.setenv("AIRFLOW__DATABASE__SQL_ALCHEMY_CONN", conn_url.replace("psycopg2", ""))
monkeypatch_session.setenv("AIRFLOW__CORE__EXECUTOR", "LocalExecutor")
monkeypatch_session.setenv("AIRFLOW_HOME", "/tmp/airflow-test")
subprocess.run(["airflow", "db", "init"], check=True)
yield
testcontainers-python — библиотека для запуска ephemeral Postgres/Redis/MySQL в Docker для test session. Container умирает в конце pytest run.
Создание Connection в integration test
DAG в production использует PostgresHook(postgres_conn_id="orders_db"). В integration test нужно создать эту Connection:
# tests/integration/conftest.py
import json
import pytest
from airflow.models import Connection
from airflow.utils.db import provide_session
@pytest.fixture
def orders_db_connection(postgres_container):
"""Create Connection 'orders_db' pointing to postgres_container."""
@provide_session
def _create_conn(session=None):
conn = Connection(
conn_id="orders_db"
conn_type="postgres"
host=postgres_container.get_container_host_ip(),
port=int(postgres_container.get_exposed_port(5432)),
login=postgres_container.username,
password=postgres_container.password,
schema=postgres_container.dbname,
)
# Удалить existing если есть
existing = session.query(Connection).filter_by(conn_id="orders_db").first()
if existing:
session.delete(existing)
session.add(conn)
session.commit()
return conn
return _create_conn()
# Использование в test
def test_extract_orders(orders_db_connection, airflow_with_postgres):
"""Operator может connect к real Postgres через connection."""
# orders_db_connection fixture создал Connection в Airflow DB
# PostgresHook(postgres_conn_id="orders_db") теперь работает
result = subprocess.run(
["airflow", "tasks", "test", "orders_etl", "extract_orders", "2026-05-12"],
capture_output=True, text=True,
)
assert result.returncode == 0
pytest-airflow plugin
pytest-airflow — community plugin для tighter integration:
pip install pytest-airflow
# tests/integration/test_with_plugin.py
import pytest
@pytest.mark.airflow(dag_folder="dags/")
def test_dag_runs(airflow_dag):
"""Fixture airflow_dag даёт inited Airflow + parsed DAG."""
dag = airflow_dag("orders_etl")
dagrun = dag.test(execution_date="2026-05-12")
assert dagrun.state == "success"
@pytest.mark.airflow(dag_folder="dags/", executor="LocalExecutor")
def test_dag_with_local_executor(airflow_dag):
dag = airflow_dag("orders_etl")
# ...
Plugin берёт boilerplate (init DB, set env vars, create test environment). Минусы — менее mature чем raw airflow dags test.
Ephemeral test environment в CI
GitHub Actions workflow для full integration testing:
# .github/workflows/integration-tests.yml
name: Integration Tests
on:
push:
branches: [main]
pull_request:
jobs:
integration:
runs-on: ubuntu-latest
services:
postgres:
image: postgres:15
env:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow_test
ports:
- 5432:5432
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
env:
AIRFLOW_HOME: /tmp/airflow
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql://airflow:airflow@localhost:5432/airflow_test
AIRFLOW__CORE__LOAD_EXAMPLES: "False"
AIRFLOW__CORE__UNIT_TEST_MODE: "False"
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install Airflow + providers
run: |
pip install "apache-airflow==2.10.5" \
--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.10.5/constraints-3.11.txt"
pip install -r requirements.txt
pip install pytest pytest-airflow testcontainers
- name: Initialize Airflow DB
run: airflow db init
- name: Create test Connections
run: |
airflow connections add 'orders_db' \
--conn-type postgres \
--conn-host localhost \
--conn-port 5432 \
--conn-login airflow \
--conn-password airflow \
--conn-schema airflow_test
airflow connections add 'aws_default' \
--conn-type aws \
--conn-extra '{"aws_access_key_id": "test", "aws_secret_access_key": "test"}'
- name: Run unit tests
run: pytest tests/unit/ -v
- name: Run integration tests
run: pytest tests/integration/ -v --timeout=300
- name: Run full dags test
run: |
for dag in $(airflow dags list -o json | jq -r '.[].dag_id'); do
airflow dags test "$dag" 2026-05-12 || exit 1
done
Mocking external services в integration tests
Real external services (Snowflake, S3, third-party APIs) — слишком expensive для CI. Используйте:
# Moto для AWS mocking
from moto import mock_aws
@mock_aws
def test_s3_operator_with_moto():
"""S3 operations через moto (in-memory S3)."""
import boto3
s3 = boto3.client("s3", region_name="us-east-1")
s3.create_bucket(Bucket="test-bucket")
s3.put_object(Bucket="test-bucket", Key="data.csv", Body=b"test")
# Run Airflow operator — он будет use mocked S3
op = S3FilesCountOperator(task_id="x", bucket="test-bucket", prefix="")
assert op.execute(MagicMock()) == 1
# Wiremock для HTTP API mocking
@pytest.fixture
def wiremock_server():
from testcontainers.wiremock import WireMockContainer
with WireMockContainer("wiremock/wiremock:3.0.1") as wm:
wm.create_stub({
"request": {"method": "GET", "url": "/api/orders"},
"response": {"status": 200, "jsonBody": {"orders": []}},
})
yield wm
moto — drop-in mock для всего AWS SDK. wiremock — HTTP API mock через container. testcontainers-python объединяет всё.
Тестирование data_interval
airflow dags test --conf принимает execution_date, но data_interval автоматически вычисляется:
# Чтобы test data_interval semantics
def test_data_interval():
result = subprocess.run(
["airflow", "dags", "test", "orders_etl", "2026-05-12",
"--data-interval-start", "2026-05-12T00:00:00+00:00",
"--data-interval-end", "2026-05-13T00:00:00+00:00"],
capture_output=True, text=True,
)
# ...
Это критично для DAGs с custom Timetable (модуль 02) — проверить что они правильно обрабатывают data_interval.
Production gotchas
Integration tests slow — выделить отдельный CI job. Unit tests должны быть <5 min, integration — может быть до 30 min. Запускайте integration tests на push в main, не на каждый PR commit. Используйте pytest -m integration для разделения.
airflow dags test пишет в DB — нужен clean state. Между test runs delete DagRuns:
@pytest.fixture(autouse=True)
def clean_dagruns():
yield
from airflow.models import DagRun
from airflow.utils.db import provide_session
@provide_session
def _clean(session=None):
session.query(DagRun).delete()
session.commit()
_clean()
airflow dags test НЕ запускает executor. Если operator проверяет self.executor_config — он не сработает в dags test (executor — SequentialExecutor inline). Используйте separate test через docker-compose Airflow для full executor test.
Connection passwords в env vars leak в CI logs. Используйте GitHub Secrets или OIDC для real credentials. В integration test используйте fake credentials через moto/wiremock — не real.
Postgres service in GitHub Actions может быть тяжёлым. Если slow CI — используйте SQLite + SequentialExecutor для quick integration tests, Postgres только для marathon test suite (раз в день).
AIRFLOW__CORE__UNIT_TEST_MODE отключает heartbeat. Это OK для unit, но для integration оставьте UNIT_TEST_MODE=False — heartbeat behavior может быть критичен.