Топ-10 production pitfalls в Flink
Каждый, кто бил Flink в production, проходил через одни и те же грабли. Этот урок — пройденный путь, спрессованный в 10 проблем с симптомами, диагностикой и фиксами. Если запомнить их и проверять при дизайне новых джобов — сэкономите недели.
1. Unbounded state: тихая бомба
Симптом: размер checkpoint растёт линейно во времени. Через недели/месяцы — OOM на TaskManager-ах или RocksDB не открывается.
Причина: keyed state без TTL. Например, ProcessFunction по userId, каждый новый пользователь добавляет ключ в RocksDB, и никто его не удаляет. У вас 10M активных пользователей в день, но историчных — 100M, и все они в state.
Фикс: StateTtlConfig на каждый state descriptor.
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.days(30))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.cleanupInRocksdbCompactFilter(1000)
.build();
ValueStateDescriptor<UserProfile> desc =
new ValueStateDescriptor<>("user-profile", UserProfile.class);
desc.enableTimeToLive(ttlConfig);
Или явно state.clear() в ProcessFunction когда вы логически знаете, что ключ устарел (например, по timer).
Превентивно: alert на deriv(flink_jobmanager_job_lastCheckpointSize[1h]) > 1MB/час — заметите рост за дни до проблемы.
2. Late events: окно закрылось, данные потеряны
Симптом: в результатах windowed aggregation отсутствуют записи. Counts/sums меньше, чем ожидаете.
Причина: event arrived later than watermark (late event). Окно уже закрылось, событие отброшено.
Фикс: allowedLateness + sideOutputLateData.
OutputTag<Event> lateTag = new OutputTag<>("late-events"){};
SingleOutputStreamOperator<Result> agg = stream
.keyBy(e -> e.key)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.allowedLateness(Time.minutes(10)) // окно держится ещё 10 мин
.sideOutputLateData(lateTag) // совсем поздние - в side output
.aggregate(new SumAggregate());
DataStream<Event> lateEvents = agg.getSideOutput(lateTag);
// Лог или специальная обработка
allowedLateness отодвигает закрытие окна, давая опоздавшим событиям шанс быть включёнными. sideOutputLateData собирает совсем поздних в отдельный поток для анализа или backfill.
Метрика: flink_taskmanager_job_task_operator_numLateRecordsDropped — counter поздних событий. Должен быть около нуля при правильно настроенных watermarks.
3. Watermark stuck: всё стоит
Kafka consumer groups и partition assignmentСимптом: windowed агрегации перестали выдавать результаты. Web UI показывает watermark не двигается.
Причина: одна из партиций Kafka источника простаивает (нет новых событий). Watermark = min(per-partition watermark), поэтому одна “молчащая” партиция держит весь watermark.
Фикс: WatermarkStrategy.withIdleness(...).
WatermarkStrategy<Event> strategy = WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withIdleness(Duration.ofMinutes(1)) // партиция idle если 1 мин без событий
.withTimestampAssigner((e, ts) -> e.eventTime);
Idle партиции исключаются из расчёта watermark, давая остальным двигаться. Не теряете данных в активных партициях.
Альтернатива: убедитесь, что в идеале нет idle партиций — но это часто не контролируете.
4. KryoSerializer fallback: молчаливая deg производительности
Симптом: throughput неожиданно ниже, чем должен быть. В логах Flink при старте: INFO ... is not a POJO type. Defining as a generic type. CPU upiraется в сериализацию.
Причина: ваш класс не подходит под POJO requirements (например, нет no-args constructor, нет setters). Flink молча использует Kryo, который в 5-10 раз медленнее.
Фикс: сделайте класс POJO:
// Не POJO (final fields, нет no-args constructor)
public final class Order {
public final long id;
public final String userId;
public Order(long id, String userId) { ... }
}
// POJO-compliant
public class Order {
public long id; // public field
public String userId;
public Order() {} // no-args constructor
}
Или используйте Avro / Protobuf — они и для serialization лучше Kryo.
Превентивно: установить pipeline.disable-generic-types: true в конфиге — Flink упадёт при попытке Kryo fallback, заставляя вас исправить класс.
flinkConfiguration:
pipeline.disable-generic-types: "true"
5. Data skew: одна партиция перегружена
Симптом: один TM-под занят 100% CPU, остальные idle. Throughput низкий. Per-operator backpressure на одной субзадаче (subtask) высокий.
Причина: keyBy по полю с неравномерным распределением. Например, keyBy(userId), но 1% “топ-пользователей” генерируют 50% событий.
Фикс 1: pre-aggregation перед keyBy — снижает rate перед партиционированием.
stream
.keyBy(e -> e.userId)
.window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
.reduce(new EventReducer()) // local aggregation
.keyBy(e -> e.userId)
.process(new HeavyLogic());
Фикс 2: salting — искусственно разделить hot keys на несколько слотов.
stream
.map(e -> new Tuple2<>(e.userId + "_" + (e.hashCode() % 4), e)) // salt
.keyBy(t -> t.f0)
.process(...)
.map(t -> t.f1)
.keyBy(e -> e.userId)
.process(new FinalAggregation());
Метрика для диагностики: flink_taskmanager_job_task_operator_numRecordsInPerSecond по subtask_index. Если один subtask имеет 100x больше records — это skew.
6. OOM TaskManager: state не помещается в RocksDB managed memory
Симптом: TaskManager OOMKilled. В логах: Direct buffer memory или RocksDB CompactionStallException.
Причина: state больше RocksDB managed memory, idescriptors используют unmanaged native memory без лимита.
Фикс: explicit RocksDB memory tuning.
flinkConfiguration:
taskmanager.memory.process.size: "4096m"
taskmanager.memory.managed.fraction: "0.4" # 40% from process = 1638m для RocksDB
state.backend.rocksdb.memory.managed: "true"
state.backend.rocksdb.memory.high-prio-pool-ratio: "0.1"
state.backend.rocksdb.memory.write-buffer-ratio: "0.5"
memory.managed — это native memory pool, который RocksDB использует для block cache и write buffers. Без memory.managed: true RocksDB берёт native memory из off-heap unmanaged — а это не лимитировано и приводит к OOM container-а.
Также: если ваш state очень большой (100+ GB на TM), используйте state.backend.rocksdb.predefined-options: SPINNING_DISK_OPTIMIZED_HIGH_MEM или твикайте RocksDB вручную.
7. JobManager OOM: ExecutionGraph слишком большой
Симптом: JM в CrashLoopBackOff с OOM. Кластер недоступен.
Причина: ExecutionGraph держится в памяти JM. Большой parallelism (parallelism × число операторов = много vertices) + накопившиеся metadata = OOM. Также утечки в Flink на некоторых версиях.
Фикс:
- Увеличить heap JM:
jobmanager.memory.process.size: "4096m"(default 1600m часто мало для большого графа). - Уменьшить parallelism, если возможно (более крупный per-slot work).
- Использовать operator chaining (по умолчанию on) — снижает число vertices.
Превентивно: alert на flink_jobmanager_Status_JVM_Memory_Heap_Used / flink_jobmanager_Status_JVM_Memory_Heap_Max > 0.85 — успеете увеличить до OOM.
8. Slow sink: backpressure до источника
Симптом: backpressure 800-1000 ms/s на sink-операторе. Lag к Kafka растёт.
Причина: sink (JDBC, REST, S3, Elasticsearch) медленнее, чем upstream production. Backpressure распространяется через весь граф до source — source перестаёт читать Kafka.
Фикс:
- Увеличить parallelism sink-а: если у JDBC 1 connection, поднять до 16 с pool.
- Batch writes: вместо INSERT per record — batch INSERT (
sink.batch.max-size: 1000). - Async sink: для I/O-bound sink-ов используйте AsyncSinkBase (Flink 1.15+) — non-blocking I/O с pipelining.
- Throttle source (как временный workaround):
kafka.consumer.max.poll.records: 100чтобы не задыхаться.
В долгосрочной перспективе: capacity planning sink-а должно быть на capacity peak load × 1.5 safety margin.
9. Checkpoint timeout: tail latency на S3
Симптом: flink_jobmanager_job_numFailedCheckpoints растёт. Failed checkpoints с reason “Checkpoint expired”.
Причина: один из subtask-ов очень медленно делает snapshot — обычно из-за S3 latency. Default timeout 10 минут, но при медленном S3 (тайл lat) можно не уложиться.
Фикс:
- Увеличить timeout:
execution.checkpointing.timeout: 30min. - Включить unaligned:
execution.checkpointing.unaligned: true— barrier-ы быстрее проходят через backpressured пайплайн. - Native incremental snapshots (Flink 2.0+):
state.savepoints.format: native— RocksDB делает incremental snapshot, быстрее full canonical. - Параллелизм S3 upload:
s3.upload.max.concurrent.uploads: 32(default обычно 4). - Сменить S3 на S3 Express One Zone (если AWS) — single-digit ms latency, в 5-10x быстрее регулярного S3 для high-throughput workload.
10. Restart loop из-за non-recoverable error
Симптом: flink_jobmanager_job_numRestarts растёт постоянно. Джоб никогда не реaches STABLE.
Причина: в коде есть exception, который выбрасывается каждый раз для одной и той же записи (например, NullPointerException при unexpected schema). Restart strategy перезапускает джоб, он сразу падает на той же записи.
Фикс:
- Поймать exception в operator-е и отправить запись в dead letter queue (Kafka topic или S3) для разбора:
public class SafeMapFunction extends RichMapFunction<Event, Result> {
@Override
public Result map(Event event) {
try {
return process(event);
} catch (Exception e) {
// send to DLQ
dlqProducer.send(event);
return null; // или специальное значение
}
}
}
- Restart strategy:
restart-strategy.fixed-delay.attempts: 3с delay — после 3 failures джоб остановится. Лучше явная остановка, чем infinite loop.
flinkConfiguration:
restart-strategy.type: "fixed-delay"
restart-strategy.fixed-delay.attempts: "3"
restart-strategy.fixed-delay.delay: "30s"
- Schema validation на source: если события могут быть с invalid schema, валидируйте при чтении (например, Schema Registry compatibility check), отбрасывайте invalid в DLQ.
Метрика: alert rate(flink_jobmanager_job_numRestarts[10m]) > 0.1 — > 1 restart за 10 мин уже стоит посмотреть.
Шпаргалка: симптом -> проблема
Ключевые выводы
-
Unbounded state — самая частая бомба. Всегда настраивайте StateTtlConfig или явно очищайте.
-
Late events и watermark stuck — частые проблемы event-time обработки. allowedLateness, sideOutputLateData, withIdleness — обязательные настройки.
-
KryoSerializer fallback молчаливо убивает производительность.
pipeline.disable-generic-types: trueловит проблему на старте. -
Data skew требует pre-aggregation или salting. Diagnostics — per-subtask records-in metric.
-
OOM TM и JM — разные причины. TM: RocksDB native memory, нужно
memory.managed: true. JM: размер ExecutionGraph, увеличить heap. -
Slow sink распространяет backpressure до источника. Поднять parallelism, batch writes, async sink.
-
Checkpoint timeout — обычно медленный S3 или большой state. Unaligned + native incremental + параллелизм upload.
-
Restart loop = non-recoverable error. DLQ + лимит restart attempts.
-
Симптом-таблица: запомните mapping симптом -> проблема, чтобы быстро находить причину.