Learning Platform
Глоссарий Troubleshooting
Урок 19.02 · 40 мин
Продвинутый
IngestionCDCAvroProtobufSchema RegistryKafkaSerializationSchema Evolution

Проектирование Ingestion Pipeline

Первый deliverable capstone-проекта — проектирование ingestion pipeline: от OLTP-базы через Kafka до landing zone в object storage. Ключевые решения: какой serialization формат, как настроить Schema Registry, какая стратегия schema evolution.

Требования из обзора проекта:

  • 10K events/sec steady, 50K peak (Black Friday)
  • Schema меняется ~2 раза в месяц
  • Consumer lag SLA: не более 5 минут
  • Три типа источников: CDC (PostgreSQL), clickstream, external APIs

Анатомия CDC-события

Прежде чем выбирать формат, разберём что именно мы сериализуем:

Структура CDC-события от Debezium

Debezium Change Event

Debezium CDC event — JSON-структура с envelope. Содержит before/after state строки, metadata операции, и source info (LSN, timestamp, connector name). Для production обязателен перевод в бинарный формат.
beforeСостояние строки ДО операции. null для INSERT. Для UPDATE и DELETE содержит все колонки (или только PK, зависит от конфигурации connector). Тип: полная schema таблицы.
afterСостояние строки ПОСЛЕ операции. null для DELETE. Для INSERT и UPDATE содержит все колонки в новом состоянии. Тип: полная schema таблицы — та же что before.
opТип операции: 'c' (create/INSERT), 'u' (update), 'd' (delete), 'r' (read — initial snapshot). Single char, но критичен для downstream обработки: определяет merge strategy в silver layer.
sourceMetadata источника: connector name, database, table, LSN (Log Sequence Number), timestamp. LSN обеспечивает exactly-once ordering в пределах одной таблицы.
ts_msTimestamp обработки события Debezium в миллисекундах. Не путать с source.ts_ms (время операции в базе). Разница между ними = replication lag.
NOTE

Debezium по умолчанию генерирует JSON. Это удобно для debugging, но в production неприемлемо: JSON не имеет schema enforcement, занимает в 3-5 раз больше места, парсинг медленнее. Переход на бинарный формат — первый шаг оптимизации. См. JSON limitations из Модуля 06.

Serialization: Avro vs Protobuf для CDC

Для Kafka-based CDC pipeline два реальных кандидата: Apache Avro и Protocol Buffers. Сравним их в контексте нашего сценария:

Avro vs Protobuf для CDC pipeline

Apache Avro

Apache Avro — row-based формат с встроенной schema. Schema идёт вместе с данными (или через Schema Registry). Нативная интеграция с Kafka + Confluent экосистемой.
Преимущества для CDC1) Schema Registry: нативная интеграция, avro-serializer встроен в Kafka clients. 2) Schema evolution: добавление nullable полей без breaking change. 3) Debezium: нативный Avro converter, zero-config. 4) Ecosystem: Spark, Flink, Hive — all read Avro natively.
Недостатки1) Размер: Avro schema verbose (JSON-based), хотя данные компактны. 2) Нет strong typing: union types вместо oneOf. 3) Нет RPC: только сериализация (но нам RPC не нужен). 4) Dynamic schema: reader должен знать writer schema — overhead для Schema Registry.

Protocol Buffers

Protocol Buffers — binary формат от Google. Schema определяется в .proto файлах, компилируется в code. Более компактный чем Avro для некоторых типов данных.
Преимущества для CDC1) Компактность: varint encoding для integers, нет field names в wire format. 2) Strong typing: .proto файлы как IDL. 3) Code generation: типизированные классы на любом языке. 4) Backward/forward: field numbers обеспечивают compatibility.
Недостатки1) Schema Registry: поддержка добавлена позже, не такая зрелая как для Avro. 2) Debezium: Protobuf converter существует, но реже используется в production. 3) Spark: чтение Protobuf из Kafka требует custom deserializer. 4) Compilation step: .proto → code adds build complexity.

Рекомендация для capstone-сценария

Decision: Avro для CDC pipeline

Решение: Avro

Для нашего capstone-сценария Avro — рациональный выбор. Не потому что 'лучше', а потому что: (1) Debezium + Schema Registry — zero-config path, (2) Spark/Flink consumers читают Avro нативно, (3) schema evolution через Schema Registry покрывает наши ~2 изменения/месяц.
Причина 1Debezium Avro converter — battle-tested в production. Настройка: один параметр (key.converter / value.converter = io.confluent.connect.avro.AvroConverter). Protobuf converter работает, но имеет меньше production deployments.
Причина 2Schema Registry для Avro — зрелая технология (Confluent с 2014 года). Compatibility modes (BACKWARD, FORWARD, FULL) работают из коробки. Protobuf support в Schema Registry добавлен в 2020 — функционально ок, но меньше operational опыта в сообществе.
Причина 3Все downstream consumers в нашей архитектуре (Spark Structured Streaming, Flink, Trino) имеют нативные Avro readers. Protobuf требует custom serde для Spark, что добавляет build complexity и потенциальные баги.
TIP

Если бы наш pipeline был gRPC-based (микросервисы вместо Kafka), Protobuf был бы более естественным выбором — единый IDL для RPC и сериализации. Контекст определяет решение. См. comparison из Модуля 05.

Schema Registry: конфигурация

Schema Registry — центральный компонент нашего ingestion pipeline. Он обеспечивает контракт между producers (Debezium) и consumers (Spark/Flink):

Schema Registry в CDC pipeline

Debezium (Producer)

Debezium (producer) — регистрирует schema при первом event, обновляет при schema change в PostgreSQL. Каждый event содержит schema ID (4 bytes) вместо полной schema — экономия 90%+ на размере message.

Clickstream (Producer)

Clickstream producer — отдельная Avro schema для web-событий. Schema зарегистрирована вручную перед деплоем нового event type. Более стабильная чем CDC — меняется реже.
Register / Lookup schema

Schema Registry

Confluent Schema Registry — HTTP-сервис, хранит Avro/Protobuf/JSON schemas с версионированием. Subject naming strategy: TopicNameStrategy (topic → schema). Compatibility check на каждый register. Хранит schemas в Kafka internal topic (_schemas).
Schema ID → Full schema

Spark (Consumer)

Spark Structured Streaming — consumer, который читает Avro events из Kafka. Использует Schema Registry для десериализации: по schema ID из message header получает полную schema, десериализует binary → Row.

Flink (Consumer)

Flink CDC consumer — альтернативный consumer для real-time processing. Та же Schema Registry интеграция. Может обрабатывать schema changes on-the-fly без restart.

Compatibility Mode

Ключевая настройка Schema Registry — compatibility mode. Для нашего CDC сценария:

Compatibility modes для capstone
BACKWARD (рекомендация)BACKWARD compatibility: новая schema может читать данные, записанные старой schema. Правило: можно добавлять поля с default value, можно удалять поля. Нельзя менять тип поля. Для CDC — оптимально: PostgreSQL ALTER TABLE ADD COLUMN → Avro schema добавляет nullable field → BACKWARD compatible.
FORWARDFORWARD compatibility: старая schema может читать данные, записанные новой schema. Обратная ситуация: consumer ещё не обновился, а producer уже пишет новую schema. Для CDC менее критично — consumers обновляются быстрее чем producers.
FULLFULL compatibility = BACKWARD + FORWARD одновременно. Максимальная safety, но и максимальные ограничения: можно только добавлять/удалять optional поля. Для capstone-сценария — overly restrictive, но допустимо если хотите максимум safety.
WARNING

Подробный разбор compatibility modes — в Модуле 10, Урок 02. Здесь мы применяем эти правила к нашему сценарию, а не повторяем теорию.

Рекомендация: BACKWARD compatibility для всех CDC subjects:

  • PostgreSQL ALTER TABLE ADD COLUMN → Avro schema добавляет nullable field → BACKWARD compatible
  • PostgreSQL ALTER TABLE DROP COLUMN → Avro schema удаляет field → BACKWARD compatible (новый reader не ожидает поля)
  • PostgreSQL ALTER TABLE ALTER COLUMN TYPEbreaking change → требует новый topic или ручную миграцию

Для clickstream subjects — FULL compatibility: schema меняется редко и контролируется нами.

Schema Design: Avro для CDC

Пример Avro schema для CDC-таблицы orders:

Avro Schema Design для CDC

PostgreSQL: orders table

PostgreSQL таблица orders: id (BIGINT), user_id (BIGINT), status (VARCHAR), total (NUMERIC), created_at (TIMESTAMP), updated_at (TIMESTAMP). Debezium маппит SQL types → Avro types.
Debezium → Avro mapping
Avro SchemaDebezium генерирует Avro schema автоматически. Namespace: database.server.schema.table. Каждая SQL-колонка → Avro field. NUMERIC → bytes с logical type decimal. TIMESTAMP → long с logical type timestamp-millis. VARCHAR → string. NULL mapping: union ['null', 'type'].
Envelope schema
CDC EnvelopeПолный CDC event = envelope schema. Содержит before (nullable record), after (nullable record), op (string), ts_ms (long), source (record с LSN, connector, db). Subject naming: {topic}-value → 'ecommerce.public.orders-value'.

Ключевые решения schema design

Schema design decisions
Subject Naming StrategyTopicNameStrategy (default): один subject на Kafka topic. Все таблицы из одного PostgreSQL → один topic = один subject. RecordNameStrategy: один subject на Avro record type — подходит для multi-table topics. Для нашего сценария: TopicNameStrategy, один topic на таблицу.
Decimal HandlingPostgreSQL NUMERIC → Avro bytes с logical type 'decimal' (precision, scale). Альтернатива: string (теряем type safety). Рекомендация: decimal для финансовых колонок (total, price), string для колонок с переменной precision. См. [Avro logical types](/storage-formats/04-avro/02-schema-type-system/).
Timestamp MappingPostgreSQL TIMESTAMP → Avro long с logical type 'timestamp-millis' (миллисекунды от epoch). TIMESTAMPTZ → то же, но timezone info теряется (Avro не имеет timezone-aware timestamp). Workaround: хранить timezone в отдельном string field или конвертировать в UTC.
Null HandlingAvro nullable field = union ['null', 'actualType']. Default value для nullable fields — 'null'. Это обеспечивает BACKWARD compatibility при ADD COLUMN: старые events не имеют нового поля, reader подставляет null. Все nullable PostgreSQL колонки → nullable Avro fields.

Consumer Design: от Kafka до Bronze

Последний блок ingestion — как consumer читает из Kafka и приземляет данные в bronze layer:

Consumer Pipeline: Kafka → Bronze

Kafka Topic (Avro CDC)

Kafka topic с Avro-encoded CDC events. Партиционирование по PK (id). Гарантия ordering в пределах партиции. Consumer group: spark-bronze-consumer.
Spark Structured Streaming

Spark Consumer

Spark Structured Streaming читает из Kafka, десериализует Avro через Schema Registry, добавляет metadata (ingestion_ts, kafka_offset, kafka_partition). Trigger: micro-batch каждые 30 секунд (balance между latency и throughput).
Deserialization + Metadata
Deserializationfrom_avro() UDF: binary Kafka value → struct. Schema lookup через Schema Registry REST API. Кеширование schema на executor (schema ID → schema map). Overhead: ~1ms на первый event нового schema, ~0 на последующие.
Metadata EnrichmentДобавляем к каждому event: ingestion_timestamp (when Spark processed it), kafka_topic, kafka_partition, kafka_offset. Эти поля нужны для debugging, reprocessing, и exactly-once semantics verification.
Write to Bronze

Bronze Layer (S3 + Table Format)

Bronze layer в S3/MinIO: файлы в выбранном формате (file format + table format). Партиционирование по ingestion_date (date from ingestion_timestamp). Append-only: никогда не обновляем bronze. Retention: 3 года.

Настройки Consumer

Consumer Configuration
Trigger IntervalMicro-batch trigger: 30 секунд. Баланс между latency (время от event в Kafka до файла в S3) и throughput (больше events per batch = эффективнее write). При 10K events/sec: ~300K events per batch. Consumer lag SLA 5 мин → 30 sec trigger даёт 10x запас.
CheckpointSpark checkpoint в S3: kafka offsets + write-ahead log. Обеспечивает exactly-once semantics при restart. Checkpoint interval = trigger interval. HDFS-compatible filesystem обязателен (S3 через S3A connector).
Error HandlingPoison messages (malformed Avro): dead-letter topic. Schema mismatch: SchemaRegistryClient.getById() throws → retry 3x, then dead-letter. Network errors: backoff retry (1s, 5s, 30s). Batch-level failure: restart from last checkpoint.
ScalingKafka consumer parallelism = число партиций. 12 партиций → 12 Spark tasks maximum. При 50K events/sec peak: 12 tasks × 30 sec batch = ~125K events/task — комфортно. Scale out: увеличить Kafka партиции + Spark executors.
TIP

Выбор file и table format для bronze layer — тема следующего урока. Здесь мы фокусируемся на ingestion: как данные попадают из PostgreSQL в Kafka и из Kafka — к порогу bronze layer.

Clickstream Pipeline

CDC — не единственный источник. Clickstream имеет другие характеристики:

Clickstream Pipeline Design

Web SDK (JSON events)

Web application отправляет events через SDK: page_view, click, search, add_to_cart, purchase. JSON payload. Объём: ~100K events/sec peak (в 10 раз больше чем CDC).

Gateway → Avro conversion

Gateway service: принимает JSON, валидирует, конвертирует в Avro, публикует в Kafka. Avro schema строго определена — gateway отклоняет events не matching schema. Rate limiting: 100K events/sec.
Avro-encoded events

Kafka: clickstream-events (24 partitions)

Kafka topic: clickstream-events. Партиционирование по user_id (хэш). Больше партиций чем CDC (24 vs 12) — выше throughput requirement. Retention: 3 дня (clickstream менее ценен для replay чем CDC).
Отличия от CDC1) Объём: 10x больше events/sec. 2) Schema: более стабильная (мы контролируем SDK). 3) Partitioning: по user_id (для session analysis) vs по PK (для ordering). 4) Retention: 3 дня vs 7 дней. 5) Consumer: может допускать at-least-once (idempotent dedup в silver layer).

Различия в schema strategy

ПараметрCDC PipelineClickstream Pipeline
Schema sourcePostgreSQL DDL → Debezium autoВручную определённая .avsc
CompatibilityBACKWARDFULL
Change frequency~2 раза/месяц (DDL changes)~1 раз/квартал (SDK update)
Evolution strategyAuto-register by DebeziumManual register before deploy
ValidationSchema Registry at write timeGateway + Schema Registry

Упражнение: проектирование ingestion

Ваша задача — описать ingestion pipeline для capstone-платформы. Ответьте на вопросы:

Checklist: Ingestion Design
1. SerializationКакой формат вы выбираете для CDC и почему? Для clickstream — тот же или другой? Какие trade-offs вы принимаете? Подкрепите ссылками на [Модуль 04](/storage-formats/04-avro/01-container-format/) или [Модуль 05](/storage-formats/05-serialization/01-protobuf-wire-format/).
2. Schema RegistryКакой compatibility mode для каждого subject? Naming strategy? Как обрабатываете breaking changes (ALTER COLUMN TYPE)? Подкрепите ссылками на [Модуль 10](/storage-formats/10-schema-evolution/03-schema-registry-architecture/).
3. Consumer DesignSpark Structured Streaming или Flink? Trigger interval? Error handling strategy? Как масштабируете при peak load (Black Friday)? Какой checkpoint mechanism?
4. OperationalКак мониторите consumer lag? Alert при lag > 5 мин? Как делаете schema change deployment (Debezium restart? rolling update?)? Как reprocess данные если обнаружен баг в consumer?
NOTE

Не существует единственного правильного ответа. Вы можете выбрать Protobuf вместо Avro, если обоснование убедительно. Можете выбрать Flink вместо Spark Structured Streaming, если аргументируете trade-offs. Ключ — обоснование, а не конкретный выбор.

Переход к Storage Layer

Ingestion pipeline заканчивается на пороге bronze layer: данные десериализованы, обогащены metadata, готовы к записи. В следующем уроке вы выберете:

  • File format для каждого слоя (Parquet vs ORC vs Arrow)
  • Table format (Delta Lake vs Iceberg vs Hudi)
  • Encoding strategy per column type
  • Compression codec per layer
  • Partitioning и sort order

Решения ingestion layer влияют на storage: если вы выбрали Avro для Kafka, consumer должен десериализовать Avro → Row и сериализовать Row → Parquet/ORC. Overhead этой conversion — один из факторов при выборе file format.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Для CDC pipeline (Debezium → Kafka) выбор между Avro и Protobuf. Schema меняется ~2 раза в месяц. Debezium connector уже в production. Что выбрать?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4