State TTL
State в Flink — это память без срока годности по умолчанию. Каждый записанный ключ остаётся в RocksDB или heap-backend до тех пор, пока вы явно не вызовете clear(). Для unbounded потоков (трафик, события пользователей, IoT-метрики) это означает безусловный рост state — пока диск не закончится или job не упадёт по OOM.
StateTtlConfig решает эту проблему: вы декларируете, как долго каждая запись должна “жить”, и Flink сам подметает устаревшее. Этот урок — про то, как это правильно настроить.
Зачем TTL почти обязателен
Самый частый сценарий неконтролируемого роста state:
// keyBy(user_id) — но user_id со временем редеет
events.keyBy(Event::getUserId)
.flatMap(new ComputeMetricsPerUser());
Пользователи приходят и уходят. Сегодня активны 1M, завтра ушли 100K, появились 80K новых. Через год у вас в state хранится 50M ключей, из которых активны 1M, а остальные — мёртвые записи, которые никто не читает, но которые Flink исправно загружает в чекпойнты, мониторит и сериализует при rescale.
Без TTL единственный способ избавиться от мёртвых ключей — явно отслеживать “последний раз видел” и руками удалять через state.clear() в onTimer() (см. урок про ProcessFunction). Это работает, но добавляет сложности. TTL — это декларативный шорткат: “удаляй автоматически через N времени”.
Любой keyBy с unbounded cardinality ключа (user_id, device_id, session_id, transaction_id) — это потенциальная утечка state. Для production-job почти всегда нужно либо TTL, либо явный cleanup через timers. Без этого job будет работать “до первого OOM”.
Базовая конфигурация
StateTtlConfig применяется к state-descriptor перед регистрацией:
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.days(30))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor<UserProfile> descriptor =
new ValueStateDescriptor<>("user-profile", UserProfile.class);
descriptor.enableTimeToLive(ttlConfig);
userProfileState = getRuntimeContext().getState(descriptor);
Три параметра, которые нужно осознанно задать:
- TTL duration — сколько времени запись считается актуальной.
- UpdateType — когда обновлять таймер.
- StateVisibility — что возвращать, если запись просрочена, но ещё не удалена.
В Python TTL настраивается аналогично через StateTtlConfig:
from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor
from pyflink.common.time import Time
ttl_config = StateTtlConfig \
.new_builder(Time.days(30)) \
.set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite) \
.set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired) \
.build()
descriptor = ValueStateDescriptor("user-profile", Types.PICKLED_BYTE_ARRAY())
descriptor.enable_time_to_live(ttl_config)
UpdateType: когда продлевать таймер
Три варианта определяют, как ведёт себя “часы” TTL:
Disabled — TTL не активен. Эквивалентно отсутствию enableTimeToLive().
OnCreateAndWrite (по умолчанию) — таймер сбрасывается при каждом update()/add()/put(). Если запись не обновляется N времени, она помечается как expired.
OnReadAndWrite — таймер сбрасывается и при чтении, и при записи. Запись остаётся актуальной, пока к ней есть обращения. Полезно для кэшей: “если не используем — удалить”.
// User profile: продлевать жизнь только при изменениях
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
// Кэш LRU-style: продлевать жизнь при любых обращениях
.setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
OnReadAndWrite добавляет небольшой write-overhead при каждом read (нужно обновить timestamp в RocksDB). Если ваш workload читает в десятки раз чаще, чем пишет, это может заметно ухудшить производительность. В большинстве случаев OnCreateAndWrite достаточно.
StateVisibility: что делать с expired-данными
Cleanup в Flink — фоновая операция. Между моментом “запись стала expired” и “запись удалена с диска” может пройти время (от секунд до часов в зависимости от cleanup-стратегии). Что делать, если в этом окне кто-то запросил такую запись?
NeverReturnExpired (по умолчанию) — value() возвращает null, даже если физически запись ещё в RocksDB. Состояние “выглядит как удалённое” сразу при достижении expiration.
ReturnExpiredIfNotCleanedUp — value() возвращает значение, если оно физически ещё в state. Это даёт небольшое увеличение hit-rate для случаев, когда запись только что просрочилась.
// Логически чистая семантика: 30 дней — потом null
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
// Прагматично: использовать пока есть, не торопиться удалять
.setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
Для большинства бизнес-логики NeverReturnExpired — правильный выбор: если вы сказали “30 дней TTL”, приложение не должно случайно увидеть данные старее.
ReturnExpiredIfNotCleanedUp нужен, когда TTL — это оптимистичный “по возможности удалить”, а не строгая граница (например, кэш ML-фичей).
Cleanup-стратегии
TTL без cleanup — это бесполезная отметка “просрочено” на записи, которая всё равно занимает диск. Cleanup физически освобождает место. Flink предлагает три стратегии:
1. Cleanup в фоне при доступе (включено по умолчанию). При обращении к state Flink проверяет timestamp и удаляет запись, если просрочена. Без явных обращений к ключу — он не очистится никогда.
2. Full snapshot cleanup — cleanupFullSnapshot(). При создании checkpoint просроченные записи не включаются в snapshot. Это даёт корректные размеры чекпойнтов, но НЕ освобождает место в локальном RocksDB.
.cleanupFullSnapshot()
3. Incremental cleanup — cleanupIncrementally(cleanupSize, runCleanupForEveryRecord). Flink проходит N записей при каждой обработке элемента (или периодически), проверяет TTL и удаляет просроченные.
// 100 записей за раз, не на каждой обработке (на таймере)
.cleanupIncrementally(100, false)
// 10 записей за раз, на каждом event (медленно, но равномерно)
.cleanupIncrementally(10, true)
4. RocksDB compaction filter — cleanupInRocksdbCompactFilter(queryTimeAfterNumEntries) (только для RocksDB-backend). Использует встроенный compaction-механизм RocksDB: при фоновых compaction-проходах expired-записи дропаются. Это самый эффективный способ для больших state.
// Проверять текущее время раз в 1000 записей (cheap)
.cleanupInRocksdbCompactFilter(1000)
Полный пример для production RocksDB-job:
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.days(30))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.cleanupInRocksdbCompactFilter(1000)
.cleanupFullSnapshot()
.build();
Эта конфигурация:
- 30 дней TTL.
- Таймер сбрасывается при изменениях.
- Expired-записи никогда не возвращаются read-операциями.
- Compaction filter удаляет с диска постепенно.
- Full snapshots не включают expired.
TTL для ListState и MapState
ListState и MapState поддерживают TTL только на уровне всего ключа целиком, не по отдельным элементам списка/мапы. То есть TTL применяется к “вся ListState для key=U-42”, а не к каждому элементу списка отдельно.
Логика расчёта:
- ListState: TTL обновляется при любом
add()/addAll()/update(). - MapState: TTL обновляется при любом
put()/remove(). Удаление отдельной пары черезremove()не сбрасывает TTL для всей мапы.
Это важно: если вы хотите per-element TTL (например, “каждый элемент списка живёт 24 часа, потом удаляется”), TTL вам не поможет. Нужно явное управление через timers (см. KeyedProcessFunction).
Event time vs processing time
В Flink 2.x TTL по умолчанию работает на processing time. Это значит, что отсчёт ведётся по системным часам TaskManager, не по watermark.
Для CDC-пайплайнов и backfill это может быть проблемой: вы переиграете 30 дней истории за час, и весь “старый” state будет создан с processing-time = текущему моменту. TTL начнёт срабатывать через 30 дней после backfill, а не через 30 дней после event time.
Альтернатива — реализовать event-time-aware TTL вручную через KeyedProcessFunction с registerEventTimeTimer() (см. урок про timers). StateTtlConfig для event time как нативная фича находится в обсуждении (FLIP-292), но на момент Flink 2.2 ещё не доступна стабильно.
Если ваш job переигрывает большие объёмы исторических данных (CDC initial snapshot, backfill через Kafka offsets), и TTL критичен — используйте explicit cleanup в onTimer вместо StateTtlConfig. Это даёт корректное удаление по event time даже при rapid replay.
Cost: что стоит TTL
TTL не бесплатен. Затраты:
- Storage overhead. Каждая запись хранит дополнительный 8-байтный timestamp последнего обновления.
- CPU при cleanup. Incremental и compaction-filter cleanup тратят CPU. Compaction-filter обычно дешевле (происходит и так).
- Snapshot time. При full snapshot cleanup нужно проверить TTL на каждой записи — это I/O.
В большинстве jobs эти затраты пренебрежимо малы по сравнению с экономией от удалённого state. Но для очень large state (десятки терабайт) с агрессивным TTL стоит замерять конкретный workload.
Production-чеклист
- Для unbounded keyBy — всегда настройте TTL, либо явный cleanup.
- TTL длительность — с запасом 2-3x от ожидаемого активного окна. Если ваш SLA — 30 дней, ставьте 60-90 дней TTL, чтобы accidental delay не сломал бизнес-логику.
- На RocksDB-backend используйте
cleanupInRocksdbCompactFilter()— это самый эффективный cleanup для больших state. - В мониторинге следите за
state_size_bytesper TaskManager. Стабильный рост — сигнал, что TTL не справляется или не настроен. - Не используйте
ReturnExpiredIfNotCleanedUp, если бизнес-логика опирается на чёткие границы TTL.
Попробуй сам
-
TTL effectiveness test. Создайте job с ValueState и TTL=60 секунд. Прогоните 100K событий с уникальными ключами, потом подождите 90 секунд без новых событий. Замерьте размер state через
flink list -tили Web UI. Без cleanupInRocksdbCompactFilter — state будет жить долго. С ним — упадёт после следующего compaction. -
OnRead vs OnWrite. На одинаковом workload запустите два job-а с разными UpdateType. Замерьте размер state после 24 часов. OnReadAndWrite должен показывать больший state (записи живут дольше).
-
Event-time replay. Запустите job с processing-time TTL=1 час и переиграйте 24 часа Kafka истории за 5 минут (бот-loader). Все 24 часа state останутся в памяти, потому что processing-time часы тикают медленнее. Этот эксперимент показывает, почему для backfill нужен event-time-aware cleanup.