Learning Platform
Глоссарий Troubleshooting
Урок 09.05 · 12 мин
Продвинутый
Runtime FilterJoin Strategy ChangePlan AdaptationStatistics Collection

Runtime фильтры и адаптация плана

Динамическая смена стратегии join

Одна из самых впечатляющих возможностей AQE — автоматическая смена стратегии join на основе реальных размеров данных. Catalyst выбирает стратегию в compile-time, опираясь на метаданные каталога. Но после выполнения первого stage AQE знает точный размер данных.

# Сценарий: Catalyst выбрал SortMergeJoin
df_orders = spark.read.parquet("/data/orders/")       # metadata: 50 ГБ
df_returns = spark.read.parquet("/data/returns/")     # metadata: 2 ГБ

# Фильтр оставляет 0.1% данных
result = (
    df_orders
    .filter(col("date") == "2024-01-15")  # 50 ГБ → 50 МБ!
    .join(df_returns, "order_id")
)

Compile-time (Catalyst): Видит 50 ГБ + 2 ГБ → SortMergeJoin (обе таблицы больше broadcast threshold 10 МБ).

Runtime (AQE): После выполнения filter stage видит 50 МБ + 2 ГБ → меняет SortMergeJoin на BroadcastHashJoin (50 МБ < broadcast threshold).

Compile-time план:
  SortMergeJoin [order_id]
  :- Exchange hashpartitioning(order_id, 200)  # shuffle 50 ГБ
  :  +- Filter (date = '2024-01-15')
  +- Exchange hashpartitioning(order_id, 200)  # shuffle 2 ГБ

Runtime-адаптированный план (AQE):
  BroadcastHashJoin [order_id]                 # no shuffle для orders!
  :- Filter (date = '2024-01-15')              # 50 МБ -- broadcast
  +- FileScan parquet [returns]                # local scan, no shuffle

Экономия: вместо двух shuffle (50 ГБ + 2 ГБ = 52 ГБ по сети) — один broadcast (50 МБ по сети). Ускорение в 10-100 раз для этого stage.

Local Shuffle Reader

AQE включает оптимизацию local shuffle reader — когда после смены стратегии на broadcast данные уже находятся на нужных executor’ах, Spark читает shuffle-файлы локально вместо сетевого чтения:

Без local shuffle reader:
  Executor 1 → Network → Executor 3 (read shuffle output)

С local shuffle reader:
  Executor 1 → Local Disk → Executor 1 (read own shuffle output)

Параметр spark.sql.adaptive.localShuffleReader.enabled = true (по умолчанию) контролирует эту оптимизацию.

TIP

Как определить, что AQE сменил стратегию?

В Spark UI на вкладке SQL найдите ваш запрос. Если AQE адаптировал план, вы увидите AdaptiveSparkPlan isFinalPlan=true в корне плана. Внутри будут узлы BroadcastHashJoin там, где compile-time план показывал SortMergeJoin. Также в логах драйвера ищите сообщение "Adaptive plan updated".

Runtime Filter Pushdown

Помимо смены стратегии join, AQE выполняет runtime filter pushdown — продвижение фильтров, которые стали возможны только после сбора runtime статистик:

  1. Bloom filter pushdown: AQE может построить bloom filter из маленькой стороны join и передать его в scan большой стороны для раннего отсечения строк
  2. Min/max pushdown: Диапазон значений join-ключа на маленькой стороне используется для pruning файлов на большой стороне
  3. InSet pushdown: Если маленькая сторона содержит малое количество distinct values, AQE создаёт IN-list фильтр
# Пример: runtime filter от маленькой стороны
df_active_products = (
    spark.read.parquet("/data/products/")
    .filter(col("status") == "active")           # 100 из 10K продуктов
)

df_sales = spark.read.parquet("/data/sales/")    # 500M строк

result = df_sales.join(df_active_products, "product_id")
# AQE может передать bloom filter с 100 product_id в scan sales
# → пропустить файлы, не содержащие эти product_id

Сводная таблица всех AQE-оптимизаций

ОптимизацияЧто делаетКонфигурацияПо умолчанию
Coalesce PartitionsОбъединяет мелкие shuffle-партицииcoalescePartitions.enabledtrue
Skew JoinРазбивает skewed-партиции при joinskewJoin.enabledtrue
Join Strategy ChangeМеняет SortMerge → Broadcast runtimeАвтоматически с AQE
Local Shuffle ReaderЛокальное чтение shuffle-файловlocalShuffleReader.enabledtrue
DPP (Catalyst)Runtime filter из dimension в fact scandynamicPartitionPruning.enabledtrue
Advisory Partition SizeЦелевой размер партицииadvisoryPartitionSizeInBytes64MB
Skew DetectionFactor * median AND absolute thresholdskewJoin.skewedPartitionFactor5.0
Skew ThresholdМинимальный размер для skewskewJoin.skewedPartitionThresholdInBytes256MB

Когда AQE недостаточен

AQE — мощный инструмент, но он не решает все проблемы производительности:

1. Data layout problems: AQE не может исправить плохое партиционирование данных на диске. Если fact-таблица не партиционирована по join-ключу, AQE не добавит DPP. Решение: правильное партиционирование при записи.

2. Missing predicate pushdown: AQE работает между stages. Если вся работа в одном stage (нет shuffle), AQE не активируется. Для таких случаев полагайтесь на Catalyst predicate pushdown.

3. Кардинальность данных: AQE не может изменить количество данных — только способ их обработки. Если ваш join производит cartesian product (миллиарды строк), AQE не уменьшит результат.

4. UDF-блокировка: AQE не может “заглянуть” внутрь UDF. Если UDF скрывает фильтр, AQE не может использовать этот фильтр для оптимизации. Решение: используйте built-in функции вместо UDF (подробнее в модуле 06).

5. Memory-intensive операции: AQE не управляет memory pressure. Если groupBy с collect_list собирает миллионы элементов в один список — это OOM, и AQE не поможет.

# AQE не спасёт от этого:
result = df.groupBy("user_id").agg(
    collect_list("event").alias("all_events")  # OOM если > 1M событий на пользователя
)
# Решение: ограничьте агрегацию или используйте другой подход

Анти-паттерн: отключение AQE “для тестирования”

# ОПАСНО: отключили AQE для benchmark, забыли включить обратно
spark.conf.set("spark.sql.adaptive.enabled", "false")

# ... прошло 3 месяца, production pipeline работает в 5 раз медленнее ...

Правило: если нужно протестировать план без AQE, делайте это в отдельной сессии и явно документируйте:

# Безопасный подход: отдельная сессия для сравнения
spark_no_aqe = SparkSession.builder \
    .config("spark.sql.adaptive.enabled", "false") \
    .appName("benchmark-no-aqe") \
    .getOrCreate()

# Тест без AQE
result_no_aqe = spark_no_aqe.sql("SELECT ...")
result_no_aqe.explain()

# Production всегда с AQE
spark.conf.get("spark.sql.adaptive.enabled")  # true
Проверка знанийKnowledge check
Почему AQE не может оптимизировать запрос без shuffle (single-stage query)?
ОтветAnswer
AQE выполняет re-optimization между query stages, которые разделены shuffle-границами (Exchange операторами). Именно при shuffle данные материализуются, и Spark может собрать точные runtime статистики (размеры партиций, количество строк). Без shuffle нет точки материализации -- данные передаются потоково через pipeline, и AQE не может вставить re-optimization step. Для single-stage запросов оптимизация полностью зависит от Catalyst compile-time решений.
Проверка знанийKnowledge check
В каких случаях динамическая смена join стратегии (SortMerge → Broadcast) может ухудшить производительность?
ОтветAnswer
Если AQE неправильно оценил размер после filter (например, filter на dynamic column с нестабильной selectivity), и таблица на самом деле не помещается в broadcast threshold — broadcast может вызвать OOM на executor'ах. Также если маленькая таблица содержит heavy columns (BLOB, large arrays), её broadcast-копия на каждом executor может исчерпать память. На практике это редкость, так как AQE использует реальные размеры, не оценки — но edge cases возможны.

Итоги модуля

В этом модуле мы разобрали Adaptive Query Execution — runtime-оптимизатор, который дополняет Catalyst:

  • Coalesce partitions — объединение мелких shuffle-партиций (200 → оптимальное число)
  • Skew join — разбиение перекошенных партиций (1 straggler task → 500 параллельных)
  • Dynamic Partition Pruning — runtime-фильтр из dimension в fact scan (75-99% reduction)
  • Join strategy change — SortMerge → Broadcast при уменьшении данных после filter
  • Local shuffle reader — локальное чтение shuffle-файлов

AQE включён по умолчанию с Spark 3.2. В большинстве случаев оставьте настройки по умолчанию — они хорошо работают для типичных нагрузок. Настраивайте advisoryPartitionSizeInBytes и skewedPartitionFactor только при специфических паттернах данных.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 6. Catalyst выбрал SortMergeJoin для двух таблиц (метаданные: 50 ГБ + 2 ГБ). После фильтра первая таблица уменьшилась до 50 МБ. Что сделает AQE?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5