Движок micro-batch исполнения
Когда вы вызываете df.writeStream.start(), Spark не начинает немедленно перекачивать байты из источника в сток. Вместо этого он запускает отдельный поток, внутри которого крутится бесконечный цикл, и каждая итерация этого цикла — это обычный batch-запрос. Именно этот цикл и есть micro-batch execution engine. Понять его устройство — значит понять, почему Structured Streaming ведёт себя именно так, как он ведёт себя: задержки, гарантии exactly-once, поведение при падении, поведение при watermark.
StreamExecution: абстрактная основа
Весь streaming execution начинается с абстрактного класса StreamExecution (sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala). До Spark 2.3 этот класс был конкретным и содержал всю логику. В 2.3 его разбили на иерархию:
StreamExecution (abstract)
├── MicroBatchExecution -- ProcessingTime, Once, AvailableNow
└── ContinuousExecution -- Trigger.Continuous (experimental)
StreamExecution содержит общую инфраструктуру:
offsetLog: OffsetSeqLog— write-ahead log для offsets (подробно в уроке 2)commitLog: CommitLog— log успешно завершённых батчейcurrentBatchId: Long— монотонно растущий идентификатор текущего батчаstreamDeathCause: Throwable— причина гибели стрима при ошибкеqueryExecutionThread— поток, в котором крутится execution loopWatermarkTracker— трекер глобального watermark (обновляется по завершении батча)
Когда вы вызываете start(), StreamExecution создаёт queryExecutionThread (daemon thread) и запускает его. Поток вызывает runStream(), который устанавливает Spark-контекст, настраивает StreamMetrics, а затем делегирует конкретной реализации через абстрактный метод runActivatedStream().
MicroBatchExecution: конкретная реализация
MicroBatchExecution — это то, что действительно работает для подавляющего большинства production-запросов. Его можно охарактеризовать одной фразой: он превращает streaming-запрос в бесконечную серию обычных Spark SQL batch-запросов, каждый из которых обрабатывает инкремент данных с момента предыдущего батча.
Ключевые поля:
// Отслеживаем границы offset'ов
protected var committedOffsets = new StreamProgress
protected var availableOffsets = new StreamProgress
// Источники и их метаданные
private val sources: Seq[SparkDataStream]
private val uniqueSources: Map[SparkDataStream, ReadLimit]
// Состояние планировщика батчей
private var isCurrentBatchConstructed = false
// TriggerExecutor определяет ритм исполнения
private val triggerExecutor: TriggerExecutor
Абстрактный StreamExecution и две конкретные реализации. MicroBatchExecution используется для всех production trigger'ов, ContinuousExecution — экспериментальная.
TriggerExecutor: ритм исполнения
MicroBatchExecution создаёт TriggerExecutor на основе переданного пользователем Trigger. Это стратегия, определяющая, как часто запускать батч:
| Trigger | TriggerExecutor | Поведение |
|---|---|---|
ProcessingTime("10 seconds") | ProcessingTimeExecutor | Батч каждые 10 секунд (учитывает время самого батча) |
Once | SingleBatchExecutor | Один батч, затем завершение |
AvailableNow | MultiBatchExecutor | Обрабатывает весь накопленный backlog, затем завершение |
Continuous("1 second") | (уходит в ContinuousExecution) | — |
Важная деталь ProcessingTimeExecutor: если батч занял 8 секунд, а интервал 10 секунд — следующий батч стартует через 2 секунды. Если батч занял 15 секунд — следующий стартует немедленно, без паузы. Это гарантирует, что при backlog’е Spark догоняет источник как можно быстрее.
Полный цикл micro-batch: от триггера до commit
Центральный метод — runActivatedStream(). Внутри него triggerExecutor.execute(batchRunner) вызывает batchRunner в бесконечном цикле. Каждая итерация — один микро-батч:
Восемь фаз, от startTrigger до commit. Метрики каждой фазы видны в StreamingQueryProgress.durationMs.
Эта последовательность — не случайная. Именно порядок шагов обеспечивает exactly-once гарантии. Разберём это детально.
constructNextBatch: WAL-запись как фундамент
Фаза constructNextBatch() — это самое важное место в cycle. Она делает следующее:
- Для каждого источника (
SparkDataStream) вызываетgetOffset()илиlatestOffset()— источник возвращает текущий «кончик» данных. - Сравнивает с
committedOffsets— есть ли что-то новое? - Если новых данных нет и нет pending stateful операторов — батч не нужен, возвращает
false. - Если данные есть — записывает в
offsetLogзапись(batchId -> OffsetSeq).
// Псевдокод constructNextBatch
val newOffsets = sources.map(s => s -> s.getOffset()).toMap
if (newOffsets == committedOffsets && !stateOperatorsPending) {
return false // пропускаем батч
}
offsetLog.add(currentBatchId, OffsetSeq.fill(sources, newOffsets))
availableOffsets = newOffsets
true
Запись в offsetLog происходит до любого исполнения. Это Write-Ahead Log в чистом виде: если процесс упадёт после offsetLog.add() но до commitLog.add(), при перезапуске Spark увидит, что batch N есть в offsetLog, но нет в commitLog — и переиграет его заново.
Именно порядок «сначала offsetLog, потом исполнение, потом commitLog» обеспечивает exactly-once при наличии replayable-источников (Kafka, Delta) и idempotent-синков (Delta, Iceberg, форматы с overwrite по ключу). Если синк не idempotent (например, plain Parquet append), дублирование возможно при повторном исполнении батча после сбоя.
runBatch: планирование каждого батча
После успешного constructNextBatch() вызывается runBatch(). Именно здесь streaming-запрос превращается в batch-запрос:
Шаг 1 — getBatch. Каждый источник превращает диапазон offset’ов в DataFrame. Kafka-источник, например, вернёт план типа KafkaScan(topic, partition=0, startOffset=100, endOffset=150). Это чисто логический план — никаких данных ещё не читается.
Шаг 2 — transform logical plan. Логический план стрима содержит заглушки StreamingExecutionRelation — по одной на каждый источник. Эти заглушки заменяются реальными логическими планами, полученными из источников на предыдущем шаге.
Шаг 3 — queryPlanning. Создаётся IncrementalExecution:
val newExecution = new IncrementalExecution(
sparkSession,
logicalPlan, // transformed логический план
outputMode, // Append / Update / Complete
checkpointLocation, // директория для state
queryId,
runId,
currentBatchId,
prevOffsetSeqMetadata,
offsetSeqMetadata, // содержит batchWatermarkMs
watermarkPropagator
)
IncrementalExecution — это обычный QueryExecution с дополнительными правилами планирования специально для streaming. О нём подробно в уроке 5.
Шаг 4 — addBatch. StreamExecution передаёт newExecution.executedPlan в sink через sink.addBatch(batchId, data). Здесь происходит реальное исполнение Spark Job с читкой данных и записью результата.
Что происходит с реальными данными
Любопытный факт: до шага addBatch никакие данные из источника физически не читаются. Все предыдущие шаги работают только с метаданными (offset’ами, планами). Это позволяет Spark построить полный физический план (включая оптимизации Catalyst) до того, как начнётся чтение.
В момент addBatch Spark исполняет DAG:
- Читает данные у источников (например, из Kafka партиций)
- Применяет трансформации (filter, join, aggregation)
- Записывает в sink
Все операции упакованы в один SQLExecution.withNewExecutionId — поэтому в Spark UI вы видите один Execution ID на батч, но внутри него может быть несколько Spark Job (например, при агрегации с shuffle).
Замена заглушек реальными планами и создание IncrementalExecution. Каждый батч — обычный Spark SQL запрос.
Метрики в Spark UI и логах
После каждого батча MicroBatchExecution публикует StreamingQueryProgress. В Spark UI вкладка «Structured Streaming» показывает:
- inputRowsPerSecond — скорость поступления данных из источников
- processedRowsPerSecond — пропускная способность обработки
- batchDuration — суммарное время батча
- durationMs — карта по фазам:
{"triggerExecution": 1200, "getBatch": 50, "queryPlanning": 80, "addBatch": 950, "walCommit": 15, "commitOffsets": 10}
# Получить последний StreamingQueryProgress программно
query = df.writeStream.start()
progress = query.lastProgress
print(progress["durationMs"])
# {"triggerExecution": 1247, "getBatch": 48, "queryPlanning": 76, "addBatch": 1089}
Если addBatch занимает 90%+ времени — узкое место в исполнении запроса (логика, join, shuffle). Если getBatch большой — медленный источник или большой backlog. Если walCommit вдруг вырос — I/O проблемы с checkpoint location (HDFS/S3 latency).
Распространённая проблема в production: processedRowsPerSecond намного ниже inputRowsPerSecond означает нарастающий backlog. Spark не теряет данные (Kafka хранит их), но задержка растёт. Решение: масштабировать executor’ы или уменьшить сложность запроса.
Micro-batch vs Continuous Processing
Spark поддерживает два режима обработки. Понять их различие важно для осознанного выбора:
Micro-batch (MicroBatchExecution):
- Данные читаются пакетами с фиксированным ритмом
- Каждый батч — отдельный Spark Job
- Минимальная latency: ~100 мс (обычно 1–30 секунд в production)
- Гарантия: exactly-once
- Подходит для: агрегация, join, stateful-операторы, все стандартные случаи
Continuous Processing (ContinuousExecution):
- Исполнители читают данные непрерывно, без пауз
- Offset’ы фиксируются по эпохам через
EpochCoordinator - Минимальная latency: ~1 мс
- Гарантия: at-least-once (в Spark 4.0 всё ещё экспериментальный режим)
- Подходит для: ультра-низкая latency, простые map/filter без state
Ключевое ограничение continuous processing: stateful-операторы (агрегации, join, transformWithState) не поддерживаются. Это фундаментальное ограничение архитектуры — state требует синхронизации между всеми задачами, что несовместимо с моделью непрерывного исполнения.
# Micro-batch (по умолчанию, рекомендован для production)
query = (df.writeStream
.trigger(processingTime="10 seconds")
.start())
# Continuous processing (experimental, только map/filter)
query = (df.writeStream
.trigger(continuous="1 second")
.start())
Восстановление после падения
Когда streaming-запрос падает и перезапускается, MicroBatchExecution.populateStartOffsets() восстанавливает состояние:
- Читает последний
batchIdизcommitLog— это последний успешно завершённый батч. - Читает тот же
batchIdизoffsetLog— восстанавливаетcommittedOffsets. - Проверяет: есть ли в
offsetLogзапись дляbatchId + 1?- Если есть — батч
batchId + 1был начат (WAL записан), но не завершён. Переигрываем его с теми же offset’ами. - Если нет — запрашиваем новые offset’ы у источников и строим новый батч.
- Если есть — батч
Именно здесь WAL-семантика играет свою роль: мы никогда не пропустим данные, потому что offset’ы записаны до исполнения.
Попробуй сам
Запустите streaming-запрос и понаблюдайте за метриками по фазам:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count
import time
spark = SparkSession.builder \
.appName("microbatch-internals") \
.config("spark.sql.streaming.checkpointLocation", "/tmp/streaming-checkpoint") \
.getOrCreate()
# Читаем из rate-источника (генерирует строки автоматически)
df = (spark.readStream
.format("rate")
.option("rowsPerSecond", 100)
.load())
# Простая агрегация
query = (df.groupBy("value")
.count()
.writeStream
.outputMode("update")
.format("memory")
.queryName("test_query")
.trigger(processingTime="5 seconds")
.start())
# Ждём несколько батчей
time.sleep(30)
# Смотрим метрики последнего батча
progress = query.lastProgress
print("Batch ID:", progress["batchId"])
print("Input rows/sec:", progress["inputRowsPerSecond"])
print("Processed rows/sec:", progress["processedRowsPerSecond"])
print("Duration breakdown:", progress["durationMs"])
query.stop()
Ожидаемый вывод:
Batch ID: 5
Input rows/sec: 100.0
Processed rows/sec: 2500.0
Duration breakdown: {"triggerExecution": 2041, "getBatch": 12,
"queryPlanning": 45, "addBatch": 1950,
"walCommit": 18, "commitOffsets": 16}