Learning Platform
Глоссарий Troubleshooting
Урок 06.05 · 12 мин
Средний
Small File ProblemFile ConsolidationRepartitionmaxRecordsPerFileStreaming Output

Проблема маленьких файлов

Почему маленькие файлы — это проблема

Маленькие файлы (< 1MB) создают три уровня overhead:

1. NameNode overhead (HDFS)

HDFS хранит метаданные каждого файла в памяти NameNode. Каждый файл занимает ~150 байт метаданных. 10 миллионов маленьких файлов = 1.5GB памяти NameNode только на метаданные.

2. Избыточное планирование задач

Spark создаёт минимум одну задачу (task) на файл. 10,000 файлов = 10,000 tasks. Каждая task имеет overhead запуска (~5-20ms), scheduling, serialization. На 10,000 задач это 30-200 секунд чистого overhead до начала полезной работы.

10,000 файлов по 100KB:
  Overhead: 10,000 tasks × 10ms = 100 секунд
  Полезная работа: ~5 секунд
  Итого: 105 секунд (95% -- overhead!)

100 файлов по 10MB:
  Overhead: 100 tasks × 10ms = 1 секунда
  Полезная работа: ~5 секунд
  Итого: 6 секунд (17x быстрее!)

3. Плохое сжатие и data skipping

Маленькие файлы содержат мало строк → dictionary encoding и run-length encoding работают хуже. Min/max statistics охватывают узкий диапазон, но при 10,000 файлов Spark тратит больше времени на чтение metadata, чем на пропуск блоков.

Откуда берутся маленькие файлы

1. Over-partitioning

# ПЛОХО: партиционирование по высококардинальному столбцу
df.write.partitionBy("user_id").parquet("/data/events/")
# 10M уникальных user_id = 10M директорий по 1-2 файла!

# ПЛОХО: партиционирование по timestamp с высокой гранулярностью
df.write.partitionBy("event_timestamp").parquet("/data/events/")
# Каждая секунда = отдельная партиция!

Подробнее о стратегиях партиционирования — в модуле M04 (Performance Tuning).

2. Streaming micro-batches

# Streaming job пишет файлы каждые 10 секунд
df_stream.writeStream \
  .format("parquet") \
  .trigger(processingTime="10 seconds") \
  .start("/data/events/")

# За 1 день: 8,640 micro-batches × N партиций = десятки тысяч файлов!

3. Ежедневные append-операции

# Каждый день добавляется маленькая порция данных
for day in date_range:
    small_df = spark.read.parquet(f"/raw/{day}/")  # 50MB
    small_df.write.mode("append").parquet("/data/aggregated/")
    # Каждый append создаёт новые файлы (200 по 250KB)

4. Joins и filters, уменьшающие данные

# После фильтрации в каждой партиции остаётся мало строк
df = spark.read.parquet("/data/events/")  # 200 партиций по 128MB
filtered = df.filter(col("status") == "error")  # 0.1% строк
filtered.write.parquet("/data/errors/")  # 200 файлов по 128KB!

Решения

1. repartition() перед записью

# Контролируем количество выходных файлов
df.repartition(10).write.parquet("/data/output/")
# 10 файлов оптимального размера

Недостаток: repartition() вызывает full shuffle (перемещение всех данных по сети).

2. coalesce() для уменьшения партиций

# Уменьшаем количество партиций БЕЗ full shuffle
df.coalesce(10).write.parquet("/data/output/")

Преимущество: coalesce() объединяет партиции без shuffle — просто назначает несколько партиций одному executor. Недостаток: может создать неравномерные файлы, если партиции имеют разный размер.

coalesce(2): неравномерные файлы
coalesce(2) из 4 партиций:
Партиция 1 (100MB)
Партиция 2 (100MB)
Файл 1 (200MB)
Партиция 3 (10MB)
Партиция 4 (10MB)
Файл 2 (20MB)← неравномерно!

3. maxRecordsPerFile — ограничение строк на файл

# Каждый файл содержит максимум 1 миллион строк
df.write \
  .option("maxRecordsPerFile", 1000000) \
  .parquet("/data/output/")

Полезно, когда: вы не знаете заранее, сколько данных будет, но хотите предотвратить создание огромных файлов. Не решает проблему маленьких файлов напрямую, но предотвращает обратную проблему.

4. AQE Coalesce Shuffle Partitions

Adaptive Query Execution автоматически объединяет маленькие партиции после shuffle:

# Включено по умолчанию в Spark 3.2+
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128MB")

# AQE автоматически:
# 1. Выполняет shuffle
# 2. Собирает статистику о размере партиций
# 3. Объединяет маленькие партиции до ~128MB

Подробнее об AQE — в модуле M05 (AQE Deep-Dive).

5. Scheduled compaction (Delta Lake)

-- Регулярная компактизация для streaming-таблиц
OPTIMIZE events
WHERE event_date >= current_date() - INTERVAL 1 DAY

Это лучшее решение для streaming — не нужно менять streaming job, компактизация выполняется отдельно.

Before / After: решение проблемы

Small Files: Before / After Compaction
BEFORE (streaming job, 30 дней)
/data/events/
├──event_date=2024-01-01/(5,000 файлов по 50KB = 250MB)
├──event_date=2024-01-02/(5,000 файлов по 50KB = 250MB)
├──event_date=2024-01-.../
└──event_date=2024-01-30/(5,000 файлов по 50KB = 250MB)
Файлов
150,000 файлов
Размер
7.5GB
Запрос 1 день
~45 секунд (overhead 95%)

Анти-паттерн: партиционирование по timestamp с минутной гранулярностью

# ПЛОХО: создаёт 525,600 директорий в год (1 в минуту)
df.write.partitionBy("minute_timestamp").parquet("/data/events/")

# ХОРОШО: партиционируйте по дню или часу
df.withColumn("event_date", to_date(col("event_timestamp"))) \
  .write.partitionBy("event_date").parquet("/data/events/")
# 365 директорий в год -- управляемо

Правило: партиция должна содержать минимум 128MB данных. Если дневная порция данных < 128MB, партиционируйте по неделе или месяцу.

Чек-лист диагностики

Если запросы медленные, проверьте:

  1. Количество файлов: ls -R /data/table/ | wc -l или hadoop fs -count /data/table/
  2. Средний размер: общий размер / количество файлов (цель: 64MB-1GB)
  3. Файлов на партицию: если > 500 — нужна compaction
  4. Размер файлов: если средний < 1MB — критическая проблема
# Проверка в Spark
import os

path = "/data/events/"
files = spark._jvm.org.apache.hadoop.fs.FileSystem \
  .get(spark._jsc.hadoopConfiguration()) \
  .listStatus(spark._jvm.org.apache.hadoop.fs.Path(path))

sizes = [f.getLen() for f in files]
print(f"Файлов: {len(sizes)}")
print(f"Средний размер: {sum(sizes)/len(sizes)/1024/1024:.1f} MB")
print(f"Минимальный: {min(sizes)/1024:.1f} KB")
print(f"Максимальный: {max(sizes)/1024/1024:.1f} MB")
Проверка знанийKnowledge check
Streaming job пишет в Parquet каждые 10 секунд. За месяц накопилось 250,000 маленьких файлов. Какое решение оптимально и почему?
ОтветAnswer
Оптимальное решение -- scheduled compaction (OPTIMIZE) в Delta Lake, потому что: (1) не нужно менять streaming job; (2) OPTIMIZE объединяет маленькие файлы в крупные (~128MB) без остановки streaming; (3) можно добавить Z-ordering для улучшения data skipping; (4) VACUUM затем очистит старые файлы. Альтернатива для чистого Parquet: отдельный batch job, который repartition + перезаписывает данные за прошлые дни, но это сложнее и не atomic.
Проверка знанийKnowledge check
В чём разница между repartition() и coalesce() для решения проблемы маленьких файлов?
ОтветAnswer
repartition(N) выполняет full shuffle -- все данные перемещаются по сети и равномерно распределяются по N партициям. Результат: N файлов одинакового размера. coalesce(N) объединяет партиции без shuffle -- просто назначает несколько partition одному executor. Результат: N файлов, но размеры могут быть неравномерными (зависит от размера исходных партиций). Используйте coalesce, когда нужно уменьшить количество файлов (100 -> 10). Используйте repartition, когда нужно равномерное распределение или увеличение количества файлов.

Резюме модуля

Оптимизация хранения данных включает:

  1. Правильный формат (Parquet для аналитики, Avro для streaming)
  2. Data skipping через min/max statistics и Z-ordering
  3. Bloom filters для высококардинальных столбцов
  4. Compaction + vacuum для обслуживания данных
  5. Контроль размера файлов через repartition, coalesce, AQE

Все эти техники работают вместе — Z-ordering эффективен только после compaction, bloom filters дополняют min/max statistics, а AQE автоматически коалесцирует маленькие партиции.

TIP

Для углублённого изучения стратегий выбора форматов и оптимального размера файлов см. курс Storage Formats Deep-Dive — decision framework для выбора формата под конкретную нагрузку.

Что дальше?

В следующем модуле мы перейдём к мониторингу и наблюдаемости — как использовать Spark UI, History Server, Prometheus и Grafana для диагностики production-проблем.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 6. Почему маленькие файлы (< 1 МБ) убивают производительность Spark?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5