Edge cases and failure recovery
In the previous lessons we saw the exactly-once happy path: Two-Phase Commit works, Kafka transactions commit normally, and Flink coordinates everything transparently. In this final lesson of the module we look at the edge cases and failure scenarios that production teams run into: stuck transactions, transactional.id collisions, broker failures during commit, and manual recovery procedures.
This is the operational side of exactly-once: not theory, but the practice of debugging production incidents.
Kafka transactional producer: errors and recovery
Edge case 1: Failover between prepare and commit
The hardest scenario. The sink has already done prepareCommit (records are durable in Kafka in a pre-commit state) and the checkpoint is complete, but commitTransaction did not run in time.
Timeline:
T0: Sink prepareCommit — records sent, durable, in pre-commit
T1: JobManager notifies checkpoint complete
T2: Sink committer scheduled to commit
T3: TaskManager crashes BEFORE actual commitTransaction call
State:
- Kafka coordinator: ONGOING transaction, not yet committed
- Records on partition leaders: durable but invisible to read_committed
- Flink checkpoint state: committables stored
- Pending commit: should be done, but the TaskManager is dead
Recovery scenarios:
Scenario A: Same TaskManager restarts
- Reads checkpoint state
- Sees committables for completed checkpoint
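- Replays commit() for each
- Recreates a producer with the same TID and resumes the transaction with the producer ID and epoch saved in the committable (a plain initTransactions() would abort the ongoing transaction)
- Kafka coordinator looks up TID, sees ONGOING, accepts the resumed producer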
- commitTransaction() proceeds
- Records become visible
- Exactly-once preserved
Scenario B: Different TaskManager replaces
- Flink schedules new TaskManager
- Reads checkpoint state
- Sees committables
- Calls commit() with saved metadata
- Producer is recreated with same TID prefix + same subtask suffix
- Kafka coordinator may fence the old PID/epoch
- But: with the PID/epoch saved in the checkpoint, the new TM provides the correct values
- If coordinator state ONGOING — commits
- If coordinator already aborted (timeout) — error, restart from older checkpoint
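Edge case within the edge case:
- Coordinator may expire the transaction between T0 and recovery time
- Default limits: the plain Kafka producer default for transaction.timeout.ms is 1 min, and the broker caps it at transaction.max.timeout.ms (15 min by default)
- If recovery takes longer than the effective timeout, the transaction is aborted automatically
- Result: pre-commit data DISCARDED, records lost
- Solution: increase transaction.timeout.ms (and the broker-side transaction.max.timeout.ms if it is lower)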
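The race between recovery time and the transaction timeout can be sketched with a toy model. This is a minimal illustration, not Flink or Kafka code: `ToyTxnCoordinator`, `Committable`, and their fields are hypothetical names, and the model tracks a single transaction only.

```python
from dataclasses import dataclass
from enum import Enum


class TxnState(Enum):
    ONGOING = "ongoing"
    COMMITTED = "committed"
    ABORTED = "aborted"


@dataclass(frozen=True)
class Committable:
    """Hypothetical stand-in for what the sink keeps in checkpoint state."""
    transactional_id: str
    producer_id: int
    epoch: int


class ToyTxnCoordinator:
    """Toy model of a coordinator tracking one pre-committed transaction."""

    def __init__(self, timeout_ms: int, started_at_ms: int, owner: Committable):
        self.timeout_ms = timeout_ms
        self.started_at_ms = started_at_ms
        self.owner = owner
        self.state = TxnState.ONGOING

    def _expire(self, now_ms: int) -> None:
        # A transaction left open past the timeout is aborted by the coordinator.
        if self.state is TxnState.ONGOING and now_ms - self.started_at_ms > self.timeout_ms:
            self.state = TxnState.ABORTED

    def commit(self, c: Committable, now_ms: int) -> bool:
        self._expire(now_ms)
        if self.state is TxnState.COMMITTED:
            return True   # a replayed commit is a no-op
        if self.state is TxnState.ABORTED:
            return False  # pre-committed records are already discarded
        if (c.producer_id, c.epoch) != (self.owner.producer_id, self.owner.epoch):
            return False  # stale PID/epoch: treated as fenced in this model
        self.state = TxnState.COMMITTED
        return True


c = Committable("prod-job-0", producer_id=1001, epoch=3)
fast = ToyTxnCoordinator(timeout_ms=15 * 60_000, started_at_ms=0, owner=c)
slow = ToyTxnCoordinator(timeout_ms=15 * 60_000, started_at_ms=0, owner=c)
print(fast.commit(c, now_ms=2 * 60_000))   # recovery after 2 min
print(slow.commit(c, now_ms=20 * 60_000))  # recovery after 20 min
```

In this model the first commit succeeds and the second one fails, which is the situation where raising the timeout is the only protection.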
Edge case 2: Stuck transactions
Sometimes Kafka transactions get stuck: neither committed nor aborted, blocking downstream consumers.
Indicators:
- read_committed consumer is blocked (waiting on a pending transaction)
- Lag growing on the consumer
- Kafka coordinator metrics show ONGOING transactions
- Flink job appears healthy
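Why a single open transaction blocks a read_committed consumer can be shown with the last stable offset (LSO). The sketch below is a simplified model: the function names are hypothetical, and it assumes the high watermark equals the log end offset.

```python
from typing import Dict


def last_stable_offset(log_end_offset: int, open_txn_first_offsets: Dict[int, int]) -> int:
    """Return the offset up to which read_committed consumers may read.

    open_txn_first_offsets maps a producer ID to the first offset of its
    still-open transaction on this partition.
    """
    if not open_txn_first_offsets:
        return log_end_offset
    # Everything from the earliest open transaction onward is held back,
    # including committed records written after it.
    return min(open_txn_first_offsets.values())


def hidden_records(log_end_offset: int, open_txn_first_offsets: Dict[int, int]) -> int:
    """Records a fully caught-up read_committed consumer still cannot see."""
    return log_end_offset - last_stable_offset(log_end_offset, open_txn_first_offsets)


# One stuck transaction that started at offset 40 hides 60 records.
print(hidden_records(100, {7: 40, 9: 75}))
```

This is why the indicators above show growing lag while the Flink job itself looks healthy: the newer transactions commit, but the LSO stays pinned at the stuck one.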
Causes:
1. TaskManager crash mid-prepare
Recovery should handle this, but the fence error is not handled properly
2. Kafka coordinator failover
New coordinator might lose state if __transaction_state replication failed
3. Network partition between Flink and Kafka
Producer cannot commit, but doesn't abort gracefully
4. Pool exhaustion with long-running stuck transactions
New transactions cannot acquire TID
Manual recovery:
Option 1: Kafka admin tools
bin/kafka-transactions.sh \
--bootstrap-server localhost:9092 \
--command-config admin.properties \
list
# Lists transactional IDs and their current state
bin/kafka-transactions.sh \
--bootstrap-server localhost:9092 \
--command-config admin.properties \
describe --transactional-id "stuck-tid"
# If the transaction is confirmed stuck, abort it manually.
# abort identifies the transaction by topic, partition and start offset
# (reported by describe-producers or find-hanging); check --help for your Kafka version
bin/kafka-transactions.sh \
--bootstrap-server localhost:9092 \
--command-config admin.properties \
abort --topic <topic> --partition <partition> --start-offset <start-offset>
Option 2: From Flink side
- Stop Flink job
- Manually abort the stuck Kafka transactions
- Restart Flink job from earlier checkpoint
- Source replays records, new transactions
Edge case 3: Broker restart with TX in-flight
Scenario:
T0: Producer.commitTransaction() called
T1: Coordinator writes PREPARE_COMMIT to __transaction_state
T2: Coordinator sends WriteTxnMarkerRequest to partition leaders
T3: Some partition leaders write markers, others not yet
T4: Coordinator broker crashes
Recovery:
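- New coordinator takes over: leadership of the affected __transaction_state partition moves to another broker (controller-driven, e.g. KRaft)
- Reads __transaction_state as written before the crash
- Sees state PREPARE_COMMIT
- Resumes: sends WriteTxnMarkerRequest for partitions that haven't acked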
- Eventually all markers written
- Writes COMPLETE_COMMIT
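The recovery steps above can be sketched as a replay of the replicated transaction log. This is a toy model under stated assumptions: `recover_coordinator` is a hypothetical name, the log is a list of `(state, partitions)` tuples, and marker writes always succeed on retry.

```python
from typing import Dict, List, Tuple

TxnLog = List[Tuple[str, List[str]]]


def recover_coordinator(txn_log: TxnLog, markers_written: Dict[str, bool]) -> str:
    """Replay the last logged state the way a newly elected coordinator would.

    txn_log stands in for the replicated __transaction_state entries of one
    transaction; markers_written records which partitions already hold a marker.
    """
    state, partitions = txn_log[-1]
    if state == "PREPARE_COMMIT":
        # The commit decision is already durable, so the only safe move is
        # to finish it: resend markers to partitions that have not acked.
        for partition in partitions:
            if not markers_written.get(partition, False):
                markers_written[partition] = True
        txn_log.append(("COMPLETE_COMMIT", partitions))
    return txn_log[-1][0]


log: TxnLog = [
    ("ONGOING", ["orders-0", "orders-1", "audit-0"]),
    ("PREPARE_COMMIT", ["orders-0", "orders-1", "audit-0"]),
]
markers = {"orders-0": True}  # only one leader wrote its marker before the crash
print(recover_coordinator(log, markers), markers)
```

The key design point is that PREPARE_COMMIT is written before any marker is sent, so a crash at T4 can only delay the commit, never reverse it.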
Edge case: __transaction_state partition replication failure
- If transaction.state.log.replication.factor=1, a leader crash of a __transaction_state partition means data loss
- Stuck transactions are then unrecoverable from the broker side
- MUST: transaction.state.log.replication.factor=3 in production
- MUST: transaction.state.log.min.isr=2 for durability
Flink side:
- Producer.commitTransaction() stays pending or throws
- On throw: Flink retries based on KafkaSink configuration
- On success after retry: idempotent (same TID, same epoch)
- No data loss or duplication
Edge case 4: transactional.id collisions
Scenario:
Two Flink jobs use the same TID prefix:
Job A: subtask 0 uses "myprefix-0"
Job B: subtask 0 uses "myprefix-0" -- same!
What happens:
1. Job A starts, gets PID=42, epoch=1
2. Job B starts, requests same TID
3. Coordinator fences epoch 1, assigns epoch=2 to Job B
4. Job A keeps trying to send/commit, and every attempt is fenced
5. Job A producer disabled, throws
6. Flink runtime fails subtask, restart attempt
7. On restart Job A: gets epoch=3, fences Job B
8. And so on: an endless fencing battle
Result:
- Both jobs unstable
- Constant restarts
- No exactly-once
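The fencing battle can be reproduced with a small epoch model. This is a sketch only: `EpochCoordinator`, `ProducerFenced`, and `fencing_battle` are hypothetical names, and each failed send is assumed to trigger an immediate restart.

```python
from typing import Dict, List, Tuple


class ProducerFenced(Exception):
    """Raised when a producer uses an epoch older than the current one."""


class EpochCoordinator:
    """Toy coordinator that tracks only the latest epoch per transactional.id."""

    def __init__(self) -> None:
        self.epochs: Dict[str, int] = {}

    def init_transactions(self, tid: str) -> int:
        # Every (re)initialization bumps the epoch and fences older holders.
        self.epochs[tid] = self.epochs.get(tid, 0) + 1
        return self.epochs[tid]

    def send(self, tid: str, epoch: int) -> None:
        if epoch < self.epochs[tid]:
            raise ProducerFenced(f"{tid}: epoch {epoch} < {self.epochs[tid]}")


def fencing_battle(rounds: int) -> List[Tuple[str, int]]:
    """Return (job, new_epoch) for every restart caused by fencing."""
    coordinator = EpochCoordinator()
    tid = "myprefix-0"  # both jobs derive the same ID for subtask 0
    epochs = {"A": coordinator.init_transactions(tid)}
    epochs["B"] = coordinator.init_transactions(tid)
    restarts: List[Tuple[str, int]] = []
    job = "A"
    for _ in range(rounds):
        try:
            coordinator.send(tid, epochs[job])
        except ProducerFenced:
            epochs[job] = coordinator.init_transactions(tid)  # restart fences the other job
            restarts.append((job, epochs[job]))
            job = "B" if job == "A" else "A"
    return restarts


print(fencing_battle(3))
```

Every round produces a restart, and the epoch keeps climbing without either job making progress.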
Prevention:
- Unique TID prefix per job
- Include environment, version, deployment info:
sink.transactional-id-prefix: ${ENV}-${JOB_NAME}-${VERSION}-${DEPLOY_ID}
Example values:
"prod-fraud-detector-v1.2.3-20260501"
"staging-analytics-v2.1.0-pr-1234"
Detection:
- Monitor producer fencing errors in Flink logs
- Frequent restarts of subtasks
- Kafka broker metrics: high transaction-fenced count
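The prefix scheme from the Prevention list can be generated in deployment tooling instead of being typed by hand. A minimal sketch, assuming a hypothetical helper `build_tid_prefix` and an allowed character set chosen here for illustration:

```python
import re

# Assumed character set for each component; adjust to your naming rules.
_COMPONENT = re.compile(r"[A-Za-z0-9._-]+")


def build_tid_prefix(env: str, job_name: str, version: str, deploy_id: str) -> str:
    """Join environment, job name, version and deployment ID into one prefix."""
    parts = [env, job_name, version, deploy_id]
    for part in parts:
        # An empty component would silently produce a prefix shared by
        # several deployments, which is exactly the collision to avoid.
        if not _COMPONENT.fullmatch(part):
            raise ValueError(f"invalid prefix component: {part!r}")
    return "-".join(parts)


print(build_tid_prefix("prod", "fraud-detector", "v1.2.3", "20260501"))
```

Failing fast on a missing component is the point of the helper: an unset `${DEPLOY_ID}` is a common way for two deployments to end up with the same prefix.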
Edge case 5: Long checkpoint causing transaction timeout
Scenario:
Kafka transaction.timeout.ms: 60 min (Flink KafkaSink default; the plain Kafka producer default is 1 min)
Flink checkpoint interval: 60 sec
But: the actual checkpoint takes 5+ min because of large state
Timeline:
T0: prepareCommit called, transaction begun
T+5min: checkpoint still ongoing for some subtask with huge state
T+10min: checkpoint completes finally
T+10min+1: commitTransaction called
If transaction.timeout.ms = 60 min: OK
If transaction.timeout.ms = 5 min: ABORTED!
Effects:
- commitTransaction throws ProducerFencedException
- Flink retries from checkpoint (incomplete, since commit failed)
- May cascade to longer checkpoint durations
- Eventual job failure
Solution:
Set transaction.timeout.ms > expected_max_checkpoint_duration + buffer
Example:
Expected max checkpoint: 10 min
Buffer: 5 min for slow commits
Set: 15-20 min minimum
Also: server-side transaction.max.timeout.ms must be >= producer value
Default: 15 min
Increase if needed: set transaction.max.timeout.ms in the broker's server.properties and do a rolling restart (it is a static broker setting)
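The sizing rule can be written down as a small check that runs before deployment. A sketch under stated assumptions: `min_transaction_timeout_ms` is a hypothetical helper, and the 15 min default for the broker cap is the value quoted above.

```python
MINUTE_MS = 60_000


def min_transaction_timeout_ms(
    max_checkpoint_ms: int,
    buffer_ms: int,
    broker_max_timeout_ms: int = 15 * MINUTE_MS,
) -> int:
    """Return the lowest acceptable transaction.timeout.ms for the producer.

    Raises ValueError when the broker-side transaction.max.timeout.ms is too
    low, because the broker would reject a producer asking for more.
    """
    needed = max_checkpoint_ms + buffer_ms
    if needed > broker_max_timeout_ms:
        raise ValueError(
            f"need {needed} ms but broker allows at most {broker_max_timeout_ms} ms; "
            "raise transaction.max.timeout.ms first"
        )
    return needed


# The example from the text: 10 min checkpoint + 5 min buffer.
print(min_transaction_timeout_ms(10 * MINUTE_MS, 5 * MINUTE_MS))
```

With the example values the result sits exactly at the default broker cap, so any larger checkpoint duration requires changing the broker setting as well.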
Edge case 6: Kafka cluster total failure
Scenario:
All Kafka brokers crash (catastrophic)
Or: Kafka cluster unreachable (network partition)
State:
- Pending transactions in unknown state
- Flink job stuck, cannot commit
- Records on Kafka side: in pre-commit, not yet visible
Recovery (Kafka comes back):
- All non-committed transactions abort after the timeout
- Pre-committed records discarded
- Flink job restarts from the last successful checkpoint
- Source replays from saved offsets
- New transactions established
- Output proceeds
Data behavior:
- No exactly-once violation
- Records before last checkpoint: committed, visible
- Records after the last checkpoint: lost from Kafka, will be reprocessed
- Net result: at most last_checkpoint_interval worth of "rolled back"
Operational concerns:
- Total Kafka outage = downtime
- Mitigation: multi-region Kafka, cross-region replication
- Recovery time depends on Kafka recovery + Flink restart
Edge case 7: Partial commit (split brain)
Scenario:
Network partition between the coordinator and some partition leaders.
Coordinator sends WriteTxnMarkerRequest to all leaders.
Some leaders write commit markers successfully.
Others don't receive (network partition).
Coordinator times out waiting.
Result:
- Some partitions: records visible (markers written)
- Other partitions: records not visible (no markers)
- "Partial commit" - read_committed consumers see inconsistent state
Recovery:
- Coordinator retries WriteTxnMarkerRequest
- With KRaft and proper replication, eventually all partitions get markers
- Consistency restored
Edge case within the edge case:
- If partition leader crashes BEFORE writing marker AND coordinator already moved on
- New leader elected
- Coordinator detects this, retries marker write
- Eventually consistent
In practice:
- Properly configured Kafka (replication.factor=3, min.insync.replicas=2)
- Partial commits very rare and self-recovering
- Major issue only in misconfigured clusters
Manual recovery procedures
Procedure 1: Stuck transaction abort
Step 1: Identify stuck transaction
bin/kafka-transactions.sh --bootstrap-server kafka:9092 --command-config admin.properties list
Step 2: Inspect details
bin/kafka-transactions.sh ... describe --transactional-id "STUCK_TID"
Step 3: Abort if confirmed stuck
bin/kafka-transactions.sh ... abort --topic <topic> --partition <partition> --start-offset <start-offset>
Step 4: Verify consumers unblock
Check consumer lag, should resume
Procedure 2: Reset Flink job after manual abort
Step 1: Identify abortion scope
Which checkpoint contained the stuck commit?
Was it before or after notifyCheckpointComplete?
Step 2: Choose recovery point
If the commit happened after notify: restore from the checkpoint whose completion was notified
If the commit happened before notify: restore from an earlier checkpoint
Step 3: Manual restart with explicit savepoint
bin/flink run \
--fromSavepoint s3://.../savepoint-XXX \
--allowNonRestoredState \
<job-args>
Step 4: Monitor commit retry
Ensure new transactions commit successfully
Procedure 3: TID cleanup
When changing job name or version, old TIDs may remain.
Step 1: List transactional IDs
bin/kafka-transactions.sh --bootstrap-server kafka:9092 list
Step 2: Identify old TIDs
Look for prefixes from previous deployments
Step 3: Force expire
Wait for transactional.id.expiration.ms (default 7 days)
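Step 2 can be scripted once the transactional IDs have been collected into a list. A minimal sketch: `stale_transactional_ids` is a hypothetical helper, and it assumes every ID of this job family starts with a common string such as the environment plus job name.

```python
from typing import Iterable, List


def stale_transactional_ids(
    tids: Iterable[str], job_family: str, current_prefix: str
) -> List[str]:
    """Return IDs that belong to this job family but not to the live deployment.

    IDs of unrelated applications are ignored so they are never proposed
    for cleanup by mistake.
    """
    return sorted(
        tid
        for tid in tids
        if tid.startswith(job_family) and not tid.startswith(current_prefix)
    )


observed = [
    "prod-fraud-detector-v1.2.2-20260401-0-17",
    "prod-fraud-detector-v1.2.3-20260501-0-3",
    "prod-analytics-v2.1.0-20260420-1-5",
]
print(stale_transactional_ids(
    observed,
    job_family="prod-fraud-detector-",
    current_prefix="prod-fraud-detector-v1.2.3-20260501-",
))
```

The output is only a candidate list for review; the IDs themselves still expire on the broker as described in Step 3.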
Or modify broker config
Monitoring and alerts
Critical alerts:
1. Producer fencing rate
alert: kafka_transaction_producer_fenced_count > 0 sustained
action: investigate TID collisions, network issues
2. Pending transactions count
alert: kafka_transaction_ongoing_count > expected
action: stuck transactions, manual intervention may be needed
3. Transaction abort rate
alert: rate(kafka_transactions_aborted_count[5m]) divided by the total transaction rate over the same window > 0.1 (10% abort rate)
action: timeout issues, network problems
4. Commit duration p99
alert: kafka_transaction_commit_duration_p99 > 5s
action: slow brokers, coordinator issues
5. Checkpoint duration vs transaction timeout
alert: flink_checkpoint_duration_p99 > kafka_transaction_timeout * 0.5
action: tune one or the other
6. Flink subtask restart frequency
alert: rate(flink_subtask_restarts[10m]) > 0.01
action: instability, possibly fence-related
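The first five thresholds can be prototyped offline before they are turned into alerting rules. A sketch under stated assumptions: `TxnSample` and its field names are hypothetical, the metric values are supplied by whatever exporter you use, and the "sustained" condition on fencing is not modeled.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class TxnSample:
    """One observation window of transaction metrics (hypothetical fields)."""
    fenced_count: int
    ongoing_count: int
    expected_ongoing: int
    aborted: int
    committed: int
    commit_p99_s: float
    checkpoint_p99_ms: int
    transaction_timeout_ms: int


def abort_ratio(s: TxnSample) -> float:
    total = s.aborted + s.committed
    return s.aborted / total if total else 0.0


def firing_alerts(s: TxnSample) -> List[str]:
    """Apply the thresholds from the list above and name the alerts that fire."""
    alerts: List[str] = []
    if s.fenced_count > 0:
        alerts.append("producer_fenced")
    if s.ongoing_count > s.expected_ongoing:
        alerts.append("pending_transactions")
    if abort_ratio(s) > 0.1:
        alerts.append("abort_rate")
    if s.commit_p99_s > 5.0:
        alerts.append("commit_duration")
    if s.checkpoint_p99_ms > 0.5 * s.transaction_timeout_ms:
        alerts.append("checkpoint_vs_timeout")
    return alerts


healthy = TxnSample(0, 4, 4, 1, 99, 0.8, 60_000, 900_000)
degraded = TxnSample(2, 9, 4, 20, 80, 7.5, 600_000, 900_000)
print(firing_alerts(healthy))
print(firing_alerts(degraded))
```

Computing the abort rate as a ratio of aborted to finished transactions keeps the 10% threshold meaningful regardless of throughput.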
Operational dashboards:
- Kafka __transaction_state status
- Active TIDs per Flink subtask
- Transaction lifecycle (begin, prepare, commit, abort) rates
- End-to-end exactly-once latency (record produce -> consumer visible)
Best practices for production
1. Configuration matrix:
Kafka:
default.replication.factor: 3 (cluster level)
min.insync.replicas: 2
transaction.state.log.replication.factor: 3 (for __transaction_state)
transaction.max.timeout.ms: 1h
Flink:
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.interval: 10s-60s (balance)
execution.checkpointing.timeout: 10min (less than transaction timeout)
execution.checkpointing.tolerable-failed-checkpoints: 5
Producer:
transaction.timeout.ms: 30min (more than max checkpoint duration)
enable.idempotence: true
acks: all
2. Unique TID prefix per job
Include: environment, job name, version, deployment ID
3. Monitor transactional state metrics
Set alerts on abort rate, pending count, fence count
4. Test failure scenarios in staging
Kill TaskManagers mid-checkpoint
Network partitions
Kafka broker failures
Verify recovery behavior
5. Document recovery procedures
Runbooks for common edge cases
On-call training
6. Capacity planning
__transaction_state partition count
Coordinator broker load
TID pool sizes
Manual transaction abort must be a LAST RESORT. It is a destructive operation: data from the pre-commit phase is lost. Before aborting, make sure the transaction is really stuck and not just slow. Use Kafka metrics, check the Flink logs, and confirm that the regular recovery procedures have been attempted. Always have a rollback plan via savepoints.
Reading the source
Flink source:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/
KafkaCommitter.java -- commit logic with retry
FlinkKafkaInternalProducer.java -- producer wrapper
Kafka source:
kafka/clients/src/main/java/org/apache/kafka/clients/producer/internals/
TransactionManager.java -- producer transaction logic
TxnRequestHandler.java -- broker protocol
kafka/core/src/main/scala/kafka/coordinator/transaction/
TransactionCoordinator.scala -- coordinator state machine
TransactionMarkerChannelManager.scala -- WriteTxnMarker logic
Operations tools:
kafka/bin/
kafka-transactions.sh -- manage transactions manually
Production runbooks:
Confluent docs: "Operations Guide for Exactly Once"
AWS MSK docs: "Operating Kafka transactions in production"