Adaptive Query Execution: обзор и конфигурация
Проблема: Catalyst не знает будущего
В модуле 02 мы разобрали, как Catalyst Optimizer строит план выполнения: парсинг, анализ, оптимизация, физический план. Но у Catalyst есть фундаментальное ограничение — он оптимизирует запрос до выполнения, опираясь на статистики, которые могут быть устаревшими или отсутствовать вовсе.
Представьте ситуацию: Catalyst выбрал SortMergeJoin для двух таблиц, основываясь на их размерах из метаданных каталога. Но после применения фильтра WHERE status = 'active' одна из таблиц уменьшилась с 10 ГБ до 8 МБ. BroadcastHashJoin был бы в 10 раз быстрее — но Catalyst уже принял решение, и менять его на лету не умеет.
Catalyst (compile-time):
Metadata: table_a = 10GB, table_b = 500MB
Decision: SortMergeJoin (обе таблицы большие)
Runtime reality:
After filter: table_a = 8MB (99.9% отфильтровано)
Optimal: BroadcastHashJoin (table_a помещается в память)
Adaptive Query Execution (AQE) решает именно эту проблему. AQE работает поверх Catalyst — после физического плана, на основе runtime статистик. Это не замена Catalyst, а его дополнение: Catalyst выполняет compile-time оптимизацию, AQE корректирует план во время выполнения.
VersionBadge: Spark 3.2+
AQE доступен с Spark 3.0, но был opt-in (выключен по умолчанию). Начиная с Spark 3.2, spark.sql.adaptive.enabled установлен в true по умолчанию. Если вы работаете с Spark 3.0-3.1, включите AQE вручную: spark.conf.set("spark.sql.adaptive.enabled", "true").
Три столпа AQE
AQE выполняет три типа runtime-оптимизаций:
| Столп | Что делает | Когда помогает |
|---|---|---|
| Coalesce Shuffle Partitions | Объединяет мелкие партиции после shuffle | 200 партиций по умолчанию, большинство пустые после фильтрации |
| Skew Join Optimization | Разбивает перекошенные партиции при join | Один ключ (например, Москва) содержит 80% данных |
| Dynamic Join Strategy Selection | Меняет стратегию join на основе реальных размеров | После фильтра таблица стала маленькой — переключение на broadcast |
Дополнительно AQE взаимодействует с Dynamic Partition Pruning (DPP), который мы разберем в уроке 04.
Как работает AQE: цикл re-optimization
AQE разбивает физический план на query stages — фрагменты плана, разделённые shuffle-границами. После выполнения каждого stage AQE:
- Собирает runtime статистики — реальные размеры партиций, количество строк, объёмы данных
- Анализирует собранные данные — обнаруживает skew, пустые партиции, маленькие таблицы
- Адаптирует план — меняет стратегию join, объединяет партиции, разбивает skewed партиции
- Выполняет следующий stage — уже с оптимизированным планом
Query Stage 1: Scan + Filter → Execute → Collect Stats
↓
AQE: "After filter, table_a = 8MB"
↓
Re-plan: SortMerge → Broadcast
↓
Query Stage 2: Join (BroadcastHashJoin) → Execute → Collect Stats
↓
AQE: "150 of 200 partitions are empty"
↓
Re-plan: Coalesce 200 → 12
↓
Query Stage 3: Aggregation (12 partitions) → Result
Step 1/5: Initial Physical Plan
Catalyst создал физический план до начала выполнения. SortMergeJoin выбран на основе метаданных каталога (обе таблицы > 10 МБ). 200 shuffle-партиций по умолчанию.
Конфигурационные параметры AQE
Все параметры AQE начинаются с spark.sql.adaptive.:
| Параметр | Значение по умолчанию | Назначение |
|---|---|---|
spark.sql.adaptive.enabled | true (с 3.2) | Главный переключатель AQE |
spark.sql.adaptive.coalescePartitions.enabled | true | Автоматическое объединение мелких партиций |
spark.sql.adaptive.advisoryPartitionSizeInBytes | 64MB | Целевой размер партиции после coalesce |
spark.sql.adaptive.coalescePartitions.minPartitionSize | 1MB | Минимальный размер партиции (защита от слишком мелких) |
spark.sql.adaptive.skewJoin.enabled | true | Автоматическое обнаружение и обработка skew |
spark.sql.adaptive.skewJoin.skewedPartitionFactor | 5.0 | Партиция считается skewed если > median * factor |
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes | 256MB | Минимальный размер для признания партиции skewed |
spark.sql.adaptive.forceOptimizeSkewedJoin | false | Принудительная оптимизация skew (игнорирует threshold) |
# Проверка текущих настроек AQE
spark.conf.get("spark.sql.adaptive.enabled") # true
spark.conf.get("spark.sql.adaptive.advisoryPartitionSizeInBytes") # 64MB
spark.conf.get("spark.sql.adaptive.skewJoin.skewedPartitionFactor") # 5.0
spark.conf.get("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes") # 256MB
# Кастомизация для вашего кластера
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128MB") # Для больших данных
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionSize", "4MB")
AQE vs Catalyst: кто за что отвечает
SQL / DataFrame API
↓
[Catalyst -- compile-time]
1. Parser → Unresolved Plan
2. Analyzer → Resolved Plan
3. Optimizer → Optimized Plan (RBO + CBO)
4. Planner → Physical Plan
↓
[AQE -- runtime]
5. Execute Stage → Collect Stats
6. Re-optimize → Adapt Plan
7. Execute Next Stage → ...
↓
Final Result
Catalyst выполняет структурные оптимизации: pushdown предикатов, column pruning, constant folding. AQE выполняет адаптивные оптимизации: те, которые зависят от реальных объёмов данных, невидимых до начала выполнения.
Когда AQE не поможет?
AQE оптимизирует план между query stages (shuffle-границами). Если ваш запрос не содержит shuffle (например, простой SELECT * FROM table WHERE id = 1), AQE не активируется — нет точек для re-optimization. Также AQE не может исправить фундаментальные проблемы: неправильную схему данных, отсутствие индексов или плохой data model.
Что дальше?
В следующих уроках мы детально разберём каждый столп AQE: автоматическое объединение shuffle-партиций (урок 02), оптимизацию skew join (урок 03), Dynamic Partition Pruning (урок 04) и runtime-адаптацию плана (урок 05). Каждый урок покажет конкретные before/after метрики с AQE.