Learning Platform
Глоссарий Troubleshooting
Урок 13.02 · 26 мин
Продвинутый
PluginsAirflowPluginentry_pointsProvider PackageDiscovery

Plugin manager — AirflowPlugin class, entry_points, discovery flow

В Airflow 2.x есть два способа расширить framework без форкания: file-based plugins через папку $AIRFLOW_HOME/plugins/ и provider packages через entry_points в setup.py. Понимание plugin manager критично для production: где Airflow ищет plugins, в каком порядке они загружаются, что можно регистрировать, какие компоненты не работают через plugin manager в 2.x. Этот урок — карта plugin system.

В Airflow 3.x некоторые компоненты (executors, operators, hooks) перевели исключительно на provider packages — plugin manager в 3.x сильно урезан. В 2.10/2.11 LTS — full backward compatibility, оба механизма работают.


Что регистрируется через plugin manager

В Airflow 2.x через AirflowPlugin (file-based) или provider entry_points регистрируются девять категорий компонентов:

КатегорияЧтоПримеры
HooksCustom connection-based clientsMyApiHook(BaseHook)
OperatorsCustom operatorsMyApiOperator(BaseOperator)
SensorsCustom sensorsMyEventSensor(BaseSensorOperator)
MacrosCustom Jinja macrosfiscal_year_for(ds)
ExecutorsCustom executorsMyCustomExecutor(BaseExecutor)
TimetablesCustom schedulesBusinessDayTimetable(Timetable)
ListenersHook impl modules (2.8+)my_listener module с @hookimpl
Flask blueprintsCustom Flask routes / UImy_blueprint = Blueprint(...)
FAB AppBuilder viewsWebserver UI viewsMyView(AppBuilderBaseView)

В 2.10/2.11 LTS все девять работают через оба механизма. В 3.x только timetables / listeners / Flask UI остаются в plugin manager — остальное переехало в provider packages.


Способ 1: file-based — AirflowPlugin класс

Создаётся файл $AIRFLOW_HOME/plugins/my_plugin.py:

from airflow.plugins_manager import AirflowPlugin
from my_module.hooks import MyApiHook
from my_module.operators import MyApiOperator
from my_module.sensors import MyEventSensor
from my_module.timetable import BusinessDayTimetable

def fiscal_year_for(ds):
    """Custom macro."""
    from datetime import datetime
    d = datetime.strptime(ds, "%Y-%m-%d")
    return d.year if d.month >= 7 else d.year - 1


class MyOrgPlugin(AirflowPlugin):
    name = "my_org_plugin"

    hooks = [MyApiHook]
    operators = [MyApiOperator]
    sensors = [MyEventSensor]
    macros = [fiscal_year_for]
    timetables = [BusinessDayTimetable]

    # Listeners (2.8+) — отдельная категория
    listeners = ["my_module.listeners.events"]  # ← module path string!

    # Flask blueprint для custom routes
    from my_module.web import bp
    flask_blueprints = [bp]

    # FAB AppBuilder views для webserver UI
    from my_module.web import MyView
    appbuilder_views = [
        {
            "name": "My Custom View",
            "category": "My Plugin",
            "view": MyView(),
        }
    ]

Ключевые моменты:

  • Класс наследник AirflowPlugin. Имя класса не важно, но name атрибут используется для namespacing.
  • Атрибуты — списки.
  • listeners — list of module path strings, не class references. Это потому что listeners используют pluggy-based hook discovery (см. урок 04).
  • Файл должен быть Python-importable$AIRFLOW_HOME/plugins/ добавлен в sys.path автоматически.

Discovery sequence для file-based

# airflow/plugins_manager.py — упрощённо
def load_plugins_from_plugin_directory():
    plugins_folder = conf.get("core", "plugins_folder")  # default $AIRFLOW_HOME/plugins
    if not plugins_folder or not os.path.isdir(plugins_folder):
        return []

    sys.path.insert(0, plugins_folder)

    plugins = []
    for entry in pkgutil.iter_modules([plugins_folder]):
        module = importlib.import_module(entry.name)
        for obj in vars(module).values():
            if isclass(obj) and issubclass(obj, AirflowPlugin) and obj is not AirflowPlugin:
                plugins.append(obj())
    return plugins

Происходит:

  1. Папка plugins_folder (default $AIRFLOW_HOME/plugins) добавляется в sys.path.
  2. Все .py файлы импортируются.
  3. Каждый класс наследник AirflowPlugin инстанциируется.
  4. Атрибуты (hooks, operators, etc.) регистрируются в global registry.

Способ 2: provider package через entry_points

Это рекомендуемый способ для shippable plugins. Создаётся pip-installable package с setup.py:

# setup.py
from setuptools import setup

setup(
    name="apache-airflow-providers-my-org"
    version="1.0.0"
    install_requires=["apache-airflow>=2.10"],
    packages=["my_org_provider"],
    entry_points={
        "apache_airflow_provider": [
            "provider_info=my_org_provider:get_provider_info",
        ],
    },
)

И функция get_provider_info:

# my_org_provider/__init__.py
def get_provider_info():
    return {
        "package-name": "apache-airflow-providers-my-org",
        "name": "My Org Provider",
        "description": "Custom org provider",
        "versions": ["1.0.0"],

        # Aiflow components
        "hook-class-names": ["my_org_provider.hooks.MyApiHook"],
        "operator-class-names": ["my_org_provider.operators.MyApiOperator"],
        "sensor-class-names": ["my_org_provider.sensors.MyEventSensor"],
        "timetable-class-names": ["my_org_provider.timetables.BusinessDayTimetable"],

        # Listeners (2.8+)
        "listener-modules": ["my_org_provider.listeners"],

        # Macros — через extra registration
        "extra-links": [],

        # Connections (для UI dropdown в Connections)
        "connection-types": [
            {
                "connection-type": "my_api",
                "hook-class-name": "my_org_provider.hooks.MyApiHook",
            }
        ],
    }

Discovery sequence для providers

# airflow/providers_manager.py — упрощённо
def _discover_all_providers_from_packages():
    """Look for provider info entry points."""
    for entry_point in entry_points().get("apache_airflow_provider", []):
        provider_info_callback = entry_point.load()
        provider_info = provider_info_callback()
        register_provider(provider_info)

Происходит:

  1. При scheduler/worker/webserver startup сканируются все pip-installed packages.
  2. Те, что объявили entry_points={"apache_airflow_provider": [...]} загружаются.
  3. get_provider_info() вызывается, возвращает dict с компонентами.
  4. Hooks, operators, sensors, listeners регистрируются.

Загрузка plugins — full sequence

Plugin discovery sequence при Airflow startup
airflow scheduler / webserver startupЛюбой Airflow процесс (scheduler, worker, webserver, triggerer, dag-processor) загружает plugins при startup. Это происходит в airflow.plugins_manager.ensure_plugins_loaded() лениво при первом обращении.
ensure_plugins_loaded()
Phase 1: file-based$AIRFLOW_HOME/plugins/ сканируется. Все .py файлы импортируются (pkgutil.iter_modules). Классы AirflowPlugin instantiated. Атрибуты добавлены в global registry.
Phase 2: entry_pointsВсе pip packages с entry_points 'apache_airflow_provider' сканируются. get_provider_info() вызывается. Компоненты регистрируются в providers_manager.
register components
Global registryairflow.plugins_manager._integrate_*_plugins() для каждой категории: hooks → registered_hooks dict; operators → registered_operators; etc. После этого все DAG могут импортировать from airflow.plugins import MyApiHook.
DAG parse
DAG может использовать pluginsfrom airflow.hooks.my_api import MyApiHook — работает после plugins loaded. Если plugin не нашёлся — ImportError при DAG parse, DAG помечается as broken в UI.

Sequence — important detail: file-based загружается первым, entry_points — вторым. При конфликте имён выигрывает entry_points (последний регистрируется).


Lazy loading и hot reload

В Airflow 2.x plugins загружаются лениво при первом обращении. После загрузки они кешируются в process memory.

# airflow/plugins_manager.py
plugins = None  # ← module-level cache

def ensure_plugins_loaded():
    global plugins
    if plugins is None:
        plugins = _load_all_plugins()

Это значит:

  • Изменили plugin file — нужно рестарт scheduler / worker / webserver, не подхватится auto.
  • В отличие от DAG-ов которые re-parse каждые min_file_process_interval (30s default).
  • В Airflow 3.x появится hot reload plugins (3.1+), но в 2.x — restart обязателен.

Workaround для dev: при изменении plugin делать airflow scheduler (kill + start) и airflow webserver (kill + start).


file-based vs entry_points — когда какой

Критерийfile-based ($AIRFLOW_HOME/plugins/)entry_points (provider)
Лёгкий стартда — просто файлнет — нужен setup.py, pip install
Versioningнет — какой код в plugins/yes — pip pin
Distributionкопировать файлы / sync с gitpip install из artifact registry
Hot prototypingдасложнее (нужен pip install -e .)
UI Connections registrationнетда — connection-types
Production best practicedev / небольшие pluginsда — major plugins, sharing across team
Airflow 3.x compatibilityбудет deprecatedfull support

Production recommendation: entry_points provider для anything shared. file-based — только для on-premise quick fixes.


Что НЕ работает через plugin manager

Несколько вещей, которые часто пытаются делать через plugins, но они там не должны быть:

  1. DAGs. DAGs — в dags_folder, не в plugins. Plugin для DAG — антипаттерн.
  2. Connections. Connections — в metadata DB через UI/CLI/env vars / secrets backend. Plugin может зарегистрировать connection type (UI dropdown), но не connection instance.
  3. Variables. Variables — в metadata DB / env / secrets backend. Plugin не может «inject» variables.
  4. Config (airflow.cfg). Plugins не должны менять config. Config — через env vars / cfg files.
  5. Database migrations. Plugin не может create tables. Только Airflow core делает migrations.

Debugging plugin loading

Если plugin не работает:

# Что Airflow видит
airflow plugins

# Output:
# name             | source                            | hooks      | operators       | listeners
# my_org_plugin    | <module 'my_plugin' from ...>     | MyApiHook  | MyApiOperator   | my_module.listeners.events

Если plugin не в списке:

  • Проверить $AIRFLOW_HOME/plugins/ существует, файл там, syntax OK.
  • Проверить [core] plugins_folder в airflow.cfg.
  • Проверить import: python -c "from my_plugin import MyOrgPlugin".
  • Для providers: pip list | grep apache-airflow-providers-my-org.

Common error в DAG file:

ImportError: No module named 'airflow.hooks.my_api'

→ plugin не загрузился, или не зарегистрирован hook.


Production gotchas

1. Plugin changes требуют restart всех процессов. scheduler, webserver, workers (Celery), triggerer, dag-processor — все. Иначе один process использует старую version, другой — новую → inconsistent behavior.

2. file-based plugins не versioned. Git history единственный source of truth. Production deploy должен copy/sync plugins explicitly, иначе разные deployments расходятся.

3. airflow plugins показывает только loaded plugins. Если webserver не рестартовали — airflow plugins в его context показывает loaded webserver-side, scheduler-side может отличаться.

4. Heavy imports в plugin module slow down boot. Plugin module импортируется первым при boot — если там import pandas, tensorflow, etc — Airflow scheduler/worker startup time раздувается.

5. Listeners — module path strings, не class refs. В listeners = ["..."] идут строки с module paths (e.g., "my_module.listeners.events"), не classes. Это потому что pluggy hook discovery работает через module reflection.


Проверка знанийKnowledge check
Создали custom hook MyApiHook в $AIRFLOW_HOME/plugins/my_plugin.py через AirflowPlugin class. DAG ImportError. Что чек-листом проверить?
ОтветAnswer
Полный диагностический чеклист: (1) `airflow plugins` показывает 'my_plugin' в списке? Если нет — plugin не загрузился. (2) Файл импортируется самостоятельно: `python -c 'import my_plugin; print(my_plugin.MyOrgPlugin.hooks)'` — должен показать [MyApiHook]. (3) [core] plugins_folder в airflow.cfg указывает на правильную папку. (4) Class наследник AirflowPlugin (не просто class), и hooks=[MyApiHook] список (не tuple). (5) DAG импортирует через airflow.hooks.<your_path>.MyApiHook (Airflow создаёт virtual module airflow.hooks.<plugin_name> где живут зарегистрированные hooks). (6) Все процессы restarted после plugin add — scheduler, webserver, workers (для Celery — все voprosов), triggerer, dag-processor. Если только webserver restarted — scheduler не видит. (7) Нет circular imports — plugin module не импортирует что-то требующее уже загруженных plugins. (8) Python module syntax OK — `python -m py_compile my_plugin.py`. Самая частая ошибка — пункт 6 (forgot to restart всех процессов) и пункт 5 (неправильный import path).

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. Два способа регистрации plugins в Airflow 2.10/2.11 — какой рекомендуется для production?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6