Transformations: map, filter, flatMap, keyBy, reduce, aggregate
DataStream transformations — это рабочий инструмент, которым вы будете пользоваться каждый день. map, filter, flatMap — для stateless операций. keyBy + reduce / aggregate — для stateful. Освоить их — значит уметь решать 80% задач stream processing.
Этот урок — практический. Каждое преобразование с code примером на Flink 2.2 (Java), типичными ошибками и performance-замечаниями. К концу вы сможете прочитать любой DataStream pipeline и понять, что он делает.
Структура: типы операторов
Все DataStream transformations делятся на две категории:
Stateless — каждое событие обрабатывается независимо. Нет state между событиями. Примеры: map, filter, flatMap. Можно параллелить как угодно, рестарт не теряет ничего важного.
Stateful — оператор поддерживает state, который меняется с каждым событием. Примеры: keyBy + reduce, aggregate, window. State — first-class concept; checkpoints включают этот state.
Граница между ними — keyBy. До keyBy — stateless world; после — keyed stream с state per key.
map: преобразование 1 -> 1
map(MapFunction) — самое базовое преобразование. На каждое входящее событие выдаёт ровно одно выходящее.
DataStream<String> input = env.fromSource(kafkaSource, ws, "Source");
// Парсим JSON в объект
DataStream<Transaction> transactions = input.map(
new MapFunction<String, Transaction>() {
@Override
public Transaction map(String json) throws Exception {
return objectMapper.readValue(json, Transaction.class);
}
}
);
// С лямбдой (Java 8+) — нужен .returns() для type hint
DataStream<Transaction> transactions = input
.map(json -> objectMapper.readValue(json, Transaction.class))
.returns(TypeInformation.of(Transaction.class));
RichMapFunction — расширенный вариант, дающий доступ к runtime context (lifecycle hooks open/close, метрики, broadcast variables):
public class GeoEnrichmentMap extends RichMapFunction<Event, EnrichedEvent> {
private transient GeoIPReader geoReader;
@Override
public void open(Configuration parameters) throws Exception {
// Открываем GeoIP database один раз на subtask
geoReader = new GeoIPReader("/data/GeoLite2-City.mmdb");
}
@Override
public void close() throws Exception {
if (geoReader != null) geoReader.close();
}
@Override
public EnrichedEvent map(Event event) throws Exception {
String country = geoReader.lookup(event.getIp());
return new EnrichedEvent(event, country);
}
}
Когда нужен RichMapFunction: открытие ресурсов (DB connections, файлы, large dictionaries), метрики, любая инициализация per-subtask.
Не выбрасывайте RuntimeException в map для valid edge cases (например, пустой JSON). Это упадёт job. Лучше — log warning и эмитьте null/sentinel, или используйте flatMap (см. ниже), чтобы skip element. Job, который рестартится 100 раз в сутки из-за одного malformed JSON, — плохой production-код.
filter: фильтрация
filter(FilterFunction) — пропускает только события, для которых функция возвращает true.
DataStream<Transaction> suspicious = transactions.filter(
txn -> txn.getAmount().compareTo(BigDecimal.valueOf(10000)) > 0
);
С RichFilterFunction:
public class FraudFilter extends RichFilterFunction<Transaction> {
private transient Counter filteredCounter;
@Override
public void open(Configuration parameters) {
filteredCounter = getRuntimeContext()
.getMetricGroup()
.counter("filtered_transactions");
}
@Override
public boolean filter(Transaction txn) {
boolean keep = !fraudList.contains(txn.getUserId());
if (!keep) filteredCounter.inc();
return keep;
}
}
Производительность: filter — самая дешёвая операция. Применяйте максимально рано в pipeline (после source), чтобы down-stream операторы получили меньше данных. Каждая фильтрация в начале экономит CPU и память в shuffle.
flatMap: 1 -> N (включая 0)
flatMap(FlatMapFunction) — может эмитить 0, 1, или много событий на каждое входящее. Самый гибкий stateless transformation.
DataStream<String> lines = env.fromSource(kafkaSource, ws, "Source");
// Разбиение строки на слова
DataStream<String> words = lines.flatMap(
(String line, Collector<String> out) -> {
for (String word : line.toLowerCase().split("\\s+")) {
if (!word.isEmpty()) {
out.collect(word);
}
}
}
).returns(Types.STRING);
С полной FlatMapFunction (когда нужен type safety):
public class JsonExtractor implements FlatMapFunction<String, Transaction> {
@Override
public void flatMap(String json, Collector<Transaction> out) throws Exception {
try {
// Один JSON может содержать массив
JsonNode root = mapper.readTree(json);
if (root.isArray()) {
for (JsonNode node : root) {
out.collect(mapper.treeToValue(node, Transaction.class));
}
} else {
out.collect(mapper.treeToValue(root, Transaction.class));
}
} catch (Exception e) {
// Skip invalid JSON, log to side output
log.warn("Invalid JSON: {}", json);
// ничего не collect, событие "поглощается"
}
}
}
Производительность: flatMap дороже, чем map (создаётся Collector). Но если вы и так бы делали try/catch или unwrap collection — flatMap правильный инструмент.
keyBy: разделение по ключу
keyBy(KeySelector) — критическое преобразование. Оно превращает DataStream<T> в KeyedStream<T, K>. После keyBy все события с одинаковым ключом гарантированно попадают в один subtask, и можно использовать keyed state.
// Лямбда KeySelector
KeyedStream<Transaction, String> byUser = transactions
.keyBy(txn -> txn.getUserId());
// С полным KeySelector
KeyedStream<Transaction, Tuple2<String, String>> byUserAndCountry = transactions
.keyBy(new KeySelector<Transaction, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> getKey(Transaction txn) {
return Tuple2.of(txn.getUserId(), txn.getCountry());
}
});
Что происходит под капотом: keyBy — это hash partitioning по ключу. Flink вычисляет hash(key) % parallelism и отправляет событие в соответствующий subtask. Это shuffle — данные передаются по сети между TaskManager’ами (если subtask на другом TM).
keyBy с высокой cardinality ключа (миллиарды уникальных значений) — не проблема, наоборот. Но keyBy с низкой cardinality (например, country с 200 значениями) при parallelism=4 даст хорошее распределение. KeyBy с очень низкой cardinality (например, 10 ключей при parallelism=20) — большинство subtasks без данных, перегруз других. Это называется “hot key” проблема — она дорого решается, лучше избегать.
Hot keys: главный риск keyBy
Hot key — это ключ, который получает много больше событий, чем другие. Пример: keyBy(country), 90% траффика — США. Один subtask грузится на 100%, другие почти простаивают.
Решения:
- Префиксование ключа:
keyBy(country + "_" + random(10))— разбивает hot key на 10 sub-keys. Но потом нужна re-aggregation (двух-уровневая агрегация). - Иной ключ: вместо country использовать region+country, или userId+timeBucket.
- Pre-aggregation перед keyBy: уменьшить нагрузку через локальную агрегацию.
reduce: stateful агрегация
reduce(ReduceFunction) — работает только на KeyedStream. Накопительная функция: state — это последний reduced value per key, на каждое событие — новый reduced.
// keyBy + reduce: total amount per user
KeyedStream<Transaction, String> byUser = transactions.keyBy(Transaction::getUserId);
DataStream<Transaction> sumPerUser = byUser.reduce(
(a, b) -> {
// Тип "Transaction" остаётся; reduce — это T -> T -> T
return new Transaction(
a.getUserId(),
a.getAmount().add(b.getAmount()),
// другие поля — например, последний timestamp
Math.max(a.getTimestamp(), b.getTimestamp())
);
}
);
Тип результата reduce — такой же, как тип входа (T -> T). Это ограничение reduce — он не может изменить тип.
State: per key хранится одно значение типа T (последний reduced result). Размер state = количество уникальных ключей × размер T.
aggregate: гибкая агрегация
aggregate(AggregateFunction) — более мощный аналог reduce. Позволяет иметь intermediate type ACC ≠ input type IN ≠ output type OUT. Используется часто внутри windows.
public class AverageAggregator implements AggregateFunction<
Transaction, // input type
Tuple2<Double, Long>, // accumulator type (sum, count)
Double> { // output type (average)
@Override
public Tuple2<Double, Long> createAccumulator() {
return Tuple2.of(0.0, 0L);
}
@Override
public Tuple2<Double, Long> add(Transaction txn, Tuple2<Double, Long> acc) {
return Tuple2.of(
acc.f0 + txn.getAmount().doubleValue(),
acc.f1 + 1
);
}
@Override
public Double getResult(Tuple2<Double, Long> acc) {
return acc.f1 == 0 ? 0.0 : acc.f0 / acc.f1;
}
@Override
public Tuple2<Double, Long> merge(Tuple2<Double, Long> a, Tuple2<Double, Long> b) {
return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
}
}
Зачем merge: нужен для session windows (объединение двух accumulators в один при merge sessions).
Использование внутри window:
DataStream<Double> avgPerWindow = byUser
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.aggregate(new AverageAggregator());
Когда aggregate, когда reduce: используйте aggregate, когда тип intermediate state отличается от input/output. Reduce — только когда T -> T -> T достаточно (что часто бывает для простых случаев типа sum, max).
sum, max, min, minBy, maxBy: convenient shortcuts
Для очень типичных случаев есть shortcuts:
// Sum по полю (для Tuple2/Tuple3)
KeyedStream<Tuple2<String, Long>, String> byWord = wordsTupled
.keyBy(t -> t.f0);
DataStream<Tuple2<String, Long>> counts = byWord.sum(1); // sum поля index 1
// Max/min по полю
DataStream<Tuple2<String, Long>> maxes = byWord.max(1);
// MaxBy / minBy — возвращает целиком event с max value (не агрегат)
DataStream<Tuple2<String, Long>> highestPerKey = byWord.maxBy(1);
Эти shortcuts эквивалентны reduce/aggregate с simple функцией. Удобно для quick prototyping.
GroupBy aggregations в Spark: тот же паттерн, другая платформаКомпозиция: реальный pipeline
Соберём всё вместе на примере real-time fraud detection:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);
env.setParallelism(4);
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("kafka:9092")
.setTopics("transactions")
.setGroupId("fraud-detector")
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
WatermarkStrategy<String> ws = WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5));
DataStream<String> raw = env.fromSource(source, ws, "Transactions Kafka").name("Source");
// Шаг 1: парсим JSON, skip невалидные
DataStream<Transaction> txns = raw
.flatMap((String json, Collector<Transaction> out) -> {
try {
out.collect(mapper.readValue(json, Transaction.class));
} catch (Exception e) {
// log + skip
}
})
.returns(TypeInformation.of(Transaction.class))
.name("Parse JSON");
// Шаг 2: фильтр — только подозрительные суммы и страны
DataStream<Transaction> suspicious = txns
.filter(txn ->
txn.getAmount().compareTo(BigDecimal.valueOf(5000)) > 0
|| HIGH_RISK_COUNTRIES.contains(txn.getCountry())
)
.name("Filter Suspicious");
// Шаг 3: групировка per user, агрегация суммы за окно
DataStream<UserRiskScore> userRisk = suspicious
.keyBy(Transaction::getUserId)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.aggregate(new RiskScoreAggregator())
.name("Compute Risk Score");
// Шаг 4: фильтр — только high-risk пользователи
DataStream<UserRiskScore> highRisk = userRisk
.filter(score -> score.getScore() > 0.7)
.name("High Risk Users");
// Sink — пишем в alerts topic
highRisk.sinkTo(alertsKafkaSink).name("Sink to Alerts");
env.execute("Fraud Detection Pipeline");
Что здесь хорошо:
- Каждый оператор именован (
.name(...)) — в Web UI понятно, что делает. - Filter максимально рано — снижает нагрузку на keyBy и aggregate.
- Window aggregation — корректное использование event time.
- Sink на отдельный alerts топик — separation of concerns.
Производительность: pitfalls
-
Слишком много map’ов подряд: каждый map — отдельный оператор. Они chain’атся, но в логе сложнее читать. Лучше один map с несколькими действиями.
-
Heavy work в keyBy KeySelector: KeySelector вызывается для КАЖДОГО события. Не делайте там тяжёлых вычислений или DB lookup’ов.
-
Object creation в hot path:
new SimpleDateFormat()в map (вызывается тысячи раз/сек). Используйте RichMapFunction.open() для one-time init. -
Сериализация: если выходной тип map — Generic Object (не POJO/Tuple/Avro), Flink fallback на Kryo (медленный). См. урок 03.5 про сериализацию.
-
Шаффлинг (keyBy) перед фильтрацией: фильтруйте ДО keyBy, не после. Снижает shuffle volume.
Попробуй сам
Расширьте WordCount example:
- Добавьте фильтр: считать только слова длиннее 3 символов. Используйте
.filter(...). - Добавьте mapping: после счёта (sum) — преобразовать
Tuple2<String, Long>вWordCountEventобъект (новый POJO). Используйте.map(...). - Добавьте second-level aggregation: после per-word counter, посчитать total count всех слов через
keyBy(constant)+sum. Это даст общий счётчик слов. - Запустите и проверьте Web UI: сколько операторов? Какие chained? Сколько subtasks у каждого?
- Bonus: попробуйте
aggregateвместоsumдля подсчёта (используйтеAverageAggregatoradapted для count).