Learning Platform
Глоссарий Troubleshooting
Урок 17.05 · 22 мин
Продвинутый
Custom OperatorsTest IsolationXComRetryTemplates

Custom operator testing — isolation, XCom, retry logic, error handling

Custom operators — это сердце сложных Airflow проектов. Они инкапсулируют business logic, integration с внутренними системами, custom retry strategies. Их тестирование требует больше внимания чем testing TaskFlow @task functions — operator имеет template_fields, lifecycle hooks, parametric behavior.

Этот урок — конкретные patterns для testing custom operators: isolation от реальных connections, проверка template rendering, XCom push/pull verification, retry logic edge cases, error handling в execute().


Тестирование классов Python — паттерны для операторов

Anatomy custom operator под тест

Рассмотрим production-grade custom operator:

# plugins/operators/snowflake_merge_operator.py
from airflow.models import BaseOperator
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
from airflow.utils.decorators import apply_defaults

class SnowflakeMergeOperator(BaseOperator):
    """
    MERGE INTO operator with idempotent retry logic.
    Pushes (rows_merged, sql) to XCom.
    """
    template_fields = ("source_table", "target_table", "merge_keys", "execution_date_str")
    ui_color = "#4287f5"

    @apply_defaults
    def __init__(
        self,
        source_table: str,
        target_table: str,
        merge_keys: list[str],
        snowflake_conn_id: str = "snowflake_default",
        execution_date_str: str = "{{ ds }}",
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.source_table = source_table
        self.target_table = target_table
        self.merge_keys = merge_keys
        self.snowflake_conn_id = snowflake_conn_id
        self.execution_date_str = execution_date_str

    def execute(self, context):
        if not self.merge_keys:
            raise ValueError("merge_keys must be non-empty list")

        sql = self._build_merge_sql()
        self.log.info(f"Executing MERGE: {sql[:200]}...")

        hook = SnowflakeHook(snowflake_conn_id=self.snowflake_conn_id)
        try:
            rows_merged = hook.run(sql)
        except Exception as e:
            self.log.error(f"MERGE failed: {e}")
            raise

        # XCom push для downstream tasks
        context["task_instance"].xcom_push(key="rows_merged", value=rows_merged)
        context["task_instance"].xcom_push(key="sql", value=sql)
        return rows_merged

    def _build_merge_sql(self) -> str:
        on_clause = " AND ".join(f"tgt.{k} = src.{k}" for k in self.merge_keys)
        return f"""
MERGE INTO {self.target_table} tgt
USING (SELECT * FROM {self.source_table} WHERE ds = '{self.execution_date_str}') src
  ON {on_clause}
WHEN MATCHED THEN UPDATE SET tgt.value = src.value
WHEN NOT MATCHED THEN INSERT (*) VALUES (src.*);
"""

У operator пять testable аспектов:

  1. __init__ валидация параметров
  2. _build_merge_sql корректное SQL для разных параметров
  3. execute() happy path с mocked hook
  4. execute() error handling (hook raises)
  5. Template rendering для template_fields
Custom operator lifecycle: test points через execute()
__init__(self, ...)Scheduler создаёт operator instance во время DAG parse. Параметры сохраняются как self.attrs. Test point #1: assert op.source_table == 'X', op.retries == 3 — проверка что BaseOperator args (retries, retry_delay) pass через **kwargs.
DAG parse → serialized_dag
render_template_fields(context)Перед execute Airflow runtime применяет Jinja к каждому template_field. {{ ds }} → '2026-05-12'. Test point #2: op.render_template_fields({'ds_nodash': '20260512', 'dag': dag, 'task': op}) — проверка что rendering работает И что field в template_fields tuple.
scheduler enqueue → worker picks
pre_execute hooksOptional lifecycle hook BaseOperator.pre_execute(context). Редко используется в custom operators — обычно validation идёт внутри execute. Здесь скорее на уровне @task.before_execute (3.x) или on_execute_callback.
execute(self, context)Main method. Test point #3 (happy path): mock Hook class, call op.execute(context), assert return value, hook.run called с правильным SQL. Test point #4 (error): hook.run.side_effect = Exception → assert pytest.raises с правильным message + caplog.
внутри execute: build SQL + hook.run
_build_merge_sql()Pure function — easy для testing. Test point #5: вызывайте напрямую с разными merge_keys (single, composite, special chars), assert правильный SQL в output. Не нужен mock — это deterministic.
hook.run(sql)Mocked в test. mock_hook.run.return_value = 42 (rows merged). Assert mock_hook.run.call_args[0][0] содержит ожидаемое SQL. mock_hook_cls.assert_called_once_with(snowflake_conn_id='...') — verify Hook constructed правильно.
xcom_push(key, value)context['task_instance'].xcom_push — MagicMock в test. assert_any_call(key='rows_merged', value=42) проверяет что operator pushed правильный XCom. Implicit return value тоже push-ится в default 'return_value' key.
return valueВозвращаемое значение становится default XCom (key='return_value'). Test point: result = op.execute(context); assert result == 42. Простой и важный assertion — проверяет contract.
post_execute → state=success
on_success_callback / on_failure_callbackCallbacks runs после execute. Если operator определяет custom callback логику — testable через context['task'].on_failure_callback(context). Чаще callback на DAG-level — там test через integration.

Test 1 — __init__ validation

# tests/unit/test_snowflake_merge_operator.py
import pytest
from plugins.operators.snowflake_merge_operator import SnowflakeMergeOperator

def test_init_with_valid_params():
    op = SnowflakeMergeOperator(
        task_id="merge_orders"
        source_table="staging.orders"
        target_table="prod.orders"
        merge_keys=["id"],
    )
    assert op.source_table == "staging.orders"
    assert op.target_table == "prod.orders"
    assert op.merge_keys == ["id"]
    assert op.snowflake_conn_id == "snowflake_default"  # default

def test_init_with_custom_conn_id():
    op = SnowflakeMergeOperator(
        task_id="x"
        source_table="s"
        target_table="t"
        merge_keys=["id"],
        snowflake_conn_id="snowflake_prod",
    )
    assert op.snowflake_conn_id == "snowflake_prod"

def test_init_inherits_base_operator_args():
    """BaseOperator args (retries, retry_delay) должны pass через."""
    from datetime import timedelta
    op = SnowflakeMergeOperator(
        task_id="x"
        source_table="s"
        target_table="t"
        merge_keys=["id"],
        retries=5,
        retry_delay=timedelta(minutes=10),
    )
    assert op.retries == 5
    assert op.retry_delay == timedelta(minutes=10)

Test 2 — _build_merge_sql правильность

Логика SQL building — это самая критичная часть. Тестируем все ветки:

def test_build_merge_sql_single_key():
    op = SnowflakeMergeOperator(
        task_id="x"
        source_table="staging.orders"
        target_table="prod.orders"
        merge_keys=["id"],
        execution_date_str="2026-05-12",
    )
    sql = op._build_merge_sql()
    assert "MERGE INTO prod.orders tgt" in sql
    assert "FROM staging.orders WHERE ds = '2026-05-12'" in sql
    assert "ON tgt.id = src.id" in sql

def test_build_merge_sql_composite_key():
    op = SnowflakeMergeOperator(
        task_id="x"
        source_table="s", target_table="t"
        merge_keys=["customer_id", "order_id"],
        execution_date_str="2026-05-12",
    )
    sql = op._build_merge_sql()
    assert "ON tgt.customer_id = src.customer_id AND tgt.order_id = src.order_id" in sql

def test_build_merge_sql_special_chars_in_table():
    """Tables с dots / quotes."""
    op = SnowflakeMergeOperator(
        task_id="x"
        source_table='"DATABASE"."SCHEMA"."TABLE"',
        target_table="prod.orders"
        merge_keys=["id"],
    )
    sql = op._build_merge_sql()
    assert '"DATABASE"."SCHEMA"."TABLE"' in sql
WARNING

SQL injection — здесь reasonable concern. Production-grade SnowflakeMergeOperator должен использовать parameterized queries или escape identifiers. Если test показывает что operator vulnerable — это bug в operator, не в test. Тест на SQL injection — defensive: добавить assertion что dangerous chars escaped.


Test 3 — execute() happy path

from unittest.mock import MagicMock, patch
from airflow.utils.context import Context

@pytest.fixture
def context():
    ti = MagicMock()
    ti.xcom_push = MagicMock()
    return Context(
        ds="2026-05-12"
        run_id="manual__2026-05-12T00:00:00+00:00"
        task_instance=ti,
        dag=MagicMock(),
    )

@patch("plugins.operators.snowflake_merge_operator.SnowflakeHook")
def test_execute_happy_path(mock_hook_cls, context):
    # Setup mock
    mock_hook = MagicMock()
    mock_hook.run.return_value = 42  # 42 rows merged
    mock_hook_cls.return_value = mock_hook

    # Execute
    op = SnowflakeMergeOperator(
        task_id="x"
        source_table="staging.orders"
        target_table="prod.orders"
        merge_keys=["id"],
        execution_date_str="2026-05-12",
    )
    result = op.execute(context)

    # Assertions
    assert result == 42

    # Hook was called с правильным conn_id
    mock_hook_cls.assert_called_once_with(snowflake_conn_id="snowflake_default")

    # SQL содержит ожидаемое
    actual_sql = mock_hook.run.call_args[0][0]
    assert "MERGE INTO prod.orders" in actual_sql
    assert "FROM staging.orders WHERE ds = '2026-05-12'" in actual_sql

    # XCom push был вызван
    context["task_instance"].xcom_push.assert_any_call(key="rows_merged", value=42)
    # SQL also pushed
    xcom_calls = context["task_instance"].xcom_push.call_args_list
    assert any(call.kwargs.get("key") == "sql" for call in xcom_calls)

Test 4 — error handling

@patch("plugins.operators.snowflake_merge_operator.SnowflakeHook")
def test_execute_raises_on_hook_failure(mock_hook_cls, context):
    """Operator должен propagate exception от hook для retry."""
    mock_hook = MagicMock()
    mock_hook.run.side_effect = Exception("Connection timeout")
    mock_hook_cls.return_value = mock_hook

    op = SnowflakeMergeOperator(
        task_id="x", source_table="s", target_table="t", merge_keys=["id"]
    )
    with pytest.raises(Exception, match="Connection timeout"):
        op.execute(context)

def test_execute_raises_on_empty_merge_keys():
    """Validation в execute должна fail fast."""
    op = SnowflakeMergeOperator(
        task_id="x", source_table="s", target_table="t", merge_keys=[]
    )
    context = MagicMock()
    with pytest.raises(ValueError, match="merge_keys must be non-empty"):
        op.execute(context)

@patch("plugins.operators.snowflake_merge_operator.SnowflakeHook")
def test_execute_logs_error_before_raising(mock_hook_cls, context, caplog):
    """Operator должен log error для debugging."""
    mock_hook = MagicMock()
    mock_hook.run.side_effect = Exception("Auth failed")
    mock_hook_cls.return_value = mock_hook

    op = SnowflakeMergeOperator(
        task_id="x", source_table="s", target_table="t", merge_keys=["id"]
    )
    with caplog.at_level("ERROR"):
        with pytest.raises(Exception):
            op.execute(context)

    assert "MERGE failed" in caplog.text
    assert "Auth failed" in caplog.text

Test 5 — template field rendering

template_fields — это поля, которые рендерятся через Jinja с context переменными ({{ ds }}, {{ params }}, {{ ti.xcom_pull(...) }}). Это критично тестировать — typo в template ломает silently.

def test_template_fields_declared():
    """template_fields должны включать все Jinja-rendered поля."""
    assert "source_table" in SnowflakeMergeOperator.template_fields
    assert "target_table" in SnowflakeMergeOperator.template_fields
    assert "execution_date_str" in SnowflakeMergeOperator.template_fields

def test_template_rendering():
    """Render template_fields с real Airflow Jinja."""
    from airflow.models import DAG
    from datetime import datetime

    dag = DAG(
        dag_id="test_dag"
        start_date=datetime(2026, 1, 1),
        schedule="@daily",
    )

    op = SnowflakeMergeOperator(
        task_id="x"
        source_table="staging.orders_{{ ds_nodash }}"
        target_table="prod.orders"
        merge_keys=["id"],
        execution_date_str="{{ ds }}"
        dag=dag,
    )

    # Build context with ds
    context = {"ds": "2026-05-12", "ds_nodash": "20260512", "dag": dag, "task": op}
    op.render_template_fields(context)

    assert op.source_table == "staging.orders_20260512"
    assert op.execution_date_str == "2026-05-12"

op.render_template_fields(context) — built-in Airflow method, который применяет Jinja к каждому template_field. Это даёт real rendering без запуска scheduler.


Testing XCom interactions

Для operator который pull/push XCom, тестируем оба направления:

class XComConsumerOperator(BaseOperator):
    """Operator который pulls upstream XCom + pushes computed result."""

    def __init__(self, upstream_task_id: str, **kwargs):
        super().__init__(**kwargs)
        self.upstream_task_id = upstream_task_id

    def execute(self, context):
        upstream_value = context["task_instance"].xcom_pull(task_ids=self.upstream_task_id)
        if upstream_value is None:
            raise ValueError(f"No XCom from {self.upstream_task_id}")
        return upstream_value * 2

def test_xcom_consumer_happy_path(context):
    """Pull от upstream → push computed."""
    context["task_instance"].xcom_pull.return_value = 50
    op = XComConsumerOperator(task_id="x", upstream_task_id="produce")
    result = op.execute(context)
    assert result == 100
    context["task_instance"].xcom_pull.assert_called_with(task_ids="produce")

def test_xcom_consumer_fails_on_none(context):
    context["task_instance"].xcom_pull.return_value = None
    op = XComConsumerOperator(task_id="x", upstream_task_id="produce")
    with pytest.raises(ValueError, match="No XCom from produce"):
        op.execute(context)

Retry logic testing

Airflow runtime обрабатывает retries — operator только raise exception. Что тестируем:

def test_operator_retries_configured():
    """Operator имеет retries=3 by default."""
    op = SnowflakeMergeOperator(
        task_id="x", source_table="s", target_table="t", merge_keys=["id"],
        retries=3, retry_delay=timedelta(minutes=5),
    )
    assert op.retries == 3
    assert op.retry_delay == timedelta(minutes=5)

# Для testing real retry behavior — integration test через airflow tasks test
def test_retry_via_airflow_runtime():
    """Test real retry — task fails 2 times, succeeds на 3rd."""
    # Это integration test — нужен real Airflow execution path
    # Через airflow tasks test и mock внутри DAG file
    pass

Exponential backoff verification:

def test_exponential_backoff_setup():
    """Operator с exponential backoff должен иметь правильные defaults."""
    op = SnowflakeMergeOperator(
        task_id="x", source_table="s", target_table="t", merge_keys=["id"],
        retries=5,
        retry_delay=timedelta(minutes=1),
        retry_exponential_backoff=True,
        max_retry_delay=timedelta(hours=1),
    )
    assert op.retry_exponential_backoff is True
    assert op.max_retry_delay == timedelta(hours=1)

Идемпотентность через test

Production operators должны быть idempotent — повторный execute не ломает data. Тестируем:

@patch("plugins.operators.snowflake_merge_operator.SnowflakeHook")
def test_execute_idempotent_via_merge(mock_hook_cls, context):
    """Двойной execute с тем же data — same result, no duplicates."""
    mock_hook = MagicMock()
    mock_hook.run.return_value = 42
    mock_hook_cls.return_value = mock_hook

    op = SnowflakeMergeOperator(
        task_id="x", source_table="s", target_table="t"
        merge_keys=["id"], execution_date_str="2026-05-12",
    )

    # Первый execute
    result1 = op.execute(context)
    # Второй execute с тем же context — SQL identical
    result2 = op.execute(context)

    # Оба раза SQL identical (deterministic)
    call1_sql = mock_hook.run.call_args_list[0][0][0]
    call2_sql = mock_hook.run.call_args_list[1][0][0]
    assert call1_sql == call2_sql
    # MERGE — idempotent. INSERT — нет.
    assert "MERGE INTO" in call1_sql

Production gotchas

apply_defaults deprecated. В 2.x @apply_defaults decorator deprecated — BaseOperator делает это автоматически. Не нужно в новых operators.

template_fields с tuple, не list. Distinction matters: template_fields = ("a",) — tuple. template_fields = ["a"] — list, который mutate-able. Используйте tuple.

render_template_fields модифицирует op. Не вызывайте дважды — поля уже rendered. В test используйте fresh operator instance per test.

Hook constructor может делать work. Если Hook.__init__ лезет в Airflow DB (например Connection.get_connection_from_secrets), mocking Hook class недостаточно — mock и __init__. Лучше: pass hook as parameter (dependency injection).

Context dict — airflow.utils.context.Context, не dict. Использовать Context() factory для proper typing. Для tests часто достаточно MagicMock(spec=Context).

self.log доступен только после __init__. При testing _build_sql вызывайте после __init__ (operator уже создан). Иначе self.log AttributeError.


Проверка знанийKnowledge check
Custom operator SnowflakeMergeOperator имеет template_fields=('source_table', 'target_table'). Production команда написала DAG с `source_table='staging.orders_{{ ds_nodash }}'` и видит в S3 файлы `staging.orders_{{ ds_nodash }}.parquet` — Jinja не rendered. Что не так и как тестировать чтобы это не случилось?
ОтветAnswer
Jinja template rendering работает ТОЛЬКО когда: (1) field указан в `template_fields`; (2) operator имеет `dag` reference; (3) Airflow runtime вызывает `render_template_fields(context)` перед execute (это делается scheduler/executor, НЕ автоматически в `tasks test --no-render-templates`). Вероятные причины проблемы: (a) `source_table` не в `template_fields` tuple — но в задаче указано, что добавлен; (b) Operator вызывается через `airflow tasks test --no-render-templates` (skip rendering); (c) Operator используется в context где `dag` reference потерян; (d) Typo в template — например `{{ ds_nodash }}` с trailing space или `{ ds_nodash }` (один brace). Test для catch такого: ОБЯЗАТЕЛЬНО написать explicit test render: `def test_template_rendering(): dag = DAG(...); op = SnowflakeMergeOperator(...source_table='staging.orders_{{ ds_nodash }}', dag=dag); op.render_template_fields({'ds_nodash': '20260512', 'dag': dag, 'task': op}); assert op.source_table == 'staging.orders_20260512'`. Это catches: (1) typo в template; (2) field не в template_fields; (3) wrong template variable name; (4) что render_template_fields реально работает на этом operator. Дополнительно: structural test `assert 'source_table' in SnowflakeMergeOperator.template_fields` — catches regressions если кто-то удалит field. Plus airflow tasks test (БЕЗ --no-render-templates flag) — это real-runtime test, который тоже бы catch problem. Урок: template_fields — это easy gotcha с тихим failure mode. Один explicit unit test rendering экономит часы дебага production data quality issues.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. template_fields в custom operator — что необходимо тестировать?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 7