Справочник ключевых терминов курса System Design (DE).
Процесс проектирования архитектуры распределённых систем с учётом требований к масштабируемости, надёжности и производительности. Включает выбор компонентов, определение интерфейсов между ними и анализ компромиссов. В контексте Data Engineering фокус на проектировании data pipeline-ов и storage-систем.
# Типичный System Design процесс
1. Сбор требований (functional + non-functional)
2. Оценка нагрузки (QPS, data volume, latency)
3. High-level design (компоненты + потоки данных)
4. Детализация (storage, processing, serving)
5. Анализ bottleneck-ов и tradeoff-овТеорема Брюера, утверждающая что распределённая система может одновременно обеспечить только два из трёх свойств: Consistency (согласованность), Availability (доступность), Partition tolerance (устойчивость к разделению сети). На практике partition tolerance обязателен, поэтому выбор стоит между CP и AP системами.
# CP-системы (Consistency + Partition tolerance)
# HBase, MongoDB (с majority write concern)
# → Откажут в обслуживании при network partition
# AP-системы (Availability + Partition tolerance)
# Cassandra, DynamoDB
# → Вернут устаревшие данные при partition
# Пример: Kafka — AP для producers, CP для consumers с read_committedМетодология систематической оценки компромиссов при проектировании систем. Каждое архитектурное решение усиливает одни качества за счёт других: latency vs throughput, consistency vs availability, cost vs performance. Результат — обоснованный выбор с документированными последствиями.
# Пример tradeoff: выбор формата хранения
#
# Parquet:
# + Columnar → быстрые аналитические запросы
# + Compression → меньше storage cost
# - Медленная запись (row → column конвертация)
#
# Avro:
# + Быстрая запись (row-based)
# + Schema evolution
# - Медленнее для аналитики
#
# Решение: Avro для ingestion, Parquet для servingНабор независимых компьютеров, работающих совместно как единая система. Характеризуются отсутствием общего состояния, частичными отказами и недетерминированной латентностью сети. Ключевые паттерны: репликация, шардирование, консенсус (Raft/Paxos), eventual consistency.
# Fallacies of Distributed Computing
# 1. Сеть надёжна → нет, нужен retry + timeout
# 2. Латентность нулевая → нет, нужен batching
# 3. Пропускная способность бесконечна → нет, нужен backpressure
# 4. Сеть безопасна → нет, нужен TLS + authN
# Пример: Kafka — распределённый лог
# Partitions → параллелизм
# Replication factor 3 → отказоустойчивость
# ISR (In-Sync Replicas) → consistency guaranteeТрёхуровневая система метрик надёжности. SLI (Service Level Indicator) — конкретная метрика (p99 latency, availability %). SLO (Service Level Objective) — целевое значение SLI (p99 < 200ms). SLA (Service Level Agreement) — юридическое соглашение с последствиями нарушения SLO. Для data pipeline-ов: freshness SLI, completeness SLI, accuracy SLI.
# SLI для data pipeline:
# freshness: время от события до доступности в DWH
# completeness: % записей без потерь
# accuracy: % корректных трансформаций
# SLO:
# freshness < 15 min (p99)
# completeness > 99.9%
# accuracy > 99.99%
# SLA:
# Нарушение freshness > 1 час → алерт oncall
# Нарушение completeness < 99% → incident P1Два основных паттерна интеграции данных. ETL (Extract-Transform-Load) трансформирует данные до загрузки в целевую систему — подходит для ограниченного storage. ELT (Extract-Load-Transform) загружает сырые данные и трансформирует на месте — доминирует в cloud-era благодаря дешёвому storage и мощному compute (BigQuery, Snowflake).
# ETL: трансформация до загрузки
raw_data = extract(source_db)
cleaned = transform(raw_data) # в памяти ETL-сервера
load(cleaned, target_warehouse)
# ELT: загрузка → трансформация в warehouse
raw_data = extract(source_db)
load(raw_data, staging_table) # as-is в warehouse
# dbt model трансформирует в warehouse
# SELECT cleaned_col FROM staging_table WHERE ...Архитектурный паттерн, объединяющий batch и speed layer для обработки данных. Batch layer обеспечивает полноту и точность (переобработка всей истории), speed layer — низкую латентность (инкрементальные обновления). Serving layer объединяет результаты. Недостаток: дублирование логики между двумя слоями.
# Lambda Architecture
#
# ┌─────────┐ ┌──────────────┐ ┌──────────────┐
# │ Source │───→│ Batch Layer │───→│ │
# │ Events │ │ (Spark) │ │ Serving │
# │ │───→│ Speed Layer │───→│ Layer │
# └─────────┘ │ (Flink) │ │ (Druid) │
# └──────────────┘ └──────────────┘
#
# Batch: ежечасный пересчёт агрегатов
# Speed: real-time дельта с последнего batchУпрощённая альтернатива Lambda Architecture, использующая только stream processing для всех задач. Все данные проходят через единый потоковый слой (Kafka + Flink/Spark Streaming). Переобработка выполняется replay из того же потока. Устраняет дублирование логики, но требует зрелой stream-инфраструктуры.
# Kappa Architecture — единый поток
#
# ┌─────────┐ ┌──────────────┐ ┌──────────────┐
# │ Source │───→│ Stream │───→│ Serving │
# │ Events │ │ Processor │ │ Layer │
# └─────────┘ │ (Flink) │ │ (ClickHouse)│
# └──────────────┘ └──────────────┘
#
# Переобработка: replay topic с offset 0
# Нет batch layer → единая кодовая базаТрёхслойная архитектура данных: Bronze (сырые данные as-is), Silver (очищенные и нормализованные), Gold (бизнес-агрегаты и витрины). Каждый слой повышает качество данных. Доминирует в Lakehouse-платформах (Databricks, Delta Lake). Обеспечивает lineage, incremental processing и data quality gates между слоями.
# Bronze → Silver → Gold
#
# Bronze: raw JSON events из Kafka
# spark.readStream.format('kafka')...writeStream.format('delta')
#
# Silver: parsed, deduplicated, typed
# spark.read.format('delta').load('/bronze/events')
# .dropDuplicates(['event_id'])
# .withColumn('ts', to_timestamp('raw_ts'))
# .write.format('delta').save('/silver/events')
#
# Gold: business aggregates
# spark.read.format('delta').load('/silver/events')
# .groupBy('date', 'product_id').agg(sum('revenue'))
# .write.format('delta').save('/gold/daily_revenue')Паттерн синхронизации данных из Data Warehouse обратно в операционные системы (CRM, маркетинг, поддержка). Замыкает цикл данных: operational DB → warehouse → обогащённые данные → operational tools. Инструменты: Census, Hightouch, custom Airflow DAG.
# Reverse ETL: DWH → Salesforce
#
# 1. Модель в dbt (Gold layer):
# SELECT user_id, ltv_score, churn_risk
# FROM gold.user_segments
#
# 2. Sync job (Census / Hightouch):
# Source: gold.user_segments
# Destination: Salesforce.Contact
# Mapping: ltv_score → Custom_LTV__c
# Schedule: every 4 hours
# Mode: upsert on user_idОбработка данных конечными порциями (batch) по расписанию. Данные накапливаются за период (час, день), затем обрабатываются целиком. Преимущества: простота, оптимальное использование ресурсов, возможность переобработки. Недостаток: высокая латентность (минуты-часы). Основные инструменты: Apache Spark, dbt.
# Spark batch job — ежедневная агрегация
df = spark.read.parquet(f'/data/events/dt={yesterday}')
aggregated = df \
.filter(col('event_type') == 'purchase') \
.groupBy('product_id') \
.agg(
count('*').alias('order_count'),
sum('amount').alias('total_revenue')
)
aggregated.write \
.mode('overwrite') \
.partitionBy('product_id') \
.parquet(f'/output/daily_revenue/dt={yesterday}')Промежуточный подход между batch и true streaming: данные обрабатываются маленькими порциями с интервалом от секунд до минут. Spark Structured Streaming использует micro-batch по умолчанию (trigger interval). Проще в реализации чем true streaming, но с более высокой латентностью (секунды vs миллисекунды).
# Spark Structured Streaming — micro-batch
df = spark.readStream \
.format('kafka') \
.option('subscribe', 'events') \
.load()
result = df \
.selectExpr("CAST(value AS STRING)") \
.select(from_json('value', schema).alias('data')) \
.select('data.*')
result.writeStream \
.trigger(processingTime='30 seconds') # micro-batch каждые 30 сек\
.format('delta') \
.start('/silver/events')Непрерывная обработка данных по мере их поступления (event-by-event или small window). Обеспечивает латентность от миллисекунд до секунд. Ключевые концепции: event time vs processing time, watermarks, windows, exactly-once semantics. Основные инструменты: Apache Flink, Apache Kafka Streams.
# Flink — stream processing с event time
env = StreamExecutionEnvironment.get_execution_environment()
events = env \
.add_source(FlinkKafkaConsumer('events', schema, props)) \
.assign_timestamps_and_watermarks(
WatermarkStrategy
.for_bounded_out_of_orderness(Duration.of_seconds(10))
.with_timestamp_assigner(EventTimestampAssigner())
)
events \
.key_by(lambda e: e.user_id) \
.window(TumblingEventTimeWindows.of(Time.minutes(5))) \
.aggregate(CountAggregateFunction()) \
.add_sink(sink)Два временных домена в stream processing. Event time — момент создания события в source-системе. Processing time — момент обработки события в stream processor. Разница между ними (event time skew) вызвана сетевыми задержками, backpressure и out-of-order delivery. Корректные агрегаты требуют event time + watermarks.
# Event time vs Processing time
#
# Event создан: 10:00:05 (event time)
# Доставлен в Kafka: 10:00:07
# Обработан Flink: 10:00:12 (processing time)
# Skew: 7 секунд
#
# Окно 10:00 - 10:05 по event time:
# → Событие 10:00:05 попадёт в это окно
#
# Окно 10:00 - 10:05 по processing time:
# → Событие НЕ попадёт (processing time = 10:00:12)
#
# Вывод: event time = корректные бизнес-метрикиМеханизм отслеживания прогресса event time в stream processing. Watermark(t) означает: 'все события с event time ≤ t уже получены'. Позволяет системе решить когда закрывать временные окна. Bounded out-of-orderness: watermark = max_event_time - allowed_lateness. Поздние события после watermark отбрасываются или обрабатываются отдельно.
# Watermark с допуском 10 секунд
#
# Поток событий:
# event(time=10:00:01) → watermark = 09:59:51
# event(time=10:00:08) → watermark = 09:59:58
# event(time=10:00:15) → watermark = 10:00:05
#
# Окно [10:00:00, 10:00:05) закрывается
# когда watermark >= 10:00:05
#
# Late event (time=10:00:03, arrived at 10:00:20)
# watermark уже 10:00:10 → событие отброшеноГарантия, что каждое событие обработано ровно один раз, даже при сбоях. Реализуется через: idempotent writes + at-least-once delivery, либо транзакционный протокол (Kafka transactions, Flink checkpoints + two-phase commit). Самая сильная и дорогая гарантия доставки. Альтернативы: at-most-once (потеря), at-least-once (дупликаты).
# Kafka exactly-once: transactional producer + consumer
producer = KafkaProducer(
transactional_id='etl-job-01',
enable_idempotence=True
)
producer.init_transactions()
try:
producer.begin_transaction()
producer.send('output-topic', value=result)
producer.send_offsets_to_transaction(
{tp: offset}, consumer_group_id
)
producer.commit_transaction()
except:
producer.abort_transaction()Централизованное хранилище структурированных данных, оптимизированное для аналитических запросов (OLAP). Данные загружаются из операционных систем, очищаются и организуются в dimensional model. Характеристики: schema-on-write, columnar storage, MPP execution. Примеры: Snowflake, BigQuery, Redshift.
-- Типичный аналитический запрос в DWH
SELECT
d.month_name,
p.category,
SUM(f.revenue) as total_revenue,
COUNT(DISTINCT f.customer_id) as unique_customers
FROM fact_sales f
JOIN dim_date d ON f.date_key = d.date_key
JOIN dim_product p ON f.product_key = p.product_key
WHERE d.year = 2024
GROUP BY d.month_name, p.category
ORDER BY total_revenue DESC;Хранилище данных в сыром формате без предварительной схемы (schema-on-read). Хранит структурированные, полуструктурированные и неструктурированные данные в object storage (S3, GCS, ADLS). Низкая стоимость хранения, но требует governance для предотвращения data swamp. Основа Medallion Architecture (Bronze layer).
# Data Lake на S3 — структура директорий
# s3://company-datalake/
# ├── raw/ # Bronze: as-is
# │ ├── salesforce/
# │ ├── clickstream/
# │ └── iot-sensors/
# ├── processed/ # Silver: cleaned
# │ ├── users/
# │ └── events/
# └── curated/ # Gold: business-ready
# ├── daily_revenue/
# └── user_segments/Архитектура, объединяющая гибкость Data Lake (дешёвый object storage, открытые форматы) с возможностями Data Warehouse (ACID, schema enforcement, SQL analytics). Реализуется через table formats: Delta Lake, Apache Iceberg, Apache Hudi. Обеспечивает time travel, schema evolution, уникальность и транзакции поверх Parquet-файлов.
# Delta Lake — Lakehouse операции
# ACID транзакции на Parquet
df.write.format('delta') \
.mode('overwrite') \
.save('/lakehouse/silver/orders')
# Time travel — чтение версии 3
spark.read.format('delta') \
.option('versionAsOf', 3) \
.load('/lakehouse/silver/orders')
# MERGE (upsert) — CDC применение
DeltaTable.forPath(spark, '/lakehouse/silver/orders') \
.merge(updates, 'target.id = source.id') \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()Формат хранения, организующий данные по столбцам вместо строк. Оптимален для аналитики: читает только нужные колонки (column pruning), эффективно сжимает однородные данные. Apache Parquet — стандарт де-факто. Альтернативы: ORC (Hive-экосистема), Apache Arrow (in-memory columnar). Недостаток: медленнее для point lookups и inserts.
# Row-oriented vs Columnar — чтение SELECT name, age
#
# Row: читаем ВСЕ колонки каждой строки
# [id=1, name='Alice', age=30, city='NY', ...]
# [id=2, name='Bob', age=25, city='LA', ...]
# → Читаем 100% данных, используем 20%
#
# Columnar: читаем только нужные столбцы
# name: ['Alice', 'Bob', ...]
# age: [30, 25, ...]
# → Читаем 20% данных, используем 100%
#
# Parquet файл: ~10x compression vs CSV
# 1 TB CSV ≈ 100 GB ParquetРазделение данных на физически изолированные сегменты по ключу (дата, регион, category). Позволяет query engine пропускать нерелевантные partition-ы (partition pruning), значительно ускоряя запросы. В Hive-style partitioning директория кодирует ключ: /dt=2024-01-15/. Выбор partition key критичен: слишком мелкое — small file problem, слишком крупное — нет pruning.
# Hive-style partitioning в Spark
df.write \
.partitionBy('year', 'month', 'day') \
.format('parquet') \
.save('/data/events')
# Структура на диске:
# /data/events/year=2024/month=01/day=15/part-00000.parquet
# /data/events/year=2024/month=01/day=16/part-00000.parquet
# Запрос с partition pruning:
# SELECT * FROM events WHERE year=2024 AND month=01
# → Читает только 31 директорию вместо 365Методология проектирования хранилищ данных Ральфа Кимбалла. Организует данные вокруг бизнес-процессов: fact-таблицы содержат измеримые метрики, dimension-таблицы — описательные атрибуты. Оптимизирована для понятности бизнес-пользователям и производительности аналитических запросов. Реализуется как star schema или snowflake schema.
-- Dimensional model: продажи
-- Fact table: зернистость = одна транзакция
CREATE TABLE fact_sales (
sale_id BIGINT,
date_key INT, -- FK → dim_date
product_key INT, -- FK → dim_product
customer_key INT, -- FK → dim_customer
store_key INT, -- FK → dim_store
quantity INT, -- measure
amount DECIMAL, -- measure
discount DECIMAL -- measure
);Структура dimensional model, где центральная fact-таблица окружена денормализованными dimension-таблицами. Каждая dimension — один JOIN от fact-таблицы (один уровень). Простая для понимания, минимум JOIN-ов, оптимальная для BI-инструментов. Недостаток: денормализация dimensions увеличивает redundancy.
-- Star Schema: fact в центре, dimensions вокруг
--
-- dim_date
-- |
-- dim_product — fact_sales — dim_customer
-- |
-- dim_store
--
SELECT
d.quarter,
p.category,
SUM(f.amount) as revenue
FROM fact_sales f
JOIN dim_date d ON f.date_key = d.date_key -- 1 hop
JOIN dim_product p ON f.product_key = p.product_key -- 1 hop
GROUP BY d.quarter, p.category;Нормализованный вариант star schema, где dimension-таблицы имеют собственные подтаблицы (multi-level hierarchy). Экономит storage за счёт устранения redundancy, но усложняет запросы дополнительными JOIN-ами. Полезна для больших dimension-ов с глубокой иерархией (география: страна → регион → город).
-- Snowflake: dimension разбита на под-таблицы
--
-- dim_product → dim_subcategory → dim_category
--
-- Star: dim_product.category = 'Electronics'
-- Snowflake: JOIN dim_subcategory → JOIN dim_category
SELECT c.category_name, SUM(f.amount)
FROM fact_sales f
JOIN dim_product p ON f.product_key = p.product_key
JOIN dim_subcategory sc ON p.subcategory_key = sc.subcategory_key
JOIN dim_category c ON sc.category_key = c.category_key
GROUP BY c.category_name;Техника отслеживания изменений в dimension-таблицах. SCD Type 1: перезапись (теряем историю). SCD Type 2: новая строка с valid_from/valid_to (полная история, но взрыв строк). SCD Type 3: дополнительная колонка previous_value (одно изменение). Type 2 — наиболее распространённый для DWH.
-- SCD Type 2: история изменений адреса клиента
CREATE TABLE dim_customer (
customer_key INT, -- surrogate key
customer_id VARCHAR(20), -- natural key
name VARCHAR(100),
city VARCHAR(50),
valid_from DATE,
valid_to DATE, -- '9999-12-31' для текущей
is_current BOOLEAN
);
-- Клиент переехал из Москвы в Питер:
-- Row 1: city='Москва', valid_from='2023-01-01', valid_to='2024-06-15', is_current=false
-- Row 2: city='Питер', valid_from='2024-06-15', valid_to='9999-12-31', is_current=trueМетодология моделирования Дэна Линстедта для enterprise DWH. Три типа таблиц: Hub (бизнес-ключи), Link (связи между Hub-ами), Satellite (атрибуты с историей). Оптимизирована для agile-загрузки из множества источников и полного аудита. Сложнее star schema, но лучше масштабируется для enterprise.
-- Data Vault: Hub + Satellite + Link
-- Hub: бизнес-ключ
CREATE TABLE hub_customer (
hub_customer_hk BINARY(32), -- hash key
customer_bk VARCHAR(20), -- business key
load_dts TIMESTAMP,
record_source VARCHAR(50)
);
-- Satellite: атрибуты с историей
CREATE TABLE sat_customer_details (
hub_customer_hk BINARY(32),
load_dts TIMESTAMP,
name VARCHAR(100),
city VARCHAR(50),
hash_diff BINARY(32) -- detect changes
);
-- Link: связь customer ↔ order
CREATE TABLE link_customer_order (
link_hk BINARY(32),
hub_customer_hk BINARY(32),
hub_order_hk BINARY(32),
load_dts TIMESTAMP
);Техника захвата изменений из source-базы данных (INSERT, UPDATE, DELETE) и передачи их downstream. Три подхода: query-based (polling), trigger-based, log-based. Log-based CDC (Debezium) читает WAL/binlog без нагрузки на source. Обеспечивает near-real-time синхронизацию и полную историю изменений.
# Debezium CDC — Postgres → Kafka
# connector config (JSON):
{
"name": "pg-source",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.dbname": "orders_db",
"table.include.list": "public.orders",
"plugin.name": "pgoutput",
"slot.name": "debezium_slot",
"topic.prefix": "cdc"
}
}
# Результат: топик cdc.public.orders с before/after payloadНаиболее надёжный метод CDC, читающий transaction log базы данных (WAL в PostgreSQL, binlog в MySQL, redo log в Oracle). Преимущества: zero-impact на source, захват DELETE-ов, сохранение порядка транзакций, low latency. Debezium — стандартный open-source инструмент для log-based CDC в Kafka-экосистеме.
# PostgreSQL WAL → Debezium → Kafka
#
# 1. PostgreSQL пишет в WAL (Write-Ahead Log)
# 2. Debezium читает WAL через logical replication slot
# 3. Каждое изменение → Kafka message:
# {
# "op": "u", // c=create, u=update, d=delete
# "before": {"id": 1, "status": "pending"},
# "after": {"id": 1, "status": "shipped"},
# "source": {"lsn": 12345, "txId": 678}
# }
# 4. Consumer применяет изменения к targetСтепень пригодности данных для использования по назначению. Шесть измерений: completeness (полнота), accuracy (точность), consistency (согласованность), timeliness (актуальность), uniqueness (уникальность), validity (валидность). Инструменты: Great Expectations, dbt tests, Soda, Monte Carlo. Data quality gates между слоями Medallion Architecture предотвращают распространение ошибок.
# Great Expectations — data quality checks
import great_expectations as gx
context = gx.get_context()
batch = context.get_batch('silver_orders')
# Completeness
batch.expect_column_values_to_not_be_null('order_id')
# Uniqueness
batch.expect_column_values_to_be_unique('order_id')
# Validity
batch.expect_column_values_to_be_between(
'amount', min_value=0, max_value=1_000_000
)
# Freshness
batch.expect_column_max_to_be_between(
'updated_at',
min_value=datetime.now() - timedelta(hours=1)
)Мониторинг здоровья data pipeline-ов по пяти столпам: freshness (актуальность), volume (объём), schema (структура), distribution (распределение значений), lineage (происхождение). В отличие от data quality (проверка данных), data observability — проактивный мониторинг pipeline-ов с anomaly detection и alerting.
# 5 столпов Data Observability:
#
# 1. Freshness: таблица обновлялась 3ч назад → ALERT
# monitor: MAX(updated_at) > NOW() - INTERVAL 1 HOUR
#
# 2. Volume: сегодня 50% меньше строк → ALERT
# monitor: COUNT(*) within 2σ of 7-day average
#
# 3. Schema: колонка удалена → ALERT
# monitor: schema diff vs previous version
#
# 4. Distribution: NULL% скакнул с 1% до 40% → ALERT
# monitor: column_null_pct < threshold
#
# 5. Lineage: upstream table broken → trace impact
# monitor: dependency graph traversalФормальное соглашение между producer-ом и consumer-ом данных, описывающее schema, SLA, quality expectations и ownership. Shift-left подход: quality проблемы обнаруживаются на этапе производства данных, а не потребления. Включает: schema (fields, types), SLA (freshness, completeness), semantics (бизнес-определения), contacts (owner, oncall).
# data_contract.yaml
version: 1.0
owner: team-payments
schema:
type: object
properties:
order_id:
type: string
format: uuid
description: Unique order identifier
amount:
type: number
minimum: 0
status:
type: string
enum: [pending, paid, refunded]
sla:
freshness: 15 minutes
completeness: 99.9%
quality:
- column: order_id
check: unique
- column: amount
check: not_nullВ контексте оркестрации — граф зависимостей между задачами pipeline-а. Каждый узел — задача (extract, transform, load), рёбра — зависимости (task B запускается после task A). Acyclic гарантирует отсутствие циклов → детерминированный порядок выполнения. Apache Airflow — наиболее распространённый DAG-оркестратор.
# Airflow DAG — ежедневный ETL
from airflow import DAG
from airflow.operators.python import PythonOperator
with DAG('daily_etl', schedule='@daily') as dag:
extract = PythonOperator(
task_id='extract_from_postgres',
python_callable=extract_data
)
transform = PythonOperator(
task_id='transform_with_spark',
python_callable=run_spark_job
)
load = PythonOperator(
task_id='load_to_warehouse',
python_callable=load_data
)
quality = PythonOperator(
task_id='run_quality_checks',
python_callable=check_quality
)
extract >> transform >> load >> qualityКоординация выполнения data pipeline-ов: scheduling, dependency management, retry, alerting. Отвечает на вопрос 'когда и в каком порядке выполнять задачи'. Инструменты: Apache Airflow, Dagster, Prefect, Mage. Ключевые концепции: DAG, backfill, sensor, trigger, SLA monitoring, task dependency.
# Airflow vs Dagster — подходы к orchestration
#
# Airflow: task-centric
# DAG = набор задач + зависимости
# Scheduler проверяет расписание
# Task Instance выполняется Worker-ом
#
# Dagster: asset-centric
# @asset
# def daily_revenue(orders, products):
# return orders.join(products).groupBy('date').sum('amount')
#
# # Dagster автоматически строит DAG из зависимостей
# # между assets (не задачами)Отслеживание происхождения и трансформаций данных от source до конечного consumer-а. Отвечает на вопросы: откуда пришли данные? какие трансформации применены? кто зависит от этой таблицы? Критично для impact analysis (что сломается при изменении source), debugging (где данные испортились) и compliance (GDPR — где хранятся PII).
# Data Lineage граф
#
# postgres.orders ─┐
# ├─→ bronze.orders ─→ silver.orders ─┐
# kafka.payments ──┘ │
# ├─→ gold.daily_revenue
# postgres.products ─→ bronze.products ─→ silver.products┘
#
# Impact analysis: изменение postgres.orders
# → затронуты: bronze.orders, silver.orders, gold.daily_revenue
# → 3 downstream dashboards, 2 ML modelsЦентрализованный реестр метаданных организации: таблицы, колонки, описания, владельцы, lineage, quality metrics, usage statistics. Решает проблему data discovery — 'какие данные у нас есть и где'. Инструменты: DataHub, Apache Atlas, Amundsen, Atlan. Интегрируется с DWH, orchestrator и quality tools.
# DataHub — каталогизация таблицы
# Автоматически собирает метаданные:
#
# Table: gold.daily_revenue
# Owner: team-analytics
# Description: Ежедневная выручка по продуктам
# Schema:
# - date: DATE (partition key)
# - product_id: STRING
# - revenue: DECIMAL
# - order_count: INT
# Lineage: silver.orders + silver.products → gold.daily_revenue
# Quality: 99.8% freshness SLO met (30-day)
# Usage: 47 queries/day, 12 unique users
# Tags: #revenue #finance #dailyСистематическое снижение затрат на data infrastructure без деградации производительности и SLA. Основные рычаги: right-sizing compute (spot/preemptible instances), storage tiering (hot → warm → cold), query optimization (partition pruning, predicate pushdown), data lifecycle management (TTL, archiving). Cloud spend на данные растёт быстрее бизнеса без активного управления.
# Cost optimization checklist
#
# Storage:
# ✓ Parquet вместо CSV (10x compression)
# ✓ Storage tiering: S3 Standard → Glacier после 90 дней
# ✓ Partition pruning: queries read 5% of data
#
# Compute:
# ✓ Spot instances для batch (70% savings)
# ✓ Auto-scaling: 0 instances ночью
# ✓ Right-sizing: m5.2xlarge → m5.xlarge
#
# Query:
# ✓ Materialized views для hot queries
# ✓ Predicate pushdown (filter at source)
# ✓ CLUSTER BY для DWH tablesЦентрализованное хранилище и сервис для ML-фичей, обеспечивающее переиспользование, консистентность между training и serving, point-in-time correctness. Решает проблему training-serving skew: одна и та же фича вычисляется одинаково в batch (training) и online (serving). Инструменты: Feast, Tecton, Databricks Feature Store.
# Feast Feature Store
from feast import FeatureStore
store = FeatureStore(repo_path='feature_repo/')
# Offline: получение фичей для training
training_df = store.get_historical_features(
entity_df=entities, # user_id + event_timestamp
features=[
'user_features:total_purchases_30d',
'user_features:avg_order_value',
'user_features:days_since_last_order'
]
).to_df()
# Online: получение фичей для serving (p99 < 10ms)
online_features = store.get_online_features(
features=['user_features:total_purchases_30d'],
entity_rows=[{'user_id': 'u123'}]
).to_dict()Процесс создания информативных признаков (features) из сырых данных для ML-моделей. Включает: агрегации (sum/avg/count за период), temporal features (day_of_week, is_holiday), encoding (one-hot, target), текстовые (TF-IDF, embeddings). Качество фичей определяет потолок модели — garbage in, garbage out.
# Feature engineering для prediction модели
import pyspark.sql.functions as F
# Temporal features
df = df.withColumn('hour', F.hour('event_ts'))
df = df.withColumn('day_of_week', F.dayofweek('event_ts'))
df = df.withColumn('is_weekend', F.col('day_of_week').isin(1, 7))
# Aggregation features (window)
window_30d = Window.partitionBy('user_id') \
.orderBy('event_ts') \
.rangeBetween(-30*86400, 0)
df = df.withColumn(
'purchases_30d',
F.count('order_id').over(window_30d)
)
df = df.withColumn(
'avg_amount_30d',
F.avg('amount').over(window_30d)
)Аналитические запросы с латентностью от миллисекунд до секунд по данным, поступающим в реальном времени. Отличается от batch analytics (минуты-часы) скоростью, от stream processing — фокусом на ad-hoc SQL-запросы. Требует специализированных OLAP-движков: ClickHouse, Apache Druid, Apache Pinot, StarRocks.
# ClickHouse — real-time analytics
# Ingestion: 500K events/sec
# Query latency: p99 < 100ms
CREATE TABLE events (
event_id UUID,
user_id UInt64,
event_type LowCardinality(String),
amount Decimal(10,2),
event_time DateTime
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_time)
ORDER BY (event_type, user_id, event_time);
-- Dashboard query: revenue last 5 minutes
SELECT
toStartOfMinute(event_time) as minute,
sum(amount) as revenue
FROM events
WHERE event_time > now() - INTERVAL 5 MINUTE
GROUP BY minute
ORDER BY minute;Предвычисленный результат запроса, хранящийся физически и обновляемый инкрементально. Ускоряет повторяющиеся аналитические запросы за счёт storage. Ключевой tradeoff: write amplification vs read latency. Поддерживается в DWH (Snowflake, Redshift), OLAP-движках (ClickHouse, Druid), и Lakehouse (Databricks).
-- ClickHouse: Materialized View для real-time агрегатов
CREATE MATERIALIZED VIEW hourly_revenue
ENGINE = SummingMergeTree()
ORDER BY (hour, product_id)
AS SELECT
toStartOfHour(event_time) as hour,
product_id,
sum(amount) as total_revenue,
count() as order_count
FROM events
WHERE event_type = 'purchase'
GROUP BY hour, product_id;
-- Query: мгновенный ответ вместо scan 1B rows
SELECT hour, sum(total_revenue)
FROM hourly_revenue
WHERE hour >= today()
GROUP BY hour;Многомерная структура данных, предварительно агрегирующая метрики по комбинациям измерений (dimensions). Операции: slice (фиксация одного измерения), dice (фиксация нескольких), roll-up (агрегация вверх по иерархии), drill-down (детализация). Современная реализация: columnar OLAP-движки (ClickHouse, Druid) вместо классических MOLAP кубов.
# OLAP операции на примере revenue cube
# Dimensions: date, product, region
# Measure: revenue
#
# Slice: revenue WHERE region = 'Europe'
# Dice: revenue WHERE region IN ('EU','US') AND year = 2024
# Roll-up: daily → monthly → yearly
# Drill-down: yearly → quarterly → monthly
#
# Apache Druid — modern OLAP
# Pre-aggregation at ingestion time
# Sub-second queries on billions of rows
# Native support for time-series roll-up