Learning Platform
Глоссарий Troubleshooting
Урок 11.04 · 24 мин
Продвинутый
VariableLookup Orderuse_cacheJSONDAG Parse

Variables — storage, lookup и пара жёстких pitfalls

Variable в Airflow — это именованное конфигурационное значение (строка или JSON), которое DAG может читать на runtime. В отличие от Connection (где есть schema с обязательными полями), Variable — это plain key→value store. Используется для feature flags, environment-specific configs, schedule overrides, API endpoints — всего, что не credentials.

Этот урок разбирает: как Variable хранится в DB, lookup chain Backend → ENV → DB, JSON deserialization, use_cache, и самый болезненный pitfall — Variable.get в top-level DAG-файле.


Storage

Variable хранится в таблице variable:

CREATE TABLE variable (
    id              SERIAL PRIMARY KEY,
    key             VARCHAR(250) UNIQUE NOT NULL,
    val             TEXT,             -- ← Fernet encrypted (если is_encrypted=true)
    description     TEXT,
    is_encrypted    BOOLEAN
);

Поле val шифруется Fernet только если у Airflow есть валидный fernet_key и при создании is_encrypted=true. Значение всегда строка — если хотите хранить dict/list, нужно явно сериализовать в JSON.


Basic usage

from airflow.models import Variable

# Read
value = Variable.get("api_endpoint")  # str
default_value = Variable.get("flag", default_var="off")  # default если key not found

# Write
Variable.set("api_endpoint", "https://api.example.com/v2")

# JSON
config = Variable.get("etl_config", deserialize_json=True)
# {"batch_size": 1000, "retries": 3, "buckets": ["s3://a", "s3://b"]}

# Delete
Variable.delete("api_endpoint")

С deserialize_json=True Airflow вызывает json.loads(value) после чтения и возвращает Python object.

В TaskFlow API можно прокидывать через templated arguments:

@task
def process_batch(config: dict):
    batch_size = config["batch_size"]
    ...

@dag(...)
def my_dag():
    config = Variable.get("etl_config", deserialize_json=True)  # ⚠ см. ниже
    process_batch(config)

Однако здесь притаился крупный pitfall — top-level Variable.get. Разбираем ниже.


Lookup chain: Backend → ENV → DB

Variable.get(key) под капотом запускает точно такую же chain, что и Connection.get_connection_from_secrets:

# Псевдокод
@classmethod
def get_variable(cls, key: str) -> str | None:
    # 1. Secrets Backend (если настроен)
    for backend in secrets_backend_list:  # типично один backend
        value = backend.get_variable(key)
        if value is not None:
            return value

    # 2. Environment variable
    env_key = f"AIRFLOW_VAR_{key.upper()}"
    if env_key in os.environ:
        return os.environ[env_key]

    # 3. Metadata DB
    var = session.query(Variable).filter_by(key=key).first()
    return var.val if var else None
Variable lookup chain
Variable.get('api_endpoint')Entry point. Внутри: Variable.get_variable_from_secrets(key). Запускает chain в strict order — первый не-None возврат wins, никаких merge.
Stage 1: Secrets BackendЕсли [secrets] backend настроен в airflow.cfg — пытается read из Vault / Secrets Manager / GCP Secret Manager. variables_path + key собирается в полный path. Если backend недоступен — fallback на следующий stage (зависит от backend_kwargs).
не найдено
Stage 2: EnvironmentConventional name: AIRFLOW_VAR_<KEY_UPPER>. Например, AIRFLOW_VAR_API_ENDPOINT. Strict uppercase — key 'api_endpoint' маппится в AIRFLOW_VAR_API_ENDPOINT. Если env var присутствует — её value возвращается как есть (no JSON parse, no Fernet decrypt).
не найдено
Stage 3: Metadata DBSELECT val FROM variable WHERE key = 'api_endpoint'. Если is_encrypted=true — расшифровывается Fernet перед return. Это самый медленный stage (~ms), запускается каждый раз без caching по default.
не найдено
Default или KeyErrorЕсли default_var передан в Variable.get — возвращается он. Иначе AirflowConfigException или KeyError (зависит от Airflow version). default_var=None также legitimate.

Важно: каждый stage может быть полностью пропущен в зависимости от config. Если Secrets Backend не настроен, stage 1 skip. Если ENV var отсутствует — stage 2 skip. Order fixed: Backend → ENV → DB.


ENV variable convention

Конвенция Airflow для variables:

# Variable key 'my_api_key' → env var:
export AIRFLOW_VAR_MY_API_KEY="secret_value"

# Variable key 'etl_config' (JSON) → env var:
export AIRFLOW_VAR_ETL_CONFIG='{"batch_size": 1000, "retries": 3}'

Обратите внимание:

  • Префикс AIRFLOW_VAR_ обязателен (без него env vars не подхватываются)
  • Key в env var uppercase
  • Underscores OK, dashes/dots — НЕ работают (env vars не позволяют их в стандартных shell)

Это удобный pattern для dev/local development: положили env vars в .env, не нужно DB или Vault.

Production caveat: env vars не encrypted в k8s. Если используете k8s Secret для AIRFLOW_VAR_* — оно сохранено в etcd plaintext (если etcd encryption-at-rest не включён). Лучше Vault для production secrets, env vars для config flags.


use_cache (caching strategy)

В Airflow 2.10/2.11 secrets backend имеет optional caching:

[secrets]
backend = airflow.providers.hashicorp.secrets.vault.VaultBackend
backend_kwargs = {"url": "..."}

use_cache = True
cache_ttl_seconds = 900  # 15 минут

С use_cache = True:

  • Первый read Variable из Vault → cache (in-memory, process-local)
  • Следующие reads в течение 900 seconds — из cache, без HTTP к Vault
  • После TTL — re-fetch

В Airflow 2.10/2.11 use_cache = False по default (нужно явно включать). В Airflow 3.x default стал True.

NOTE

Cache — process-local. Каждый scheduler/worker/triggerer имеет свой in-memory cache. Это значит:

  1. Изменения Variable в UI не invalidate cache на workers — нужно ждать TTL expire или restart processes.
  2. Cache не consistent между schedulers — если у вас 3 schedulers, у каждого свой view of Variable.
  3. Memory overhead minimal — это dict, не больше нескольких MB на pod.

Top-level Variable.get pitfall — THE pitfall

Самая частая и болезненная ошибка новичков и опытных:

# anti-pattern.py
from airflow.decorators import dag, task
from airflow.models import Variable

# ⚠ TOP-LEVEL Variable.get
API_ENDPOINT = Variable.get("api_endpoint")  # ← выполняется при каждом DAG parse!
BATCH_SIZE = Variable.get("batch_size", deserialize_json=False)

@dag(schedule="@daily", start_date=datetime(2026, 1, 1))
def my_dag():
    @task
    def call_api():
        response = requests.get(API_ENDPOINT)
        ...

    call_api()

my_dag()

Что происходит:

DAG Processor парсит файл каждые min_file_process_interval = 30 секунд (default). Каждый parse:

  1. Импортирует .py → выполняется top-level code → Variable.get("api_endpoint") → Fernet decrypt + SQL query (или Vault HTTP call)
  2. То же для Variable.get("batch_size")

Для 100 DAGs с подобным паттерном:

  • 200 Variable.get calls × 2 раза в минуту = 400 calls/min
  • Если используется Vault — это 400 HTTP requests/min к Vault, под нагрузкой может завалить Vault (Vault rate limits, network bandwidth, latency growth)
  • Если DB — это SQL queries, дополнительная нагрузка на metadata DB
Top-level Variable.get pitfall
DAG Processor t=0sDAG Processor запускает file processor для каждого .py файла. Каждые 30s (min_file_process_interval) делает full re-parse. Тысячи строк Python кода исполняются — включая весь top-level.
Execute top-levelPython interpreter выполняет API_ENDPOINT = Variable.get(...). Это полноценный API call в Vault или SELECT в Postgres. Каждый Variable.get на top-level = N calls/min где N = total parse rate.
Variable.get('batch_size')Второй call. И так каждый Variable.get на top-level умножается на parse rate. Для 100 DAGs × 3 Variable.get × 2 раза/мин = 600 calls/min на Vault. Vault under load → 5xx → DAG processing breaks → cascade failures.
каждые 30s повторяется
Vault overloadПосле некоторого порога Vault начинает rate-limit (default 1000 req/s, но настраиваемо). 5xx errors → Variable.get raises → DAG parse fails → DAG помечен как broken. UI показывает 'Broken DAG' badge. Решение часто 'добавить Vault replicas' — лечение симптома, не причины.

Solutions:

Solution 1: Move inside task

Самое чистое и правильное:

@dag(schedule="@daily", start_date=datetime(2026, 1, 1))
def my_dag():
    @task
    def call_api():
        # Variable.get выполняется ТОЛЬКО при runtime task — не при parse
        api_endpoint = Variable.get("api_endpoint")
        response = requests.get(api_endpoint)
        ...

    call_api()

Variable.get внутри task body исполняется только когда task runs — то есть один раз на каждый DAG run, не каждые 30s. Для @daily DAG это 1 call/day против 2880 calls/day на parse.

Solution 2: Templated arguments (Jinja)

В operator kwargs Jinja templated args выполняются только при task execute:

PostgresOperator(
    task_id="extract"
    sql="SELECT * FROM orders WHERE batch_size = {{ var.value.batch_size }}",
    # var.value.batch_size — Jinja template, resolved at runtime, не parse
)

{{ var.value.X }} и {{ var.json.X }} — Jinja syntax для Variable resolution в templated fields.

Solution 3: Enable use_cache

Если top-level Variable.get невозможно убрать (legacy code, third-party DAG factory):

[secrets]
use_cache = True
cache_ttl_seconds = 900

С caching парсинг 100 DAGs делает один Variable.get на key за 15 минут → нагрузка падает на 30×.

Это не идеал (изменения Variable не подхватываются 15 минут), но pragmatic для legacy кодовой базы.

Solution 4: ENV vars для config

Static config (не secrets, не часто меняющиеся) лучше хранить в ENV:

# Set один раз при scheduler/worker startup
export AIRFLOW_VAR_API_ENDPOINT="https://api.example.com/v2"

Каждый Variable.get теперь читает из os.environ (in-memory) — это наносекунды, никакой нагрузки на backend.


Variable vs Connection — когда что

АспектVariableConnection
Schemafree-form string/JSONstructured (host, port, login, password, extra)
Encryptionoptional (Fernet, key based)password/extra encrypted, остальное plaintext
Hook resolutionнетHook.get_connection() уже знает как использовать
ENV conventionAIRFLOW_VAR_<KEY>AIRFLOW_CONN_<KEY>
UI editingда, simple formда, structured form с conn_type-specific fields
Use casefeature flags, configs, schedule overridesDB / API / cloud credentials

Правило: если значение представляет «как подключиться к системе» (credentials + endpoint) — это Connection. Если значение — конфигурация поведения (batch size, feature flag, list of buckets) — это Variable.

Анти-паттерн: хранить database password в Variable — теряете Hook integration, должны рукой передавать host/port/etc.


Production checklist

  • НЕ используйте Variable.get() на top-level DAG-файла (даже если у вас 5 DAGs — это привычка)
  • Включите [secrets] use_cache = True (с разумным TTL) для production
  • Для config (не secrets) — ENV vars AIRFLOW_VAR_*
  • Для secrets — Secrets Backend (Vault / Secrets Manager), не Variable в DB
  • Naming: <env>_<system>_<purpose> (например prod_etl_batch_size)
  • Audit: log изменений Variable через Listener API (Module 12) — кто и когда менял
  • НЕ кладите большие values в Variable — column val TEXT без size limit, но JSON > 100 KB замедлит UI rendering

Проверка знанийKnowledge check
DAG factory pattern генерирует 50 DAGs из YAML файла, top-level: `for cfg in configs: Variable.get(f'{cfg.name}_settings', deserialize_json=True)`. Production scheduler начинает падать с timeout-ами после migration на Vault. Что произошло и как пофиксить?
ОтветAnswer
Что произошло: DAG Processor парсит файлы каждые 30s (default min_file_process_interval). Каждый parse выполняет top-level loop: 50 Variable.get → 50 Vault HTTP calls. Для каждого parse cycle. Если у вас 4 file processor (parsing_processes=4), это 50×4 = 200 calls в течение каждого 30s window = ~400 RPS к Vault. Это часто превышает Vault rate limit или вызывает thread pool exhaustion. Раньше с DB metadata это были fast local SQL queries, теперь — HTTP к external Vault. Fix: (1) Enable [secrets] use_cache = True + cache_ttl_seconds = 600 — снижает effective rate в десятки раз. (2) Increase min_file_process_interval до 60-120s — но это slows down detection of DAG changes. (3) **Лучший fix**: rewrite DAG factory чтобы Variable.get выполнялся ВНУТРИ task callable (lazy resolution) — нагрузка падает с 'каждые 30s × 50' до 'один раз при run'. (4) Если configs static — переместить в YAML/JSON файл рядом с DAG (читается через open() — это просто disk read). (5) Если configs dynamic — рассмотреть Asset/Dataset triggering вместо polling Variable. (6) Vault side: scale Vault cluster + increase rate limits + add monitoring на slow secret fetch.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Variable.get('x') lookup order в Airflow 2.10/2.11?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 7