Learning Platform
Глоссарий
Troubleshooting

Решение проблем Apache Spark

Частые ошибки и проблемы при работе с Apache Spark — симптомы, причины и пошаговые решения.

Область

Категория

Показано 22 из 22 ошибок

Симптомы

  • Executor падает с OOM при обработке больших партиций
  • Задачи shuffle-стадии завершаются с ошибкой памяти
  • В Spark UI видно, что пиковое потребление памяти одного executor превышает лимит

Причина

Executor получает партицию данных, которая не помещается в JVM heap. Часто возникает при data skew (одна партиция значительно больше остальных), неправильной конфигурации spark.executor.memory, или при широких трансформациях (explode, crossJoin).

Решение

  1. Увеличьте spark.executor.memory и spark.executor.memoryOverhead
  2. Проверьте распределение данных: df.groupBy(spark_partition_id()).count().show()
  3. Включите AQE: spark.sql.adaptive.enabled=true для автоматического перераспределения
  4. Используйте repartition() для более равномерного распределения данных по партициям

Связанные уроки:

Симптомы

  • Задача работает крайне медленно, затем падает с GC overhead
  • В логах executor видно частые Full GC циклы
  • Spark UI показывает GC Time > 50% от Task Time

Причина

JVM тратит более 98% времени на сборку мусора, восстанавливая менее 2% памяти. Обычно вызвано кэшированием слишком большого объёма данных в memory, утечкой ссылок в UDF, или неоптимальной сериализацией (Java вместо Kryo).

Решение

  1. Замените .cache() на .persist(StorageLevel.MEMORY_AND_DISK) для автоматического сброса на диск
  2. Переключите сериализацию на Kryo: spark.serializer=org.apache.spark.serializer.KryoSerializer
  3. Настройте G1GC: -XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35
  4. Мониторьте GC через Prometheus: spark.metrics.conf.*.sink.prometheusServlet.class

Связанные уроки:

Симптомы

  • Задачи reduce-стадии падают с FetchFailedException
  • В логах — 'Connection refused' или 'Connection reset' при чтении shuffle-данных
  • Задача перезапускается несколько раз, затем стадия отмечается как failed

Причина

Executor, записавший shuffle-данные, упал или стал недоступен до того, как другие executor прочитали его shuffle-файлы. Частые причины: OOM на executor-источнике, сетевые таймауты, нехватка дисковой квоты для shuffle-файлов.

Решение

  1. Увеличьте spark.shuffle.io.maxRetries и spark.shuffle.io.retryWait
  2. Включите External Shuffle Service: spark.shuffle.service.enabled=true
  3. Рассмотрите Apache Celeborn для push-based shuffle в больших кластерах
  4. Проверьте, не падают ли executor-источники из-за OOM (см. предыдущую ошибку)

Связанные уроки:

Симптомы

  • Задача не запускается — ошибка сериализации при отправке closure на executor
  • В стектрейсе — NotSerializableException с указанием класса, захваченного в closure
  • Ошибка появляется при использовании внешних объектов внутри map/filter/foreach

Причина

Spark сериализует closure (лямбду) целиком для передачи на executor. Если closure захватывает ссылку на несериализуемый объект (соединение с БД, SparkContext, логгер), сериализация падает. Часто возникает при обращении к полям класса внутри трансформации.

Решение

  1. Используйте локальные переменные вместо полей класса внутри трансформаций
  2. Оберните нужные значения в broadcast-переменные: val bc = sc.broadcast(config)
  3. Для подключений к внешним системам используйте foreachPartition с созданием коннекта внутри
  4. Добавьте extends Serializable к классам, если их поля действительно сериализуемы

Связанные уроки:

Симптомы

  • Одна задача стадии выполняется в 10-100x дольше остальных
  • В Spark UI — Summary Metrics показывает огромную разницу между Min и Max duration
  • Общее время стадии определяется одной 'отстающей' задачей

Причина

Неравномерное распределение данных по ключу группировки или JOIN. Один ключ (например, NULL, '', популярный ID) содержит непропорционально много записей, и одна задача обрабатывает большую часть данных.

Решение

  1. Включите AQE skew join: spark.sql.adaptive.skewJoin.enabled=true
  2. Используйте salting: добавьте случайный суффикс к ключу, затем агрегируйте в два этапа
  3. Отфильтруйте или обработайте NULL-ключи отдельно перед JOIN
  4. Диагностируйте через Spark UI → Stage → Task Metrics → Shuffle Read Size

Связанные уроки:

Симптомы

  • SQL-запрос или DataFrame-операция падает с 'cannot resolve'
  • Колонка существует, но имя не совпадает из-за регистра или пробелов
  • После JOIN появляются дублированные имена колонок

Причина

Имя колонки в выражении не совпадает с реальными именами в схеме DataFrame. Причины: case sensitivity (spark.sql.caseSensitive=true), пробелы в именах колонок из CSV/JSON, потеря квалификатора после JOIN, переименование при alias().

Решение

  1. Проверьте схему: df.printSchema() и df.columns
  2. Для колонок с пробелами/спецсимволами используйте backtick: column name
  3. После JOIN явно выбирайте колонки: df.select(left['id'], right['id'].alias('right_id'))
  4. Установите spark.sql.caseSensitive=false (по умолчанию) для нечувствительности к регистру

Связанные уроки:

Симптомы

  • Задача collect() или take() на большом DataFrame падает на стороне драйвера
  • В логах драйвера — OutOfMemoryError после успешного выполнения всех задач
  • Результат запроса слишком велик для передачи на драйвер

Причина

Операции collect(), toPandas(), show() с большим limit переносят все данные на драйвер. Если результат превышает spark.driver.maxResultSize (по умолчанию 1g) или доступную память драйвера, задание падает.

Решение

  1. Замените collect() на take(N) или limit(N) для получения только нужного количества строк
  2. Используйте df.write для записи результата в файл вместо передачи на драйвер
  3. Увеличьте spark.driver.memory и spark.driver.maxResultSize при необходимости
  4. Для анализа используйте df.describe() или df.summary() вместо полного collect()

Связанные уроки:

Симптомы

  • Маленькие запросы создают 200 задач, большинство из которых обрабатывают 0 строк
  • Overhead от планирования задач превышает время обработки данных
  • В Spark UI — Shuffle Read Size/Records показывает 0 для большинства задач

Причина

Значение spark.sql.shuffle.partitions по умолчанию (200) рассчитано на средние нагрузки. Для малых данных это создаёт избыточные пустые задачи, а для больших — слишком крупные партиции. Параметр применяется ко всем shuffle-операциям в сессии.

Решение

  1. Включите AQE: spark.sql.adaptive.enabled=true — автоматически объединяет мелкие партиции
  2. Для малых данных: spark.sql.shuffle.partitions=20 или coalesce() после широких трансформаций
  3. Для больших данных: установите partitions = total_shuffle_data / 128MB
  4. Используйте AQE advisory: spark.sql.adaptive.advisoryPartitionSizeInBytes=128m

Симптомы

  • Streaming-запрос не перезапускается после сбоя — ошибка чтения чекпоинта
  • В директории чекпоинта — повреждённые или неполные файлы
  • Смена схемы или логики запроса приводит к несовместимости с существующим чекпоинтом

Причина

Checkpoint содержит сериализованное состояние запроса, включая схему, оффсеты и operator state. При изменении схемы данных, добавлении/удалении агрегаций или смене источника существующий чекпоинт становится несовместимым.

Решение

  1. Для изменения логики — удалите старый чекпоинт и перезапустите с начала (потеря состояния)
  2. Используйте отдельные checkpoint-директории для разных версий запроса
  3. Настройте надёжное хранилище: HDFS/S3 вместо локальной файловой системы
  4. Для HDFS: проверьте права доступа и доступность NameNode

Связанные уроки:

Симптомы

  • Параллельные записи в Delta-таблицу завершаются с ConcurrentAppendException
  • Streaming и batch одновременно пишут в одну таблицу — один из них падает
  • MERGE INTO конфликтует с параллельным INSERT

Причина

Delta Lake использует оптимистичную блокировку через _delta_log. Две транзакции пытаются записать файлы, которые конфликтуют (перекрываются по партиции). Конфликт детектируется при коммите второй транзакции.

Решение

  1. Разделите записи по партициям: каждый writer пишет в свой partition
  2. Включите автоматические ретраи: spark.databricks.delta.retryWriteConflict.enabled=true
  3. Для streaming + batch: используйте разные целевые директории и объединяйте через UNION VIEW
  4. Рассмотрите Isolation Level: SET spark.databricks.delta.isolationLevel = WriteSerializable

Связанные уроки:

Симптомы

  • PySpark UDF работает в 10-100x медленнее, чем эквивалентная встроенная функция
  • Python worker периодически падает с сигналом SIGKILL при больших данных
  • В Spark UI — время Task значительно больше, чем Shuffle Read/Write

Причина

Стандартные Python UDF требуют сериализации каждой строки из JVM в Python (через pickle) и обратно. Это создаёт O(n) overhead на сериализацию/десериализацию и исключает оптимизации Catalyst и Tungsten.

Решение

  1. Замените Python UDF на встроенные функции PySpark: pyspark.sql.functions.*
  2. Если UDF необходим — используйте Pandas UDF (@pandas_udf) с Arrow-сериализацией
  3. Для scalar-операций: @pandas_udf(returnType, PandasUDFType.SCALAR)
  4. Включите Arrow: spark.sql.execution.arrow.pyspark.enabled=true

Связанные уроки:

Симптомы

  • JOIN с broadcast hint падает из-за превышения лимита размера таблицы
  • Автоматический broadcast join выбирается для таблицы, которая выросла за порог
  • Ошибка при BroadcastHashJoin — executor не может принять broadcast-переменную

Причина

Spark пытается broadcast'ить таблицу, размер которой превышает spark.sql.autoBroadcastJoinThreshold (по умолчанию 10MB) или абсолютный лимит 8GB. Если таблица неожиданно выросла, автоматический broadcast перестаёт работать.

Решение

  1. Уменьшите spark.sql.autoBroadcastJoinThreshold или установите -1 для отключения auto-broadcast
  2. Явно укажите стратегию: df.join(other.hint('shuffle_hash'), ...)
  3. Для больших таблиц используйте SortMergeJoin — он не требует размещения в памяти
  4. Проверьте актуальный размер: spark.catalog.cacheTable('t'); spark.table('t').count()

Связанные уроки:

Симптомы

  • Spark-приложение зависает на старте — executor'ы не выделяются
  • В YARN UI — приложение в статусе ACCEPTED, но контейнеры не запускаются
  • Логи ResourceManager показывают нехватку ресурсов в очереди

Причина

Кластер не может выделить запрошенные ресурсы. Причины: запрошено больше памяти/ядер, чем доступно; другие приложения заняли все ресурсы; неправильная конфигурация YARN-очереди; максимальная аллокация контейнера меньше, чем запрошенные ресурсы executor.

Решение

  1. Проверьте доступные ресурсы: yarn application -list или YARN ResourceManager UI
  2. Уменьшите spark.executor.memory и spark.executor.cores до лимитов очереди
  3. Настройте yarn.scheduler.maximum-allocation-mb >= spark.executor.memory + memoryOverhead
  4. Используйте dynamic allocation: spark.dynamicAllocation.enabled=true

Связанные уроки:

Симптомы

  • Executor-поды в Kubernetes периодически завершаются с статусом Evicted или OOMKilled
  • Задачи перезапускаются на новых executor'ах, общее время задания растёт
  • kubectl describe pod показывает Reason: OOMKilled или Evicted

Причина

Kubernetes OOM Killer завершает контейнер, если его RSS превышает limits.memory. Spark off-heap память (Tungsten, overhead) не учитывается в spark.executor.memory, но потребляет реальную RAM контейнера. Также возможно eviction из-за ephemeral-storage shuffle-файлами.

Решение

  1. Установите spark.executor.memoryOverhead = max(384m, 0.1 * executor.memory)
  2. Настройте K8s resource limits: memory = executor.memory + memoryOverhead + 512m (запас)
  3. Для shuffle на локальных дисках: увеличьте ephemeral-storage limits
  4. Рассмотрите External Shuffle Service или Celeborn для уменьшения нагрузки на локальные диски

Связанные уроки:

Симптомы

  • Ошибка сериализации при shuffle или broadcast с Kryo-сериализатором
  • Kryo не может найти класс, используемый в RDD/DataFrame
  • Ошибка появляется только при spark.serializer=KryoSerializer, но не с Java-сериализацией

Причина

Kryo требует явной регистрации классов для эффективной сериализации. Если spark.kryo.registrationRequired=true, незарегистрированный класс вызывает ошибку. Также возможна проблема с classpath — класс отсутствует на executor.

Решение

  1. Зарегистрируйте классы: spark.kryo.classesToRegister=com.example.MyClass
  2. Или отключите обязательную регистрацию: spark.kryo.registrationRequired=false
  3. Убедитесь, что JAR с классами доступен на всех executor: --jars или spark.jars
  4. Для сложных случаев реализуйте кастомный KryoRegistrator

Симптомы

  • EXPLAIN показывает SortMergeJoin для таблицы размером 5MB
  • Catalyst не выбирает broadcast join, хотя таблица маленькая
  • Время JOIN-запроса в 5-10x больше ожидаемого из-за shuffle

Причина

Catalyst оценивает размер таблицы по статистике каталога. Если статистика устарела или отсутствует (например, для view, subquery, или фильтрованной таблицы), оптимизатор не может определить реальный размер и выбирает безопасный SortMergeJoin.

Решение

  1. Обновите статистику: ANALYZE TABLE t COMPUTE STATISTICS
  2. Явно укажите hint: SELECT /*+ BROADCAST(small_table) */ ... FROM ...
  3. В DataFrame API: large_df.join(broadcast(small_df), 'key')
  4. Увеличьте порог: spark.sql.autoBroadcastJoinThreshold=50m (если таблица стабильно мала)

Связанные уроки:

Симптомы

  • Чтение таблицы с тысячами мелких файлов занимает минуты только на листинг
  • Spark UI показывает 50000+ задач для таблицы размером 1GB
  • Driver зависает на этапе File Listing перед началом вычислений

Причина

Каждый файл создаёт минимум одну задачу. Тысячи мелких файлов (< 1MB) создают overhead на планирование, открытие файлов и сетевые запросы к хранилищу. Типичная причина — частые append-операции без compaction.

Решение

  1. Запустите compaction: OPTIMIZE table_name (для Delta Lake)
  2. Настройте coalesce перед записью: df.coalesce(target_files).write.parquet(...)
  3. Установите spark.sql.files.maxPartitionBytes=128m для объединения мелких файлов при чтении
  4. Для регулярных загрузок: настройте автоматический VACUUM и OPTIMIZE по расписанию

Связанные уроки:

Симптомы

  • Streaming-агрегация теряет поздние события без явных ошибок
  • Результаты оконных функций неполные — пропущены записи с опозданием
  • В метриках запроса — numRowsDroppedByWatermark > 0

Причина

Watermark определяет порог, после которого поздние данные отбрасываются. Если данные приходят с задержкой, превышающей watermark delay, они не попадают в результат. Слишком агрессивный watermark отбрасывает валидные данные; слишком мягкий — увеличивает state и задержку.

Решение

  1. Увеличьте watermark delay: .withWatermark('event_time', '30 minutes') вместо '10 minutes'
  2. Анализируйте реальную задержку данных перед выбором watermark: df.selectExpr('current_timestamp() - event_time')
  3. Для критичных данных: используйте output mode 'update' + внешнее хранилище состояния
  4. Мониторьте numRowsDroppedByWatermark через StreamingQueryListener

Связанные уроки:

Симптомы

  • Чтение Parquet-файлов после изменения схемы падает с AnalysisException
  • Старые файлы содержат тип INT, новые — STRING для одной и той же колонки
  • Schema evolution не работает между несовместимыми типами

Причина

Parquet поддерживает ограниченную schema evolution: добавление новых колонок и расширение типов (int→long). Но смена типа колонки (int→string) несовместима — старые и новые файлы не могут быть прочитаны одной схемой.

Решение

  1. Включите schema merging: spark.sql.parquet.mergeSchema=true (для совместимых изменений)
  2. Для несовместимых изменений: создайте новую таблицу и мигрируйте данные с CAST
  3. Используйте Delta Lake или Iceberg — они поддерживают безопасную schema evolution
  4. Для чтения смешанных файлов: задайте схему явно через .schema(explicit_schema)

Связанные уроки:

Симптомы

  • Тесты PySpark падают при параллельном запуске — конфликт SparkSession
  • Второй тест получает уже остановленный SparkContext
  • Ошибка 'Only one SparkContext may be running' при создании новой сессии

Причина

SparkContext — синглтон внутри JVM. При параллельном запуске тестов или неправильном lifecycle (создание нового контекста без остановки предыдущего) возникают конфликты. Pytest с xdist запускает тесты в одной JVM по умолчанию.

Решение

  1. Используйте фикстуру с session scope: @pytest.fixture(scope='session') для SparkSession
  2. Не вызывайте spark.stop() между тестами — переиспользуйте одну сессию
  3. Для изоляции: создавайте временные view/database для каждого теста, а не новую сессию
  4. Рассмотрите spark-testing-base для управления lifecycle в тестах

Связанные уроки:

Симптомы

  • Great Expectations checkpoint возвращает failed — часть expectations не выполнена
  • Data Docs показывают красные ячейки в отчёте валидации
  • Pipeline останавливается из-за проваленной проверки качества данных

Причина

Данные не соответствуют определённым expectations. Типичные причины: изменение формата данных в источнике, NULL-значения в обязательных полях, нарушение уникальности, выход числовых значений за допустимые границы.

Решение

  1. Просмотрите Data Docs для деталей: great_expectations docs build && open uncommitted/data_docs/local_site/index.html
  2. Обновите expectations, если изменение данных ожидаемо: great_expectations suite edit
  3. Для pipeline: настройте severity levels — warn vs fail для разных expectations
  4. Добавьте slack/email алерты при checkpoint failure: action_list в checkpoint config

Связанные уроки:

Симптомы

  • Streaming-запрос из Kafka не запускается — оффсеты вне диапазона
  • После длительного простоя чекпоинт указывает на удалённые сегменты
  • Kafka-топик имеет retention меньше, чем время простоя streaming-приложения

Причина

Чекпоинт хранит оффсеты, которые уже удалены из Kafka (retention policy). При перезапуске Spark пытается прочитать с несуществующего оффсета. Без failOnDataLoss=false запрос падает.

Решение

  1. Установите failOnDataLoss=false для автоматического сброса на earliest/latest доступный оффсет
  2. Настройте Kafka retention >= максимальное время простоя приложения
  3. Используйте startingOffsets='latest' при первом запуске для пропуска исторических данных
  4. Для гарантий: настройте мониторинг lag через kafka-consumer-groups.sh --describe

Связанные уроки: