Timers
Timers — это механизм Flink для отложенных действий: “через 5 минут запусти эту логику”, “при достижении этого event time эмиттит результат”, “если в течение 24 часов не было события — очисти state”. В KeyedProcessFunction timers — это второй основной API после processElement.
Timers — мощный, но коварный механизм. Этот урок — про правильное использование и common bugs, которые ломают production-job-ы.
Event time vs processing time timers
Flink поддерживает два типа timers:
Event time timer. Срабатывает, когда watermark проходит за заданный timestamp. Зависит от event time потока.
ctx.timerService().registerEventTimeTimer(eventTimestamp + 60000);
Processing time timer. Срабатывает в реальный момент wall-clock TaskManager.
long deadline = System.currentTimeMillis() + 60000;
ctx.timerService().registerProcessingTimeTimer(deadline);
При срабатывании timer Flink вызывает onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out). Параметр timestamp — это запланированный момент срабатывания, не текущий wall-clock.
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) throws Exception {
TimeDomain domain = ctx.timeDomain();
if (domain == TimeDomain.EVENT_TIME) {
// обработка event time timer
} else if (domain == TimeDomain.PROCESSING_TIME) {
// обработка processing time timer
}
}
Когда какой использовать
Event time timer — когда логика семантически про время событий:
- “Закрой сессию через 30 минут после последнего события” (
registerEventTimeTimer(lastEventTime + 30min)). - “Эмиттит результат окна, когда watermark пройдёт за конец окна.”
- Все сценарии, где результат должен быть детерминированным при replay.
Processing time timer — когда логика про реальное wall-clock время:
- “Через 5 минут после регистрации pending-order — таймаут” (бизнес: реальное время, не event time).
- “Каждые 30 секунд — heartbeat check”.
- Logic, не зависящая от данных.
Если сомневаетесь — выбирайте event time. Это даёт детерминизм при replay, корректную работу при backfill, и общую согласованность с event-time-семантикой Flink. Processing time для timers — только когда явно нужно “wall-clock сейчас”, не “момент в потоке”.
Полный пример: pending order timeout
Сценарий: order имеет статус PENDING; нужно через 24 часа автоматически перевести в EXPIRED, если не пришло COMPLETED событие.
public class OrderTimeoutDetector
extends KeyedProcessFunction<String, OrderEvent, OrderStatus> {
private transient ValueState<OrderEvent> pending;
@Override
public void open(OpenContext ctx) {
pending = getRuntimeContext().getState(
new ValueStateDescriptor<>("pending-order", OrderEvent.class));
}
@Override
public void processElement(OrderEvent event, Context ctx,
Collector<OrderStatus> out) throws Exception {
if (event.status == OrderStatus.PENDING) {
pending.update(event);
// Регистрируем timer на 24 часа вперёд
long timeoutAt = event.timestamp + Duration.ofHours(24).toMillis();
ctx.timerService().registerEventTimeTimer(timeoutAt);
} else if (event.status == OrderStatus.COMPLETED) {
OrderEvent p = pending.value();
if (p != null) {
// Удаляем timer и state — order завершён
long timeoutAt = p.timestamp + Duration.ofHours(24).toMillis();
ctx.timerService().deleteEventTimeTimer(timeoutAt);
pending.clear();
out.collect(new OrderStatus(event.orderId, "COMPLETED"));
}
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx,
Collector<OrderStatus> out) throws Exception {
OrderEvent p = pending.value();
if (p != null) {
// Timer сработал, order до сих пор PENDING — timeout
out.collect(new OrderStatus(p.orderId, "EXPIRED"));
pending.clear();
}
}
}
Ключевые моменты:
- При получении PENDING регистрируется event time timer на 24 часа.
- При получении COMPLETED timer удаляется через
deleteEventTimeTimer. - Если COMPLETED не приходит — timer срабатывает, и
onTimerэмитит EXPIRED.
Python:
from datetime import timedelta
class OrderTimeoutDetector(KeyedProcessFunction):
def open(self, ctx):
self.pending = ctx.get_state(ValueStateDescriptor("pending", ...))
def process_element(self, event, ctx):
if event.status == "PENDING":
self.pending.update(event)
timeout_ms = event.timestamp + int(timedelta(hours=24).total_seconds() * 1000)
ctx.timer_service().register_event_time_timer(timeout_ms)
elif event.status == "COMPLETED":
p = self.pending.value()
if p is not None:
timeout_ms = p.timestamp + int(timedelta(hours=24).total_seconds() * 1000)
ctx.timer_service().delete_event_time_timer(timeout_ms)
self.pending.clear()
yield OrderStatus(event.order_id, "COMPLETED")
def on_timer(self, timestamp, ctx):
p = self.pending.value()
if p is not None:
yield OrderStatus(p.order_id, "EXPIRED")
self.pending.clear()
Семантика timers: дедупликация и срабатывание
Несколько важных свойств:
Дедупликация по timestamp. Если зарегистрировать registerEventTimeTimer(1000) дважды — timer один, и onTimer(1000) будет вызван один раз. Дублирующая регистрация — no-op.
Per-key timer. Timers per ключ независимы. registerEventTimeTimer(1000) для user_id=A и user_id=B — это два timer, каждый сработает один раз с current key соответствующего пользователя.
Время срабатывания event time timer — когда watermark проходит за timestamp timer-а (watermark > timestamp, не >=). То есть registerEventTimeTimer(1000) срабатывает когда watermark становится 1001 или больше.
Время срабатывания processing time timer — когда currentProcessingTime() >= timestamp. Может быть с небольшой задержкой относительно запланированного.
Удаление timers
deleteEventTimeTimer(ts) / deleteProcessingTimeTimer(ts) удаляют ранее зарегистрированный timer. Если timer не существует — no-op.
ctx.timerService().registerEventTimeTimer(1000);
// ...
ctx.timerService().deleteEventTimeTimer(1000); // отменяет
Это критично для предотвращения spurious срабатываний. В примере выше (order timeout), если приходит COMPLETED — мы удаляем timer, чтобы EXPIRED не сработал зря.
Если в onTimer вы регистрируете новый timer (например, для отложенной повторной проверки), убедитесь, что есть условие выхода. Бесконечная цепочка self-rescheduling timers — это утечка state и нагрузка на TaskManager. Используйте флаг в state, который остановит цепочку.
Common bugs
Bug 1: timer на каждое событие без условия
// ПЛОХО
@Override
public void processElement(Event e, Context ctx, Collector<Result> out) {
ctx.timerService().registerEventTimeTimer(e.timestamp + 60000);
// ... другая логика
}
Для 10K событий/сек = 10K timers/sec регистрируется. Все они хранятся в timer state. Через час — десятки миллионов timers в памяти. OOM или extreme degradation.
Лечение: регистрируйте timer только когда нужно (например, только для PENDING events), и удаляйте при наступлении terminating event. Или используйте паттерн “один активный timer per ключ” с perpetual re-schedule.
Bug 2: timer без сохранения timestamp в state
// ПЛОХО
@Override
public void processElement(Event e, Context ctx, Collector<Result> out) {
if (e.isStart) {
ctx.timerService().registerEventTimeTimer(e.timestamp + 60000);
} else if (e.isEnd) {
// как удалить timer? мы не помним его timestamp!
ctx.timerService().deleteEventTimeTimer(???);
}
}
Чтобы удалить timer, нужно знать его exact timestamp. Если не сохранили — timer останется и сработает зря.
Лечение: сохраняйте timestamp в state параллельно с регистрацией timer.
private transient ValueState<Long> currentTimerTs;
@Override
public void processElement(Event e, Context ctx, Collector<Result> out) throws Exception {
if (e.isStart) {
long ts = e.timestamp + 60000;
ctx.timerService().registerEventTimeTimer(ts);
currentTimerTs.update(ts);
} else if (e.isEnd) {
Long ts = currentTimerTs.value();
if (ts != null) {
ctx.timerService().deleteEventTimeTimer(ts);
currentTimerTs.clear();
}
}
}
Bug 3: spurious срабатывания после rescheduling
// ПЛОХО
@Override
public void processElement(Event e, Context ctx, Collector<Result> out) throws Exception {
Long current = currentTimerTs.value();
long newTs = e.timestamp + 60000;
ctx.timerService().registerEventTimeTimer(newTs);
currentTimerTs.update(newTs);
// Старый timer не удалили! Он сработает зря.
}
Регистрация нового timer не отменяет старый. Если задача — “перенести timer на новое время”, нужно явно удалить старый.
Лечение:
Long old = currentTimerTs.value();
if (old != null) ctx.timerService().deleteEventTimeTimer(old);
long newTs = e.timestamp + 60000;
ctx.timerService().registerEventTimeTimer(newTs);
currentTimerTs.update(newTs);
Bug 4: timer без cleanup в onTimer
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Result> out) throws Exception {
if (someCondition) {
ctx.timerService().registerEventTimeTimer(timestamp + 60000);
// забыли state.clear() — старое состояние осталось
}
}
В onTimer полезно явно очищать state, если timer — это финальное действие для ключа.
Bug 5: бесконечная цепочка timers
// ПЛОХО — бесконечный self-reschedule
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Result> out) throws Exception {
out.collect(...);
ctx.timerService().registerEventTimeTimer(timestamp + 60000); // снова и снова
}
Без условия выхода — utечка. Каждый ключ накапливает один live timer, который перезапускается. Сколько активных ключей — столько одновременно живых timer-ов в state, без cleanup.
Лечение: условие выхода через state.
Производительность
Timer state хранится отдельно от user state, но влияет на размер чекпойнтов и memory footprint.
Рекомендации:
- Один timer per ключ. Не регистрируйте per event без необходимости.
- Удаляйте через delete*Timer. Не полагайтесь на естественное срабатывание для cleanup.
- Round timestamps. Если timer не критичен к миллисекундам, округляйте до секунд — это уменьшит количество уникальных timestamps и upgrades dedup.
- Используйте RocksDB backend для job-ов с многими timers — heap backend deserialize всё в память.
Production-чеклист
- Каждой регистрации timer соответствует либо явное
delete*Timer, либоonTimerс гарантированной обработкой. - При rescheduling timer — сначала удалить старый.
- В onTimer — финализация state (clear() если задача завершена).
- Метрика количества активных timers — мониторить через JMX (currently registered timers).
- При работе с processing time timers — учитывайте, что они могут сработать с задержкой при backpressure.
Попробуй сам
-
Watchdog timer. Реализуйте: на каждое событие сбрасывайте timer на 5 минут вперёд. Если 5 минут не было событий — emit “user inactive”. Используйте deleteEventTimeTimer при rescheduling.
-
Periodic emit. Реализуйте: каждые 30 секунд per ключ emit агрегированное значение из
ValueState<Long counter>. Используйте processing time timer + perpetual self-reschedule. -
Bug 3 reproduction. Намеренно не удаляйте старый timer при rescheduling. Наблюдайте, как с временем накапливаются timers — посмотрите на JMX метрику числа активных timers. После исправления (delete + register) — количество должно стабилизироваться.