Источники данных в типичной компании
Когда DE приходит в новый проект, первый вопрос — «что у нас за источники?». Источников обычно много, они разные по характеру, и для каждого нужен свой подход.
В этом уроке — обзор шести главных типов источников с примерами.
Карта источников
1. Транзакционные БД
Главный источник для большинства компаний. Production-БД, где приложение пишет заказы, пользователей, продукты.
Типичные технологии
- Postgres — самая популярная open-source. Богатый функционал.
- MySQL — широко распространена в web.
- MongoDB — NoSQL, JSON-документы.
- Oracle / SQL Server — enterprise legacy.
- DynamoDB / Cosmos DB — managed NoSQL в облаках.
Как DE с ними работает
Есть три подхода:
Что выбрать:
- Малые таблицы — JDBC pull через Fivetran/Airbyte.
- Большие таблицы, нужна свежесть — read replica + Fivetran/CDC.
- Real-time нужен — CDC через Debezium + Kafka.
Углубление CDC — debezium-course.
Грабли
- Не делай SELECT в production. Используй replica или CDC.
- Не забудь updated_at. Без него incremental невозможен.
- Smart-on-delete. Если в источнике hard delete, без CDC ты не узнаешь.
2. REST / GraphQL API
API-источники: SaaS-сервисы, внутренние микросервисы, внешние партнёры.
Примеры
- Salesforce — CRM.
- Stripe — платежи.
- Hubspot — маркетинг.
- Google Ads / Facebook Ads — реклама.
- Mixpanel / Amplitude — продуктовая аналитика.
Как работают
REST API — пишем HTTP-запрос, получаем JSON:
curl -H "Authorization: Bearer $TOKEN" \
https://api.stripe.com/v1/charges?created[gte]=1715817600
# Возвращает JSON с charges
DE-задачи:
- Аутентификация — OAuth, API keys, JWT.
- Пагинация — большой результат разбит по страницам.
- Rate limits — API не любит, когда долбят. 100 req/sec — обычный лимит.
- Schema changes — API может добавить поле, и пайплайн должен это пережить.
Инструменты
- Fivetran / Airbyte — готовые коннекторы к 400+ API.
- dlt — пишешь Python-функцию, она становится коннектором.
- Кастомный Python в Airflow — для нестандартных API.
Пример dlt
import dlt
from dlt.sources.helpers import requests
@dlt.resource
def stripe_charges(api_key, start_date):
url = f"https://api.stripe.com/v1/charges"
headers = {"Authorization": f"Bearer {api_key}"}
params = {"created[gte]": start_date}
while True:
resp = requests.get(url, headers=headers, params=params)
data = resp.json()
yield data["data"]
if not data.get("has_more"):
break
params["starting_after"] = data["data"][-1]["id"]
pipeline = dlt.pipeline(destination="snowflake", dataset_name="stripe")
pipeline.run(stripe_charges(api_key="...", start_date="2026-01-01"))
3. Файлы
Файлы часто встречаются у партнёров и в legacy:
- ERP-системы экспортируют Excel/CSV.
- Старые партнёры присылают CSV по email или SFTP.
- Cloud-сервисы пишут JSON-логи в S3.
- Современные пайплайны — Parquet в S3.
Форматы
Подробнее форматы — модуль 04-storage-formats.
Как DE с ними работает
- CSV/Excel от партнёров — приходят в S3 / SFTP / email. Cron-скрипт парсит и грузит в DWH.
- JSON-логи — Fluentd/Logstash собирают в S3. Spark / Snowflake читают.
- Parquet в S3 — Lakehouse-формат. Iceberg/Delta поверх.
Грабли с CSV
- Кодировки — UTF-8 vs Windows-1251 vs Latin-1.
- Разделители — comma vs semicolon vs tab.
- Кавычки —
"Имя, Фамилия"vsИмя, Фамилия. - Переводы строк внутри полей — портят парсинг.
- Дробные числа —
1,234.50vs1234,50(US vs EU).
CSV — анти-паттерн для нового пайплайна, но реальность: 30% источников до сих пор CSV.
4. События (event streams)
Real-time потоки сообщений: каждое событие — отдельная запись.
Типичные технологии
- Apache Kafka — стандарт индустрии. Распределённый log.
- AWS Kinesis — managed Kafka на AWS.
- Google Pub/Sub — managed на GCP.
- RabbitMQ — традиционный message broker.
- Redis Streams — лёгкий вариант.
Когда нужны
- Антифрод — нужно реагировать за секунды.
- Рекомендации в реальном времени — клик -> обновлённый feed.
- Микросервисы между собой — event-driven архитектура.
- Лог-агрегация — миллионы событий в секунду.
Как DE с ними работает
Углубление — наш kafka-course.
5. Webhooks
Подвид push-источников. SaaS-сервис (Stripe, Shopify) вызывает твой HTTP endpoint при событии.
Примеры
- Stripe webhook на
payment_intent.succeeded— клиент оплатил. - Shopify webhook на
order.created— новый заказ в магазине. - GitHub webhook на push — кто-то запушил код.
- Twilio webhook на SMS-delivery.
Как DE с ними работает
Endpoint — это обычно НЕ напрямую в DWH:
SaaS -> webhook endpoint (Cloud Function) -> Kafka topic -> Sink в DWH
Зачем такая цепочка:
- Reliability — если DWH временно недоступен, Kafka буферизует.
- Idempotency — Stripe может отправить одно событие дважды. Дедупликация в pipeline.
- Speed — webhook должен ответить за 5 секунд, или Stripe будет ретраить.
6. 3rd-party SaaS
Отдельно выделим SaaS как категорию источников (хотя технически это API):
DE обычно не пишет кастомные коннекторы к этим — используют Fivetran/Airbyte. Цена — заметная, но дешевле, чем 6 месяцев работы инженера.
Типичный inventory источников в средней компании
Скрапнут scenarii e-commerce, 200 человек:
| Источник | Тип | Объём | Как ingest |
|---|---|---|---|
| Postgres production | OLTP | 50 GB | Fivetran (incremental) |
| Stripe | API | 100k transactions/day | Fivetran + webhooks |
| Salesforce | SaaS API | 500k contacts | Fivetran (daily) |
| Google Ads | SaaS API | 50k ad metrics/day | Fivetran (daily) |
| Mixpanel | SaaS API | 10M events/day | Fivetran (hourly) |
| Frontend events | Webhook -> Segment | 50M events/day | Segment -> Snowpipe |
| Партнёрские CSV | SFTP | 1 GB/day | Airflow + S3 |
| App logs | Files | 100 GB/day | Fluentd -> S3 |
Это типичная картина. 8-15 источников. Сложность не в количестве, а в поддержке: каждый источник может сломаться, schema может поменяться, rate limits могут стрельнуть.
Попробуй сам
- Подумай о любой компании, которую ты знаешь (Yandex, Wildberries, твой банк). Перечисли 5 источников данных, которые у них точно есть. Какие из них API, какие — внутренние БД, какие — события?
- Открой документацию любого API (Stripe API docs, GitHub API). Посмотри: какие методы есть для incremental ingestion — есть ли поле
updated_atили его аналог?