IncrementalExecution и stateful-операторы
Когда MicroBatchExecution строит план для каждого батча, он не использует стандартный QueryExecution. Вместо него создаётся IncrementalExecution — специализированный планировщик, который знает о батч-идентификаторах, версиях state store, watermark’ах и output mode. Именно IncrementalExecution превращает абстрактные streaming-операторы (агрегация, деduplication, join) в конкретные физические узлы с ссылками на StateStore. Без понимания этого класса невозможно читать explain() streaming-запросов и диагностировать проблемы с state.
IncrementalExecution: что это и зачем
IncrementalExecution (sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala) — это QueryExecution с переопределённым preparations и optimizedPlan.
class IncrementalExecution(
sparkSession: SparkSession,
logicalPlan: LogicalPlan,
outputMode: OutputMode, // Append, Update, Complete
checkpointLocation: String,
queryId: UUID,
runId: UUID,
currentBatchId: Long, // batchId текущего батча
prevOffsetSeqMetadata: Option[OffsetSeqMetadata],
offsetSeqMetadata: OffsetSeqMetadata, // содержит watermark
watermarkPropagator: WatermarkPropagator
) extends QueryExecution(sparkSession, logicalPlan)
Ключевые поля, которые отличают IncrementalExecution от обычного QueryExecution:
currentBatchId— идентификатор текущего батча, передаётся вStateStoreIdкак версияoffsetSeqMetadata.batchWatermarkMs— watermark для фильтрации поздних данных (eviction)prevOffsetSeqMetadata— watermark предыдущего батча (для фильтрации событий, которые не должны попасть в текущий батч)outputMode— определяет поведениеStateStoreSaveExec(агрессивный eviction при Append)
State preparation rule: заполнение метаданных
IncrementalExecution переопределяет preparations — набор правил, применяемых к физическому плану перед исполнением. Ключевое правило — state:
// Упрощённо: state preparation rule
val state = new Rule[SparkPlan] {
var operatorId = 0
def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
case StateStoreSaveExec(keys, None, None, None, storeConf, child) =>
operatorId += 1
StateStoreSaveExec(
keys,
Some(currentBatchId),
Some(outputMode),
Some(offsetSeqMetadata), // передаём watermark
StateStoreId(checkpointLocation, operatorId, partitionId),
storeConf,
child
)
case StateStoreRestoreExec(keys, None, storeConf, child) =>
StateStoreRestoreExec(
keys,
Some(currentBatchId),
StateStoreId(checkpointLocation, operatorId, partitionId),
storeConf,
child
)
// аналогично для FlatMapGroupsWithStateExec, StreamingDeduplicateExec, etc.
}
}
Эта rule пробегает физический план снизу вверх и заполняет все stateful-операторы ссылками на конкретный StateStore (через StateStoreId) и текущими метаданными батча.
Дерево stateful-операторов
После применения state preparation rule физический план streaming-запроса с агрегацией выглядит так:
// df.groupBy("key").count().writeStream.outputMode("update")...
explain() вывод для одного батча:
== Physical Plan ==
WriteToDataSourceV2 [...]
+- StateStoreSaveExec [key#10], stateInfo = StatefulOperatorStateInfo(
checkpointLocation = /checkpoint/state,
queryRunId = ...,
operatorId = 0,
storeVersion = 42,
numPartitions = 200)
outputMode = Update, watermark = EventTimeWatermark(0)
+- HashAggregateExec(keys=[key#10], functions=[count(1)])
+- StateStoreRestoreExec [key#10], stateInfo = StatefulOperatorStateInfo(...)
+- ShuffleExchangeExec(HashPartitioning([key#10], 200))
+- KafkaBatchScan [...]
Обратите внимание на пару StateStoreRestoreExec / StateStoreSaveExec: они оборачивают агрегацию. Restore читает предыдущее состояние перед агрегацией, Save записывает обновлённое состояние после.
Пара Restore/Save оборачивает агрегацию. Restore подаёт предыдущий state на вход агрегации, Save сохраняет результат.
StateStoreRestoreExec: детали
StateStoreRestoreExec — унарный физический оператор. Для каждой входной строки он:
- Достаёт ключ (ключевые колонки из
keysвыражения) - Делает
store.get(key)— читает предыдущее состояние - Если ключ есть в store — испускает две строки: старое состояние и новую входную строку
- Если ключа нет — испускает только новую входную строку
// Псевдокод doExecute
override def doExecute(): RDD[InternalRow] = {
val stateStore = getStateStore(stateInfo)
child.execute().mapPartitionsWithStateStore(stateStore) { iter =>
iter.flatMap { row =>
val key = getKey(row)
val savedState = stateStore.get(key)
if (savedState != null) {
Seq(savedState, row) // предыдущий state + новая строка
} else {
Seq(row) // только новая строка
}
}
}
}
Почему две строки? HashAggregateExec знает, как мёрджить частичные агрегаты. Передавая старое состояние как «предыдущий частичный агрегат», Spark избегает необходимости специального streaming-aware агрегатора.
StateStoreSaveExec: output mode-зависимое поведение
StateStoreSaveExec — более сложный оператор. Его поведение кардинально меняется от outputMode:
Update mode:
Для каждой строки: store.put(key, newValue)
Испускать: все обновлённые строки (даже если значение не изменилось)
Eviction: строки с ключами старше watermark удаляются из state (store.remove(key))
Append mode:
Для каждой строки: store.put(key, newValue)
Испускать: только строки, ключи которых "закрыты" watermark (window.end < watermark)
Eviction: закрытые ключи удаляются из state после испускания
Complete mode:
Для каждой строки: store.put(key, newValue)
Испускать: ВСЕ строки из state (полный снимок)
Eviction: нет (state никогда не удаляется)
Complete mode никогда не удаляет из state. Для unbounded streaming запросов без ограничений это означает неограниченный рост state. Complete mode подходит только для запросов с ограниченным набором ключей (например, агрегация по статическому enum-полю).
Watermark в физическом плане: два значения
В IncrementalExecution есть два разных watermark значения, что часто путает:
// eventTimeWatermarkForEviction: используется для удаления state
val eventTimeWatermarkForEviction: Option[Long] =
offsetSeqMetadata.batchWatermarkMs match {
case 0L => None
case ms => Some(ms)
}
// eventTimeWatermarkForLateEvents: используется для фильтрации входных данных
val eventTimeWatermarkForLateEvents: Option[Long] =
prevOffsetSeqMetadata.flatMap(_.batchWatermarkMs match {
case 0L => None
case ms => Some(ms)
})
Почему два значения? Watermark обновляется в конце батча, но применяется в следующем батче. Это означает:
- Фильтрация поздних событий (input filtering) использует watermark предыдущего батча — чтобы батч N не отфильтровал события, которые сам же и использовал для продвижения watermark
- Eviction state (удаление старых ключей) использует watermark текущего батча — актуальное значение для агрессивного освобождения памяти
Батч 40: max event time = 10:00:30
-> watermark после батча = 10:00:30 - 10min = 09:50:30
Батч 41:
eventTimeWatermarkForLateEvents = 09:50:30 (из предыдущего батча)
eventTimeWatermarkForLateEvents применяется к входным данным
(допустим, max event time = 10:00:45, watermark продвигается до 09:50:45)
eventTimeWatermarkForEviction = 09:50:45 (из текущего батча)
используется для eviction state в StateStoreSaveExec
OffsetSeqMetadata: мост между батчами
OffsetSeqMetadata — это структура, которая сохраняется в offsetLog вместе с offset’ами и восстанавливается при рестарте:
case class OffsetSeqMetadata(
batchWatermarkMs: Long, // watermark на момент начала батча
batchTimestampMs: Long, // системное время начала батча
conf: Map[String, String] // spark.sql.shuffle.partitions и другие конфиги
)
Поле conf особенно важно: если при рестарте spark.sql.shuffle.partitions изменился — Spark может обнаружить несоответствие и либо предупредить, либо отказаться продолжать (в зависимости от конфига). Это гарантирует, что state, созданный с 200 партициями, не будет прочитан при 400 партициях (что привело бы к неправильной маршрутизации данных).
Watermark обновляется в конце батча N и применяется в батче N+1. Два разных значения используются для разных целей.
Другие stateful-операторы
Помимо агрегации, IncrementalExecution конфигурирует и другие stateful-операторы:
StreamingDeduplicateExec — реализует dropDuplicates(). State хранит виденные ключи:
df.dropDuplicates(["user_id", "event_id"])
# StateStore: Set<(user_id, event_id)>
# При получении строки: если ключ в state -> отбрасываем, иначе -> испускаем + добавляем в state
StreamingSymmetricHashJoinExec — реализует stream-stream join. Хранит два StateStore: один для левого потока, один для правого:
stream1.join(stream2, "join_key", "inner")
# Left StateStore: буферизует строки из stream1, которые ещё не нашли пару
# Right StateStore: буферизует строки из stream2
# Watermark позволяет evict старые строки из обоих буферов
FlatMapGroupsWithStateExec / MapGroupsWithStateExec — старый arbitrary state API (deprecated в Spark 4.0 в пользу transformWithState).
Чтение explain() для streaming запросов
# Посмотреть физический план streaming-запроса
query = (df.groupBy("key").count()
.writeStream
.outputMode("update")
.format("console")
.start())
# IncrementalExecution.explain() доступен через lastExecution
spark.streams.get(query.id).lastExecution.executedPlan.toString()
# Более читаемо:
print(query.explain(True))
Ключевые вещи, на которые смотреть в explain():
storeVersion— должен совпадать сbatchIdnumPartitions— если изменилось, state несовместимoutputMode— убедиться, что правильный режимwatermark— если 0, eviction не происходит
Попробуй сам
Исследуйте, как IncrementalExecution настраивает физический план:
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, count, col
import time
spark = SparkSession.builder \
.appName("incremental-execution-demo") \
.getOrCreate()
df = (spark.readStream
.format("rate")
.option("rowsPerSecond", 100)
.load())
# Windowed aggregation с watermark
result = (df
.withWatermark("timestamp", "30 seconds")
.groupBy(window("timestamp", "1 minute"), col("value") % 5)
.count())
query = (result.writeStream
.outputMode("update")
.format("memory")
.queryName("plan_demo")
.option("checkpointLocation", "/tmp/plan-demo")
.start())
time.sleep(15)
# Получаем последний executed plan
last_exec = spark.streams.get(query.id).lastExecution
if last_exec:
print("=== Executed Plan ===")
print(last_exec.executedPlan)
print("\n=== State Operators ===")
for op in query.lastProgress.get("stateOperators", []):
print(f"Operator: {op['operatorName']}")
print(f" rows in state: {op['numRowsTotal']}")
print(f" memory used: {op['memoryUsedBytes']} bytes")
query.stop()
В выводе executedPlan найдите StateStoreSaveExec и StateStoreRestoreExec — они обрамляют HashAggregateExec. Обратите внимание на storeVersion в stateInfo — он должен совпадать с последним batchId.