Learning Platform
Глоссарий Troubleshooting
Урок 04.02 · 25 мин
Начальный
IngestionCDCBatchStreaming

Ingestion: как затащить данные в DWH

Ingestion — это стадия, где данные переезжают из источника в твоё хранилище. Это первая инженерная работа DE: всё остальное (storage, transform) не имеет смысла, если ingestion не работает.

В этом уроке — основные оси решений: batch vs stream, pull vs push, full vs incremental — и обзор источников.


Главные оси решений

Оси ingestion-дизайна
Batch vs Streaming
Pull vs Push
Full vs Incremental
Schema-on-read vs schema-on-write

Каждое решение — это трейдоффы. Нет «лучшего» варианта, есть подходящий для задачи.


Batch vs Streaming

Batch — данные приходят пачками через интервалы: каждый час / день / неделя. «Сегодня в 4 утра запускается ingestion заказов за вчерашний день».

Streaming — данные приходят event-by-event в реальном времени. «Каждая транзакция Stripe в течение секунды попадает в DWH».

Batch vs Streaming
Ось
Batch
Streaming
TIP

Правило: начни с batch. 90% задач решаются batch’ем. Streaming — это дорогая инфра и сложный код. Не делай streaming, пока бизнес-юзер не потребовал конкретно «надо в реальном времени». Углубление в стриминг — модуль 09-batch-vs-streaming и kafka-course / flink-course.

Kafka как распределённый commit log: концепция streaming ingestion

Pull vs Push

Pull — DE-система тянет данные. Раз в час Fivetran отправляет SQL-запрос в Salesforce: «дай мне все contacts, обновлённые после X». DE контролирует частоту и логику.

Push — источник отправляет данные сам. Stripe настраивает webhook: «при каждой транзакции — POST на твой URL». DE контролирует только endpoint.

Pull vs Push
Pull
Push

В реальности часто гибрид: webhook (push) пишет в Kafka, оттуда DE-система достаёт (pull) и грузит в DWH.


Full vs Incremental

Full load — каждый раз вытаскиваем всю таблицу целиком. Просто, но дорого, если таблица большая.

Incremental load — только дельта с прошлого раза. Используем поле updated_at или CDC. Сложнее, но дёшево.

-- Full load: каждый раз
SELECT * FROM source.orders;
-- 100M строк каждый раз -> дорого

-- Incremental: только новое
SELECT * FROM source.orders
WHERE updated_at > '2026-05-17 10:00:00';
-- Только 10k строк -> дёшево
Full vs Incremental
Full Load
Incremental

CDC — Change Data Capture

Особый случай incremental — CDC. Источник (Postgres, MySQL) пишет в свой transaction log (WAL, binlog) каждое изменение. DE-система читает этот лог и применяет изменения в DWH.

Плюсы CDC:

  • Low latency — изменения видны через секунды.
  • Видим удаления — DELETE-операции тоже в логе.
  • Не нагружаем source-БД — читаем лог, не select по таблицам.

Минусы:

  • Сложная инфра — обычно нужен Kafka и Debezium.
  • Высокая цена — на маленьких объёмах overkill.

Углубление — наш debezium-course.

CDC с Debezium: что такое Change Data Capture и как это работает Kafka Connect Source Connectors: ingestion в промышленных масштабах

Источники: что бывает

Типы источников
Транзакционные БД
REST/GraphQL API
Файлы
События
Webhooks
Логи

Инструменты ingestion

Кратко (подробнее — в M01.04):

  • Fivetran — managed SaaS, лучший для standard источников. Дорого.
  • Airbyte — open-source аналог. Self-hosted и cloud.
  • dlt — Python библиотека для custom-коннекторов.
  • Debezium + Kafka — CDC из БД.
  • Custom Python в Airflow — для уникальных случаев.

Реальный пример: ingestion из Stripe

Сценарий: e-commerce, нужно грузить транзакции из Stripe в Snowflake.

Вариант А: Fivetran (push не нужен)

# Setup в Fivetran UI
Source: Stripe
Destination: Snowflake
Schema: raw.stripe
Schedule: каждые 15 минут
Sync mode: incremental (через updated_at)

Что делает Fivetran:

  • каждые 15 минут зовёт Stripe API
  • подтягивает все объекты (charges, customers, invoices), изменённые с прошлого раза
  • кладёт в raw.stripe.charges, raw.stripe.customers в Snowflake
  • обрабатывает schema changes, retries, rate limits

Вариант B: Webhook + Kafka (real-time)

Stripe webhook -> API Gateway -> Kafka topic 'stripe-events'
Kafka -> Snowflake Snowpipe (auto-ingest)

Что получаем: секунды от транзакции до DWH. Сложнее в эксплуатации (Kafka надо держать живым), дороже в инфре.

Что выбрать? Для большинства задач (дашборды, ML с дневным циклом) — Fivetran с 15-минутным интервалом. Для антифрода или real-time мониторинга — Kafka + webhooks.


Распространённые грабли

WARNING

Главные ошибки джунов в ingestion:

  1. Не думают про дубликаты. Webhook от Stripe может прилететь дважды. Без idempotency-логики у тебя в DWH две записи об одной транзакции.
  2. Игнорируют schema changes. Источник добавил колонку в Salesforce — твой пайплайн упал. Нужен план: автоматически добавлять или алертить.
  3. Грузят full load на больших таблицах. 100M строк full каждый час — это сжигает бюджет Snowflake.
  4. Не следят за rate limits. Salesforce API имеет лимит. Когда упрёшься — Fivetran встаёт.
  5. Не алертят на failures. Пайплайн упал, никто не знает — данные в дашборде неактуальны, бизнес теряет доверие.

Попробуй сам

  1. Открой документацию Stripe Webhooks или Fivetran connector list. Посчитай: сколько API источников у твоей компании (или воображаемой). Какие из них требуют real-time, какие — batch?
  2. Напиши на бумаге pseudo-код incremental ingestion: «У меня есть таблица orders с updated_at. Как я каждый час буду тянуть только новые/изменённые строки?» Подумай: где хранить high-watermark (последнее загруженное updated_at)?
Проверка знанийKnowledge check
Почему incremental ingestion с использованием поля 'updated_at' не позволяет корректно отслеживать удаления (DELETE) в источнике, и какие есть workaround'ы?
ОтветAnswer
Incremental ingestion по 'updated_at' ловит INSERT'ы и UPDATE'ы (любое изменение апдейтит timestamp), но если строка физически удалена из source-таблицы, её нет в результате запроса, и в DWH она остаётся «вечной». Несколько workaround'ов: 1) Soft-delete: вместо DELETE источник делает UPDATE с флагом is_deleted=true и обновляет updated_at — incremental ловит изменение. 2) Periodic full refresh: раз в день/неделю полная перезагрузка таблицы, ловит удаления. 3) CDC (Change Data Capture): читаем transaction log БД, который содержит DELETE-операции явно. 4) Reconciliation jobs: периодически сравниваем COUNT(*) в source и DWH, флаги расхождения для ручной обработки. Выбор зависит от размера таблицы, требований к latency и контроля над схемой источника.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 5. Какой подход к ingestion стоит выбрать для большинства задач в первую очередь, и почему?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5