PythonOperator, TaskFlow и venv-варианты
PythonOperator — самый используемый operator в Airflow. Это execute(), который вызывает вашу функцию. Но за этой простотой стоит семейство из шести operators (Python, TaskFlow @task, ExternalPython, Virtualenv, Docker, KubernetesPod), каждый со своими гарантиями изоляции, скорости и cost. Этот урок разбирает, чем они отличаются на уровне execute, как resolves op_args/op_kwargs и какой выбрать под конкретную задачу.
В предыдущем уроке (02-baseoperator-anatomy) мы видели lifecycle BaseOperator. Здесь конкретизируем — что именно происходит в execute() PythonOperator и его обёртки @task.
Базовый PythonOperator
Псевдокод airflow.operators.python.PythonOperator (Airflow 2.10):
class PythonOperator(BaseOperator):
template_fields: Sequence[str] = ("templates_dict", "op_args", "op_kwargs")
template_fields_renderers = {"templates_dict": "json", "op_args": "py", "op_kwargs": "py"}
def __init__(
self,
*,
python_callable: Callable,
op_args: Collection | None = None,
op_kwargs: Mapping[str, Any] | None = None,
templates_dict: dict | None = None,
templates_exts: list[str] | None = None,
show_return_value_in_logs: bool = True,
**kwargs,
) -> None:
super().__init__(**kwargs)
if not callable(python_callable):
raise AirflowException("`python_callable` must be callable")
self.python_callable = python_callable
self.op_args = op_args or ()
self.op_kwargs = op_kwargs or {}
self.templates_dict = templates_dict
# ...
def execute(self, context: Context) -> Any:
context_merge(context, self.op_kwargs, templates_dict=self.templates_dict)
self.op_kwargs = self.determine_kwargs(context)
return_value = self.execute_callable()
if self.show_return_value_in_logs:
self.log.info("Done. Returned value was: %s", return_value)
return return_value
def execute_callable(self) -> Any:
return self.python_callable(*self.op_args, **self.op_kwargs)
Ключевые механики:
op_args/op_kwargstemplate_fields. Значения внутри них прогоняются через Jinja. Значит можно писатьop_kwargs={"date": "{{ ds }}"}— после render это будет реальная дата строкой.determine_kwargs(context)— фильтрует context по signature функции. Еслиpython_callableпринимает**contextили явноds,ti,execution_date— они подмешиваются автоматически. Если не принимает — context не пробрасывается.templates_dict— устаревший механизм передачи dict с rendered values отдельно. Используйтеop_kwargsдля нового кода.
op_args vs op_kwargs vs context resolution
def my_func(positional, *, named, ds=None, ti=None):
print(positional, named, ds, ti)
PythonOperator(
task_id="t"
python_callable=my_func,
op_args=["arg1"],
op_kwargs={"named": "value", "rendered": "{{ ds }}"},
)
Что произойдёт на execute:
- Airflow видит, что
my_funcпринимаетdsиti— добавляет их вop_kwargsиз context. op_kwargs["rendered"]пройдёт через Jinja → станет"2026-05-12".- Финальный вызов:
my_func("arg1", named="value", rendered="2026-05-12", ds="2026-05-12", ti=<TI>).
Если функция не объявила ds и ti в signature — Airflow не подмешивает их (избежать TypeError).
@task — обёртка _PythonDecoratedOperator
@task decorator создаёт обёртку — _PythonDecoratedOperator, наследника PythonOperator. Упрощённо:
class _PythonDecoratedOperator(DecoratedOperator, PythonOperator):
custom_operator_name: str = "@task"
template_fields: Sequence[str] = ("templates_dict", "op_args", "op_kwargs")
def __init__(self, *, python_callable, op_args, op_kwargs, **kwargs):
kwargs_to_upstream = {
"python_callable": python_callable,
"op_args": op_args,
"op_kwargs": op_kwargs,
}
super().__init__(
kwargs_to_upstream=kwargs_to_upstream,
python_callable=python_callable,
op_args=op_args,
op_kwargs=op_kwargs,
**kwargs,
)
Ключевая разница с PythonOperator — DecoratedOperator добавляет:
- XComArg resolution. Если в
op_kwargsестьXComArg(то есть результат другого@task), на execute Airflow делаетxcom_pullза вас. - multiple_outputs. Если
@task(multiple_outputs=True)или return type —TypedDict, return value распаковывается по ключам и каждый идёт в свой XCom. - Function metadata propagation.
task_idберётся из__name__функции, doc — из docstring.
Подробно _PythonDecoratedOperator разобран в Module 02 lesson 03 — здесь акцент на семействе.
Семейство Python-operators
Шесть способов запустить Python код в Airflow. Различия — в изоляции и скорости:
Сравнительная таблица
| Operator | Startup overhead | Isolation | Когда выбирать |
|---|---|---|---|
PythonOperator / @task | 0s (в worker) | Нет | Default. Если deps совместимы с worker image |
PythonVirtualenvOperator | 20-60s (создание venv) | Deps только | Один-off задача с уникальными deps, редкий запуск |
ExternalPythonOperator | 0.5s (просто subprocess) | Deps только | Регулярные tasks с альтернативным Python (например, 3.10 vs 3.11 worker) |
DockerOperator | 2-10s (image pull + start) | Full namespace | Legacy job, контролируемое окружение, не на K8s |
KubernetesPodOperator | 10-30s (pod schedule + pull) | Full cluster | High isolation, large compute (Spark driver, GPU), unbounded scale |
PythonVirtualenvOperator — изоляция deps на лету
from airflow.decorators import dag, task
from datetime import datetime
@dag(schedule="@daily", start_date=datetime(2026, 1, 1), catchup=False)
def venv_demo():
@task.virtualenv(
requirements=["pandas==1.5.3", "scikit-learn==1.2.0"],
python_version="3.10"
system_site_packages=False,
use_dill=False,
)
def train_model():
import pandas as pd
from sklearn.linear_model import LinearRegression
# pandas 1.5.3 даже если worker имеет 2.x
df = pd.read_parquet("/data/features.parquet")
model = LinearRegression().fit(df[["x"]], df["y"])
return {"coef": float(model.coef_[0])}
train_model()
venv_demo()
Что происходит под капотом:
- Airflow создаёт temp directory:
/tmp/airflow_venv_xxxxx/. - Запускает
python3.10 -m venv /tmp/airflow_venv_xxxxx. pip install pandas==1.5.3 scikit-learn==1.2.0внутрь.- Сериализует
train_modelчерезcloudpickle(не pickle) — он может сериализовать closures и nested functions. - Запускает
subprocess.Popen([venv_python, "exec.py"])— exec.py загружает pickled function и вызывает. - Читает return value из stdout (json-encoded для simple types, cloudpickle для complex).
PythonVirtualenvOperator создаёт venv на каждый запуск task. Если task запускается каждый час и имеет 200MB deps — каждый час будет pip install (минута+ в зависимости от индекса). Используйте ExternalPythonOperator с pre-built venv, или закэшируйте pip wheels локально, или вообще закладывайте venv в worker image.
Ограничения cloudpickle
- Нельзя использовать модули из worker, если они не в requirements (например, нельзя
import airflowвнутри venv task — Airflow не указан в requirements). - Аргументы и return value должны быть picklable.
- Нет доступа к
context— sensor-like signatures не работают, кромеds,tiи других primitives, которые сериализуются.
ExternalPythonOperator — pre-built venv
Решает главную проблему PythonVirtualenvOperator — медленный startup. Используется существующий interpreter.
@task.external_python(
python="/opt/airflow/venvs/ml-py310/bin/python"
expect_airflow=False,
)
def train_model():
import pandas as pd
return pd.__version__
В Docker image worker’а заранее создаёте /opt/airflow/venvs/ml-py310 с нужными deps:
FROM apache/airflow:2.10.5
USER root
RUN python3.10 -m venv /opt/airflow/venvs/ml-py310 && \
/opt/airflow/venvs/ml-py310/bin/pip install pandas==1.5.3 scikit-learn==1.2.0
USER airflow
Скорость: subprocess.Popen к готовому Python — 100-500ms overhead. Это +/- стоимость bash subprocess.
expect_airflow=False критичен: если внутри venv нет airflow package, скажите явно — иначе wrapper попытается импортировать airflow.utils для своих нужд и упадёт.
KubernetesPodOperator — overview
KubernetesPodOperator — отдельный мир. Создаёт pod в K8s, ждёт completion, читает logs. Базовый пример:
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
run_etl = KubernetesPodOperator(
task_id="run_etl"
name="etl-{{ ds_nodash }}"
namespace="airflow-tasks"
image="myregistry/etl-job:1.2.3"
cmds=["python", "/app/etl.py"],
arguments=["--date={{ ds }}", "--env=prod"],
env_vars={"AWS_REGION": "us-east-1"},
resources={"requests": {"memory": "2Gi", "cpu": "1"}, "limits": {"memory": "4Gi", "cpu": "2"}},
is_delete_operator_pod=True,
get_logs=True,
log_events_on_failure=True,
in_cluster=True,
)
Lifecycle:
execute()собирает pod manifest из всех параметров.- Через kubernetes client отправляет
POST /api/v1/namespaces/airflow-tasks/pods. - Подключается к pod logs через
watch=True, стримит в worker logs. - На каждом poll смотрит
pod.status.phase— ждётSucceededилиFailed. - При
Succeededчитает результат изxcom_sidecarcontainer (если объявлен) или из последнего сообщения stdout. - Если
is_delete_operator_pod=True—DELETEpod после завершения.
on_kill() отправляет DELETE с graceful period — pod получает SIGTERM, потом SIGKILL.
KubernetesPodOperator vs KubernetesExecutor — две независимых вещи. Executor определяет, где сами Airflow tasks запускаются (как Kubernetes pods). KubernetesPodOperator — это operator, который запускает custom pods независимо от того, в каком executor работает Airflow. Можно использовать KPO даже на LocalExecutor.
Production gotchas
-
@taskфункция не имеет доступа к Airflow context, если signature этого не объявила.@taskresolves толькоti,ds,dag_run,paramsи т.п. если они присутствуют в args. Если хотите весь context — добавьте**contextв signature, тогда Airflow подмешает полный dict. -
PythonVirtualenvOperator не подхватывает дополнительные модули из dags folder. Внутри venv нет sys.path к
/opt/airflow/dags/include. Если у вас shared helper module, либо ставьте его как pip wheel в requirements, либо передавайте код как inline в decorated function. -
ExternalPythonOperator: venv стух при upgrade Airflow. Если в venv установлен
pandas==1.5.3, а потом Airflow upgrade требуетnumpy>=1.25— конфликт. Решение: переустанавливайте venv в Docker image при каждом upgrade Airflow, либо используйте полностью изолированную K8s pod operator. -
KubernetesPodOperator с
is_delete_operator_pod=False. Удобно для debug, но в production — каждый task оставляет orphan pod вSucceededstate. Через неделю в namespace 100k pods, etcd деградирует, kubectl get pods тормозит. ВсегдаTrueв prod, для debug — отдельный namespace с retention policy. -
do_xcom_push=True(default) + return large object. PythonOperator пушит return value в xcom через JSON serialization (или pickle еслиenable_xcom_pickling=True). 100MB DataFrame → 100MB строка в metadata DB → query на TI list начнёт тормозить. Для большого результата —do_xcom_push=False, складывайте в S3, возвращайте только S3 path. -
op_kwargs с XComArg внутри nested dict.
op_kwargs={"config": {"date": some_task_output()}}—some_task_output()этоXComArg, но Airflow по default resolves только top-level keys вop_kwargs. Для nested resolution нужен явный flag или TaskFlow API (@taskделает deep resolve автоматически).