PostgreSQL tuning для Airflow — connection pools, autovacuum, partitioning
PostgreSQL — единая точка масштабирования Airflow. После 200-300 DAGs и 10-20k TI/day плохо tuned RDS становится bottleneck быстрее, чем scheduler или workers.
OLTP: транзакции и точечные операции Этот урок — конкретный production-checklist: какие параметры менять, как считать max_connections, как настроить autovacuum для high-churn таблиц вроде task_instance и log, как использовать pg_partman для partitioning по дате и зачем нужен weekly VACUUM FULL.
Все параметры указаны для PostgreSQL 14-16 (рекомендуется 15+), хостинг — AWS RDS db.r6i.2xlarge (8 vCPU, 64 GB RAM) как baseline для medium-large deployment (500+ DAGs, 50-200k TI/day).
max_connections — формула и реальный размер
max_connections — самый недооценённый параметр. RDS default — 100, что мало для любого реального Airflow. Но и завышать опасно: каждое соединение занимает ~10 MB RAM в Postgres backend process.
Формула для Airflow 2.x:
max_connections ≥ (N_schedulers × 16)
+ (N_webservers × 4)
+ (N_workers × 2)
+ (N_triggerers × 2)
+ (N_dag_processors × 4)
+ 50 headroom (admin, monitoring, ad-hoc)
Для medium deployment (2 schedulers, 3 webservers, 16 workers, 2 triggerers, 1 dag-processor):
= (2 × 16) + (3 × 4) + (16 × 2) + (2 × 2) + (1 × 4) + 50
= 32 + 12 + 32 + 4 + 4 + 50
= 134 connections (round up to 150)
Production gotcha: эта формула рассчитана БЕЗ PgBouncer. С PgBouncer перед Postgres max_connections можно держать намного меньше — реально 60-80 для most deployments, потому что PgBouncer мультиплексирует короткие транзакции через малый пул real connections.
-- Текущий counter
SELECT max_conn, used, res_for_super,
max_conn - used - res_for_super AS available
FROM (SELECT count(*) AS used FROM pg_stat_activity) t1,
(SELECT setting::int AS max_conn FROM pg_settings WHERE name='max_connections') t2,
(SELECT setting::int AS res_for_super FROM pg_settings WHERE name='superuser_reserved_connections') t3;
PgBouncer — mandatory для production
Каждое scheduler tick — 5-10 SQL queries в коротких транзакциях. Без pool каждая транзакция = new connection = ~5ms TCP handshake + ~10ms PG backend startup = катастрофа. PgBouncer обязателен для любого production Airflow.
# pgbouncer.ini — production
[databases]
airflow = host=airflow-postgres.example.com port=5432 dbname=airflow
[pgbouncer]
listen_addr = 0.0.0.0
listen_port = 6432
auth_type = scram-sha-256
auth_file = /etc/pgbouncer/userlist.txt
# Pooling mode
pool_mode = transaction # ВАЖНО: transaction, не session
default_pool_size = 50 # real PG connections в pool
max_client_conn = 500 # сколько Airflow клиентов можем принять
reserve_pool_size = 10 # бэкап на pool exhaustion
reserve_pool_timeout = 5
# Connection lifetime
server_lifetime = 3600 # 1 час — refresh connection
server_idle_timeout = 600 # 10 min idle disconnect
server_reset_query = DISCARD ALL
# Logging
log_connections = 0 # отключаем — иначе log spam
log_disconnections = 0
log_pooler_errors = 1
stats_period = 60
# TLS
client_tls_sslmode = require
server_tls_sslmode = verify-full
Pool modes — что выбрать:
| Mode | Описание | Подходит для Airflow? |
|---|---|---|
session | Connection держится весь session client-а | Нет — низкая эффективность, large pool |
transaction | Connection возвращается после COMMIT/ROLLBACK | Да — production стандарт |
statement | Connection возвращается после каждого statement | Нет — ломает multi-statement транзакции |
Transaction mode caveats: ломает SET LOCAL, prepared statements (с PG 14+ можно через track_prepared_statements), LISTEN/NOTIFY (Airflow их не использует). SQLAlchemy в Airflow 2.7+ работает корректно — issue в старых версиях.
Мониторинг PgBouncer:
-- Подключиться к pgbouncer admin DB
PSQL_PAGER='' psql -h pgbouncer -p 6432 -U pgbouncer pgbouncer
SHOW POOLS; -- размер pool, активные клиенты
SHOW STATS; -- queries/sec, avg query time
SHOW SERVERS; -- real PG connections
SHOW CLIENTS; -- активные клиенты
Ключевая метрика — cl_waiting в SHOW POOLS. Если > 0 длительно — pool exhausted, увеличить default_pool_size.
shared_buffers и memory tuning
| Параметр | Default | Recommended (64 GB RAM) | Зачем |
|---|---|---|---|
shared_buffers | 128 MB | 16 GB (25% RAM) | Кэш страниц таблиц/индексов |
effective_cache_size | 4 GB | 48 GB (75% RAM) | Hint планировщику о OS cache |
work_mem | 4 MB | 32 MB | Память для sort/hash per query |
maintenance_work_mem | 64 MB | 2 GB | Для VACUUM, CREATE INDEX |
wal_buffers | -1 (auto) | 64 MB | WAL buffer |
random_page_cost | 4.0 | 1.1 | Для gp3/SSD storage |
effective_io_concurrency | 1 | 200 | Для gp3 SSD |
-- Применить через ALTER SYSTEM (RDS — через parameter group)
ALTER SYSTEM SET shared_buffers = '16GB';
ALTER SYSTEM SET effective_cache_size = '48GB';
ALTER SYSTEM SET work_mem = '32MB';
ALTER SYSTEM SET maintenance_work_mem = '2GB';
ALTER SYSTEM SET random_page_cost = 1.1;
ALTER SYSTEM SET effective_io_concurrency = 200;
SELECT pg_reload_conf(); -- shared_buffers требует restart
В AWS RDS вы НЕ можете менять параметры через ALTER SYSTEM — нужен Parameter Group. Создайте custom parameter group, измените значения, attach к instance, restart. Параметры с pending-reboot требуют instance reboot — планируйте maintenance window.
Autovacuum для high-churn таблиц
Airflow таблицы делятся на high-churn (task_instance, log, xcom, dag_run) и low-churn (dag, connection, variable). Default autovacuum thresholds — 20% от размера таблицы — для high-churn это миллионы dead tuples перед vacuum. Это катастрофа: queries замедляются, индексы раздуваются.
Решение — per-table autovacuum tuning:
-- task_instance — самая высокочастотная таблица
ALTER TABLE task_instance SET (
autovacuum_vacuum_scale_factor = 0.05, -- vacuum при 5% (не 20%)
autovacuum_analyze_scale_factor = 0.02, -- analyze при 2%
autovacuum_vacuum_cost_limit = 2000, -- агрессивнее (default 200)
autovacuum_vacuum_cost_delay = 10, -- меньше пауз
autovacuum_naptime = 20, -- чаще проверять
fillfactor = 90 -- место для HOT updates
);
-- log — самая большая по объёму
ALTER TABLE log SET (
autovacuum_vacuum_scale_factor = 0.1,
autovacuum_analyze_scale_factor = 0.05,
fillfactor = 100 -- insert-only, fillfactor 100
);
-- xcom — высокочастотная, особенно с custom backend headers
ALTER TABLE xcom SET (
autovacuum_vacuum_scale_factor = 0.05,
autovacuum_analyze_scale_factor = 0.02
);
-- dag_run — moderate
ALTER TABLE dag_run SET (
autovacuum_vacuum_scale_factor = 0.1,
autovacuum_analyze_scale_factor = 0.05
);
-- job — heartbeat updates каждые 5s
ALTER TABLE job SET (
autovacuum_vacuum_scale_factor = 0.05,
fillfactor = 80
);
-- slot_pool — критичная для critical section
ALTER TABLE slot_pool SET (
autovacuum_vacuum_scale_factor = 0.05,
fillfactor = 80
);
Monitoring autovacuum:
-- Когда последний раз vacuum/analyze
SELECT schemaname, relname,
n_live_tup, n_dead_tup,
round(100 * n_dead_tup::numeric / NULLIF(n_live_tup, 0), 2) AS dead_pct,
last_autovacuum, last_autoanalyze,
autovacuum_count, autoanalyze_count
FROM pg_stat_user_tables
WHERE relname IN ('task_instance', 'log', 'xcom', 'dag_run', 'job', 'slot_pool')
ORDER BY n_dead_tup DESC;
Если dead_pct > 20% стабильно — autovacuum не успевает, нужно ещё агрессивнее (scale_factor = 0.02-0.03).
Partitioning через pg_partman
Таблицы log, xcom, task_instance растут линейно с временем. Через 6-12 месяцев они достигают 50-200 GB. Чистка через DELETE медленная (row-by-row) и не возвращает место в OS. Партиционирование по дате — единственный production-grade подход.
pg_partman — extension для автоматического partition management.
-- Установка pg_partman (RDS поддерживает)
CREATE EXTENSION pg_partman;
-- Создать parent table с partitioning
-- ВНИМАНИЕ: requires migrating existing data (см. ниже)
-- Пример для log table
CREATE TABLE log_partitioned (
LIKE log INCLUDING ALL
) PARTITION BY RANGE (dttm);
-- Создать первую partition
CREATE TABLE log_y2026m05 PARTITION OF log_partitioned
FOR VALUES FROM ('2026-05-01') TO ('2026-06-01');
-- Настроить pg_partman для daily partitions с retention
SELECT partman.create_parent(
p_parent_table => 'public.log_partitioned',
p_control => 'dttm',
p_type => 'native',
p_interval => 'daily',
p_premake => 7 -- создавать partitions на 7 дней вперёд
);
-- Retention — удалять partitions старше 30 дней
UPDATE partman.part_config
SET retention = '30 days',
retention_keep_table = false -- DROP TABLE, не просто DETACH
WHERE parent_table = 'public.log_partitioned';
-- Cron job для maintenance (запускается из pg_cron или CronJob в K8s)
SELECT partman.run_maintenance();
Что даёт partitioning:
- Чистка старых данных через
DROP TABLE log_y2025m10(мгновенно), вместоDELETE FROM log WHERE dttm < ...(часы) - Queries с фильтром по
dttmсканируют только relevant partitions (partition pruning) - VACUUM работает per-partition — параллельно, быстрее
- Indexes тоже per-partition — меньше bloat
Migration существующей log table в partitioned:
-- 1. Создать partitioned table
CREATE TABLE log_new (LIKE log INCLUDING ALL) PARTITION BY RANGE (dttm);
SELECT partman.create_parent('public.log_new', 'dttm', 'native', 'daily');
-- 2. Скопировать данные batch-ами (через pg_partman или вручную)
INSERT INTO log_new SELECT * FROM log WHERE dttm BETWEEN '2026-05-01' AND '2026-05-02';
-- повторить для каждого дня
-- 3. Atomic swap (требует exclusive lock — maintenance window)
BEGIN;
ALTER TABLE log RENAME TO log_old;
ALTER TABLE log_new RENAME TO log;
COMMIT;
-- 4. После проверки — DROP TABLE log_old;
VACUUM FULL weekly — рекомендуется
Обычный VACUUM возвращает место внутри таблицы (reuse dead tuples), но не возвращает в OS. После долгого uptime таблицы накапливают bloat — даже с агрессивным autovacuum.
VACUUM FULL переписывает таблицу с нуля, возвращает место в OS, но требует exclusive lock (DAGs встанут на время).
Production стратегия: weekly maintenance window (например, воскресенье 03:00 UTC):
#!/bin/bash
# weekly_vacuum.sh — запускается из cron на bastion host
# 1. Pause all DAGs (чтобы не накапливался task_instance)
airflow dags list -o json | jq -r '.[].dag_id' | xargs -I{} airflow dags pause {}
# 2. Wait for in-flight tasks
sleep 60
# 3. VACUUM FULL по очереди (не параллельно — пострадает IOPS)
for tbl in task_instance log xcom dag_run job task_fail task_reschedule; do
psql -c "VACUUM FULL VERBOSE $tbl;"
done
# 4. Re-analyze
psql -c "ANALYZE;"
# 5. Unpause DAGs
airflow dags list -o json | jq -r '.[].dag_id' | xargs -I{} airflow dags unpause {}
Alternative: pg_repack — VACUUM FULL без exclusive lock (rebuild через staging table + atomic swap). Тяжелее в installation, но без downtime. Production-стандарт для больших Airflow installations.
Production gotchas
Не используйте airflow db clean как замену partitioning. db clean делает DELETE FROM — медленно, не возвращает space в OS, фрагментирует индексы. Подходит для small installations (<10 GB DB). Для больших — pg_partman.
Backup metadata DB через pg_dump ломается на больших installations. При 100+ GB DB pg_dump может занимать часы. Используйте RDS snapshots (instant, point-in-time recovery) или физический pg_basebackup + WAL archiving.
Connection pooler не помогает long-running queries. Если DAG читает через хук PostgresHook 100 MB result — это держит connection 30+ секунд, занимает slot в PgBouncer. Решение: stream результаты, или вынести heavy queries в отдельный read replica с отдельным PgBouncer pool.
statement_timeout обязателен. Airflow background queries иногда зависают (например, lock на slot_pool deadlock). Поставьте statement_timeout = 60s на role airflow — Postgres сам убьёт зависшие queries.
ALTER ROLE airflow SET statement_timeout = '60s';
Composite index idx_ti_dag_run_task_map_index — обязателен (добавлен в 2.7). Если у вас upgrade с более старой версии — проверьте:
SELECT indexname FROM pg_indexes
WHERE tablename = 'task_instance'
AND indexname = 'idx_ti_dag_run_task_map_index';
-- Если пусто:
CREATE INDEX CONCURRENTLY idx_ti_dag_run_task_map_index
ON task_instance(dag_id, run_id, task_id, map_index);
SSD/gp3 — обязательно для production. gp2 ограничен 16k IOPS на инстанс — Airflow scheduler легко упирается. gp3 даёт 12k IOPS baseline + provisioned до 64k. io2 для extreme throughput.
Метрики, которые ловят проблемы заранее
-- 1. Index bloat
SELECT schemaname, relname, indexrelname,
pg_size_pretty(pg_relation_size(indexrelid)) AS index_size
FROM pg_stat_user_indexes
WHERE relname IN ('task_instance', 'log', 'xcom')
ORDER BY pg_relation_size(indexrelid) DESC;
-- 2. Top queries по total_time (нужен pg_stat_statements)
SELECT calls, total_exec_time/1000 AS total_sec,
mean_exec_time AS mean_ms,
LEFT(query, 100) AS query
FROM pg_stat_statements
ORDER BY total_exec_time DESC LIMIT 20;
-- 3. Cache hit ratio (должно быть >99%)
SELECT
sum(heap_blks_read) AS heap_read,
sum(heap_blks_hit) AS heap_hit,
round(sum(heap_blks_hit)::numeric / nullif(sum(heap_blks_hit + heap_blks_read), 0) * 100, 2) AS cache_hit_ratio
FROM pg_statio_user_tables;