Unit tests для operators — mocking, pytest-mock, fixtures
DAG validity tests ловят синтаксические и структурные проблемы, но они не проверяют что operator делает. Unit tests на operator-level — это следующий tier тестов. Они изолированно проверяют логику одного operator: правильно ли он формирует SQL, корректно ли обрабатывает retries, отдаёт ли правильный XCom.
Этот урок — конкретные паттерны для unit-тестирования operators в Airflow 2.x: mocking через pytest-mock, fixtures для Connection/Variable, проверка execute() без запуска scheduler/workers.
Что считать unit test в Airflow
Граница между unit и integration test для Airflow size:
| Уровень | Что тестирует | Время | Что нужно |
|---|---|---|---|
| Unit | Logic одного operator/function, mocked dependencies | < 100ms | pytest, mock |
| Integration | DagRun на real Airflow с DB, real connections | 1-30s | sqlite+airflow init |
| E2E | Полный pipeline через staging Airflow | minutes | full Airflow infra |
Unit test НЕ запускает scheduler, НЕ читает реальный DB, НЕ требует Airflow init. Он импортирует operator класс и вызывает execute(context) с mocked context.
Mocks и monkeypatch — изоляция от внешних зависимостей
Базовый unit test для TaskFlow @task
# dags/etl/orders.py
from airflow.decorators import task
@task
def fetch_orders(execution_date: str, hook=None) -> int:
"""Fetch orders count from Postgres."""
from airflow.providers.postgres.hooks.postgres import PostgresHook
hook = hook or PostgresHook(postgres_conn_id="orders_db")
sql = f"SELECT COUNT(*) FROM orders WHERE created_date = '{execution_date}'"
result = hook.get_first(sql)
return int(result[0])
Test:
# tests/unit/test_fetch_orders.py
from unittest.mock import MagicMock
from dags.etl.orders import fetch_orders
def test_fetch_orders_returns_count():
# Mock PostgresHook
mock_hook = MagicMock()
mock_hook.get_first.return_value = (42,)
# @task возвращает Task object — берём .function для direct call
result = fetch_orders.function(
execution_date="2026-05-12"
hook=mock_hook,
)
assert result == 42
mock_hook.get_first.assert_called_once_with(
"SELECT COUNT(*) FROM orders WHERE created_date = '2026-05-12'"
)
def test_fetch_orders_zero_results():
mock_hook = MagicMock()
mock_hook.get_first.return_value = (0,)
assert fetch_orders.function("2026-05-12", hook=mock_hook) == 0
def test_fetch_orders_sql_injection_check():
"""Бизнес-логика должна safely handle malicious dates."""
mock_hook = MagicMock()
mock_hook.get_first.return_value = (0,)
# При SQL injection в date это бы упало или дало неправильный count
# Лучше: переписать на parameterized query — отдельный test ловит regression
fetch_orders.function("2026'; DROP TABLE orders; --", hook=mock_hook)
# В этом примере SQL injection не safe — это плохо, но test показывает что вызов проходит
Pattern: @task-decorated function имеет атрибут .function — оригинальная функция без обёртки. Можно вызвать напрямую с подменёнными dependencies.
Unit test для classic operator
Classic operator наследует BaseOperator и реализует execute(self, context):
# plugins/custom_operators.py
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
class S3FilesCountOperator(BaseOperator):
"""Counts number of S3 keys matching prefix."""
template_fields = ("bucket", "prefix")
def __init__(self, bucket: str, prefix: str, s3_conn_id: str = "aws_default", **kwargs):
super().__init__(**kwargs)
self.bucket = bucket
self.prefix = prefix
self.s3_conn_id = s3_conn_id
def execute(self, context):
hook = S3Hook(aws_conn_id=self.s3_conn_id)
keys = hook.list_keys(bucket_name=self.bucket, prefix=self.prefix)
count = len(keys or [])
self.log.info(f"Found {count} objects with prefix {self.prefix}")
return count
Test через mocking S3Hook:
# tests/unit/test_s3_files_count_operator.py
import pytest
from unittest.mock import MagicMock, patch
from plugins.custom_operators import S3FilesCountOperator
@pytest.fixture
def context():
"""Minimal Airflow context for testing."""
from airflow.utils.context import Context
return Context(
ds="2026-05-12"
run_id="manual__2026-05-12T00:00:00+00:00"
task_instance=MagicMock(),
dag=MagicMock(),
)
@patch("plugins.custom_operators.S3Hook")
def test_s3_files_count_returns_count(mock_s3hook_cls, context):
# Arrange
mock_hook_instance = MagicMock()
mock_hook_instance.list_keys.return_value = ["key1", "key2", "key3"]
mock_s3hook_cls.return_value = mock_hook_instance
op = S3FilesCountOperator(
task_id="count_files"
bucket="my-bucket"
prefix="data/2026-05-12/",
)
# Act
result = op.execute(context)
# Assert
assert result == 3
mock_s3hook_cls.assert_called_once_with(aws_conn_id="aws_default")
mock_hook_instance.list_keys.assert_called_once_with(
bucket_name="my-bucket"
prefix="data/2026-05-12/",
)
@patch("plugins.custom_operators.S3Hook")
def test_s3_files_count_empty_returns_zero(mock_s3hook_cls, context):
"""Если S3 пустой — list_keys может вернуть None."""
mock_hook_instance = MagicMock()
mock_hook_instance.list_keys.return_value = None
mock_s3hook_cls.return_value = mock_hook_instance
op = S3FilesCountOperator(task_id="x", bucket="b", prefix="p")
assert op.execute(context) == 0
@patch("plugins.custom_operators.S3Hook")
def test_s3_files_count_logs_count(mock_s3hook_cls, context, caplog):
"""Operator должен залогировать count."""
mock_hook_instance = MagicMock()
mock_hook_instance.list_keys.return_value = ["k1", "k2"]
mock_s3hook_cls.return_value = mock_hook_instance
op = S3FilesCountOperator(task_id="x", bucket="b", prefix="p")
with caplog.at_level("INFO"):
op.execute(context)
assert "Found 2 objects" in caplog.text
pytest-mock — более чистый синтаксис
pytest-mock (фактически тонкая обёртка над unittest.mock) даёт fixture mocker для inline patching:
# pip install pytest-mock
def test_s3_files_count_with_mocker(mocker, context):
"""Same test через pytest-mock."""
mock_s3hook_cls = mocker.patch("plugins.custom_operators.S3Hook")
mock_s3hook_cls.return_value.list_keys.return_value = ["a", "b"]
op = S3FilesCountOperator(task_id="x", bucket="b", prefix="p")
assert op.execute(context) == 2
def test_with_spy(mocker, context):
"""spy — patch but still call real function."""
real_func = mocker.spy(SomeModule, "compute_value")
op.execute(context)
assert real_func.call_count == 3
Преимущество mocker — автоматически unmocks после test (нет нужды в with patch(): блоках или decorators).
Fixtures для Connection / Variable
В production коде встречается:
from airflow.models import Connection, Variable
class MyOperator(BaseOperator):
def execute(self, context):
conn = Connection.get_connection_from_secrets("my_db")
password = Variable.get("api_key")
В тестах это требует Airflow init (DB) или mocking. Mocking предпочтительнее:
# tests/conftest.py — shared fixtures
import pytest
from unittest.mock import patch
from airflow.models import Connection
@pytest.fixture
def mock_connection():
"""Fixture для mocking airflow.models.Connection.get_connection_from_secrets."""
def _make_conn(conn_id, host="localhost", login="user", password="pass", schema="db", **kwargs):
return Connection(
conn_id=conn_id,
conn_type=kwargs.get("conn_type", "postgres"),
host=host,
login=login,
password=password,
schema=schema,
port=kwargs.get("port", 5432),
extra=kwargs.get("extra", "{}"),
)
with patch("airflow.models.Connection.get_connection_from_secrets") as mock:
mock.side_effect = lambda conn_id: _make_conn(conn_id)
yield mock
@pytest.fixture
def mock_variable():
"""Fixture для mocking Variable.get."""
variables = {}
def _set(key, value):
variables[key] = value
def _get(key, default=None):
if key in variables:
return variables[key]
if default is not None:
return default
raise KeyError(key)
with patch("airflow.models.Variable.get", side_effect=_get):
# Yield setter, чтобы test мог наполнить
yield _set
Использование:
def test_with_mocked_connection(mock_connection, mocker):
"""mock_connection делает Connection.get_connection_from_secrets возвращать fake conn."""
op = SomeOperator(task_id="x", conn_id="my_db")
# Operator.execute сделает Connection.get_connection_from_secrets("my_db")
# → получит fake Connection(host="localhost", login="user", ...)
result = op.execute(MagicMock())
def test_with_mocked_variable(mock_variable):
mock_variable("api_key", "secret-token")
mock_variable("region", "us-east-1")
op = ApiOperator(task_id="x")
op.execute(MagicMock())
# Внутри execute Variable.get("api_key") вернёт "secret-token"
Testing retry logic
Retry поведение testable без запуска scheduler — через прямой вызов BaseOperator.execute с подменёнными exceptions:
class FlakyOperator(BaseOperator):
def execute(self, context):
import random
if random.random() < 0.3:
raise AirflowException("Transient failure")
return "ok"
def test_flaky_operator_retries_via_airflow_runtime(mocker, context):
"""Test что retries настроены правильно."""
op = FlakyOperator(task_id="x", retries=3, retry_delay=timedelta(seconds=1))
assert op.retries == 3
assert op.retry_delay == timedelta(seconds=1)
# Для proper retry simulation — integration test через airflow tasks test
# Unit test проверяет только настройки + поведение execute при exception
Для testing что execute() обрабатывает specific exception правильно:
class ResilientOperator(BaseOperator):
def execute(self, context):
try:
return self._fetch()
except TransientError:
self.log.warning("Transient failed, returning empty")
return []
# TerminalError propagates → retry by Airflow
def test_resilient_handles_transient(mocker, context):
op = ResilientOperator(task_id="x")
mocker.patch.object(op, "_fetch", side_effect=TransientError("network"))
result = op.execute(context)
assert result == []
def test_resilient_propagates_terminal(mocker, context):
op = ResilientOperator(task_id="x")
mocker.patch.object(op, "_fetch", side_effect=TerminalError("auth"))
with pytest.raises(TerminalError):
op.execute(context)
Testing XCom-pushing operator
class ProcessOperator(BaseOperator):
def execute(self, context):
# Implicit xcom_push через return
return {"count": 100, "status": "ok"}
class ConsumeOperator(BaseOperator):
def execute(self, context):
upstream = context["task_instance"].xcom_pull(task_ids="process")
return upstream["count"] * 2
def test_process_returns_xcom(context):
op = ProcessOperator(task_id="process")
result = op.execute(context)
assert result == {"count": 100, "status": "ok"}
def test_consume_pulls_xcom(context):
# Mock xcom_pull на context
context["task_instance"].xcom_pull.return_value = {"count": 50}
op = ConsumeOperator(task_id="consume")
result = op.execute(context)
assert result == 100
context["task_instance"].xcom_pull.assert_called_with(task_ids="process")
Test isolation — clean DB между тестами
Если test случайно касается реальной Airflow DB (например через Variable.set без mocking), тесты могут leak state. Best practice:
# pytest.ini
[pytest]
env =
AIRFLOW_HOME=/tmp/airflow-test
AIRFLOW__CORE__UNIT_TEST_MODE=True
AIRFLOW__CORE__LOAD_EXAMPLES=False
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=sqlite:////tmp/airflow-test/airflow.db
# conftest.py
@pytest.fixture(autouse=True)
def reset_test_db():
"""Reset SQLite test DB перед каждым test."""
import os
db_path = "/tmp/airflow-test/airflow.db"
if os.path.exists(db_path):
os.remove(db_path)
from airflow.utils.db import initdb
initdb()
yield
unit_test_mode=True — режим Airflow для test scenarios: использует sqlite, отключает heartbeat, ускоряет startup.
Production gotchas
Mock на правильном path. Распространённая ошибка — patch airflow.providers.amazon.aws.hooks.s3.S3Hook когда нужно plugins.custom_operators.S3Hook (где он импортирован). Правило: patch там, где используется, а не где определён.
@task function vs object. from dags.etl.orders import fetch_orders даёт Task object. Для direct call в unit test используйте fetch_orders.function(...). Альтернативно — @task_decorator сохраняет original function на __wrapped__: fetch_orders.__wrapped__(...).
Context fixture с реальным TaskInstance. Для тестов которые нужны realistic context — используйте airflow.utils.context.Context со всеми ключами (ds, dag_run, task_instance, params). Не все operators работают с partial Context.
SQL queries — лучше тестировать на real DB через integration test. Unit test проверяет что operator формирует правильный SQL string, но не проверяет что SQL валиден на real Postgres. Combine: unit test для logic + integration test для SQL syntax via testcontainers-postgres.
Hooks обычно мокаются, а не тестируются. Logic для Hook — это Airflow provider code, его не нужно тестировать в DAG tests. Patch hook constructor + asserting method calls — достаточно.