Learning Platform
Глоссарий
Troubleshooting

Troubleshooting Apache Spark Internals

Проблемы internals-уровня: OOM executor, spill, shuffle fetch failure, GC-паузы, StateStore, codegen — симптомы, первопричины и решения с детальным погружением во внутреннее устройство Spark.

Область

Категория

Показано 25 из 25 ошибок

Симптомы

  • Executor падает с OOM во время shuffle sort или hash aggregation
  • Spark UI → Stage → Task Metrics показывает огромный Shuffle Spill (Memory) без disk-записи
  • В логах executor: 'Unable to acquire X bytes of memory' от MemoryManager
  • Задачи в том же stage повторяются несколько раз, затем stage помечается как failed

Причина

UnifiedMemoryManager не может выделить execution memory для ExternalSorter или BytesToBytesMap, потому что граница execution/storage смещена в сторону storage кешированными данными или off-heap лимит исчерпан. Также причина — data skew: один task получает partition в 10-100x больше медианной, и его не спасает spill, так как spill сам требует execution memory для merge.

Решение

  1. Увеличьте spark.memory.fraction (с 0.6 до 0.7-0.75) чтобы дать больше heap UnifiedMemoryManager
  2. Уменьшите spark.memory.storageFraction — это позволит execution memory вытеснять больше кешированных блоков
  3. Включите AQE skew join: spark.sql.adaptive.skewJoin.enabled=true для автоматического дробления skewed partition-ов
  4. Увеличьте spark.executor.memory и пересчитайте число executor-ов для сохранения общей ёмкости кластера
  5. Проверьте распределение данных: df.groupBy(spark_partition_id()).count().orderBy(desc('count')).show()

Симптомы

  • Spark UI → Stage показывает Shuffle Spill (Disk) намного больше Shuffle Write
  • Время stage-а в 5-20x дольше аналогичного stage-а без spill
  • Локальный SSD executor-ов быстро заполняется временными spill-файлами
  • GC time в задачах высок из-за нехватки execution memory

Причина

ExternalSorter или ExternalAppendOnlyMap не могут удержать промежуточные данные в execution memory и периодически сбрасывают на диск. Типичные причины: spark.sql.shuffle.partitions слишком мало → крупные partition-ы; нехватка executor.memory; данные с высоким fan-out (explode/cross join без фильтров); groupByKey вместо reduceByKey (groupByKey не агрегирует до shuffle).

Решение

  1. Увеличьте spark.sql.shuffle.partitions (или включите AQE для автоматического определения нужного числа)
  2. Замените groupByKey + sum на reduceByKey — это снизит объём данных до shuffle в 10-100x
  3. Включите AQE coalesce: spark.sql.adaptive.coalescePartitions.enabled=true и advisoryPartitionSizeInBytes=128m
  4. Настройте spark.shuffle.spill.compress=true и spark.io.compression.codec=lz4 — уменьшит размер spill-файлов
  5. Увеличьте spark.executor.memory или spark.memory.fraction для предоставления ExternalSorter больше памяти

Симптомы

  • Reduce-стадия падает с FetchFailedException после завершения map-стадии
  • В логах executor: 'Connection refused to <host>:<port>' при чтении shuffle-блоков
  • DAGScheduler перезапускает всю map-стадию (resubmits parent ShuffleMapStage)
  • При dynamic allocation: executor, чьи shuffle-данные нужны, уже уничтожен

Причина

ShuffleBlockFetcherIterator не может получить блок от executor-источника: executor упал из-за OOM, dynamic allocation убил его до того, как reduce успел прочитать блоки, или сетевые таймауты. При отсутствии External Shuffle Service shuffle-данные хранятся в JVM executor-а и теряются при его смерти.

Решение

  1. Включите External Shuffle Service: spark.shuffle.service.enabled=true — shuffle-данные переживают executor lifecycle
  2. Для Kubernetes без ESS: spark.dynamicAllocation.shuffleTracking.enabled=true — Spark не убивает executor, пока его блоки нужны
  3. Рассмотрите Apache Celeborn или Uniffle как scalable remote shuffle service
  4. Увеличьте spark.shuffle.io.maxRetries (default 3) и spark.shuffle.io.retryWait (default 5s)
  5. Диагностируйте первопричину смерти executor-а: spark.network.timeout должен быть >> spark.executor.heartbeatInterval

Симптомы

  • Spark UI → Stage → Task Metrics: max task duration >> median duration
  • Все ядра простаивают, пока один straggler task ещё выполняется
  • Skewed task потребляет GBs shuffle read, остальные — MBs
  • AQE включён, но skew join не срабатывает — partition меньше порога

Причина

Неравномерное распределение данных по ключу: один или несколько ключей содержат непропорционально много записей (null-ключи, 'unknown', популярные ID). AQE skew detection требует, чтобы skewed partition превышал max(skewedPartitionFactor × median, skewedPartitionThresholdInBytes) — при низком медианном размере порог может не срабатывать.

Решение

  1. Снизьте skewedPartitionThresholdInBytes: spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=64m
  2. Снизьте skewedPartitionFactor: spark.sql.adaptive.skewJoin.skewedPartitionFactor=3 (вместо default 5)
  3. Для null-skew: явно фильтруйте или обрабатывайте null-ключи перед join
  4. Ручной salting: добавить случайный суффикс к ключу, затем агрегировать в два этапа
  5. Включите speculative execution для straggler-ов: spark.speculation=true (только для idempotent задач)

Симптомы

  • Spark UI → Stages → Task Metrics: GC Time аномально велик (30-70% от Task Time)
  • JVM Full GC паузы делают задачи медленными и провоцируют executor heartbeat timeout
  • Executor теряет heartbeat и помечается driver-ом как мёртвый после паузы GC > network.timeout
  • В GC-логах executor: частые CMS concurrent mode failure или G1 Evacuation Failure

Причина

Heap executor перегружен объектами. Причины: слишком много данных кешировано с StorageLevel.MEMORY_ONLY (Java-объекты, не UnsafeRow), старый GC (ParallelGC/CMS), Java-сериализация вместо Kryo, кастомные объекты в map-операциях, утечка ссылок в Accumulators или broadcast-переменных.

Решение

  1. Переключите на G1GC: spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35
  2. Используйте StorageLevel.MEMORY_AND_DISK_SER вместо MEMORY_ONLY — сериализованные данные занимают меньше heap
  3. Включите off-heap: spark.memory.offHeap.enabled=true, spark.memory.offHeap.size=4g — Tungsten данные выходят из GC
  4. Переключите на Kryo: spark.serializer=org.apache.spark.serializer.KryoSerializer
  5. Увеличьте spark.network.timeout до 600s чтобы пережить длинные GC-паузы без потери executor-а

Симптомы

  • Driver падает с OOM при попытке broadcast'ить таблицу
  • В логах driver: 'Not enough memory to broadcast the table' или heap OOM в SparkContext
  • SparkException: Cannot broadcast the table that is larger than 8GB
  • Таблица росла постепенно и внезапно превысила broadcast threshold

Причина

BroadcastExchangeExec собирает данные broadcast-таблицы на driver-е в виде Java-объектов перед нарезкой на TorrentBroadcast-блоки. Если broadcast-таблица неожиданно выросла (добавились данные, фильтр больше не убирает лишнее), driver не успевает ограничить потребление. Абсолютный лимит Spark 8GB на broadcast-объект.

Решение

  1. Отключите auto-broadcast для крупных таблиц: spark.sql.autoBroadcastJoinThreshold=-1 и добавьте явный hint SHUFFLE_HASH или MERGE
  2. Увеличьте spark.driver.memory (broadcast собирается на driver-е, не на executor-е)
  3. Включите AQE: с включённым AQE runtime-размер таблицы определяется точнее и auto-broadcast не применяется для переросших таблиц
  4. Аудит данных: проверьте, не выросла ли dimension-таблица — df.count() и df.explain() для проверки плана

Симптомы

  • EXPLAIN не показывает звёздочки (*) рядом с операторами
  • Задачи работают в 3-10x медленнее аналогичного запроса, где WSCG активен
  • В логах: 'Cannot use BinaryType for CodeGen' или 'Falling back to interpreted mode'
  • SQL-план содержит complex types (MapType, ArrayType) или custom UDF, разрывающие цепочку

Причина

Whole-Stage CodeGen разрывается при: (1) наличии Python UDF в цепочке (Python-граница), (2) сложных типах данных (MapType, StructType) в проекции, (3) превышении лимита числа операторов в одном CodeGenStage (spark.sql.codegen.maxFields=100), (4) ошибке компиляции сгенерированного кода (Janino limit: метод > 64KB bytecode).

Решение

  1. Замените Python UDF на встроенные функции pyspark.sql.functions или Pandas UDF — Python UDF разрывает CodeGen цепочку
  2. Увеличьте spark.sql.codegen.maxFields если запрос работает с широкими схемами (>100 колонок)
  3. Для диагностики: df.queryExecution.debug.codegen() — покажет сгенерированный код или причину fallback
  4. Разбейте цепочку — создайте промежуточный DataFrame.cache() чтобы начать новый CodeGenStage с чистого листа
  5. Включите spark.sql.codegen.fallback=true (default) чтобы не падать, а gracefully переходить в interpreted mode

Симптомы

  • spark.sql.adaptive.enabled=true, но после shuffle всё равно 200 мелких partition-ов
  • AQE coalesce partitions не срабатывает для конкретного запроса
  • Spark UI показывает 200 задач, большинство обрабатывают 0-10 строк

Причина

AQE coalesce не применяется если: (1) запрос использует только narrow transformations без shuffle, (2) spark.sql.adaptive.coalescePartitions.enabled=false (может быть переопределено на уровне сессии), (3) все partition-ы уже больше minPartitionSize, (4) в запросе есть final sort или repartition, отменяющий coalesce.

Решение

  1. Убедитесь что в запросе есть shuffle-операция — без ShuffleExchange AQE не получает runtime-статистики
  2. Проверьте конфиг в конкретной сессии: spark.conf.get('spark.sql.adaptive.coalescePartitions.enabled')
  3. Уменьшите advisoryPartitionSizeInBytes: spark.sql.adaptive.advisoryPartitionSizeInBytes=32m для маленьких данных
  4. Проверьте minPartitionSize: spark.sql.adaptive.coalescePartitions.minPartitionSize должен быть << targetSize
  5. Посмотрите EXPLAIN EXTENDED — если нет QueryStageExec узлов, AQE действительно не активен

Симптомы

  • EXPLAIN показывает SortMergeJoin для таблицы размером 20MB
  • Запрос генерирует shuffle обеих сторон join, хотя одна сторона маленькая
  • Catalyst statistics показывают неверный размер или 'None' для одной стороны
  • После включения AQE plan не обновляется в пользу BroadcastHashJoin

Причина

Catalyst не может выбрать BroadcastHashJoin без достоверной оценки размера таблицы. Статистика отсутствует для: subquery, view, отфильтрованного CTE, таблиц без ANALYZE TABLE. AQE может переключить на BroadcastHashJoin после shuffle, только если фактический размер данных < autoBroadcastJoinThreshold. Без AQE или при отключённом AQE — только до-shuffle оценка.

Решение

  1. Обновите статистики: ANALYZE TABLE my_table COMPUTE STATISTICS FOR ALL COLUMNS
  2. Добавьте явный hint: SELECT /*+ BROADCAST(small_table) */ ... или df.join(broadcast(small_df), key)
  3. Включите AQE: spark.sql.adaptive.enabled=true — AQE может динамически переключить на broadcast после первого shuffle
  4. Увеличьте порог: spark.sql.autoBroadcastJoinThreshold=100m (если таблица стабильно < этого размера)
  5. Проверьте queryExecution.analyzed.stats.sizeInBytes — покажет текущую оценку размера

Симптомы

  • EXPLAIN не показывает DynamicPruningExpression в Scan узле fact-таблицы
  • Scan-оператор читает все partition-ы fact-таблицы несмотря на фильтр по dimension
  • После включения DPP время запроса не улучшилось для star-schema join

Причина

DPP применяется только при: (1) join с broadcast-стороной (dimension должна быть broadcast), (2) fact-таблица партиционирована по join-ключу, (3) dimension содержит фильтр (WHERE). DPP не применяется если: dimension слишком большая для broadcast, fact-таблица не партиционирована, join-ключ не соответствует partition column.

Решение

  1. Убедитесь что dimension-таблица broadcast'ится: проверьте EXPLAIN на BroadcastHashJoin на dimension-стороне
  2. Fact-таблица должна быть партиционирована по join-ключу: PARTITIONED BY (category_id) в Hive/Iceberg
  3. Включите DPP явно: spark.sql.optimizer.dynamicPartitionPruning.enabled=true
  4. Для forced DPP: spark.sql.optimizer.dynamicPartitionPruning.useStats=false — использовать DPP даже без статистик
  5. Проверьте через EXPLAIN EXTENDED наличие 'dynamicpruning#N' в Scan узле

Симптомы

  • Streaming query стабильно работает N часов, затем executor падает с OOM
  • Spark UI → Structured Streaming → stateMemory растёт и не уменьшается
  • HDFSBackedStateStore держит весь state в memory-resident HashMap
  • Число ключей в state store неограниченно растёт (нет watermark или TTL)

Причина

HDFSBackedStateStoreProvider хранит всё состояние в памяти JVM executor-а. Без watermark state никогда не очищается: каждый уникальный ключ (user_id, session_id) остаётся в state навсегда. Для stateful-операций без watermark eviction состояние растёт линейно с числом уникальных ключей.

Решение

  1. Мигрируйте на RocksDB backend: spark.sql.streaming.stateStore.providerClass=...RocksDBStateStoreProvider — хранит на диске, а не в heap
  2. Добавьте watermark для автоматического eviction устаревших ключей: .withWatermark('event_time', '2 hours')
  3. Для transformWithState — используйте TTL: handle.getValueState(..., TTLConfig(Duration.ofHours(2)))
  4. Увеличьте RocksDB page cache: spark.sql.streaming.stateStore.rocksdb.blockCacheSizeMB=256
  5. Мониторинг: query.lastProgress['stateOperators'][0]['numRowsTotal'] — число ключей в state

Симптомы

  • Streaming query не перезапускается после изменения логики или схемы
  • Ошибка: 'Detected incompatible evolution in stateful operators' или AnalysisException
  • checkpoint/state/ содержит данные со старой схемой, несовместимые с новым планом
  • После добавления нового stateful оператора query падает на старте

Причина

IncrementalExecution хранит сериализованный физический план и схему state store в checkpoint. При изменении: (1) типа или числа state переменных, (2) схемы агрегации, (3) добавлении/удалении stateful операторов — Spark не может продолжить с существующим checkpoint.

Решение

  1. Удалите старый checkpoint и перезапустите с чистого состояния (потеря накопленного state — неизбежно при несовместимых изменениях)
  2. Планируйте миграцию: сохраните state в external store (Redis/Cassandra) перед изменением плана
  3. Используйте versioned checkpoint directories: checkpointLocation/v1/, checkpointLocation/v2/
  4. Для совместимых изменений (только добавление колонок в source-схему): enableChangeDataFeed может помочь в Delta источниках
  5. Проверьте совместимость заранее: запустите новый код с .queryName('test') на staging checkpoint

Симптомы

  • Streaming micro-batch duration внезапно возрастает в 3-5x (stall)
  • В метриках state store: rocksdbNumCompactionRuns растёт, rocksdbCompactionTimeMs высок
  • commitTimeMs в query.lastProgress аномально большое в момент compaction
  • Нагрузка на локальный диск executor-а (iowait) высока во время stall

Причина

RocksDB периодически запускает compaction LSM-дерева для объединения SST-файлов, что блокирует write path. При высокой частоте записей (много обновлений state) compaction не успевает за write rate — write stall. Также причина: малый write buffer → частые flush → много L0 SST-файлов → тяжёлый compaction.

Решение

  1. Увеличьте write buffer: spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB=32 — реже flush
  2. Увеличьте L0 trigger: spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber=4
  3. Отключите compactOnCommit: spark.sql.streaming.stateStore.rocksdb.compactOnCommit=false — background compaction
  4. Используйте NVMe SSD на executor-нодах для быстрого random I/O RocksDB
  5. Мониторинг: rocksdbWriteStallDurationMillis > 0 → признак write stall

Симптомы

  • Таймеры в transformWithState не срабатывают в ожидаемое время
  • State с TTL не очищается — устаревшие ключи остаются в store
  • handleExpiredTimers() не вызывается несмотря на прошедшее event time

Причина

Таймеры и TTL eviction в transformWithState работают в event-time режиме только при наличии watermark. Без withWatermark() event time не продвигается, и Spark не знает, что таймеры истекли. Processing-time таймеры зависят от реального времени, но при лаге обработки могут существенно запаздывать.

Решение

  1. Обязательно добавьте withWatermark() для event-time таймеров и TTL eviction
  2. Для processing-time таймеров: используйте TimeMode.ProcessingTime() — не требует watermark
  3. Проверьте что watermark движется: query.lastProgress['eventTime']['watermark'] должен увеличиваться
  4. Для TTL: убедитесь что TTLConfig указан при создании state: handle.getValueState[T](name, enc, TTLConfig(...))
  5. Диагностика: query.lastProgress['stateOperators'][0]['numRowsUpdated'] vs 'numRowsDroppedByWatermark'

Симптомы

  • Shuffle read занимает несправедливо долго даже при малом объёме данных
  • В логах executor: 'waiting for remote block fetch' паузы
  • Network utilization низкий несмотря на активную shuffle read фазу
  • Большое число мелких shuffle-блоков (много map task-ов, мало данных)

Причина

ShuffleBlockFetcherIterator ограничивает одновременный fetch объём данных параметром spark.reducer.maxSizeInFlight (default 48m). При большом числе мелких блоков каждый fetch batch мал, создаётся много sequential round-trips. Это особенно болезненно при высоком network RTT (cross-AZ, cross-region).

Решение

  1. Увеличьте spark.reducer.maxSizeInFlight до 96m-256m для параллельного fetch большего объёма
  2. Включите push-based shuffle: spark.shuffle.push.enabled=true — pre-merged блоки снижают число RPC
  3. Рассмотрите Celeborn с его крупными merge-файлами вместо тысяч мелких shuffle-блоков
  4. Увеличьте spark.reducer.maxReqsInFlight (default 16) для параллельных fetch-запросов
  5. Используйте Apache Uniffle или Celeborn для co-location map outputs в одном remote файле на partition

Симптомы

  • Driver помечает executor как мёртвый: 'Executor N lost: heartbeat timed out after 120 seconds'
  • Executor на самом деле жив — долгая GC пауза или CPU-bound задача заблокировала heartbeat поток
  • Task повторяется на другом executor, теряя прогресс уже почти завершённой задачи
  • В GC-логах executor: долгий Full GC совпадает по времени с потерей heartbeat

Причина

Heartbeat executor-а отправляется из отдельного потока каждые spark.executor.heartbeatInterval (default 10s). При STW-паузе GC или при полном насыщении CPU потоки заблокированы. Если пауза > spark.network.timeout (default 120s), driver считает executor мёртвым, хотя он просто занят.

Решение

  1. Увеличьте spark.network.timeout до 300-600s для тяжёлых computation-bound задач
  2. Переключите на G1GC с меньшими паузами: -XX:+UseG1GC -XX:MaxGCPauseMillis=200
  3. Включите ZGC (Java 17+): -XX:+UseZGC — sub-millisecond pauses для heap до 16TB
  4. Уменьшите heap pressure: разбейте крупные задачи на более мелкие partition-ы
  5. Мониторинг: spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDateStamps

Симптомы

  • Число executor-ов не уменьшается после завершения крупного stage
  • Idle executor-ы держатся часами, блокируя ресурсы для других приложений
  • spark.dynamicAllocation.enabled=true, но executors не освобождаются

Причина

Dynamic allocation не освобождает executor если: (1) в нём есть кешированные данные (cached RDD/DataFrame) — хранитель кеша не освобождается, (2) executor держит live shuffle-блоки (без ESS / shuffle tracking), (3) executorIdleTimeout слишком большой, (4) executor постоянно получает небольшие задачи и никогда не становится idle.

Решение

  1. Включите cachedExecutorIdleTimeout отдельно: spark.dynamicAllocation.cachedExecutorIdleTimeout=600s
  2. Явно unpersist() DataFrame-ы после использования: df.unpersist(blocking=True)
  3. Для shuffle-блоков: включите ESS или shuffle tracking — тогда shuffle executor не блокирует удаление
  4. Уменьшите executorIdleTimeout: spark.dynamicAllocation.executorIdleTimeout=60s
  5. Мониторинг: spark.dynamicAllocation.executorAllocationRatio и число active/idle executor-ов в Spark UI

Симптомы

  • Spark UI → Stage → Tasks: Locality Level = ANY для большинства задач
  • Задачи читают данные по сети вместо локального доступа
  • Performance хуже ожидаемого несмотря на данные на тех же нодах

Причина

Delay scheduling ждёт освобождения executor-а с нужной локальностью определённое время. Если в кластере высокая нагрузка и нужные executor-ы заняты, TaskScheduler ухудшает уровень локальности по истечению spark.locality.wait таймаута (default 3s). Для object storage (S3, GCS) данные вообще не локальны — всегда ANY.

Решение

  1. Для object storage: установите spark.locality.wait=0s — не ждать локальности, нет смысла
  2. Для HDFS с cached data: убедитесь что executor-ы co-located с DataNode-ами (YARN node labels)
  3. Увеличьте spark.locality.wait.process до 10s если PROCESS_LOCAL важен для cached DataFrames
  4. Проверьте preferred locations: rdd.preferredLocations(rdd.partitions(0)) — должно возвращать executor host
  5. В K8s: используйте node affinity чтобы executor-поды размещались на нодах с данными

Симптомы

  • Агрегация возвращает некорректные результаты или ArrayIndexOutOfBoundsException
  • Ошибка воспроизводится только при включённом whole-stage codegen
  • Выключение spark.sql.codegen.wholeStage=false исправляет ошибку
  • Проблема возникает при обработке wide schema (50+ полей) с null-значениями

Причина

Редкие баги в Catalyst-кодогенерации при edge cases: неправильный offset в null bitmap UnsafeRow для wide schema, переполнение 64KB bytecode лимита Janino генерирующее неполный метод, некорректное выравнивание variable-length данных при специфичных комбинациях типов.

Решение

  1. Первый шаг: отключите whole-stage codegen для изоляции: spark.sql.codegen.wholeStage=false
  2. Обновитесь до последнего Spark 4.0 патча — многие codegen-баги исправлены в minor releases
  3. Если баг подтверждён: зарегистрируйте JIRA с минимальным воспроизводящим примером
  4. Обходное решение: добавьте промежуточный cache() чтобы разбить проблемную CodeGenStage
  5. Диагностика: spark.sql.codegen.comments=true позволяет видеть комментарии в сгенерированном коде

Симптомы

  • FetchFailedException при чтении через Celeborn после падения worker-ноды
  • В Celeborn логах: 'Worker lost: heartbeat timeout' или 'Disk full'
  • Spark задание retry на уровне stage, затем падает после maxAttempts

Причина

Remote Shuffle Service worker упал с потерей своих данных. При single-replica конфигурации (по умолчанию в Celeborn) данные на упавшем worker-е теряются. Celeborn не реплицирует по умолчанию — это компромисс между производительностью и надёжностью.

Решение

  1. Включите репликацию в Celeborn: celeborn.client.push.replicate.enabled=true — каждый блок пишется на 2 worker-а
  2. Настройте автоматический failover в Celeborn: celeborn.worker.heartbeat.timeout=120s
  3. Увеличьте spark.shuffle.io.maxRetries=10 и spark.shuffle.io.retryWait=15s для retry при временных сбоях
  4. Используйте SSD на Celeborn worker-ах с мониторингом disk usage — disk full = immediate data loss
  5. Мониторинг: Celeborn Master UI → Workers → Failed Workers

Симптомы

  • EXPLAIN показывает Java-операторы вместо нативных несмотря на включённый Comet/Gluten
  • Ожидаемое 3-5x ускорение не достигается — большинство операторов выполняется в JVM
  • В логах: 'Falling back to Spark native for operator X: unsupported type/expression'

Причина

Нативные движки поддерживают только подмножество операторов Spark. Comet не поддерживает: window functions с complex frames, некоторые string functions, decimal с precision > 18. Gluten/Velox: ограниченная поддержка HiveUDF, некоторых join стратегий. Любой неподдерживаемый оператор в цепочке вызывает fallback всей смежной цепочки.

Решение

  1. Проверьте список unsupported операторов в документации используемого движка
  2. Включите verbose logging: spark.comet.explain.fallback.enabled=true — покажет причину каждого fallback
  3. Перепишите запрос для исключения неподдерживаемых конструкций (decimal → double, HiveUDF → built-in)
  4. Для partial acceleration: spark.comet.exec.all.enabled=false + явное включение нужных операторов
  5. Мониторинг: процент нативных vs JVM операторов в Spark UI (физический план)

Симптомы

  • Задание падает с StackOverflowError при построении DAG для очень длинных цепочек трансформаций
  • Итеративные алгоритмы (ML training, graph processing) с сотнями итераций
  • toDebugString() возвращает тысячи строк lineage

Причина

DAGScheduler использует рекурсивный обход lineage для поиска shuffle-зависимостей. При очень длинных lineage-цепочках (>1000 трансформаций без checkpoint) рекурсия достигает лимита стека JVM. Типично для итеративных алгоритмов Spark MLlib или вручную написанных циклов без checkpoint.

Решение

  1. Добавьте checkpoint в цикле каждые 10-20 итераций: rdd.checkpoint() или df.checkpoint()
  2. Настройте checkpoint directory: sc.setCheckpointDir('hdfs:///spark-checkpoints/')
  3. Для DataFrame: df.checkpoint() материализует данные и обрезает lineage
  4. Увеличьте стек JVM: spark.driver.extraJavaOptions=-Xss8m (default обычно 512k-1m)
  5. Используйте cache() в сочетании с checkpoint() — cache ускоряет материализацию перед checkpoint

Симптомы

  • Dataset[T] операции падают с ClassNotFoundException или ClassCastException на executor
  • Ошибка появляется при передаче custom case class через границы executor
  • Работает локально (master=local), не работает на кластере

Причина

ExpressionEncoder для Dataset[T] генерируется на driver-е и сериализуется для executor-ов. Если класс T не входит в user JAR или его нет в classpath executor-а, десериализация encoder-а падает. Также возможна проблема с несовпадением версий класса между driver и executor classpath.

Решение

  1. Убедитесь, что JAR с custom классами добавлен: spark-submit --jars my-classes.jar или spark.jars=...
  2. Проверьте совпадение версий: одна версия JAR на driver и executor (spark.jars включает оба)
  3. Для Scala: убедитесь что классы сериализуемы: case class автоматически, обычный class нужно extends Serializable
  4. Зарегистрируйте класс в Kryo если используете spark.serializer=KryoSerializer: spark.kryo.classesToRegister
  5. Диагностика: executor logs → 'ClassNotFoundException' → убедитесь что путь в CLASSPATH executor-а

Симптомы

  • Данные дублируются в Kafka/JDBC/custom sink при включённом spark.speculation=true
  • Два task-а для одной и той же partition завершились успешно перед отменой дубликата
  • Отмена speculative task приходит после того, как он уже записал данные во внешний store

Причина

Speculative execution запускает дублирующий task параллельно со straggler. Оба task-а могут успеть записать данные во внешний sink до того, как драйвер отменит проигравший. Для non-idempotent sink (Kafka produce без exactly-once, JDBC без upsert, counter increment) это приводит к дублям.

Решение

  1. Отключите speculation для non-idempotent задач: spark.speculation=false (глобально или per-stage через RDD.localCheckpoint + hint)
  2. Сделайте sink идемпотентным: Kafka + transactional producer, JDBC + UPSERT ON CONFLICT
  3. Используйте Structured Streaming вместо batch для exactly-once через checkpoint + idempotent sink
  4. Если speculation необходим: добавьте task attempt ID в ключ записи и дедуплицируйте downstream
  5. Мониторинг: Spark UI → Stage → Tasks → поле 'Speculative' у завершённых задач

Симптомы

  • toPandas() с Arrow падает с ArrowInvalid или ArrowNotImplementedError
  • pandas_udf бросает ValueError: 'Expected batch size X, got Y'
  • Ошибка возникает только для определённых типов данных (Decimal128, nested struct)

Причина

Arrow-сериализация Spark → Python не поддерживает все типы данных: Decimal с precision > 38, некоторые StructType вложения, LegacyDateType vs ArrowDateType несоответствие, MapType с non-string ключами. Также возможна проблема с несовпадением версий PyArrow на executor-ах.

Решение

  1. При падении: spark.sql.execution.arrow.pyspark.enabled=false — fallback на pickle (медленнее, но надёжнее)
  2. Приведите Decimal к double перед toPandas(): df.withColumn('col', col('col').cast('double')).toPandas()
  3. Убедитесь что версия PyArrow одинакова на driver и executor: pip show pyarrow
  4. Для pandas_udf: уменьшите spark.sql.execution.arrow.maxRecordsPerBatch до 1000 для работы с объёмными строками
  5. При LegacyDateType ошибке: spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED