Snowflake warehouse protection — production pattern с pools
Это самый важный production pattern Airflow: защита внешних ресурсов через pools. Без него вы рано или поздно положите production Snowflake warehouse, исчерпаете RDS connections или попадёте на rate limit external API. С правильно настроенными pools — predictable concurrency, predictable cost, никаких сюрпризов на on-call.
Урок разбирает реальные конфигурации для трёх классов resources: Snowflake warehouse (cost-driven), RDS / Postgres (connection-driven), external API (rate-limit driven). И показывает паттерн split per query class — почему один pool на ресурс это часто недостаточно.
Snowflake warehouse — кредиты и модель стоимости
Базовый паттерн
@task(pool="snowflake_xs", pool_slots=2)
def heavy_query():
# SnowflakeOperator или SQLExecuteQueryOperator
...
Создаём pool в UI / CLI:
airflow pools set snowflake_xs 8 "Snowflake XS warehouse — protect concurrent queries"
Что это даёт:
- Максимум 4 одновременных таких task (8 slots / 2 slots per task = 4)
- Cross-DAG — все
@task(pool="snowflake_xs")шарят quota - Atomic в critical section — невозможно over-allocate
- Mutable через UI без редеплоя
Snowflake: warehouse как resource
Snowflake warehouse — это compute cluster, измеряется в credits per hour. Размеры: XS (1), S (2), M (4), L (8), XL (16), 2XL (32). Каждая concurrent query занимает часть ресурса warehouse.
Snowflake имеет внутренний MAX_CONCURRENCY_LEVEL (default 8). Если queries >8 одновременно — Snowflake queues их сам. Но Snowflake queue не имеет хорошего back-pressure для Airflow: query «висит» в pending до timeout, worker слот занят.
Стратегия: Airflow pool устанавливает hard limit ДО Snowflake’а — никогда не достигать Snowflake queue.
Split per query class — почему один pool недостаточно
Real production обычно имеет смешанную workload: лёгкие SELECT, тяжёлые JOIN-aggregation, single-writer DDL. Один общий pool — это compromise:
- Если pool=8 — лёгкие SELECT упираются (их хочется 50 одновременно)
- Если pool=50 — тяжёлые ETL положат warehouse (нужно max 4)
Решение — split: разные pools per query class.
# Production split для Snowflake L warehouse (8 credits)
# Pool snowflake_l_read (slots=16): light SELECT, dashboards, BI
@task(pool="snowflake_l_read", pool_slots=1)
def lightweight_query():
pass
# Pool snowflake_l_etl (slots=4): tяжёлые ETL queries
@task(pool="snowflake_l_etl", pool_slots=1)
def heavy_etl():
pass
# Pool snowflake_l_ddl (slots=1): single-writer DDL
@task(pool="snowflake_l_ddl", pool_slots=1)
def schema_migration():
pass
Pool configurations:
| Pool | Slots | Use case | Why |
|---|---|---|---|
snowflake_l_read | 16 | Light SELECT, dashboards | Warehouse handles много lightweight в parallel |
snowflake_l_etl | 4 | Heavy aggregations, JOINs | Каждая занимает заметно ресурс — 4 макс |
snowflake_l_ddl | 1 | ALTER TABLE, DDL | Single writer — никаких race conditions |
snowflake_l_admin | 1 | VACUUM analog, optimize | Маинт операции — не одновременно с production |
Total capacity can suprass MAX_CONCURRENCY_LEVEL=8 Snowflake’а — это normal, потому что light queries весят мало. Warehouse сам priority-классы делает.
RDS / Postgres: connection-driven
Здесь bottleneck — connections. RDS instance имеет конечный max_connections (default ~100 для small, ~1000 для xlarge). PgBouncer добавляет pooling, но он тоже имеет limit pool_size.
Pattern: pool, который сопоставляется с PgBouncer pool:
# PgBouncer:
# [databases]
# orders = host=db port=5432 dbname=orders pool_size=20
# Airflow:
airflow pools set rds_orders 20 "Sync с PgBouncer pool_size for orders DB"
@task(pool="rds_orders", pool_slots=1)
def write_to_orders(): ...
Если у вас три DBs (orders, users, analytics) — три pools, synchronized с PgBouncer.
Дополнительные splits внутри:
airflow pools set rds_orders_writes 6 "Heavy writes — single-writer per partition"
airflow pools set rds_orders_reads 14 "Read queries via replica"
External API: rate-limited
Здесь bottleneck — requests per second / minute quota. Pool defines parallel requests; sleep/throttle логика внутри task.
# Pool external_api_stripe — 5 concurrent (Stripe API limit)
airflow pools set external_api_stripe 5 "Stripe API parallel call limit"
@task(pool="external_api_stripe", pool_slots=1, retries=5, retry_delay=timedelta(seconds=10))
def fetch_stripe_customer(customer_id: str):
response = http.get(f"https://api.stripe.com/v1/customers/{customer_id}")
if response.status_code == 429: # rate limited
time.sleep(int(response.headers.get("Retry-After", 30)))
raise AirflowException("Rate limited, retrying")
return response.json()
Pool ограничивает parallel — но если API returns 429, retry mechanism Airflow добивает.
Cost control — конкретный расчёт
Snowflake L warehouse = 8 credits/hour. Допустим 24.
Без pool: scheduler решает запустить 50 параллельных heavy ETL queries. Warehouse activates, делает auto-scale до multi-cluster (4 clusters × 8 credits = 32 credits/hour). За час backfill — 24.
С pool slots=4 для heavy ETL: scheduler пропускает только 4 одновременных. Single cluster, no auto-scale. $24/hour stable.
Долгий runs: 100 queries × 10 min each = 1000 min total. С pool=4: 1000/4 = 250 min real time = 4 hours × 96**. Без pool: ~30 min real time × 48** (быстрее, но spike risk и неконтролируемый).
Pool — trade-off: predictability vs raw speed. Production почти всегда выбирает predictability.
Production configuration: full example
# Snowflake L warehouse
airflow pools set snowflake_l_read 16 "Read queries — dashboards, lookups"
airflow pools set snowflake_l_etl 4 "Heavy ETL — aggregations, joins"
airflow pools set snowflake_l_ddl 1 "DDL — single writer"
airflow pools set snowflake_l_admin 2 "Maintenance — optimize, statistics"
# RDS orders DB
airflow pools set rds_orders_writes 6
airflow pools set rds_orders_reads 14
# External APIs
airflow pools set api_stripe 5
airflow pools set api_segment 10
airflow pools set api_internal 20
# Compute pools — для CPU-heavy tasks
airflow pools set spark_cluster_a 8 "Spark cluster A — 16 cores total"
airflow pools set kpod_gpu 2 "GPU pods on K8s"
DAG-level:
@dag(...)
def orders_etl():
@task(pool="rds_orders_reads", pool_slots=1)
def extract_orders(): ...
@task(pool="snowflake_l_etl", pool_slots=1)
def transform_in_snowflake(): ...
@task(pool="snowflake_l_ddl", pool_slots=1)
def update_partition_metadata(): ...
@task(pool="api_internal", pool_slots=1)
def notify_downstream_service(): ...
Каждая task явно в правильном pool — это checked DAG code review.
Pool naming convention
В production critically важно naming convention. Рекомендуемая схема:
<resource_type>_<instance>_<class>
Примеры:
snowflake_l_read— Snowflake warehouse L size, read queriesrds_orders_writes— RDS instance for orders, writes onlyapi_stripe_charge— Stripe API, charge endpointspark_prod_etl— Spark cluster production, ETL workloadk8s_gpu_inference— K8s GPU pods, ML inference
Это даёт:
- Searchable в UI (
/poolsпоказывает все sortable) - Self-documenting в DAG code
- Легко regex-фильтрация в monitoring
Monitoring pool usage
Critical metric — pool saturation ratio:
-- Pool utilization summary
SELECT
sp.pool,
sp.slots,
COALESCE(SUM(CASE WHEN ti.state IN ('running', 'queued') THEN ti.pool_slots ELSE 0 END), 0) AS used,
sp.slots - COALESCE(SUM(CASE WHEN ti.state IN ('running', 'queued') THEN ti.pool_slots ELSE 0 END), 0) AS open,
ROUND(100.0 * COALESCE(SUM(CASE WHEN ti.state IN ('running', 'queued') THEN ti.pool_slots ELSE 0 END), 0) / sp.slots, 1) AS utilization_pct
FROM slot_pool sp
LEFT JOIN task_instance ti ON ti.pool = sp.pool
GROUP BY sp.pool, sp.slots
ORDER BY utilization_pct DESC;
В Prometheus / OTel метрики:
airflow.pool.used_slots.<pool_name>airflow.pool.queued_slots.<pool_name>airflow.pool.open_slots.<pool_name>
Alerts:
- Utilization >90% sustained >10 min → consider raising slots or splitting pool
queued_slots > slots*2long time → tasks накапливаются быстрее чем consumed
Production gotchas
1. Pool не масштабируется с auto-scale warehouse Snowflake. Если warehouse multi-cluster (auto-scale 1-4 clusters), pool остаётся тем же. Имеет смысл pool на single cluster и не доверять auto-scale — иначе теряется cost predictability.
2. pool_slots на task hard-coded — не пересчитывается dynamic. Если задача в реальности занимает 1 credit, а потом разрослась до 4 — pool counts всё равно как 1. Регулярно ревьюйте pool_slots по реальной нагрузке.
3. Pool — не cgroup, не quota. Pool ограничивает только Airflow scheduling. Если та же warehouse используется из dbt Cloud, BI tool, ad-hoc users — они не видят pool. Pool лишь часть cost strategy, не серебряная пуля.
4. Cross-DAG pool sharing с разными priority_weight. Если DAG A и DAG B оба в pool warehouse — priority_weight внутри pool важна. См. предыдущий урок.
5. Pool default_pool для no-pool tasks — это implicit pool. Никогда не оставляйте production tasks без явного pool. Лучше pool default_pool явно прописывать тоже — defensive coding.