Learning Platform
Глоссарий Troubleshooting
Урок 05.03 · 22 мин
Средний
State TTLStateTtlConfigRocksDBState CleanupUpdateTypeStateVisibility

State TTL

State в Flink — это память без срока годности по умолчанию. Каждый записанный ключ остаётся в RocksDB или heap-backend до тех пор, пока вы явно не вызовете clear(). Для unbounded потоков (трафик, события пользователей, IoT-метрики) это означает безусловный рост state — пока диск не закончится или job не упадёт по OOM.

StateTtlConfig решает эту проблему: вы декларируете, как долго каждая запись должна “жить”, и Flink сам подметает устаревшее. Этот урок — про то, как это правильно настроить.


Зачем TTL почти обязателен

Самый частый сценарий неконтролируемого роста state:

// keyBy(user_id) — но user_id со временем редеет
events.keyBy(Event::getUserId)
      .flatMap(new ComputeMetricsPerUser());

Пользователи приходят и уходят. Сегодня активны 1M, завтра ушли 100K, появились 80K новых. Через год у вас в state хранится 50M ключей, из которых активны 1M, а остальные — мёртвые записи, которые никто не читает, но которые Flink исправно загружает в чекпойнты, мониторит и сериализует при rescale.

Без TTL единственный способ избавиться от мёртвых ключей — явно отслеживать “последний раз видел” и руками удалять через state.clear() в onTimer() (см. урок про ProcessFunction). Это работает, но добавляет сложности. TTL — это декларативный шорткат: “удаляй автоматически через N времени”.

WARNING

Любой keyBy с unbounded cardinality ключа (user_id, device_id, session_id, transaction_id) — это потенциальная утечка state. Для production-job почти всегда нужно либо TTL, либо явный cleanup через timers. Без этого job будет работать “до первого OOM”.


Базовая конфигурация

StateTtlConfig применяется к state-descriptor перед регистрацией:

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(30))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();

ValueStateDescriptor<UserProfile> descriptor =
    new ValueStateDescriptor<>("user-profile", UserProfile.class);
descriptor.enableTimeToLive(ttlConfig);

userProfileState = getRuntimeContext().getState(descriptor);

Три параметра, которые нужно осознанно задать:

  • TTL duration — сколько времени запись считается актуальной.
  • UpdateType — когда обновлять таймер.
  • StateVisibility — что возвращать, если запись просрочена, но ещё не удалена.

В Python TTL настраивается аналогично через StateTtlConfig:

from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor
from pyflink.common.time import Time

ttl_config = StateTtlConfig \
    .new_builder(Time.days(30)) \
    .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite) \
    .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired) \
    .build()

descriptor = ValueStateDescriptor("user-profile", Types.PICKLED_BYTE_ARRAY())
descriptor.enable_time_to_live(ttl_config)

UpdateType: когда продлевать таймер

Три варианта определяют, как ведёт себя “часы” TTL:

Disabled — TTL не активен. Эквивалентно отсутствию enableTimeToLive().

OnCreateAndWrite (по умолчанию) — таймер сбрасывается при каждом update()/add()/put(). Если запись не обновляется N времени, она помечается как expired.

OnReadAndWrite — таймер сбрасывается и при чтении, и при записи. Запись остаётся актуальной, пока к ней есть обращения. Полезно для кэшей: “если не используем — удалить”.

// User profile: продлевать жизнь только при изменениях
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)

// Кэш LRU-style: продлевать жизнь при любых обращениях
.setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
UpdateType: OnCreateAndWrite vs OnReadAndWrite
TTL = 24 часа. Timeline ниже показывает write (W) и read (R) операции.
Таймер сбрасывается ТОЛЬКО при write. Reads не продлевают жизнь. Подходит для записей, чья актуальность зависит только от обновлений (профиль, статус).
Таймер сбрасывается при write ИЛИ read. Записи живут пока кто-то к ним обращается. Подходит для кэшей и lookup-таблиц.
TIP

OnReadAndWrite добавляет небольшой write-overhead при каждом read (нужно обновить timestamp в RocksDB). Если ваш workload читает в десятки раз чаще, чем пишет, это может заметно ухудшить производительность. В большинстве случаев OnCreateAndWrite достаточно.


StateVisibility: что делать с expired-данными

Cleanup в Flink — фоновая операция. Между моментом “запись стала expired” и “запись удалена с диска” может пройти время (от секунд до часов в зависимости от cleanup-стратегии). Что делать, если в этом окне кто-то запросил такую запись?

NeverReturnExpired (по умолчанию) — value() возвращает null, даже если физически запись ещё в RocksDB. Состояние “выглядит как удалённое” сразу при достижении expiration.

ReturnExpiredIfNotCleanedUpvalue() возвращает значение, если оно физически ещё в state. Это даёт небольшое увеличение hit-rate для случаев, когда запись только что просрочилась.

// Логически чистая семантика: 30 дней — потом null
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)

// Прагматично: использовать пока есть, не торопиться удалять
.setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)

Для большинства бизнес-логики NeverReturnExpired — правильный выбор: если вы сказали “30 дней TTL”, приложение не должно случайно увидеть данные старее.

ReturnExpiredIfNotCleanedUp нужен, когда TTL — это оптимистичный “по возможности удалить”, а не строгая граница (например, кэш ML-фичей).


Cleanup-стратегии

TTL без cleanup — это бесполезная отметка “просрочено” на записи, которая всё равно занимает диск. Cleanup физически освобождает место. Flink предлагает три стратегии:

1. Cleanup в фоне при доступе (включено по умолчанию). При обращении к state Flink проверяет timestamp и удаляет запись, если просрочена. Без явных обращений к ключу — он не очистится никогда.

2. Full snapshot cleanupcleanupFullSnapshot(). При создании checkpoint просроченные записи не включаются в snapshot. Это даёт корректные размеры чекпойнтов, но НЕ освобождает место в локальном RocksDB.

.cleanupFullSnapshot()

3. Incremental cleanupcleanupIncrementally(cleanupSize, runCleanupForEveryRecord). Flink проходит N записей при каждой обработке элемента (или периодически), проверяет TTL и удаляет просроченные.

// 100 записей за раз, не на каждой обработке (на таймере)
.cleanupIncrementally(100, false)

// 10 записей за раз, на каждом event (медленно, но равномерно)
.cleanupIncrementally(10, true)

4. RocksDB compaction filtercleanupInRocksdbCompactFilter(queryTimeAfterNumEntries) (только для RocksDB-backend). Использует встроенный compaction-механизм RocksDB: при фоновых compaction-проходах expired-записи дропаются. Это самый эффективный способ для больших state.

RocksDB compaction: как TTL cleanup работает под капотом
// Проверять текущее время раз в 1000 записей (cheap)
.cleanupInRocksdbCompactFilter(1000)

Полный пример для production RocksDB-job:

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(30))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .cleanupInRocksdbCompactFilter(1000)
    .cleanupFullSnapshot()
    .build();

Эта конфигурация:

  • 30 дней TTL.
  • Таймер сбрасывается при изменениях.
  • Expired-записи никогда не возвращаются read-операциями.
  • Compaction filter удаляет с диска постепенно.
  • Full snapshots не включают expired.
Cleanup стратегии и где они срабатывают
Проверка TTL происходит при каждом обращении к ключу. Если ключ ни разу не запросили — он не очистится никогда. Включено по умолчанию, не убирается.
При checkpoint все expired-записи фильтруются. Snapshot меньше, но локальный RocksDB не уменьшается. Включается явно: cleanupFullSnapshot().
Перебор N записей за раз. Гарантирует cleanup даже для неактивных ключей. Стоит CPU — добавляет latency. cleanupIncrementally(N, perRecord).
Только для RocksDB. Использует встроенный compaction для удаления expired. Лучший вариант для больших state. cleanupInRocksdbCompactFilter(N).

TTL для ListState и MapState

ListState и MapState поддерживают TTL только на уровне всего ключа целиком, не по отдельным элементам списка/мапы. То есть TTL применяется к “вся ListState для key=U-42”, а не к каждому элементу списка отдельно.

Логика расчёта:

  • ListState: TTL обновляется при любом add() / addAll() / update().
  • MapState: TTL обновляется при любом put() / remove(). Удаление отдельной пары через remove() не сбрасывает TTL для всей мапы.

Это важно: если вы хотите per-element TTL (например, “каждый элемент списка живёт 24 часа, потом удаляется”), TTL вам не поможет. Нужно явное управление через timers (см. KeyedProcessFunction).


Event time vs processing time

В Flink 2.x TTL по умолчанию работает на processing time. Это значит, что отсчёт ведётся по системным часам TaskManager, не по watermark.

Для CDC-пайплайнов и backfill это может быть проблемой: вы переиграете 30 дней истории за час, и весь “старый” state будет создан с processing-time = текущему моменту. TTL начнёт срабатывать через 30 дней после backfill, а не через 30 дней после event time.

Альтернатива — реализовать event-time-aware TTL вручную через KeyedProcessFunction с registerEventTimeTimer() (см. урок про timers). StateTtlConfig для event time как нативная фича находится в обсуждении (FLIP-292), но на момент Flink 2.2 ещё не доступна стабильно.

NOTE

Если ваш job переигрывает большие объёмы исторических данных (CDC initial snapshot, backfill через Kafka offsets), и TTL критичен — используйте explicit cleanup в onTimer вместо StateTtlConfig. Это даёт корректное удаление по event time даже при rapid replay.


Cost: что стоит TTL

TTL не бесплатен. Затраты:

  • Storage overhead. Каждая запись хранит дополнительный 8-байтный timestamp последнего обновления.
  • CPU при cleanup. Incremental и compaction-filter cleanup тратят CPU. Compaction-filter обычно дешевле (происходит и так).
  • Snapshot time. При full snapshot cleanup нужно проверить TTL на каждой записи — это I/O.

В большинстве jobs эти затраты пренебрежимо малы по сравнению с экономией от удалённого state. Но для очень large state (десятки терабайт) с агрессивным TTL стоит замерять конкретный workload.


Production-чеклист

  • Для unbounded keyBy — всегда настройте TTL, либо явный cleanup.
  • TTL длительность — с запасом 2-3x от ожидаемого активного окна. Если ваш SLA — 30 дней, ставьте 60-90 дней TTL, чтобы accidental delay не сломал бизнес-логику.
  • На RocksDB-backend используйте cleanupInRocksdbCompactFilter() — это самый эффективный cleanup для больших state.
  • В мониторинге следите за state_size_bytes per TaskManager. Стабильный рост — сигнал, что TTL не справляется или не настроен.
  • Не используйте ReturnExpiredIfNotCleanedUp, если бизнес-логика опирается на чёткие границы TTL.

Попробуй сам

  1. TTL effectiveness test. Создайте job с ValueState и TTL=60 секунд. Прогоните 100K событий с уникальными ключами, потом подождите 90 секунд без новых событий. Замерьте размер state через flink list -t или Web UI. Без cleanupInRocksdbCompactFilter — state будет жить долго. С ним — упадёт после следующего compaction.

  2. OnRead vs OnWrite. На одинаковом workload запустите два job-а с разными UpdateType. Замерьте размер state после 24 часов. OnReadAndWrite должен показывать больший state (записи живут дольше).

  3. Event-time replay. Запустите job с processing-time TTL=1 час и переиграйте 24 часа Kafka истории за 5 минут (бот-loader). Все 24 часа state останутся в памяти, потому что processing-time часы тикают медленнее. Этот эксперимент показывает, почему для backfill нужен event-time-aware cleanup.

Проверка знанийKnowledge check
У вас MapState<String, SessionData>, ключ — user_id, мапа — sessions внутри пользователя. Каждая сессия живёт до 24 часов после последней активности. Вы настроили StateTtlConfig с TTL=24h и UpdateType=OnCreateAndWrite. Будет ли это работать как ожидается?
ОтветAnswer
Нет. StateTtlConfig применяется к MapState на уровне ВСЕЙ мапы для ключа user_id, не на уровне отдельных session_id внутри мапы. Если пользователь активен (любая сессия обновляется), TTL для всей мапы сбрасывается — старые сессии не удалятся. Если пользователь полностью неактивен 24 часа — удалятся все сессии разом. Корректное решение: либо отдельный keyBy(user_id, session_id) с собственным TTL, либо ручное управление через KeyedProcessFunction с registerEventTimeTimer per session_id. В onTimer удаляйте конкретную пару через mapState.remove(sessionId). Это даёт per-session гранулярность очистки.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 5. Команда настроила StateTtlConfig для ValueState с TTL=30 дней и UpdateType=OnCreateAndWrite. После 60 дней работы команда смотрит на размер state — он продолжает расти и ничего не очищается. Что наиболее вероятно забыли сделать?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4