Learning Platform
Глоссарий Troubleshooting
Урок 12.05 · 28 мин
Продвинутый
PoolsSnowflakeCost ControlProduction PatternRDS

Snowflake warehouse protection — production pattern с pools

Это самый важный production pattern Airflow: защита внешних ресурсов через pools. Без него вы рано или поздно положите production Snowflake warehouse, исчерпаете RDS connections или попадёте на rate limit external API. С правильно настроенными pools — predictable concurrency, predictable cost, никаких сюрпризов на on-call.

Урок разбирает реальные конфигурации для трёх классов resources: Snowflake warehouse (cost-driven), RDS / Postgres (connection-driven), external API (rate-limit driven). И показывает паттерн split per query class — почему один pool на ресурс это часто недостаточно.


Snowflake warehouse — кредиты и модель стоимости

Базовый паттерн

@task(pool="snowflake_xs", pool_slots=2)
def heavy_query():
    # SnowflakeOperator или SQLExecuteQueryOperator
    ...

Создаём pool в UI / CLI:

airflow pools set snowflake_xs 8 "Snowflake XS warehouse — protect concurrent queries"

Что это даёт:

  • Максимум 4 одновременных таких task (8 slots / 2 slots per task = 4)
  • Cross-DAG — все @task(pool="snowflake_xs") шарят quota
  • Atomic в critical section — невозможно over-allocate
  • Mutable через UI без редеплоя

Snowflake: warehouse как resource

Snowflake warehouse — это compute cluster, измеряется в credits per hour. Размеры: XS (1), S (2), M (4), L (8), XL (16), 2XL (32). Каждая concurrent query занимает часть ресурса warehouse.

Snowflake имеет внутренний MAX_CONCURRENCY_LEVEL (default 8). Если queries >8 одновременно — Snowflake queues их сам. Но Snowflake queue не имеет хорошего back-pressure для Airflow: query «висит» в pending до timeout, worker слот занят.

Стратегия: Airflow pool устанавливает hard limit ДО Snowflake’а — никогда не достигать Snowflake queue.

Pool как gate перед Snowflake warehouse
100 scheduled TI хотят queryBackfill за месяц или fan-out на 100 partitions. Все @task(pool='snowflake_xs') ждут scheduling.
critical section gate
Pool snowflake_xs (slots=8)Airflow scheduler внутри critical section проверяет: open_slots = 8 - used = 0. Пропускает только 8 TI в queued. Остальные 92 остаются scheduled.
executor pickup
Worker → SnowflakeHook.run(sql)8 worker slots исполняют queries. Snowflake видит максимум 8 одновременных connections. MAX_CONCURRENCY_LEVEL Snowflake не достигается.
completion
Snowflake warehouse — predictable loadСтабильно 8 queries. Cost predictable: ~8 * query_avg_time / 3600 * credits_per_hour. Никакого spike, никакого throttling.

Split per query class — почему один pool недостаточно

Real production обычно имеет смешанную workload: лёгкие SELECT, тяжёлые JOIN-aggregation, single-writer DDL. Один общий pool — это compromise:

  • Если pool=8 — лёгкие SELECT упираются (их хочется 50 одновременно)
  • Если pool=50 — тяжёлые ETL положат warehouse (нужно max 4)

Решение — split: разные pools per query class.

# Production split для Snowflake L warehouse (8 credits)

# Pool snowflake_l_read (slots=16): light SELECT, dashboards, BI
@task(pool="snowflake_l_read", pool_slots=1)
def lightweight_query():
    pass

# Pool snowflake_l_etl (slots=4): tяжёлые ETL queries
@task(pool="snowflake_l_etl", pool_slots=1)
def heavy_etl():
    pass

# Pool snowflake_l_ddl (slots=1): single-writer DDL
@task(pool="snowflake_l_ddl", pool_slots=1)
def schema_migration():
    pass

Pool configurations:

PoolSlotsUse caseWhy
snowflake_l_read16Light SELECT, dashboardsWarehouse handles много lightweight в parallel
snowflake_l_etl4Heavy aggregations, JOINsКаждая занимает заметно ресурс — 4 макс
snowflake_l_ddl1ALTER TABLE, DDLSingle writer — никаких race conditions
snowflake_l_admin1VACUUM analog, optimizeМаинт операции — не одновременно с production

Total capacity can suprass MAX_CONCURRENCY_LEVEL=8 Snowflake’а — это normal, потому что light queries весят мало. Warehouse сам priority-классы делает.


RDS / Postgres: connection-driven

Здесь bottleneck — connections. RDS instance имеет конечный max_connections (default ~100 для small, ~1000 для xlarge). PgBouncer добавляет pooling, но он тоже имеет limit pool_size.

Pattern: pool, который сопоставляется с PgBouncer pool:

# PgBouncer:
# [databases]
# orders = host=db port=5432 dbname=orders pool_size=20

# Airflow:
airflow pools set rds_orders 20 "Sync с PgBouncer pool_size for orders DB"
@task(pool="rds_orders", pool_slots=1)
def write_to_orders(): ...

Если у вас три DBs (orders, users, analytics) — три pools, synchronized с PgBouncer.

Дополнительные splits внутри:

airflow pools set rds_orders_writes 6 "Heavy writes — single-writer per partition"
airflow pools set rds_orders_reads 14 "Read queries via replica"

External API: rate-limited

Здесь bottleneck — requests per second / minute quota. Pool defines parallel requests; sleep/throttle логика внутри task.

# Pool external_api_stripe — 5 concurrent (Stripe API limit)
airflow pools set external_api_stripe 5 "Stripe API parallel call limit"
@task(pool="external_api_stripe", pool_slots=1, retries=5, retry_delay=timedelta(seconds=10))
def fetch_stripe_customer(customer_id: str):
    response = http.get(f"https://api.stripe.com/v1/customers/{customer_id}")
    if response.status_code == 429:  # rate limited
        time.sleep(int(response.headers.get("Retry-After", 30)))
        raise AirflowException("Rate limited, retrying")
    return response.json()

Pool ограничивает parallel — но если API returns 429, retry mechanism Airflow добивает.


Cost control — конкретный расчёт

Snowflake L warehouse = 8 credits/hour. Допустим 3/credit.Одинчасwarehouse=3/credit. Один час warehouse = 24.

Без pool: scheduler решает запустить 50 параллельных heavy ETL queries. Warehouse activates, делает auto-scale до multi-cluster (4 clusters × 8 credits = 32 credits/hour). За час backfill — 96вчаспротивожидаемых96 в час против ожидаемых 24.

С pool slots=4 для heavy ETL: scheduler пропускает только 4 одновременных. Single cluster, no auto-scale. $24/hour stable.

Долгий runs: 100 queries × 10 min each = 1000 min total. С pool=4: 1000/4 = 250 min real time = 4 hours × 24=24 = **96**. Без pool: ~30 min real time × 96/hour=96/hour = **48** (быстрее, но spike risk и неконтролируемый).

Pool — trade-off: predictability vs raw speed. Production почти всегда выбирает predictability.


Production configuration: full example

# Snowflake L warehouse
airflow pools set snowflake_l_read 16 "Read queries — dashboards, lookups"
airflow pools set snowflake_l_etl 4 "Heavy ETL — aggregations, joins"
airflow pools set snowflake_l_ddl 1 "DDL — single writer"
airflow pools set snowflake_l_admin 2 "Maintenance — optimize, statistics"

# RDS orders DB
airflow pools set rds_orders_writes 6
airflow pools set rds_orders_reads 14

# External APIs
airflow pools set api_stripe 5
airflow pools set api_segment 10
airflow pools set api_internal 20

# Compute pools — для CPU-heavy tasks
airflow pools set spark_cluster_a 8 "Spark cluster A — 16 cores total"
airflow pools set kpod_gpu 2 "GPU pods on K8s"

DAG-level:

@dag(...)
def orders_etl():
    @task(pool="rds_orders_reads", pool_slots=1)
    def extract_orders(): ...

    @task(pool="snowflake_l_etl", pool_slots=1)
    def transform_in_snowflake(): ...

    @task(pool="snowflake_l_ddl", pool_slots=1)
    def update_partition_metadata(): ...

    @task(pool="api_internal", pool_slots=1)
    def notify_downstream_service(): ...

Каждая task явно в правильном pool — это checked DAG code review.


Pool naming convention

В production critically важно naming convention. Рекомендуемая схема:

<resource_type>_<instance>_<class>

Примеры:

  • snowflake_l_read — Snowflake warehouse L size, read queries
  • rds_orders_writes — RDS instance for orders, writes only
  • api_stripe_charge — Stripe API, charge endpoint
  • spark_prod_etl — Spark cluster production, ETL workload
  • k8s_gpu_inference — K8s GPU pods, ML inference

Это даёт:

  • Searchable в UI (/pools показывает все sortable)
  • Self-documenting в DAG code
  • Легко regex-фильтрация в monitoring

Monitoring pool usage

Critical metric — pool saturation ratio:

-- Pool utilization summary
SELECT
    sp.pool,
    sp.slots,
    COALESCE(SUM(CASE WHEN ti.state IN ('running', 'queued') THEN ti.pool_slots ELSE 0 END), 0) AS used,
    sp.slots - COALESCE(SUM(CASE WHEN ti.state IN ('running', 'queued') THEN ti.pool_slots ELSE 0 END), 0) AS open,
    ROUND(100.0 * COALESCE(SUM(CASE WHEN ti.state IN ('running', 'queued') THEN ti.pool_slots ELSE 0 END), 0) / sp.slots, 1) AS utilization_pct
FROM slot_pool sp
LEFT JOIN task_instance ti ON ti.pool = sp.pool
GROUP BY sp.pool, sp.slots
ORDER BY utilization_pct DESC;

В Prometheus / OTel метрики:

  • airflow.pool.used_slots.<pool_name>
  • airflow.pool.queued_slots.<pool_name>
  • airflow.pool.open_slots.<pool_name>

Alerts:

  • Utilization >90% sustained >10 min → consider raising slots or splitting pool
  • queued_slots > slots*2 long time → tasks накапливаются быстрее чем consumed

Production gotchas

1. Pool не масштабируется с auto-scale warehouse Snowflake. Если warehouse multi-cluster (auto-scale 1-4 clusters), pool остаётся тем же. Имеет смысл pool на single cluster и не доверять auto-scale — иначе теряется cost predictability.

2. pool_slots на task hard-coded — не пересчитывается dynamic. Если задача в реальности занимает 1 credit, а потом разрослась до 4 — pool counts всё равно как 1. Регулярно ревьюйте pool_slots по реальной нагрузке.

3. Pool — не cgroup, не quota. Pool ограничивает только Airflow scheduling. Если та же warehouse используется из dbt Cloud, BI tool, ad-hoc users — они не видят pool. Pool лишь часть cost strategy, не серебряная пуля.

4. Cross-DAG pool sharing с разными priority_weight. Если DAG A и DAG B оба в pool warehouse — priority_weight внутри pool важна. См. предыдущий урок.

5. Pool default_pool для no-pool tasks — это implicit pool. Никогда не оставляйте production tasks без явного pool. Лучше pool default_pool явно прописывать тоже — defensive coding.


Проверка знанийKnowledge check
Snowflake L warehouse (8 credits, MAX_CONCURRENCY_LEVEL=8). У вас есть 10 lightweight read queries и 4 heavy ETL queries готовых к запуску. Как настроить pools чтобы и read и ETL имели разумный throughput, но не положить warehouse?
ОтветAnswer
Split на два pool: snowflake_l_read (slots=16) и snowflake_l_etl (slots=4). Логика: lightweight queries весят 0.1-0.3 credit каждая, можно держать много параллельных — warehouse priority системы Snowflake сам справится. Heavy ETL queries весят 1-2 credit каждая — на L warehouse реалистично 4 одновременных максимум, иначе queue в Snowflake или auto-scale. Total max slots = 20, но usually concurrent ~8-10 (read queries короткие, ETL длинные — naturally desync). Все read tasks: @task(pool='snowflake_l_read', pool_slots=1). Все heavy ETL: @task(pool='snowflake_l_etl', pool_slots=1). Альтернатива — single pool slots=8 с pool_slots per task (1 для read, 2 для ETL) — но это упирается в global 8 limit, нет benefit isolation между light и heavy. Split лучше: isolation + predictable cost (ETL pool — max $X/hour deterministically) + scale read без касания ETL. На monitoring: snowflake_l_etl utilization 100% sustained → раздумать поднимать warehouse size или дробить ETL. snowflake_l_read 100% sustained → раздумать про caching или materialized views.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. У Snowflake warehouse L (8 credits) MAX_CONCURRENCY_LEVEL=8. Один pool slots=8 для всех queries — что не так?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6