Learning Platform
Глоссарий Troubleshooting
Урок 04.06 · 22 мин
Продвинутый
ProvidersHooksentry_pointsConnectionsProvider PackageBaseHook

Provider architecture

Airflow 2.0 совершил тихий, но фундаментальный архитектурный сдвиг: ядро (apache-airflow) и интеграции с внешними системами (AWS, GCP, Postgres, Snowflake, Slack, Kubernetes) разделились на отдельные pip-пакеты — provider packages. Это решение делает Airflow модульным, позволяет провайдерам релизиться независимо от core, и открывает дорогу к 3000+ provider packages в registry.

Понимание архитектуры providers важно по трём причинам: (1) вы каждый день импортируете из airflow.providers.* и нужно знать, откуда они берутся; (2) version compatibility — провайдеры релизятся независимо, и несовместимости с core версия — частая боль; (3) рано или поздно вы напишете custom provider для внутреннего сервиса, и нужно знать как это правильно сделать.


Архитектурная картина

Airflow + Provider packages
apache-airflow (core)Ядро: scheduler, webserver, executor framework, DAG/Task/Operator базовые классы, metadata DB schema, REST API, CLI. Содержит только `airflow.operators.python`, `airflow.operators.bash`, `airflow.operators.empty` и подобные базовые operators.
depends on
apache-airflow-providers-amazonВсе интеграции с AWS: S3Hook, S3KeySensor, RedshiftSQLOperator, EmrCreateJobFlowOperator, AthenaOperator. Регистрируется в `airflow.providers.amazon.*`. Релиз независимый от core.
apache-airflow-providers-googleGCP интеграции: GCSHook, BigQueryOperator, DataprocSubmitJobOperator, ComposerCreateWorkflow. Импорты через `airflow.providers.google.*`.
apache-airflow-providers-postgresPostgresHook, PostgresOperator, PostgresToS3Operator, S3ToPostgresOperator. Тонкая обёртка над psycopg2.
apache-airflow-providers-cncf-kubernetesKubernetesPodOperator, KubernetesExecutor support, KubernetesHook. Использует kubernetes-asyncio для deferrable operators.
apache-airflow-providers-snowflakeSnowflakeHook, SnowflakeOperator, SnowflakeSqlApiOperator (deferrable). Имеет SDK dependencies — snowflake-connector-python.
apache-airflow-providers-...3000+ providers в registry.astronomer.io: Slack, Databricks, Mongo, Redis, MySQL, ClickHouse, Atlassian, Azure, Tableau, dbt, FiveTran, Hightouch — практически всё.

Ядро Airflow знает о существовании providers через entry_points (Python packaging mechanism). При старте scheduler/webserver:

  1. Сканируется sys.path на пакеты с entry_points={"apache_airflow_provider": ["provider_info=..."]}.
  2. Каждый найденный provider регистрирует свои hooks, operators, sensors, connection types в ProviderManager.
  3. UI берёт оттуда список доступных connection types для формы “Add Connection”.
  4. CLI команда airflow providers list показывает зарегистрированные provider packages с версиями.

Провайдер как pip-пакет

apache-airflow-providers-amazon — обычный pip-пакет. Установка:

pip install apache-airflow-providers-amazon==8.20.0

Или extras при установке core:

pip install "apache-airflow[amazon,google,snowflake,postgres]==2.10.5"

Constraint files обязательны:

pip install "apache-airflow[amazon]==2.10.5" \
  --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.10.5/constraints-3.10.txt"

Constraints fixируют точные версии всех transitive deps для каждой комбинации (Airflow version × Python version). Без constraints pip может затащить несовместимые версии provider, urllib3, sqlalchemy и сломать вам кластер.

Структура provider package

Пакет имеет фиксированный layout:

apache-airflow-providers-amazon/
├── pyproject.toml                   # entry_points регистрация
├── airflow/
│   └── providers/
│       └── amazon/
│           ├── __init__.py          # get_provider_info()
│           ├── aws/
│           │   ├── hooks/
│           │   │   ├── s3.py        # S3Hook
│           │   │   └── ec2.py
│           │   ├── operators/
│           │   │   └── s3.py        # S3CopyObjectOperator, S3DeleteObjectsOperator
│           │   ├── sensors/
│           │   │   └── s3.py        # S3KeySensor, S3PrefixSensor
│           │   ├── transfers/
│           │   │   └── s3_to_redshift.py
│           │   └── triggers/
│           │       └── s3.py        # S3KeyTrigger для deferrable

В __init__.py функция get_provider_info():

def get_provider_info():
    return {
        "package-name": "apache-airflow-providers-amazon",
        "name": "Amazon",
        "description": "Amazon integration providing operators, hooks, and sensors for AWS services",
        "versions": ["8.20.0"],
        "additional-extras": [...],
        "integrations": [
            {"integration-name": "Amazon S3", "external-doc-url": "..."},
            ...
        ],
        "operators": [
            {"integration-name": "Amazon S3", "python-modules": ["airflow.providers.amazon.aws.operators.s3"]},
            ...
        ],
        "hooks": [...],
        "sensors": [...],
        "connection-types": [
            {"hook-class-name": "airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook", "connection-type": "aws"},
            ...
        ],
    }

Это metadata, которая регистрирует все operators/hooks/connections в ProviderManager.

Entry_points в pyproject.toml

[project.entry-points."apache_airflow_provider"]
provider_info = "airflow.providers.amazon.__init__:get_provider_info"

Это критический механизм регистрации. Без entry_point Airflow не узнает о существовании пакета даже если он установлен.


BaseHook — основа интеграций

Каждый provider определяет один или несколько Hooks. Hook — это абстракция connection к внешней системе, переиспользуемая разными operators и sensors.

from airflow.hooks.base import BaseHook

class MyServiceHook(BaseHook):
    conn_name_attr = "my_service_conn_id"
    default_conn_name = "my_service_default"
    conn_type = "myservice"
    hook_name = "My Service"

    def __init__(self, my_service_conn_id: str = default_conn_name):
        super().__init__()
        self.my_service_conn_id = my_service_conn_id
        self._client = None

    def get_conn(self):
        if self._client is None:
            conn = self.get_connection(self.my_service_conn_id)
            self._client = SomeSDK(
                host=conn.host,
                user=conn.login,
                password=conn.password,
                token=conn.extra_dejson.get("api_token"),
            )
        return self._client

    def list_items(self, prefix: str) -> list[str]:
        return self.get_conn().list(prefix=prefix)

    @classmethod
    def get_ui_field_behaviour(cls) -> dict:
        return {
            "hidden_fields": ["schema", "extra", "port"],
            "relabeling": {"login": "API User", "password": "API Secret"},
            "placeholders": {"host": "https://api.example.com"},
        }

Что делает BaseHook:

  • get_connection(conn_id) — достаёт Connection объект из metadata DB или secrets backend (Vault, AWS Secrets Manager, etc).
  • get_records(sql), get_first(sql), run(sql) — generic SQL methods (для DbApiHook subclasses).
  • Логирование через self.log.

get_ui_field_behaviour() — особая classmethod, которую вызывает webserver при рендере формы “Add Connection”. Позволяет переименовывать поля (login → “API User”), скрывать ненужные, добавлять placeholders.


Connection retrieval flow

Когда operator делает hook = S3Hook(aws_conn_id="aws_default") и потом hook.get_conn():

Как Hook получает Connection
hook.get_conn() calledOperator на execute() вызывает hook.get_conn() для получения клиента к external system. Это lazy инициализация — connection берётся только при первом обращении.
hook.get_connection('aws_default')Внутри hook → BaseHook.get_connection() → Connection.get_connection_from_secrets(conn_id). Этот метод обходит зарегистрированные secrets backends в порядке приоритета.
iterate secrets backends
1. EnvironmentVariablesBackendПроверяет AIRFLOW_CONN_AWS_DEFAULT env var. Если есть — парсит URL-формат (aws://access_key:secret@/?region=us-east-1) и возвращает Connection. Самый быстрый — нет I/O.
2. Custom secrets backendЕсли в airflow.cfg указан [secrets] backend = airflow.providers.hashicorp.secrets.vault.VaultBackend — обращение к Vault. Каждый запрос — HTTP к Vault server. Кэшируется в use_cache=True (default).
3. Metadata DBFallback — SELECT * FROM connection WHERE conn_id='aws_default'. Самый медленный, но без внешних зависимостей. Connection password зашифрован Fernet key.
Connection object returnedОбъект Connection с host, login, password, extra (JSON dict с custom fields). Hook конструирует свой клиент: boto3.client('s3', aws_access_key=conn.login, ...).

Подробнее secrets backends — в Module 10.


Версионная совместимость

Каждый provider имеет:

  • Минимальную поддерживаемую Airflow versionapache-airflow-providers-amazon==8.20.0 требует apache-airflow>=2.7.0.
  • Свой semver — provider может релизиться независимо. New version provider может добавить deferrable operators, не меняя core Airflow.

Constraint files фиксируют совместимую комбинацию. Например, для Airflow 2.10.5 + Python 3.10 constraint указывает apache-airflow-providers-amazon==8.20.0. Если попытаетесь поставить 9.x — может работать, может ломаться на конкретных operators.

WARNING

Главное правило: всегда используйте constraints при pip install. Без них pip resolver выбирает latest provider, который может тащить новые deps (например, sqlalchemy 2.x), несовместимые с core (2.x пока на sqlalchemy 1.4). Симптом: import errors, странные TypeError в местах, где раньше работало.

Где смотреть совместимость

  • Airflow constraints: https://raw.githubusercontent.com/apache/airflow/constraints-2.10.5/constraints-3.10.txt
  • Provider release notes: на странице каждого provider в Airflow docs (apache-airflow-providers-amazon → “Provider package” → changelog).
  • Provider registry: https://registry.astronomer.io/providers — поиск по operators/hooks, версии, ссылки на исходники.

Полезные provider packages

Часто используемые в production:

ProviderЧто внутриКогда нужен
apache-airflow-providers-amazonS3, EMR, Redshift, Athena, Lambda, Glue, SQS, EventBridgeЛюбой AWS deployment
apache-airflow-providers-googleGCS, BigQuery, Dataproc, Cloud ComposerGCP deployment
apache-airflow-providers-microsoft-azureAzure Blob, ADF, SynapseAzure deployment
apache-airflow-providers-postgresPostgresHook, PostgresOperatorЛюбой Postgres workload
apache-airflow-providers-snowflakeSnowflakeHook, SnowflakeSqlApiOperator (deferrable)Snowflake warehouse
apache-airflow-providers-cncf-kubernetesKubernetesPodOperator, KubernetesExecutor supportK8s deployment
apache-airflow-providers-dockerDockerOperatorDocker-based tasks
apache-airflow-providers-httpHttpHook, HttpSensor, SimpleHttpOperatorREST API integrations
apache-airflow-providers-slackSlackWebhookOperator, SlackAPIPostOperatorNotifications
apache-airflow-providers-databricksDatabricksRunNowOperator, DatabricksJobsCreateOperatorDatabricks jobs
apache-airflow-providers-dbt-cloudDbtCloudRunJobOperatordbt orchestration
apache-airflow-providers-common-sqlБазовый SQL hook, common SQL operatorsTransitive dep многих provider

Custom provider — кратко

Если ваша компания имеет внутренний сервис (например, internal data warehouse), правильный путь — собственный provider package. Минимальная структура:

my-airflow-provider-internal/
├── pyproject.toml
└── airflow/
    └── providers/
        └── internal/
            ├── __init__.py
            ├── hooks/
            │   └── warehouse.py
            └── operators/
                └── warehouse_query.py

pyproject.toml:

[project]
name = "my-airflow-provider-internal"
version = "1.0.0"
dependencies = ["apache-airflow>=2.10.0", "my-internal-sdk>=2.0"]

[project.entry-points."apache_airflow_provider"]
provider_info = "airflow.providers.internal.__init__:get_provider_info"

__init__.py:

def get_provider_info():
    return {
        "package-name": "my-airflow-provider-internal",
        "name": "Internal Services",
        "description": "Hooks and operators for internal data services",
        "versions": ["1.0.0"],
        "hooks": [
            {
                "integration-name": "Internal Warehouse",
                "python-modules": ["airflow.providers.internal.hooks.warehouse"],
            },
        ],
        "operators": [
            {
                "integration-name": "Internal Warehouse",
                "python-modules": ["airflow.providers.internal.operators.warehouse_query"],
            },
        ],
        "connection-types": [
            {
                "hook-class-name": "airflow.providers.internal.hooks.warehouse.InternalWarehouseHook",
                "connection-type": "internal_warehouse",
            },
        ],
    }

После pip install -e . в worker/scheduler containers airflow providers list покажет ваш provider, а airflow.providers.internal.* импорты будут работать.

TIP

Альтернатива custom provider — plugin (airflow.plugins_manager). Plugin проще (AirflowPlugin class с operators=[...], hooks=[...]), но менее структурирован. Для внутренней команды на 1-2 hooks plugin подходит, для библиотеки на 20+ operators — пишите provider package. Подробно в Module 12.


Production gotchas

  1. Установили apache-airflow-providers-X без constraints. pip затащил latest version, которая требует sqlalchemy 2.x, а core на 1.4. Получаете import errors при старте webserver. Fix — pip install ... --constraint <constraint-url>, или фиксируйте версии provider в requirements явно.

  2. Provider package не появляется в airflow providers list. Симптом: ваш custom provider установлен, но импорты падают. Причина 99% случаев — забыли entry_points в pyproject.toml, либо неправильный путь к get_provider_info. Без entry_point Airflow не узнает о пакете.

  3. Hook кэширует connection между tasks одного worker. hook.get_conn() кэширует client в self._client. На следующем task с тем же conn_id worker создаёт новый Hook instance, но если ваш кастомный Hook хранит client в class variable — он шарится между tasks. При rotation секретов в Vault старый client используется → 403 errors. Кэшируйте только на instance, не на класс.

  4. Connection extra не дешёвый JSON. Connection.extra — Text поле в DB с JSON-encoded строкой. Каждый conn.extra_dejson делает json.loads. Если в hot path (внутри циклов в operator) — будет тормозить. Кэшируйте result в hook.

  5. Major version bump provider ломает DAG. Например, providers-google 11.0.0 переименовал BigQueryOperatorBigQueryExecuteQueryOperator. После upgrade DAG parse fails. План: всегда читать CHANGELOG.md provider перед upgrade, тестировать на staging cluster, не пушить master в pip-install без review.

  6. Несколько versions одного provider в registry. Из-за того что provider релизится независимо, могут одновременно поддерживаться 3-4 major versions. Не путайтесь — выбирайте версию по совместимости с вашей core версией (из constraints), а не latest.

  7. Custom provider import errors в triggerer. Provider зарегистрирован, импорты работают в scheduler/worker, но triggerer не видит provider. Причина — triggerer запускается из своего venv (или container), куда provider не установлен. Установите provider во все компоненты: webserver, scheduler, worker, triggerer.


Что в Airflow 3.x

В 3.x несколько изменений в provider layer:

  • DAG Bundles — providers могут включать DAG bundles (templates DAGs), которые подгружаются в development через airflow dags bundle install.
  • Versioned providers с stricter compatibility checks — провайдер объявляет нижнюю и верхнюю границу совместимых core версий, install ошибается явно.
  • Task SDK — операторы теперь могут импортироваться через airflow.sdk для cross-version compatibility.

Это всё на горизонте upgrade. В 2.10/2.11 entry_point механизм остаётся стабильным и хорошо работает.


Проверка знанийKnowledge check
Вы решили использовать `airflow.providers.snowflake.operators.snowflake.SnowflakeOperator` в DAG. Импорт работает локально, но в production кластере DAG падает с ModuleNotFoundError. Где смотреть проблему, и какие три уровня проверки нужно пройти прежде чем переходить к глубокой диагностике?
ОтветAnswer
**Уровень 1 — установка пакета**: `pip show apache-airflow-providers-snowflake` в контейнерах scheduler, worker и triggerer. Если пакет не установлен — добавить в requirements.txt / Dockerfile и пересобрать image. Установка должна быть **во все** компоненты Airflow, не только в scheduler. **Уровень 2 — entry_point регистрация**: `airflow providers list` (либо в UI: Admin → Providers). Если provider установлен, но не в списке — проблема с entry_point. Чаще всего это значит, что pyproject.toml пакета не имеет `[project.entry-points.apache_airflow_provider]` (для legacy провайдеров проверить setup.py). Также может быть, что pip установил пакет, но не выполнил metadata install (редко). Попробовать `pip install --force-reinstall`. **Уровень 3 — version compatibility**: `pip show` показывает версию provider и `apache-airflow`. Сверить с constraints file для своей core версии: `https://raw.githubusercontent.com/apache/airflow/constraints-2.10.5/constraints-3.10.txt`. Если установлена несовместимая комбинация (например, providers-snowflake 6.x на core 2.7) — некоторые модули могут отсутствовать или сломаться при import. Fix — установка через constraint file: `pip install apache-airflow[snowflake]==2.10.5 --constraint <url>`. Только после этих трёх уровней лезть в strace / Python import debugger / inspect sys.path.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. Как Airflow узнаёт о существовании provider package, чтобы он появился в `airflow providers list` и его operators были импортируемы?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6