Learning Platform
Глоссарий Troubleshooting
Урок 09.04 · 12 мин
Продвинутый
DPPDynamic Partition PruningBroadcast ExchangePartition Filter

Dynamic Partition Pruning (DPP)

Проблема: сканирование всей fact-таблицы

В star schema (звёздная схема) типичный запрос соединяет большую fact-таблицу с маленькими dimension-таблицами:

# fact-таблица: 500M строк, партиционирована по date_key
df_sales = spark.read.parquet("/data/sales/")

# dimension-таблица: 365 строк
df_dates = spark.read.parquet("/data/dim_dates/")

# Запрос: продажи за Q1 2024
result = (
    df_sales
    .join(df_dates, "date_key")
    .filter(col("quarter") == "Q1-2024")
    .groupBy("product_id")
    .agg(sum("amount"))
)

Без DPP: Spark сначала сканирует все 500M строк из sales, затем выполняет join с dim_dates, и только потом фильтрует по quarter = 'Q1-2024'. Фильтр на quarter применяется к dimension-таблице, но Catalyst не может “пробросить” его через join к fact-таблице, потому что quarter — колонка dimension, а не fact.

Без DPP:
  Scan sales: 500M rows (ALL partitions)  →  Join  →  Filter quarter=Q1  →  Result
  Scan dim_dates: 365 rows               ↗

  Сканировано: 500M строк
  Реально нужно: ~125M строк (Q1 = 25% данных)

Что такое DPP?

Dynamic Partition Pruning решает эту проблему, внедряя runtime-фильтр из dimension-таблицы в scan fact-таблицы. DPP работает так:

  1. Spark сначала выполняет scan + filter dimension-таблицы (365 строк → 90 строк за Q1)
  2. Из результата извлекает список date_key значений (90 ключей)
  3. Внедряет этот список как фильтр в scan fact-таблицы
  4. Fact-таблица сканируется только по партициям, содержащим эти 90 date_key
С DPP:
  Scan dim_dates: 365 rows → Filter quarter=Q1 → 90 date_keys

                                              DPP Filter (runtime)

  Scan sales: ONLY 90 partitions (125M rows)  →  Join  →  Result

  Сканировано: 125M строк вместо 500M  →  75% reduction!

В explain() DPP отображается как DynamicPruningExpression:

== Physical Plan ==
*(3) HashAggregate(keys=[product_id], functions=[sum(amount)])
+- *(3) BroadcastHashJoin [date_key], [date_key]
   :- *(1) FileScan parquet [sales]
   :     PartitionFilters: [date_key IN (dynamicpruning#123)]   # <-- DPP!
   :     DynamicPruningExpression: date_key IN (subquery#456)
   +- BroadcastExchange
      +- *(2) Filter (quarter = Q1-2024)
         +- *(2) FileScan parquet [dim_dates]

Когда DPP работает

DPP активируется при выполнении всех условий:

  1. Fact-таблица партиционирована по join-ключу (или колонке, связанной с join-ключом)
  2. Join использует BroadcastExchange — dimension-таблица достаточно мала для broadcast
  3. Фильтр на dimension сужает количество ключей
  4. spark.sql.optimizer.dynamicPartitionPruning.enabled = true (по умолчанию)
# DPP работает: fact партиционирована по date_key, dimension маленькая
df_sales = spark.read.parquet("/data/sales/")  # partitioned by date_key
result = df_sales.join(df_dates, "date_key").filter(col("quarter") == "Q1")

# DPP НЕ работает: fact НЕ партиционирована
df_logs = spark.read.parquet("/data/logs_flat/")  # no partitioning
result = df_logs.join(df_dates, "date_key").filter(col("quarter") == "Q1")
# Spark всё равно сканирует все файлы -- DPP бесполезен без партиционирования
WARNING

DPP требует партиционирования! Если ваша fact-таблица не партиционирована по join-ключу, DPP не поможет. Это самая частая причина, по которой разработчики ожидают DPP, но не получают его. Проверьте: df.rdd.getNumPartitions() покажет число партиций, но для DPP важно именно file-level partitioning (как в Parquet/Hive partitioned tables).

Before/After: сокращение scan

Сценарий: star schema с fact-таблицей sales (500M строк, партиционирована по date_key, 365 партиций) и dimension-таблицей dim_dates (365 строк).

МетрикаБез DPPС DPP
Партиций просканировано36590 (Q1)
Строк прочитано500M125M
Bytes scanned200 ГБ50 ГБ
Scan time120 сек30 сек
Ускорение scan4x

Для более селективных фильтров (один месяц, один день) экономия ещё больше:

ФильтрПартицийScan reduction
quarter = 'Q1'90 из 36575%
month = 'January'31 из 36592%
date_key = '2024-01-15'1 из 36599.7%

Конфигурация DPP

ПараметрЗначениеНазначение
spark.sql.optimizer.dynamicPartitionPruning.enabledtrueГлавный переключатель DPP
spark.sql.optimizer.dynamicPartitionPruning.useStatstrueИспользовать статистики для оценки эффективности DPP
spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio0.5Если > 50% партиций остаётся — DPP может быть невыгоден
spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnlytrueИспользовать DPP только если broadcast уже есть (без дополнительного shuffle)
# Проверка настроек DPP
spark.conf.get("spark.sql.optimizer.dynamicPartitionPruning.enabled")  # true

# DPP включён по умолчанию -- обычно настройка не требуется

Анти-паттерн: ожидание DPP на непартиционированных таблицах

# Анти-паттерн: таблица не партиционирована
df_events = spark.read.parquet("/data/events/")  # flat directory, no partitioning
df_users = spark.read.parquet("/data/dim_users/")

result = df_events.join(df_users, "user_id").filter(col("country") == "RU")
# DPP НЕ может пропустить файлы -- нет partition metadata для pruning

# Решение: перепартиционируйте fact-таблицу
df_events.write.partitionBy("user_country").parquet("/data/events_partitioned/")
# Теперь DPP сможет пропускать партиции по country

Правило: DPP — это data layout optimization. Если ваши данные не партиционированы по join-ключу, DPP физически не может пропустить файлы. Планируйте партиционирование при записи данных, а не при чтении.

DPP и AQE: взаимодействие

DPP — это compile-time оптимизация (часть Catalyst optimizer), а не runtime AQE. Но AQE усиливает DPP:

  • AQE может менять join-стратегию на BroadcastHashJoin во время выполнения, включая DPP, который не был доступен в compile-time плане
  • AQE собирает точные runtime статистики, которые DPP использует для оценки selectivity
  • Результат: DPP + AQE вместе дают больше, чем каждый по отдельности
Проверка знанийKnowledge check
Почему DPP требует BroadcastExchange для dimension-таблицы?
ОтветAnswer
DPP внедряет фильтр из dimension в scan fact-таблицы. Чтобы узнать значения join-ключей ДО сканирования fact-таблицы, Spark должен сначала полностью выполнить dimension-сторону. С BroadcastExchange dimension-таблица уже материализована на driver'е, и список ключей доступен без дополнительного этапа. Без broadcast Spark'у пришлось бы выполнить дополнительный shuffle/collect для получения ключей, что может быть дороже, чем полный scan fact-таблицы.
Проверка знанийKnowledge check
Может ли DPP работать без партиционирования fact-таблицы, если данные хранятся в Delta Lake с Z-ordering?
ОтветAnswer
DPP в классическом виде требует file-level partitioning (Hive-style партиции). Z-ordering и data skipping — это другой механизм (min/max statistics на уровне файлов), который также может пропускать файлы, но это не DPP. Delta Lake может использовать оба подхода: DPP для партиционированных колонок и data skipping для Z-ordered колонок. Они дополняют друг друга, но работают через разные механизмы.

Что дальше?

В финальном уроке этого модуля мы разберём runtime фильтры и адаптацию плана — как AQE меняет стратегию join на лету, оптимизирует local shuffle reader и когда AQE всё-таки недостаточен.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 6. Что делает Dynamic Partition Pruning (DPP)?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5