Provider architecture
Airflow 2.0 совершил тихий, но фундаментальный архитектурный сдвиг: ядро (apache-airflow) и интеграции с внешними системами (AWS, GCP, Postgres, Snowflake, Slack, Kubernetes) разделились на отдельные pip-пакеты — provider packages. Это решение делает Airflow модульным, позволяет провайдерам релизиться независимо от core, и открывает дорогу к 3000+ provider packages в registry.
Понимание архитектуры providers важно по трём причинам: (1) вы каждый день импортируете из airflow.providers.* и нужно знать, откуда они берутся; (2) version compatibility — провайдеры релизятся независимо, и несовместимости с core версия — частая боль; (3) рано или поздно вы напишете custom provider для внутреннего сервиса, и нужно знать как это правильно сделать.
Архитектурная картина
Ядро Airflow знает о существовании providers через entry_points (Python packaging mechanism). При старте scheduler/webserver:
- Сканируется
sys.pathна пакеты сentry_points={"apache_airflow_provider": ["provider_info=..."]}. - Каждый найденный provider регистрирует свои hooks, operators, sensors, connection types в
ProviderManager. - UI берёт оттуда список доступных connection types для формы “Add Connection”.
- CLI команда
airflow providers listпоказывает зарегистрированные provider packages с версиями.
Провайдер как pip-пакет
apache-airflow-providers-amazon — обычный pip-пакет. Установка:
pip install apache-airflow-providers-amazon==8.20.0
Или extras при установке core:
pip install "apache-airflow[amazon,google,snowflake,postgres]==2.10.5"
Constraint files обязательны:
pip install "apache-airflow[amazon]==2.10.5" \
--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.10.5/constraints-3.10.txt"
Constraints fixируют точные версии всех transitive deps для каждой комбинации (Airflow version × Python version). Без constraints pip может затащить несовместимые версии provider, urllib3, sqlalchemy и сломать вам кластер.
Структура provider package
Пакет имеет фиксированный layout:
apache-airflow-providers-amazon/
├── pyproject.toml # entry_points регистрация
├── airflow/
│ └── providers/
│ └── amazon/
│ ├── __init__.py # get_provider_info()
│ ├── aws/
│ │ ├── hooks/
│ │ │ ├── s3.py # S3Hook
│ │ │ └── ec2.py
│ │ ├── operators/
│ │ │ └── s3.py # S3CopyObjectOperator, S3DeleteObjectsOperator
│ │ ├── sensors/
│ │ │ └── s3.py # S3KeySensor, S3PrefixSensor
│ │ ├── transfers/
│ │ │ └── s3_to_redshift.py
│ │ └── triggers/
│ │ └── s3.py # S3KeyTrigger для deferrable
В __init__.py функция get_provider_info():
def get_provider_info():
return {
"package-name": "apache-airflow-providers-amazon",
"name": "Amazon",
"description": "Amazon integration providing operators, hooks, and sensors for AWS services",
"versions": ["8.20.0"],
"additional-extras": [...],
"integrations": [
{"integration-name": "Amazon S3", "external-doc-url": "..."},
...
],
"operators": [
{"integration-name": "Amazon S3", "python-modules": ["airflow.providers.amazon.aws.operators.s3"]},
...
],
"hooks": [...],
"sensors": [...],
"connection-types": [
{"hook-class-name": "airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook", "connection-type": "aws"},
...
],
}
Это metadata, которая регистрирует все operators/hooks/connections в ProviderManager.
Entry_points в pyproject.toml
[project.entry-points."apache_airflow_provider"]
provider_info = "airflow.providers.amazon.__init__:get_provider_info"
Это критический механизм регистрации. Без entry_point Airflow не узнает о существовании пакета даже если он установлен.
BaseHook — основа интеграций
Каждый provider определяет один или несколько Hooks. Hook — это абстракция connection к внешней системе, переиспользуемая разными operators и sensors.
from airflow.hooks.base import BaseHook
class MyServiceHook(BaseHook):
conn_name_attr = "my_service_conn_id"
default_conn_name = "my_service_default"
conn_type = "myservice"
hook_name = "My Service"
def __init__(self, my_service_conn_id: str = default_conn_name):
super().__init__()
self.my_service_conn_id = my_service_conn_id
self._client = None
def get_conn(self):
if self._client is None:
conn = self.get_connection(self.my_service_conn_id)
self._client = SomeSDK(
host=conn.host,
user=conn.login,
password=conn.password,
token=conn.extra_dejson.get("api_token"),
)
return self._client
def list_items(self, prefix: str) -> list[str]:
return self.get_conn().list(prefix=prefix)
@classmethod
def get_ui_field_behaviour(cls) -> dict:
return {
"hidden_fields": ["schema", "extra", "port"],
"relabeling": {"login": "API User", "password": "API Secret"},
"placeholders": {"host": "https://api.example.com"},
}
Что делает BaseHook:
get_connection(conn_id)— достаётConnectionобъект из metadata DB или secrets backend (Vault, AWS Secrets Manager, etc).get_records(sql),get_first(sql),run(sql)— generic SQL methods (для DbApiHook subclasses).- Логирование через
self.log.
get_ui_field_behaviour() — особая classmethod, которую вызывает webserver при рендере формы “Add Connection”. Позволяет переименовывать поля (login → “API User”), скрывать ненужные, добавлять placeholders.
Connection retrieval flow
Когда operator делает hook = S3Hook(aws_conn_id="aws_default") и потом hook.get_conn():
Подробнее secrets backends — в Module 10.
Версионная совместимость
Каждый provider имеет:
- Минимальную поддерживаемую Airflow version —
apache-airflow-providers-amazon==8.20.0требуетapache-airflow>=2.7.0. - Свой semver — provider может релизиться независимо. New version provider может добавить deferrable operators, не меняя core Airflow.
Constraint files фиксируют совместимую комбинацию. Например, для Airflow 2.10.5 + Python 3.10 constraint указывает apache-airflow-providers-amazon==8.20.0. Если попытаетесь поставить 9.x — может работать, может ломаться на конкретных operators.
Главное правило: всегда используйте constraints при pip install. Без них pip resolver выбирает latest provider, который может тащить новые deps (например, sqlalchemy 2.x), несовместимые с core (2.x пока на sqlalchemy 1.4). Симптом: import errors, странные TypeError в местах, где раньше работало.
Где смотреть совместимость
- Airflow constraints:
https://raw.githubusercontent.com/apache/airflow/constraints-2.10.5/constraints-3.10.txt - Provider release notes: на странице каждого provider в Airflow docs (
apache-airflow-providers-amazon→ “Provider package” → changelog). - Provider registry:
https://registry.astronomer.io/providers— поиск по operators/hooks, версии, ссылки на исходники.
Полезные provider packages
Часто используемые в production:
| Provider | Что внутри | Когда нужен |
|---|---|---|
apache-airflow-providers-amazon | S3, EMR, Redshift, Athena, Lambda, Glue, SQS, EventBridge | Любой AWS deployment |
apache-airflow-providers-google | GCS, BigQuery, Dataproc, Cloud Composer | GCP deployment |
apache-airflow-providers-microsoft-azure | Azure Blob, ADF, Synapse | Azure deployment |
apache-airflow-providers-postgres | PostgresHook, PostgresOperator | Любой Postgres workload |
apache-airflow-providers-snowflake | SnowflakeHook, SnowflakeSqlApiOperator (deferrable) | Snowflake warehouse |
apache-airflow-providers-cncf-kubernetes | KubernetesPodOperator, KubernetesExecutor support | K8s deployment |
apache-airflow-providers-docker | DockerOperator | Docker-based tasks |
apache-airflow-providers-http | HttpHook, HttpSensor, SimpleHttpOperator | REST API integrations |
apache-airflow-providers-slack | SlackWebhookOperator, SlackAPIPostOperator | Notifications |
apache-airflow-providers-databricks | DatabricksRunNowOperator, DatabricksJobsCreateOperator | Databricks jobs |
apache-airflow-providers-dbt-cloud | DbtCloudRunJobOperator | dbt orchestration |
apache-airflow-providers-common-sql | Базовый SQL hook, common SQL operators | Transitive dep многих provider |
Custom provider — кратко
Если ваша компания имеет внутренний сервис (например, internal data warehouse), правильный путь — собственный provider package. Минимальная структура:
my-airflow-provider-internal/
├── pyproject.toml
└── airflow/
└── providers/
└── internal/
├── __init__.py
├── hooks/
│ └── warehouse.py
└── operators/
└── warehouse_query.py
pyproject.toml:
[project]
name = "my-airflow-provider-internal"
version = "1.0.0"
dependencies = ["apache-airflow>=2.10.0", "my-internal-sdk>=2.0"]
[project.entry-points."apache_airflow_provider"]
provider_info = "airflow.providers.internal.__init__:get_provider_info"
__init__.py:
def get_provider_info():
return {
"package-name": "my-airflow-provider-internal",
"name": "Internal Services",
"description": "Hooks and operators for internal data services",
"versions": ["1.0.0"],
"hooks": [
{
"integration-name": "Internal Warehouse",
"python-modules": ["airflow.providers.internal.hooks.warehouse"],
},
],
"operators": [
{
"integration-name": "Internal Warehouse",
"python-modules": ["airflow.providers.internal.operators.warehouse_query"],
},
],
"connection-types": [
{
"hook-class-name": "airflow.providers.internal.hooks.warehouse.InternalWarehouseHook",
"connection-type": "internal_warehouse",
},
],
}
После pip install -e . в worker/scheduler containers airflow providers list покажет ваш provider, а airflow.providers.internal.* импорты будут работать.
Альтернатива custom provider — plugin (airflow.plugins_manager). Plugin проще (AirflowPlugin class с operators=[...], hooks=[...]), но менее структурирован. Для внутренней команды на 1-2 hooks plugin подходит, для библиотеки на 20+ operators — пишите provider package. Подробно в Module 12.
Production gotchas
-
Установили
apache-airflow-providers-Xбез constraints. pip затащил latest version, которая требует sqlalchemy 2.x, а core на 1.4. Получаете import errors при старте webserver. Fix —pip install ... --constraint <constraint-url>, или фиксируйте версии provider в requirements явно. -
Provider package не появляется в
airflow providers list. Симптом: ваш custom provider установлен, но импорты падают. Причина 99% случаев — забылиentry_pointsвpyproject.toml, либо неправильный путь кget_provider_info. Без entry_point Airflow не узнает о пакете. -
Hook кэширует connection между tasks одного worker.
hook.get_conn()кэширует client вself._client. На следующем task с тем жеconn_idworker создаёт новый Hook instance, но если ваш кастомный Hook хранит client в class variable — он шарится между tasks. При rotation секретов в Vault старый client используется → 403 errors. Кэшируйте только на instance, не на класс. -
Connection extra не дешёвый JSON. Connection.extra —
Textполе в DB с JSON-encoded строкой. Каждыйconn.extra_dejsonделаетjson.loads. Если в hot path (внутри циклов в operator) — будет тормозить. Кэшируйте result в hook. -
Major version bump provider ломает DAG. Например,
providers-google 11.0.0переименовалBigQueryOperator→BigQueryExecuteQueryOperator. После upgrade DAG parse fails. План: всегда читать CHANGELOG.md provider перед upgrade, тестировать на staging cluster, не пушить master в pip-install без review. -
Несколько versions одного provider в registry. Из-за того что provider релизится независимо, могут одновременно поддерживаться 3-4 major versions. Не путайтесь — выбирайте версию по совместимости с вашей core версией (из constraints), а не latest.
-
Custom provider import errors в triggerer. Provider зарегистрирован, импорты работают в scheduler/worker, но triggerer не видит provider. Причина — triggerer запускается из своего venv (или container), куда provider не установлен. Установите provider во все компоненты: webserver, scheduler, worker, triggerer.
Что в Airflow 3.x
В 3.x несколько изменений в provider layer:
- DAG Bundles — providers могут включать DAG bundles (templates DAGs), которые подгружаются в development через
airflow dags bundle install. - Versioned providers с stricter compatibility checks — провайдер объявляет нижнюю и верхнюю границу совместимых core версий, install ошибается явно.
- Task SDK — операторы теперь могут импортироваться через
airflow.sdkдля cross-version compatibility.
Это всё на горизонте upgrade. В 2.10/2.11 entry_point механизм остаётся стабильным и хорошо работает.