Offset log, commit log и WAL
Exactly-once — одно из самых ёмких и часто неправильно понимаемых понятий в стриминге. В Spark Structured Streaming оно достигается не магией, а конкретной механикой: двумя write-ahead log-файлами в checkpoint directory, replayable-источниками и idempotent-синками. Разберём эту механику до байтового уровня: как устроены файлы, что в них записывается, в каком порядке, и как система восстанавливается после каждого возможного сбоя.
Структура checkpoint directory
Когда вы задаёте checkpointLocation, Spark создаёт в ней конкретную структуру директорий:
/checkpoint-location/
├── metadata -- метаданные запроса (queryId, Spark version)
├── offsets/ -- OffsetSeqLog: WAL для offset'ов
│ ├── 0 -- offset'ы батча 0
│ ├── 1 -- offset'ы батча 1
│ └── 2 -- offset'ы батча 2
├── commits/ -- CommitLog: лог завершённых батчей
│ ├── 0
│ └── 1 -- батч 2 ещё не завершён (пример)
└── state/ -- StateStore данные (урок 3, 4)
└── 0/ -- оператор #0
└── 0/ -- партиция 0
├── 1.delta
└── 2.delta
Обратите внимание: в этом примере в offsets/ есть файл 2, а в commits/ — нет. Именно эта асимметрия сигнализирует системе: батч 2 был начат, но не завершён, и его нужно переиграть.
HDFSMetadataLog: базовый класс
И OffsetSeqLog, и CommitLog наследуются от HDFSMetadataLog[T] (sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala). Это параметризованный класс, который реализует append-only log поверх Hadoop-совместимой файловой системы.
abstract class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
extends MetadataLog[T] with Logging {
// Записать новую запись в лог
def add(batchId: Long, metadata: T): Boolean
// Прочитать запись по batchId
def get(batchId: Long): Option[T]
// Прочитать диапазон [startId, endId]
def get(startId: Option[Long], endId: Option[Long]): Array[(Long, T)]
// Последний batchId в логе
def getLatest(): Option[(Long, T)]
// Очистить старые записи
def purge(thresholdBatchId: Long): Unit
}
Каждая запись хранится как отдельный файл, имя которого совпадает с batchId (числом). Это простое, но эффективное решение: атомарность операции записи обеспечивается атомарностью HDFS rename (write to temp -> rename to final), листинг директории даёт все доступные batchId.
Формат файлов — UTF-8 JSON с заголовочной строкой:
v1
{"batchWatermarkMs": 0, "batchTimestampMs": 1716000000000, "conf": {...}}
{"kafka": {"topic": {"0": 100, "1": 50, "2": 75}}}
Первая строка — версия формата. Вторая — OffsetSeqMetadata (watermark, timestamp, конфиги). Третья — собственно offset’ы источников.
Файлы offsets/ и commits/ — два WAL-лога. Асимметрия между ними определяет состояние батча при восстановлении.
OffsetSeqLog: детали формата
OffsetSeqLog (sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala) хранит объект OffsetSeq — обёртку над массивом offset’ов (по одному на каждый источник) плюс OffsetSeqMetadata.
case class OffsetSeq(
offsets: Seq[Option[Offset]], // offset для каждого источника (None если источник не изменился)
metadata: Option[OffsetSeqMetadata] // watermark и timestamp батча
)
case class OffsetSeqMetadata(
batchWatermarkMs: Long, // watermark в мс (используется для eviction state)
batchTimestampMs: Long, // время начала батча
conf: Map[String, String] // конфиги на момент батча (shuffle partitions, etc.)
)
OffsetSeqMetadata — важная структура: она хранит watermark на момент начала батча. Это не просто метаданные для логирования — watermark используется в IncrementalExecution для фильтрации поздних данных и eviction state. Подробно об этом в уроке 5.
Пример реального файла offsets/42 для Kafka-источника:
v1
{"batchWatermarkMs":1716003600000,"batchTimestampMs":1716003610000,"conf":{"spark.sql.shuffle.partitions":"200"}}
{"apache.kafka.v2":{"topic-orders":{"0":18432,"1":17891,"2":19204,"3":18755}}}
CommitLog: подтверждение завершения
CommitLog (sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala) значительно проще — он хранит только подтверждение того, что батч завершён, плюс watermark для следующего батча:
case class CommitMetadata(
nextBatchWatermarkMs: Long // watermark который будет использован в следующем батче
)
Файл commits/42:
v1
{"nextBatchWatermarkMs":1716003600000}
Это умышленная минималистичность: CommitLog — это именно «флаг завершения», не более. Данные о том, что было обработано, уже в OffsetSeqLog.
В Spark 4.0 CommitLog также хранит nextBatchWatermarkMs — это watermark, вычисленный по результатам текущего батча, который будет применён к следующему. Это позволяет разделить «watermark для чтения событий» (batchWatermarkMs) и «watermark как результат наблюдений» (nextBatchWatermarkMs).
WAL-протокол: точная последовательность
Теперь посмотрим на точную последовательность операций, которая обеспечивает exactly-once:
Сбой между строками 4 и 11 (после WAL, до commit) приводит к replay батча N с теми же offset’ами.
Анализ всех сценариев сбоя
Рассмотрим все точки, в которых может произойти сбой, и что случится при каждом:
Сбой до offsetLog.add(): Никакого WAL-файла нет. При рестарте система не знает об этом батче. Запрашиваются текущие offset’ы у источников и строится новый батч. Никаких потерь — данные в Kafka не удалены.
Сбой после offsetLog.add(), до исполнения: В offsets/N есть запись, в commits/N — нет. При рестарте: populateStartOffsets() видит разницу, переигрывает батч N с теми же offset’ами. Kafka-источник перечитывает те же данные. Exactly-once при idempotent sink.
Сбой во время addBatch() (Spark Job упал на полпути): Spark Job провалился. Батч N не записан в commits/. При рестарте — тот же сценарий: replay батча N. Delta-синк отвергнет повторную запись с тем же batchId.
Сбой после addBatch(), до commitLog.add(): Самый опасный сценарий — данные уже в синке, но система не знает об этом. При рестарте: replay батча N. Delta-синк обнаружит batchId=N уже существует и применит idempotent semantics (no-op). Exactly-once сохраняется.
Сбой после commitLog.add(): Нормальное завершение. При рестарте: начинаем с батча N+1.
Если вы используете синк, который не поддерживает idempotent writes (например, foreachBatch с обычным append в Parquet без overwrite-семантики), сбой между addBatch и commitLog.add() приведёт к дублированию данных. Это не баг Spark — это архитектурное ограничение, которое нужно учитывать при выборе синка.
Replayable sources и idempotent sinks
Exactly-once — это сквозное свойство, которое требует соответствия от всех компонентов:
Replayable sources — источники, которые позволяют читать данные заново по offset’у. Примеры: Apache Kafka, Amazon Kinesis (sequence number), Delta Lake (table version), Apache Iceberg (snapshot id), Google Pub/Sub (с ack). Пример нереplayable-источника: файлы в S3, которые были удалены после чтения.
Idempotent sinks — синки, которые безопасно обрабатывают повторную запись одного и того же батча. Примеры: Delta Lake (использует batchId для deduplication), Apache Iceberg, любой key-value store с overwrite по ключу. Пример неidempotent-синка: простой append в файловую систему без transactional semantics.
# Правильно: Delta как idempotent sink
(result.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint")
.start("/data/output"))
# Небезопасно при сбоях: plain Parquet append
(result.writeStream
.format("parquet")
.option("checkpointLocation", "/checkpoint")
.start("/data/output")) # дубликаты возможны при replay
Purge: управление размером логов
По умолчанию OffsetSeqLog и CommitLog не удаляют старые записи автоматически. Это может привести к тому, что в offsets/ накопятся тысячи файлов (по одному на батч). Spark предоставляет конфиги для управления retention:
spark.conf.set("spark.sql.streaming.minBatchesToRetain", "2")
# Минимальное количество батчей, которые нужно хранить в логах.
# При значении 2: хранятся последние 2 бatcha, более старые удаляются.
Механика purge: в конце каждого батча MicroBatchExecution вызывает offsetLog.purge(currentBatchId - minBatchesToRetain) и commitLog.purge(...). Purge просто удаляет файлы с batchId меньше порогового значения.
Уменьшение minBatchesToRetain ускоряет листинг директории (меньше файлов), но сокращает окно восстановления. При значении 1 вы можете восстановиться только с последнего завершённого батча. Если у вас долгие батчи (больше интервала), увеличьте значение.
Практика: инспекция checkpoint директории
Вы можете читать WAL-файлы напрямую — они обычный текст:
# Смотрим последние 3 offset-записи
import json
checkpoint_dir = "/checkpoint-location"
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(
spark._jsc.hadoopConfiguration())
for batch_id in range(max(0, last_batch - 2), last_batch + 1):
path = spark._jvm.org.apache.hadoop.fs.Path(
f"{checkpoint_dir}/offsets/{batch_id}")
if fs.exists(path):
stream = fs.open(path)
reader = spark._jvm.java.io.BufferedReader(
spark._jvm.java.io.InputStreamReader(stream))
lines = []
line = reader.readLine()
while line:
lines.append(line)
line = reader.readLine()
print(f"Batch {batch_id}: {lines}")
Или через shell:
# Посмотреть offsets батча 42
hdfs dfs -cat /checkpoint/offsets/42
# Посмотреть все commits
hdfs dfs -ls /checkpoint/commits/
# Найти «дыру» между offsets и commits
diff <(hdfs dfs -ls /checkpoint/offsets/ | awk '{print $NF}' | xargs -n1 basename | sort -n) \
<(hdfs dfs -ls /checkpoint/commits/ | awk '{print $NF}' | xargs -n1 basename | sort -n)
Попробуй сам
Запустите запрос и исследуйте WAL-файлы в реальном времени:
from pyspark.sql import SparkSession
import os, json, time
spark = SparkSession.builder \
.appName("wal-inspection") \
.getOrCreate()
CHECKPOINT = "/tmp/wal-demo-checkpoint"
df = (spark.readStream
.format("rate")
.option("rowsPerSecond", 50)
.load())
query = (df.writeStream
.format("memory")
.queryName("wal_demo")
.option("checkpointLocation", CHECKPOINT)
.trigger(processingTime="3 seconds")
.start())
# Ждём несколько батчей
time.sleep(15)
query.stop()
# Исследуем WAL-файлы
print("=== OFFSET LOG ===")
for fname in sorted(os.listdir(f"{CHECKPOINT}/offsets")):
with open(f"{CHECKPOINT}/offsets/{fname}") as f:
content = f.readlines()
batch_id = int(fname)
print(f"Batch {batch_id}:")
for line in content:
print(f" {line.rstrip()}")
print("\n=== COMMIT LOG ===")
for fname in sorted(os.listdir(f"{CHECKPOINT}/commits")):
with open(f"{CHECKPOINT}/commits/{fname}") as f:
content = f.read()
print(f"Batch {fname}: {content.strip()}")
Ожидаемый вывод:
=== OFFSET LOG ===
Batch 0:
v1
{"batchWatermarkMs":0,"batchTimestampMs":1716000000000,"conf":{}}
{"rate":{"numRows":{"0":150}}}
Batch 1:
v1
{"batchWatermarkMs":0,"batchTimestampMs":1716000003000,"conf":{}}
{"rate":{"numRows":{"0":300}}}
=== COMMIT LOG ===
Batch 0: v1
{"nextBatchWatermarkMs":0}
Batch 1: v1
{"nextBatchWatermarkMs":0}