Learning Platform
Глоссарий Troubleshooting
Урок 20.04 · 26 мин
Продвинутый
Data SkewPre-aggregationLocalKeyedAggregationSaltingTwo-stage Aggregation

Skew detection и mitigation

Data skew — это когда некоторые ключи получают непропорционально большую долю events. На большом stream это значит: один subtask загружен на 100%, остальные idle. Throughput всего job-а ограничен этим одним subtask. Никакое увеличение parallelism не поможет — горячий key всегда живёт на одной задаче.

Этот урок про два навыка: как уверенно diagnostic skew (часто его симптомы misleading), и как mitigation techniques (от простых aggregation tricks до полноценного two-stage rebalance).

Skew join mitigation в Spark AQE

Откуда берётся skew

Data skew появляется по двум причинам:

1. Natural distribution skew. Реальные данные редко uniform. Несколько guests составляют 80% trafficа, top-10 customers — 90% revenue, hashtag #mood в Twitter — millions vs typical hashtag — десятки. Если key напрямую отражает business entity — горячие entities дают skew.

2. Synthetic skew. Когда default value становится key — например все unauthenticated users получают user_id = "guest". Или device без serial = “unknown”. Эти synthetic keys собирают огромную долю events.

Skew проблема для всех keyed operations: GROUP BY в SQL, keyBy в DataStream API, joins, windows. Везде, где events partitioned by key, hot key создаёт hot subtask.


Detection: per-subtask metrics

Flink Web UI показывает per-subtask metrics для каждого operator. Skew виден через несколько signals:

1. records-in / records-out per subtask. Если разница 10x+ между min и max — skew.

Operator GroupAggregator (parallelism=8):
  Subtask 0: 2.1M records/min
  Subtask 1: 245K records/min
  Subtask 2: 230K records/min
  Subtask 3: 280K records/min
  Subtask 4: 255K records/min
  Subtask 5: 260K records/min
  Subtask 6: 235K records/min
  Subtask 7: 250K records/min

Subtask 0 получает 9x больше events, чем остальные. Это classic skew pattern.

2. CPU per subtask. Через JVM metrics:

flink_taskmanager_Status_JVM_CPU_Load{subtask="0"} = 0.95
flink_taskmanager_Status_JVM_CPU_Load{subtask="1"} = 0.12
flink_taskmanager_Status_JVM_CPU_Load{subtask="2"} = 0.15
...

Один subtask near 100% CPU, остальные idle.

3. Backpressure indicator. В Web UI backpressure HIGH на upstream operator, BUT только перед hot subtask. Это указывает что hot subtask не успевает consuming.

4. State size per subtask. В Checkpoint Details page:

Checkpoint #1247:
  Subtask 0: 12 GB state
  Subtask 1: 1.3 GB state
  Subtask 2: 1.1 GB state
  Subtask 3: 1.2 GB state

10x разница в state size — strong signal of skew (одна subtask хранит state для huge key).


Programmatic skew detection

Для production monitoring — automated detection через metrics. Custom check:

public class SkewDetector extends RichMapFunction{'<'}Event, Event{'>'} {
    private transient Counter eventCount;
    private transient int subtaskIndex;
    private transient long startTime;

    @Override
    public void open(Configuration parameters) {
        eventCount = getRuntimeContext()
            .getMetricGroup()
            .counter("subtaskEventCount");
        subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
        startTime = System.currentTimeMillis();
    }

    @Override
    public Event map(Event e) {
        eventCount.inc();
        return e;
    }
}

В Prometheus query для alerting:

# Coefficient of variation by subtask
stddev(rate(flink_taskmanager_job_task_operator_subtaskEventCount[5m])) by (operator_name)
/
avg(rate(flink_taskmanager_job_task_operator_subtaskEventCount[5m])) by (operator_name)
> 0.5

CoV > 0.5 (50% variation) — alert на skew. Tune threshold per workload.


Skew: проблема и mitigation techniques
До mitigation: 1 hot subtask, 7 idle
Subtask 0: 95% CPUHot subtask: получает 90% events для горячего ключа 'guest'. CPU 100%, backpressure HIGH. Является bottleneck'ом всего pipeline. Throughput всего job-а определяется этой subtask.
SB1: 12%Idle subtask 1: 12% CPU.
SB2: 15%Idle subtask 2: 15% CPU.
SB3: 14%Idle subtask 3: 14% CPU.
SB4: 13%Idle subtask 4: 13% CPU.
SB5: 11%Idle subtask 5: 11% CPU.
SB6: 14%Idle subtask 6: 14% CPU.
SB7: 13%Idle subtask 7: 13% CPU.
После two-stage aggregation: load distributed
SB0: 35%После salting: hot key distributed между 16 sub-buckets. Каждая subtask получает 1-2 buckets, equal load.
SB1: 38%Subtask 1 теперь также handles partial guest aggregation.
SB2: 36%Subtask 2 теперь также handles partial guest aggregation.
SB3: 37%Subtask 3 теперь также handles partial guest aggregation.
SB4: 35%Subtask 4 теперь также handles partial guest aggregation.
SB5: 38%Subtask 5 теперь также handles partial guest aggregation.
SB6: 36%Subtask 6 теперь также handles partial guest aggregation.
SB7: 37%Subtask 7 теперь также handles partial guest aggregation.

Mitigation 1: Pre-aggregation

Самая простая (и часто достаточная) техника — pre-aggregation перед keyBy. Идея: события того же key амортизируются locally перед глобальным shuffle.

// До: каждое event идёт через сеть
events
    .keyBy(e -{'>'} e.userId)
    .sum("amount")
    .print();

// После: pre-aggregation на source side
events
    .keyBy(e -{'>'} e.userId)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
    .reduce((a, b) -{'>'} {
        return new Event(a.userId, a.amount + b.amount, a.ts);
    })
    .keyBy(e -{'>'} e.userId)
    .sum("amount")
    .print();

Trick: первый keyBy + window — это local aggregation внутри parallel subtask, второй keyBy — global. Между ними shuffle, но pre-aggregated events намного меньше.

Если на source 1M events/sec, после 1-секундной window получаем максимум (unique users) * 1 = ~10K aggregated events/sec going через shuffle. Skew всё ещё может быть на global side, но volume to hot subtask reduced в 100x.

Limitations:

  • Подходит только для associative aggregations (sum, max, min, count).
  • Не работает для non-associative (mean без weights, percentile).
  • Добавляет latency на размер window.

Mitigation 2: LocalKeyedAggregation в Table API

Flink Table API имеет встроенный optimization для skewed group-by — LocalKeyedAggregation, который автоматически делает pre-aggregation:

SET table.optimizer.distinct-agg.split.enabled = true;
SET table.optimizer.agg-phase-strategy = TWO_PHASE;

SELECT user_id,
       COUNT(*) AS event_count,
       SUM(amount) AS total
FROM orders
GROUP BY user_id;

С agg-phase-strategy = TWO_PHASE Calcite optimizer вставляет local aggregator перед shuffle. Working principle:

  1. Local agg внутри parallel subtask: partial aggregates по local keys.
  2. Shuffle: только partial aggregates сегу через сеть (much smaller volume).
  3. Global agg: финальная aggregation per key.

Это эквивалент manual pre-aggregation, но automatic. Recommended для всех Table API jobs с group-by.

Mini-batch для дальнейшего ускорения:

SET table.exec.mini-batch.enabled = true;
SET table.exec.mini-batch.allow-latency = 1s;
SET table.exec.mini-batch.size = 10000;

Mini-batch буферизует events в local agg, обрабатывает batch — это уменьшает per-event overhead.


Mitigation 3: Salting

Когда hot key concentrates events на одну subtask, можно искусственно разделить ключ на N “bucket”-ов. Это salting — добавление random suffix к key для distribution.

Two-stage aggregation pattern:

// Stage 1: salt the key, aggregate per (key, salt)
DataStream{'<'}Aggregate{'>'} stage1 = events
    .map(e -{'>'} {
        int salt = ThreadLocalRandom.current().nextInt(32);  // 32 buckets
        return new SaltedEvent(e.userId, salt, e.amount);
    })
    .keyBy(s -{'>'} s.userId + "_" + s.salt)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .reduce((a, b) -{'>'} new SaltedAggregate(a.userId, a.salt, a.sum + b.sum));

// Stage 2: remove salt, aggregate globally
DataStream{'<'}Result{'>'} result = stage1
    .keyBy(s -{'>'} s.userId)
    .reduce((a, b) -{'>'} new Result(a.userId, a.sum + b.sum));

В этой схеме:

  • Hot key guest теперь distributed между 32 buckets: guest_0, guest_1, …, guest_31.
  • Stage 1 aggregation runs на 32x больше parallel keys — load distributed.
  • Stage 2 aggregation runs только на pre-aggregated values — namely 32 partial aggregates per hot key — small volume.

Trade-offs:

  • Latency: данные waiting в Stage 1 window (5 seconds в example).
  • Complexity: добавляется один stage и custom key types.
  • Approximation: если используется percentile или other non-associative — нельзя salt directly.

Salt number tuning:

  • Small (4-8): для mild skew.
  • Medium (16-32): для moderate skew.
  • Large (64-128): для extreme skew (одна key получает 50%+ events).

Чем больше salt — больше distribution, но больше state overhead (each bucket maintains separate aggregate).


Mitigation 4: Hot key sidetracking

Иногда proceeding делать full two-stage слишком complex. Простая heuristic — выделить специально известные hot keys и обработать их separately:

SingleOutputStreamOperator{'<'}Event{'>'} routed = events
    .process(new ProcessFunction{'<'}Event, Event{'>'}() {
        private final Set{'<'}String{'>'} HOT_KEYS = Set.of("guest", "unknown", "anonymous");
        private final OutputTag{'<'}Event{'>'} hotKeysTag = new OutputTag{'<'}{'>'}("hot") {};

        @Override
        public void processElement(Event e, Context ctx, Collector{'<'}Event{'>'} out) {
            if (HOT_KEYS.contains(e.userId)) {
                ctx.output(hotKeysTag, e);  // hot keys в отдельный stream
            } else {
                out.collect(e);  // normal stream
            }
        }
    });

// Normal processing для остальных keys
DataStream{'<'}Result{'>'} normalResult = routed
    .keyBy(e -{'>'} e.userId)
    .sum("amount");

// Specialized processing для hot keys с salting
DataStream{'<'}Result{'>'} hotResult = routed.getSideOutput(hotKeysTag)
    .map(e -{'>'} new SaltedEvent(e.userId, randomSalt(64), e.amount))
    .keyBy(s -{'>'} s.userId + "_" + s.salt)
    .sum("amount")
    .keyBy(s -{'>'} s.userId)
    .sum("amount");

// Union
DataStream{'<'}Result{'>'} all = normalResult.union(hotResult);

Этот подход:

  • Cold path (99% keys) — straight keyBy, fastest path, no overhead.
  • Hot path (specific keys) — salted aggregation, more overhead но distributed load.
  • Total: normal throughput на cold path, manageable throughput на hot path.

Tradeoff: нужно знать hot keys заранее. В production обычно вы знаете top-N через offline analysis (predefined list) или через runtime monitoring.


Watermark skew alignment

Связанная проблема — watermark skew, когда некоторые subtasks имеют namely медленные watermarks. Это сильно отличается от data skew, но симптомы похожи: некоторые subtask блокируются.

Watermark skew случается когда:

  • Источник имеет неравномерные events per partition (Kafka partition имеет idle period).
  • Один subtask processes more records и delays watermark advancement.
  • Network delays для some partitions.

Detection: Flink Web UI -> Watermarks tab. Если разница 30+ seconds — watermark skew.

Mitigation через watermark alignment (Flink 1.15+):

WatermarkStrategy{'<'}Event{'>'} strategy = WatermarkStrategy
    .{'<'}Event{'>'}forBoundedOutOfOrderness(Duration.ofSeconds(10))
    .withIdleness(Duration.ofMinutes(5))
    .withTimestampAssigner((e, ts) -{'>'} e.timestamp)
    .withWatermarkAlignment(
        "wm-group-1",            // alignment group ID
        Duration.ofSeconds(20),  // max drift
        Duration.ofSeconds(1)    // update interval
    );

KafkaSource source = KafkaSource.{'<'}Event{'>'}builder()
    ...
    .build();

env.fromSource(source, strategy, "kafka");

Watermark alignment — это backpressure-based mechanism. Subtasks которые advance watermark быстрее throttled, чтобы overall watermark moved evenly. Predeven watermarks -> predсуmm event processing.


Combining techniques

Real production обычно combines multiple techniques. Workflow:

  1. First — detection. Custom metrics + Prometheus alerting. Без detection skew invisible.

  2. Try Table API + TWO_PHASE first. Если pipeline на SQL — enable two-phase agg. Free win, automatic optimization.

  3. Pre-aggregation для DataStream API. Если manual code — add pre-aggregation window перед global keyBy.

  4. Salting для остальных cases. Если pre-aggregation не достаточно — salting two-stage.

  5. Hot key sidetracking для extreme cases. Когда conkretnyh keys создают 50%+ всех events.

  6. Watermark alignment если skew в event time — отдельная проблема, отдельный fix.

  7. Monitoring continues. Skew changes over time (например new product launch creates new hot key). Continuous monitoring необходим.

TIP

Skew detection и mitigation — это операционная discipline, не одноразовый fix. Скейл team-ы expect monthly check of per-subtask metrics для top-N largest jobs. Создайте dashboard “Skew Health” с per-job CoV и review его раз в неделю.


Anti-patterns

1. Just increase parallelism. Не работает для skew. Если key hot, он останется hot на любом числе subtasks. Single key всегда на одной subtask.

2. Manual hashing key. Some try keyBy(e -{'>'} hash(e.userId)) думая что это distribute. Это just rehashes, hot keys всё ещё concentrate.

3. Random partitioning через .rebalance(). Distributes evenly, но ломает keyBy semantics — нельзя aggregate per user, потому что user events scattered.

4. Ignoring skew until production crisis. Самый частый pattern. Detection не настроена, hot key emerges, всё горит. Setup monitoring proactively.


Real production case

Real case из ride-sharing platform. Job aggregates ride completions per driver. После platform expansion в new city, скalability стала проблемой.

Symptoms: один subtask 100% CPU, остальные 20%. Backpressure от upstream. Throughput dropped 60%.

Investigation: per-subtask metrics показал subtask 7 получает 8x events. Top-cardinality analysis identified driver_id = "system_dispatch" — это shared key для system-generated rides (rebalancing trips). Этот ID accumulated 80% всех events.

Mitigation: hot key sidetracking pattern.

  • Created OutputTag для system_dispatch events.
  • Cold path: normal keyBy на real driver IDs.
  • Hot path: salt system_dispatch на 32 buckets, two-stage aggregation.

After mitigation:

  • All subtasks 35-40% CPU (equal).
  • Throughput recovered to baseline.
  • Latency p99 from 2 seconds to 200ms.

Lesson: regular cardinality analysis на large jobs identifies emerging hot keys before they crisis.


Попробуй сам

  1. Symulate skew. Создайте Flink job с keyBy и аггрегацией. В source 90% events имеют key="hot", 10% — random. Observe per-subtask metrics — one subtask saturated.

  2. Pre-aggregation fix. Add tumbling window перед keyBy для local aggregation. Re-test — distribution должна improve.

  3. Salting на extreme skew. Make 99% events have same key. Try salting с 64 buckets. Compare before/after CPU per subtask.

Проверка знанийKnowledge check
Вы наблюдаете в production Flink job: GROUP BY user_id с count + sum, parallelism 16. Top 3 subtasks показывают 90%+ CPU, остальные 13 — менее 20%. Кто эти top-3 keys и как минимально-invasive исправить? Что если у вас Table API на SQL, а что если DataStream API на Java?
ОтветAnswer
Сначала диагностика: query data чтобы найти top-N keys. Самый простой способ — отдельный batch query через Flink SQL или Spark на тот же source: SELECT user_id, COUNT(*) FROM source GROUP BY user_id ORDER BY 2 DESC LIMIT 10. Это выдаст concrete hot keys. Скорее всего это special values (guest, system, null) или legitimate huge customers. Для Table API на SQL: (1) Самый простой — enable two-phase aggregation: SET table.optimizer.agg-phase-strategy = TWO_PHASE и SET table.optimizer.distinct-agg.split.enabled = true. Это автоматически вставляет local aggregator перед shuffle. Также enable mini-batch: SET table.exec.mini-batch.enabled = true; SET table.exec.mini-batch.allow-latency = 1s; SET table.exec.mini-batch.size = 5000. Это объединяет local aggregation + batching — обычно достаточно для moderate skew. (2) Если этого мало — SQL hints для skew handling (Flink 2.x): /*+ SKEW_HANDLE(keys='user_id', values='guest,system') */ — Flink applies salting automatically для specified hot keys. (3) Если SQL hints unavailable или skew extreme — нужно manually rewrite query с salting через subquery. Для DataStream API на Java: (1) Сначала pre-aggregation window: events.keyBy(userId).window(...).reduce(...).keyBy(userId).sum() — это amortizes local events. (2) Hot key sidetracking — separate OutputTag для top-3 hot keys, normal keyBy для остальных, salted two-stage для hot. Это max effectiveness, добавляет complexity. (3) Watermark alignment если задействует event time. (4) После любого изменения — monitor per-subtask CoV должен быть под 0.3, CPU per subtask within 20% of each other. И setup automated alert на CoV > 0.5 чтобы skew regression не вернулся silently.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. Production Flink job: GROUP BY user_id с count + sum, parallelism 16. Top 3 subtasks 90%+ CPU, остальные 13 — <20%. Какой минимально-invasive fix в Table API SQL и в DataStream API Java?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5