Variables — storage, lookup и пара жёстких pitfalls
Variable в Airflow — это именованное конфигурационное значение (строка или JSON), которое DAG может читать на runtime. В отличие от Connection (где есть schema с обязательными полями), Variable — это plain key→value store. Используется для feature flags, environment-specific configs, schedule overrides, API endpoints — всего, что не credentials.
Этот урок разбирает: как Variable хранится в DB, lookup chain Backend → ENV → DB, JSON deserialization, use_cache, и самый болезненный pitfall — Variable.get в top-level DAG-файле.
Storage
Variable хранится в таблице variable:
CREATE TABLE variable (
id SERIAL PRIMARY KEY,
key VARCHAR(250) UNIQUE NOT NULL,
val TEXT, -- ← Fernet encrypted (если is_encrypted=true)
description TEXT,
is_encrypted BOOLEAN
);
Поле val шифруется Fernet только если у Airflow есть валидный fernet_key и при создании is_encrypted=true. Значение всегда строка — если хотите хранить dict/list, нужно явно сериализовать в JSON.
Basic usage
from airflow.models import Variable
# Read
value = Variable.get("api_endpoint") # str
default_value = Variable.get("flag", default_var="off") # default если key not found
# Write
Variable.set("api_endpoint", "https://api.example.com/v2")
# JSON
config = Variable.get("etl_config", deserialize_json=True)
# {"batch_size": 1000, "retries": 3, "buckets": ["s3://a", "s3://b"]}
# Delete
Variable.delete("api_endpoint")
С deserialize_json=True Airflow вызывает json.loads(value) после чтения и возвращает Python object.
В TaskFlow API можно прокидывать через templated arguments:
@task
def process_batch(config: dict):
batch_size = config["batch_size"]
...
@dag(...)
def my_dag():
config = Variable.get("etl_config", deserialize_json=True) # ⚠ см. ниже
process_batch(config)
Однако здесь притаился крупный pitfall — top-level Variable.get. Разбираем ниже.
Lookup chain: Backend → ENV → DB
Variable.get(key) под капотом запускает точно такую же chain, что и Connection.get_connection_from_secrets:
# Псевдокод
@classmethod
def get_variable(cls, key: str) -> str | None:
# 1. Secrets Backend (если настроен)
for backend in secrets_backend_list: # типично один backend
value = backend.get_variable(key)
if value is not None:
return value
# 2. Environment variable
env_key = f"AIRFLOW_VAR_{key.upper()}"
if env_key in os.environ:
return os.environ[env_key]
# 3. Metadata DB
var = session.query(Variable).filter_by(key=key).first()
return var.val if var else None
Важно: каждый stage может быть полностью пропущен в зависимости от config. Если Secrets Backend не настроен, stage 1 skip. Если ENV var отсутствует — stage 2 skip. Order fixed: Backend → ENV → DB.
ENV variable convention
Конвенция Airflow для variables:
# Variable key 'my_api_key' → env var:
export AIRFLOW_VAR_MY_API_KEY="secret_value"
# Variable key 'etl_config' (JSON) → env var:
export AIRFLOW_VAR_ETL_CONFIG='{"batch_size": 1000, "retries": 3}'
Обратите внимание:
- Префикс
AIRFLOW_VAR_обязателен (без него env vars не подхватываются) - Key в env var uppercase
- Underscores OK, dashes/dots — НЕ работают (env vars не позволяют их в стандартных shell)
Это удобный pattern для dev/local development: положили env vars в .env, не нужно DB или Vault.
Production caveat: env vars не encrypted в k8s. Если используете k8s Secret для AIRFLOW_VAR_* — оно сохранено в etcd plaintext (если etcd encryption-at-rest не включён). Лучше Vault для production secrets, env vars для config flags.
use_cache (caching strategy)
В Airflow 2.10/2.11 secrets backend имеет optional caching:
[secrets]
backend = airflow.providers.hashicorp.secrets.vault.VaultBackend
backend_kwargs = {"url": "..."}
use_cache = True
cache_ttl_seconds = 900 # 15 минут
С use_cache = True:
- Первый read Variable из Vault → cache (in-memory, process-local)
- Следующие reads в течение 900 seconds — из cache, без HTTP к Vault
- После TTL — re-fetch
В Airflow 2.10/2.11 use_cache = False по default (нужно явно включать). В Airflow 3.x default стал True.
Cache — process-local. Каждый scheduler/worker/triggerer имеет свой in-memory cache. Это значит:
- Изменения Variable в UI не invalidate cache на workers — нужно ждать TTL expire или restart processes.
- Cache не consistent между schedulers — если у вас 3 schedulers, у каждого свой view of Variable.
- Memory overhead minimal — это dict, не больше нескольких MB на pod.
Top-level Variable.get pitfall — THE pitfall
Самая частая и болезненная ошибка новичков и опытных:
# anti-pattern.py
from airflow.decorators import dag, task
from airflow.models import Variable
# ⚠ TOP-LEVEL Variable.get
API_ENDPOINT = Variable.get("api_endpoint") # ← выполняется при каждом DAG parse!
BATCH_SIZE = Variable.get("batch_size", deserialize_json=False)
@dag(schedule="@daily", start_date=datetime(2026, 1, 1))
def my_dag():
@task
def call_api():
response = requests.get(API_ENDPOINT)
...
call_api()
my_dag()
Что происходит:
DAG Processor парсит файл каждые min_file_process_interval = 30 секунд (default). Каждый parse:
- Импортирует .py → выполняется top-level code →
Variable.get("api_endpoint")→ Fernet decrypt + SQL query (или Vault HTTP call) - То же для
Variable.get("batch_size")
Для 100 DAGs с подобным паттерном:
- 200 Variable.get calls × 2 раза в минуту = 400 calls/min
- Если используется Vault — это 400 HTTP requests/min к Vault, под нагрузкой может завалить Vault (Vault rate limits, network bandwidth, latency growth)
- Если DB — это SQL queries, дополнительная нагрузка на metadata DB
Solutions:
Solution 1: Move inside task
Самое чистое и правильное:
@dag(schedule="@daily", start_date=datetime(2026, 1, 1))
def my_dag():
@task
def call_api():
# Variable.get выполняется ТОЛЬКО при runtime task — не при parse
api_endpoint = Variable.get("api_endpoint")
response = requests.get(api_endpoint)
...
call_api()
Variable.get внутри task body исполняется только когда task runs — то есть один раз на каждый DAG run, не каждые 30s. Для @daily DAG это 1 call/day против 2880 calls/day на parse.
Solution 2: Templated arguments (Jinja)
В operator kwargs Jinja templated args выполняются только при task execute:
PostgresOperator(
task_id="extract"
sql="SELECT * FROM orders WHERE batch_size = {{ var.value.batch_size }}",
# var.value.batch_size — Jinja template, resolved at runtime, не parse
)
{{ var.value.X }} и {{ var.json.X }} — Jinja syntax для Variable resolution в templated fields.
Solution 3: Enable use_cache
Если top-level Variable.get невозможно убрать (legacy code, third-party DAG factory):
[secrets]
use_cache = True
cache_ttl_seconds = 900
С caching парсинг 100 DAGs делает один Variable.get на key за 15 минут → нагрузка падает на 30×.
Это не идеал (изменения Variable не подхватываются 15 минут), но pragmatic для legacy кодовой базы.
Solution 4: ENV vars для config
Static config (не secrets, не часто меняющиеся) лучше хранить в ENV:
# Set один раз при scheduler/worker startup
export AIRFLOW_VAR_API_ENDPOINT="https://api.example.com/v2"
Каждый Variable.get теперь читает из os.environ (in-memory) — это наносекунды, никакой нагрузки на backend.
Variable vs Connection — когда что
| Аспект | Variable | Connection |
|---|---|---|
| Schema | free-form string/JSON | structured (host, port, login, password, extra) |
| Encryption | optional (Fernet, key based) | password/extra encrypted, остальное plaintext |
| Hook resolution | нет | Hook.get_connection() уже знает как использовать |
| ENV convention | AIRFLOW_VAR_<KEY> | AIRFLOW_CONN_<KEY> |
| UI editing | да, simple form | да, structured form с conn_type-specific fields |
| Use case | feature flags, configs, schedule overrides | DB / API / cloud credentials |
Правило: если значение представляет «как подключиться к системе» (credentials + endpoint) — это Connection. Если значение — конфигурация поведения (batch size, feature flag, list of buckets) — это Variable.
Анти-паттерн: хранить database password в Variable — теряете Hook integration, должны рукой передавать host/port/etc.
Production checklist
- НЕ используйте
Variable.get()на top-level DAG-файла (даже если у вас 5 DAGs — это привычка) - Включите
[secrets] use_cache = True(с разумным TTL) для production - Для config (не secrets) — ENV vars
AIRFLOW_VAR_* - Для secrets — Secrets Backend (Vault / Secrets Manager), не Variable в DB
- Naming:
<env>_<system>_<purpose>(напримерprod_etl_batch_size) - Audit: log изменений Variable через Listener API (Module 12) — кто и когда менял
- НЕ кладите большие values в Variable — column
val TEXTбез size limit, но JSON > 100 KB замедлит UI rendering