Learning Platform
Глоссарий Troubleshooting
Урок 04.03 · 22 мин
Продвинутый
PythonOperatorTaskFlowExternalPythonVirtualenvKubernetesPodOperator

PythonOperator, TaskFlow и venv-варианты

PythonOperator — самый используемый operator в Airflow. Это execute(), который вызывает вашу функцию. Но за этой простотой стоит семейство из шести operators (Python, TaskFlow @task, ExternalPython, Virtualenv, Docker, KubernetesPod), каждый со своими гарантиями изоляции, скорости и cost. Этот урок разбирает, чем они отличаются на уровне execute, как resolves op_args/op_kwargs и какой выбрать под конкретную задачу.

В предыдущем уроке (02-baseoperator-anatomy) мы видели lifecycle BaseOperator. Здесь конкретизируем — что именно происходит в execute() PythonOperator и его обёртки @task.


Базовый PythonOperator

Псевдокод airflow.operators.python.PythonOperator (Airflow 2.10):

class PythonOperator(BaseOperator):
    template_fields: Sequence[str] = ("templates_dict", "op_args", "op_kwargs")
    template_fields_renderers = {"templates_dict": "json", "op_args": "py", "op_kwargs": "py"}

    def __init__(
        self,
        *,
        python_callable: Callable,
        op_args: Collection | None = None,
        op_kwargs: Mapping[str, Any] | None = None,
        templates_dict: dict | None = None,
        templates_exts: list[str] | None = None,
        show_return_value_in_logs: bool = True,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        if not callable(python_callable):
            raise AirflowException("`python_callable` must be callable")
        self.python_callable = python_callable
        self.op_args = op_args or ()
        self.op_kwargs = op_kwargs or {}
        self.templates_dict = templates_dict
        # ...

    def execute(self, context: Context) -> Any:
        context_merge(context, self.op_kwargs, templates_dict=self.templates_dict)
        self.op_kwargs = self.determine_kwargs(context)

        return_value = self.execute_callable()
        if self.show_return_value_in_logs:
            self.log.info("Done. Returned value was: %s", return_value)
        return return_value

    def execute_callable(self) -> Any:
        return self.python_callable(*self.op_args, **self.op_kwargs)

Ключевые механики:

  • op_args / op_kwargs template_fields. Значения внутри них прогоняются через Jinja. Значит можно писать op_kwargs={"date": "{{ ds }}"} — после render это будет реальная дата строкой.
  • determine_kwargs(context) — фильтрует context по signature функции. Если python_callable принимает **context или явно ds, ti, execution_date — они подмешиваются автоматически. Если не принимает — context не пробрасывается.
  • templates_dict — устаревший механизм передачи dict с rendered values отдельно. Используйте op_kwargs для нового кода.

op_args vs op_kwargs vs context resolution

def my_func(positional, *, named, ds=None, ti=None):
    print(positional, named, ds, ti)

PythonOperator(
    task_id="t"
    python_callable=my_func,
    op_args=["arg1"],
    op_kwargs={"named": "value", "rendered": "{{ ds }}"},
)

Что произойдёт на execute:

  1. Airflow видит, что my_func принимает ds и ti — добавляет их в op_kwargs из context.
  2. op_kwargs["rendered"] пройдёт через Jinja → станет "2026-05-12".
  3. Финальный вызов: my_func("arg1", named="value", rendered="2026-05-12", ds="2026-05-12", ti=<TI>).

Если функция не объявила ds и ti в signature — Airflow не подмешивает их (избежать TypeError).


@task — обёртка _PythonDecoratedOperator

@task decorator создаёт обёртку — _PythonDecoratedOperator, наследника PythonOperator. Упрощённо:

class _PythonDecoratedOperator(DecoratedOperator, PythonOperator):
    custom_operator_name: str = "@task"

    template_fields: Sequence[str] = ("templates_dict", "op_args", "op_kwargs")

    def __init__(self, *, python_callable, op_args, op_kwargs, **kwargs):
        kwargs_to_upstream = {
            "python_callable": python_callable,
            "op_args": op_args,
            "op_kwargs": op_kwargs,
        }
        super().__init__(
            kwargs_to_upstream=kwargs_to_upstream,
            python_callable=python_callable,
            op_args=op_args,
            op_kwargs=op_kwargs,
            **kwargs,
        )

Ключевая разница с PythonOperator — DecoratedOperator добавляет:

  1. XComArg resolution. Если в op_kwargs есть XComArg (то есть результат другого @task), на execute Airflow делает xcom_pull за вас.
  2. multiple_outputs. Если @task(multiple_outputs=True) или return type — TypedDict, return value распаковывается по ключам и каждый идёт в свой XCom.
  3. Function metadata propagation. task_id берётся из __name__ функции, doc — из docstring.

Подробно _PythonDecoratedOperator разобран в Module 02 lesson 03 — здесь акцент на семействе.


Семейство Python-operators

Шесть способов запустить Python код в Airflow. Различия — в изоляции и скорости:

Семейство Python operators
PythonOperator / @taskЗапускает callable в том же Python процессе, что и worker. Все deps worker'а доступны. Скорость — наивысшая, изоляция — нулевая. Crash в malloc убивает worker.
PythonVirtualenvOperator / @task.virtualenvСоздаёт временный venv при каждом execute, ставит туда requirements, сериализует function через cloudpickle, выполняет. Изоляция deps есть, но time-per-run медленный (создание venv ~30s).
ExternalPythonOperator / @task.external_pythonИспользует существующий Python interpreter (path указывается). Не создаёт venv заново — экономия 30s/run. Подходит если у вас pre-baked venv в worker image.
DockerOperator / @task.dockerЗапускает функцию внутри Docker container. Изоляция — полная (Linux namespaces). Worker должен иметь доступ к docker daemon. Хорошо для legacy deps, плохо для K8s deployment.
KubernetesPodOperator / @task.kubernetesЗапускает pod в K8s с указанным образом. Worker отправляет manifest, ждёт completion, читает logs. Изоляция максимальна, scale неограничен, но startup latency 10-30s.

Сравнительная таблица

OperatorStartup overheadIsolationКогда выбирать
PythonOperator / @task0s (в worker)НетDefault. Если deps совместимы с worker image
PythonVirtualenvOperator20-60s (создание venv)Deps толькоОдин-off задача с уникальными deps, редкий запуск
ExternalPythonOperator0.5s (просто subprocess)Deps толькоРегулярные tasks с альтернативным Python (например, 3.10 vs 3.11 worker)
DockerOperator2-10s (image pull + start)Full namespaceLegacy job, контролируемое окружение, не на K8s
KubernetesPodOperator10-30s (pod schedule + pull)Full clusterHigh isolation, large compute (Spark driver, GPU), unbounded scale

PythonVirtualenvOperator — изоляция deps на лету

from airflow.decorators import dag, task
from datetime import datetime

@dag(schedule="@daily", start_date=datetime(2026, 1, 1), catchup=False)
def venv_demo():
    @task.virtualenv(
        requirements=["pandas==1.5.3", "scikit-learn==1.2.0"],
        python_version="3.10"
        system_site_packages=False,
        use_dill=False,
    )
    def train_model():
        import pandas as pd
        from sklearn.linear_model import LinearRegression
        # pandas 1.5.3 даже если worker имеет 2.x
        df = pd.read_parquet("/data/features.parquet")
        model = LinearRegression().fit(df[["x"]], df["y"])
        return {"coef": float(model.coef_[0])}

    train_model()

venv_demo()

Что происходит под капотом:

  1. Airflow создаёт temp directory: /tmp/airflow_venv_xxxxx/.
  2. Запускает python3.10 -m venv /tmp/airflow_venv_xxxxx.
  3. pip install pandas==1.5.3 scikit-learn==1.2.0 внутрь.
  4. Сериализует train_model через cloudpickle (не pickle) — он может сериализовать closures и nested functions.
  5. Запускает subprocess.Popen([venv_python, "exec.py"]) — exec.py загружает pickled function и вызывает.
  6. Читает return value из stdout (json-encoded для simple types, cloudpickle для complex).
venv / virtualenv: как устроены виртуальные окружения Python
WARNING

PythonVirtualenvOperator создаёт venv на каждый запуск task. Если task запускается каждый час и имеет 200MB deps — каждый час будет pip install (минута+ в зависимости от индекса). Используйте ExternalPythonOperator с pre-built venv, или закэшируйте pip wheels локально, или вообще закладывайте venv в worker image.

Ограничения cloudpickle

  • Нельзя использовать модули из worker, если они не в requirements (например, нельзя import airflow внутри venv task — Airflow не указан в requirements).
  • Аргументы и return value должны быть picklable.
  • Нет доступа к context — sensor-like signatures не работают, кроме ds, ti и других primitives, которые сериализуются.

ExternalPythonOperator — pre-built venv

Решает главную проблему PythonVirtualenvOperator — медленный startup. Используется существующий interpreter.

@task.external_python(
    python="/opt/airflow/venvs/ml-py310/bin/python"
    expect_airflow=False,
)
def train_model():
    import pandas as pd
    return pd.__version__

В Docker image worker’а заранее создаёте /opt/airflow/venvs/ml-py310 с нужными deps:

FROM apache/airflow:2.10.5

USER root
RUN python3.10 -m venv /opt/airflow/venvs/ml-py310 && \
    /opt/airflow/venvs/ml-py310/bin/pip install pandas==1.5.3 scikit-learn==1.2.0

USER airflow

Скорость: subprocess.Popen к готовому Python — 100-500ms overhead. Это +/- стоимость bash subprocess.

expect_airflow=False критичен: если внутри venv нет airflow package, скажите явно — иначе wrapper попытается импортировать airflow.utils для своих нужд и упадёт.


KubernetesPodOperator — overview

KubernetesPodOperator — отдельный мир. Создаёт pod в K8s, ждёт completion, читает logs. Базовый пример:

from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

run_etl = KubernetesPodOperator(
    task_id="run_etl"
    name="etl-{{ ds_nodash }}"
    namespace="airflow-tasks"
    image="myregistry/etl-job:1.2.3"
    cmds=["python", "/app/etl.py"],
    arguments=["--date={{ ds }}", "--env=prod"],
    env_vars={"AWS_REGION": "us-east-1"},
    resources={"requests": {"memory": "2Gi", "cpu": "1"}, "limits": {"memory": "4Gi", "cpu": "2"}},
    is_delete_operator_pod=True,
    get_logs=True,
    log_events_on_failure=True,
    in_cluster=True,
)

Lifecycle:

  1. execute() собирает pod manifest из всех параметров.
  2. Через kubernetes client отправляет POST /api/v1/namespaces/airflow-tasks/pods.
  3. Подключается к pod logs через watch=True, стримит в worker logs.
  4. На каждом poll смотрит pod.status.phase — ждёт Succeeded или Failed.
  5. При Succeeded читает результат из xcom_sidecar container (если объявлен) или из последнего сообщения stdout.
  6. Если is_delete_operator_pod=TrueDELETE pod после завершения.

on_kill() отправляет DELETE с graceful period — pod получает SIGTERM, потом SIGKILL.

TIP

KubernetesPodOperator vs KubernetesExecutor — две независимых вещи. Executor определяет, где сами Airflow tasks запускаются (как Kubernetes pods). KubernetesPodOperator — это operator, который запускает custom pods независимо от того, в каком executor работает Airflow. Можно использовать KPO даже на LocalExecutor.


Production gotchas

  1. @task функция не имеет доступа к Airflow context, если signature этого не объявила. @task resolves только ti, ds, dag_run, params и т.п. если они присутствуют в args. Если хотите весь context — добавьте **context в signature, тогда Airflow подмешает полный dict.

  2. PythonVirtualenvOperator не подхватывает дополнительные модули из dags folder. Внутри venv нет sys.path к /opt/airflow/dags/include. Если у вас shared helper module, либо ставьте его как pip wheel в requirements, либо передавайте код как inline в decorated function.

  3. ExternalPythonOperator: venv стух при upgrade Airflow. Если в venv установлен pandas==1.5.3, а потом Airflow upgrade требует numpy>=1.25 — конфликт. Решение: переустанавливайте venv в Docker image при каждом upgrade Airflow, либо используйте полностью изолированную K8s pod operator.

  4. KubernetesPodOperator с is_delete_operator_pod=False. Удобно для debug, но в production — каждый task оставляет orphan pod в Succeeded state. Через неделю в namespace 100k pods, etcd деградирует, kubectl get pods тормозит. Всегда True в prod, для debug — отдельный namespace с retention policy.

  5. do_xcom_push=True (default) + return large object. PythonOperator пушит return value в xcom через JSON serialization (или pickle если enable_xcom_pickling=True). 100MB DataFrame → 100MB строка в metadata DB → query на TI list начнёт тормозить. Для большого результата — do_xcom_push=False, складывайте в S3, возвращайте только S3 path.

  6. op_kwargs с XComArg внутри nested dict. op_kwargs={"config": {"date": some_task_output()}}some_task_output() это XComArg, но Airflow по default resolves только top-level keys в op_kwargs. Для nested resolution нужен явный flag или TaskFlow API (@task делает deep resolve автоматически).


Проверка знанийKnowledge check
Ваша команда хочет запускать ML training task с deps `pandas==1.5.3` и `scikit-learn==1.2.0`, которые конфликтуют с worker image (там pandas 2.1). Task запускается каждый час и должна стартовать быстро (< 2s overhead). Какой operator выбрать и почему не другие?
ОтветAnswer
Выбор — **ExternalPythonOperator** (или `@task.external_python`) с pre-built venv в worker Docker image. Создаём в Dockerfile `RUN python3.10 -m venv /opt/airflow/venvs/ml && /opt/airflow/venvs/ml/bin/pip install pandas==1.5.3 scikit-learn==1.2.0`, в task указываем `python='/opt/airflow/venvs/ml/bin/python'`. Overhead — subprocess.Popen к готовому интерпретатору, около 100-500ms. Почему не другие: (1) **PythonOperator** не подходит — конфликт deps с worker; (2) **PythonVirtualenvOperator** не подходит — создаёт venv на каждом execute, `pip install` ~30-60s, не уложимся в 2s; (3) **KubernetesPodOperator** перебор — startup pod 10-30s, требует K8s cluster и image push pipeline; (4) **DockerOperator** медленнее (image start 2-10s) и сложнее в deploy на K8s worker. ExternalPython даёт ровно то, что нужно: deps isolation + минимальный startup. Trade-off: venv нужно поддерживать в Docker image, синхронизировать обновления.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Команда хочет запускать ML training task каждый час с deps `pandas==1.5.3` и `scikit-learn==1.2.0`, которые конфликтуют с worker image. Требование — overhead старта < 2s. Какой operator выбрать?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6