Ingestion: как затащить данные в DWH
Ingestion — это стадия, где данные переезжают из источника в твоё хранилище. Это первая инженерная работа DE: всё остальное (storage, transform) не имеет смысла, если ingestion не работает.
В этом уроке — основные оси решений: batch vs stream, pull vs push, full vs incremental — и обзор источников.
Главные оси решений
Каждое решение — это трейдоффы. Нет «лучшего» варианта, есть подходящий для задачи.
Batch vs Streaming
Batch — данные приходят пачками через интервалы: каждый час / день / неделя. «Сегодня в 4 утра запускается ingestion заказов за вчерашний день».
Streaming — данные приходят event-by-event в реальном времени. «Каждая транзакция Stripe в течение секунды попадает в DWH».
Правило: начни с batch. 90% задач решаются batch’ем. Streaming — это дорогая инфра и сложный код. Не делай streaming, пока бизнес-юзер не потребовал конкретно «надо в реальном времени». Углубление в стриминг — модуль 09-batch-vs-streaming и kafka-course / flink-course.
Pull vs Push
Pull — DE-система тянет данные. Раз в час Fivetran отправляет SQL-запрос в Salesforce: «дай мне все contacts, обновлённые после X». DE контролирует частоту и логику.
Push — источник отправляет данные сам. Stripe настраивает webhook: «при каждой транзакции — POST на твой URL». DE контролирует только endpoint.
В реальности часто гибрид: webhook (push) пишет в Kafka, оттуда DE-система достаёт (pull) и грузит в DWH.
Full vs Incremental
Full load — каждый раз вытаскиваем всю таблицу целиком. Просто, но дорого, если таблица большая.
Incremental load — только дельта с прошлого раза. Используем поле updated_at или CDC. Сложнее, но дёшево.
-- Full load: каждый раз
SELECT * FROM source.orders;
-- 100M строк каждый раз -> дорого
-- Incremental: только новое
SELECT * FROM source.orders
WHERE updated_at > '2026-05-17 10:00:00';
-- Только 10k строк -> дёшево
CDC — Change Data Capture
Особый случай incremental — CDC. Источник (Postgres, MySQL) пишет в свой transaction log (WAL, binlog) каждое изменение. DE-система читает этот лог и применяет изменения в DWH.
Плюсы CDC:
- Low latency — изменения видны через секунды.
- Видим удаления — DELETE-операции тоже в логе.
- Не нагружаем source-БД — читаем лог, не select по таблицам.
Минусы:
- Сложная инфра — обычно нужен Kafka и Debezium.
- Высокая цена — на маленьких объёмах overkill.
Углубление — наш debezium-course.
Источники: что бывает
Инструменты ingestion
Кратко (подробнее — в M01.04):
- Fivetran — managed SaaS, лучший для standard источников. Дорого.
- Airbyte — open-source аналог. Self-hosted и cloud.
- dlt — Python библиотека для custom-коннекторов.
- Debezium + Kafka — CDC из БД.
- Custom Python в Airflow — для уникальных случаев.
Реальный пример: ingestion из Stripe
Сценарий: e-commerce, нужно грузить транзакции из Stripe в Snowflake.
Вариант А: Fivetran (push не нужен)
# Setup в Fivetran UI
Source: Stripe
Destination: Snowflake
Schema: raw.stripe
Schedule: каждые 15 минут
Sync mode: incremental (через updated_at)
Что делает Fivetran:
- каждые 15 минут зовёт Stripe API
- подтягивает все объекты (charges, customers, invoices), изменённые с прошлого раза
- кладёт в
raw.stripe.charges,raw.stripe.customersв Snowflake - обрабатывает schema changes, retries, rate limits
Вариант B: Webhook + Kafka (real-time)
Stripe webhook -> API Gateway -> Kafka topic 'stripe-events'
Kafka -> Snowflake Snowpipe (auto-ingest)
Что получаем: секунды от транзакции до DWH. Сложнее в эксплуатации (Kafka надо держать живым), дороже в инфре.
Что выбрать? Для большинства задач (дашборды, ML с дневным циклом) — Fivetran с 15-минутным интервалом. Для антифрода или real-time мониторинга — Kafka + webhooks.
Распространённые грабли
Главные ошибки джунов в ingestion:
- Не думают про дубликаты. Webhook от Stripe может прилететь дважды. Без idempotency-логики у тебя в DWH две записи об одной транзакции.
- Игнорируют schema changes. Источник добавил колонку в Salesforce — твой пайплайн упал. Нужен план: автоматически добавлять или алертить.
- Грузят full load на больших таблицах. 100M строк full каждый час — это сжигает бюджет Snowflake.
- Не следят за rate limits. Salesforce API имеет лимит. Когда упрёшься — Fivetran встаёт.
- Не алертят на failures. Пайплайн упал, никто не знает — данные в дашборде неактуальны, бизнес теряет доверие.
Попробуй сам
- Открой документацию Stripe Webhooks или Fivetran connector list. Посчитай: сколько API источников у твоей компании (или воображаемой). Какие из них требуют real-time, какие — batch?
- Напиши на бумаге pseudo-код incremental ingestion: «У меня есть таблица
ordersсupdated_at. Как я каждый час буду тянуть только новые/изменённые строки?» Подумай: где хранить high-watermark (последнее загруженноеupdated_at)?