ProcessFunction: основы
ProcessFunction — это низкоуровневое API Flink для работы с потоками. Когда вы упираетесь в ограничения высокоуровневых операций (windows, joins, reduce/aggregate), ProcessFunction даёт полный контроль: доступ к event time и watermarks, регистрация timers, работа со state, side outputs, fine-grained управление emit.
В этом уроке — базы: KeyedProcessFunction, метод processElement, контекст и его возможности. Timers и side outputs — в следующих уроках.
Когда нужна ProcessFunction
ProcessFunction оправдана, когда:
- Нужна логика, не вписывающаяся в windows. Например, многоступенчатая FSM, sessionization с custom rules, transformations зависящие от истории.
- Нужны timers для отложенных действий (timeout pending orders, watchdog для unbounded state).
- Нужны side outputs для maintenance вроде DLQ или alerts вне основного потока.
- Custom watermark handling — реакция на onTimer для work поверх watermarks.
Высокоуровневое API (window, reduce, aggregate) проще и понятнее — используйте его, когда оно покрывает требования. ProcessFunction — это инструмент когда нужно больше контроля.
KeyedProcessFunction: типичный сигнатура
Большинство production использует KeyedProcessFunction<K, IN, OUT> — применяется к keyed stream, имеет доступ к keyed state.
public class FraudDetector extends KeyedProcessFunction<String, Transaction, Alert> {
private transient ValueState<Double> dailySpending;
@Override
public void open(OpenContext openContext) {
dailySpending = getRuntimeContext().getState(
new ValueStateDescriptor<>("daily-spending", Double.class));
}
@Override
public void processElement(Transaction tx,
Context ctx,
Collector<Alert> out) throws Exception {
Double current = dailySpending.value();
if (current == null) current = 0.0;
current += tx.amount;
dailySpending.update(current);
if (current > 10000) {
out.collect(new Alert(ctx.getCurrentKey(), "DAILY_LIMIT_EXCEEDED", current));
}
}
}
transactions
.keyBy(Transaction::getAccountId)
.process(new FraudDetector());
Типовые параметры:
K— тип ключа (Stringдля account_id).IN— тип входных событий (Transaction).OUT— тип выходных (Alert).
Методы:
processElement(IN value, Context ctx, Collector<OUT> out)— обрабатывает каждое событие.onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out)— обрабатывает timers (см. следующий урок).open(OpenContext)— инициализация state и других ресурсов.close()— cleanup.
Python:
from pyflink.datastream.functions import KeyedProcessFunction
from pyflink.datastream.state import ValueStateDescriptor
from pyflink.common.typeinfo import Types
class FraudDetector(KeyedProcessFunction):
def __init__(self):
self.daily_spending = None
def open(self, runtime_context):
descriptor = ValueStateDescriptor("daily-spending", Types.DOUBLE())
self.daily_spending = runtime_context.get_state(descriptor)
def process_element(self, tx, ctx):
current = self.daily_spending.value() or 0.0
current += tx.amount
self.daily_spending.update(current)
if current > 10000:
yield Alert(ctx.get_current_key(), "DAILY_LIMIT_EXCEEDED", current)
transactions \
.key_by(lambda t: t.account_id) \
.process(FraudDetector())
Context: API для работы с runtime
Context (для processElement) и OnTimerContext (для onTimer) — это API, через которые ProcessFunction взаимодействует с runtime Flink:
@Override
public void processElement(Transaction tx, Context ctx, Collector<Alert> out) throws Exception {
// Текущий ключ
String accountId = ctx.getCurrentKey();
// Timestamp текущего события (может быть null если нет timestamp assigner)
Long eventTimestamp = ctx.timestamp();
// Текущий watermark
long currentWatermark = ctx.timerService().currentWatermark();
// Текущий processing time
long currentProcessingTime = ctx.timerService().currentProcessingTime();
// Регистрация timers (детально в след. уроке)
ctx.timerService().registerEventTimeTimer(eventTimestamp + 60000);
ctx.timerService().registerProcessingTimeTimer(currentProcessingTime + 10000);
// Side output (детально в уроке про side outputs)
OutputTag<String> debugTag = new OutputTag<>("debug"){};
ctx.output(debugTag, "processed " + tx.id);
// Emit в основной поток
out.collect(new Alert(...));
}
Ключевые возможности:
getCurrentKey()— узнать ключ, для которого вызывается метод (полезно для логирования и emit).timestamp()— event timestamp текущего события (null если timestamp assigner не настроен).timerService()— доступ к watermarks, processing time, регистрации timers.output(tag, value)— отправка в side output.
ProcessFunction vs KeyedProcessFunction
Помимо KeyedProcessFunction существует базовая ProcessFunction<IN, OUT> — для не-keyed streams. Различия:
| Аспект | ProcessFunction | KeyedProcessFunction |
|---|---|---|
| Применение | stream.process(...) | keyedStream.process(...) |
| Доступ к keyed state | НЕТ | ДА |
| Timers | Per-operator (один на весь оператор) | Per-key (отдельный для каждого ключа) |
| getCurrentKey() | НЕТ | ДА |
В подавляющем большинстве случаев нужен KeyedProcessFunction — keyed state и per-key timers критичны для production-логики. Базовый ProcessFunction используется редко, обычно для broadcast-state pattern или для логики, реально не зависящей от ключа.
Доступ к state в processElement
State работает так же, как в RichFunction (см. модуль 04):
public class StatefulProcessor extends KeyedProcessFunction<String, Event, Result> {
private transient ValueState<Long> lastSeenTs;
private transient MapState<String, Long> sessionCounters;
private transient ListState<String> recentEventIds;
@Override
public void open(OpenContext ctx) {
lastSeenTs = getRuntimeContext().getState(
new ValueStateDescriptor<>("last-seen", Long.class));
sessionCounters = getRuntimeContext().getMapState(
new MapStateDescriptor<>("sessions", String.class, Long.class));
recentEventIds = getRuntimeContext().getListState(
new ListStateDescriptor<>("recent-ids", String.class));
}
@Override
public void processElement(Event e, Context ctx, Collector<Result> out) throws Exception {
Long last = lastSeenTs.value();
if (last == null || e.timestamp - last > 60000) {
// ... логика new session
}
lastSeenTs.update(e.timestamp);
}
}
State в ProcessFunction подчиняется тем же правилам:
- Декларируется в
open()через дескриптор. - Поле
transient. - Состояние per-key (изолированное между ключами).
- TTL через StateTtlConfig (см. модуль 04).
Пример: оповещение при первом событии за день
Реальный сценарий: оповещение admin-у, когда новый пользователь делает первое действие за день.
public class FirstSeenDailyDetector
extends KeyedProcessFunction<String, UserAction, Notification> {
private transient ValueState<Long> lastSeenDay;
@Override
public void open(OpenContext ctx) {
lastSeenDay = getRuntimeContext().getState(
new ValueStateDescriptor<>("last-day", Long.class));
}
@Override
public void processElement(UserAction action, Context ctx,
Collector<Notification> out) throws Exception {
long currentDay = action.timestamp / (86400 * 1000L);
Long lastDay = lastSeenDay.value();
if (lastDay == null || currentDay > lastDay) {
// Первое событие пользователя за этот день
out.collect(new Notification(
ctx.getCurrentKey(),
"First action of " + currentDay,
action.timestamp
));
lastSeenDay.update(currentDay);
}
}
}
userActions
.keyBy(UserAction::getUserId)
.process(new FirstSeenDailyDetector());
Эту задачу было бы сложно решить через windows: нужна не сумма за окно, а изменение состояния от события к событию. ProcessFunction — естественный выбор.
getRuntimeContext() vs Context
Часто путают:
getRuntimeContext()(наследуется от RichFunction) — даёт доступ к state, метрикам, distributed cache. Вызывается вopen()обычно.Context/OnTimerContext(параметр processElement / onTimer) — даёт доступ к timestamps, watermarks, timers, side outputs, current key. Вызывается внутри метода.
@Override
public void open(OpenContext ctx) {
// RuntimeContext через getRuntimeContext()
state = getRuntimeContext().getState(descriptor);
metric = getRuntimeContext().getMetricGroup().counter("processed");
}
@Override
public void processElement(Event e, Context ctx, Collector<Result> out) throws Exception {
// Context используется здесь
String key = ctx.getCurrentKey();
long ts = ctx.timestamp();
ctx.timerService().registerEventTimeTimer(ts + 1000);
}
Производительность
ProcessFunction не имеет встроенной оптимизации windows (incremental aggregation, key group state co-location). Каждое событие — это вызов user-defined метода, который может делать что угодно.
Что важно для производительности:
- Cheap state access. Минимизировать вызовы value()/update() в processElement. Если можно — кэшировать в transient field в пределах одного метода.
- Лёгкая логика. Тяжёлая обработка (парсинг больших JSON, regex) — это backpressure. Часто стоит делать pre-processing в map перед
process(). - Async I/O. Если нужны внешние вызовы (DB lookup, API call) — используйте Async I/O Function (модуль 09), не блокирующие синхронные вызовы.
ProcessFunction vs window function: когда что
| Сценарий | Что использовать |
|---|---|
| Sum/count/avg в фиксированном окне | window + AggregateFunction |
| Top-K, percentiles в малом окне | window + ProcessWindowFunction |
| Sessionization с gap-only | window + EventTimeSessionWindows |
| Sessionization с business-event close | KeyedProcessFunction |
| FSM (multi-state, transitions) | KeyedProcessFunction |
| Timeout pending operations | KeyedProcessFunction + timers |
| Pattern detection (CEP) | KeyedProcessFunction или Flink CEP |
| Stream-to-stream lookups | Async I/O или KeyedProcessFunction |
| Side outputs (DLQ, debug) | KeyedProcessFunction или window function |
Production-чеклист
- Декларируйте все state в
open(). Не используйтеtransientленивую инициализацию вprocessElement. - Метрики через
getRuntimeContext().getMetricGroup()— счётчики event-ов, latency-гистограммы. - Логирование — не на каждое событие. Лучше — sampled (например, каждое 1000-е) или через metrics.
- При работе с timers — обязательно
clear()вonTimerили logic для предотвращения утечек (см. след. урок). - Тестирование — используйте
KeyedOneInputStreamOperatorTestHarness(есть вflink-test-utils) для unit-тестов с timer manipulation.
Попробуй сам
-
Simple counter. Реализуйте KeyedProcessFunction, который для каждого user_id считает количество событий и emit-ит каждое 100-е. Используйте ValueState
<Long>. -
Get current key check. Реализуйте функцию, которая проверяет, что ctx.getCurrentKey() возвращает ожидаемое значение в processElement. Сравните с тем, что вы передавали в keyBy — должны совпадать.
-
Context vs RuntimeContext. Логируйте в processElement: timestamp, watermark, currentProcessingTime через ctx; и состояние через getRuntimeContext().getState(). Убедитесь, что вы понимаете, какой API даёт что.