Learning Platform
Глоссарий Troubleshooting
Урок 17.03 · 24 мин
Продвинутый
Unit TestsMockingpytest-mockCustom OperatorsFixtures

Unit tests для operators — mocking, pytest-mock, fixtures

DAG validity tests ловят синтаксические и структурные проблемы, но они не проверяют что operator делает. Unit tests на operator-level — это следующий tier тестов. Они изолированно проверяют логику одного operator: правильно ли он формирует SQL, корректно ли обрабатывает retries, отдаёт ли правильный XCom.

Этот урок — конкретные паттерны для unit-тестирования operators в Airflow 2.x: mocking через pytest-mock, fixtures для Connection/Variable, проверка execute() без запуска scheduler/workers.


Что считать unit test в Airflow

Граница между unit и integration test для Airflow size:

УровеньЧто тестируетВремяЧто нужно
UnitLogic одного operator/function, mocked dependencies< 100mspytest, mock
IntegrationDagRun на real Airflow с DB, real connections1-30ssqlite+airflow init
E2EПолный pipeline через staging Airflowminutesfull Airflow infra

Unit test НЕ запускает scheduler, НЕ читает реальный DB, НЕ требует Airflow init. Он импортирует operator класс и вызывает execute(context) с mocked context.

Unit test flow: TaskFlow .function direct call + mock patch
@task decoratorAirflow TaskFlow @task оборачивает function в Task object. Прямой импорт даёт Task instance — не вызываемая функция. Атрибут .function (или .__wrapped__) — оригинальная python-функция без обёртки. Для unit test нужна именно она.
classic OperatorКласс наследует BaseOperator с execute(self, context). Instance создаётся в test, execute вызывается напрямую с mocked context. Hook внутри patcher-ится через mocker.patch на module path, где импортирован — НЕ на оригинальный provider path.
extract callable / instantiate
.function attributefetch_orders.function — оригинальная python-функция. Вызывается с обычными args. Альтернатива: fetch_orders.__wrapped__ (если @task сохраняет ссылку). Если функция принимает hook param — dependency injection идеальна для test.
op.execute(context)Прямой вызов execute с MagicMock(spec=Context). Context может быть partial — операторы обычно нуждаются в task_instance, ds, run_id. Возвращаемое значение — implicit XCom push (но XCom не пишется без scheduler).
patch hook на module-where-used
mocker.patch('plugins.my_op.S3Hook')Critical: patch там где импортирован символ, не где определён. plugins/my_op.py делает 'from airflow...s3 import S3Hook' — patcher должен указать plugins.my_op.S3Hook. Patching airflow.providers...s3.S3Hook не работает — local reference уже создан.
assertions
return valueresult == expected — что function вернула. Это implicit XCom value (в runtime через TaskInstance, в test просто return).
mock callsmock.assert_called_once_with(...) — что Hook был вызван с правильными параметрами. Проверяет SQL string, connection_id, метод и args. Самая важная assertion для verifying operator behavior.
caplog (logs)pytest caplog fixture перехватывает logger output. caplog.at_level('INFO') + assertion 'Found N objects' проверяет что operator залогировал важное событие. Полезно для observability tests.

Mocks и monkeypatch — изоляция от внешних зависимостей

Базовый unit test для TaskFlow @task

# dags/etl/orders.py
from airflow.decorators import task

@task
def fetch_orders(execution_date: str, hook=None) -> int:
    """Fetch orders count from Postgres."""
    from airflow.providers.postgres.hooks.postgres import PostgresHook
    hook = hook or PostgresHook(postgres_conn_id="orders_db")
    sql = f"SELECT COUNT(*) FROM orders WHERE created_date = '{execution_date}'"
    result = hook.get_first(sql)
    return int(result[0])

Test:

# tests/unit/test_fetch_orders.py
from unittest.mock import MagicMock
from dags.etl.orders import fetch_orders

def test_fetch_orders_returns_count():
    # Mock PostgresHook
    mock_hook = MagicMock()
    mock_hook.get_first.return_value = (42,)

    # @task возвращает Task object — берём .function для direct call
    result = fetch_orders.function(
        execution_date="2026-05-12"
        hook=mock_hook,
    )

    assert result == 42
    mock_hook.get_first.assert_called_once_with(
        "SELECT COUNT(*) FROM orders WHERE created_date = '2026-05-12'"
    )

def test_fetch_orders_zero_results():
    mock_hook = MagicMock()
    mock_hook.get_first.return_value = (0,)
    assert fetch_orders.function("2026-05-12", hook=mock_hook) == 0

def test_fetch_orders_sql_injection_check():
    """Бизнес-логика должна safely handle malicious dates."""
    mock_hook = MagicMock()
    mock_hook.get_first.return_value = (0,)
    # При SQL injection в date это бы упало или дало неправильный count
    # Лучше: переписать на parameterized query — отдельный test ловит regression
    fetch_orders.function("2026'; DROP TABLE orders; --", hook=mock_hook)
    # В этом примере SQL injection не safe — это плохо, но test показывает что вызов проходит

Pattern: @task-decorated function имеет атрибут .function — оригинальная функция без обёртки. Можно вызвать напрямую с подменёнными dependencies.


Unit test для classic operator

Classic operator наследует BaseOperator и реализует execute(self, context):

# plugins/custom_operators.py
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

class S3FilesCountOperator(BaseOperator):
    """Counts number of S3 keys matching prefix."""
    template_fields = ("bucket", "prefix")

    def __init__(self, bucket: str, prefix: str, s3_conn_id: str = "aws_default", **kwargs):
        super().__init__(**kwargs)
        self.bucket = bucket
        self.prefix = prefix
        self.s3_conn_id = s3_conn_id

    def execute(self, context):
        hook = S3Hook(aws_conn_id=self.s3_conn_id)
        keys = hook.list_keys(bucket_name=self.bucket, prefix=self.prefix)
        count = len(keys or [])
        self.log.info(f"Found {count} objects with prefix {self.prefix}")
        return count

Test через mocking S3Hook:

# tests/unit/test_s3_files_count_operator.py
import pytest
from unittest.mock import MagicMock, patch
from plugins.custom_operators import S3FilesCountOperator

@pytest.fixture
def context():
    """Minimal Airflow context for testing."""
    from airflow.utils.context import Context
    return Context(
        ds="2026-05-12"
        run_id="manual__2026-05-12T00:00:00+00:00"
        task_instance=MagicMock(),
        dag=MagicMock(),
    )

@patch("plugins.custom_operators.S3Hook")
def test_s3_files_count_returns_count(mock_s3hook_cls, context):
    # Arrange
    mock_hook_instance = MagicMock()
    mock_hook_instance.list_keys.return_value = ["key1", "key2", "key3"]
    mock_s3hook_cls.return_value = mock_hook_instance

    op = S3FilesCountOperator(
        task_id="count_files"
        bucket="my-bucket"
        prefix="data/2026-05-12/",
    )

    # Act
    result = op.execute(context)

    # Assert
    assert result == 3
    mock_s3hook_cls.assert_called_once_with(aws_conn_id="aws_default")
    mock_hook_instance.list_keys.assert_called_once_with(
        bucket_name="my-bucket"
        prefix="data/2026-05-12/",
    )

@patch("plugins.custom_operators.S3Hook")
def test_s3_files_count_empty_returns_zero(mock_s3hook_cls, context):
    """Если S3 пустой — list_keys может вернуть None."""
    mock_hook_instance = MagicMock()
    mock_hook_instance.list_keys.return_value = None
    mock_s3hook_cls.return_value = mock_hook_instance

    op = S3FilesCountOperator(task_id="x", bucket="b", prefix="p")
    assert op.execute(context) == 0

@patch("plugins.custom_operators.S3Hook")
def test_s3_files_count_logs_count(mock_s3hook_cls, context, caplog):
    """Operator должен залогировать count."""
    mock_hook_instance = MagicMock()
    mock_hook_instance.list_keys.return_value = ["k1", "k2"]
    mock_s3hook_cls.return_value = mock_hook_instance

    op = S3FilesCountOperator(task_id="x", bucket="b", prefix="p")

    with caplog.at_level("INFO"):
        op.execute(context)

    assert "Found 2 objects" in caplog.text

pytest-mock — более чистый синтаксис

pytest-mock (фактически тонкая обёртка над unittest.mock) даёт fixture mocker для inline patching:

# pip install pytest-mock

def test_s3_files_count_with_mocker(mocker, context):
    """Same test через pytest-mock."""
    mock_s3hook_cls = mocker.patch("plugins.custom_operators.S3Hook")
    mock_s3hook_cls.return_value.list_keys.return_value = ["a", "b"]

    op = S3FilesCountOperator(task_id="x", bucket="b", prefix="p")
    assert op.execute(context) == 2

def test_with_spy(mocker, context):
    """spy — patch but still call real function."""
    real_func = mocker.spy(SomeModule, "compute_value")
    op.execute(context)
    assert real_func.call_count == 3

Преимущество mocker — автоматически unmocks после test (нет нужды в with patch(): блоках или decorators).


Fixtures для Connection / Variable

В production коде встречается:

from airflow.models import Connection, Variable

class MyOperator(BaseOperator):
    def execute(self, context):
        conn = Connection.get_connection_from_secrets("my_db")
        password = Variable.get("api_key")

В тестах это требует Airflow init (DB) или mocking. Mocking предпочтительнее:

# tests/conftest.py — shared fixtures
import pytest
from unittest.mock import patch
from airflow.models import Connection

@pytest.fixture
def mock_connection():
    """Fixture для mocking airflow.models.Connection.get_connection_from_secrets."""
    def _make_conn(conn_id, host="localhost", login="user", password="pass", schema="db", **kwargs):
        return Connection(
            conn_id=conn_id,
            conn_type=kwargs.get("conn_type", "postgres"),
            host=host,
            login=login,
            password=password,
            schema=schema,
            port=kwargs.get("port", 5432),
            extra=kwargs.get("extra", "{}"),
        )

    with patch("airflow.models.Connection.get_connection_from_secrets") as mock:
        mock.side_effect = lambda conn_id: _make_conn(conn_id)
        yield mock

@pytest.fixture
def mock_variable():
    """Fixture для mocking Variable.get."""
    variables = {}

    def _set(key, value):
        variables[key] = value

    def _get(key, default=None):
        if key in variables:
            return variables[key]
        if default is not None:
            return default
        raise KeyError(key)

    with patch("airflow.models.Variable.get", side_effect=_get):
        # Yield setter, чтобы test мог наполнить
        yield _set

Использование:

def test_with_mocked_connection(mock_connection, mocker):
    """mock_connection делает Connection.get_connection_from_secrets возвращать fake conn."""
    op = SomeOperator(task_id="x", conn_id="my_db")
    # Operator.execute сделает Connection.get_connection_from_secrets("my_db")
    # → получит fake Connection(host="localhost", login="user", ...)
    result = op.execute(MagicMock())

def test_with_mocked_variable(mock_variable):
    mock_variable("api_key", "secret-token")
    mock_variable("region", "us-east-1")
    op = ApiOperator(task_id="x")
    op.execute(MagicMock())
    # Внутри execute Variable.get("api_key") вернёт "secret-token"

Testing retry logic

Retry поведение testable без запуска scheduler — через прямой вызов BaseOperator.execute с подменёнными exceptions:

class FlakyOperator(BaseOperator):
    def execute(self, context):
        import random
        if random.random() < 0.3:
            raise AirflowException("Transient failure")
        return "ok"

def test_flaky_operator_retries_via_airflow_runtime(mocker, context):
    """Test что retries настроены правильно."""
    op = FlakyOperator(task_id="x", retries=3, retry_delay=timedelta(seconds=1))
    assert op.retries == 3
    assert op.retry_delay == timedelta(seconds=1)

    # Для proper retry simulation — integration test через airflow tasks test
    # Unit test проверяет только настройки + поведение execute при exception

Для testing что execute() обрабатывает specific exception правильно:

class ResilientOperator(BaseOperator):
    def execute(self, context):
        try:
            return self._fetch()
        except TransientError:
            self.log.warning("Transient failed, returning empty")
            return []
        # TerminalError propagates → retry by Airflow

def test_resilient_handles_transient(mocker, context):
    op = ResilientOperator(task_id="x")
    mocker.patch.object(op, "_fetch", side_effect=TransientError("network"))
    result = op.execute(context)
    assert result == []

def test_resilient_propagates_terminal(mocker, context):
    op = ResilientOperator(task_id="x")
    mocker.patch.object(op, "_fetch", side_effect=TerminalError("auth"))
    with pytest.raises(TerminalError):
        op.execute(context)

Testing XCom-pushing operator

class ProcessOperator(BaseOperator):
    def execute(self, context):
        # Implicit xcom_push через return
        return {"count": 100, "status": "ok"}

class ConsumeOperator(BaseOperator):
    def execute(self, context):
        upstream = context["task_instance"].xcom_pull(task_ids="process")
        return upstream["count"] * 2

def test_process_returns_xcom(context):
    op = ProcessOperator(task_id="process")
    result = op.execute(context)
    assert result == {"count": 100, "status": "ok"}

def test_consume_pulls_xcom(context):
    # Mock xcom_pull на context
    context["task_instance"].xcom_pull.return_value = {"count": 50}
    op = ConsumeOperator(task_id="consume")
    result = op.execute(context)
    assert result == 100
    context["task_instance"].xcom_pull.assert_called_with(task_ids="process")

Test isolation — clean DB между тестами

Если test случайно касается реальной Airflow DB (например через Variable.set без mocking), тесты могут leak state. Best practice:

# pytest.ini
[pytest]
env =
    AIRFLOW_HOME=/tmp/airflow-test
    AIRFLOW__CORE__UNIT_TEST_MODE=True
    AIRFLOW__CORE__LOAD_EXAMPLES=False
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=sqlite:////tmp/airflow-test/airflow.db

# conftest.py
@pytest.fixture(autouse=True)
def reset_test_db():
    """Reset SQLite test DB перед каждым test."""
    import os
    db_path = "/tmp/airflow-test/airflow.db"
    if os.path.exists(db_path):
        os.remove(db_path)
    from airflow.utils.db import initdb
    initdb()
    yield

unit_test_mode=True — режим Airflow для test scenarios: использует sqlite, отключает heartbeat, ускоряет startup.


Production gotchas

Mock на правильном path. Распространённая ошибка — patch airflow.providers.amazon.aws.hooks.s3.S3Hook когда нужно plugins.custom_operators.S3Hook (где он импортирован). Правило: patch там, где используется, а не где определён.

@task function vs object. from dags.etl.orders import fetch_orders даёт Task object. Для direct call в unit test используйте fetch_orders.function(...). Альтернативно — @task_decorator сохраняет original function на __wrapped__: fetch_orders.__wrapped__(...).

Context fixture с реальным TaskInstance. Для тестов которые нужны realistic context — используйте airflow.utils.context.Context со всеми ключами (ds, dag_run, task_instance, params). Не все operators работают с partial Context.

SQL queries — лучше тестировать на real DB через integration test. Unit test проверяет что operator формирует правильный SQL string, но не проверяет что SQL валиден на real Postgres. Combine: unit test для logic + integration test для SQL syntax via testcontainers-postgres.

Hooks обычно мокаются, а не тестируются. Logic для Hook — это Airflow provider code, его не нужно тестировать в DAG tests. Patch hook constructor + asserting method calls — достаточно.


Проверка знанийKnowledge check
Команда пишет unit test для custom S3CopyOperator. Их patch `airflow.providers.amazon.aws.hooks.s3.S3Hook` не работает — реальный S3Hook все равно вызывается. Что не так?
ОтветAnswer
Classic Python mocking gotcha — patch на неправильном path. Правило: **patch там, где импортируется/используется**, а не где определён. Если operator в `plugins/my_op.py` делает `from airflow.providers.amazon.aws.hooks.s3 import S3Hook`, то этот name теперь существует в namespace `plugins.my_op` — нужно `mocker.patch('plugins.my_op.S3Hook')`, не `mocker.patch('airflow.providers.amazon.aws.hooks.s3.S3Hook')`. Объяснение: Python import создаёт reference на object в текущем namespace. Когда вы делаете `from X import Y`, в текущем module появляется local `Y` указывающий на тот же object. Patching `X.Y` меняет атрибут модуля X, но local reference в моём модуле уже создан и указывает на старый object. Solution: patch на module where the symbol is USED. Tools для верификации: (1) `print(plugins.my_op.S3Hook)` — какой path использует module; (2) `mocker.patch.object(plugins.my_op, 'S3Hook')` — explicit syntax; (3) В стиле dependency injection — pass hook as parameter в operator, then в test pass MagicMock — нет нужды в patching вообще (cleanest approach). Это самая частая ошибка в pytest mocking — занимает часы дебага если не знаешь правило.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Как unit-test для @task-decorated function вызвать оригинальную function напрямую?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 7