Runtime фильтры и адаптация плана
Динамическая смена стратегии join
Одна из самых впечатляющих возможностей AQE — автоматическая смена стратегии join на основе реальных размеров данных. Catalyst выбирает стратегию в compile-time, опираясь на метаданные каталога. Но после выполнения первого stage AQE знает точный размер данных.
# Сценарий: Catalyst выбрал SortMergeJoin
df_orders = spark.read.parquet("/data/orders/") # metadata: 50 ГБ
df_returns = spark.read.parquet("/data/returns/") # metadata: 2 ГБ
# Фильтр оставляет 0.1% данных
result = (
df_orders
.filter(col("date") == "2024-01-15") # 50 ГБ → 50 МБ!
.join(df_returns, "order_id")
)
Compile-time (Catalyst): Видит 50 ГБ + 2 ГБ → SortMergeJoin (обе таблицы больше broadcast threshold 10 МБ).
Runtime (AQE): После выполнения filter stage видит 50 МБ + 2 ГБ → меняет SortMergeJoin на BroadcastHashJoin (50 МБ < broadcast threshold).
Compile-time план:
SortMergeJoin [order_id]
:- Exchange hashpartitioning(order_id, 200) # shuffle 50 ГБ
: +- Filter (date = '2024-01-15')
+- Exchange hashpartitioning(order_id, 200) # shuffle 2 ГБ
Runtime-адаптированный план (AQE):
BroadcastHashJoin [order_id] # no shuffle для orders!
:- Filter (date = '2024-01-15') # 50 МБ -- broadcast
+- FileScan parquet [returns] # local scan, no shuffle
Экономия: вместо двух shuffle (50 ГБ + 2 ГБ = 52 ГБ по сети) — один broadcast (50 МБ по сети). Ускорение в 10-100 раз для этого stage.
Local Shuffle Reader
AQE включает оптимизацию local shuffle reader — когда после смены стратегии на broadcast данные уже находятся на нужных executor’ах, Spark читает shuffle-файлы локально вместо сетевого чтения:
Без local shuffle reader:
Executor 1 → Network → Executor 3 (read shuffle output)
С local shuffle reader:
Executor 1 → Local Disk → Executor 1 (read own shuffle output)
Параметр spark.sql.adaptive.localShuffleReader.enabled = true (по умолчанию) контролирует эту оптимизацию.
Как определить, что AQE сменил стратегию?
В Spark UI на вкладке SQL найдите ваш запрос. Если AQE адаптировал план, вы увидите AdaptiveSparkPlan isFinalPlan=true в корне плана. Внутри будут узлы BroadcastHashJoin там, где compile-time план показывал SortMergeJoin. Также в логах драйвера ищите сообщение "Adaptive plan updated".
Runtime Filter Pushdown
Помимо смены стратегии join, AQE выполняет runtime filter pushdown — продвижение фильтров, которые стали возможны только после сбора runtime статистик:
- Bloom filter pushdown: AQE может построить bloom filter из маленькой стороны join и передать его в scan большой стороны для раннего отсечения строк
- Min/max pushdown: Диапазон значений join-ключа на маленькой стороне используется для pruning файлов на большой стороне
- InSet pushdown: Если маленькая сторона содержит малое количество distinct values, AQE создаёт IN-list фильтр
# Пример: runtime filter от маленькой стороны
df_active_products = (
spark.read.parquet("/data/products/")
.filter(col("status") == "active") # 100 из 10K продуктов
)
df_sales = spark.read.parquet("/data/sales/") # 500M строк
result = df_sales.join(df_active_products, "product_id")
# AQE может передать bloom filter с 100 product_id в scan sales
# → пропустить файлы, не содержащие эти product_id
Сводная таблица всех AQE-оптимизаций
| Оптимизация | Что делает | Конфигурация | По умолчанию |
|---|---|---|---|
| Coalesce Partitions | Объединяет мелкие shuffle-партиции | coalescePartitions.enabled | true |
| Skew Join | Разбивает skewed-партиции при join | skewJoin.enabled | true |
| Join Strategy Change | Меняет SortMerge → Broadcast runtime | Автоматически с AQE | — |
| Local Shuffle Reader | Локальное чтение shuffle-файлов | localShuffleReader.enabled | true |
| DPP (Catalyst) | Runtime filter из dimension в fact scan | dynamicPartitionPruning.enabled | true |
| Advisory Partition Size | Целевой размер партиции | advisoryPartitionSizeInBytes | 64MB |
| Skew Detection | Factor * median AND absolute threshold | skewJoin.skewedPartitionFactor | 5.0 |
| Skew Threshold | Минимальный размер для skew | skewJoin.skewedPartitionThresholdInBytes | 256MB |
Когда AQE недостаточен
AQE — мощный инструмент, но он не решает все проблемы производительности:
1. Data layout problems: AQE не может исправить плохое партиционирование данных на диске. Если fact-таблица не партиционирована по join-ключу, AQE не добавит DPP. Решение: правильное партиционирование при записи.
2. Missing predicate pushdown: AQE работает между stages. Если вся работа в одном stage (нет shuffle), AQE не активируется. Для таких случаев полагайтесь на Catalyst predicate pushdown.
3. Кардинальность данных: AQE не может изменить количество данных — только способ их обработки. Если ваш join производит cartesian product (миллиарды строк), AQE не уменьшит результат.
4. UDF-блокировка: AQE не может “заглянуть” внутрь UDF. Если UDF скрывает фильтр, AQE не может использовать этот фильтр для оптимизации. Решение: используйте built-in функции вместо UDF (подробнее в модуле 06).
5. Memory-intensive операции:
AQE не управляет memory pressure. Если groupBy с collect_list собирает миллионы элементов в один список — это OOM, и AQE не поможет.
# AQE не спасёт от этого:
result = df.groupBy("user_id").agg(
collect_list("event").alias("all_events") # OOM если > 1M событий на пользователя
)
# Решение: ограничьте агрегацию или используйте другой подход
Анти-паттерн: отключение AQE “для тестирования”
# ОПАСНО: отключили AQE для benchmark, забыли включить обратно
spark.conf.set("spark.sql.adaptive.enabled", "false")
# ... прошло 3 месяца, production pipeline работает в 5 раз медленнее ...
Правило: если нужно протестировать план без AQE, делайте это в отдельной сессии и явно документируйте:
# Безопасный подход: отдельная сессия для сравнения
spark_no_aqe = SparkSession.builder \
.config("spark.sql.adaptive.enabled", "false") \
.appName("benchmark-no-aqe") \
.getOrCreate()
# Тест без AQE
result_no_aqe = spark_no_aqe.sql("SELECT ...")
result_no_aqe.explain()
# Production всегда с AQE
spark.conf.get("spark.sql.adaptive.enabled") # true
Итоги модуля
В этом модуле мы разобрали Adaptive Query Execution — runtime-оптимизатор, который дополняет Catalyst:
- Coalesce partitions — объединение мелких shuffle-партиций (200 → оптимальное число)
- Skew join — разбиение перекошенных партиций (1 straggler task → 500 параллельных)
- Dynamic Partition Pruning — runtime-фильтр из dimension в fact scan (75-99% reduction)
- Join strategy change — SortMerge → Broadcast при уменьшении данных после filter
- Local shuffle reader — локальное чтение shuffle-файлов
AQE включён по умолчанию с Spark 3.2. В большинстве случаев оставьте настройки по умолчанию — они хорошо работают для типичных нагрузок. Настраивайте advisoryPartitionSizeInBytes и skewedPartitionFactor только при специфических паттернах данных.