Требуемые знания:
- module-5/06-etl-elt-patterns
Real-time Feature Engineering для ML
У вас построен CDC pipeline: Debezium захватывает изменения в базе данных, Kafka доставляет события, PySpark обрабатывает потоки. Но что если вам нужно использовать эти события для machine learning моделей? Например, для fraud detection или персонализированных рекомендаций? Для этого нужны features — признаки, на которых обучается модель.
Real-time feature engineering — это процесс вычисления ML-признаков из streaming данных с минимальной задержкой. В этом уроке мы изучим, как использовать CDC события для построения feature pipelines с PySpark Structured Streaming.
Зачем real-time features из CDC?
Traditional ML: Batch Features (проблема staleness)
Features обновляются раз в сутки — устаревают на 12-24 часа
Features обновляются раз в сутки. К моменту prediction могут быть устаревшими на 12-24 часа. Fraud detection: мошенник успеет совершить 50 транзакций, пока features обновятся.
Проблема:
- Features обновляются раз в сутки
- К моменту prediction features могут быть устаревшими на 12-24 часа
- Fraud detection: мошенник успеет совершить 50 транзакций, пока features обновятся
- Recommendations: пользователь видит рекомендации на основе вчерашнего поведения
Real-time ML: CDC-Driven Features
Features обновляются при каждом CDC событии (latency в секундах)
Features обновляются при каждом CDC событии. Latency в секундах, не часах. Fraud detection: модель видит последние транзакции клиента в реальном времени.
Преимущества:
- Свежие features: обновляются при каждом CDC событии (latency в секундах, не часах)
- Fraud detection: модель видит последние транзакции клиента в реальном времени
- Recommendations: учитывают текущую активность пользователя
- Inventory management: актуальные остатки товаров для динамических цен
Production insight: Real-time features критичны для use cases, где свежесть данных напрямую влияет на бизнес-метрики. Но они дороже batch features — выбирайте wisely.
Типы признаков из CDC событий
CDC события содержат полную историю изменений — это золотая жила для feature engineering.
1. Point-in-Time Features (текущее состояние)
Что это: Значение поля в данный момент времени.
Примеры:
customer_status: текущий статус клиента (active, suspended, churned)last_order_total: сумма последнего заказаcurrent_inventory_level: текущий остаток товара
Источник: Поле after из CDC события (CREATE/UPDATE операции).
# Extract point-in-time feature
current_status_df = orders_df \
.filter(col("op") == "u") \
.select(
col("customer_id"),
col("after.status").alias("customer_status"),
col("event_time")
)
2. Aggregated Features (агрегации во времени)
Что это: Статистика за временное окно (rolling window).
Примеры:
order_count_30d: количество заказов за последние 30 днейtotal_spend_7d: общая сумма покупок за 7 днейavg_order_value_90d: средний чек за 90 дней
Источник: Window aggregations по полю event_time.
# Tumbling window aggregation (7-day window)
from pyspark.sql.functions import window, count, sum, avg
features_df = orders_df \
.withWatermark("event_time", "1 hour") \
.groupBy(
col("customer_id"),
window(col("event_time"), "7 days")
) \
.agg(
count("*").alias("order_count_7d"),
sum("total").alias("total_spend_7d"),
avg("total").alias("avg_order_value_7d")
)
3. Behavioral Features (паттерны поведения)
Что это: Метрики, описывающие как пользователь взаимодействует с системой.
Примеры:
days_since_last_order: давность последнего заказа (recency)order_frequency: заказов в месяц (frequency)order_velocity: изменение частоты заказов (acceleration)unique_products_30d: разнообразие покупок
Источник: Комбинация point-in-time и aggregated features.
from pyspark.sql.functions import datediff, current_timestamp
behavioral_df = features_df.withColumn(
"days_since_last_order",
datediff(current_timestamp(), col("last_order_time"))
)
4. Derived Features (комбинации)
Что это: Features, вычисленные из других features (feature crosses).
Примеры:
is_high_value:total_spend_30d > 1000is_frequent_buyer:order_count_30d > 10avg_days_between_orders:30 / order_count_30dspend_acceleration:total_spend_7d / total_spend_30d * 4
Источник: SQL-like transformations на aggregated features.
derived_df = features_df.withColumn(
"is_high_value",
(col("total_spend_30d") > 1000).cast("int")
).withColumn(
"avg_days_between_orders",
lit(30) / col("order_count_30d")
)
Важно: Выбор feature types зависит от ML задачи. Fraud detection любит behavioral features (velocity, patterns). Recommendations предпочитают aggregated features (popularity, recency).
Проверка знанийЧем CDC события уникально полезны для feature engineering по сравнению с обычными batch-запросами к базе данных? Какой тип features невозможен без CDC?
Паттерн: Customer Behavior Features
Рассмотрим классический пример: вычисление customer features из CDC событий таблицы orders.
Use Case: Real-time Customer Segmentation
Задача: Вычислять customer features в реальном времени для модели churn prediction.
Features:
order_count_30d: количество заказов за 30 днейtotal_spend_30d: общая сумма покупокavg_order_value_30d: средний чекdays_since_last_order: давность последнего заказаis_high_value: клиент с суммой покупок > $1000
Architecture
4-слойная архитектура: Source → CDC → Computation → Store
4-слойная архитектура разделяет concerns: source → CDC → computation → store. CDC Layer обеспечивает real-time capture. Computation Layer вычисляет features. Store Layer обеспечивает dual write: Redis для online inference, Parquet для offline training.
Implementation: Compute Customer Features
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
from_json, col, to_timestamp, window,
count, sum, avg, max, stddev, datediff, current_timestamp, lit
)
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DecimalType, LongType
# Create Spark session
spark = SparkSession.builder \
.appName("CustomerFeaturesEngineering") \
.config("spark.sql.streaming.checkpointLocation", "/tmp/checkpoints/features") \
.getOrCreate()
# Define CDC schema
cdc_schema = StructType([
StructField("payload", StructType([
StructField("before", StructType([
StructField("id", IntegerType()),
StructField("customer_id", IntegerType()),
StructField("total", DecimalType(10, 2)),
StructField("status", StringType()),
StructField("created_at", StringType())
])),
StructField("after", StructType([
StructField("id", IntegerType()),
StructField("customer_id", IntegerType()),
StructField("total", DecimalType(10, 2)),
StructField("status", StringType()),
StructField("created_at", StringType())
])),
StructField("op", StringType()),
StructField("ts_ms", LongType())
]))
])
# Read CDC stream from Kafka
cdc_stream = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("subscribe", "dbserver1.public.orders") \
.option("startingOffsets", "latest") \
.load()
# Parse CDC events
parsed_df = cdc_stream.select(
from_json(col("value").cast("string"), cdc_schema).alias("data")
).select("data.payload.*")
# Extract current state (only inserts and updates)
current_df = parsed_df \
.filter(col("op").isin("c", "u", "r")) \
.select(
col("after.id").alias("order_id"),
col("after.customer_id").alias("customer_id"),
col("after.total").alias("total"),
col("after.status").alias("status"),
to_timestamp(col("ts_ms") / 1000).alias("event_time")
)
# Compute customer features with 30-day rolling window
def compute_customer_features(orders_df):
"""Compute real-time features for customer behavior."""
# Time-based aggregations (30-day window)
features_df = orders_df \
.withWatermark("event_time", "1 hour") \
.groupBy(
col("customer_id"),
window(col("event_time"), "30 days")
) \
.agg(
count("*").alias("order_count_30d"),
sum("total").alias("total_spend_30d"),
avg("total").alias("avg_order_value_30d"),
stddev("total").alias("order_value_stddev_30d"),
max("event_time").alias("last_order_time")
)
# Recency features
features_df = features_df.withColumn(
"days_since_last_order",
datediff(current_timestamp(), col("last_order_time"))
)
# Derived features
features_df = features_df.withColumn(
"is_high_value",
(col("total_spend_30d") > 1000).cast("int")
).withColumn(
"is_frequent_buyer",
(col("order_count_30d") > 10).cast("int")
).withColumn(
"avg_days_between_orders",
lit(30) / col("order_count_30d")
)
return features_df
# Compute features
customer_features = compute_customer_features(current_df)
# Write to console (for testing)
query = customer_features \
.writeStream \
.outputMode("update") \
.format("console") \
.option("truncate", "false") \
.start()
query.awaitTermination()
Важные детали:
-
Watermark:
.withWatermark("event_time", "1 hour")позволяет обрабатывать late-arriving events (события, пришедшие с опозданием до 1 часа). -
Window aggregation:
window(col("event_time"), "30 days")создает tumbling window на 30 дней. Каждое событие попадает ровно в одно окно. -
Derived features: Вычисляются после агрегации — используют результаты
agg(). -
outputMode(“update”): Для aggregations с watermark используйте
updatemode — выводит только обновленные строки.
Паттерн: Product Metrics Features
Аналогичный паттерн для вычисления product features (полезно для inventory management, dynamic pricing).
Use Case: Product Popularity Metrics
Features:
units_sold_7d: количество проданных единиц за 7 днейrevenue_7d: выручка за 7 днейunique_customers_7d: уникальные покупатели
def compute_product_features(orders_df):
"""Compute real-time features for product metrics."""
# Assuming orders_df has product_id and quantity fields
features_df = orders_df \
.withWatermark("event_time", "1 hour") \
.groupBy(
col("product_id"),
window(col("event_time"), "7 days")
) \
.agg(
sum("quantity").alias("units_sold_7d"),
sum(col("total")).alias("revenue_7d"),
countDistinct("customer_id").alias("unique_customers_7d")
)
# Velocity metrics
features_df = features_df.withColumn(
"sales_per_day",
col("units_sold_7d") / lit(7)
)
return features_df
Запись в Feature Store
Вычисленные features нужно сохранить для использования в ML моделях. Для этого используется feature store.
Зачем Feature Store?
Проблемы без feature store:
- ML модель и feature pipeline используют разные версии features (training-serving skew)
- Нет версионирования features (не можем воспроизвести predictions)
- Нет lineage (непонятно, как feature был вычислен)
Feature Store решает:
- Consistency: одинаковые features для training и inference
- Versioning: можем откатиться к старой версии features
- Lineage: знаем, из каких данных feature вычислен
- Reusability: features переиспользуются между моделями
Feature Store Options
| Option | Type | Use When | Pros | Cons |
|---|---|---|---|---|
| Redis | Key-Value Store | Simple online features | Fast (μs latency), простой setup | No versioning, no lineage |
| DynamoDB | NoSQL | AWS environment | Serverless, scalable | AWS vendor lock-in |
| Feast | Open-Source Feature Store | Need offline + online | Versioning, lineage, open standard | Requires setup, no PySpark connector |
| Tecton | Enterprise Feature Store | Production ML at scale | Full feature platform, monitoring | Expensive, vendor lock-in |
Recommendation для курса: Используйте Redis для простоты. Упомяните Feast как “production option” для enterprise use cases.
Pattern: Write to Feature Store with foreachBatch
PySpark Structured Streaming не имеет прямого connector для feature stores. Используйте foreachBatch() для записи в external storage.
def write_to_feature_store(batch_df, batch_id):
"""
Write features to Redis (online store) or Parquet (offline store).
Args:
batch_df: DataFrame with computed features
batch_id: Spark batch identifier
"""
import redis
# Connect to Redis
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
# Write each row to Redis
for row in batch_df.collect():
feature_key = f"customer:{row.customer_id}:features"
# Store features as hash
feature_data = {
"order_count_30d": row.order_count_30d,
"total_spend_30d": float(row.total_spend_30d),
"avg_order_value_30d": float(row.avg_order_value_30d),
"days_since_last_order": row.days_since_last_order,
"is_high_value": row.is_high_value
}
redis_client.hset(feature_key, mapping=feature_data)
# Set TTL (expire after 60 days)
redis_client.expire(feature_key, 60 * 24 * 60 * 60)
# Also write to Parquet for offline training
batch_df.write \
.mode("append") \
.partitionBy("window") \
.parquet("/data/features/customer_features")
# Apply foreachBatch sink
query = customer_features \
.writeStream \
.foreachBatch(write_to_feature_store) \
.outputMode("update") \
.start()
Важные детали:
-
Redis hash structure:
customer:123:features→{order_count_30d: 5, total_spend_30d: 1234.56, ...}. Удобно для lookup по customer_id. -
TTL (expire): Автоматическое удаление старых features через 60 дней (избегаем unbounded storage growth).
-
Dual write: Пишем в Redis (online store) для real-time inference и в Parquet (offline store) для batch training.
-
foreachBatch vs foreach:
foreachBatchполучает весь micro-batch как DataFrame — можно использовать DataFrame API.foreachработает по одной строке — медленнее.
Production warning:
.collect()загружает весь batch в память driver node. Для больших batches используйтеbatch_df.foreachPartition()для distributed writes.
Feature Store интеграция: Feast
Feast — open-source feature store, поддерживаемый Tecton и community.
Feast Architecture
Один feature computation → два outputs (online + offline)
Один feature computation → два outputs. Online Store (Redis) для real-time inference с low latency. Offline Store (Parquet) для batch training с historical features. Dual-write обеспечивает consistency между training и inference.
Feast Integration Pattern
Feast нет официального PySpark connector. Используйте workaround:
- Streaming → Online Store: PySpark пишет в Redis через
foreachBatch(как показано выше) - Batch → Offline Store: PySpark пишет в Parquet, Feast читает через offline store config
- Feast Registry: Регистрируете feature views в Feast для metadata tracking
# feast_features.py (Feast feature definitions)
from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float64, Int64
from datetime import timedelta
# Define customer entity
customer = Entity(
name="customer",
join_keys=["customer_id"]
)
# Define feature source (offline store)
customer_features_source = FileSource(
path="/data/features/customer_features",
timestamp_field="event_time"
)
# Define feature view
customer_features_fv = FeatureView(
name="customer_features",
entities=[customer],
ttl=timedelta(days=60),
schema=[
Field(name="order_count_30d", dtype=Int64),
Field(name="total_spend_30d", dtype=Float64),
Field(name="avg_order_value_30d", dtype=Float64),
Field(name="is_high_value", dtype=Int64)
],
source=customer_features_source
)
Workflow:
- PySpark streaming вычисляет features → пишет в Redis + Parquet
- Feast читает metadata из feature registry
- Training:
feast.get_historical_features()→ читает из Parquet - Inference:
feast.get_online_features()→ читает из Redis
Open Question из RESEARCH.md: Best pattern for PySpark → Feast integration unclear. No official connector. Recommendation: Use dual write (Redis + Parquet) + manual Feast registry.
Feature Freshness vs Latency трейдофф
Real-time features не всегда нужны. Иногда batch features достаточно (и дешевле).
Decision Framework
| Use Case | Freshness Required | Latency Target | Approach |
|---|---|---|---|
| Fraud detection | Real-time (seconds) | менее 1s | CDC → Streaming features |
| Product recommendations | Near real-time (minutes) | менее 5min | CDC → Micro-batch features |
| Customer segmentation | Daily refresh | 1 day | Batch features |
| Churn prediction | Weekly refresh | 7 days | Batch features |
Costs comparison:
- Real-time features: High compute (always running), complex infrastructure (Kafka + Spark), operational overhead (monitoring, alerting)
- Batch features: Low compute (runs once per day), simple infrastructure (cron job), easier to debug
Hybrid approach:
Critical features (fraud score) → Real-time (CDC streaming)
Stable features (lifetime value) → Batch (daily ETL)
Production principle: Use real-time features only when freshness directly impacts business metrics. If model performance same with daily features → use batch.
Проверка знанийКомпания хочет перевести ВСЕ ML features на real-time вычисление из CDC потока. Почему это плохая идея и какой принцип определяет, какие features вычислять в real-time?
Мониторинг feature pipeline
Feature pipelines требуют специфичного мониторинга (не только Kafka lag).
Key Metrics
-
Feature computation latency:
histogram_quantile(0.99, rate(feature_computation_duration_seconds_bucket[5m]))Alert if p99 latency > 10 seconds (features too slow for real-time inference).
-
Feature drift: Track distribution of feature values over time. Alert if mean/stddev shifts significantly.
# Log feature statistics per batch batch_df.select( mean("total_spend_30d").alias("mean_spend"), stddev("total_spend_30d").alias("stddev_spend") ).show() -
Missing features: Count of customers with
nullfeatures (data quality issue).SELECT COUNT(*) FROM features WHERE total_spend_30d IS NULL -
Feature store write failures: Track errors in
foreachBatchsink.try: redis_client.hset(feature_key, mapping=feature_data) except Exception as e: write_errors_counter.inc() logger.error(f"Failed to write features: {e}")
Лабораторная работа: Customer Features Pipeline
Цель: Построить end-to-end feature pipeline от CDC событий до feature store.
Задание
-
Setup: Убедитесь, что Debezium захватывает CDC из таблицы
orders -
Compute features:
- Реализуйте
compute_customer_features()для вычисления:order_count_30dtotal_spend_30davg_order_value_30d
- Используйте 30-day tumbling window
- Добавьте watermark 1 hour
- Реализуйте
-
Test with console sink:
query = customer_features \ .writeStream \ .outputMode("update") \ .format("console") \ .start() -
Generate CDC events: Вставьте несколько заказов в PostgreSQL:
INSERT INTO orders (customer_id, total, status, created_at) VALUES (1, 100.00, 'completed', NOW()), (1, 200.00, 'completed', NOW()), (2, 50.00, 'pending', NOW()); -
Verify features: Проверьте, что в console output появились:
customer_id=1:order_count_30d=2,total_spend_30d=300.00,avg_order_value_30d=150.00customer_id=2:order_count_30d=1,total_spend_30d=50.00,avg_order_value_30d=50.00
-
(Bonus) Implement foreachBatch sink:
- Запишите features в Redis (если Redis доступен)
- Или симулируйте запись логированием:
print(f"Writing features for customer {row.customer_id}")
Ожидаемый результат
-------------------------------------------
Batch: 1
-------------------------------------------
+-------------------------------------------+-------------------------------+----------------+----------------+-----------------------+
|window |customer_id |order_count_30d |total_spend_30d |avg_order_value_30d |
+-------------------------------------------+-------------------------------+----------------+----------------+-----------------------+
|{2026-01-02 00:00:00, 2026-02-01 00:00:00}|1 |2 |300.00 |150.00 |
|{2026-01-02 00:00:00, 2026-02-01 00:00:00}|2 |1 |50.00 |50.00 |
+-------------------------------------------+-------------------------------+----------------+----------------+-----------------------+
Что дальше?
Вы изучили, как строить real-time feature pipelines из CDC событий с PySpark. Вы знаете:
- Типы ML features (point-in-time, aggregated, behavioral, derived)
- Как вычислять customer behavior features с window aggregations
- Как писать features в feature store с
foreachBatch() - Когда использовать real-time features vs batch features
Следующие шаги:
- Feature versioning: Как версионировать features и избегать training-serving skew
- Feature monitoring: Практики drift detection и data quality checks
- Advanced patterns: Stream-stream joins для cross-entity features
- Production deployment: Как деплоить feature pipelines с monitoring и alerting
Это завершает Module 5 (Data Engineering with Python). Вы прошли путь от простого Python consumer до production-grade feature pipelines с CDC streams.
Ключевые выводы
- Real-time features критичны для ML use cases, где свежесть данных влияет на business metrics (fraud, recommendations)
- CDC события — отличный источник для features: полная история изменений, low latency
- Типы features: point-in-time (текущее состояние), aggregated (rolling windows), behavioral (паттерны), derived (комбинации)
- PySpark window aggregations с watermark — стандартный способ вычисления time-based features
- Feature store обеспечивает consistency между training и inference (Redis для online, Parquet для offline)
- foreachBatch pattern для записи PySpark features в external stores (Redis, DynamoDB)
- Feast — open-source feature store, но нет прямого PySpark connector (используйте dual write)
- Freshness vs Cost: Real-time features дороже batch features — используйте, когда свежесть критична
- Monitoring: Отслеживайте latency, drift, missing features, write failures
- Watermark critical: Без watermark late-arriving events будут потеряны
Проверьте понимание
Закончили урок?
Отметьте его как пройденный, чтобы отслеживать свой прогресс