Частые ошибки и проблемы при работе с Apache Spark — симптомы, причины и пошаговые решения.
Executor получает партицию данных, которая не помещается в JVM heap. Часто возникает при data skew (одна партиция значительно больше остальных), неправильной конфигурации spark.executor.memory, или при широких трансформациях (explode, crossJoin).
JVM тратит более 98% времени на сборку мусора, восстанавливая менее 2% памяти. Обычно вызвано кэшированием слишком большого объёма данных в memory, утечкой ссылок в UDF, или неоптимальной сериализацией (Java вместо Kryo).
Executor, записавший shuffle-данные, упал или стал недоступен до того, как другие executor прочитали его shuffle-файлы. Частые причины: OOM на executor-источнике, сетевые таймауты, нехватка дисковой квоты для shuffle-файлов.
Spark сериализует closure (лямбду) целиком для передачи на executor. Если closure захватывает ссылку на несериализуемый объект (соединение с БД, SparkContext, логгер), сериализация падает. Часто возникает при обращении к полям класса внутри трансформации.
Неравномерное распределение данных по ключу группировки или JOIN. Один ключ (например, NULL, '', популярный ID) содержит непропорционально много записей, и одна задача обрабатывает большую часть данных.
Имя колонки в выражении не совпадает с реальными именами в схеме DataFrame. Причины: case sensitivity (spark.sql.caseSensitive=true), пробелы в именах колонок из CSV/JSON, потеря квалификатора после JOIN, переименование при alias().
column nameОперации collect(), toPandas(), show() с большим limit переносят все данные на драйвер. Если результат превышает spark.driver.maxResultSize (по умолчанию 1g) или доступную память драйвера, задание падает.
Значение spark.sql.shuffle.partitions по умолчанию (200) рассчитано на средние нагрузки. Для малых данных это создаёт избыточные пустые задачи, а для больших — слишком крупные партиции. Параметр применяется ко всем shuffle-операциям в сессии.
Checkpoint содержит сериализованное состояние запроса, включая схему, оффсеты и operator state. При изменении схемы данных, добавлении/удалении агрегаций или смене источника существующий чекпоинт становится несовместимым.
Delta Lake использует оптимистичную блокировку через _delta_log. Две транзакции пытаются записать файлы, которые конфликтуют (перекрываются по партиции). Конфликт детектируется при коммите второй транзакции.
Стандартные Python UDF требуют сериализации каждой строки из JVM в Python (через pickle) и обратно. Это создаёт O(n) overhead на сериализацию/десериализацию и исключает оптимизации Catalyst и Tungsten.
Spark пытается broadcast'ить таблицу, размер которой превышает spark.sql.autoBroadcastJoinThreshold (по умолчанию 10MB) или абсолютный лимит 8GB. Если таблица неожиданно выросла, автоматический broadcast перестаёт работать.
Кластер не может выделить запрошенные ресурсы. Причины: запрошено больше памяти/ядер, чем доступно; другие приложения заняли все ресурсы; неправильная конфигурация YARN-очереди; максимальная аллокация контейнера меньше, чем запрошенные ресурсы executor.
Kubernetes OOM Killer завершает контейнер, если его RSS превышает limits.memory. Spark off-heap память (Tungsten, overhead) не учитывается в spark.executor.memory, но потребляет реальную RAM контейнера. Также возможно eviction из-за ephemeral-storage shuffle-файлами.
Kryo требует явной регистрации классов для эффективной сериализации. Если spark.kryo.registrationRequired=true, незарегистрированный класс вызывает ошибку. Также возможна проблема с classpath — класс отсутствует на executor.
Catalyst оценивает размер таблицы по статистике каталога. Если статистика устарела или отсутствует (например, для view, subquery, или фильтрованной таблицы), оптимизатор не может определить реальный размер и выбирает безопасный SortMergeJoin.
Каждый файл создаёт минимум одну задачу. Тысячи мелких файлов (< 1MB) создают overhead на планирование, открытие файлов и сетевые запросы к хранилищу. Типичная причина — частые append-операции без compaction.
Watermark определяет порог, после которого поздние данные отбрасываются. Если данные приходят с задержкой, превышающей watermark delay, они не попадают в результат. Слишком агрессивный watermark отбрасывает валидные данные; слишком мягкий — увеличивает state и задержку.
Parquet поддерживает ограниченную schema evolution: добавление новых колонок и расширение типов (int→long). Но смена типа колонки (int→string) несовместима — старые и новые файлы не могут быть прочитаны одной схемой.
SparkContext — синглтон внутри JVM. При параллельном запуске тестов или неправильном lifecycle (создание нового контекста без остановки предыдущего) возникают конфликты. Pytest с xdist запускает тесты в одной JVM по умолчанию.
Данные не соответствуют определённым expectations. Типичные причины: изменение формата данных в источнике, NULL-значения в обязательных полях, нарушение уникальности, выход числовых значений за допустимые границы.
Чекпоинт хранит оффсеты, которые уже удалены из Kafka (retention policy). При перезапуске Spark пытается прочитать с несуществующего оффсета. Без failOnDataLoss=false запрос падает.