Learning Platform
Глоссарий Troubleshooting
Урок 09.02 · 20 мин
Средний
Broadcast StateBroadcastProcessFunctionRules EngineFeature Flags

Broadcast pattern: правила и feature flags

В production регулярно возникает задача: основной поток событий нужно обогатить или фильтровать по правилам, которые могут меняться в runtime. Правила приходят редко (раз в минуту, раз в час), их немного (десятки или сотни), но каждый subtask должен видеть полный набор правил, чтобы корректно обработать любое входящее событие. Это паттерн broadcast.

Flink поддерживает broadcast нативно через BroadcastStream и BroadcastState. В этом уроке разберём, как соединить main-поток с broadcast-потоком, как хранить правила в BroadcastState, и где этот паттерн применяется на практике — fraud detection, feature flags, dynamic rules engine.


Задача: rules engine для fraud detection

Представим payment-сервис. Основной поток — transactions (миллионы транзакций в секунду). Команда антифрода периодически публикует правила: «если transaction.amount > $10000 и user.country = ‘XX’ — flag as suspicious».

Что не работает:

  • Загружать правила из БД при каждом событии — БД упадёт.
  • Перезапускать job при обновлении правил — недопустимо, downtime.
  • Хранить правила в keyed state — keyed state per key, а правила общие.
  • Operator state — есть split при rescale, не подходит для «каждый видит всё».

Что работает: broadcast state.

Kafka compacted topics для правил и конфигурации
  1. Правила публикуются в Kafka топик rules.
  2. Из топика создаётся отдельный DataStream правил.
  3. Этот stream объявляется как broadcast() — Flink дублирует его во все subtask’и основного потока.
  4. Соединяем основной stream с broadcast stream через .connect() и BroadcastProcessFunction.
  5. В функции: при получении правила обновляем BroadcastState, при получении транзакции — применяем правила из state.

Архитектура: один-ко-многим

Broadcast: правила транслируются во все subtask

transactions parallelism 8

Основной поток транзакций. Высокий throughput, нужно обработать каждую транзакцию.

rules parallelism 1

Поток правил. Низкий throughput (раз в минуту), но каждый subtask main потока должен видеть все правила.

main.connect(rules.broadcast(descriptor))

connect соединяет два потока в один. Затем broadcast говорит Flink дублировать rules-поток во все subtask main-потока.

Subtask 0 BroadcastState all rules

Subtask 0 main потока имеет полную копию BroadcastState — все правила.

Subtask 1 BroadcastState all rules

Subtask 1 main потока имеет полную копию BroadcastState — все правила.

Subtask N BroadcastState all rules

Subtask N main потока имеет полную копию BroadcastState — все правила. Каждый subtask может проверить любую транзакцию против всех правил локально, без сетевых вызовов.

Каждый subtask main-потока локально хранит копию правил. Когда приходит новое правило в broadcast stream, оно дублируется во все subtask. Когда приходит транзакция в main stream — subtask применяет правила локально.


BroadcastState и MapStateDescriptor

BroadcastState — это всегда Map<K, V> (даже когда логически вам нужен List, обернёте в Map<Long, Rule>):

// Java
public static final MapStateDescriptor<String, Rule> RULES_DESCRIPTOR =
    new MapStateDescriptor<>(
        "rules",
        BasicTypeInfo.STRING_TYPE_INFO,
        TypeInformation.of(Rule.class)
    );
# Python: PyFlink 2.x broadcast descriptor
from pyflink.common.typeinfo import Types
from pyflink.datastream.state import MapStateDescriptor

RULES_DESCRIPTOR = MapStateDescriptor(
    "rules",
    Types.STRING(),
    Types.PICKLED_BYTE_ARRAY()
)

Дескриптор используется в двух местах:

  1. При объявлении broadcast stream: rulesStream.broadcast(RULES_DESCRIPTOR).
  2. Внутри BroadcastProcessFunction для получения state: ctx.getBroadcastState(RULES_DESCRIPTOR).

BroadcastProcessFunction: два метода

BroadcastProcessFunction<IN, BROADCAST, OUT> имеет два метода, по одному на каждый стрим:

public abstract class BroadcastProcessFunction<IN, BROADCAST, OUT>
        extends BaseBroadcastProcessFunction {

    // Обработка основного потока (transactions)
    // КОНТЕКСТ: ReadOnlyContext — broadcast state доступен ТОЛЬКО на чтение
    public abstract void processElement(
        IN value,
        ReadOnlyContext ctx,
        Collector<OUT> out
    ) throws Exception;

    // Обработка broadcast потока (rules)
    // КОНТЕКСТ: Context — broadcast state доступен НА ЗАПИСЬ
    public abstract void processBroadcastElement(
        BROADCAST value,
        Context ctx,
        Collector<OUT> out
    ) throws Exception;
}

Ключевое ограничение: processElement (основной поток) получает ReadOnlyContext. Это значит, что только broadcast-сторона может изменять BroadcastState. Это сделано специально, чтобы избежать рассогласования между subtask’ами: если бы main-поток мог писать в state, разные subtask’и накопили бы разные данные.


Полный пример: rules engine

public class FraudDetectionRules
        extends BroadcastProcessFunction<Transaction, Rule, FraudAlert> {

    private static final MapStateDescriptor<String, Rule> RULES_DESCRIPTOR =
        new MapStateDescriptor<>(
            "fraud-rules",
            Types.STRING,
            TypeInformation.of(Rule.class)
        );

    @Override
    public void processBroadcastElement(
            Rule rule, Context ctx, Collector<FraudAlert> out) throws Exception {
        BroadcastState<String, Rule> rules =
            ctx.getBroadcastState(RULES_DESCRIPTOR);

        if (rule.isDeleted()) {
            rules.remove(rule.getId());
        } else {
            rules.put(rule.getId(), rule);
        }
    }

    @Override
    public void processElement(
            Transaction tx, ReadOnlyContext ctx, Collector<FraudAlert> out)
            throws Exception {
        ReadOnlyBroadcastState<String, Rule> rules =
            ctx.getBroadcastState(RULES_DESCRIPTOR);

        for (Map.Entry<String, Rule> entry : rules.immutableEntries()) {
            Rule rule = entry.getValue();
            if (rule.matches(tx)) {
                out.collect(new FraudAlert(tx, rule.getId()));
                return;
            }
        }
    }
}

Собираем job:

DataStream<Transaction> transactions = env
    .fromSource(kafkaSource("transactions"), wmStrategy(), "transactions");

DataStream<Rule> rules = env
    .fromSource(kafkaSource("rules"), wmStrategy(), "rules");

BroadcastStream<Rule> broadcastRules = rules.broadcast(RULES_DESCRIPTOR);

DataStream<FraudAlert> alerts = transactions
    .connect(broadcastRules)
    .process(new FraudDetectionRules());

alerts.sinkTo(kafkaSink("fraud-alerts"));

Что хранится в snapshot

BroadcastState участвует в checkpoint’ах как обычное state. Но в отличие от operator state, каждый subtask хранит полную копию, и при restore каждый subtask получает полную копию (как UnionListState, но для Map).

WARNING

Это значит, что при parallelism = 100 и broadcast state размером 10 MB на checkpoint размер итогового snapshot будет ~1 GB (10 MB x 100 копий). Не используйте broadcast state для больших объёмов данных. Правило большого пальца: до десятков MB, до тысяч правил.


Альтернатива: lookup по запросу

Что если правил миллионы или они огромны? Тогда broadcast не подходит — слишком много памяти. Альтернативы:

  1. Async I/O lookup в Redis/ScyllaDB: каждое событие main-потока делает lookup в внешний store правил. Подходит для больших справочников. Минус: latency и нагрузка на store. Подробно — в модуле 09.
  2. Keyed state с rules per key: если правила привязаны к ключу (например, per-user правила), храните их в keyed state и обновляйте через side input.
  3. Шардирование правил: правила тоже keyBy, и main поток тоже keyBy по тому же ключу, чтобы правила и события встретились в одном subtask. Усложнение, но избегает broadcast overhead.
ПодходКогда использоватьМинус
Broadcast stateДо тысяч правил, до десятков MBДублирование во все subtask
Async I/O в RedisМиллионы правил, дёшевоLatency и зависимость от внешнего store
Keyed state + co-streamПравила per-keyСложная топология

Initialize broadcast state с заранее заданными значениями

Если вам нужно, чтобы при первом старте job уже был набор default-правил (например, пока ещё не пришло ни одного обновления из Kafka), используйте applyToKeyedState или просто заранее опубликуйте дефолтные правила в Kafka топик.

В Flink нет API «инициализировать BroadcastState из кода». Состояние всегда строится из broadcast-потока. Если нужны дефолты — публикуйте их в bootstrap-сообщении в Kafka.


Попробуй сам

Сделай простой feature-flags-engine:

  1. Main stream: Event(userId: String, action: String).
  2. Broadcast stream: FeatureFlag(name: String, enabled: Boolean, rolloutPercent: Int).
  3. BroadcastProcessFunction: если у события есть match с включённым флагом и userId.hashCode() % 100 < rolloutPercent — пропустить событие в out. Иначе — отбросить.
  4. Симулируй обновление флага в runtime: добавь новый FeatureFlag(name='new_checkout', enabled=true, rolloutPercent=10) и убедись, что в логах после публикации 10% юзеров проходят, 90% — нет.

Ключевые выводы

  1. Broadcast state — состояние, которое дублируется во все subtask’и оператора. Используется для правил, feature flags, конфигов.
  2. BroadcastProcessFunction имеет два метода: processElement (main, ReadOnlyContext) и processBroadcastElement (broadcast, Context с write-доступом). Только broadcast-сторона может изменять state.
  3. MapStateDescriptor — единственный тип descriptor для broadcast state. Если нужен List — оборачиваем в Map с искусственным ключом.
  4. Snapshot масштабируется как state_size x parallelism — не использовать для больших данных. Десятки MB, тысячи правил — потолок.
  5. Альтернативы: async I/O lookup, keyed state + co-stream, шардирование. Broadcast — для маленьких часто-применяемых правил.
Проверка знанийKnowledge check
У вас broadcast state с 1000 правил, каждое ~100 KB (JSON со сложными условиями). Parallelism main-потока = 50. Какой размер snapshot будет создаваться при каждом checkpoint? Что произойдёт при rescale до parallelism = 100? Какое архитектурное решение лучше принять?
ОтветAnswer
Snapshot размер = 1000 правил x 100 KB x 50 subtask = ~5 GB на каждый checkpoint. Это огромный объём для snapshot: долгое создание, долгое восстановление, большой расход места в state backend. При rescale до 100 subtask: каждый из 100 subtask получит ПОЛНУЮ копию всех правил (как UnionListState), итоговый размер state в кластере удваивается до ~10 GB. Архитектурное решение: правила слишком большие для broadcast. Лучшая альтернатива — async I/O lookup в Redis или DynamoDB: правила хранятся внешне, main-поток делает lookup по rule_id при обработке транзакции. Это снимает нагрузку с checkpoint, но добавляет latency на каждое событие. Если правил мало активных (например, всегда применяются ~100 из 1000) — можно сделать local cache в operator state с TTL и lookup только при cache miss.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. В BroadcastProcessFunction processElement (main stream) получает ReadOnlyContext, а processBroadcastElement (broadcast stream) получает Context с write-доступом к BroadcastState. Почему такая асимметрия?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 3