Проблема маленьких файлов
Почему маленькие файлы — это проблема
Маленькие файлы (< 1MB) создают три уровня overhead:
1. NameNode overhead (HDFS)
HDFS хранит метаданные каждого файла в памяти NameNode. Каждый файл занимает ~150 байт метаданных. 10 миллионов маленьких файлов = 1.5GB памяти NameNode только на метаданные.
2. Избыточное планирование задач
Spark создаёт минимум одну задачу (task) на файл. 10,000 файлов = 10,000 tasks. Каждая task имеет overhead запуска (~5-20ms), scheduling, serialization. На 10,000 задач это 30-200 секунд чистого overhead до начала полезной работы.
10,000 файлов по 100KB:
Overhead: 10,000 tasks × 10ms = 100 секунд
Полезная работа: ~5 секунд
Итого: 105 секунд (95% -- overhead!)
100 файлов по 10MB:
Overhead: 100 tasks × 10ms = 1 секунда
Полезная работа: ~5 секунд
Итого: 6 секунд (17x быстрее!)
3. Плохое сжатие и data skipping
Маленькие файлы содержат мало строк → dictionary encoding и run-length encoding работают хуже. Min/max statistics охватывают узкий диапазон, но при 10,000 файлов Spark тратит больше времени на чтение metadata, чем на пропуск блоков.
Откуда берутся маленькие файлы
1. Over-partitioning
# ПЛОХО: партиционирование по высококардинальному столбцу
df.write.partitionBy("user_id").parquet("/data/events/")
# 10M уникальных user_id = 10M директорий по 1-2 файла!
# ПЛОХО: партиционирование по timestamp с высокой гранулярностью
df.write.partitionBy("event_timestamp").parquet("/data/events/")
# Каждая секунда = отдельная партиция!
Подробнее о стратегиях партиционирования — в модуле M04 (Performance Tuning).
2. Streaming micro-batches
# Streaming job пишет файлы каждые 10 секунд
df_stream.writeStream \
.format("parquet") \
.trigger(processingTime="10 seconds") \
.start("/data/events/")
# За 1 день: 8,640 micro-batches × N партиций = десятки тысяч файлов!
3. Ежедневные append-операции
# Каждый день добавляется маленькая порция данных
for day in date_range:
small_df = spark.read.parquet(f"/raw/{day}/") # 50MB
small_df.write.mode("append").parquet("/data/aggregated/")
# Каждый append создаёт новые файлы (200 по 250KB)
4. Joins и filters, уменьшающие данные
# После фильтрации в каждой партиции остаётся мало строк
df = spark.read.parquet("/data/events/") # 200 партиций по 128MB
filtered = df.filter(col("status") == "error") # 0.1% строк
filtered.write.parquet("/data/errors/") # 200 файлов по 128KB!
Решения
1. repartition() перед записью
# Контролируем количество выходных файлов
df.repartition(10).write.parquet("/data/output/")
# 10 файлов оптимального размера
Недостаток: repartition() вызывает full shuffle (перемещение всех данных по сети).
2. coalesce() для уменьшения партиций
# Уменьшаем количество партиций БЕЗ full shuffle
df.coalesce(10).write.parquet("/data/output/")
Преимущество: coalesce() объединяет партиции без shuffle — просто назначает несколько партиций одному executor.
Недостаток: может создать неравномерные файлы, если партиции имеют разный размер.
3. maxRecordsPerFile — ограничение строк на файл
# Каждый файл содержит максимум 1 миллион строк
df.write \
.option("maxRecordsPerFile", 1000000) \
.parquet("/data/output/")
Полезно, когда: вы не знаете заранее, сколько данных будет, но хотите предотвратить создание огромных файлов. Не решает проблему маленьких файлов напрямую, но предотвращает обратную проблему.
4. AQE Coalesce Shuffle Partitions
Adaptive Query Execution автоматически объединяет маленькие партиции после shuffle:
# Включено по умолчанию в Spark 3.2+
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128MB")
# AQE автоматически:
# 1. Выполняет shuffle
# 2. Собирает статистику о размере партиций
# 3. Объединяет маленькие партиции до ~128MB
Подробнее об AQE — в модуле M05 (AQE Deep-Dive).
5. Scheduled compaction (Delta Lake)
-- Регулярная компактизация для streaming-таблиц
OPTIMIZE events
WHERE event_date >= current_date() - INTERVAL 1 DAY
Это лучшее решение для streaming — не нужно менять streaming job, компактизация выполняется отдельно.
Before / After: решение проблемы
Анти-паттерн: партиционирование по timestamp с минутной гранулярностью
# ПЛОХО: создаёт 525,600 директорий в год (1 в минуту)
df.write.partitionBy("minute_timestamp").parquet("/data/events/")
# ХОРОШО: партиционируйте по дню или часу
df.withColumn("event_date", to_date(col("event_timestamp"))) \
.write.partitionBy("event_date").parquet("/data/events/")
# 365 директорий в год -- управляемо
Правило: партиция должна содержать минимум 128MB данных. Если дневная порция данных < 128MB, партиционируйте по неделе или месяцу.
Чек-лист диагностики
Если запросы медленные, проверьте:
- Количество файлов:
ls -R /data/table/ | wc -lилиhadoop fs -count /data/table/ - Средний размер: общий размер / количество файлов (цель: 64MB-1GB)
- Файлов на партицию: если > 500 — нужна compaction
- Размер файлов: если средний < 1MB — критическая проблема
# Проверка в Spark
import os
path = "/data/events/"
files = spark._jvm.org.apache.hadoop.fs.FileSystem \
.get(spark._jsc.hadoopConfiguration()) \
.listStatus(spark._jvm.org.apache.hadoop.fs.Path(path))
sizes = [f.getLen() for f in files]
print(f"Файлов: {len(sizes)}")
print(f"Средний размер: {sum(sizes)/len(sizes)/1024/1024:.1f} MB")
print(f"Минимальный: {min(sizes)/1024:.1f} KB")
print(f"Максимальный: {max(sizes)/1024/1024:.1f} MB")
Резюме модуля
Оптимизация хранения данных включает:
- Правильный формат (Parquet для аналитики, Avro для streaming)
- Data skipping через min/max statistics и Z-ordering
- Bloom filters для высококардинальных столбцов
- Compaction + vacuum для обслуживания данных
- Контроль размера файлов через repartition, coalesce, AQE
Все эти техники работают вместе — Z-ordering эффективен только после compaction, bloom filters дополняют min/max statistics, а AQE автоматически коалесцирует маленькие партиции.
Для углублённого изучения стратегий выбора форматов и оптимального размера файлов см. курс Storage Formats Deep-Dive — decision framework для выбора формата под конкретную нагрузку.
Что дальше?
В следующем модуле мы перейдём к мониторингу и наблюдаемости — как использовать Spark UI, History Server, Prometheus и Grafana для диагностики production-проблем.