Broadcast pattern: правила и feature flags
В production регулярно возникает задача: основной поток событий нужно обогатить или фильтровать по правилам, которые могут меняться в runtime. Правила приходят редко (раз в минуту, раз в час), их немного (десятки или сотни), но каждый subtask должен видеть полный набор правил, чтобы корректно обработать любое входящее событие. Это паттерн broadcast.
Flink поддерживает broadcast нативно через BroadcastStream и BroadcastState. В этом уроке разберём, как соединить main-поток с broadcast-потоком, как хранить правила в BroadcastState, и где этот паттерн применяется на практике — fraud detection, feature flags, dynamic rules engine.
Задача: rules engine для fraud detection
Представим payment-сервис. Основной поток — transactions (миллионы транзакций в секунду). Команда антифрода периодически публикует правила: «если transaction.amount > $10000 и user.country = ‘XX’ — flag as suspicious».
Что не работает:
- Загружать правила из БД при каждом событии — БД упадёт.
- Перезапускать job при обновлении правил — недопустимо, downtime.
- Хранить правила в keyed state — keyed state per key, а правила общие.
- Operator state — есть split при rescale, не подходит для «каждый видит всё».
Что работает: broadcast state.
Kafka compacted topics для правил и конфигурации- Правила публикуются в Kafka топик
rules. - Из топика создаётся отдельный
DataStreamправил. - Этот stream объявляется как
broadcast()— Flink дублирует его во все subtask’и основного потока. - Соединяем основной stream с broadcast stream через
.connect()иBroadcastProcessFunction. - В функции: при получении правила обновляем
BroadcastState, при получении транзакции — применяем правила из state.
Архитектура: один-ко-многим
transactions parallelism 8
Основной поток транзакций. Высокий throughput, нужно обработать каждую транзакцию.rules parallelism 1
Поток правил. Низкий throughput (раз в минуту), но каждый subtask main потока должен видеть все правила.main.connect(rules.broadcast(descriptor))
connect соединяет два потока в один. Затем broadcast говорит Flink дублировать rules-поток во все subtask main-потока.Subtask 0 BroadcastState all rules
Subtask 0 main потока имеет полную копию BroadcastState — все правила.Subtask 1 BroadcastState all rules
Subtask 1 main потока имеет полную копию BroadcastState — все правила.Subtask N BroadcastState all rules
Subtask N main потока имеет полную копию BroadcastState — все правила. Каждый subtask может проверить любую транзакцию против всех правил локально, без сетевых вызовов.Каждый subtask main-потока локально хранит копию правил. Когда приходит новое правило в broadcast stream, оно дублируется во все subtask. Когда приходит транзакция в main stream — subtask применяет правила локально.
BroadcastState и MapStateDescriptor
BroadcastState — это всегда Map<K, V> (даже когда логически вам нужен List, обернёте в Map<Long, Rule>):
// Java
public static final MapStateDescriptor<String, Rule> RULES_DESCRIPTOR =
new MapStateDescriptor<>(
"rules",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(Rule.class)
);
# Python: PyFlink 2.x broadcast descriptor
from pyflink.common.typeinfo import Types
from pyflink.datastream.state import MapStateDescriptor
RULES_DESCRIPTOR = MapStateDescriptor(
"rules",
Types.STRING(),
Types.PICKLED_BYTE_ARRAY()
)
Дескриптор используется в двух местах:
- При объявлении broadcast stream:
rulesStream.broadcast(RULES_DESCRIPTOR). - Внутри
BroadcastProcessFunctionдля получения state:ctx.getBroadcastState(RULES_DESCRIPTOR).
BroadcastProcessFunction: два метода
BroadcastProcessFunction<IN, BROADCAST, OUT> имеет два метода, по одному на каждый стрим:
public abstract class BroadcastProcessFunction<IN, BROADCAST, OUT>
extends BaseBroadcastProcessFunction {
// Обработка основного потока (transactions)
// КОНТЕКСТ: ReadOnlyContext — broadcast state доступен ТОЛЬКО на чтение
public abstract void processElement(
IN value,
ReadOnlyContext ctx,
Collector<OUT> out
) throws Exception;
// Обработка broadcast потока (rules)
// КОНТЕКСТ: Context — broadcast state доступен НА ЗАПИСЬ
public abstract void processBroadcastElement(
BROADCAST value,
Context ctx,
Collector<OUT> out
) throws Exception;
}
Ключевое ограничение: processElement (основной поток) получает ReadOnlyContext. Это значит, что только broadcast-сторона может изменять BroadcastState. Это сделано специально, чтобы избежать рассогласования между subtask’ами: если бы main-поток мог писать в state, разные subtask’и накопили бы разные данные.
Полный пример: rules engine
public class FraudDetectionRules
extends BroadcastProcessFunction<Transaction, Rule, FraudAlert> {
private static final MapStateDescriptor<String, Rule> RULES_DESCRIPTOR =
new MapStateDescriptor<>(
"fraud-rules",
Types.STRING,
TypeInformation.of(Rule.class)
);
@Override
public void processBroadcastElement(
Rule rule, Context ctx, Collector<FraudAlert> out) throws Exception {
BroadcastState<String, Rule> rules =
ctx.getBroadcastState(RULES_DESCRIPTOR);
if (rule.isDeleted()) {
rules.remove(rule.getId());
} else {
rules.put(rule.getId(), rule);
}
}
@Override
public void processElement(
Transaction tx, ReadOnlyContext ctx, Collector<FraudAlert> out)
throws Exception {
ReadOnlyBroadcastState<String, Rule> rules =
ctx.getBroadcastState(RULES_DESCRIPTOR);
for (Map.Entry<String, Rule> entry : rules.immutableEntries()) {
Rule rule = entry.getValue();
if (rule.matches(tx)) {
out.collect(new FraudAlert(tx, rule.getId()));
return;
}
}
}
}
Собираем job:
DataStream<Transaction> transactions = env
.fromSource(kafkaSource("transactions"), wmStrategy(), "transactions");
DataStream<Rule> rules = env
.fromSource(kafkaSource("rules"), wmStrategy(), "rules");
BroadcastStream<Rule> broadcastRules = rules.broadcast(RULES_DESCRIPTOR);
DataStream<FraudAlert> alerts = transactions
.connect(broadcastRules)
.process(new FraudDetectionRules());
alerts.sinkTo(kafkaSink("fraud-alerts"));
Что хранится в snapshot
BroadcastState участвует в checkpoint’ах как обычное state. Но в отличие от operator state, каждый subtask хранит полную копию, и при restore каждый subtask получает полную копию (как UnionListState, но для Map).
Это значит, что при parallelism = 100 и broadcast state размером 10 MB на checkpoint размер итогового snapshot будет ~1 GB (10 MB x 100 копий). Не используйте broadcast state для больших объёмов данных. Правило большого пальца: до десятков MB, до тысяч правил.
Альтернатива: lookup по запросу
Что если правил миллионы или они огромны? Тогда broadcast не подходит — слишком много памяти. Альтернативы:
- Async I/O lookup в Redis/ScyllaDB: каждое событие main-потока делает lookup в внешний store правил. Подходит для больших справочников. Минус: latency и нагрузка на store. Подробно — в модуле 09.
- Keyed state с rules per key: если правила привязаны к ключу (например, per-user правила), храните их в keyed state и обновляйте через side input.
- Шардирование правил: правила тоже keyBy, и main поток тоже keyBy по тому же ключу, чтобы правила и события встретились в одном subtask. Усложнение, но избегает broadcast overhead.
| Подход | Когда использовать | Минус |
|---|---|---|
| Broadcast state | До тысяч правил, до десятков MB | Дублирование во все subtask |
| Async I/O в Redis | Миллионы правил, дёшево | Latency и зависимость от внешнего store |
| Keyed state + co-stream | Правила per-key | Сложная топология |
Initialize broadcast state с заранее заданными значениями
Если вам нужно, чтобы при первом старте job уже был набор default-правил (например, пока ещё не пришло ни одного обновления из Kafka), используйте applyToKeyedState или просто заранее опубликуйте дефолтные правила в Kafka топик.
В Flink нет API «инициализировать BroadcastState из кода». Состояние всегда строится из broadcast-потока. Если нужны дефолты — публикуйте их в bootstrap-сообщении в Kafka.
Попробуй сам
Сделай простой feature-flags-engine:
- Main stream:
Event(userId: String, action: String). - Broadcast stream:
FeatureFlag(name: String, enabled: Boolean, rolloutPercent: Int). BroadcastProcessFunction: если у события есть match с включённым флагом иuserId.hashCode() % 100 < rolloutPercent— пропустить событие в out. Иначе — отбросить.- Симулируй обновление флага в runtime: добавь новый
FeatureFlag(name='new_checkout', enabled=true, rolloutPercent=10)и убедись, что в логах после публикации 10% юзеров проходят, 90% — нет.
Ключевые выводы
- Broadcast state — состояние, которое дублируется во все subtask’и оператора. Используется для правил, feature flags, конфигов.
BroadcastProcessFunctionимеет два метода:processElement(main, ReadOnlyContext) иprocessBroadcastElement(broadcast, Context с write-доступом). Только broadcast-сторона может изменять state.MapStateDescriptor— единственный тип descriptor для broadcast state. Если нужен List — оборачиваем в Map с искусственным ключом.- Snapshot масштабируется как
state_size x parallelism— не использовать для больших данных. Десятки MB, тысячи правил — потолок. - Альтернативы: async I/O lookup, keyed state + co-stream, шардирование. Broadcast — для маленьких часто-применяемых правил.