Plugin manager — AirflowPlugin class, entry_points, discovery flow
В Airflow 2.x есть два способа расширить framework без форкания: file-based plugins через папку $AIRFLOW_HOME/plugins/ и provider packages через entry_points в setup.py. Понимание plugin manager критично для production: где Airflow ищет plugins, в каком порядке они загружаются, что можно регистрировать, какие компоненты не работают через plugin manager в 2.x. Этот урок — карта plugin system.
В Airflow 3.x некоторые компоненты (executors, operators, hooks) перевели исключительно на provider packages — plugin manager в 3.x сильно урезан. В 2.10/2.11 LTS — full backward compatibility, оба механизма работают.
Что регистрируется через plugin manager
В Airflow 2.x через AirflowPlugin (file-based) или provider entry_points регистрируются девять категорий компонентов:
| Категория | Что | Примеры |
|---|---|---|
| Hooks | Custom connection-based clients | MyApiHook(BaseHook) |
| Operators | Custom operators | MyApiOperator(BaseOperator) |
| Sensors | Custom sensors | MyEventSensor(BaseSensorOperator) |
| Macros | Custom Jinja macros | fiscal_year_for(ds) |
| Executors | Custom executors | MyCustomExecutor(BaseExecutor) |
| Timetables | Custom schedules | BusinessDayTimetable(Timetable) |
| Listeners | Hook impl modules (2.8+) | my_listener module с @hookimpl |
| Flask blueprints | Custom Flask routes / UI | my_blueprint = Blueprint(...) |
| FAB AppBuilder views | Webserver UI views | MyView(AppBuilderBaseView) |
В 2.10/2.11 LTS все девять работают через оба механизма. В 3.x только timetables / listeners / Flask UI остаются в plugin manager — остальное переехало в provider packages.
Способ 1: file-based — AirflowPlugin класс
Создаётся файл $AIRFLOW_HOME/plugins/my_plugin.py:
from airflow.plugins_manager import AirflowPlugin
from my_module.hooks import MyApiHook
from my_module.operators import MyApiOperator
from my_module.sensors import MyEventSensor
from my_module.timetable import BusinessDayTimetable
def fiscal_year_for(ds):
"""Custom macro."""
from datetime import datetime
d = datetime.strptime(ds, "%Y-%m-%d")
return d.year if d.month >= 7 else d.year - 1
class MyOrgPlugin(AirflowPlugin):
name = "my_org_plugin"
hooks = [MyApiHook]
operators = [MyApiOperator]
sensors = [MyEventSensor]
macros = [fiscal_year_for]
timetables = [BusinessDayTimetable]
# Listeners (2.8+) — отдельная категория
listeners = ["my_module.listeners.events"] # ← module path string!
# Flask blueprint для custom routes
from my_module.web import bp
flask_blueprints = [bp]
# FAB AppBuilder views для webserver UI
from my_module.web import MyView
appbuilder_views = [
{
"name": "My Custom View",
"category": "My Plugin",
"view": MyView(),
}
]
Ключевые моменты:
- Класс наследник
AirflowPlugin. Имя класса не важно, ноnameатрибут используется для namespacing. - Атрибуты — списки.
listeners— list of module path strings, не class references. Это потому что listeners используют pluggy-based hook discovery (см. урок 04).- Файл должен быть Python-importable —
$AIRFLOW_HOME/plugins/добавлен вsys.pathавтоматически.
Discovery sequence для file-based
# airflow/plugins_manager.py — упрощённо
def load_plugins_from_plugin_directory():
plugins_folder = conf.get("core", "plugins_folder") # default $AIRFLOW_HOME/plugins
if not plugins_folder or not os.path.isdir(plugins_folder):
return []
sys.path.insert(0, plugins_folder)
plugins = []
for entry in pkgutil.iter_modules([plugins_folder]):
module = importlib.import_module(entry.name)
for obj in vars(module).values():
if isclass(obj) and issubclass(obj, AirflowPlugin) and obj is not AirflowPlugin:
plugins.append(obj())
return plugins
Происходит:
- Папка
plugins_folder(default$AIRFLOW_HOME/plugins) добавляется вsys.path. - Все
.pyфайлы импортируются. - Каждый класс наследник
AirflowPluginинстанциируется. - Атрибуты (hooks, operators, etc.) регистрируются в global registry.
Способ 2: provider package через entry_points
Это рекомендуемый способ для shippable plugins. Создаётся pip-installable package с setup.py:
# setup.py
from setuptools import setup
setup(
name="apache-airflow-providers-my-org"
version="1.0.0"
install_requires=["apache-airflow>=2.10"],
packages=["my_org_provider"],
entry_points={
"apache_airflow_provider": [
"provider_info=my_org_provider:get_provider_info",
],
},
)
И функция get_provider_info:
# my_org_provider/__init__.py
def get_provider_info():
return {
"package-name": "apache-airflow-providers-my-org",
"name": "My Org Provider",
"description": "Custom org provider",
"versions": ["1.0.0"],
# Aiflow components
"hook-class-names": ["my_org_provider.hooks.MyApiHook"],
"operator-class-names": ["my_org_provider.operators.MyApiOperator"],
"sensor-class-names": ["my_org_provider.sensors.MyEventSensor"],
"timetable-class-names": ["my_org_provider.timetables.BusinessDayTimetable"],
# Listeners (2.8+)
"listener-modules": ["my_org_provider.listeners"],
# Macros — через extra registration
"extra-links": [],
# Connections (для UI dropdown в Connections)
"connection-types": [
{
"connection-type": "my_api",
"hook-class-name": "my_org_provider.hooks.MyApiHook",
}
],
}
Discovery sequence для providers
# airflow/providers_manager.py — упрощённо
def _discover_all_providers_from_packages():
"""Look for provider info entry points."""
for entry_point in entry_points().get("apache_airflow_provider", []):
provider_info_callback = entry_point.load()
provider_info = provider_info_callback()
register_provider(provider_info)
Происходит:
- При scheduler/worker/webserver startup сканируются все pip-installed packages.
- Те, что объявили
entry_points={"apache_airflow_provider": [...]}загружаются. get_provider_info()вызывается, возвращает dict с компонентами.- Hooks, operators, sensors, listeners регистрируются.
Загрузка plugins — full sequence
Sequence — important detail: file-based загружается первым, entry_points — вторым. При конфликте имён выигрывает entry_points (последний регистрируется).
Lazy loading и hot reload
В Airflow 2.x plugins загружаются лениво при первом обращении. После загрузки они кешируются в process memory.
# airflow/plugins_manager.py
plugins = None # ← module-level cache
def ensure_plugins_loaded():
global plugins
if plugins is None:
plugins = _load_all_plugins()
Это значит:
- Изменили plugin file — нужно рестарт scheduler / worker / webserver, не подхватится auto.
- В отличие от DAG-ов которые re-parse каждые
min_file_process_interval(30s default). - В Airflow 3.x появится hot reload plugins (3.1+), но в 2.x — restart обязателен.
Workaround для dev: при изменении plugin делать airflow scheduler (kill + start) и airflow webserver (kill + start).
file-based vs entry_points — когда какой
| Критерий | file-based ($AIRFLOW_HOME/plugins/) | entry_points (provider) |
|---|---|---|
| Лёгкий старт | да — просто файл | нет — нужен setup.py, pip install |
| Versioning | нет — какой код в plugins/ | yes — pip pin |
| Distribution | копировать файлы / sync с git | pip install из artifact registry |
| Hot prototyping | да | сложнее (нужен pip install -e .) |
| UI Connections registration | нет | да — connection-types |
| Production best practice | dev / небольшие plugins | да — major plugins, sharing across team |
| Airflow 3.x compatibility | будет deprecated | full support |
Production recommendation: entry_points provider для anything shared. file-based — только для on-premise quick fixes.
Что НЕ работает через plugin manager
Несколько вещей, которые часто пытаются делать через plugins, но они там не должны быть:
- DAGs. DAGs — в
dags_folder, не в plugins. Plugin для DAG — антипаттерн. - Connections. Connections — в metadata DB через UI/CLI/env vars / secrets backend. Plugin может зарегистрировать connection type (UI dropdown), но не connection instance.
- Variables. Variables — в metadata DB / env / secrets backend. Plugin не может «inject» variables.
- Config (
airflow.cfg). Plugins не должны менять config. Config — через env vars / cfg files. - Database migrations. Plugin не может create tables. Только Airflow core делает migrations.
Debugging plugin loading
Если plugin не работает:
# Что Airflow видит
airflow plugins
# Output:
# name | source | hooks | operators | listeners
# my_org_plugin | <module 'my_plugin' from ...> | MyApiHook | MyApiOperator | my_module.listeners.events
Если plugin не в списке:
- Проверить
$AIRFLOW_HOME/plugins/существует, файл там, syntax OK. - Проверить
[core] plugins_folderв airflow.cfg. - Проверить import:
python -c "from my_plugin import MyOrgPlugin". - Для providers:
pip list | grep apache-airflow-providers-my-org.
Common error в DAG file:
ImportError: No module named 'airflow.hooks.my_api'
→ plugin не загрузился, или не зарегистрирован hook.
Production gotchas
1. Plugin changes требуют restart всех процессов. scheduler, webserver, workers (Celery), triggerer, dag-processor — все. Иначе один process использует старую version, другой — новую → inconsistent behavior.
2. file-based plugins не versioned. Git history единственный source of truth. Production deploy должен copy/sync plugins explicitly, иначе разные deployments расходятся.
3. airflow plugins показывает только loaded plugins. Если webserver не рестартовали — airflow plugins в его context показывает loaded webserver-side, scheduler-side может отличаться.
4. Heavy imports в plugin module slow down boot. Plugin module импортируется первым при boot — если там import pandas, tensorflow, etc — Airflow scheduler/worker startup time раздувается.
5. Listeners — module path strings, не class refs. В listeners = ["..."] идут строки с module paths (e.g., "my_module.listeners.events"), не classes. Это потому что pluggy hook discovery работает через module reflection.