Learning Platform
Глоссарий Troubleshooting
Урок 08.03 · 28 мин
Продвинутый
State Processor APISavepointReaderSavepointWriterState BootstrapState Migration

State Processor API — программная работа со savepoint

Savepoint — это структурированный файловый артефакт, и доступ к нему через State Processor API даёт мощные возможности, недоступные через стандартный Flink workflow.

Управление state в Spark Structured Streaming Вы можете прочитать savepoint в DataSet/DataStream, проинспектировать state каждого оператора, модифицировать его, или сгенерировать новый savepoint программно — например, из batch-данных для bootstrap-а нового streaming job-а.

API ориентирован на advanced операционные задачи: миграция между state backends, изменение state schema, отладка state surgery (исправление испорченных значений), bootstrap state из исторических данных. В этом уроке разбираем API, основные классы и три классических use case с код-примерами.


Зачем нужно

Без State Processor API state в savepoint — opaque blob. Вы можете restore из него (через CLI или REST), но не можете заглянуть внутрь. Если вы хотите узнать “сколько entries в моём MapState для оператора UID=‘session-window’”, или “какие keys имеют ValueState > 1MB” — стандартные тулзы не помогают.

State Processor API даёт два главных входа:

  1. SavepointReader — чтение savepoint как DataStream (с 1.13+) или DataSet (legacy). Можно итерировать по state каждого оператора и анализировать его.
  2. SavepointWriter — программная генерация savepoint. Можно создать savepoint с любым state (например, из CSV/Parquet файла) и потом использовать его как starting point для streaming job-а.

Эти два API позволяют решать задачи “state surgery” — операции на state без перезапуска job-а с нуля.


Минимальный example: чтение state

import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

SavepointReader savepoint = SavepointReader.read(
    env,
    "s3://my-bucket/savepoints/savepoint-12345-abcdef",
    new HashMapStateBackend()  // или EmbeddedRocksDBStateBackend
);

// Read keyed state одного оператора
DataStream<UserProfile> profiles = savepoint.readKeyedState(
    OperatorIdentifier.forUid("fraud-detector"),
    new UserProfileReader()
);

profiles.print();

env.execute("Read savepoint");

UserProfileReader — это KeyedStateReaderFunction с одним методом readKey(K key, Context ctx, Collector<OUT> out). Внутри Context даёт доступ ко всем state-ам оператора:

public class UserProfileReader extends KeyedStateReaderFunction<String, UserProfile> {
    private ValueState<UserProfile> profileState;
    
    @Override
    public void open(Configuration config) {
        profileState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("user-profile", UserProfile.class)
        );
    }
    
    @Override
    public void readKey(String userId, Context ctx, Collector<UserProfile> out) throws Exception {
        UserProfile profile = profileState.value();
        if (profile != null) {
            out.collect(profile);
        }
    }
}

API эмулирует обычный keyed function, но в контексте reader-а. Flink под капотом распределяет work по subtask-ам параллельно (по key groups), каждая subtask читает свою часть state.


Use case 1: миграция между backends

Сценарий: ваш job работает на HashMap state backend, state дошёл до 5 GB, OOM. Нужно переключиться на RocksDB. Каноничный path:

  1. Take canonical savepoint (всегда portable).
  2. Stop job.
  3. Change config (state backend = RocksDB).
  4. Restart from savepoint.

Это работает, но canonical savepoint — full snapshot, медленный (минуты) и блокирует pipeline. Альтернатива через State Processor API — convert savepoint программно:

// Read existing savepoint
SavepointReader sourceSavepoint = SavepointReader.read(
    env,
    "s3://my-bucket/savepoints/old-savepoint",
    new HashMapStateBackend()
);

DataStream<KeyValue> entries = sourceSavepoint.readKeyedState(
    OperatorIdentifier.forUid("my-operator"),
    new EntryReader()
);

// Write новый savepoint с RocksDB backend
SavepointWriter newSavepoint = SavepointWriter
    .newSavepoint(env, new EmbeddedRocksDBStateBackend(), 128)  // 128 = maxParallelism
    .withOperator(
        OperatorIdentifier.forUid("my-operator"),
        OperatorTransformation
            .bootstrapWith(entries)
            .keyBy(KeyValue::getKey)
            .transform(new EntryBootstrapFunction())
    );

newSavepoint.write("s3://my-bucket/savepoints/new-savepoint");

env.execute("Migrate savepoint to RocksDB");

После execution в s3://my-bucket/savepoints/new-savepoint будет savepoint в RocksDB native format. Restore из него — быстрый. Этот workflow медленнее обычного flink savepoint со switch backend, но не блокирует production job — конверсия проходит на отдельном batch job, оригинальный job продолжает работать.


Use case 2: bootstrap state из batch

Сценарий: запускаете новый streaming job для fraud detection. State — MapState<UserID, FraudScore>. Изначально state пустой, и первые часы работы job-а — fully cold start. Хочется загрузить historical fraud scores из batch (Parquet файла) перед запуском streaming-а.

State Processor API решает:

// Read historical fraud scores from Parquet
DataStream<HistoricScore> historic = env.fromSource(
    parquetSource,
    WatermarkStrategy.noWatermarks(),
    "Historical scores"
);

// Build savepoint with bootstrapped state
SavepointWriter savepoint = SavepointWriter
    .newSavepoint(env, new EmbeddedRocksDBStateBackend(), 128)
    .withOperator(
        OperatorIdentifier.forUid("fraud-detector"),
        OperatorTransformation
            .bootstrapWith(historic)
            .keyBy(HistoricScore::getUserId)
            .transform(new FraudBootstrapFunction())
    );

savepoint.write("s3://my-bucket/savepoints/fraud-bootstrap");
env.execute("Bootstrap fraud detection state");
public class FraudBootstrapFunction extends KeyedStateBootstrapFunction<String, HistoricScore> {
    private MapState<String, FraudScore> scores;
    
    @Override
    public void open(Configuration config) {
        scores = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("fraud-scores", String.class, FraudScore.class)
        );
    }
    
    @Override
    public void processElement(HistoricScore value, Context ctx) throws Exception {
        scores.put(value.getCategory(), new FraudScore(value.getScore(), value.getTimestamp()));
    }
}

Теперь когда streaming job стартует через flink run -s :savepointDir job.jar, fraud-detector оператор уже имеет full state. Не нужно ждать часов накопления данных — система готова к detection с момента T0.

Этот pattern часто используется в production: stream + batch гибрид, где batch backfills данные, stream обрабатывает live.


Use case 3: state surgery

Сценарий: в production обнаружена data corruption. Один оператор содержит invalid ValueState для некоторых ключей (например, отрицательные timestamps из-за бага). Нужно исправить state без re-processing всех данных.

SavepointReader reader = SavepointReader.read(env, oldSavepoint, new EmbeddedRocksDBStateBackend());

DataStream<UserState> states = reader.readKeyedState(
    OperatorIdentifier.forUid("session-tracker"),
    new StateReader()
);

// Fix corrupted timestamps
DataStream<UserState> fixed = states.map(state -> {
    if (state.lastSeenTimestamp < 0) {
        state.lastSeenTimestamp = 0;  // или какое-то reasonable default
        state.fixed = true;
    }
    return state;
});

// Write fixed savepoint
SavepointWriter writer = SavepointWriter
    .fromExistingSavepoint(env, oldSavepoint, new EmbeddedRocksDBStateBackend())
    .removeOperator(OperatorIdentifier.forUid("session-tracker"))
    .withOperator(
        OperatorIdentifier.forUid("session-tracker"),
        OperatorTransformation
            .bootstrapWith(fixed)
            .keyBy(UserState::getUserId)
            .transform(new StateBootstrapFunction())
    );

writer.write(newSavepoint);
env.execute("Fix corrupted state");

Здесь использован fromExistingSavepoint (не newSavepoint) — это reuse state других операторов без modification, и replace только session-tracker. Очень полезный pattern: меняете один оператор, остальные сохраняются как есть.


SavepointWriter operations

МетодСемантика
newSavepoint(env, backend, maxParallelism)Создать совершенно новый savepoint с нуля
fromExistingSavepoint(env, path, backend)Начать с existing savepoint, modify selectively
.withOperator(uid, transformation)Добавить или заменить operator state
.removeOperator(uid)Удалить operator state из savepoint
.withConfiguration(config)Установить additional checkpoint configuration
.write(path)Trigger execution и запись savepoint

OperatorTransformation типы:

  • bootstrapWith(stream) — keyed state из streaming источника (вычисляется keyBy + transform).
  • bootstrapWithBroadcast(stream) — broadcast state из источника.
  • bootstrapWithUnion(stream) — union list state (split-redistribute).
  • bootstrapWithKeyedBroadcast — keyed + broadcast в одном.

Внутренне SavepointWriter запускает batch-job, который читает input stream, накапливает state в memory backend (или RocksDB), и в конце сериализует state как savepoint metadata + state files в DFS.


Performance considerations

State Processor API — batch operation, runs as DataStream batch job (since 1.17, в DataStream API). Для savepoint размером 100 GB write может занимать часы. Резервируйте достаточно ресурсов для bootstrap job-а.

Несколько советов:

Parallelism for bootstrap. Установите parallelism bootstrap job-а >= max-parallelism будущего streaming job-а. Это позволит писать state files параллельно по key groups. На 128 max-parallelism в streaming, bootstrap с parallelism=32 будет писать по 4 group per subtask — приемлемо.

State backend choice for bootstrap. Если final streaming job будет на RocksDB, делайте bootstrap тоже в RocksDB — это создаст native savepoint, который потом будет faster restore. Bootstrap в HashMap создаёт canonical savepoint — портабельный, но дольше restore.

Memory tuning. Bootstrap job собирает state in-memory до flush. Если у вас 50 GB unique keys, parallelism=10 = 5 GB per subtask — нужен heap соответствующего размера. Иначе OOM.

Read performance. SavepointReader тоже batch, но обычно быстрее, чем write. Single read pass через все state files.

TIP

При bootstrap из batch проверьте, что parallelism вашего bootstrap job-а делит maxParallelism нацело. Иначе key groups распределятся неравномерно, и финальное состояние будет иметь skew. Рекомендация: bootstrap parallelism = maxParallelism / 4 (округлено).


Limits and gotchas

Gotcha 1: TypeSerializer compatibility. Если ваш POJO имеет field types, которые сериализуются через нестандартный TypeSerializer (например, Kryo), State Processor API должен иметь access к тому же TypeSerializer. Включите все user JARs в classpath bootstrap job-а.

Gotcha 2: timer state. ProcessFunction timers (processing-time и event-time) — это часть state, но не доступна напрямую через ValueState/MapState API. Для inspection timers нужна KeyedStateReaderFunction.Context.registeredTimerService(), который даёт Iterable<Long> of timer timestamps.

Gotcha 3: operator UID is mandatory. Чтобы читать state через API, оригинальный job должен был использовать explicit UID. Без него вы не можете адресовать operator state.

Gotcha 4: schema evolution. Если ваш POJO структура изменилась между job versions, для чтения старого savepoint нужно использовать старый POJO schema. Иначе deserialization failes. См. lesson 04 для деталей schema evolution.

Gotcha 5: bootstrap state size limits. Bootstrap pass через State Processor API hold всё state в memory или local disk перед flush. Для очень больших state (terabytes) это может потребовать sharding bootstrap на несколько runs.


Чтение source

  • org.apache.flink.state.api.SavepointReader — главный класс для чтения.
  • org.apache.flink.state.api.SavepointWriter — главный класс для записи.
  • org.apache.flink.state.api.functions.KeyedStateReaderFunction — base class для readers.
  • org.apache.flink.state.api.functions.KeyedStateBootstrapFunction — base class для bootstrap.
  • org.apache.flink.state.api.OperatorTransformation — builder для transformation.
  • FLIP-43 (State Processor API) в Apache Flink Wiki — original design.

Попробуй сам

  1. Inspect production savepoint. Запустите простой read script на real savepoint. Подсчитайте количество keys per operator, размер state per key. Это очень полезный инсайт о вашем production state.

  2. Bootstrap state из CSV. Прочитайте CSV с user data, конвертируйте в savepoint через SavepointWriter, запустите streaming job с этим savepoint как starting point. Полезный pattern для cold start fraud detection / recommendation systems.

  3. Fix corrupted state. Симулируйте data corruption (в test environment): запустите job, добавьте bad data, take savepoint. Используйте State Processor API для fix и compare result после restore. Это даёт hands-on опыт с state surgery — критично для production on-call.

Проверка знанийKnowledge check
Вам нужно мигрировать production job из HashMap в RocksDB state backend. Текущий state — 30 GB. Job обрабатывает 50K events/sec. Stop window — максимум 5 минут. Какой workflow корректен и почему наивный подход 'take canonical savepoint, switch backend, restore' не подходит?
ОтветAnswer
Наивный подход проблематичен: canonical savepoint для 30 GB на HashMap занимает 10-20 минут (HashMap snapshot serializes весь state синхронно), что превышает 5-минутный window. Кроме того, restore canonical в RocksDB тоже долгий (5-10 минут) — total downtime 20-30 минут. Правильный workflow через State Processor API: 1) Не останавливая prod, take ЛЮБОЙ savepoint (canonical, ~20 минут — но в background, prod continues running). 2) Запустить bootstrap job на отдельном cluster, который reads canonical savepoint и writes NEW savepoint в RocksDB format. Этот job занимает 30-60 минут, но не trogает prod. 3) После завершения bootstrap у вас есть RocksDB savepoint, ready to restore. 4) Take fresh savepoint от prod job (для catch-up последних updates с момента шага 1). Bootstrap этого delta тоже через State Processor API. Альтернатива — use changelog catch-up: 5) Stop prod job в очень узкое window (1-2 минуты), restore новый job из RocksDB savepoint + replay events с Kafka offset фиксированного в delta savepoint. Total downtime — minimum, поскольку state preparation (тяжелая часть) была offline. Дополнительный bonus: тестируете corrupted RocksDB savepoint в staging до production cutover — снижает риск downtime extension если что-то пойдёт не так.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Какие два главных класса предоставляет State Processor API?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4