Learning Platform
Глоссарий Troubleshooting
Урок 11.02 · 22 мин
Продвинутый
ConnectionHookURIBaseHookconn_type

Connections fundamentals

Connection в Airflow — это именованная запись credentials и параметров доступа к внешней системе. Postgres, S3, Snowflake, Slack webhook — для каждой внешней системы у вас есть отдельный conn_id. Code на это ссылается строкой: PostgresOperator(postgres_conn_id="warehouse_dwh") — Airflow на runtime достанет credentials, расшифрует и передаст в Hook.

Этот урок препарирует, как Connection устроен внутри: формат URI, какие поля хранятся в connection таблице, как BaseHook.get_connection(conn_id) резолвит запись через цепочку Backend → ENV → DB, и какие подводные камни ждут с special characters в паролях.


Модель данных

Таблица connection в metadata DB:

CREATE TABLE connection (
    id              SERIAL PRIMARY KEY,
    conn_id         VARCHAR(250) UNIQUE NOT NULL,
    conn_type       VARCHAR(500) NOT NULL,
    description     TEXT,
    host            VARCHAR(500),
    schema          VARCHAR(500),
    login           VARCHAR(500),
    password        VARCHAR(5000),  -- ← Fernet encrypted!
    port            INTEGER,
    is_encrypted    BOOLEAN,
    is_extra_encrypted BOOLEAN,
    extra           TEXT             -- ← Fernet encrypted JSON
);

Два поля шифруются Fernet (см. урок 03): password и extra. Остальные поля хранятся открытым текстом — поэтому host, login, schema, port видны в pg_dump без расшифровки.

conn_type — это строка-discriminator, по которой Airflow выбирает Hook class:

  • postgresairflow.providers.postgres.hooks.postgres.PostgresHook
  • awsairflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook
  • httpairflow.providers.http.hooks.http.HttpHook
  • snowflakeairflow.providers.snowflake.hooks.snowflake.SnowflakeHook

Mapping conn_type → Hook class регистрируется через provider’s provider.yaml и собирается в ProvidersManager при scheduler startup.


Kubernetes Secrets — типы, base64 и encryption at rest

Connection URI format

Альтернативная форма представления Connection — единая URI-строка. Это формат для environment variables (AIRFLOW_CONN_<NAME>), а также для CLI airflow connections add:

<conn_type>://<login>:<password>@<host>:<port>/<schema>?<extra_json_or_params>

Пример Postgres connection:

postgres://airflow:[email protected]:5432/warehouse?sslmode=require&connect_timeout=10

Парсинг этой строки даёт:

Парсинг Connection URI
conn_typeПрефикс до :// определяет conn_type. Должен совпадать с зарегистрированным provider hook type. Если префикс неизвестен — Airflow примет его как generic, но Hook resolution упадёт.
loginUser part до : в auth блоке. URL-decoded automatically. Может содержать spaces, но только в URL-encoded форме (%20).
passwordПосле : до @. URL-decoded. Это поле зашифровано Fernet в DB, но в URI оно plaintext — поэтому AIRFLOW_CONN_* env vars надо защищать в Vault/Secret Manager.
hostПосле @ до : или /. Может быть DNS name, IP, или Unix socket path (URL-encoded /). Postgres conn_type специально парсит host=/var/run/postgresql формат.
portПосле : в host блоке. Опционален — если отсутствует, Hook применяет default (5432 для Postgres). Integer.
schemaPath component / (после хоста). В Postgres это database name (исторический misnamer от SQLAlchemy). В Snowflake это database. В S3 — bucket name.
extraQuery string после ? собирается в dict и сохраняется как JSON в extra поле. Postgres-specific: sslmode, connect_timeout, sslcert, sslkey. Provider-specific keys документированы в provider docs.

В коде:

from airflow.models import Connection

conn = Connection(
    conn_id="warehouse_dwh"
    conn_type="postgres"
    host="db.internal"
    port=5432,
    login="airflow"
    password="s3cr3t"
    schema="warehouse"
    extra='{"sslmode": "require", "connect_timeout": 10}',
)

# Эквивалентно:
conn = Connection(
    conn_id="warehouse_dwh"
    uri="postgres://airflow:[email protected]:5432/warehouse?sslmode=require&connect_timeout=10",
)

Hooks resolution через BaseHook.get_connection

Когда оператор/таска нуждается в connection, цепочка такая:

# Внутри PostgresHook.__init__
from airflow.hooks.base import BaseHook

class PostgresHook(BaseHook):
    def get_conn(self):
        conn = self.get_connection(self.postgres_conn_id)  # ← вот эта строка
        # conn — это уже Connection object с расшифрованными password/extra
        return psycopg2.connect(
            host=conn.host,
            port=conn.port,
            user=conn.login,
            password=conn.password,
            dbname=conn.schema,
            **conn.extra_dejson,
        )

BaseHook.get_connection(conn_id) — это священный entry point. Внутри:

@classmethod
def get_connection(cls, conn_id: str) -> Connection:
    conn = Connection.get_connection_from_secrets(conn_id)
    return conn

И уже Connection.get_connection_from_secrets(conn_id) запускает lookup chain Backend → ENV → DB. Этот lookup детально разбирается в уроке 04 и 05.


Testing connection: UI / CLI / Python

Airflow 2.10/2.11 имеет Test button в UI на странице Connections:

Test Connection flow
UI: Test buttonWebserver Flask endpoint /connection/test/<id> вызывает Hook.test_connection() через subprocess или inline. По умолчанию feature DISABLED в production — set test_connection = Enabled в airflow.cfg [core] для включения.
POST /connection/test
Hook.test_connection()Каждый provider Hook реализует test_connection() метод. PostgresHook делает SELECT 1; HttpHook делает GET на base_url; S3Hook list_buckets() с timeout 5s. Возвращает (bool, str) — success + message.
Response (success/fail)UI показывает зелёный / красный banner с message. На failure — exception тип и текст. Это удобный smoke test, но не заменяет integration testing в production CI.

CLI:

# Add connection из URI
airflow connections add 'warehouse_dwh' \
  --conn-uri 'postgres://airflow:[email protected]:5432/warehouse?sslmode=require'

# List
airflow connections list

# Get
airflow connections get warehouse_dwh

# Test (Airflow 2.7+)
airflow connections test warehouse_dwh

# Export (для backup)
airflow connections export connections.json --file-format json

# Delete
airflow connections delete warehouse_dwh
WARNING

По умолчанию в Airflow 2.10/2.11 Test Connection через UI DISABLED (после CVE про SSRF через misuse). Чтобы включить — в airflow.cfg [core] установить test_connection = Enabled. Альтернативно Hidden (default) или Disabled (полностью блокировать). Это правильная безопасная default: testing в UI выполняется на webserver-е, который мог не иметь netw. access к target системам.


Pitfall #1: Special characters в password

Самая частая боль с Connections — special characters в password. Пример:

# Password содержит @ — ломает URI parsing
AIRFLOW_CONN_DB='postgres://user:p@ss@host:5432/db'
#                              ^^^^ password
#                                    ^^^^ host? login?

Парсер увидит user как login, p как password, ss@host как host — broken.

Правильное решение: URL-encode password:

from urllib.parse import quote_plus

password = "p@ss/word#1!"
encoded = quote_plus(password)  # 'p%40ss%2Fword%231%21'

uri = f"postgres://user:{encoded}@host:5432/db"

Список символов, которые нужно encode-ить: @ / : # ? & = + space.

В UI Airflow эту проблему не имеет — password идёт как отдельное поле формы, не парсится из URI.


Pitfall #2: extra JSON и nested structures

Поле extra хранится как JSON string. Когда вы добавляете connection через CLI или ENV:

# Правильно — JSON в extra
AIRFLOW_CONN_S3='{"conn_type":"aws","login":"AKIAXX","password":"yyy","extra":{"region_name":"eu-west-1"}}'

# Также правильно через URI (extra как query string flat):
AIRFLOW_CONN_S3='aws://AKIAXX:yyy@?region_name=eu-west-1'

Когда reading из Python:

conn = BaseHook.get_connection("s3")
extra_dict = conn.extra_dejson  # ← парсит JSON в dict
region = extra_dict.get("region_name")

conn.extra — это raw JSON string. conn.extra_dejson — parsed dict. Always use extra_dejson в коде Hook-ов, никогда json.loads(conn.extra) руками — extra_dejson обрабатывает edge cases (empty, malformed).


Pitfall #3: conn_id ≠ host

Частая путаница новичков:

# WRONG
PostgresOperator(postgres_conn_id="db.internal:5432/warehouse", ...)

# RIGHT
PostgresOperator(postgres_conn_id="warehouse_dwh", ...)
# где warehouse_dwh — это conn_id в Connections таблице

conn_id — это logical name (alias). Hook берёт его и резолвит в фактический Connection через get_connection_from_secrets. Это даёт environment portability: один и тот же DAG в dev/staging/prod ссылается на warehouse_dwh, а реальный host/credentials меняются в каждой среде.


Pitfall #4: provider package missing

Если для conn_type не установлен provider package, lookup упадёт на runtime:

ERROR: Connection conn_id='snowflake_prod', conn_type='snowflake'
       requires apache-airflow-providers-snowflake to be installed

Это особенно болезненно при upgrade — в Airflow 2.0 был monorepo, в 2.x провайдеры выделены в отдельные pip packages. После pip install apache-airflow==2.11.0 нужно явно установить нужные providers:

pip install apache-airflow-providers-postgres \
            apache-airflow-providers-amazon \
            apache-airflow-providers-snowflake \
            apache-airflow-providers-http

В constraints файле (pip install -c <constraints_url>) указаны pinned versions providers, совместимые с core Airflow версией.


Использование Connection вне Hooks

Иногда нужно достать credentials в произвольном Python коде:

from airflow.hooks.base import BaseHook

@task
def call_external_api():
    conn = BaseHook.get_connection("external_api")
    api_key = conn.password
    base_url = conn.host

    response = requests.get(
        f"{base_url}/orders"
        headers={"Authorization": f"Bearer {api_key}"},
        timeout=30,
    )
    return response.json()

Это работает потому, что BaseHook.get_connectionclassmethod и не требует instance.

Альтернативно — используйте HttpHook напрямую, который сам управляет sessions и retries.


Production checklist

  • conn_id naming convention: <system>_<env>_<purpose> (например pg_prod_warehouse)
  • Используйте Secrets Backend для production (см. урок 05) — никогда UI для real secrets
  • В extra храните provider-specific config, не secrets
  • Test Connection feature только если webserver имеет network access к target системам
  • Для caching connection lookup — [secrets] use_cache = True в airflow.cfg
  • Audit любые ручные модификации через UI — в production это red flag

Проверка знанийKnowledge check
DAG использует `PostgresOperator(postgres_conn_id='warehouse')` но падает на runtime с ошибкой `Connection 'warehouse' not found`. При этом `airflow connections get warehouse` показывает запись. Какие 3 причины могут быть?
ОтветAnswer
1) **Worker не видит metadata DB напрямую** (например, KubernetesExecutor с разделёнными network policies) — но это редкость, т.к. workers обычно имеют DB access. 2) **Secrets Backend сконфигурирован, но Vault недоступен и fallback отключён** — Connection.get_connection_from_secrets вернёт None раньше, чем дойдёт до DB lookup. Check `[secrets] backend_kwargs` и Vault availability. 3) **AIRFLOW_CONN_WAREHOUSE env var имеет битый формат** — она прерывает chain на ENV stage. ENV-based lookup имеет higher priority чем DB — если env var malformed (например, URL без encoding), Connection objects creation failed silently, и в итоге upstream код видит 'not found'. Plus вариант: conn_id case-sensitive, проверьте exact spelling. Debug: `airflow connections get warehouse --output json` показывает source (env vs db), используйте этот вывод для диагностики.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Какие именно поля Connection шифруются Fernet в metadata DB?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 7