Проблемы internals-уровня: OOM executor, spill, shuffle fetch failure, GC-паузы, StateStore, codegen — симптомы, первопричины и решения с детальным погружением во внутреннее устройство Spark.
UnifiedMemoryManager не может выделить execution memory для ExternalSorter или BytesToBytesMap, потому что граница execution/storage смещена в сторону storage кешированными данными или off-heap лимит исчерпан. Также причина — data skew: один task получает partition в 10-100x больше медианной, и его не спасает spill, так как spill сам требует execution memory для merge.
ExternalSorter или ExternalAppendOnlyMap не могут удержать промежуточные данные в execution memory и периодически сбрасывают на диск. Типичные причины: spark.sql.shuffle.partitions слишком мало → крупные partition-ы; нехватка executor.memory; данные с высоким fan-out (explode/cross join без фильтров); groupByKey вместо reduceByKey (groupByKey не агрегирует до shuffle).
ShuffleBlockFetcherIterator не может получить блок от executor-источника: executor упал из-за OOM, dynamic allocation убил его до того, как reduce успел прочитать блоки, или сетевые таймауты. При отсутствии External Shuffle Service shuffle-данные хранятся в JVM executor-а и теряются при его смерти.
Неравномерное распределение данных по ключу: один или несколько ключей содержат непропорционально много записей (null-ключи, 'unknown', популярные ID). AQE skew detection требует, чтобы skewed partition превышал max(skewedPartitionFactor × median, skewedPartitionThresholdInBytes) — при низком медианном размере порог может не срабатывать.
Heap executor перегружен объектами. Причины: слишком много данных кешировано с StorageLevel.MEMORY_ONLY (Java-объекты, не UnsafeRow), старый GC (ParallelGC/CMS), Java-сериализация вместо Kryo, кастомные объекты в map-операциях, утечка ссылок в Accumulators или broadcast-переменных.
BroadcastExchangeExec собирает данные broadcast-таблицы на driver-е в виде Java-объектов перед нарезкой на TorrentBroadcast-блоки. Если broadcast-таблица неожиданно выросла (добавились данные, фильтр больше не убирает лишнее), driver не успевает ограничить потребление. Абсолютный лимит Spark 8GB на broadcast-объект.
Whole-Stage CodeGen разрывается при: (1) наличии Python UDF в цепочке (Python-граница), (2) сложных типах данных (MapType, StructType) в проекции, (3) превышении лимита числа операторов в одном CodeGenStage (spark.sql.codegen.maxFields=100), (4) ошибке компиляции сгенерированного кода (Janino limit: метод > 64KB bytecode).
AQE coalesce не применяется если: (1) запрос использует только narrow transformations без shuffle, (2) spark.sql.adaptive.coalescePartitions.enabled=false (может быть переопределено на уровне сессии), (3) все partition-ы уже больше minPartitionSize, (4) в запросе есть final sort или repartition, отменяющий coalesce.
Catalyst не может выбрать BroadcastHashJoin без достоверной оценки размера таблицы. Статистика отсутствует для: subquery, view, отфильтрованного CTE, таблиц без ANALYZE TABLE. AQE может переключить на BroadcastHashJoin после shuffle, только если фактический размер данных < autoBroadcastJoinThreshold. Без AQE или при отключённом AQE — только до-shuffle оценка.
DPP применяется только при: (1) join с broadcast-стороной (dimension должна быть broadcast), (2) fact-таблица партиционирована по join-ключу, (3) dimension содержит фильтр (WHERE). DPP не применяется если: dimension слишком большая для broadcast, fact-таблица не партиционирована, join-ключ не соответствует partition column.
HDFSBackedStateStoreProvider хранит всё состояние в памяти JVM executor-а. Без watermark state никогда не очищается: каждый уникальный ключ (user_id, session_id) остаётся в state навсегда. Для stateful-операций без watermark eviction состояние растёт линейно с числом уникальных ключей.
IncrementalExecution хранит сериализованный физический план и схему state store в checkpoint. При изменении: (1) типа или числа state переменных, (2) схемы агрегации, (3) добавлении/удалении stateful операторов — Spark не может продолжить с существующим checkpoint.
RocksDB периодически запускает compaction LSM-дерева для объединения SST-файлов, что блокирует write path. При высокой частоте записей (много обновлений state) compaction не успевает за write rate — write stall. Также причина: малый write buffer → частые flush → много L0 SST-файлов → тяжёлый compaction.
Таймеры и TTL eviction в transformWithState работают в event-time режиме только при наличии watermark. Без withWatermark() event time не продвигается, и Spark не знает, что таймеры истекли. Processing-time таймеры зависят от реального времени, но при лаге обработки могут существенно запаздывать.
ShuffleBlockFetcherIterator ограничивает одновременный fetch объём данных параметром spark.reducer.maxSizeInFlight (default 48m). При большом числе мелких блоков каждый fetch batch мал, создаётся много sequential round-trips. Это особенно болезненно при высоком network RTT (cross-AZ, cross-region).
Heartbeat executor-а отправляется из отдельного потока каждые spark.executor.heartbeatInterval (default 10s). При STW-паузе GC или при полном насыщении CPU потоки заблокированы. Если пауза > spark.network.timeout (default 120s), driver считает executor мёртвым, хотя он просто занят.
Dynamic allocation не освобождает executor если: (1) в нём есть кешированные данные (cached RDD/DataFrame) — хранитель кеша не освобождается, (2) executor держит live shuffle-блоки (без ESS / shuffle tracking), (3) executorIdleTimeout слишком большой, (4) executor постоянно получает небольшие задачи и никогда не становится idle.
Delay scheduling ждёт освобождения executor-а с нужной локальностью определённое время. Если в кластере высокая нагрузка и нужные executor-ы заняты, TaskScheduler ухудшает уровень локальности по истечению spark.locality.wait таймаута (default 3s). Для object storage (S3, GCS) данные вообще не локальны — всегда ANY.
Редкие баги в Catalyst-кодогенерации при edge cases: неправильный offset в null bitmap UnsafeRow для wide schema, переполнение 64KB bytecode лимита Janino генерирующее неполный метод, некорректное выравнивание variable-length данных при специфичных комбинациях типов.
Remote Shuffle Service worker упал с потерей своих данных. При single-replica конфигурации (по умолчанию в Celeborn) данные на упавшем worker-е теряются. Celeborn не реплицирует по умолчанию — это компромисс между производительностью и надёжностью.
Нативные движки поддерживают только подмножество операторов Spark. Comet не поддерживает: window functions с complex frames, некоторые string functions, decimal с precision > 18. Gluten/Velox: ограниченная поддержка HiveUDF, некоторых join стратегий. Любой неподдерживаемый оператор в цепочке вызывает fallback всей смежной цепочки.
DAGScheduler использует рекурсивный обход lineage для поиска shuffle-зависимостей. При очень длинных lineage-цепочках (>1000 трансформаций без checkpoint) рекурсия достигает лимита стека JVM. Типично для итеративных алгоритмов Spark MLlib или вручную написанных циклов без checkpoint.
ExpressionEncoder для Dataset[T] генерируется на driver-е и сериализуется для executor-ов. Если класс T не входит в user JAR или его нет в classpath executor-а, десериализация encoder-а падает. Также возможна проблема с несовпадением версий класса между driver и executor classpath.
Speculative execution запускает дублирующий task параллельно со straggler. Оба task-а могут успеть записать данные во внешний sink до того, как драйвер отменит проигравший. Для non-idempotent sink (Kafka produce без exactly-once, JDBC без upsert, counter increment) это приводит к дублям.
Arrow-сериализация Spark → Python не поддерживает все типы данных: Decimal с precision > 38, некоторые StructType вложения, LegacyDateType vs ArrowDateType несоответствие, MapType с non-string ключами. Также возможна проблема с несовпадением версий PyArrow на executor-ах.