Learning Platform
Глоссарий Troubleshooting
Урок 17.03 · 28 мин
Средний
PitfallsDebuggingStateBackpressureWatermarkSkewOOM

Топ-10 production pitfalls в Flink

Каждый, кто бил Flink в production, проходил через одни и те же грабли. Этот урок — пройденный путь, спрессованный в 10 проблем с симптомами, диагностикой и фиксами. Если запомнить их и проверять при дизайне новых джобов — сэкономите недели.


1. Unbounded state: тихая бомба

Симптом: размер checkpoint растёт линейно во времени. Через недели/месяцы — OOM на TaskManager-ах или RocksDB не открывается.

Причина: keyed state без TTL. Например, ProcessFunction по userId, каждый новый пользователь добавляет ключ в RocksDB, и никто его не удаляет. У вас 10M активных пользователей в день, но историчных — 100M, и все они в state.

Фикс: StateTtlConfig на каждый state descriptor.

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(30))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .cleanupInRocksdbCompactFilter(1000)
    .build();

ValueStateDescriptor<UserProfile> desc =
    new ValueStateDescriptor<>("user-profile", UserProfile.class);
desc.enableTimeToLive(ttlConfig);

Или явно state.clear() в ProcessFunction когда вы логически знаете, что ключ устарел (например, по timer).

Превентивно: alert на deriv(flink_jobmanager_job_lastCheckpointSize[1h]) > 1MB/час — заметите рост за дни до проблемы.


2. Late events: окно закрылось, данные потеряны

Симптом: в результатах windowed aggregation отсутствуют записи. Counts/sums меньше, чем ожидаете.

Причина: event arrived later than watermark (late event). Окно уже закрылось, событие отброшено.

Фикс: allowedLateness + sideOutputLateData.

OutputTag<Event> lateTag = new OutputTag<>("late-events"){};

SingleOutputStreamOperator<Result> agg = stream
    .keyBy(e -> e.key)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .allowedLateness(Time.minutes(10))   // окно держится ещё 10 мин
    .sideOutputLateData(lateTag)          // совсем поздние - в side output
    .aggregate(new SumAggregate());

DataStream<Event> lateEvents = agg.getSideOutput(lateTag);
// Лог или специальная обработка

allowedLateness отодвигает закрытие окна, давая опоздавшим событиям шанс быть включёнными. sideOutputLateData собирает совсем поздних в отдельный поток для анализа или backfill.

Метрика: flink_taskmanager_job_task_operator_numLateRecordsDropped — counter поздних событий. Должен быть около нуля при правильно настроенных watermarks.


3. Watermark stuck: всё стоит

Kafka consumer groups и partition assignment

Симптом: windowed агрегации перестали выдавать результаты. Web UI показывает watermark не двигается.

Причина: одна из партиций Kafka источника простаивает (нет новых событий). Watermark = min(per-partition watermark), поэтому одна “молчащая” партиция держит весь watermark.

Фикс: WatermarkStrategy.withIdleness(...).

WatermarkStrategy<Event> strategy = WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(20))
    .withIdleness(Duration.ofMinutes(1))   // партиция idle если 1 мин без событий
    .withTimestampAssigner((e, ts) -> e.eventTime);

Idle партиции исключаются из расчёта watermark, давая остальным двигаться. Не теряете данных в активных партициях.

Альтернатива: убедитесь, что в идеале нет idle партиций — но это часто не контролируете.


4. KryoSerializer fallback: молчаливая deg производительности

Симптом: throughput неожиданно ниже, чем должен быть. В логах Flink при старте: INFO ... is not a POJO type. Defining as a generic type. CPU upiraется в сериализацию.

Причина: ваш класс не подходит под POJO requirements (например, нет no-args constructor, нет setters). Flink молча использует Kryo, который в 5-10 раз медленнее.

Фикс: сделайте класс POJO:

// Не POJO (final fields, нет no-args constructor)
public final class Order {
    public final long id;
    public final String userId;
    public Order(long id, String userId) { ... }
}

// POJO-compliant
public class Order {
    public long id;       // public field
    public String userId;
    public Order() {}      // no-args constructor
}

Или используйте Avro / Protobuf — они и для serialization лучше Kryo.

Превентивно: установить pipeline.disable-generic-types: true в конфиге — Flink упадёт при попытке Kryo fallback, заставляя вас исправить класс.

flinkConfiguration:
  pipeline.disable-generic-types: "true"

5. Data skew: одна партиция перегружена

Симптом: один TM-под занят 100% CPU, остальные idle. Throughput низкий. Per-operator backpressure на одной субзадаче (subtask) высокий.

Причина: keyBy по полю с неравномерным распределением. Например, keyBy(userId), но 1% “топ-пользователей” генерируют 50% событий.

Фикс 1: pre-aggregation перед keyBy — снижает rate перед партиционированием.

stream
    .keyBy(e -> e.userId)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
    .reduce(new EventReducer())  // local aggregation
    .keyBy(e -> e.userId)
    .process(new HeavyLogic());

Фикс 2: salting — искусственно разделить hot keys на несколько слотов.

stream
    .map(e -> new Tuple2<>(e.userId + "_" + (e.hashCode() % 4), e))  // salt
    .keyBy(t -> t.f0)
    .process(...)
    .map(t -> t.f1)
    .keyBy(e -> e.userId)
    .process(new FinalAggregation());

Метрика для диагностики: flink_taskmanager_job_task_operator_numRecordsInPerSecond по subtask_index. Если один subtask имеет 100x больше records — это skew.


6. OOM TaskManager: state не помещается в RocksDB managed memory

Симптом: TaskManager OOMKilled. В логах: Direct buffer memory или RocksDB CompactionStallException.

Причина: state больше RocksDB managed memory, idescriptors используют unmanaged native memory без лимита.

Фикс: explicit RocksDB memory tuning.

flinkConfiguration:
  taskmanager.memory.process.size: "4096m"
  taskmanager.memory.managed.fraction: "0.4"   # 40% from process = 1638m для RocksDB
  state.backend.rocksdb.memory.managed: "true"
  state.backend.rocksdb.memory.high-prio-pool-ratio: "0.1"
  state.backend.rocksdb.memory.write-buffer-ratio: "0.5"

memory.managed — это native memory pool, который RocksDB использует для block cache и write buffers. Без memory.managed: true RocksDB берёт native memory из off-heap unmanaged — а это не лимитировано и приводит к OOM container-а.

Также: если ваш state очень большой (100+ GB на TM), используйте state.backend.rocksdb.predefined-options: SPINNING_DISK_OPTIMIZED_HIGH_MEM или твикайте RocksDB вручную.


7. JobManager OOM: ExecutionGraph слишком большой

Симптом: JM в CrashLoopBackOff с OOM. Кластер недоступен.

Причина: ExecutionGraph держится в памяти JM. Большой parallelism (parallelism × число операторов = много vertices) + накопившиеся metadata = OOM. Также утечки в Flink на некоторых версиях.

Фикс:

  1. Увеличить heap JM: jobmanager.memory.process.size: "4096m" (default 1600m часто мало для большого графа).
  2. Уменьшить parallelism, если возможно (более крупный per-slot work).
  3. Использовать operator chaining (по умолчанию on) — снижает число vertices.

Превентивно: alert на flink_jobmanager_Status_JVM_Memory_Heap_Used / flink_jobmanager_Status_JVM_Memory_Heap_Max > 0.85 — успеете увеличить до OOM.


8. Slow sink: backpressure до источника

Симптом: backpressure 800-1000 ms/s на sink-операторе. Lag к Kafka растёт.

Причина: sink (JDBC, REST, S3, Elasticsearch) медленнее, чем upstream production. Backpressure распространяется через весь граф до source — source перестаёт читать Kafka.

Фикс:

  1. Увеличить parallelism sink-а: если у JDBC 1 connection, поднять до 16 с pool.
  2. Batch writes: вместо INSERT per record — batch INSERT (sink.batch.max-size: 1000).
  3. Async sink: для I/O-bound sink-ов используйте AsyncSinkBase (Flink 1.15+) — non-blocking I/O с pipelining.
  4. Throttle source (как временный workaround): kafka.consumer.max.poll.records: 100 чтобы не задыхаться.

В долгосрочной перспективе: capacity planning sink-а должно быть на capacity peak load × 1.5 safety margin.


9. Checkpoint timeout: tail latency на S3

Симптом: flink_jobmanager_job_numFailedCheckpoints растёт. Failed checkpoints с reason “Checkpoint expired”.

Причина: один из subtask-ов очень медленно делает snapshot — обычно из-за S3 latency. Default timeout 10 минут, но при медленном S3 (тайл lat) можно не уложиться.

Фикс:

  1. Увеличить timeout: execution.checkpointing.timeout: 30min.
  2. Включить unaligned: execution.checkpointing.unaligned: true — barrier-ы быстрее проходят через backpressured пайплайн.
  3. Native incremental snapshots (Flink 2.0+): state.savepoints.format: native — RocksDB делает incremental snapshot, быстрее full canonical.
  4. Параллелизм S3 upload: s3.upload.max.concurrent.uploads: 32 (default обычно 4).
  5. Сменить S3 на S3 Express One Zone (если AWS) — single-digit ms latency, в 5-10x быстрее регулярного S3 для high-throughput workload.

10. Restart loop из-за non-recoverable error

Симптом: flink_jobmanager_job_numRestarts растёт постоянно. Джоб никогда не реaches STABLE.

Причина: в коде есть exception, который выбрасывается каждый раз для одной и той же записи (например, NullPointerException при unexpected schema). Restart strategy перезапускает джоб, он сразу падает на той же записи.

Фикс:

  1. Поймать exception в operator-е и отправить запись в dead letter queue (Kafka topic или S3) для разбора:
public class SafeMapFunction extends RichMapFunction<Event, Result> {
    @Override
    public Result map(Event event) {
        try {
            return process(event);
        } catch (Exception e) {
            // send to DLQ
            dlqProducer.send(event);
            return null;  // или специальное значение
        }
    }
}
  1. Restart strategy: restart-strategy.fixed-delay.attempts: 3 с delay — после 3 failures джоб остановится. Лучше явная остановка, чем infinite loop.
flinkConfiguration:
  restart-strategy.type: "fixed-delay"
  restart-strategy.fixed-delay.attempts: "3"
  restart-strategy.fixed-delay.delay: "30s"
  1. Schema validation на source: если события могут быть с invalid schema, валидируйте при чтении (например, Schema Registry compatibility check), отбрасывайте invalid в DLQ.

Метрика: alert rate(flink_jobmanager_job_numRestarts[10m]) > 0.1 — > 1 restart за 10 мин уже стоит посмотреть.


Шпаргалка: симптом -> проблема

Симптомы и pitfalls
State растёт линейноUnbounded state без TTL. См. pitfall #1.
Window результаты неполныеLate events дропаются. См. pitfall #2.
Window не выдаёт результатыWatermark stuck из-за idle partition. См. pitfall #3.
Throughput низкий, CPU sериализацияKryo fallback. См. pitfall #4.
Один TM перегружен, остальные idleData skew по ключу. См. pitfall #5.
TaskManager OOMRocksDB native memory. См. pitfall #6.
JobManager OOMExecutionGraph слишком большой. См. pitfall #7.
Backpressure до источникаSlow sink. См. pitfall #8.
Checkpoint timeoutМедленный S3 или large state. См. pitfall #9.
Restart loopNon-recoverable error на одной записи. См. pitfall #10.

Ключевые выводы

  1. Unbounded state — самая частая бомба. Всегда настраивайте StateTtlConfig или явно очищайте.

  2. Late events и watermark stuck — частые проблемы event-time обработки. allowedLateness, sideOutputLateData, withIdleness — обязательные настройки.

  3. KryoSerializer fallback молчаливо убивает производительность. pipeline.disable-generic-types: true ловит проблему на старте.

  4. Data skew требует pre-aggregation или salting. Diagnostics — per-subtask records-in metric.

  5. OOM TM и JM — разные причины. TM: RocksDB native memory, нужно memory.managed: true. JM: размер ExecutionGraph, увеличить heap.

  6. Slow sink распространяет backpressure до источника. Поднять parallelism, batch writes, async sink.

  7. Checkpoint timeout — обычно медленный S3 или большой state. Unaligned + native incremental + параллелизм upload.

  8. Restart loop = non-recoverable error. DLQ + лимит restart attempts.

  9. Симптом-таблица: запомните mapping симптом -> проблема, чтобы быстро находить причину.

Проверка знанийKnowledge check
Сценарий в production: Flink-джоб обрабатывает события из 12 партиций Kafka. За последние 30 минут: lag растёт, во всех панелях метрик backpressure 700+ ms/s на оператор enrichment, throughput упал с 8k/sec до 3k/sec, checkpoint duration вырос с 8s до 95s, TaskManager-ы пока стабильны. Без понимания причины, какой план диагностики оптимален?
ОтветAnswer
Систематический план диагностики, от наиболее вероятных причин: Шаг 1 (1 минута): Проверить, не было ли deploy в последние 30 минут. git log --since='30 minutes ago' для repo джоба. Если был - откатиться через initialSavepointPath: первое подозрение - regression в enrichment коде. Шаг 2 (2 минуты): Проверить, не упирается ли enrichment в external dependency. - Если enrichment делает lookup в REST API: метрики этого API (latency, error rate). Возможно external service деградировал. - Если в DB: pg_stat_activity, slow queries. - Если в Redis: latency, memory. Шаг 3 (2 минуты): Проверить data skew. Метрика flink_taskmanager_job_task_operator_numRecordsInPerSecond по subtask_index для enrichment. Если один subtask имеет 5-10x больше records - skew. Тогда фикс: salting или pre-aggregation. См. pitfall #5. Шаг 4 (2 минуты): Проверить async I/O (если уже на async). - Если enrichment - sync, переписать на async (см. модуль 9 курса). - Если уже async, проверить queue capacity (capacity параметр AsyncDataStream.unorderedWait). Шаг 5 (5 минут): Включить unaligned checkpoints как workaround для checkpoint duration. Изменить flinkConfiguration: execution.checkpointing.unaligned: true, kubectl apply. Checkpoint duration быстро вернётся к норме (барьер не блокируется backpressure). Шаг 6 (минута): Проверить TM resources (CPU, heap). Если CPU 100% или heap 90%+ - увеличить ресурсы или parallelism. Шаг 7: Долгосрочно - capacity planning. Если throughput упал с 8k до 3k, и нет deploy, и нет external degradation, то rate per slot enrichment просто не успевает. Нужно: увеличить parallelism enrichment, или оптимизировать код, или предусмотреть burst handling. Что НЕ делать сразу: - НЕ рестартовать джоб без диагностики - симптомы могут вернуться через 5 минут. - НЕ менять много параметров одновременно - не поймёте, что помогло. Аналогии симптомов в шпаргалке: backpressure + checkpoint slow + lag growing = слабое звено в pipeline, обычно sink или single operator. Найти узкое место и масштабировать или оптимизировать.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Flink-джоб с keyBy(userId).process(...). За 3 месяца state checkpoint вырос с 100 MB до 8 GB при стабильном throughput 5k events/sec. Throughput не упал, нет рестартов, не было deploys. Что это и как фиксить?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5