Lesson 13.04 · 24 min
Advanced
Tags: Edge Cases, Failure Recovery, Stuck Transactions, Manual Intervention, transactional.id Collisions, Operations

Edge cases and failure recovery

In the previous lessons we saw the exactly-once happy path: Two-Phase Commit works, Kafka transactions commit normally, and Flink coordinates everything transparently. In this final lesson of the module we go through the edge cases and failure scenarios that production teams run into: stuck transactions, transactional.id collisions, broker failures during commit, and manual recovery procedures.

This is the operational side of exactly-once: not theory, but the practice of debugging production incidents.

Kafka transactional producer: errors and recovery

Edge case 1: Failover between prepare and commit

The hardest scenario. The sink has already run prepareCommit (records are durable in Kafka in a pre-commit state) and the checkpoint has completed, but commitTransaction has not run yet.

Timeline:
  T0: Sink prepareCommit — records sent, durable, in pre-commit
  T1: JobManager notifies checkpoint complete
  T2: Sink committer scheduled to commit
  T3: TaskManager crashes BEFORE actual commitTransaction call
  
State:
  - Kafka coordinator: ONGOING transaction, not yet committed
  - Records on partition leaders: durable but invisible to read_committed consumers
  - Flink checkpoint state: committables stored
  - Pending commit: should have been done, but the TaskManager is dead

Recovery scenarios:

Scenario A: Same TaskManager restarts
  - Reads checkpoint state
  - Sees committables for completed checkpoint
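  - Replays commit() for each
  - Committer creates a producer with the same TID and resumes the open transaction
    from the producer ID and epoch saved in the committable
    (a fresh initTransactions() call would abort the open transaction instead)
  - Kafka coordinator looks up the TID, still sees ONGOING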
  - commitTransaction() proceeds
  - Records become visible
  - Exactly-once preserved

Scenario B: Different TaskManager replaces
  - Flink schedules new TaskManager
  - Reads checkpoint state
  - Sees committables
  - Calls commit() with saved metadata
  - Producer is created with the same TID (same prefix + same subtask suffix)
  - Kafka coordinator may fence the old PID/epoch
  - But: the PID/epoch saved in the checkpoint let the new TM resume the transaction with the correct values
  - If coordinator state is ONGOING: commits
  - If coordinator already aborted (timeout): error, restart from an older checkpoint

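Edge case within the edge case:
  - Coordinator may expire the transaction between T0 and recovery time
  - The timeout is the producer's transaction.timeout.ms, capped by the broker's transaction.max.timeout.ms (default: 15 min)
  - If recovery takes longer than the timeout (15 min in this example), the transaction is aborted automatically
  - Result: pre-commit data DISCARDED, records lost
  - Solution: increase transaction.timeout.ms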
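
The race between recovery and the coordinator's timeout can be written as a tiny model. This is a sketch under simplifying assumptions, not Flink or Kafka code: `Outcome` and `recovery_outcome` are hypothetical names, and the only input is how long the transaction has been open when the commit is retried.

```python
from enum import Enum


class Outcome(Enum):
    COMMIT_RESUMED = "commit resumed, records become visible"
    ABORTED_BY_TIMEOUT = "aborted by coordinator, restore from an older checkpoint"


def recovery_outcome(seconds_since_txn_start, txn_timeout_s):
    """Model the race between Flink recovery and the transaction timeout.

    seconds_since_txn_start -- age of the open transaction when commit is retried
    txn_timeout_s           -- effective transaction.timeout.ms, in seconds
    """
    if seconds_since_txn_start < txn_timeout_s:
        # Coordinator still holds the transaction as ONGOING.
        return Outcome.COMMIT_RESUMED
    # Coordinator has already aborted; pre-committed records are gone.
    return Outcome.ABORTED_BY_TIMEOUT


if __name__ == "__main__":
    timeout_s = 15 * 60
    for recovery_s in (4 * 60, 20 * 60):
        print(recovery_s, recovery_outcome(recovery_s, timeout_s).value)
```

The model makes the operational point visible: the only knob that changes the outcome for a slow recovery is the timeout itself.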

Edge case 2: Stuck transactions

Sometimes Kafka transactions get stuck: neither committed nor aborted, blocking downstream consumers.

Indicators:
  - read_committed consumer is blocked (waiting on a pending transaction)
  - Consumer lag keeps growing
  - Kafka coordinator metrics show ONGOING transactions
  - Flink job appears healthy

Causes:
  1. TaskManager crash mid-prepare
     Recovery should handle this, but the fence error is not handled properly

  2. Kafka coordinator failover
     New coordinator might lose state if __transaction_state replication failed

  3. Network partition between Flink and Kafka
     Producer cannot commit, but doesn't abort gracefully

  4. TID pool exhaustion caused by long-running stuck transactions
     New transactions cannot acquire a TID

Manual recovery:
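  Option 1: Kafka admin tools
    # List transactional IDs together with their current state
    bin/kafka-transactions.sh \
      --bootstrap-server localhost:9092 \
      --command-config admin.properties \
      list

    # Inspect one transaction (state, start time, enrolled partitions)
    bin/kafka-transactions.sh \
      --bootstrap-server localhost:9092 \
      --command-config admin.properties \
      describe --transactional-id "stuck-tid"

    # If the transaction is stuck, abort it manually.
    # abort works per partition: <topic>, <partition> and <start-offset>
    # are placeholders to fill in from the describe output.
    bin/kafka-transactions.sh \
      --bootstrap-server localhost:9092 \
      --command-config admin.properties \
      abort --topic <topic> --partition <partition> --start-offset <start-offset>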
  
  Option 2: From Flink side
    - Stop the Flink job
    - Manually abort the Kafka transactions
    - Restart the Flink job from an earlier checkpoint
    - Source replays records into new transactions
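
Before aborting anything, it helps to separate "slow" from "stuck". The sketch below is one possible heuristic, not a Kafka API: `OngoingTxn` and `suspicious` are hypothetical names, and the input is assumed to have been collected by hand (for example from the describe output) into plain records.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class OngoingTxn:
    transactional_id: str
    start_ms: int    # when the transaction was opened (epoch millis)
    timeout_ms: int  # the producer's transaction.timeout.ms


def suspicious(txns, now_ms, grace_ms=60_000):
    """Return IDs of transactions still open past timeout + grace period.

    A transaction younger than its own timeout is only slow; the coordinator
    should have aborted anything older, so those deserve a closer look.
    """
    return [
        t.transactional_id
        for t in txns
        if now_ms - t.start_ms > t.timeout_ms + grace_ms
    ]


if __name__ == "__main__":
    now = 10_000_000
    txns = [
        OngoingTxn("job-0-1", start_ms=now - 30_000, timeout_ms=900_000),
        OngoingTxn("job-1-1", start_ms=now - 2_000_000, timeout_ms=900_000),
    ]
    print(suspicious(txns, now))
```

Only the IDs this check returns are candidates for the manual abort; everything else should be left to normal recovery.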

Edge case 3: Broker restart with a transaction in flight

Scenario:
  T0: Producer.commitTransaction() called
  T1: Coordinator writes PREPARE_COMMIT to __transaction_state
  T2: Coordinator sends WriteTxnMarkerRequest to partition leaders
  T3: Some partition leaders write markers, others have not yet
  T4: Coordinator broker crashes

Recovery:
  - New coordinator takes over (the new leader of the __transaction_state partition, elected by the controller, e.g. KRaft)
  - Replays __transaction_state as written before the crash
  - Sees state PREPARE_COMMIT
  - Resumes: sends WriteTxnMarkerRequest for partitions which haven't acked
  - Eventually all markers written
  - Writes COMPLETE_COMMIT

Edge case: __transaction_state partition replication failure
  - If replication.factor=1 for __transaction_state, a leader crash means data loss
  - Stuck transactions are then unrecoverable from the broker side
  - MUST: replication.factor=3 for __transaction_state in production (broker setting transaction.state.log.replication.factor)
  - MUST: min.insync.replicas=2 for durability (broker setting transaction.state.log.min.isr)

Flink side:
  - Producer.commitTransaction() blocks or throws
  - On throw: Flink retries based on KafkaSink configuration
  - On success after retry: idempotent (same TID, same epoch)
  - No data loss or duplication
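
The coordinator-side recovery above boils down to "once PREPARE_COMMIT is in the log, the decision is final and only the markers remain". A minimal model of that rule follows; `resume_after_failover` is a hypothetical name and the state strings simply mirror the ones used in this lesson, so treat it as an illustration rather than the broker's actual code.

```python
def resume_after_failover(logged_state, partitions, acked):
    """Decide what a new coordinator does with a transaction found in the log.

    logged_state -- last state recorded in __transaction_state
    partitions   -- partitions enrolled in the transaction
    acked        -- partitions known to have written their marker already
    Returns (partitions_still_needing_a_marker, next_state).
    """
    if logged_state == "PREPARE_COMMIT":
        # The commit decision is durable; only marker delivery is unfinished.
        pending = [p for p in partitions if p not in acked]
        return pending, "COMPLETE_COMMIT"
    if logged_state == "PREPARE_ABORT":
        pending = [p for p in partitions if p not in acked]
        return pending, "COMPLETE_ABORT"
    # Any other state: nothing to finish on behalf of the old coordinator.
    return [], logged_state


if __name__ == "__main__":
    print(resume_after_failover("PREPARE_COMMIT", ["t-0", "t-1", "t-2"], {"t-0"}))
```

The same shape explains why a lost __transaction_state log is fatal: without `logged_state` there is nothing to resume from.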

Edge case 4: transactional.id collisions

Scenario:
  Two Flink jobs use the same TID prefix:
    Job A: subtask 0 uses "myprefix-0"
    Job B: subtask 0 uses "myprefix-0"  -- same!
  
  What happens:
  1. Job A starts, gets PID=42, epoch=1
  2. Job B starts, requests same TID
  3. Coordinator fences epoch 1, assigns epoch=2 to Job B
  4. Job A keeps trying to send/commit; every attempt is fenced
  5. Job A producer disabled, throws
  6. Flink runtime fails subtask, restart attempt
  7. On restart Job A: gets epoch=3, fences Job B
  8. And so on: an endless fencing battle
  
Result:
  - Both jobs unstable
  - Constant restarts
  - No exactly-once

Prevention:
  - Unique TID prefix per job
  - Include environment, version, deployment info:
    sink.transactional.id.prefix: ${ENV}-${JOB_NAME}-${VERSION}-${DEPLOY_ID}
  
  Example values:
    "prod-fraud-detector-v1.2.3-20260501"
    "staging-analytics-v2.1.0-pr-1234"

Detection:
  - Monitor producer fencing errors in Flink logs
  - Frequent restarts of subtasks
  - Kafka broker metrics: high transaction-fenced count
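
The prevention rule can be enforced before deployment. The helper below is a sketch: `tid_prefix` and `find_collisions` are hypothetical names, and the job-to-prefix mapping is assumed to come from your own deployment inventory.

```python
def tid_prefix(env, job_name, version, deploy_id):
    """Build a prefix in the ${ENV}-${JOB_NAME}-${VERSION}-${DEPLOY_ID} shape."""
    parts = [env, job_name, version, deploy_id]
    if any(not p for p in parts):
        raise ValueError("every prefix component must be non-empty")
    return "-".join(parts)


def find_collisions(prefix_by_job):
    """Return {prefix: [jobs]} for every prefix shared by more than one job."""
    jobs_by_prefix = {}
    for job, prefix in prefix_by_job.items():
        jobs_by_prefix.setdefault(prefix, []).append(job)
    return {p: sorted(j) for p, j in jobs_by_prefix.items() if len(j) > 1}


if __name__ == "__main__":
    print(tid_prefix("prod", "fraud-detector", "v1.2.3", "20260501"))
    print(find_collisions({"job-a": "myprefix", "job-b": "myprefix", "job-c": "other"}))
```

Running a check like `find_collisions` in CI is cheaper than diagnosing a fencing battle from restart loops.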

Edge case 5: Long checkpoint causing transaction timeout

Scenario:
  Kafka transaction.timeout.ms: 60 min (the KafkaSink default; the plain Kafka producer default is 1 min)
  Flink checkpoint interval: 60 sec
  But: the actual checkpoint takes 5+ min because of large state
  
  Timeline:
  T0: prepareCommit called, transaction begun
  T+5min: checkpoint still ongoing for some subtask with huge state
  T+10min: checkpoint finally completes
  T+10min+1: commitTransaction called
  
  If transaction.timeout.ms = 60 min: OK
  If transaction.timeout.ms = 5 min: ABORTED!
  
Effects:
  - commitTransaction throws ProducerFencedException
  - Flink retries from checkpoint (incomplete, since commit failed)
  - May cascade to longer checkpoint durations
  - Eventual job failure

Solution:
  Set transaction.timeout.ms > expected_max_checkpoint_duration + buffer
  Example:
    Expected max checkpoint: 10 min
    Buffer: 5 min for slow commits
    Set: 15-20 min minimum
  
  Also: the broker-side transaction.max.timeout.ms must be >= the producer value
    Default: 15 min
    Increase if needed in the broker's server.properties (read-only broker config, applied on broker restart)
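
The sizing rule is simple arithmetic, shown here as a sketch with hypothetical helper names (`required_txn_timeout`, `broker_limit_must_be_raised`); the 15 min broker default is the value quoted above.

```python
from datetime import timedelta

BROKER_DEFAULT_MAX = timedelta(minutes=15)  # transaction.max.timeout.ms default


def required_txn_timeout(max_checkpoint, buffer):
    """transaction.timeout.ms >= expected max checkpoint duration + buffer."""
    return max_checkpoint + buffer


def broker_limit_must_be_raised(producer_timeout, broker_max=BROKER_DEFAULT_MAX):
    """True when the producer value would exceed the broker-side cap."""
    return producer_timeout > broker_max


if __name__ == "__main__":
    needed = required_txn_timeout(timedelta(minutes=10), timedelta(minutes=5))
    print(int(needed.total_seconds() * 1000), "ms")
    print("raise broker limit:", broker_limit_must_be_raised(needed))
```

With the example numbers the result lands exactly on the broker default, which is why the text recommends 15-20 min as a minimum and raising the broker cap for anything larger.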

Edge case 6: Kafka cluster total failure

Scenario:
  All Kafka brokers crash (catastrophic)
  Or: Kafka cluster unreachable (network partition)

State:
  - Pending transactions in unknown state
  - Flink job stuck, cannot commit
  - Records on Kafka side: in pre-commit, not yet visible

Recovery (Kafka comes back):
  - All non-committed transactions abort after the timeout
  - Pre-committed records discarded
  - Flink job restarts from the last successful checkpoint
  - Source replays from saved offsets
  - New transactions established
  - Output proceeds

Data behavior:
  - No exactly-once violation
  - Records before last checkpoint: committed, visible
  - Records after the last checkpoint: lost from Kafka, will be reprocessed
  - Net result: at most one checkpoint interval of output is rolled back and reprocessed

Operational concerns:
  - Total Kafka outage = downtime
  - Mitigation: multi-region Kafka, cross-region replication
  - Recovery time depends on Kafka recovery + Flink restart

Edge case 7: Partial commit (split brain)

Scenario:
  Network partition between the coordinator and some partition leaders.
  Coordinator sends WriteTxnMarkerRequest to all leaders.
  Some leaders write commit markers successfully.
  Others don't receive (network partition).
  Coordinator times out waiting.
  
Result:
  - Some partitions: records visible (markers written)
  - Other partitions: records not visible (no markers)
  - "Partial commit" - read_committed consumers see inconsistent state

Recovery:
  - Coordinator retries WriteTxnMarkerRequest
  - With KRaft and proper replication, eventually all partitions get markers
  - Consistency restored

Edge case within the edge case:
  - If partition leader crashes BEFORE writing marker AND coordinator already moved on
  - New leader elected
  - Coordinator detects this, retries marker write
  - Eventually consistent

In practice:
  - Properly configured Kafka (replication.factor=3, min.insync.replicas=2)
  - Partial commits very rare and self-recovering
  - Major issue only in misconfigured clusters
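
What a read_committed consumer sees during a partial commit can be modelled per partition. The sketch below is a simplified model, not consumer code: `read_committed_view` is a hypothetical name, the log is a plain list of tuples, and each transaction ID is assumed to be used only once.

```python
def read_committed_view(log):
    """Return the values a read_committed consumer could read from one partition.

    log entries are ("data", txn_id, value), ("commit", txn_id) or ("abort", txn_id).
    Reading stops at the first record of a transaction that has no marker yet.
    """
    outcome = {}
    for entry in log:
        if entry[0] in ("commit", "abort"):
            outcome[entry[1]] = entry[0]

    visible = []
    for entry in log:
        if entry[0] != "data":
            continue
        _, txn_id, value = entry
        if txn_id not in outcome:
            break  # open transaction: nothing past this point is readable
        if outcome[txn_id] == "commit":
            visible.append(value)
    return visible


if __name__ == "__main__":
    with_marker = [("data", "t1", "a"), ("commit", "t1")]
    without_marker = [("data", "t1", "b"), ("data", "t2", "c"), ("commit", "t2")]
    print(read_committed_view(with_marker))
    print(read_committed_view(without_marker))
```

The second partition shows why a missing marker hurts more than its own transaction: the committed records of `t2` stay hidden behind the still-open `t1` until the coordinator's retry delivers the marker.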

Manual recovery procedures

Procedure 1: Stuck transaction abort
  
  Step 1: Identify stuck transaction
    bin/kafka-transactions.sh --bootstrap-server kafka:9092 --command-config admin.properties list
    
  Step 2: Inspect details
    bin/kafka-transactions.sh ... describe --transactional-id "STUCK_TID"
    
  Step 3: Abort if confirmed stuck (per partition; placeholders come from the describe output)
    bin/kafka-transactions.sh ... abort --topic <topic> --partition <partition> --start-offset <start-offset>
    
  Step 4: Verify consumers unblock
    Check consumer lag, should resume

Procedure 2: Reset Flink job after manual abort
  
  Step 1: Identify abortion scope
    Which checkpoint contained the stuck commit?
    Was it before or after notifyCheckpointComplete?
  
  Step 2: Choose recovery point
    If commit was after notify: restore from after-notify checkpoint
    If commit was before notify: restore from earlier checkpoint
  
  Step 3: Manual restart with explicit savepoint
    bin/flink run \
      --fromSavepoint s3://.../savepoint-XXX \
      --allowNonRestoredState \
      <job-args>
  
  Step 4: Monitor commit retry
    Ensure new transactions commit successfully

Procedure 3: TID cleanup
  
  When changing job name or version, old TIDs may remain.
  
  Step 1: List transactional IDs
    bin/kafka-transactions.sh --bootstrap-server kafka:9092 list
    
  Step 2: Identify old TIDs
    Look for prefixes from previous deployments
  
  Step 3: Force expire
    Wait for transactional.id.expiration.ms (default 7 days)
    Or modify broker config
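
Step 2 can be done mechanically once the TIDs are exported to a list. This is a sketch with a hypothetical helper name (`stale_tids`); it assumes every listed TID belongs to some deployment of this job and that Flink appends a "-"-separated suffix to the configured prefix.

```python
def stale_tids(all_tids, current_prefix):
    """Return TIDs that do not belong to the currently deployed prefix."""
    marker = current_prefix + "-"
    return sorted(t for t in all_tids if not t.startswith(marker))


if __name__ == "__main__":
    tids = [
        "prod-fraud-detector-v1.2.3-20260501-0-7",
        "prod-fraud-detector-v1.2.2-20260401-0-3",
        "prod-fraud-detector-v1.2.2-20260401-1-3",
    ]
    print(stale_tids(tids, "prod-fraud-detector-v1.2.3-20260501"))
```

The output is only a candidate list; the expiration described in Step 3 still does the actual cleanup.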

Monitoring and alerts

Critical alerts:

1. Producer fencing rate
   alert: kafka_transaction_producer_fenced_count > 0 sustained
   action: investigate TID collisions, network issues

2. Pending transactions count
   alert: kafka_transaction_ongoing_count > expected
   action: stuck transactions, manual intervention may be needed

3. Transaction abort rate
   alert: rate(kafka_transactions_aborted_count[5m]) / total transaction rate > 0.1 (10% abort ratio)
   action: timeout issues, network problems

4. Commit duration p99
   alert: kafka_transaction_commit_duration_p99 > 5s
   action: slow brokers, coordinator issues

5. Checkpoint duration vs transaction timeout
   alert: flink_checkpoint_duration_p99 > kafka_transaction_timeout * 0.5
   action: tune one or the other

6. Flink subtask restart frequency
   alert: rate(flink_subtask_restarts[10m]) > 0.01
   action: instability, possibly fence-related
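
Two of these alerts are ratios rather than raw counters, which is easy to get wrong. A sketch of the arithmetic, with hypothetical function names and inputs assumed to be already scraped from your metrics system:

```python
def abort_ratio(aborted, committed):
    """Share of finished transactions that were aborted (0.0 when idle)."""
    total = aborted + committed
    return aborted / total if total else 0.0


def checkpoint_too_close_to_timeout(checkpoint_p99_s, txn_timeout_s, fraction=0.5):
    """Alert 5: checkpoint p99 exceeds the given fraction of the timeout."""
    return checkpoint_p99_s > txn_timeout_s * fraction


if __name__ == "__main__":
    print("abort ratio alert:", abort_ratio(aborted=12, committed=88) > 0.1)
    print("checkpoint alert:", checkpoint_too_close_to_timeout(600, 900))
```

Expressing the abort alert as a ratio keeps the threshold meaningful for both low-traffic and high-traffic jobs.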

Operational dashboards:
  - Kafka __transaction_state status
  - Active TIDs per Flink subtask
  - Transaction lifecycle (begin, prepare, commit, abort) rates
  - End-to-end exactly-once latency (record produce -> consumer visible)

Failure recovery flow

  1. Records pre-committed
     Normal operation: prepareCommit happens, records are durable in a pre-commit state on the Kafka brokers.
  2. (failure) TM crash mid-protocol
     TaskManager crashes between prepare and commit. The transaction stays pending in the Kafka coordinator. Pre-commit data is invisible.
  3. Recovery: new TM
     Flink schedules a new TaskManager. It restores from checkpoint state. Committables are loaded.
  4. (resume) Commit retry with same TID
     committer.commit(loaded committables). The producer resumes the transaction with the saved TID, producer ID and epoch. The coordinator still has the transaction ONGOING.
  5a. (proceed) Records become visible
     commitTransaction completes. Markers written. Records visible. Exactly-once preserved.
  5b. Timeout: transaction aborted
     If recovery > transaction.timeout.ms: Kafka aborts the transaction. Pre-commit data is discarded. The job restores from an older checkpoint and the source replays from its saved offsets.
  6. (replay) Source replays + new TX
     Source reads from the Kafka offsets saved in the checkpoint. Records are reprocessed. New transactions are created.
  7. (commit) Restored normally
     New transactions commit normally. Records visible (there may be slight reordering). No data loss, no duplicates.

Best practices for production

1. Configuration matrix:
   Kafka:
     replication.factor: 3 (cluster level)
     min.insync.replicas: 2
     __transaction_state replication.factor: 3
     transaction.max.timeout.ms: 1h
   
   Flink:
     execution.checkpointing.mode: EXACTLY_ONCE
     execution.checkpointing.interval: 10s-60s (balance)
     execution.checkpointing.timeout: 10min (less than transaction timeout)
     execution.checkpointing.tolerable-failed-checkpoints: 5
   
   Producer:
     transaction.timeout.ms: 30min (more than max checkpoint duration)
     enable.idempotence: true
     acks: all

2. Unique TID prefix per job
   Include: environment, job name, version, deployment ID

3. Monitor transactional state metrics
   Set alerts on abort rate, pending count, fence count

4. Test failure scenarios in staging
   Kill TaskManagers mid-checkpoint
   Network partitions
   Kafka broker failures
   Verify recovery behavior

5. Document recovery procedures
   Runbooks for common edge cases
   On-call training

6. Capacity planning
   __transaction_state partition count
   Coordinator broker load
   TID pool sizes
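
The timeouts in the configuration matrix only work if they are ordered correctly: checkpoint timeout below the producer's transaction timeout, which in turn must not exceed the broker cap. A small consistency check, offered as a sketch with a hypothetical function name and values in seconds:

```python
def validate_timeouts(checkpoint_timeout_s, producer_txn_timeout_s, broker_max_txn_timeout_s):
    """Return a list of human-readable problems; empty means consistent."""
    problems = []
    if checkpoint_timeout_s >= producer_txn_timeout_s:
        problems.append(
            "execution.checkpointing.timeout must be shorter than transaction.timeout.ms"
        )
    if producer_txn_timeout_s > broker_max_txn_timeout_s:
        problems.append(
            "transaction.timeout.ms exceeds the broker's transaction.max.timeout.ms"
        )
    return problems


if __name__ == "__main__":
    # Values from the matrix above: 10 min, 30 min, 1 h.
    print(validate_timeouts(600, 1800, 3600))
    # Same producer value against the broker default of 15 min.
    print(validate_timeouts(600, 1800, 900))
```

The second call shows the most common misconfiguration: a producer timeout raised on the Flink side while the broker still runs with its default cap.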

WARNING

Manual transaction abort must be a LAST RESORT. It is a destructive operation: data from the pre-commit phase is lost. Before aborting, make sure the transaction is really stuck (not just slow). Use Kafka metrics, check Flink logs, and make sure the recovery procedures have been attempted. Always have a rollback plan via savepoints.

Reading the source

Flink source:
  flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/
    KafkaCommitter.java         -- commit logic with retry
    FlinkKafkaInternalProducer.java -- producer wrapper

Kafka source:
  kafka/clients/src/main/java/org/apache/kafka/clients/producer/internals/
    TransactionManager.java     -- producer transaction logic
    TxnRequestHandler.java      -- broker protocol

  kafka/core/src/main/scala/kafka/coordinator/transaction/
    TransactionCoordinator.scala -- coordinator state machine
    TransactionMarkerChannelManager.scala -- WriteTxnMarker logic

Operations tools:
  kafka/bin/
    kafka-transactions.sh       -- manage transactions manually

Production runbooks:
  Confluent docs: "Operations Guide for Exactly Once"
  AWS MSK docs: "Operating Kafka transactions in production"

Check your understanding

Question: What happens if transaction.timeout.ms on the producer is shorter than the actual checkpoint duration?
