Learning Platform
Глоссарий Troubleshooting
Урок 04.02 · 24 мин
Продвинутый
BaseOperatorexecutetemplate_fieldsOperator Lifecycleon_kill

BaseOperator anatomy

В Airflow всё, что выполняется как task, — это инстанс BaseOperator или его наследник. PythonOperator, BashOperator, S3CopyObjectOperator, SparkSubmitOperator, ваш собственный MyCustomOperator — все они проходят через один и тот же lifecycle, описанный в корневом классе. Понимание этой механики отделяет тех, кто использует Airflow как чёрный ящик, от тех, кто пишет надёжные production operators и эффективно диагностирует странное поведение.

Этот урок препарирует BaseOperator до уровня методов: что происходит между моментом, когда scheduler решает «пора запускать task», и моментом, когда статус TaskInstance переходит в success.


Dunder-методы и slot wrappers

Где BaseOperator в иерархии классов

Корневая иерархия Airflow 2.x:

Inheritance hierarchy operators
AbstractOperatorСамый верхний класс. Абстракция, общая для BaseOperator и MappedOperator (dynamic task mapping). Определяет интерфейс task_id, dag, downstream_list, upstream_list. Не имеет execute().
inherits
BaseOperatorОсновной класс всех operators. Содержит execute(), pre_execute(), post_execute(), on_kill(), retry-логику, render_template_fields. Все user-written operators наследуются отсюда. 90 атрибутов на инстанс.
PythonOperatorПросто вызывает python_callable. execute(context) делает self.python_callable(*op_args, **op_kwargs). Базовый класс для _PythonDecoratedOperator (TaskFlow), BranchPythonOperator, ShortCircuitOperator.
BashOperatorexecute() запускает subprocess через subprocess.Popen, читает stdout/stderr построчно, прокидывает env vars. on_kill() отправляет SIGTERM в process group.
BaseSensorOperatorSensors добавляют poke()/async-метод и три mode — poke/reschedule/deferrable. execute() становится циклом poke() с обработкой timeout и backoff.
S3KeySensor, KubernetesPodOperator, SparkSubmitOperator...Provider operators наследуются от BaseOperator (или BaseSensorOperator для sensors) и реализуют свой execute(). Большинство также используют BaseHook-наследников для работы с external services.

Важный момент: MappedOperator (это task после .expand(...)) — отдельная ветка от AbstractOperator. Он не наследник BaseOperator, а альтернативная реализация, которая на runtime разворачивается в обычные task instances.


Минимальный custom operator

Минимально необходимый код для собственного operator:

from airflow.models import BaseOperator
from airflow.utils.context import Context

class GreetOperator(BaseOperator):
    template_fields = ("name",)
    ui_color = "#a8e6cf"
    ui_fgcolor = "#000000"

    def __init__(self, *, name: str, **kwargs):
        super().__init__(**kwargs)
        self.name = name

    def execute(self, context: Context):
        msg = f"Hello, {self.name}!"
        self.log.info(msg)
        return msg

Что здесь критично:

  • super().__init__(**kwargs) — обязательный вызов. Без него scheduler не подхватит task_id, retries, dag и десятки других kwargs.
  • template_fields — tuple строк, имена атрибутов, которые scheduler должен прогнать через Jinja перед execute().
  • execute(context) — единственная обязательная сигнатура. Возвращаемое значение пушится в XCom (return_value) автоматически.
  • ui_color / ui_fgcolor — цвет node в Graph view UI.

Полный lifecycle execute() — пошагово

Когда scheduler переводит TI из queued в running, executor запускает worker process, который вызывает TaskInstance._run_raw_task(). Внутри происходит цепочка из 11 шагов:

execute() lifecycle
1. TI._run_raw_task() запущенWorker process получает task через executor.execute_async() (LocalExecutor) или через celery message (CeleryExecutor). Открывает new DB session, начинает транзакцию.
2. render_templates(context)Для каждого attribute в template_fields: достаёт значение, прогоняет через Jinja с context (ds, execution_date, ti, params, var, conn). Заменяет attribute на rendered value. Делается ДО любых hooks.
3. clear_xcom_data() (если не retry)Очищает XCom от предыдущего run этого TI. При retry XCom сохраняется (бывший спор в community, в 2.x по умолчанию сохраняется для retry, очистка делается только для свежего try_number=1).
4. pre_execute(context)Hook метод. Default — pass. Используется для setup: открыть connection, создать temp directory, прогреть кэш. Вызывается ПОСЛЕ Jinja, но ДО execute(). Exception здесь == task failure.
5. execute(context) ← main workГлавный метод. Содержит business logic operator. Любое исключение поднимается выше и приводит к failure / retry. Возвращаемое значение → XCom return_value.
6. post_execute(context, result)Hook метод. Default — pass. Используется для cleanup или validation результата. Получает return value execute(). Exception здесь == task failure (XCom уже залит).
7. xcom_push('return_value', result)Если result не None и не do_xcom_push=False, пушит в xcom table. INSERT INTO xcom (key, value, task_id, dag_id, run_id) VALUES ('return_value', pickled_or_json, ...).
8. TI.state = SUCCESS, commitUPDATE task_instance SET state='success', end_date=now(), duration=... WHERE id=... → COMMIT. После этого момента downstream tasks могут стартовать.
9. on_success_callback (если задан)Опциональный callback, передаётся в operator constructor. Получает context. Если raise — task всё равно success (callback errors logged, не меняют state).

Если на шаге 5 или 6 возникает Exception:

  1. state ставится up_for_retry (если есть retries) или failed.
  2. Вызывается on_failure_callback (если задан).
  3. Если это последний retry и есть email_on_failure — отправляется email через SmtpHook.
  4. Если задан on_retry_callback и есть retries — он вызывается перед следующим try.

Template fields — Jinja-рендеринг атрибутов

template_fields — самый недооценённый механизм BaseOperator. Это объявление: «эти атрибуты на runtime прогоняются через Jinja с контекстом».

class S3DownloadOperator(BaseOperator):
    template_fields = ("bucket_name", "key", "local_path")
    template_ext = (".sql", ".json")  # Если значение заканчивается на эти суффиксы — это путь к файлу, читается и render-ится контент
    template_fields_renderers = {"key": "python"}  # Подсказка UI как highlight-ить в Rendered Templates tab

    def __init__(self, *, bucket_name: str, key: str, local_path: str, **kwargs):
        super().__init__(**kwargs)
        self.bucket_name = bucket_name
        self.key = key
        self.local_path = local_path

    def execute(self, context):
        self.log.info(f"Downloading s3://{self.bucket_name}/{self.key}{self.local_path}")

Использование:

S3DownloadOperator(
    task_id="download_orders"
    bucket_name="data-prod"
    key="orders/{{ ds }}.parquet",          # ← Jinja
    local_path="/tmp/{{ ts_nodash }}_orders.parquet",
)

На runtime до pre_execute():

  • self.key = "orders/2026-05-12.parquet"
  • self.local_path = "/tmp/20260512T000000_orders.parquet"

template_fields — это tuple на классе, не на инстансе. Это значит, что наследники должны явно объявить свой набор:

class S3DownloadWithRegionOperator(S3DownloadOperator):
    template_fields = (*S3DownloadOperator.template_fields, "region")

    def __init__(self, *, region: str, **kwargs):
        super().__init__(**kwargs)
        self.region = region
WARNING

Частая ошибка: добавить атрибут в __init__, забыть про template_fields, и удивляться, почему {{ ds }} не подставляется. Атрибут, не указанный в template_fields, прокидывается as-is — без рендеринга. В Rendered Templates UI tab такой атрибут просто не появится.

Что доступно в Jinja context

Стандартные variables, доступные в template fields:

VariableЧто это
dsДата data_interval_start в формате YYYY-MM-DD
ds_nodashТо же, но YYYYMMDD
tsTimestamp ISO 8601: 2026-05-12T00:00:00+00:00
ts_nodash20260512T000000
data_interval_start / data_interval_enddatetime объекты интервала
logical_datelogical run date (раньше execution_date)
dagобъект DAG
taskобъект текущего operator
tiTaskInstance
run_idID текущего DagRun
paramsdict из params={...} в DAG/task
var.value.X, var.json.XAirflow Variable
conn.XAirflow Connection
macrosмодуль с macros.ds_add, macros.datetime

on_kill — что вызывается при tаsk timeout / clear running

on_kill() — критически важный, часто игнорируемый метод. Вызывается, когда:

  1. Task превышает execution_timeout — scheduler шлёт SIGTERM в worker process.
  2. Пользователь делает clear на running task через UI/CLI.
  3. Worker pod evicted в Kubernetes.
  4. Scheduler видит TI с state=running, но heartbeat прокис — adopt_or_reset.

Default on_killpass. Но если ваш operator запускает внешний процесс (Spark job, Dataflow pipeline, K8s pod, BigQuery query), отсутствие cleanup означает leak external resources даже после того, как Airflow считает task failed.

class SparkSubmitOperator(BaseOperator):
    def __init__(self, *, application: str, **kwargs):
        super().__init__(**kwargs)
        self.application = application
        self._spark_app_id = None  # запомним, чтобы убить при kill

    def execute(self, context):
        self._spark_app_id = self._submit_spark_application()
        self._wait_for_completion(self._spark_app_id)

    def on_kill(self):
        if self._spark_app_id:
            self.log.info(f"Killing Spark application {self._spark_app_id}")
            self._cancel_spark_application(self._spark_app_id)
TIP

Контракт on_kill: метод должен быть idempotent и fast (worker даёт 60s на cleanup, потом SIGKILL). Не делайте здесь blocking I/O без timeout. Если внешняя система не отвечает за минуту — лучше leak и log, чем повесить worker слот навсегда.


Полезные attributes BaseOperator

Атрибуты, которые часто используются в production:

AttributeТипЗачем
task_idstrУникален внутри DAG, обязателен
retriesintЧисло повторов при failure (default 0)
retry_delaytimedeltaЗадержка между retries (default 5min)
retry_exponential_backoffboolУдваивать delay при каждом retry
max_retry_delaytimedeltaCap для exponential backoff
execution_timeouttimedeltaHard timeout, после которого on_kill
priority_weightintВлияет на executor queue ordering (default 1)
weight_rulestrdownstream / upstream / absolute — стратегия aggregation priority
poolstrPool name для concurrency control
pool_slotsintСколько slots task занимает
queuestrCelery queue для CeleryExecutor
executor_configdictPer-task executor settings (например, K8sExecutor pod spec)
trigger_rulestrКогда task запускается (all_success, all_done, one_failed, …)
do_xcom_pushboolПушить ли return value в XCom (default True)
on_failure_callbackCallableHook при failure
on_retry_callbackCallableHook перед retry
on_success_callbackCallableHook при success
slatimedeltaSLA — если task не finished в этот интервал, шлётся miss email

Production gotchas

  1. template_fields забыли на наследнике. Subclass добавил новый атрибут, не указал в template_fields, и Jinja-выражения в нём не работают. Симптом: в UI Rendered Templates атрибут отсутствует, в логах видно raw {{ ds }} строку. Fix — template_fields = (*Parent.template_fields, "new_field").

  2. super().__init__ забыт или передан с *args вместо **kwargs. Если вы пишете __init__(self, name, **kwargs) без super().__init__(**kwargs), то task_id, retries, dag и прочее не попадут в parent. DAG не запарсится с TypeError: BaseOperator.__init__() got an unexpected keyword argument 'task_id' или, что хуже, парсится, но retries молча игнорируются.

  3. Mutable default в __init__. Classic Python pitfall в Airflow operators: def __init__(self, items: list = []): — этот list расшарен между всеми инстансами. На втором парсе DAG в нём окажутся данные первого. Используйте items: list | None = None и внутри self.items = items or [].

  4. on_kill делает blocking call без timeout. Воркер начинает shutdown, on_kill ждёт ответа Spark master 30 минут, в итоге SIGKILL приходит, всё равно leak. Всегда добавляйте timeout=10 к любым network calls в on_kill.

  5. State хранится на self между retries. Между try_number=1 и try_number=2 worker — отдельный процесс. Любое состояние, которое вы храните в self.X после execute(), теряется. Сохраняйте в XCom или внешнее хранилище.

  6. do_xcom_push=False забыли на large-result tasks. По default любой return value пушится в XCom — это запись в metadata DB. Если ваш operator возвращает 50MB DataFrame as JSON, вы засоряете xcom table до тех пор, пока DB не начнёт деградировать. Для больших данных — do_xcom_push=False, а результат складывайте в S3/object store.


Краткий контракт собственного operator

Если пишете custom operator для production, чек-лист:

  1. Наследник BaseOperator (или BaseSensorOperator для sensors).
  2. template_fields объявлен явно (даже если пустой).
  3. __init__ принимает **kwargs и вызывает super().__init__(**kwargs).
  4. execute(self, context) — единственный обязательный метод.
  5. on_kill(self) реализован, если operator управляет внешним ресурсом.
  6. do_xcom_push=False если return value большой.
  7. Все network calls используют hook (наследник BaseHook), не raw connections.
  8. Логирование через self.log (это logging.Logger), не print и не свой logger.

Проверка знанийKnowledge check
Вы написали custom SparkSubmitOperator, который запускает Spark application на удалённом EMR cluster. Что обязательно реализовать кроме execute(), чтобы operator был production-ready, и что произойдёт, если этого не сделать?
ОтветAnswer
Обязательно реализовать **on_kill()**, который отправляет cancel запрос в EMR/Spark master по application_id. Если этого не сделать: при срабатывании execution_timeout, при clear через UI или при evict worker pod Airflow считает task failed/cleared, но Spark application продолжает работать на cluster — это **resource leak**. Кластер занят тысячами orphan jobs, бюджет AWS вырастает, slot в YARN/K8s остаётся занятым. Дополнительно: (1) объявить `template_fields` для всех jinja-renderable атрибутов (application path, conf parameters); (2) хранить _application_id на self после submit, чтобы on_kill знал что killить; (3) в on_kill использовать timeout 10-30s — worker shutdown даёт 60s до SIGKILL; (4) сделать idempotent — повторный kill уже killed application не должен raise. И отдельно: использовать BaseHook subclass для общения с EMR, не raw boto3, чтобы переиспользовать Connection management.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Что делает атрибут `template_fields` на класс-наследнике BaseOperator?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6