Connections fundamentals
Connection в Airflow — это именованная запись credentials и параметров доступа к внешней системе. Postgres, S3, Snowflake, Slack webhook — для каждой внешней системы у вас есть отдельный conn_id. Code на это ссылается строкой: PostgresOperator(postgres_conn_id="warehouse_dwh") — Airflow на runtime достанет credentials, расшифрует и передаст в Hook.
Этот урок препарирует, как Connection устроен внутри: формат URI, какие поля хранятся в connection таблице, как BaseHook.get_connection(conn_id) резолвит запись через цепочку Backend → ENV → DB, и какие подводные камни ждут с special characters в паролях.
Модель данных
Таблица connection в metadata DB:
CREATE TABLE connection (
id SERIAL PRIMARY KEY,
conn_id VARCHAR(250) UNIQUE NOT NULL,
conn_type VARCHAR(500) NOT NULL,
description TEXT,
host VARCHAR(500),
schema VARCHAR(500),
login VARCHAR(500),
password VARCHAR(5000), -- ← Fernet encrypted!
port INTEGER,
is_encrypted BOOLEAN,
is_extra_encrypted BOOLEAN,
extra TEXT -- ← Fernet encrypted JSON
);
Два поля шифруются Fernet (см. урок 03): password и extra. Остальные поля хранятся открытым текстом — поэтому host, login, schema, port видны в pg_dump без расшифровки.
conn_type — это строка-discriminator, по которой Airflow выбирает Hook class:
postgres→airflow.providers.postgres.hooks.postgres.PostgresHookaws→airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHookhttp→airflow.providers.http.hooks.http.HttpHooksnowflake→airflow.providers.snowflake.hooks.snowflake.SnowflakeHook
Mapping conn_type → Hook class регистрируется через provider’s provider.yaml и собирается в ProvidersManager при scheduler startup.
Kubernetes Secrets — типы, base64 и encryption at rest
Connection URI format
Альтернативная форма представления Connection — единая URI-строка. Это формат для environment variables (AIRFLOW_CONN_<NAME>), а также для CLI airflow connections add:
<conn_type>://<login>:<password>@<host>:<port>/<schema>?<extra_json_or_params>
Пример Postgres connection:
postgres://airflow:[email protected]:5432/warehouse?sslmode=require&connect_timeout=10
Парсинг этой строки даёт:
В коде:
from airflow.models import Connection
conn = Connection(
conn_id="warehouse_dwh"
conn_type="postgres"
host="db.internal"
port=5432,
login="airflow"
password="s3cr3t"
schema="warehouse"
extra='{"sslmode": "require", "connect_timeout": 10}',
)
# Эквивалентно:
conn = Connection(
conn_id="warehouse_dwh"
uri="postgres://airflow:[email protected]:5432/warehouse?sslmode=require&connect_timeout=10",
)
Hooks resolution через BaseHook.get_connection
Когда оператор/таска нуждается в connection, цепочка такая:
# Внутри PostgresHook.__init__
from airflow.hooks.base import BaseHook
class PostgresHook(BaseHook):
def get_conn(self):
conn = self.get_connection(self.postgres_conn_id) # ← вот эта строка
# conn — это уже Connection object с расшифрованными password/extra
return psycopg2.connect(
host=conn.host,
port=conn.port,
user=conn.login,
password=conn.password,
dbname=conn.schema,
**conn.extra_dejson,
)
BaseHook.get_connection(conn_id) — это священный entry point. Внутри:
@classmethod
def get_connection(cls, conn_id: str) -> Connection:
conn = Connection.get_connection_from_secrets(conn_id)
return conn
И уже Connection.get_connection_from_secrets(conn_id) запускает lookup chain Backend → ENV → DB. Этот lookup детально разбирается в уроке 04 и 05.
Testing connection: UI / CLI / Python
Airflow 2.10/2.11 имеет Test button в UI на странице Connections:
CLI:
# Add connection из URI
airflow connections add 'warehouse_dwh' \
--conn-uri 'postgres://airflow:[email protected]:5432/warehouse?sslmode=require'
# List
airflow connections list
# Get
airflow connections get warehouse_dwh
# Test (Airflow 2.7+)
airflow connections test warehouse_dwh
# Export (для backup)
airflow connections export connections.json --file-format json
# Delete
airflow connections delete warehouse_dwh
По умолчанию в Airflow 2.10/2.11 Test Connection через UI DISABLED (после CVE про SSRF через misuse). Чтобы включить — в airflow.cfg [core] установить test_connection = Enabled. Альтернативно Hidden (default) или Disabled (полностью блокировать). Это правильная безопасная default: testing в UI выполняется на webserver-е, который мог не иметь netw. access к target системам.
Pitfall #1: Special characters в password
Самая частая боль с Connections — special characters в password. Пример:
# Password содержит @ — ломает URI parsing
AIRFLOW_CONN_DB='postgres://user:p@ss@host:5432/db'
# ^^^^ password
# ^^^^ host? login?
Парсер увидит user как login, p как password, ss@host как host — broken.
Правильное решение: URL-encode password:
from urllib.parse import quote_plus
password = "p@ss/word#1!"
encoded = quote_plus(password) # 'p%40ss%2Fword%231%21'
uri = f"postgres://user:{encoded}@host:5432/db"
Список символов, которые нужно encode-ить: @ / : # ? & = + space.
В UI Airflow эту проблему не имеет — password идёт как отдельное поле формы, не парсится из URI.
Pitfall #2: extra JSON и nested structures
Поле extra хранится как JSON string. Когда вы добавляете connection через CLI или ENV:
# Правильно — JSON в extra
AIRFLOW_CONN_S3='{"conn_type":"aws","login":"AKIAXX","password":"yyy","extra":{"region_name":"eu-west-1"}}'
# Также правильно через URI (extra как query string flat):
AIRFLOW_CONN_S3='aws://AKIAXX:yyy@?region_name=eu-west-1'
Когда reading из Python:
conn = BaseHook.get_connection("s3")
extra_dict = conn.extra_dejson # ← парсит JSON в dict
region = extra_dict.get("region_name")
conn.extra — это raw JSON string. conn.extra_dejson — parsed dict. Always use extra_dejson в коде Hook-ов, никогда json.loads(conn.extra) руками — extra_dejson обрабатывает edge cases (empty, malformed).
Pitfall #3: conn_id ≠ host
Частая путаница новичков:
# WRONG
PostgresOperator(postgres_conn_id="db.internal:5432/warehouse", ...)
# RIGHT
PostgresOperator(postgres_conn_id="warehouse_dwh", ...)
# где warehouse_dwh — это conn_id в Connections таблице
conn_id — это logical name (alias). Hook берёт его и резолвит в фактический Connection через get_connection_from_secrets. Это даёт environment portability: один и тот же DAG в dev/staging/prod ссылается на warehouse_dwh, а реальный host/credentials меняются в каждой среде.
Pitfall #4: provider package missing
Если для conn_type не установлен provider package, lookup упадёт на runtime:
ERROR: Connection conn_id='snowflake_prod', conn_type='snowflake'
requires apache-airflow-providers-snowflake to be installed
Это особенно болезненно при upgrade — в Airflow 2.0 был monorepo, в 2.x провайдеры выделены в отдельные pip packages. После pip install apache-airflow==2.11.0 нужно явно установить нужные providers:
pip install apache-airflow-providers-postgres \
apache-airflow-providers-amazon \
apache-airflow-providers-snowflake \
apache-airflow-providers-http
В constraints файле (pip install -c <constraints_url>) указаны pinned versions providers, совместимые с core Airflow версией.
Использование Connection вне Hooks
Иногда нужно достать credentials в произвольном Python коде:
from airflow.hooks.base import BaseHook
@task
def call_external_api():
conn = BaseHook.get_connection("external_api")
api_key = conn.password
base_url = conn.host
response = requests.get(
f"{base_url}/orders"
headers={"Authorization": f"Bearer {api_key}"},
timeout=30,
)
return response.json()
Это работает потому, что BaseHook.get_connection — classmethod и не требует instance.
Альтернативно — используйте HttpHook напрямую, который сам управляет sessions и retries.
Production checklist
- conn_id naming convention:
<system>_<env>_<purpose>(напримерpg_prod_warehouse) - Используйте Secrets Backend для production (см. урок 05) — никогда UI для real secrets
- В extra храните provider-specific config, не secrets
- Test Connection feature только если webserver имеет network access к target системам
- Для caching connection lookup —
[secrets] use_cache = Trueвairflow.cfg - Audit любые ручные модификации через UI — в production это red flag