Learning Platform
Глоссарий Troubleshooting
Урок 09.01 · 14 мин
Средний
AQEAdaptive Query Executionspark.sql.adaptiveRuntime Optimization

Adaptive Query Execution: обзор и конфигурация

Проблема: Catalyst не знает будущего

В модуле 02 мы разобрали, как Catalyst Optimizer строит план выполнения: парсинг, анализ, оптимизация, физический план. Но у Catalyst есть фундаментальное ограничение — он оптимизирует запрос до выполнения, опираясь на статистики, которые могут быть устаревшими или отсутствовать вовсе.

Представьте ситуацию: Catalyst выбрал SortMergeJoin для двух таблиц, основываясь на их размерах из метаданных каталога. Но после применения фильтра WHERE status = 'active' одна из таблиц уменьшилась с 10 ГБ до 8 МБ. BroadcastHashJoin был бы в 10 раз быстрее — но Catalyst уже принял решение, и менять его на лету не умеет.

Catalyst (compile-time):
  Metadata: table_a = 10GB, table_b = 500MB
  Decision: SortMergeJoin (обе таблицы большие)

Runtime reality:
  After filter: table_a = 8MB (99.9% отфильтровано)
  Optimal: BroadcastHashJoin (table_a помещается в память)

Adaptive Query Execution (AQE) решает именно эту проблему. AQE работает поверх Catalyst — после физического плана, на основе runtime статистик. Это не замена Catalyst, а его дополнение: Catalyst выполняет compile-time оптимизацию, AQE корректирует план во время выполнения.

NOTE

VersionBadge: Spark 3.2+

AQE доступен с Spark 3.0, но был opt-in (выключен по умолчанию). Начиная с Spark 3.2, spark.sql.adaptive.enabled установлен в true по умолчанию. Если вы работаете с Spark 3.0-3.1, включите AQE вручную: spark.conf.set("spark.sql.adaptive.enabled", "true").

Три столпа AQE

AQE выполняет три типа runtime-оптимизаций:

СтолпЧто делаетКогда помогает
Coalesce Shuffle PartitionsОбъединяет мелкие партиции после shuffle200 партиций по умолчанию, большинство пустые после фильтрации
Skew Join OptimizationРазбивает перекошенные партиции при joinОдин ключ (например, Москва) содержит 80% данных
Dynamic Join Strategy SelectionМеняет стратегию join на основе реальных размеровПосле фильтра таблица стала маленькой — переключение на broadcast

Дополнительно AQE взаимодействует с Dynamic Partition Pruning (DPP), который мы разберем в уроке 04.

Как работает AQE: цикл re-optimization

AQE разбивает физический план на query stages — фрагменты плана, разделённые shuffle-границами. После выполнения каждого stage AQE:

  1. Собирает runtime статистики — реальные размеры партиций, количество строк, объёмы данных
  2. Анализирует собранные данные — обнаруживает skew, пустые партиции, маленькие таблицы
  3. Адаптирует план — меняет стратегию join, объединяет партиции, разбивает skewed партиции
  4. Выполняет следующий stage — уже с оптимизированным планом
Query Stage 1: Scan + Filter → Execute → Collect Stats

                                    AQE: "After filter, table_a = 8MB"

                                    Re-plan: SortMerge → Broadcast

Query Stage 2: Join (BroadcastHashJoin) → Execute → Collect Stats

                                    AQE: "150 of 200 partitions are empty"

                                    Re-plan: Coalesce 200 → 12

Query Stage 3: Aggregation (12 partitions) → Result
AQE: Runtime Re-Optimization

Step 1/5: Initial Physical Plan

Catalyst создал физический план до начала выполнения. SortMergeJoin выбран на основе метаданных каталога (обе таблицы > 10 МБ). 200 shuffle-партиций по умолчанию.

SortMergeJoin [city]
Exchange hashpartitioning(city, 200)
Exchange hashpartitioning(city, 200)
FileScan parquet [orders]
FileScan parquet [city_info]
1 / 5
Original PlanStatisticsOptimized

Конфигурационные параметры AQE

Все параметры AQE начинаются с spark.sql.adaptive.:

ПараметрЗначение по умолчаниюНазначение
spark.sql.adaptive.enabledtrue (с 3.2)Главный переключатель AQE
spark.sql.adaptive.coalescePartitions.enabledtrueАвтоматическое объединение мелких партиций
spark.sql.adaptive.advisoryPartitionSizeInBytes64MBЦелевой размер партиции после coalesce
spark.sql.adaptive.coalescePartitions.minPartitionSize1MBМинимальный размер партиции (защита от слишком мелких)
spark.sql.adaptive.skewJoin.enabledtrueАвтоматическое обнаружение и обработка skew
spark.sql.adaptive.skewJoin.skewedPartitionFactor5.0Партиция считается skewed если > median * factor
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes256MBМинимальный размер для признания партиции skewed
spark.sql.adaptive.forceOptimizeSkewedJoinfalseПринудительная оптимизация skew (игнорирует threshold)
# Проверка текущих настроек AQE
spark.conf.get("spark.sql.adaptive.enabled")                            # true
spark.conf.get("spark.sql.adaptive.advisoryPartitionSizeInBytes")       # 64MB
spark.conf.get("spark.sql.adaptive.skewJoin.skewedPartitionFactor")     # 5.0
spark.conf.get("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes")  # 256MB

# Кастомизация для вашего кластера
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128MB")  # Для больших данных
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionSize", "4MB")

AQE vs Catalyst: кто за что отвечает

SQL / DataFrame API

  [Catalyst -- compile-time]
  1. Parser → Unresolved Plan
  2. Analyzer → Resolved Plan
  3. Optimizer → Optimized Plan (RBO + CBO)
  4. Planner → Physical Plan

  [AQE -- runtime]
  5. Execute Stage → Collect Stats
  6. Re-optimize → Adapt Plan
  7. Execute Next Stage → ...

    Final Result

Catalyst выполняет структурные оптимизации: pushdown предикатов, column pruning, constant folding. AQE выполняет адаптивные оптимизации: те, которые зависят от реальных объёмов данных, невидимых до начала выполнения.

TIP

Когда AQE не поможет?

AQE оптимизирует план между query stages (shuffle-границами). Если ваш запрос не содержит shuffle (например, простой SELECT * FROM table WHERE id = 1), AQE не активируется — нет точек для re-optimization. Также AQE не может исправить фундаментальные проблемы: неправильную схему данных, отсутствие индексов или плохой data model.

Проверка знанийKnowledge check
Почему AQE включается между query stages, а не после каждого оператора?
ОтветAnswer
Query stages разделены shuffle-границами -- точками, где данные материализуются и перераспределяются по кластеру. Именно в этот момент Spark знает реальные размеры партиций и объёмы данных. Между операторами внутри одного stage данные передаются потоково (pipeline), без материализации, поэтому собрать точные статистики невозможно.
Проверка знанийKnowledge check
В чём ключевое отличие AQE от Catalyst CBO?
ОтветAnswer
Catalyst CBO использует статистики из каталога (метаданные таблиц), которые могут быть устаревшими или неточными, и принимает решения ДО выполнения запроса. AQE использует реальные runtime статистики, собранные ПОСЛЕ выполнения каждого stage, и может менять план на лету. AQE не заменяет CBO, а дополняет его -- корректирует ошибочные решения CBO на основе фактических данных.

Что дальше?

В следующих уроках мы детально разберём каждый столп AQE: автоматическое объединение shuffle-партиций (урок 02), оптимизацию skew join (урок 03), Dynamic Partition Pruning (урок 04) и runtime-адаптацию плана (урок 05). Каждый урок покажет конкретные before/after метрики с AQE.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Какую ключевую проблему Catalyst решает AQE?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5