Learning Platform
Глоссарий Troubleshooting
Урок 11.05 · 30 мин
Продвинутый
Secrets BackendVaultKubernetes AuthAppRoleDynamic Secrets

Secrets Backends — HashiCorp Vault

В production хранить credentials в metadata DB через UI — anti-pattern. Нет audit, нет fine-grained access control, key rotation требует manual работы, нет dynamic secrets. Решение — Secrets Backend: абстракция, через которую Airflow читает Connections и Variables из external secret store.

Самый популярный choice для self-hosted — HashiCorp Vault. Этот урок препарирует: как абстракция работает, как настраивается VaultBackend, какие auth modes (Kubernetes Service Account, AppRole, Token), что такое dynamic secrets и зачем они нужны для DB credentials.


BaseSecretsBackend — абстракция

Все Secrets Backends — наследники airflow.secrets.BaseSecretsBackend:

# Из airflow/secrets/base_secrets.py (псевдокод)
class BaseSecretsBackend(ABC):

    def get_connection(self, conn_id: str) -> Connection | None:
        """Read connection by conn_id; return None если not found."""
        ...

    def get_variable(self, key: str) -> str | None:
        """Read variable by key; return None если not found."""
        ...

    def get_config(self, key: str) -> str | None:
        """Read airflow.cfg-style config option."""
        ...

None означает «not found, fallback на следующий source». Exception означает «backend broken, halt lookup» (по default — депенds на implementation).

ProvidersManager собирает все доступные backends при startup, но конфигурируется обычно один backend через airflow.cfg:

[secrets]
backend = airflow.providers.hashicorp.secrets.vault.VaultBackend
backend_kwargs = '{...}'

backend — fully qualified class name. backend_kwargs — JSON, который passed как **kwargs в __init__.


Полный lookup flow

Kubernetes Secrets: типы, base64 vs encryption at rest

С Secrets Backend конфигурированным lookup chain выглядит так:

Полный Connection/Variable lookup
Hook.get_connection('pg_prod')Entry point — любой Hook вызывает BaseHook.get_connection при инициализации. Это запускает Connection.get_connection_from_secrets — единый entry для всех lookup-ов.
Stage 1: Secrets BackendЕсли [secrets] backend configured — call backend.get_connection(conn_id). Path = connections_path + '/' + conn_id. Например, 'airflow/connections/pg_prod'. Если backend returns Connection object — RETURN immediately, остальные stages skip. Если None — fallback на ENV.
returns None
Stage 2: Environmentos.environ.get('AIRFLOW_CONN_PG_PROD'). URI parsed в Connection object. Никогда не достигает DB если ENV var present. Это важно для local dev и K8s overrides — можно override prod connection per pod.
не set
Stage 3: Metadata DBSELECT * FROM connection WHERE conn_id = 'pg_prod'. Если row exists — построить Connection object с Fernet-decrypted password/extra. Это finalфаллбэк.
Hook gets Connection objectHook теперь имеет full Connection. Создаёт actual client (psycopg2.connect, boto3.client, etc.) и возвращает caller-у. Если ни один stage не вернул — AirflowNotFoundException.

Tip: order менять нельзя — он hard-coded в airflow.secrets.local_filesystem / airflow.secrets.environment_variables / airflow.secrets.metastore. Но можно конфигурировать multiple backends: backend = m1.M1Backend,m2.M2Backend (comma-separated) — пробуются по порядку.


VaultBackend basic setup

Установка provider:

pip install apache-airflow-providers-hashicorp

Конфиг в airflow.cfg:

[secrets]
backend = airflow.providers.hashicorp.secrets.vault.VaultBackend
backend_kwargs = {
  "connections_path": "connections",
  "variables_path": "variables",
  "mount_point": "airflow",
  "url": "https://vault.internal:8200",
  "auth_type": "token",
  "token": "hvs.XXXXXX",
  "kv_engine_version": 2
}

Pathing convention в Vault для этого config:

airflow/connections/pg_prod_warehouse  → Connection 'pg_prod_warehouse'
airflow/connections/snowflake_prod      → Connection 'snowflake_prod'
airflow/variables/api_endpoint          → Variable 'api_endpoint'
airflow/variables/etl_config            → Variable 'etl_config'

mount_point — Vault concept: KV engine mounted at path. airflow/ — обычная convention.

kv_engine_version: Vault имеет KV v1 (legacy, no versioning) и KV v2 (versioned, soft delete). Используйте KV v2 для production — даёт possibility rollback compromised value и audit history.


Auth modes

Авторизация Airflow → Vault — критичный момент. Token напрямую в config — anti-pattern (где этот token секурно хранить?). Vault предлагает несколько secure modes.

1. Kubernetes Service Account (best для K8s deployment)

Vault может trust K8s service account tokens. Airflow pods запускаются с ServiceAccount → Vault kubernetes auth mode verifies SA token через Kubernetes TokenReview API → выдаёт Vault token.

backend_kwargs = {
  "url": "https://vault.internal:8200",
  "auth_type": "kubernetes",
  "kubernetes_role": "airflow-prod",
  "kubernetes_jwt_path": "/var/run/secrets/kubernetes.io/serviceaccount/token",
  "mount_point": "airflow"
}

Setup в Vault:

# Enable kubernetes auth
vault auth enable kubernetes

# Configure
vault write auth/kubernetes/config \
  kubernetes_host="https://kubernetes.default.svc" \
  kubernetes_ca_cert=@/path/to/ca.crt

# Create role
vault write auth/kubernetes/role/airflow-prod \
  bound_service_account_names=airflow \
  bound_service_account_namespaces=airflow \
  policies=airflow-policy \
  ttl=1h

No secrets в Airflow config — auth идёт через SA token, который k8s автоматически inject в pod. Это gold standard для production.

2. AppRole (для non-K8s deployments)

Vault выдаёт пару role_id (public) + secret_id (secret). Airflow использует обе для получения token.

backend_kwargs = {
  "url": "https://vault.internal:8200",
  "auth_type": "approle",
  "role_id": "abc-123-...",
  "secret_id": "xyz-789-...",
  "mount_point": "airflow"
}

secret_id чувствительный — храните в k8s Secret / OS keychain. role_id может быть в config.

3. Token (только для dev)

backend_kwargs = {
  "url": "http://localhost:8200",
  "auth_type": "token",
  "token": "hvs.dev-token",
  "mount_point": "airflow"
}

Token имеет TTL — нужен refresh logic. Airflow не renew-ит automatically для long-lived schedulers — token expired = backend lookup fails. Только для local dev.


Что Vault хранит

Connection in Vault — это JSON с теми же полями, что Connection object:

{
  "conn_type": "postgres",
  "login": "airflow",
  "password": "actualPassword",
  "host": "db.internal",
  "port": 5432,
  "schema": "warehouse",
  "extra": {
    "sslmode": "require",
    "connect_timeout": 10
  }
}

CLI для put:

vault kv put airflow/connections/pg_prod_warehouse \
  conn_type=postgres \
  login=airflow \
  password=actualPassword \
  host=db.internal \
  port=5432 \
  schema=warehouse \
  extra='{"sslmode":"require"}'

Variable — plain value (string):

vault kv put airflow/variables/api_endpoint value="https://api.example.com/v2"

С deserialize_json=True Airflow попытается JSON-парсить value field — храните JSON string внутри:

vault kv put airflow/variables/etl_config \
  value='{"batch_size": 1000, "retries": 3}'

Dynamic Secrets для DB credentials

Killer feature Vault — dynamic secrets. Идея: вместо static password в Vault, Vault сам создаёт ephemeral DB user при запросе credentials, и удаляет user через TTL.

Dynamic DB Credentials через Vault
Airflow task startsWorker запускает task. Hook.get_connection('pg_prod_warehouse') resolves через VaultBackend. Path: airflow/connections/pg_prod_warehouse. Это static path (КОНФИГ), но Vault может конфигурировать его читать из dynamic secrets engine.
VaultBackend → Vault APIGET /v1/airflow/data/connections/pg_prod_warehouse через HVAC client. Если этот path resolves через 'database' secrets engine (не KV), Vault dynamically creates new DB user.
Vault internal magic
Vault: CREATE USERVault выполняет CREATE USER 'v-airflow-abc123' WITH PASSWORD 'xyz' VALID UNTIL '2026-05-12 14:00'; GRANT SELECT ON ALL TABLES IN SCHEMA public TO 'v-airflow-abc123'; — на target Postgres. Это ephemeral user — exists только 1 hour (или TTL config).
Vault returns credentialsResponse: {login: 'v-airflow-abc123', password: 'xyz', lease_id: '...', lease_duration: 3600}. Airflow получает Connection с этими credentials и подключается к DB. Lease tracked Vault-ом — после expire Vault DROP USER autoматически.
После TTL
Vault: DROP USERПосле lease expire (default 1h) Vault выполняет revoke: DROP USER 'v-airflow-abc123'. User полностью удалён из DB. Если task ещё бежит — connection обрывается (новый Hook init создаст новый ephemeral user).

Vault database secrets engine setup:

# Enable database secrets engine
vault secrets enable database

# Configure connection к Postgres (использует "master" credentials)
vault write database/config/airflow-postgres \
  plugin_name=postgresql-database-plugin \
  allowed_roles="airflow-readwrite" \
  connection_url="postgresql://{{username}}:{{password}}@db.internal:5432/warehouse" \
  username="vault-admin" \
  password="vault-admin-pwd"

# Define role с creation statements
vault write database/roles/airflow-readwrite \
  db_name=airflow-postgres \
  creation_statements="CREATE ROLE \"{{name}}\" WITH LOGIN PASSWORD '{{password}}' VALID UNTIL '{{expiration}}'; \
                       GRANT SELECT, INSERT, UPDATE ON ALL TABLES IN SCHEMA public TO \"{{name}}\";" \
  default_ttl="1h" \
  max_ttl="24h"

Теперь любой vault read database/creds/airflow-readwrite создаёт ephemeral user.

Преимущества dynamic secrets:

  • No long-lived passwords — каждый task получает свой credentials
  • Automatic rotation — TTL expire = user dropped, нет manual rotation
  • Granular audit — Vault audit log показывает, кто и когда читал secret
  • Damage containment — если credentials leaked, lease revoke kills их немедленно (vault lease revoke -prefix database/creds/airflow-readwrite)

Trade-offs:

  • Сложность setup и operations
  • Каждый Hook init = new DB user creation (overhead ~50-200ms на Postgres CREATE USER)
  • Postgres has limits на user count (для very high task throughput может быть bottleneck)
  • Не все DB engines поддерживают (Snowflake, ClickHouse, Trino — есть, но check provider docs)

В Airflow можно интегрировать через dynamic backend wrapper — Custom subclass of VaultBackend, который вместо kv read делает database/creds/<role> read. Это advanced pattern, обычно для secret-sensitive environments (finance, healthcare).


Failure modes

Что если Vault down? Backend implementations differ, но VaultBackend by default будет:

  1. 5xx error: Throws exception → get_connection fails → Hook init fails → Task fails (or retries).
  2. 404 not found: Returns None → fallback to ENV → DB.
  3. Auth token expired: Re-auth attempt → если fail, throws.

Best practice — wrap Vault HA cluster (3+ nodes), short connection timeouts (5-10s), Airflow-side retries through urllib3 retry adapter. Backend имеет connections_lookup_pattern / variables_lookup_pattern regex configs — можно ограничить какие conn_id вообще пытаются Vault, чтобы избежать unnecessary lookups (например, ^prod_.* — только prod connections).


use_cache + Vault

С [secrets] use_cache = True (рекомендуется для production):

  • Per-process cache (in scheduler / worker / triggerer)
  • TTL default 900s (15 min)
  • Существенно снижает нагрузку на Vault

Trade-off: после vault kv put нового значения — не подхватывается до TTL expire или process restart. Для secrets rotation это обычно OK (rotation медленнее cache TTL). Для часто меняющихся configs может быть проблема.


VaultBackend конфигурация — полный пример (K8s)

airflow.cfg:

[secrets]
backend = airflow.providers.hashicorp.secrets.vault.VaultBackend
backend_kwargs = {
  "url": "https://vault.svc.cluster.local:8200",
  "auth_type": "kubernetes",
  "kubernetes_role": "airflow-prod",
  "mount_point": "airflow",
  "connections_path": "connections",
  "variables_path": "variables",
  "config_path": null,
  "kv_engine_version": 2,
  "connections_lookup_pattern": "^prod_.*",
  "variables_lookup_pattern": "^prod_.*"
}

use_cache = True
cache_ttl_seconds = 600

connections_lookup_pattern = "^prod_.*" — только conn_id с prefix prod_ идут в Vault. dev_* или staging_* сразу falback на ENV/DB. Это значительно снижает RPS на Vault и упрощает namespace separation.


Проверка знанийKnowledge check
Production audit показал, что Vault получает 10к RPS от Airflow cluster (3 scheduler + 20 worker + 2 triggerer). Vault cluster на пределе. Что вы предпринимаете для снижения нагрузки без потери security и без отказа от Vault?
ОтветAnswer
Многошаговая стратегия. (1) **Enable use_cache + tune TTL**: Set [secrets] use_cache=True, cache_ttl_seconds=900. Cache process-local — каждый pod имеет свой in-memory cache. Снижает rate в десятки раз (один Vault hit на key за 15 минут на pod). (2) **Audit top-level Variable.get / Connection lookup в DAGs**: типичный root cause — top-level Variable.get в DAG-файле выполняется каждый parse cycle (30s). Перенести внутрь @task callable. (3) **connections_lookup_pattern**: ограничить какие conn_id вообще идут в Vault. Например, '^prod_.*' — dev/staging connections идут в DB напрямую, не нагружают Vault. (4) **min_file_process_interval up**: с 30s до 60-120s — снижает parse rate в 2-4 раза (но slower DAG change detection). (5) **Vault scaling**: добавить Vault read replicas (Performance Standby nodes — read scaled, writes идут на primary). Airflow read-only flow — это идеально под performance replicas. (6) **Investigate hot paths**: какой именно conn_id или variable читается чаще всего? Возможно, конкретный DAG factory pattern или sensor polling вызывает spike. Vault audit log → identify top callers. (7) **Long-term**: рассмотреть переезд hot configs (не secrets) на ENV vars — нет network round-trip вообще. Vault только для actual secrets (DB passwords, API keys), ENV для feature flags / batch sizes.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. BaseSecretsBackend контракт — что должен реализовать каждый backend?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 7