BaseOperator anatomy
В Airflow всё, что выполняется как task, — это инстанс BaseOperator или его наследник. PythonOperator, BashOperator, S3CopyObjectOperator, SparkSubmitOperator, ваш собственный MyCustomOperator — все они проходят через один и тот же lifecycle, описанный в корневом классе. Понимание этой механики отделяет тех, кто использует Airflow как чёрный ящик, от тех, кто пишет надёжные production operators и эффективно диагностирует странное поведение.
Этот урок препарирует BaseOperator до уровня методов: что происходит между моментом, когда scheduler решает «пора запускать task», и моментом, когда статус TaskInstance переходит в success.
Dunder-методы и slot wrappers
Где BaseOperator в иерархии классов
Корневая иерархия Airflow 2.x:
Важный момент: MappedOperator (это task после .expand(...)) — отдельная ветка от AbstractOperator. Он не наследник BaseOperator, а альтернативная реализация, которая на runtime разворачивается в обычные task instances.
Минимальный custom operator
Минимально необходимый код для собственного operator:
from airflow.models import BaseOperator
from airflow.utils.context import Context
class GreetOperator(BaseOperator):
template_fields = ("name",)
ui_color = "#a8e6cf"
ui_fgcolor = "#000000"
def __init__(self, *, name: str, **kwargs):
super().__init__(**kwargs)
self.name = name
def execute(self, context: Context):
msg = f"Hello, {self.name}!"
self.log.info(msg)
return msg
Что здесь критично:
super().__init__(**kwargs)— обязательный вызов. Без него scheduler не подхватитtask_id,retries,dagи десятки других kwargs.template_fields— tuple строк, имена атрибутов, которые scheduler должен прогнать через Jinja передexecute().execute(context)— единственная обязательная сигнатура. Возвращаемое значение пушится в XCom (return_value) автоматически.ui_color/ui_fgcolor— цвет node в Graph view UI.
Полный lifecycle execute() — пошагово
Когда scheduler переводит TI из queued в running, executor запускает worker process, который вызывает TaskInstance._run_raw_task(). Внутри происходит цепочка из 11 шагов:
Если на шаге 5 или 6 возникает Exception:
stateставитсяup_for_retry(если есть retries) илиfailed.- Вызывается
on_failure_callback(если задан). - Если это последний retry и есть
email_on_failure— отправляется email черезSmtpHook. - Если задан
on_retry_callbackи есть retries — он вызывается перед следующим try.
Template fields — Jinja-рендеринг атрибутов
template_fields — самый недооценённый механизм BaseOperator. Это объявление: «эти атрибуты на runtime прогоняются через Jinja с контекстом».
class S3DownloadOperator(BaseOperator):
template_fields = ("bucket_name", "key", "local_path")
template_ext = (".sql", ".json") # Если значение заканчивается на эти суффиксы — это путь к файлу, читается и render-ится контент
template_fields_renderers = {"key": "python"} # Подсказка UI как highlight-ить в Rendered Templates tab
def __init__(self, *, bucket_name: str, key: str, local_path: str, **kwargs):
super().__init__(**kwargs)
self.bucket_name = bucket_name
self.key = key
self.local_path = local_path
def execute(self, context):
self.log.info(f"Downloading s3://{self.bucket_name}/{self.key} → {self.local_path}")
Использование:
S3DownloadOperator(
task_id="download_orders"
bucket_name="data-prod"
key="orders/{{ ds }}.parquet", # ← Jinja
local_path="/tmp/{{ ts_nodash }}_orders.parquet",
)
На runtime до pre_execute():
self.key="orders/2026-05-12.parquet"self.local_path="/tmp/20260512T000000_orders.parquet"
template_fields — это tuple на классе, не на инстансе. Это значит, что наследники должны явно объявить свой набор:
class S3DownloadWithRegionOperator(S3DownloadOperator):
template_fields = (*S3DownloadOperator.template_fields, "region")
def __init__(self, *, region: str, **kwargs):
super().__init__(**kwargs)
self.region = region
Частая ошибка: добавить атрибут в __init__, забыть про template_fields, и удивляться, почему {{ ds }} не подставляется. Атрибут, не указанный в template_fields, прокидывается as-is — без рендеринга. В Rendered Templates UI tab такой атрибут просто не появится.
Что доступно в Jinja context
Стандартные variables, доступные в template fields:
| Variable | Что это |
|---|---|
ds | Дата data_interval_start в формате YYYY-MM-DD |
ds_nodash | То же, но YYYYMMDD |
ts | Timestamp ISO 8601: 2026-05-12T00:00:00+00:00 |
ts_nodash | 20260512T000000 |
data_interval_start / data_interval_end | datetime объекты интервала |
logical_date | logical run date (раньше execution_date) |
dag | объект DAG |
task | объект текущего operator |
ti | TaskInstance |
run_id | ID текущего DagRun |
params | dict из params={...} в DAG/task |
var.value.X, var.json.X | Airflow Variable |
conn.X | Airflow Connection |
macros | модуль с macros.ds_add, macros.datetime |
on_kill — что вызывается при tаsk timeout / clear running
on_kill() — критически важный, часто игнорируемый метод. Вызывается, когда:
- Task превышает
execution_timeout— scheduler шлёт SIGTERM в worker process. - Пользователь делает
clearна running task через UI/CLI. - Worker pod evicted в Kubernetes.
- Scheduler видит TI с
state=running, но heartbeat прокис — adopt_or_reset.
Default on_kill — pass. Но если ваш operator запускает внешний процесс (Spark job, Dataflow pipeline, K8s pod, BigQuery query), отсутствие cleanup означает leak external resources даже после того, как Airflow считает task failed.
class SparkSubmitOperator(BaseOperator):
def __init__(self, *, application: str, **kwargs):
super().__init__(**kwargs)
self.application = application
self._spark_app_id = None # запомним, чтобы убить при kill
def execute(self, context):
self._spark_app_id = self._submit_spark_application()
self._wait_for_completion(self._spark_app_id)
def on_kill(self):
if self._spark_app_id:
self.log.info(f"Killing Spark application {self._spark_app_id}")
self._cancel_spark_application(self._spark_app_id)
Контракт on_kill: метод должен быть idempotent и fast (worker даёт 60s на cleanup, потом SIGKILL). Не делайте здесь blocking I/O без timeout. Если внешняя система не отвечает за минуту — лучше leak и log, чем повесить worker слот навсегда.
Полезные attributes BaseOperator
Атрибуты, которые часто используются в production:
| Attribute | Тип | Зачем |
|---|---|---|
task_id | str | Уникален внутри DAG, обязателен |
retries | int | Число повторов при failure (default 0) |
retry_delay | timedelta | Задержка между retries (default 5min) |
retry_exponential_backoff | bool | Удваивать delay при каждом retry |
max_retry_delay | timedelta | Cap для exponential backoff |
execution_timeout | timedelta | Hard timeout, после которого on_kill |
priority_weight | int | Влияет на executor queue ordering (default 1) |
weight_rule | str | downstream / upstream / absolute — стратегия aggregation priority |
pool | str | Pool name для concurrency control |
pool_slots | int | Сколько slots task занимает |
queue | str | Celery queue для CeleryExecutor |
executor_config | dict | Per-task executor settings (например, K8sExecutor pod spec) |
trigger_rule | str | Когда task запускается (all_success, all_done, one_failed, …) |
do_xcom_push | bool | Пушить ли return value в XCom (default True) |
on_failure_callback | Callable | Hook при failure |
on_retry_callback | Callable | Hook перед retry |
on_success_callback | Callable | Hook при success |
sla | timedelta | SLA — если task не finished в этот интервал, шлётся miss email |
Production gotchas
-
template_fieldsзабыли на наследнике. Subclass добавил новый атрибут, не указал вtemplate_fields, и Jinja-выражения в нём не работают. Симптом: в UI Rendered Templates атрибут отсутствует, в логах видно raw{{ ds }}строку. Fix —template_fields = (*Parent.template_fields, "new_field"). -
super().__init__забыт или передан с*argsвместо**kwargs. Если вы пишете__init__(self, name, **kwargs)безsuper().__init__(**kwargs), тоtask_id,retries,dagи прочее не попадут в parent. DAG не запарсится сTypeError: BaseOperator.__init__() got an unexpected keyword argument 'task_id'или, что хуже, парсится, но retries молча игнорируются. -
Mutable default в
__init__. Classic Python pitfall в Airflow operators:def __init__(self, items: list = []):— этот list расшарен между всеми инстансами. На втором парсе DAG в нём окажутся данные первого. Используйтеitems: list | None = Noneи внутриself.items = items or []. -
on_killделает blocking call без timeout. Воркер начинает shutdown, on_kill ждёт ответа Spark master 30 минут, в итоге SIGKILL приходит, всё равно leak. Всегда добавляйтеtimeout=10к любым network calls в on_kill. -
State хранится на
selfмежду retries. Между try_number=1 и try_number=2 worker — отдельный процесс. Любое состояние, которое вы храните вself.Xпослеexecute(), теряется. Сохраняйте в XCom или внешнее хранилище. -
do_xcom_push=Falseзабыли на large-result tasks. По default любой return value пушится в XCom — это запись в metadata DB. Если ваш operator возвращает 50MB DataFrame as JSON, вы засоряете xcom table до тех пор, пока DB не начнёт деградировать. Для больших данных —do_xcom_push=False, а результат складывайте в S3/object store.
Краткий контракт собственного operator
Если пишете custom operator для production, чек-лист:
- Наследник
BaseOperator(илиBaseSensorOperatorдля sensors). template_fieldsобъявлен явно (даже если пустой).__init__принимает**kwargsи вызываетsuper().__init__(**kwargs).execute(self, context)— единственный обязательный метод.on_kill(self)реализован, если operator управляет внешним ресурсом.do_xcom_push=Falseесли return value большой.- Все network calls используют hook (наследник
BaseHook), не raw connections. - Логирование через
self.log(этоlogging.Logger), неprintи не свой logger.