Learning Platform
Глоссарий Troubleshooting
Урок 08.02 · 22 мин
Средний
TimersEventTimeTimerProcessingTimeTimeronTimerdeleteTimerTimer State

Timers

Timers — это механизм Flink для отложенных действий: “через 5 минут запусти эту логику”, “при достижении этого event time эмиттит результат”, “если в течение 24 часов не было события — очисти state”. В KeyedProcessFunction timers — это второй основной API после processElement.

Timers — мощный, но коварный механизм. Этот урок — про правильное использование и common bugs, которые ломают production-job-ы.


Event time vs processing time timers

Flink поддерживает два типа timers:

Event time timer. Срабатывает, когда watermark проходит за заданный timestamp. Зависит от event time потока.

ctx.timerService().registerEventTimeTimer(eventTimestamp + 60000);

Processing time timer. Срабатывает в реальный момент wall-clock TaskManager.

long deadline = System.currentTimeMillis() + 60000;
ctx.timerService().registerProcessingTimeTimer(deadline);

При срабатывании timer Flink вызывает onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out). Параметр timestamp — это запланированный момент срабатывания, не текущий wall-clock.

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) throws Exception {
    TimeDomain domain = ctx.timeDomain();
    if (domain == TimeDomain.EVENT_TIME) {
        // обработка event time timer
    } else if (domain == TimeDomain.PROCESSING_TIME) {
        // обработка processing time timer
    }
}

Когда какой использовать

Event time timer — когда логика семантически про время событий:

  • “Закрой сессию через 30 минут после последнего события” (registerEventTimeTimer(lastEventTime + 30min)).
  • “Эмиттит результат окна, когда watermark пройдёт за конец окна.”
  • Все сценарии, где результат должен быть детерминированным при replay.

Processing time timer — когда логика про реальное wall-clock время:

  • “Через 5 минут после регистрации pending-order — таймаут” (бизнес: реальное время, не event time).
  • “Каждые 30 секунд — heartbeat check”.
Timer service internals: heap vs RocksDB timers
  • Logic, не зависящая от данных.
TIP

Если сомневаетесь — выбирайте event time. Это даёт детерминизм при replay, корректную работу при backfill, и общую согласованность с event-time-семантикой Flink. Processing time для timers — только когда явно нужно “wall-clock сейчас”, не “момент в потоке”.


Полный пример: pending order timeout

Сценарий: order имеет статус PENDING; нужно через 24 часа автоматически перевести в EXPIRED, если не пришло COMPLETED событие.

public class OrderTimeoutDetector
        extends KeyedProcessFunction<String, OrderEvent, OrderStatus> {

    private transient ValueState<OrderEvent> pending;

    @Override
    public void open(OpenContext ctx) {
        pending = getRuntimeContext().getState(
            new ValueStateDescriptor<>("pending-order", OrderEvent.class));
    }

    @Override
    public void processElement(OrderEvent event, Context ctx,
                                Collector<OrderStatus> out) throws Exception {
        if (event.status == OrderStatus.PENDING) {
            pending.update(event);
            // Регистрируем timer на 24 часа вперёд
            long timeoutAt = event.timestamp + Duration.ofHours(24).toMillis();
            ctx.timerService().registerEventTimeTimer(timeoutAt);
        } else if (event.status == OrderStatus.COMPLETED) {
            OrderEvent p = pending.value();
            if (p != null) {
                // Удаляем timer и state — order завершён
                long timeoutAt = p.timestamp + Duration.ofHours(24).toMillis();
                ctx.timerService().deleteEventTimeTimer(timeoutAt);
                pending.clear();
                out.collect(new OrderStatus(event.orderId, "COMPLETED"));
            }
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<OrderStatus> out) throws Exception {
        OrderEvent p = pending.value();
        if (p != null) {
            // Timer сработал, order до сих пор PENDING — timeout
            out.collect(new OrderStatus(p.orderId, "EXPIRED"));
            pending.clear();
        }
    }
}

Ключевые моменты:

  • При получении PENDING регистрируется event time timer на 24 часа.
  • При получении COMPLETED timer удаляется через deleteEventTimeTimer.
  • Если COMPLETED не приходит — timer срабатывает, и onTimer эмитит EXPIRED.

Python:

from datetime import timedelta

class OrderTimeoutDetector(KeyedProcessFunction):
    def open(self, ctx):
        self.pending = ctx.get_state(ValueStateDescriptor("pending", ...))

    def process_element(self, event, ctx):
        if event.status == "PENDING":
            self.pending.update(event)
            timeout_ms = event.timestamp + int(timedelta(hours=24).total_seconds() * 1000)
            ctx.timer_service().register_event_time_timer(timeout_ms)
        elif event.status == "COMPLETED":
            p = self.pending.value()
            if p is not None:
                timeout_ms = p.timestamp + int(timedelta(hours=24).total_seconds() * 1000)
                ctx.timer_service().delete_event_time_timer(timeout_ms)
                self.pending.clear()
                yield OrderStatus(event.order_id, "COMPLETED")

    def on_timer(self, timestamp, ctx):
        p = self.pending.value()
        if p is not None:
            yield OrderStatus(p.order_id, "EXPIRED")
            self.pending.clear()

Семантика timers: дедупликация и срабатывание

Несколько важных свойств:

Дедупликация по timestamp. Если зарегистрировать registerEventTimeTimer(1000) дважды — timer один, и onTimer(1000) будет вызван один раз. Дублирующая регистрация — no-op.

Per-key timer. Timers per ключ независимы. registerEventTimeTimer(1000) для user_id=A и user_id=B — это два timer, каждый сработает один раз с current key соответствующего пользователя.

Время срабатывания event time timer — когда watermark проходит за timestamp timer-а (watermark > timestamp, не >=). То есть registerEventTimeTimer(1000) срабатывает когда watermark становится 1001 или больше.

Время срабатывания processing time timer — когда currentProcessingTime() >= timestamp. Может быть с небольшой задержкой относительно запланированного.

Timer lifecycle
processElement регистрирует event time timer на timestamp = 1000. Timer сохраняется в timer state, привязанный к current key.
watermark progresses
Watermark достигает 1001 (то есть проходит за 1000). Flink вызывает onTimer(1000, ctx, out) с current key = тот, для которого был зарегистрирован timer.
После onTimer Flink удаляет timer из state автоматически. Если в onTimer нужно зарегистрировать новый timer — это явная операция.

Удаление timers

deleteEventTimeTimer(ts) / deleteProcessingTimeTimer(ts) удаляют ранее зарегистрированный timer. Если timer не существует — no-op.

ctx.timerService().registerEventTimeTimer(1000);
// ...
ctx.timerService().deleteEventTimeTimer(1000);  // отменяет

Это критично для предотвращения spurious срабатываний. В примере выше (order timeout), если приходит COMPLETED — мы удаляем timer, чтобы EXPIRED не сработал зря.

WARNING

Если в onTimer вы регистрируете новый timer (например, для отложенной повторной проверки), убедитесь, что есть условие выхода. Бесконечная цепочка self-rescheduling timers — это утечка state и нагрузка на TaskManager. Используйте флаг в state, который остановит цепочку.


Common bugs

Bug 1: timer на каждое событие без условия

// ПЛОХО
@Override
public void processElement(Event e, Context ctx, Collector<Result> out) {
    ctx.timerService().registerEventTimeTimer(e.timestamp + 60000);
    // ... другая логика
}

Для 10K событий/сек = 10K timers/sec регистрируется. Все они хранятся в timer state. Через час — десятки миллионов timers в памяти. OOM или extreme degradation.

Лечение: регистрируйте timer только когда нужно (например, только для PENDING events), и удаляйте при наступлении terminating event. Или используйте паттерн “один активный timer per ключ” с perpetual re-schedule.

Bug 2: timer без сохранения timestamp в state

// ПЛОХО
@Override
public void processElement(Event e, Context ctx, Collector<Result> out) {
    if (e.isStart) {
        ctx.timerService().registerEventTimeTimer(e.timestamp + 60000);
    } else if (e.isEnd) {
        // как удалить timer? мы не помним его timestamp!
        ctx.timerService().deleteEventTimeTimer(???);
    }
}

Чтобы удалить timer, нужно знать его exact timestamp. Если не сохранили — timer останется и сработает зря.

Лечение: сохраняйте timestamp в state параллельно с регистрацией timer.

private transient ValueState<Long> currentTimerTs;

@Override
public void processElement(Event e, Context ctx, Collector<Result> out) throws Exception {
    if (e.isStart) {
        long ts = e.timestamp + 60000;
        ctx.timerService().registerEventTimeTimer(ts);
        currentTimerTs.update(ts);
    } else if (e.isEnd) {
        Long ts = currentTimerTs.value();
        if (ts != null) {
            ctx.timerService().deleteEventTimeTimer(ts);
            currentTimerTs.clear();
        }
    }
}

Bug 3: spurious срабатывания после rescheduling

// ПЛОХО
@Override
public void processElement(Event e, Context ctx, Collector<Result> out) throws Exception {
    Long current = currentTimerTs.value();
    long newTs = e.timestamp + 60000;
    ctx.timerService().registerEventTimeTimer(newTs);
    currentTimerTs.update(newTs);
    // Старый timer не удалили! Он сработает зря.
}

Регистрация нового timer не отменяет старый. Если задача — “перенести timer на новое время”, нужно явно удалить старый.

Лечение:

Long old = currentTimerTs.value();
if (old != null) ctx.timerService().deleteEventTimeTimer(old);
long newTs = e.timestamp + 60000;
ctx.timerService().registerEventTimeTimer(newTs);
currentTimerTs.update(newTs);

Bug 4: timer без cleanup в onTimer

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Result> out) throws Exception {
    if (someCondition) {
        ctx.timerService().registerEventTimeTimer(timestamp + 60000);
        // забыли state.clear() — старое состояние осталось
    }
}

В onTimer полезно явно очищать state, если timer — это финальное действие для ключа.

Bug 5: бесконечная цепочка timers

// ПЛОХО — бесконечный self-reschedule
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Result> out) throws Exception {
    out.collect(...);
    ctx.timerService().registerEventTimeTimer(timestamp + 60000);  // снова и снова
}

Без условия выхода — utечка. Каждый ключ накапливает один live timer, который перезапускается. Сколько активных ключей — столько одновременно живых timer-ов в state, без cleanup.

Лечение: условие выхода через state.


Производительность

Timer state хранится отдельно от user state, но влияет на размер чекпойнтов и memory footprint.

Рекомендации:

  • Один timer per ключ. Не регистрируйте per event без необходимости.
  • Удаляйте через delete*Timer. Не полагайтесь на естественное срабатывание для cleanup.
  • Round timestamps. Если timer не критичен к миллисекундам, округляйте до секунд — это уменьшит количество уникальных timestamps и upgrades dedup.
  • Используйте RocksDB backend для job-ов с многими timers — heap backend deserialize всё в память.

Production-чеклист

  • Каждой регистрации timer соответствует либо явное delete*Timer, либо onTimer с гарантированной обработкой.
  • При rescheduling timer — сначала удалить старый.
  • В onTimer — финализация state (clear() если задача завершена).
  • Метрика количества активных timers — мониторить через JMX (currently registered timers).
  • При работе с processing time timers — учитывайте, что они могут сработать с задержкой при backpressure.

Попробуй сам

  1. Watchdog timer. Реализуйте: на каждое событие сбрасывайте timer на 5 минут вперёд. Если 5 минут не было событий — emit “user inactive”. Используйте deleteEventTimeTimer при rescheduling.

  2. Periodic emit. Реализуйте: каждые 30 секунд per ключ emit агрегированное значение из ValueState<Long counter>. Используйте processing time timer + perpetual self-reschedule.

  3. Bug 3 reproduction. Намеренно не удаляйте старый timer при rescheduling. Наблюдайте, как с временем накапливаются timers — посмотрите на JMX метрику числа активных timers. После исправления (delete + register) — количество должно стабилизироваться.

Проверка знанийKnowledge check
Команда написала KeyedProcessFunction с registerEventTimeTimer для timeout detection. После недели работы job показывает резкий рост state — миллиарды timers в JMX-метрике 'currentlyRegisteredTimers'. Что вероятно сломано и как починить?
ОтветAnswer
Это типичный bug 'timer на каждое событие без условия'. Скорее всего код регистрирует timer в processElement на каждое событие (например, для 'rescheduling watchdog'), но не удаляет предыдущий через deleteEventTimeTimer. Дублирующая регистрация на тот же timestamp дедуплицируется, но если новый timestamp каждый раз другой (например, current_event_time + offset), Flink создаёт новый timer на каждое событие. Через неделю при 1K events/sec это 600M+ timer state записей. Решение: (1) сохранить current timer timestamp в ValueState<Long>; (2) при регистрации нового всегда удалять предыдущий через deleteEventTimeTimer(oldTs); (3) обновить ValueState с новым timestamp. Это даёт паттерн 'один активный timer per ключ' с правильным rescheduling и стабильным размером timer state (= количеству активных ключей, не количеству событий). Дополнительно: добавить JMX-метрику alerting на резкий рост currentlyRegisteredTimers — индикатор подобных багов.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 5. Job показывает в JMX 'currentlyRegisteredTimers' = 600M после недели работы. Используется KeyedProcessFunction с registerEventTimeTimer для watchdog timeout. Что вероятно сломано?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 3