Event time vs processing time
В batch-обработке “время” — это атрибут данных. Вы анализируете “логи за 14 мая” и не задумываетесь, когда именно эти логи попали в hadoop кластер. В streaming процессорах время становится более тонким понятием: есть момент, когда событие произошло (event time), и момент, когда оператор Flink его обработал (processing time). Эти два времени могут различаться на секунды, минуты или даже часы — и от выбора одного из них зависит корректность всей вашей аналитики.
В этом уроке разбираем эти концепции с практической стороны: когда использовать event time, когда processing time допустим, и почему обычно правильный ответ — всегда event time.
Три времени в Flink
Flink определяет три временных характеристики:
Event time — момент, когда событие действительно произошло. Извлекается из самого события (поле timestamp, created_at, header в Kafka). Это “правда” про реальный мир.
Processing time — момент, когда оператор обработал событие. Считывается из системных часов TaskManager. Не зависит от данных.
Ingestion time — момент, когда событие попало в Flink (в source operator). Промежуточный вариант — менее точный, чем event time, но более стабильный, чем processing time. Редко используется в современных pipelines.
// Получить текущий processing time
long now = System.currentTimeMillis();
// Получить event time текущего события (в ProcessFunction)
long eventTime = ctx.timestamp();
Почему processing time проблематичен
Представьте Flink-job, агрегирующий заказы по часовым окнам с processing time:
orders
.keyBy(Order::getShopId)
.window(TumblingProcessingTimeWindows.of(Duration.ofHours(1)))
.reduce(...);
Что определяет, в какое окно попадёт заказ? Момент, когда оператор window его получил. Это значит:
- Задержки в Kafka меняют результат. Если consumer тормозит на 30 минут, заказы из 14:30 попадут в окно [15:00, 16:00). Бизнес-отчёт за 14 часов будет неполным.
- Restart job-а ведёт к skew. После restart pipeline догоняет накопившиеся в Kafka сообщения за минуты — все они получают новый processing time и попадают в один-два окна.
- Параллелизм даёт расхождения. События одной транзакции, обработанные на разных subtask-ах, могут получить разный processing time из-за разной скорости задач.
- Replay не воспроизводим. При переигрывании Kafka offsets вы получите совершенно другие результаты, чем при первом проходе.
Processing time имеет ровно одно преимущество — минимальная latency. Окно эмиттит сразу при достижении wall-clock дедлайна, не ждёт watermarks. Это полезно для real-time мониторинга, где небольшая неточность приемлема, но не для аналитики.
Event time: семантика “что произошло”
Event time даёт детерминированный результат, не зависящий от задержек обработки:
WatermarkStrategy<Order> watermarkStrategy = WatermarkStrategy
.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner((order, ts) -> order.timestamp);
orders
.assignTimestampsAndWatermarks(watermarkStrategy)
.keyBy(Order::getShopId)
.window(TumblingEventTimeWindows.of(Duration.ofHours(1)))
.reduce(...);
Здесь:
withTimestampAssigner— функция, извлекающая timestamp из события. Это event time, который Flink будет использовать для всех временных решений.forBoundedOutOfOrderness(10s)— стратегия для watermarks: Flink ожидает, что события могут опоздать максимум на 10 секунд (см. урок 02 про watermark strategies).
Результат:
- Hourly report за 10:00-11:00 содержит все заказы с timestamp в этом интервале, независимо от того, когда они дошли до Flink.
- Replay даёт идентичный результат при перезапуске.
- Late events до 10 секунд автоматически попадают в свои окна; более поздние — обрабатываются через
allowedLatenessили дропаются.
Out-of-order events: фундаментальная реальность
Главная причина, почему event time требует watermarks, — события не приходят строго по порядку. Источники:
Watermarks в Spark Structured StreamingСеть. Два события из разных регионов попадают на один Kafka брокер с разной latency. Событие из Сингапура (10:00:00.500) может прийти позже события из Лондона (10:00:01.000).
Multiple partitions. Flink читает несколько Kafka партиций параллельно. Partition 0 может быть на минуту впереди partition 1 по wall-clock, потому что consumer задерживается на одной. События partition 1 будут “поздними” относительно событий partition 0.
Mobile clients. Приложение собирает события в буфер и шлёт пачкой. Через минуту полёта в самолёте присылает 30 минут накопленных событий, все с timestamp в прошлом.
Replay & backfill. При переигрывании Kafka с начала события из 30 дней назад приходят сейчас. Каждое — это late event по wall-clock.
Watermarks — механизм Flink для работы с этой реальностью. Watermark W(t) — это утверждение “я уверен, что событий с event_time <= t больше не придёт” (с некоторой формальной погрешностью). Окна закрываются, когда watermark проходит за их границы.
Когда processing time оправдан
Несмотря на все недостатки, processing time имеет нишу:
Real-time monitoring dashboards. “Сколько событий за последнюю минуту” для оператора NOC — точность плюс-минус секунда не критична, low-latency важна.
Rate limiting / throttling. Логика “не более 100 запросов в секунду от user” работает на processing time — это правило про текущий момент, не про event time.
Heartbeat detection. “Если не было события 5 минут — клиент offline” — это про реальное wall-clock время, не про event time.
Тесты и dev-окружения. Когда не хочется морочиться с timestamps в test fixtures.
Во всех остальных случаях production — event time.
Ingestion time: компромисс
IngestionTimeAssigner присваивает timestamp в source-операторе при чтении события. Это компромисс между event time и processing time:
- Не зависит от downstream-задержек.
- Не требует timestamp в payload.
- Деградирует при restart (накопленные сообщения получат “current ingestion time”).
В современном Flink ingestion time используется редко — обычно либо event time из payload, либо processing time для очень specific случаев. Если в вашей Kafka headers есть producer_timestamp, его можно использовать как event time — это часто проще, чем переключаться на ingestion.
Kafka 0.10+ записывает producer timestamp в headers каждого сообщения. Можно использовать его как event time через withTimestampAssigner((record, ts) -> record.timestamp()). Это бесплатное приближение к “истинному event time” даже когда payload не содержит timestamp.
Влияние на API Flink
Выбор времени определяет, какие window assigners и triggers использовать:
| Время | Window assigner | Trigger | Watermarks |
|---|---|---|---|
| Event time | TumblingEventTimeWindows и т.д. | EventTimeTrigger | Обязательны |
| Processing time | TumblingProcessingTimeWindows и т.д. | ProcessingTimeTrigger | Не нужны |
| Ingestion time | Те же event time + IngestionTimeAssigner | EventTimeTrigger | Авто-генерируются |
Деpathическое для API: в одном job-е можно смешивать операторы с разным временем (один оператор event-time-based, другой processing-time), но это редко нужно и усложняет дебаг.
В Table API / SQL выбор делается в DDL:
CREATE TABLE orders (
id BIGINT,
amount DECIMAL,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (...);
-- Это окно использует event time
SELECT TUMBLE_START(order_time, INTERVAL '1' HOUR), SUM(amount)
FROM orders
GROUP BY TUMBLE(order_time, INTERVAL '1' HOUR);
Production-чеклист для event time
- Timestamp в payload. Каждое событие должно иметь явное поле
event_timeилиcreated_at. Не полагайтесь на ingestion time для production-аналитики. - Производитель timestamp под контролем. Если timestamp ставит мобильный клиент, это может быть несинхронизированное wall clock (часы могут отставать или спешить на минуты). Иногда лучше использовать server-side timestamp при receipt.
- Watermark strategy явно настроен. См. урок 02. Дефолтные настройки часто слишком оптимистичны.
- Late events обрабатываются. allowedLateness + sideOutputLateData как стандарт.
- idle partition handling. withIdleness для случаев, когда часть Kafka партиций может молчать.
Попробуй сам
-
Сравнение результатов. Запустите два job-а на одном Kafka топике: один с event time + tumbling 1h окном, другой с processing time + тем же окном. После 30 минут работы остановите оба job-а и сравните выходные данные за прошедший час. Event time job должен дать стабильный результат, processing time — зависящий от latency Kafka consumer.
-
Restart effect. В job-е с processing time запустите, накопите данные за 10 минут, остановите на 30 минут (имитируя простой), запустите снова. Посмотрите на окна сразу после restart — все накопленные в Kafka сообщения попадут в один-два окна по processing time.
-
Replay determinism. Запустите event time job, запишите выход за час. Остановите job, переиграйте те же Kafka offsets — выход должен быть идентичным. Повторите с processing time — выход будет другим.