Кэширование и persistence
Spark использует lazy evaluation — трансформации не выполняются до вызова action (show, count, write). Это означает, что если один и тот же DataFrame используется дважды, Spark пересчитает его заново для каждого action. cache() и persist() позволяют сохранить результат в памяти или на диске и использовать повторно.
cache() vs persist()
# cache() -- shortcut для persist(StorageLevel.MEMORY_AND_DISK)
df_cached = df.cache()
# persist() -- с выбором уровня хранения
from pyspark import StorageLevel
df_persisted = df.persist(StorageLevel.MEMORY_AND_DISK)
cache() — это просто persist(StorageLevel.MEMORY_AND_DISK). Разницы нет, кроме того, что persist() позволяет выбрать другой StorageLevel.
StorageLevel: варианты хранения
| StorageLevel | Память | Диск | Сериализация | Реплик |
|---|---|---|---|---|
MEMORY_ONLY | Да | Нет | Нет | 1 |
MEMORY_AND_DISK | Да | Да (spillover) | Нет | 1 |
DISK_ONLY | Нет | Да | Да | 1 |
MEMORY_ONLY_SER | Да | Нет | Да | 1 |
MEMORY_AND_DISK_SER | Да | Да | Да | 1 |
OFF_HEAP | Off-heap | Нет | Да | 1 |
MEMORY_ONLY_2 | Да | Нет | Нет | 2 |
MEMORY_AND_DISK_2 | Да | Да | Нет | 2 |
Что означают варианты
Десериализованный (без _SER): данные хранятся как Java-объекты в heap. Быстрый доступ, но занимает 2-5x больше памяти, чем на диске.
Сериализованный (_SER): данные хранятся в сжатом бинарном формате. Экономит 60-80% памяти, но требует десериализации при чтении — дополнительный CPU.
С репликами (_2): данные дублируются на 2 executors. Если один executor падает, данные доступны на другом. Полезно для критичных вычислений, но удваивает расход памяти.
OFF_HEAP: хранение вне JVM heap (в Tungsten managed memory). Без GC давления, но требует настройки spark.memory.offHeap.enabled=true.
# Для большинства случаев -- MEMORY_AND_DISK (default cache)
df.cache()
# Для больших DataFrame, когда не хватает памяти -- сериализованный
df.persist(StorageLevel.MEMORY_AND_DISK_SER)
# Для очень больших данных, когда память не нужна -- только диск
df.persist(StorageLevel.DISK_ONLY)
Spark4.0
Spark 4.0: улучшенный MEMORY_AND_DISK. В Spark 4.0 spill-to-disk для MEMORY_AND_DISK стал более эффективным — Spark использует columnar storage format при спилле, что сокращает объём данных на диске до 3x по сравнению со старым row-based spillом.
Когда кэшировать
1. Итеративные алгоритмы
# ML: K-means использует одни данные 100 раз
training_data = spark.read.parquet("/data/features/") \
.filter(col("valid") == True) \
.select("feature_1", "feature_2", "label")
training_data.cache() # Вычисляем один раз
for i in range(100):
model = update_model(training_data, model)
# Без cache: 100 чтений Parquet + 100 фильтраций
# С cache: 1 чтение + 99 обращений к памяти
2. DataFrame используется несколько раз
# Промежуточный результат используется дважды
filtered = orders.join(customers, "id").filter(col("amount") > 1000)
filtered.cache()
# Использование 1: подсчёт
total = filtered.count()
# Использование 2: запись
filtered.write.parquet("/output/high_value_orders/")
# Без cache: join + filter выполняется ДВАЖДЫ
# С cache: join + filter выполняется ОДИН раз
3. После дорогой трансформации перед несколькими actions
# Дорогой ETL pipeline
result = (raw_data
.join(broadcast(dimensions), "key") # join
.groupBy("category").agg(...) # shuffle + aggregate
.withColumn("rank", row_number().over(window)) # window function
)
result.cache()
# Несколько выходов из одного pipeline
result.filter(col("rank") <= 10).write.parquet("/top_10/")
result.write.parquet("/full_result/")
result.describe().show()
Когда НЕ кэшировать
Анти-паттерн: кэширование всего
# ПЛОХО: кэшируем всё подряд
df1 = spark.read.parquet("/data/table1/").cache()
df2 = spark.read.parquet("/data/table2/").cache()
df3 = spark.read.parquet("/data/table3/").cache()
result = df1.join(df2, "key").join(df3, "key")
result.cache()
result.show()
Проблемы:
- 4 DataFrame в кэше = вся память executor занята кэшем
- Spill-to-disk для данных, которые не поместились — медленнее, чем пересчёт
- GC давление: десериализованные объекты в heap вызывают Long GC pauses
1. Одноразовое чтение
# НЕ НУЖЕН CACHE: данные читаются один раз
df = spark.read.parquet("/data/events/")
df.write.parquet("/output/processed/")
# Cache только замедлит -- дополнительная запись в memory/disk
2. Большой DataFrame, который не помещается в память
# ПЛОХО: 500 ГБ DataFrame с 64 ГБ суммарной памяти кластера
huge_df = spark.read.parquet("/data/500gb/")
huge_df.cache() # Большая часть spill на диск -- хуже, чем пересчёт
3. DataFrame, который дёшево пересчитать
# НЕ НУЖЕН CACHE: простой filter от Parquet с partition pruning
df = spark.read.parquet("/data/events/") \
.filter(col("year") == 2024)
# Parquet partition pruning + predicate pushdown = быстро
# Cache добавляет overhead без выигрыша
Правило: кэшируйте, если стоимость пересчёта > стоимость хранения. Сложная трансформация (join + groupBy + window) используется 5 раз — кэшируйте. Простой filter от Parquet — не кэшируйте. Если кэш не помещается в память и большая часть spillится на диск — не кэшируйте.
unpersist() и управление кэшем
Кэшированные данные остаются в памяти до конца SparkSession или явного вызова unpersist():
# Кэшируем
training_data.cache()
training_data.count() # Материализация кэша (lazy!)
# ... используем training_data ...
# Освобождаем память ЯВНО
training_data.unpersist()
Lazy evaluation и cache
Важно: cache() — это lazy трансформация. Она не вычисляет DataFrame немедленно:
df.cache() # Ничего не произошло! Cache только запланирован.
df.count() # СЕЙЧАС данные вычислены и кэшированы.
df.show() # Читает из кэша.
Первый action после cache() вычисляет данные и кэширует их. Последующие actions читают из кэша.
Мониторинг кэша через Spark UI
Storage tab в Spark UI показывает все кэшированные DataFrame:
Storage Tab:
RDD Name | Storage Level | Cached Partitions | Size in Memory | Size on Disk
-----------------|-----------------|-------------------|----------------|-------------
In-memory table | MEMORY_AND_DISK | 200/200 | 4.2 ГБ | 0 B
In-memory table | MEMORY_AND_DISK | 200/200 | 1.8 ГБ | 2.3 ГБ ← spill!
Если Size on Disk > 0 при StorageLevel MEMORY_AND_DISK — часть данных spillилась на диск из-за нехватки памяти. Это сигнал: кэш слишком большой для доступной памяти.
DAG с кэшем и без
Without cache: два action вычисляют весь pipeline заново.
Action 1 (count):
FileScan → Filter → Join → GroupBy → Count
Action 2 (write):
FileScan → Filter → Join → GroupBy → Write
↑
Всё пересчитано!
With cache: второй action читает из кэша.
Action 1 (count):
FileScan → Filter → Join → GroupBy → [CACHE] → Count
Action 2 (write):
[CACHE] → Write
↑
Из памяти/диска
StorageLevel.MEMORY_AND_DISK_SER для production. В production рекомендуется MEMORY_AND_DISK_SER вместо default cache (MEMORY_AND_DISK). Сериализация сокращает расход памяти на 60-80%, и данные, не поместившиеся в память, хранятся на диске в компактном формате. Десериализация добавляет ~10-15% CPU overhead, но это меньше, чем пересчёт тяжёлой трансформации.
Как Spark делит память между execution и storage — разбор на уровне исходников в senior-курсе:
Spark Internals: Unified Memory ManagerЧто дальше?
Модуль M04 завершён! Вы освоили ключевые техники оптимизации: shuffle минимизация, партиционирование, bucketing, broadcast join, data skew handling и кэширование. В следующем модуле (M05) мы погрузимся в Adaptive Query Execution (AQE) — механизм runtime-оптимизации, который автоматически применяет многие из этих техник.