Split-level watermarks (Flink 2.1+)
В предыдущих уроках мы говорили о watermark на уровне source subtask — каждая subtask имеет один current watermark, который аggregates от всех её splits (Kafka partitions, file ranges). Это работает для базовой propagation logic, но даёт ограниченную observability. Если subtask читает 10 Kafka partitions, и одна из них stuck, в Web UI вы видите только “subtask watermark X” без info о том, какая partition является причиной.
Split-level watermarks (FLIP-217, Flink 2.1+) добавляют finer-grained tracking. Каждый split в source maintains свой собственный current watermark, exposed как метрика, и used для split-level alignment. Это даёт DevOps tools для production diagnostics и более granular control над bootstrap behavior.
Watermark alignment на практикеБез split-level: blind spot
Без split-level watermarks, single source subtask reading from 10 Kafka partitions имеет один current watermark = min от всех partition-level progress. Если 9 partitions catching up, и 1 stuck (например, due to skewed key distribution в той partition), вы видите:
SourceSubtask 0:
currentWatermark = 1700000000000 (lagged)
lastReceivedTimestamp = 1700000300000 (fresh)
Watermark lagged, потому что один из splits stuck. Но какой split? Стандартные метрики не говорят. Diagnostics требует digging в Kafka consumer logs, partition assignment, individual partition offsets — много manual work.
Split-level watermarks эту проблему решают. Каждый split exposed independently:
SourceSubtask 0:
currentWatermark = 1700000000000
splitWatermarks = {
"topic-1-partition-0": 1700000300000,
"topic-1-partition-1": 1700000280000,
...
"topic-1-partition-7": 1700000000000, <- THIS IS THE LAGGER
"topic-1-partition-9": 1700000290000
}
Immediately видно, что partition 7 stuck. Можно дальше investigate почему — например, что events с teми ключами идут только в эту partition, и они lagging.
API: getCurrentWatermark per split
В Source API (FLIP-27) каждый SplitReader exposes свой split state. Для watermark API:
public interface SplitsCurrentWatermarkable {
Map<String, Long> currentWatermarksOfSplits();
}
Это метод, который SourceReader implementations can implement (mixin pattern). Если implemented, Flink exposes split-level метрики через Web UI и REST API.
KafkaSource в Flink 2.1+ implements это:
public class KafkaSourceReader implements SplitsCurrentWatermarkable {
private Map<TopicPartition, Long> watermarks;
@Override
public Map<String, Long> currentWatermarksOfSplits() {
return watermarks.entrySet().stream()
.collect(Collectors.toMap(
e -> e.getKey().toString(),
Map.Entry::getValue
));
}
}
Аналогично implementation для FileSource, KinesisSource, etc.
Observability через метрики
Каждый split-watermark exposed как Flink metric:
flink_taskmanager_job_task_operator_split_watermark{
job_name="my-job",
task_name="kafka-source",
subtask_index="0",
split_id="topic-events-partition-7"
} 1700000000000
В Grafana можно build dashboard:
- Per-partition watermark age (now - partition_watermark).
- Heatmap всех partitions, hot spots визуально видны.
- Alert: any partition watermark > 5 minutes behind subtask average.
Это критично для production: 99% production issues источников связаны с specific partitions (skew, replica fail, broker degradation), не with overall throughput.
Split-level alignment
В Flink 2.1+ watermark alignment расширен до split level. Это означает: не только source subtask может быть paused, но отдельные splits.
WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withWatermarkAlignment("group", Duration.ofMinutes(2), Duration.ofSeconds(1))
// в 2.1+ это работает на split level автоматически
С split-level alignment, если 3 splits в subtask 0 advanced, а 2 splits laggin, только 3 fast splits paused. Subtask continues reading from 2 slow splits, поскольку downstream watermark still bottlenecked их progress.
Это subtle but important difference от subtask-level pause. Без split-level pause, subtask 0 либо paused fully (no progress at all) или running fully (state grows). С split-level, fast splits paused, slow splits continue — exactly то, что нужно.
Реализация: SourceReaderBase.pauseOrResumeSplits
В SourceReaderBase (base class for all FLIP-27 source readers) есть method:
public void pauseOrResumeSplits(Collection<String> splitsToPause, Collection<String> splitsToResume) {
splitFetcherManager.pauseOrResumeSplits(splitsToPause, splitsToResume);
}
splitFetcherManager дерёт thread pool, который physically reads из splits. При pause он stops poll() на specific split, при resume — продолжает.
Координация: SourceCoordinator (на JobMaster) собирает per-split watermarks от всех subtasks, computes group min per split, distributes back. AlignmentAgent на TM делает pause/resume decisions per split.
Use case 1: production diagnostics
Most common use case — diagnostics watermark issues:
- Open Web UI -> Source operator -> Subtask 0 -> Watermark.
- See subtask watermark = X (stale).
- Expand split-level metrics.
- Identify split with lowest watermark.
- Investigate this specific split:
- Kafka: check partition offsets, broker health, consumer lag.
- File: check file size, read progress, IO health.
Без split-level это часто заканчивается “переподнять job, надеяться, что rebalance issue решит”. С split-level — точно identify root cause.
Use case 2: granular alignment
Bootstrap из historic data всегда uneven по splits. Split-level alignment даёт более efficient use of resources:
- Splits с less backlog catch up first, paused.
- Splits с much backlog continue reading.
- Total bootstrap time = max(split bootstrap times).
Без split-level: subtask paused as soon as any split advances. Lagging splits also paused (поскольку they’re in same subtask). Bootstrap time = sum или worse чем max.
Production result: 30-40% faster bootstrap для heavily skewed data с split-level alignment.
Use case 3: per-partition latency SLA
Some production systems have per-partition SLA. Например: “data из partition X must be processed within 1 minute”. С split-level watermarks можно alert: partition X watermark age > 1 minute.
Это allows distinguishing “global slow” from “specific partition slow”. На monitoring dashboard split-level age chart показывает which partitions consistently violating SLA — possibly need rebalance или skew investigation.
Limitations
Limitation 1: requires modern source connector. Not все connectors implement SplitsCurrentWatermarkable. Older connectors (pre-Flink 2.1, third-party) могут не expose split-level metrics. Check connector docs/code.
Limitation 2: metric cardinality. Per-split metrics increase total metric count. На source с 1000+ partitions это может overwhelm metrics backend (Prometheus, Datadog). Mitigate: aggregate split metrics в monitoring layer, retain raw только for last hour.
Limitation 3: split assignment changes. Если splits redistributed между subtasks (например, при rescale), per-split metric labels change. Это complicates time-series analysis. Best practice: aggregate by split ID, not by (subtask, split).
Limitation 4: not all sources have “splits”. Some sources are single-stream by nature (например, custom HTTP source). Для них split-level == subtask-level, нет дополнительной observability.
Configuration
# Включить split-level alignment
pipeline.watermark-alignment.split-level.enabled: true
# Update interval для split-level coordination
pipeline.watermark-alignment.split-level.update-interval: 1s
В 2.1+ defaults обычно sensible — split-level alignment включён, если SplitsCurrentWatermarkable implemented в source.
Если вы upgrade Flink с 2.0 на 2.1+, no API changes нужны для existing jobs. Split-level alignment работает automatically для compatible sources (KafkaSource, FileSource, KinesisSource). В Web UI появятся новые метрики split-level — используйте их для better diagnostics.
Чтение source
org.apache.flink.connector.base.source.reader.SourceReaderBase#pauseOrResumeSplits— split control.org.apache.flink.api.connector.source.SplitsCurrentWatermarkable— interface mixin.org.apache.flink.runtime.source.coordinator.WatermarkAlignmentManager— JM coordinator (extended в 2.1).org.apache.flink.connector.kafka.source.reader.KafkaSourceReader— example implementation.- FLIP-217 (Per-split watermark alignment) в Apache Flink Wiki — design doc.
Попробуй сам
-
Explore split-level metrics в Web UI. На running job с Kafka source (multi-partition) откройте Source -> Metrics -> filter “split_watermark”. Увидите per-partition watermarks. Compare freshness между partitions — typically один partition consistently behind.
-
Build Grafana dashboard. Query
flink_taskmanager_job_task_operator_split_watermark, group by split_id, plot as heatmap. Это даёт instant visual для production health. -
Test split-level alignment. На bootstrap scenario, compare time-to-catchup и max state size: subtask-level alignment (Flink 1.x) vs split-level (2.1+). На heavily skewed data ожидайте 30-40% speedup.