Custom operator testing — isolation, XCom, retry logic, error handling
Custom operators — это сердце сложных Airflow-проектов. Они инкапсулируют business logic, integration с внутренними системами, custom retry strategies. Их тестирование требует больше внимания, чем testing TaskFlow @task functions: у operator есть template_fields, lifecycle hooks, parametric behavior.
Этот урок — конкретные patterns для testing custom operators: isolation от реальных connections, проверка template rendering, XCom push/pull verification, retry logic edge cases, error handling в execute().
Тестирование классов Python — паттерны для операторов
Anatomy custom operator под тест
Рассмотрим production-grade custom operator:
# plugins/operators/snowflake_merge_operator.py
from airflow.models import BaseOperator
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
from airflow.utils.decorators import apply_defaults
class SnowflakeMergeOperator(BaseOperator):
    """
    MERGE INTO operator with idempotent retry logic.
    Pushes (rows_merged, sql) to XCom.

    Selects the rows of ``source_table`` whose ``ds`` column equals
    ``execution_date_str`` and MERGEs them into ``target_table`` joined on
    ``merge_keys``. The affected-row count is also returned from ``execute()``.

    :param source_table: source table name (templated).
    :param target_table: target table name (templated).
    :param merge_keys: columns joined in the ON clause (templated); must be non-empty.
    :param snowflake_conn_id: Airflow connection id handed to ``SnowflakeHook``.
    :param execution_date_str: value compared with ``src.ds`` (templated, default ``{{ ds }}``).
    """

    template_fields = ("source_table", "target_table", "merge_keys", "execution_date_str")
    ui_color = "#4287f5"

    # No @apply_defaults: since Airflow 2.0 BaseOperator applies default_args
    # itself and the decorator is deprecated.
    def __init__(
        self,
        source_table: str,
        target_table: str,
        merge_keys: list[str],
        snowflake_conn_id: str = "snowflake_default",
        execution_date_str: str = "{{ ds }}",
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.source_table = source_table
        self.target_table = target_table
        self.merge_keys = merge_keys
        self.snowflake_conn_id = snowflake_conn_id
        self.execution_date_str = execution_date_str

    def execute(self, context):
        """Run the MERGE and return the number of affected rows.

        :raises ValueError: if ``merge_keys`` is empty (checked before any hook is built).
        :raises Exception: anything raised by the hook is logged and re-raised so
            that Airflow's retry machinery can handle it.
        """
        if not self.merge_keys:
            raise ValueError("merge_keys must be non-empty list")

        sql = self._build_merge_sql()
        self.log.info("Executing MERGE: %s...", sql[:200])

        hook = SnowflakeHook(snowflake_conn_id=self.snowflake_conn_id)
        try:
            # Without a handler, DbApiHook.run()/SnowflakeHook.run() return None,
            # so read the DB-API affected-row count from the cursor explicitly.
            rows_merged = hook.run(sql, handler=lambda cursor: cursor.rowcount)
        except Exception as e:
            self.log.error("MERGE failed: %s", e)
            raise

        # XCom push for downstream tasks
        context["task_instance"].xcom_push(key="rows_merged", value=rows_merged)
        context["task_instance"].xcom_push(key="sql", value=sql)
        return rows_merged

    def _build_merge_sql(self) -> str:
        """Build the MERGE statement from the (already rendered) template fields."""
        # NOTE(review): table names, keys and execution_date_str are interpolated
        # without escaping; they are template fields, so validate/quote them if
        # they can come from user-controlled params or XCom.
        on_clause = " AND ".join(f"tgt.{k} = src.{k}" for k in self.merge_keys)
        # NOTE(review): only `value` is updated on match, and `INSERT (*) VALUES (src.*)`
        # is not the documented Snowflake MERGE form (explicit column list) — confirm
        # against the target schema before production use.
        return f"""
            MERGE INTO {self.target_table} tgt
            USING (SELECT * FROM {self.source_table} WHERE ds = '{self.execution_date_str}') src
            ON {on_clause}
            WHEN MATCHED THEN UPDATE SET tgt.value = src.value
            WHEN NOT MATCHED THEN INSERT (*) VALUES (src.*);
        """
У operator пять testable аспектов:
- `__init__` — валидация параметров
- `_build_merge_sql` — корректный SQL для разных параметров
- `execute()` — happy path с mocked hook
- `execute()` — error handling (hook raises)
- Template rendering для `template_fields`
Test 1 — __init__ validation
# tests/unit/test_snowflake_merge_operator.py
import pytest
from plugins.operators.snowflake_merge_operator import SnowflakeMergeOperator
def test_init_with_valid_params():
    """Required params are stored as-is; snowflake_conn_id falls back to its default."""
    op = SnowflakeMergeOperator(
        task_id="merge_orders",
        source_table="staging.orders",
        target_table="prod.orders",
        merge_keys=["id"],
    )
    assert op.source_table == "staging.orders"
    assert op.target_table == "prod.orders"
    assert op.merge_keys == ["id"]
    assert op.snowflake_conn_id == "snowflake_default"  # default
def test_init_with_custom_conn_id():
    """An explicit snowflake_conn_id overrides the default."""
    op = SnowflakeMergeOperator(
        task_id="x",
        source_table="s",
        target_table="t",
        merge_keys=["id"],
        snowflake_conn_id="snowflake_prod",
    )
    assert op.snowflake_conn_id == "snowflake_prod"
def test_init_inherits_base_operator_args():
    """BaseOperator args (retries, retry_delay) must pass through **kwargs."""
    from datetime import timedelta

    op = SnowflakeMergeOperator(
        task_id="x",
        source_table="s",
        target_table="t",
        merge_keys=["id"],
        retries=5,
        retry_delay=timedelta(minutes=10),
    )
    assert op.retries == 5
    assert op.retry_delay == timedelta(minutes=10)
Test 2 — _build_merge_sql правильность
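Логика SQL building — это самая критичная часть. Ветвлений в `_build_merge_sql` нет, поэтому тестируем основные варианты входа: одиночный ключ, составной ключ, identifiers со спецсимволами: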
def test_build_merge_sql_single_key():
    """A single merge key yields a one-term ON clause; tables and partition are substituted."""
    op = SnowflakeMergeOperator(
        task_id="x",
        source_table="staging.orders",
        target_table="prod.orders",
        merge_keys=["id"],
        execution_date_str="2026-05-12",
    )
    sql = op._build_merge_sql()
    assert "MERGE INTO prod.orders tgt" in sql
    assert "FROM staging.orders WHERE ds = '2026-05-12'" in sql
    assert "ON tgt.id = src.id" in sql
def test_build_merge_sql_composite_key():
    """Composite keys are joined with AND, preserving the given order."""
    op = SnowflakeMergeOperator(
        task_id="x",
        source_table="s",
        target_table="t",
        merge_keys=["customer_id", "order_id"],
        execution_date_str="2026-05-12",
    )
    sql = op._build_merge_sql()
    assert "ON tgt.customer_id = src.customer_id AND tgt.order_id = src.order_id" in sql
def test_build_merge_sql_special_chars_in_table():
    """Quoted, dotted identifiers are passed through verbatim."""
    op = SnowflakeMergeOperator(
        task_id="x",
        source_table='"DATABASE"."SCHEMA"."TABLE"',
        target_table="prod.orders",
        merge_keys=["id"],
    )
    sql = op._build_merge_sql()
    assert '"DATABASE"."SCHEMA"."TABLE"' in sql
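SQL injection — здесь reasonable concern: `_build_merge_sql` подставляет `source_table`, `target_table`, `merge_keys` и `execution_date_str` в SQL через f-string без escaping. При этом все они — template_fields, т.е. могут прийти из `params` или XCom. Значения (например `ds`) стоит передавать как bind-параметры (`hook.run(sql, parameters=...)`). Identifiers параметризовать нельзя — их нужно валидировать (whitelist/regex) или корректно quote. Тест выше лишь проверяет, что quoted identifier проходит как есть. Defensive-тест на injection должен подавать строку вроде `"t; DROP TABLE x"` и ожидать ValueError (или escaped вывод). Если такой тест падает — это bug в operator, не в test.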
Test 3 — execute() happy path
from unittest.mock import MagicMock, patch
from airflow.utils.context import Context
@pytest.fixture
def context():
    """Minimal Airflow Context with a mocked TaskInstance for XCom assertions."""
    # xcom_push / xcom_pull are auto-created MagicMock attributes of ti.
    ti = MagicMock()
    return Context(
        ds="2026-05-12",
        run_id="manual__2026-05-12T00:00:00+00:00",
        task_instance=ti,
        dag=MagicMock(),
    )
@patch("plugins.operators.snowflake_merge_operator.SnowflakeHook")
def test_execute_happy_path(mock_hook_cls, context):
    """execute() runs the MERGE through the hook, returns its result and pushes XComs."""
    # Arrange: the patched class returns a hook whose run() reports 42 merged rows
    mock_hook = MagicMock()
    mock_hook.run.return_value = 42
    mock_hook_cls.return_value = mock_hook

    op = SnowflakeMergeOperator(
        task_id="x",
        source_table="staging.orders",
        target_table="prod.orders",
        merge_keys=["id"],
        execution_date_str="2026-05-12",
    )

    result = op.execute(context)

    assert result == 42
    # Hook is constructed with the default connection id
    mock_hook_cls.assert_called_once_with(snowflake_conn_id="snowflake_default")
    # The SQL sent to the hook targets the right tables and partition
    actual_sql = mock_hook.run.call_args.args[0]
    assert "MERGE INTO prod.orders" in actual_sql
    assert "FROM staging.orders WHERE ds = '2026-05-12'" in actual_sql
    # Both XComs pushed; the pushed SQL is exactly the SQL that was executed
    ti = context["task_instance"]
    ti.xcom_push.assert_any_call(key="rows_merged", value=42)
    ti.xcom_push.assert_any_call(key="sql", value=actual_sql)
Test 4 — error handling
@patch("plugins.operators.snowflake_merge_operator.SnowflakeHook")
def test_execute_raises_on_hook_failure(mock_hook_cls, context):
    """A hook error must escape execute() unchanged so Airflow can retry the task."""
    failing_hook = mock_hook_cls.return_value
    failing_hook.run.side_effect = Exception("Connection timeout")

    op = SnowflakeMergeOperator(
        task_id="x", source_table="s", target_table="t", merge_keys=["id"]
    )

    with pytest.raises(Exception, match="Connection timeout"):
        op.execute(context)
def test_execute_raises_on_empty_merge_keys():
    """Empty merge_keys fail fast in execute(), before any hook is created."""
    op = SnowflakeMergeOperator(
        task_id="x", source_table="s", target_table="t", merge_keys=[]
    )
    with pytest.raises(ValueError, match="merge_keys must be non-empty"):
        op.execute(MagicMock())
@patch("plugins.operators.snowflake_merge_operator.SnowflakeHook")
def test_execute_logs_error_before_raising(mock_hook_cls, context, caplog):
    """The hook error is logged at ERROR level before it is re-raised."""
    mock_hook_cls.return_value.run.side_effect = Exception("Auth failed")
    op = SnowflakeMergeOperator(
        task_id="x", source_table="s", target_table="t", merge_keys=["id"]
    )

    with caplog.at_level("ERROR"), pytest.raises(Exception):
        op.execute(context)

    log_text = caplog.text
    assert "MERGE failed" in log_text
    assert "Auth failed" in log_text
Test 5 — template field rendering
template_fields — это поля, которые рендерятся через Jinja с context переменными ({{ ds }}, {{ params }}, {{ ti.xcom_pull(...) }}). Это критично тестировать — typo в template ломает silently.
def test_template_fields_declared():
    """template_fields must list every Jinja-rendered constructor argument."""
    expected = {"source_table", "target_table", "merge_keys", "execution_date_str"}
    missing = expected - set(SnowflakeMergeOperator.template_fields)
    assert not missing, f"not templated: {sorted(missing)}"
    # Tuple, not list: a class-level list is mutable and could be changed in place.
    assert isinstance(SnowflakeMergeOperator.template_fields, tuple)
def test_template_rendering():
    """Render template_fields with real Airflow Jinja, without a scheduler."""
    from airflow.models import DAG
    from datetime import datetime

    dag = DAG(
        dag_id="test_dag",
        start_date=datetime(2026, 1, 1),
        schedule="@daily",
    )
    op = SnowflakeMergeOperator(
        task_id="x",
        source_table="staging.orders_{{ ds_nodash }}",
        target_table="prod.orders",
        merge_keys=["id"],
        execution_date_str="{{ ds }}",
        dag=dag,
    )
    # Only the variables referenced by the templates are needed in the context.
    context = {"ds": "2026-05-12", "ds_nodash": "20260512", "dag": dag, "task": op}
    op.render_template_fields(context)

    assert op.source_table == "staging.orders_20260512"
    assert op.execution_date_str == "2026-05-12"
    # Plain (non-Jinja) values pass through rendering unchanged.
    assert op.target_table == "prod.orders"
    assert op.merge_keys == ["id"]
op.render_template_fields(context) — built-in Airflow method, который применяет Jinja к каждому template_field. Это даёт real rendering без запуска scheduler.
Testing XCom interactions
Для operator который pull/push XCom, тестируем оба направления:
class XComConsumerOperator(BaseOperator):
    """Pull a value from an upstream task's XCom and return it doubled.

    The returned value is what Airflow pushes as this task's ``return_value``
    XCom (when ``do_xcom_push`` is enabled, the default).
    """

    def __init__(self, upstream_task_id: str, **kwargs):
        super().__init__(**kwargs)
        self.upstream_task_id = upstream_task_id

    def execute(self, context):
        ti = context["task_instance"]
        upstream_value = ti.xcom_pull(task_ids=self.upstream_task_id)
        if upstream_value is not None:
            return upstream_value * 2
        raise ValueError(f"No XCom from {self.upstream_task_id}")
def test_xcom_consumer_happy_path(context):
    """The upstream XCom is pulled by task id and returned doubled."""
    ti = context["task_instance"]
    ti.xcom_pull.return_value = 50

    result = XComConsumerOperator(task_id="x", upstream_task_id="produce").execute(context)

    assert result == 100
    ti.xcom_pull.assert_called_with(task_ids="produce")
def test_xcom_consumer_fails_on_none(context):
    """A missing upstream XCom (None) is reported as ValueError naming the task."""
    ti = context["task_instance"]
    ti.xcom_pull.return_value = None

    consumer = XComConsumerOperator(task_id="x", upstream_task_id="produce")
    with pytest.raises(ValueError, match="No XCom from produce"):
        consumer.execute(context)
Retry logic testing
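Retries обрабатывает Airflow runtime — operator лишь raise-ит exception. Поэтому на unit-уровне тестируем только конфигурацию retry-параметров; реальное поведение retries — это integration test: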
def test_operator_retries_configured():
    """retries / retry_delay passed to the constructor reach BaseOperator."""
    # Local import: timedelta is otherwise only imported inside another test.
    from datetime import timedelta

    op = SnowflakeMergeOperator(
        task_id="x", source_table="s", target_table="t", merge_keys=["id"],
        retries=3, retry_delay=timedelta(minutes=5),
    )
    assert op.retries == 3
    assert op.retry_delay == timedelta(minutes=5)
# Testing real retry behaviour needs the Airflow execution path (integration test).
# NOTE(review): a single `airflow tasks test` run executes one attempt only —
# confirm the chosen harness actually re-runs failed tries before relying on it.
@pytest.mark.skip(reason="integration test: requires real Airflow execution path")
def test_retry_via_airflow_runtime():
    """Placeholder: task fails 2 times, succeeds on the 3rd.

    Skipped explicitly so it shows up as SKIPPED instead of a misleading PASS.
    """
Exponential backoff verification:
def test_exponential_backoff_setup():
    """Exponential-backoff settings passed to the constructor are kept on the operator."""
    # Local import: timedelta is otherwise only imported inside another test.
    from datetime import timedelta

    op = SnowflakeMergeOperator(
        task_id="x", source_table="s", target_table="t", merge_keys=["id"],
        retries=5,
        retry_delay=timedelta(minutes=1),
        retry_exponential_backoff=True,
        max_retry_delay=timedelta(hours=1),
    )
    assert op.retries == 5
    assert op.retry_delay == timedelta(minutes=1)
    assert op.retry_exponential_backoff is True
    assert op.max_retry_delay == timedelta(hours=1)
Идемпотентность через test
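Production operators должны быть idempotent — повторный execute не ломает data. Unit-тест может проверить только детерминизм SQL и то, что используется MERGE (а не INSERT); отсутствие дубликатов в реальных данных проверяется integration test-ом: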
@patch("plugins.operators.snowflake_merge_operator.SnowflakeHook")
def test_execute_idempotent_via_merge(mock_hook_cls, context):
    """Executing twice with the same context issues identical MERGE SQL.

    This pins determinism of the generated statement; the absence of
    duplicate rows comes from MERGE semantics and needs an integration test.
    """
    mock_hook = MagicMock()
    mock_hook.run.return_value = 42
    mock_hook_cls.return_value = mock_hook

    op = SnowflakeMergeOperator(
        task_id="x", source_table="s", target_table="t",
        merge_keys=["id"], execution_date_str="2026-05-12",
    )

    first_result = op.execute(context)
    second_result = op.execute(context)  # same context -> must build identical SQL

    assert first_result == second_result == 42
    assert mock_hook.run.call_count == 2
    first_sql, second_sql = (c.args[0] for c in mock_hook.run.call_args_list)
    assert first_sql == second_sql
    # MERGE upserts on merge_keys, so a rerun is safe; a plain INSERT would not be.
    assert "MERGE INTO" in first_sql
Production gotchas
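apply_defaults deprecated. Начиная с Airflow 2.0 декоратор @apply_defaults deprecated (выдаёт deprecation warning): BaseOperator применяет default_args автоматически. В новых operators он не нужен, а в существующих его можно просто удалить вместе с import.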
template_fields с tuple, не list. Distinction matters: template_fields = ("a",) — tuple. template_fields = ["a"] — list, который mutate-able. Используйте tuple.
render_template_fields модифицирует op. Не вызывайте дважды — поля уже rendered. В test используйте fresh operator instance per test.
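Hook constructor может делать work. Если Hook.__init__ лезет в Airflow DB (например Connection.get_connection_from_secrets), важно patch-ить class там, где operator его ищет: `@patch("plugins.operators.snowflake_merge_operator.SnowflakeHook")`, а не `airflow.providers.snowflake.hooks.snowflake.SnowflakeHook`. Operator импортировал имя через `from ... import`, поэтому patch по исходному пути его не заменит, и реальный __init__ всё равно выполнится. При правильном patch реальный __init__ не вызывается вовсе. Ещё надёжнее: pass hook as parameter (dependency injection).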
Context dict — airflow.utils.context.Context, не dict. Использовать Context() factory для proper typing. Для tests часто достаточно MagicMock(spec=Context).
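Атрибуты operator задаются в __init__. `_build_merge_sql` читает self.merge_keys, self.source_table и т.д., поэтому вызывайте его на экземпляре, созданном обычным конструктором. Если создать operator через `__new__` в обход __init__, вы получите AttributeError на этих атрибутах. Сам self.log при этом работает: это lazy property из LoggingMixin.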