Learning Platform
Глоссарий Troubleshooting
Урок 04.02 · 22 мин
Средний
DataStreamTransformationsMapFilterKeyByReduceAggregate

Transformations: map, filter, flatMap, keyBy, reduce, aggregate

DataStream transformations — это рабочий инструмент, которым вы будете пользоваться каждый день. map, filter, flatMap — для stateless операций. keyBy + reduce / aggregate — для stateful. Освоить их — значит уметь решать 80% задач stream processing.

Этот урок — практический. Каждое преобразование с code примером на Flink 2.2 (Java), типичными ошибками и performance-замечаниями. К концу вы сможете прочитать любой DataStream pipeline и понять, что он делает.


Структура: типы операторов

Все DataStream transformations делятся на две категории:

Stateless — каждое событие обрабатывается независимо. Нет state между событиями. Примеры: map, filter, flatMap. Можно параллелить как угодно, рестарт не теряет ничего важного.

Stateful — оператор поддерживает state, который меняется с каждым событием. Примеры: keyBy + reduce, aggregate, window. State — first-class concept; checkpoints включают этот state.

Граница между ними — keyBy. До keyBy — stateless world; после — keyed stream с state per key.


map: преобразование 1 -> 1

map(MapFunction) — самое базовое преобразование. На каждое входящее событие выдаёт ровно одно выходящее.

DataStream<String> input = env.fromSource(kafkaSource, ws, "Source");

// Парсим JSON в объект
DataStream<Transaction> transactions = input.map(
    new MapFunction<String, Transaction>() {
        @Override
        public Transaction map(String json) throws Exception {
            return objectMapper.readValue(json, Transaction.class);
        }
    }
);

// С лямбдой (Java 8+) — нужен .returns() для type hint
DataStream<Transaction> transactions = input
    .map(json -> objectMapper.readValue(json, Transaction.class))
    .returns(TypeInformation.of(Transaction.class));

RichMapFunction — расширенный вариант, дающий доступ к runtime context (lifecycle hooks open/close, метрики, broadcast variables):

public class GeoEnrichmentMap extends RichMapFunction<Event, EnrichedEvent> {
    private transient GeoIPReader geoReader;
    
    @Override
    public void open(Configuration parameters) throws Exception {
        // Открываем GeoIP database один раз на subtask
        geoReader = new GeoIPReader("/data/GeoLite2-City.mmdb");
    }
    
    @Override
    public void close() throws Exception {
        if (geoReader != null) geoReader.close();
    }
    
    @Override
    public EnrichedEvent map(Event event) throws Exception {
        String country = geoReader.lookup(event.getIp());
        return new EnrichedEvent(event, country);
    }
}

Когда нужен RichMapFunction: открытие ресурсов (DB connections, файлы, large dictionaries), метрики, любая инициализация per-subtask.

WARNING

Не выбрасывайте RuntimeException в map для valid edge cases (например, пустой JSON). Это упадёт job. Лучше — log warning и эмитьте null/sentinel, или используйте flatMap (см. ниже), чтобы skip element. Job, который рестартится 100 раз в сутки из-за одного malformed JSON, — плохой production-код.


filter: фильтрация

filter(FilterFunction) — пропускает только события, для которых функция возвращает true.

DataStream<Transaction> suspicious = transactions.filter(
    txn -> txn.getAmount().compareTo(BigDecimal.valueOf(10000)) > 0
);

С RichFilterFunction:

public class FraudFilter extends RichFilterFunction<Transaction> {
    private transient Counter filteredCounter;
    
    @Override
    public void open(Configuration parameters) {
        filteredCounter = getRuntimeContext()
            .getMetricGroup()
            .counter("filtered_transactions");
    }
    
    @Override
    public boolean filter(Transaction txn) {
        boolean keep = !fraudList.contains(txn.getUserId());
        if (!keep) filteredCounter.inc();
        return keep;
    }
}

Производительность: filter — самая дешёвая операция. Применяйте максимально рано в pipeline (после source), чтобы down-stream операторы получили меньше данных. Каждая фильтрация в начале экономит CPU и память в shuffle.


flatMap: 1 -> N (включая 0)

flatMap(FlatMapFunction) — может эмитить 0, 1, или много событий на каждое входящее. Самый гибкий stateless transformation.

DataStream<String> lines = env.fromSource(kafkaSource, ws, "Source");

// Разбиение строки на слова
DataStream<String> words = lines.flatMap(
    (String line, Collector<String> out) -> {
        for (String word : line.toLowerCase().split("\\s+")) {
            if (!word.isEmpty()) {
                out.collect(word);
            }
        }
    }
).returns(Types.STRING);

С полной FlatMapFunction (когда нужен type safety):

public class JsonExtractor implements FlatMapFunction<String, Transaction> {
    @Override
    public void flatMap(String json, Collector<Transaction> out) throws Exception {
        try {
            // Один JSON может содержать массив
            JsonNode root = mapper.readTree(json);
            if (root.isArray()) {
                for (JsonNode node : root) {
                    out.collect(mapper.treeToValue(node, Transaction.class));
                }
            } else {
                out.collect(mapper.treeToValue(root, Transaction.class));
            }
        } catch (Exception e) {
            // Skip invalid JSON, log to side output
            log.warn("Invalid JSON: {}", json);
            // ничего не collect, событие "поглощается"
        }
    }
}

Производительность: flatMap дороже, чем map (создаётся Collector). Но если вы и так бы делали try/catch или unwrap collection — flatMap правильный инструмент.


keyBy: разделение по ключу

keyBy(KeySelector) — критическое преобразование. Оно превращает DataStream<T> в KeyedStream<T, K>. После keyBy все события с одинаковым ключом гарантированно попадают в один subtask, и можно использовать keyed state.

// Лямбда KeySelector
KeyedStream<Transaction, String> byUser = transactions
    .keyBy(txn -> txn.getUserId());

// С полным KeySelector
KeyedStream<Transaction, Tuple2<String, String>> byUserAndCountry = transactions
    .keyBy(new KeySelector<Transaction, Tuple2<String, String>>() {
        @Override
        public Tuple2<String, String> getKey(Transaction txn) {
            return Tuple2.of(txn.getUserId(), txn.getCountry());
        }
    });

Что происходит под капотом: keyBy — это hash partitioning по ключу. Flink вычисляет hash(key) % parallelism и отправляет событие в соответствующий subtask. Это shuffle — данные передаются по сети между TaskManager’ами (если subtask на другом TM).

WARNING

keyBy с высокой cardinality ключа (миллиарды уникальных значений) — не проблема, наоборот. Но keyBy с низкой cardinality (например, country с 200 значениями) при parallelism=4 даст хорошее распределение. KeyBy с очень низкой cardinality (например, 10 ключей при parallelism=20) — большинство subtasks без данных, перегруз других. Это называется “hot key” проблема — она дорого решается, лучше избегать.

Hot keys: главный риск keyBy

Hot key — это ключ, который получает много больше событий, чем другие. Пример: keyBy(country), 90% траффика — США. Один subtask грузится на 100%, другие почти простаивают.

Решения:

  1. Префиксование ключа: keyBy(country + "_" + random(10)) — разбивает hot key на 10 sub-keys. Но потом нужна re-aggregation (двух-уровневая агрегация).
  2. Иной ключ: вместо country использовать region+country, или userId+timeBucket.
  3. Pre-aggregation перед keyBy: уменьшить нагрузку через локальную агрегацию.

reduce: stateful агрегация

reduce(ReduceFunction) — работает только на KeyedStream. Накопительная функция: state — это последний reduced value per key, на каждое событие — новый reduced.

// keyBy + reduce: total amount per user
KeyedStream<Transaction, String> byUser = transactions.keyBy(Transaction::getUserId);

DataStream<Transaction> sumPerUser = byUser.reduce(
    (a, b) -> {
        // Тип "Transaction" остаётся; reduce — это T -> T -> T
        return new Transaction(
            a.getUserId(),
            a.getAmount().add(b.getAmount()),
            // другие поля — например, последний timestamp
            Math.max(a.getTimestamp(), b.getTimestamp())
        );
    }
);

Тип результата reduce — такой же, как тип входа (T -> T). Это ограничение reduce — он не может изменить тип.

State: per key хранится одно значение типа T (последний reduced result). Размер state = количество уникальных ключей × размер T.


aggregate: гибкая агрегация

aggregate(AggregateFunction) — более мощный аналог reduce. Позволяет иметь intermediate type ACC ≠ input type IN ≠ output type OUT. Используется часто внутри windows.

public class AverageAggregator implements AggregateFunction<
    Transaction,           // input type
    Tuple2<Double, Long>,  // accumulator type (sum, count)
    Double> {              // output type (average)
    
    @Override
    public Tuple2<Double, Long> createAccumulator() {
        return Tuple2.of(0.0, 0L);
    }
    
    @Override
    public Tuple2<Double, Long> add(Transaction txn, Tuple2<Double, Long> acc) {
        return Tuple2.of(
            acc.f0 + txn.getAmount().doubleValue(),
            acc.f1 + 1
        );
    }
    
    @Override
    public Double getResult(Tuple2<Double, Long> acc) {
        return acc.f1 == 0 ? 0.0 : acc.f0 / acc.f1;
    }
    
    @Override
    public Tuple2<Double, Long> merge(Tuple2<Double, Long> a, Tuple2<Double, Long> b) {
        return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
    }
}

Зачем merge: нужен для session windows (объединение двух accumulators в один при merge sessions).

Использование внутри window:

DataStream<Double> avgPerWindow = byUser
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .aggregate(new AverageAggregator());

Когда aggregate, когда reduce: используйте aggregate, когда тип intermediate state отличается от input/output. Reduce — только когда T -> T -> T достаточно (что часто бывает для простых случаев типа sum, max).


sum, max, min, minBy, maxBy: convenient shortcuts

Для очень типичных случаев есть shortcuts:

// Sum по полю (для Tuple2/Tuple3)
KeyedStream<Tuple2<String, Long>, String> byWord = wordsTupled
    .keyBy(t -> t.f0);
DataStream<Tuple2<String, Long>> counts = byWord.sum(1);  // sum поля index 1

// Max/min по полю
DataStream<Tuple2<String, Long>> maxes = byWord.max(1);

// MaxBy / minBy — возвращает целиком event с max value (не агрегат)
DataStream<Tuple2<String, Long>> highestPerKey = byWord.maxBy(1);

Эти shortcuts эквивалентны reduce/aggregate с simple функцией. Удобно для quick prototyping.

GroupBy aggregations в Spark: тот же паттерн, другая платформа

Композиция: реальный pipeline

Соберём всё вместе на примере real-time fraud detection:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);
env.setParallelism(4);

KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("kafka:9092")
    .setTopics("transactions")
    .setGroupId("fraud-detector")
    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

WatermarkStrategy<String> ws = WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5));

DataStream<String> raw = env.fromSource(source, ws, "Transactions Kafka").name("Source");

// Шаг 1: парсим JSON, skip невалидные
DataStream<Transaction> txns = raw
    .flatMap((String json, Collector<Transaction> out) -> {
        try {
            out.collect(mapper.readValue(json, Transaction.class));
        } catch (Exception e) {
            // log + skip
        }
    })
    .returns(TypeInformation.of(Transaction.class))
    .name("Parse JSON");

// Шаг 2: фильтр — только подозрительные суммы и страны
DataStream<Transaction> suspicious = txns
    .filter(txn -> 
        txn.getAmount().compareTo(BigDecimal.valueOf(5000)) > 0
        || HIGH_RISK_COUNTRIES.contains(txn.getCountry())
    )
    .name("Filter Suspicious");

// Шаг 3: групировка per user, агрегация суммы за окно
DataStream<UserRiskScore> userRisk = suspicious
    .keyBy(Transaction::getUserId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .aggregate(new RiskScoreAggregator())
    .name("Compute Risk Score");

// Шаг 4: фильтр — только high-risk пользователи
DataStream<UserRiskScore> highRisk = userRisk
    .filter(score -> score.getScore() > 0.7)
    .name("High Risk Users");

// Sink — пишем в alerts topic
highRisk.sinkTo(alertsKafkaSink).name("Sink to Alerts");

env.execute("Fraud Detection Pipeline");

Что здесь хорошо:

  • Каждый оператор именован (.name(...)) — в Web UI понятно, что делает.
  • Filter максимально рано — снижает нагрузку на keyBy и aggregate.
  • Window aggregation — корректное использование event time.
  • Sink на отдельный alerts топик — separation of concerns.

Производительность: pitfalls

  1. Слишком много map’ов подряд: каждый map — отдельный оператор. Они chain’атся, но в логе сложнее читать. Лучше один map с несколькими действиями.

  2. Heavy work в keyBy KeySelector: KeySelector вызывается для КАЖДОГО события. Не делайте там тяжёлых вычислений или DB lookup’ов.

  3. Object creation в hot path: new SimpleDateFormat() в map (вызывается тысячи раз/сек). Используйте RichMapFunction.open() для one-time init.

  4. Сериализация: если выходной тип map — Generic Object (не POJO/Tuple/Avro), Flink fallback на Kryo (медленный). См. урок 03.5 про сериализацию.

  5. Шаффлинг (keyBy) перед фильтрацией: фильтруйте ДО keyBy, не после. Снижает shuffle volume.


Попробуй сам

Расширьте WordCount example:

  1. Добавьте фильтр: считать только слова длиннее 3 символов. Используйте .filter(...).
  2. Добавьте mapping: после счёта (sum) — преобразовать Tuple2<String, Long> в WordCountEvent объект (новый POJO). Используйте .map(...).
  3. Добавьте second-level aggregation: после per-word counter, посчитать total count всех слов через keyBy(constant) + sum. Это даст общий счётчик слов.
  4. Запустите и проверьте Web UI: сколько операторов? Какие chained? Сколько subtasks у каждого?
  5. Bonus: попробуйте aggregate вместо sum для подсчёта (используйте AverageAggregator adapted для count).
Проверка знанийKnowledge check
У вас pipeline: KafkaSource -> map (parse JSON) -> filter (high-value txns) -> keyBy(userId) -> reduce (sum per user) -> KafkaSink. Какой порядок операций оптимальный с точки зрения производительности, и почему?
ОтветAnswer
Порядок правильный: filter ДО keyBy и reduce. Логика: (1) filter — самая дешёвая операция, отсеивает много records без shuffle. (2) keyBy — shuffle по сети между TaskManager'ами, дорогая операция. Если фильтр пропускает 10% records, мы передаём по сети 10% объёма, а не 100%. (3) reduce — stateful, требует state в memory/RocksDB. Меньше records = меньше state операций. Если бы порядок был keyBy -> filter -> reduce, мы бы платили full shuffle cost, а потом отбрасывали 90%. Оптимизация stateless-операций до shuffle — главное правило производительности streaming pipelines. Также: filter с RichFilterFunction может использовать broadcast state для динамической фильтрации (модуль 08), но базовый stateless фильтр — всегда сначала.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Какой transformations пайплайн оптимален с точки зрения производительности?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5