Skew detection и mitigation
Data skew — это когда некоторые ключи получают непропорционально большую долю events. На большом stream это значит: один subtask загружен на 100%, остальные idle. Throughput всего job-а ограничен этим одним subtask. Никакое увеличение parallelism не поможет — горячий key всегда живёт на одной задаче.
Этот урок про два навыка: как уверенно diagnostic skew (часто его симптомы misleading), и как mitigation techniques (от простых aggregation tricks до полноценного two-stage rebalance).
Skew join mitigation в Spark AQEОткуда берётся skew
Data skew появляется по двум причинам:
1. Natural distribution skew. Реальные данные редко uniform. Несколько guests составляют 80% trafficа, top-10 customers — 90% revenue, hashtag #mood в Twitter — millions vs typical hashtag — десятки. Если key напрямую отражает business entity — горячие entities дают skew.
2. Synthetic skew. Когда default value становится key — например все unauthenticated users получают user_id = "guest". Или device без serial = “unknown”. Эти synthetic keys собирают огромную долю events.
Skew проблема для всех keyed operations: GROUP BY в SQL, keyBy в DataStream API, joins, windows. Везде, где events partitioned by key, hot key создаёт hot subtask.
Detection: per-subtask metrics
Flink Web UI показывает per-subtask metrics для каждого operator. Skew виден через несколько signals:
1. records-in / records-out per subtask. Если разница 10x+ между min и max — skew.
Operator GroupAggregator (parallelism=8):
Subtask 0: 2.1M records/min
Subtask 1: 245K records/min
Subtask 2: 230K records/min
Subtask 3: 280K records/min
Subtask 4: 255K records/min
Subtask 5: 260K records/min
Subtask 6: 235K records/min
Subtask 7: 250K records/min
Subtask 0 получает 9x больше events, чем остальные. Это classic skew pattern.
2. CPU per subtask. Через JVM metrics:
flink_taskmanager_Status_JVM_CPU_Load{subtask="0"} = 0.95
flink_taskmanager_Status_JVM_CPU_Load{subtask="1"} = 0.12
flink_taskmanager_Status_JVM_CPU_Load{subtask="2"} = 0.15
...
Один subtask near 100% CPU, остальные idle.
3. Backpressure indicator. В Web UI backpressure HIGH на upstream operator, BUT только перед hot subtask. Это указывает что hot subtask не успевает consuming.
4. State size per subtask. В Checkpoint Details page:
Checkpoint #1247:
Subtask 0: 12 GB state
Subtask 1: 1.3 GB state
Subtask 2: 1.1 GB state
Subtask 3: 1.2 GB state
10x разница в state size — strong signal of skew (одна subtask хранит state для huge key).
Programmatic skew detection
Для production monitoring — automated detection через metrics. Custom check:
public class SkewDetector extends RichMapFunction{'<'}Event, Event{'>'} {
private transient Counter eventCount;
private transient int subtaskIndex;
private transient long startTime;
@Override
public void open(Configuration parameters) {
eventCount = getRuntimeContext()
.getMetricGroup()
.counter("subtaskEventCount");
subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
startTime = System.currentTimeMillis();
}
@Override
public Event map(Event e) {
eventCount.inc();
return e;
}
}
В Prometheus query для alerting:
# Coefficient of variation by subtask
stddev(rate(flink_taskmanager_job_task_operator_subtaskEventCount[5m])) by (operator_name)
/
avg(rate(flink_taskmanager_job_task_operator_subtaskEventCount[5m])) by (operator_name)
> 0.5
CoV > 0.5 (50% variation) — alert на skew. Tune threshold per workload.
Mitigation 1: Pre-aggregation
Самая простая (и часто достаточная) техника — pre-aggregation перед keyBy. Идея: события того же key амортизируются locally перед глобальным shuffle.
// До: каждое event идёт через сеть
events
.keyBy(e -{'>'} e.userId)
.sum("amount")
.print();
// После: pre-aggregation на source side
events
.keyBy(e -{'>'} e.userId)
.window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
.reduce((a, b) -{'>'} {
return new Event(a.userId, a.amount + b.amount, a.ts);
})
.keyBy(e -{'>'} e.userId)
.sum("amount")
.print();
Trick: первый keyBy + window — это local aggregation внутри parallel subtask, второй keyBy — global. Между ними shuffle, но pre-aggregated events намного меньше.
Если на source 1M events/sec, после 1-секундной window получаем максимум (unique users) * 1 = ~10K aggregated events/sec going через shuffle. Skew всё ещё может быть на global side, но volume to hot subtask reduced в 100x.
Limitations:
- Подходит только для associative aggregations (sum, max, min, count).
- Не работает для non-associative (mean без weights, percentile).
- Добавляет latency на размер window.
Mitigation 2: LocalKeyedAggregation в Table API
Flink Table API имеет встроенный optimization для skewed group-by — LocalKeyedAggregation, который автоматически делает pre-aggregation:
SET table.optimizer.distinct-agg.split.enabled = true;
SET table.optimizer.agg-phase-strategy = TWO_PHASE;
SELECT user_id,
COUNT(*) AS event_count,
SUM(amount) AS total
FROM orders
GROUP BY user_id;
С agg-phase-strategy = TWO_PHASE Calcite optimizer вставляет local aggregator перед shuffle. Working principle:
- Local agg внутри parallel subtask: partial aggregates по local keys.
- Shuffle: только partial aggregates сегу через сеть (much smaller volume).
- Global agg: финальная aggregation per key.
Это эквивалент manual pre-aggregation, но automatic. Recommended для всех Table API jobs с group-by.
Mini-batch для дальнейшего ускорения:
SET table.exec.mini-batch.enabled = true;
SET table.exec.mini-batch.allow-latency = 1s;
SET table.exec.mini-batch.size = 10000;
Mini-batch буферизует events в local agg, обрабатывает batch — это уменьшает per-event overhead.
Mitigation 3: Salting
Когда hot key concentrates events на одну subtask, можно искусственно разделить ключ на N “bucket”-ов. Это salting — добавление random suffix к key для distribution.
Two-stage aggregation pattern:
// Stage 1: salt the key, aggregate per (key, salt)
DataStream{'<'}Aggregate{'>'} stage1 = events
.map(e -{'>'} {
int salt = ThreadLocalRandom.current().nextInt(32); // 32 buckets
return new SaltedEvent(e.userId, salt, e.amount);
})
.keyBy(s -{'>'} s.userId + "_" + s.salt)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.reduce((a, b) -{'>'} new SaltedAggregate(a.userId, a.salt, a.sum + b.sum));
// Stage 2: remove salt, aggregate globally
DataStream{'<'}Result{'>'} result = stage1
.keyBy(s -{'>'} s.userId)
.reduce((a, b) -{'>'} new Result(a.userId, a.sum + b.sum));
В этой схеме:
- Hot key
guestтеперь distributed между 32 buckets:guest_0,guest_1, …,guest_31. - Stage 1 aggregation runs на 32x больше parallel keys — load distributed.
- Stage 2 aggregation runs только на pre-aggregated values — namely 32 partial aggregates per hot key — small volume.
Trade-offs:
- Latency: данные waiting в Stage 1 window (5 seconds в example).
- Complexity: добавляется один stage и custom key types.
- Approximation: если используется percentile или other non-associative — нельзя salt directly.
Salt number tuning:
- Small (4-8): для mild skew.
- Medium (16-32): для moderate skew.
- Large (64-128): для extreme skew (одна key получает 50%+ events).
Чем больше salt — больше distribution, но больше state overhead (each bucket maintains separate aggregate).
Mitigation 4: Hot key sidetracking
Иногда proceeding делать full two-stage слишком complex. Простая heuristic — выделить специально известные hot keys и обработать их separately:
SingleOutputStreamOperator{'<'}Event{'>'} routed = events
.process(new ProcessFunction{'<'}Event, Event{'>'}() {
private final Set{'<'}String{'>'} HOT_KEYS = Set.of("guest", "unknown", "anonymous");
private final OutputTag{'<'}Event{'>'} hotKeysTag = new OutputTag{'<'}{'>'}("hot") {};
@Override
public void processElement(Event e, Context ctx, Collector{'<'}Event{'>'} out) {
if (HOT_KEYS.contains(e.userId)) {
ctx.output(hotKeysTag, e); // hot keys в отдельный stream
} else {
out.collect(e); // normal stream
}
}
});
// Normal processing для остальных keys
DataStream{'<'}Result{'>'} normalResult = routed
.keyBy(e -{'>'} e.userId)
.sum("amount");
// Specialized processing для hot keys с salting
DataStream{'<'}Result{'>'} hotResult = routed.getSideOutput(hotKeysTag)
.map(e -{'>'} new SaltedEvent(e.userId, randomSalt(64), e.amount))
.keyBy(s -{'>'} s.userId + "_" + s.salt)
.sum("amount")
.keyBy(s -{'>'} s.userId)
.sum("amount");
// Union
DataStream{'<'}Result{'>'} all = normalResult.union(hotResult);
Этот подход:
- Cold path (99% keys) — straight keyBy, fastest path, no overhead.
- Hot path (specific keys) — salted aggregation, more overhead но distributed load.
- Total: normal throughput на cold path, manageable throughput на hot path.
Tradeoff: нужно знать hot keys заранее. В production обычно вы знаете top-N через offline analysis (predefined list) или через runtime monitoring.
Watermark skew alignment
Связанная проблема — watermark skew, когда некоторые subtasks имеют namely медленные watermarks. Это сильно отличается от data skew, но симптомы похожи: некоторые subtask блокируются.
Watermark skew случается когда:
- Источник имеет неравномерные events per partition (Kafka partition имеет idle period).
- Один subtask processes more records и delays watermark advancement.
- Network delays для some partitions.
Detection: Flink Web UI -> Watermarks tab. Если разница 30+ seconds — watermark skew.
Mitigation через watermark alignment (Flink 1.15+):
WatermarkStrategy{'<'}Event{'>'} strategy = WatermarkStrategy
.{'<'}Event{'>'}forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withIdleness(Duration.ofMinutes(5))
.withTimestampAssigner((e, ts) -{'>'} e.timestamp)
.withWatermarkAlignment(
"wm-group-1", // alignment group ID
Duration.ofSeconds(20), // max drift
Duration.ofSeconds(1) // update interval
);
KafkaSource source = KafkaSource.{'<'}Event{'>'}builder()
...
.build();
env.fromSource(source, strategy, "kafka");
Watermark alignment — это backpressure-based mechanism. Subtasks которые advance watermark быстрее throttled, чтобы overall watermark moved evenly. Predeven watermarks -> predсуmm event processing.
Combining techniques
Real production обычно combines multiple techniques. Workflow:
-
First — detection. Custom metrics + Prometheus alerting. Без detection skew invisible.
-
Try Table API + TWO_PHASE first. Если pipeline на SQL — enable two-phase agg. Free win, automatic optimization.
-
Pre-aggregation для DataStream API. Если manual code — add pre-aggregation window перед global keyBy.
-
Salting для остальных cases. Если pre-aggregation не достаточно — salting two-stage.
-
Hot key sidetracking для extreme cases. Когда conkretnyh keys создают 50%+ всех events.
-
Watermark alignment если skew в event time — отдельная проблема, отдельный fix.
-
Monitoring continues. Skew changes over time (например new product launch creates new hot key). Continuous monitoring необходим.
Skew detection и mitigation — это операционная discipline, не одноразовый fix. Скейл team-ы expect monthly check of per-subtask metrics для top-N largest jobs. Создайте dashboard “Skew Health” с per-job CoV и review его раз в неделю.
Anti-patterns
1. Just increase parallelism. Не работает для skew. Если key hot, он останется hot на любом числе subtasks. Single key всегда на одной subtask.
2. Manual hashing key. Some try keyBy(e -{'>'} hash(e.userId)) думая что это distribute. Это just rehashes, hot keys всё ещё concentrate.
3. Random partitioning через .rebalance(). Distributes evenly, но ломает keyBy semantics — нельзя aggregate per user, потому что user events scattered.
4. Ignoring skew until production crisis. Самый частый pattern. Detection не настроена, hot key emerges, всё горит. Setup monitoring proactively.
Real production case
Real case из ride-sharing platform. Job aggregates ride completions per driver. После platform expansion в new city, скalability стала проблемой.
Symptoms: один subtask 100% CPU, остальные 20%. Backpressure от upstream. Throughput dropped 60%.
Investigation: per-subtask metrics показал subtask 7 получает 8x events. Top-cardinality analysis identified driver_id = "system_dispatch" — это shared key для system-generated rides (rebalancing trips). Этот ID accumulated 80% всех events.
Mitigation: hot key sidetracking pattern.
- Created OutputTag для system_dispatch events.
- Cold path: normal keyBy на real driver IDs.
- Hot path: salt system_dispatch на 32 buckets, two-stage aggregation.
After mitigation:
- All subtasks 35-40% CPU (equal).
- Throughput recovered to baseline.
- Latency p99 from 2 seconds to 200ms.
Lesson: regular cardinality analysis на large jobs identifies emerging hot keys before they crisis.
Попробуй сам
-
Symulate skew. Создайте Flink job с keyBy и аггрегацией. В source 90% events имеют
key="hot", 10% — random. Observe per-subtask metrics — one subtask saturated. -
Pre-aggregation fix. Add tumbling window перед keyBy для local aggregation. Re-test — distribution должна improve.
-
Salting на extreme skew. Make 99% events have same key. Try salting с 64 buckets. Compare before/after CPU per subtask.