Skip to content
Learning Platform
Advanced
50 minutes
Feature Engineering Machine Learning Real-Time Feature Store PySpark

Prerequisites:

  • module-5/06-etl-elt-patterns

Real-time Feature Engineering для ML

У вас построен CDC pipeline: Debezium захватывает изменения в базе данных, Kafka доставляет события, PySpark обрабатывает потоки. Но что если вам нужно использовать эти события для machine learning моделей? Например, для fraud detection или персонализированных рекомендаций? Для этого нужны features — признаки, на которых обучается модель.

Real-time feature engineering — это процесс вычисления ML-признаков из streaming данных с минимальной задержкой. В этом уроке мы изучим, как использовать CDC события для построения feature pipelines с PySpark Structured Streaming.

Зачем real-time features из CDC?

Traditional ML: Batch Features (проблема staleness)

Traditional Batch Features (Проблема Staleness)

Features обновляются раз в сутки — устаревают на 12-24 часа

PostgreSQL
Daily dump
ETL JobRuns at 2am
Extract
Data Warehouse
Feature query
Feature Store
Lookup
ML ModelFeatures (12h old)
Проблема:

Features обновляются раз в сутки. К моменту prediction могут быть устаревшими на 12-24 часа. Fraud detection: мошенник успеет совершить 50 транзакций, пока features обновятся.

Проблема:

  • Features обновляются раз в сутки
  • К моменту prediction features могут быть устаревшими на 12-24 часа
  • Fraud detection: мошенник успеет совершить 50 транзакций, пока features обновятся
  • Recommendations: пользователь видит рекомендации на основе вчерашнего поведения

Real-time ML: CDC-Driven Features

Real-time CDC Features

Features обновляются при каждом CDC событии (latency в секундах)

PostgreSQL
CDC events
Kafkams latency
Read stream
PySparkFeature Computation
Compute
Fresh featuresseconds old
Write
Feature StoreRedis
Real-time lookup
ML Model
Преимущество:

Features обновляются при каждом CDC событии. Latency в секундах, не часах. Fraud detection: модель видит последние транзакции клиента в реальном времени.

Преимущества:

  • Свежие features: обновляются при каждом CDC событии (latency в секундах, не часах)
  • Fraud detection: модель видит последние транзакции клиента в реальном времени
  • Recommendations: учитывают текущую активность пользователя
  • Inventory management: актуальные остатки товаров для динамических цен

Production insight: Real-time features критичны для use cases, где свежесть данных напрямую влияет на бизнес-метрики. Но они дороже batch features — выбирайте wisely.

Типы признаков из CDC событий

CDC события содержат полную историю изменений — это золотая жила для feature engineering.

1. Point-in-Time Features (текущее состояние)

Что это: Значение поля в данный момент времени.

Примеры:

  • customer_status: текущий статус клиента (active, suspended, churned)
  • last_order_total: сумма последнего заказа
  • current_inventory_level: текущий остаток товара

Источник: Поле after из CDC события (CREATE/UPDATE операции).

# Extract point-in-time feature
current_status_df = orders_df \
    .filter(col("op") == "u") \
    .select(
        col("customer_id"),
        col("after.status").alias("customer_status"),
        col("event_time")
    )

2. Aggregated Features (агрегации во времени)

Что это: Статистика за временное окно (rolling window).

Примеры:

  • order_count_30d: количество заказов за последние 30 дней
  • total_spend_7d: общая сумма покупок за 7 дней
  • avg_order_value_90d: средний чек за 90 дней

Источник: Window aggregations по полю event_time.

# Tumbling window aggregation (7-day window)
from pyspark.sql.functions import window, count, sum, avg

features_df = orders_df \
    .withWatermark("event_time", "1 hour") \
    .groupBy(
        col("customer_id"),
        window(col("event_time"), "7 days")
    ) \
    .agg(
        count("*").alias("order_count_7d"),
        sum("total").alias("total_spend_7d"),
        avg("total").alias("avg_order_value_7d")
    )

3. Behavioral Features (паттерны поведения)

Что это: Метрики, описывающие как пользователь взаимодействует с системой.

Примеры:

  • days_since_last_order: давность последнего заказа (recency)
  • order_frequency: заказов в месяц (frequency)
  • order_velocity: изменение частоты заказов (acceleration)
  • unique_products_30d: разнообразие покупок

Источник: Комбинация point-in-time и aggregated features.

from pyspark.sql.functions import datediff, current_timestamp

behavioral_df = features_df.withColumn(
    "days_since_last_order",
    datediff(current_timestamp(), col("last_order_time"))
)

4. Derived Features (комбинации)

Что это: Features, вычисленные из других features (feature crosses).

Примеры:

  • is_high_value: total_spend_30d > 1000
  • is_frequent_buyer: order_count_30d > 10
  • avg_days_between_orders: 30 / order_count_30d
  • spend_acceleration: total_spend_7d / total_spend_30d * 4

Источник: SQL-like transformations на aggregated features.

derived_df = features_df.withColumn(
    "is_high_value",
    (col("total_spend_30d") > 1000).cast("int")
).withColumn(
    "avg_days_between_orders",
    lit(30) / col("order_count_30d")
)

Важно: Выбор feature types зависит от ML задачи. Fraud detection любит behavioral features (velocity, patterns). Recommendations предпочитают aggregated features (popularity, recency).

Проверка знаний
Чем CDC события уникально полезны для feature engineering по сравнению с обычными batch-запросами к базе данных? Какой тип features невозможен без CDC?
Ответ
CDC события содержат полную историю изменений: before (состояние до) и after (состояние после) для каждой операции, с точными timestamps. Это позволяет вычислять behavioral features, которые невозможны из snapshot базы данных: order_velocity (изменение частоты заказов), price_change_frequency (как часто менялась цена), status_transition_time (время между статусами). Batch-запрос SELECT * FROM orders видит только текущее состояние, а CDC видит все промежуточные изменения. Например, если заказ менял статус pending -> processing -> shipped -> delivered, batch видит только delivered, а CDC видит все 4 перехода с timestamps.

Паттерн: Customer Behavior Features

Рассмотрим классический пример: вычисление customer features из CDC событий таблицы orders.

Use Case: Real-time Customer Segmentation

Задача: Вычислять customer features в реальном времени для модели churn prediction.

Features:

  • order_count_30d: количество заказов за 30 дней
  • total_spend_30d: общая сумма покупок
  • avg_order_value_30d: средний чек
  • days_since_last_order: давность последнего заказа
  • is_high_value: клиент с суммой покупок > $1000

Architecture

Customer Behavior Features Architecture

4-слойная архитектура: Source → CDC → Computation → Store

Source Database
orders tablePostgreSQL
CDC capture
CDC Layer
Debezium Connector
Publish
Kafka Topicdbserver1.public.orders
Read stream
Feature Computation
PySpark StreamingWindow aggregations
Watermark1 hour late data
foreachBatch
Feature Store (Dual Write)
RedisOnline Store
ParquetOffline Store
Multi-layer architecture:

4-слойная архитектура разделяет concerns: source → CDC → computation → store. CDC Layer обеспечивает real-time capture. Computation Layer вычисляет features. Store Layer обеспечивает dual write: Redis для online inference, Parquet для offline training.

Implementation: Compute Customer Features

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    from_json, col, to_timestamp, window,
    count, sum, avg, max, stddev, datediff, current_timestamp, lit
)
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DecimalType, LongType

# Create Spark session
spark = SparkSession.builder \
    .appName("CustomerFeaturesEngineering") \
    .config("spark.sql.streaming.checkpointLocation", "/tmp/checkpoints/features") \
    .getOrCreate()

# Define CDC schema
cdc_schema = StructType([
    StructField("payload", StructType([
        StructField("before", StructType([
            StructField("id", IntegerType()),
            StructField("customer_id", IntegerType()),
            StructField("total", DecimalType(10, 2)),
            StructField("status", StringType()),
            StructField("created_at", StringType())
        ])),
        StructField("after", StructType([
            StructField("id", IntegerType()),
            StructField("customer_id", IntegerType()),
            StructField("total", DecimalType(10, 2)),
            StructField("status", StringType()),
            StructField("created_at", StringType())
        ])),
        StructField("op", StringType()),
        StructField("ts_ms", LongType())
    ]))
])

# Read CDC stream from Kafka
cdc_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "dbserver1.public.orders") \
    .option("startingOffsets", "latest") \
    .load()

# Parse CDC events
parsed_df = cdc_stream.select(
    from_json(col("value").cast("string"), cdc_schema).alias("data")
).select("data.payload.*")

# Extract current state (only inserts and updates)
current_df = parsed_df \
    .filter(col("op").isin("c", "u", "r")) \
    .select(
        col("after.id").alias("order_id"),
        col("after.customer_id").alias("customer_id"),
        col("after.total").alias("total"),
        col("after.status").alias("status"),
        to_timestamp(col("ts_ms") / 1000).alias("event_time")
    )

# Compute customer features with 30-day rolling window
def compute_customer_features(orders_df):
    """Compute real-time features for customer behavior."""

    # Time-based aggregations (30-day window)
    features_df = orders_df \
        .withWatermark("event_time", "1 hour") \
        .groupBy(
            col("customer_id"),
            window(col("event_time"), "30 days")
        ) \
        .agg(
            count("*").alias("order_count_30d"),
            sum("total").alias("total_spend_30d"),
            avg("total").alias("avg_order_value_30d"),
            stddev("total").alias("order_value_stddev_30d"),
            max("event_time").alias("last_order_time")
        )

    # Recency features
    features_df = features_df.withColumn(
        "days_since_last_order",
        datediff(current_timestamp(), col("last_order_time"))
    )

    # Derived features
    features_df = features_df.withColumn(
        "is_high_value",
        (col("total_spend_30d") > 1000).cast("int")
    ).withColumn(
        "is_frequent_buyer",
        (col("order_count_30d") > 10).cast("int")
    ).withColumn(
        "avg_days_between_orders",
        lit(30) / col("order_count_30d")
    )

    return features_df

# Compute features
customer_features = compute_customer_features(current_df)

# Write to console (for testing)
query = customer_features \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", "false") \
    .start()

query.awaitTermination()

Важные детали:

  1. Watermark: .withWatermark("event_time", "1 hour") позволяет обрабатывать late-arriving events (события, пришедшие с опозданием до 1 часа).

  2. Window aggregation: window(col("event_time"), "30 days") создает tumbling window на 30 дней. Каждое событие попадает ровно в одно окно.

  3. Derived features: Вычисляются после агрегации — используют результаты agg().

  4. outputMode(“update”): Для aggregations с watermark используйте update mode — выводит только обновленные строки.

Паттерн: Product Metrics Features

Аналогичный паттерн для вычисления product features (полезно для inventory management, dynamic pricing).

Use Case: Product Popularity Metrics

Features:

  • units_sold_7d: количество проданных единиц за 7 дней
  • revenue_7d: выручка за 7 дней
  • unique_customers_7d: уникальные покупатели
def compute_product_features(orders_df):
    """Compute real-time features for product metrics."""

    # Assuming orders_df has product_id and quantity fields
    features_df = orders_df \
        .withWatermark("event_time", "1 hour") \
        .groupBy(
            col("product_id"),
            window(col("event_time"), "7 days")
        ) \
        .agg(
            sum("quantity").alias("units_sold_7d"),
            sum(col("total")).alias("revenue_7d"),
            countDistinct("customer_id").alias("unique_customers_7d")
        )

    # Velocity metrics
    features_df = features_df.withColumn(
        "sales_per_day",
        col("units_sold_7d") / lit(7)
    )

    return features_df

Запись в Feature Store

Вычисленные features нужно сохранить для использования в ML моделях. Для этого используется feature store.

Зачем Feature Store?

Проблемы без feature store:

  • ML модель и feature pipeline используют разные версии features (training-serving skew)
  • Нет версионирования features (не можем воспроизвести predictions)
  • Нет lineage (непонятно, как feature был вычислен)

Feature Store решает:

  • Consistency: одинаковые features для training и inference
  • Versioning: можем откатиться к старой версии features
  • Lineage: знаем, из каких данных feature вычислен
  • Reusability: features переиспользуются между моделями

Feature Store Options

OptionTypeUse WhenProsCons
RedisKey-Value StoreSimple online featuresFast (μs latency), простой setupNo versioning, no lineage
DynamoDBNoSQLAWS environmentServerless, scalableAWS vendor lock-in
FeastOpen-Source Feature StoreNeed offline + onlineVersioning, lineage, open standardRequires setup, no PySpark connector
TectonEnterprise Feature StoreProduction ML at scaleFull feature platform, monitoringExpensive, vendor lock-in

Recommendation для курса: Используйте Redis для простоты. Упомяните Feast как “production option” для enterprise use cases.

Pattern: Write to Feature Store with foreachBatch

PySpark Structured Streaming не имеет прямого connector для feature stores. Используйте foreachBatch() для записи в external storage.

def write_to_feature_store(batch_df, batch_id):
    """
    Write features to Redis (online store) or Parquet (offline store).

    Args:
        batch_df: DataFrame with computed features
        batch_id: Spark batch identifier
    """
    import redis

    # Connect to Redis
    redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)

    # Write each row to Redis
    for row in batch_df.collect():
        feature_key = f"customer:{row.customer_id}:features"

        # Store features as hash
        feature_data = {
            "order_count_30d": row.order_count_30d,
            "total_spend_30d": float(row.total_spend_30d),
            "avg_order_value_30d": float(row.avg_order_value_30d),
            "days_since_last_order": row.days_since_last_order,
            "is_high_value": row.is_high_value
        }

        redis_client.hset(feature_key, mapping=feature_data)

        # Set TTL (expire after 60 days)
        redis_client.expire(feature_key, 60 * 24 * 60 * 60)

    # Also write to Parquet for offline training
    batch_df.write \
        .mode("append") \
        .partitionBy("window") \
        .parquet("/data/features/customer_features")

# Apply foreachBatch sink
query = customer_features \
    .writeStream \
    .foreachBatch(write_to_feature_store) \
    .outputMode("update") \
    .start()

Важные детали:

  1. Redis hash structure: customer:123:features{order_count_30d: 5, total_spend_30d: 1234.56, ...}. Удобно для lookup по customer_id.

  2. TTL (expire): Автоматическое удаление старых features через 60 дней (избегаем unbounded storage growth).

  3. Dual write: Пишем в Redis (online store) для real-time inference и в Parquet (offline store) для batch training.

  4. foreachBatch vs foreach: foreachBatch получает весь micro-batch как DataFrame — можно использовать DataFrame API. foreach работает по одной строке — медленнее.

Production warning: .collect() загружает весь batch в память driver node. Для больших batches используйте batch_df.foreachPartition() для distributed writes.

Feature Store интеграция: Feast

Feast — open-source feature store, поддерживаемый Tecton и community.

Feast Architecture

Feature Store Dual-Write Pattern

Один feature computation → два outputs (online + offline)

Feature ComputationPySpark Streaming
foreachBatch
Online Store (Redis)
Rediscustomer:123:features
Low latency reads (ms)
Point lookups by key
Last N values per entity
Recent features only
Use case: Real-time inference
ML model serving
Batch export
Offline Store (Parquet)
Parquet FilesS3 / Data Lake
Batch reads for training
Historical point-in-time
Historical features
All history retained
Use case: Feature drift analysis
Model training
Dual-write pattern essential:

Один feature computation → два outputs. Online Store (Redis) для real-time inference с low latency. Offline Store (Parquet) для batch training с historical features. Dual-write обеспечивает consistency между training и inference.

Feast Integration Pattern

Feast нет официального PySpark connector. Используйте workaround:

  1. Streaming → Online Store: PySpark пишет в Redis через foreachBatch (как показано выше)
  2. Batch → Offline Store: PySpark пишет в Parquet, Feast читает через offline store config
  3. Feast Registry: Регистрируете feature views в Feast для metadata tracking
# feast_features.py (Feast feature definitions)
from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float64, Int64
from datetime import timedelta

# Define customer entity
customer = Entity(
    name="customer",
    join_keys=["customer_id"]
)

# Define feature source (offline store)
customer_features_source = FileSource(
    path="/data/features/customer_features",
    timestamp_field="event_time"
)

# Define feature view
customer_features_fv = FeatureView(
    name="customer_features",
    entities=[customer],
    ttl=timedelta(days=60),
    schema=[
        Field(name="order_count_30d", dtype=Int64),
        Field(name="total_spend_30d", dtype=Float64),
        Field(name="avg_order_value_30d", dtype=Float64),
        Field(name="is_high_value", dtype=Int64)
    ],
    source=customer_features_source
)

Workflow:

  1. PySpark streaming вычисляет features → пишет в Redis + Parquet
  2. Feast читает metadata из feature registry
  3. Training: feast.get_historical_features() → читает из Parquet
  4. Inference: feast.get_online_features() → читает из Redis

Open Question из RESEARCH.md: Best pattern for PySpark → Feast integration unclear. No official connector. Recommendation: Use dual write (Redis + Parquet) + manual Feast registry.

Feature Freshness vs Latency трейдофф

Real-time features не всегда нужны. Иногда batch features достаточно (и дешевле).

Decision Framework

Use CaseFreshness RequiredLatency TargetApproach
Fraud detectionReal-time (seconds)менее 1sCDC → Streaming features
Product recommendationsNear real-time (minutes)менее 5minCDC → Micro-batch features
Customer segmentationDaily refresh1 dayBatch features
Churn predictionWeekly refresh7 daysBatch features

Costs comparison:

  • Real-time features: High compute (always running), complex infrastructure (Kafka + Spark), operational overhead (monitoring, alerting)
  • Batch features: Low compute (runs once per day), simple infrastructure (cron job), easier to debug

Hybrid approach:

Critical features (fraud score) → Real-time (CDC streaming)
Stable features (lifetime value) → Batch (daily ETL)

Production principle: Use real-time features only when freshness directly impacts business metrics. If model performance same with daily features → use batch.

Проверка знаний
Компания хочет перевести ВСЕ ML features на real-time вычисление из CDC потока. Почему это плохая идея и какой принцип определяет, какие features вычислять в real-time?
Ответ
Real-time features требуют постоянно работающей инфраструктуры (Kafka + Spark/Flink), мониторинга и operational overhead -- это значительно дороже batch features (cron job раз в сутки). Принцип: используйте real-time только когда свежесть данных напрямую влияет на бизнес-метрики. Пример: fraud_score (задержка = пропущенное мошенничество) -- real-time. customer_lifetime_value (не меняется значительно за сутки) -- batch. Hybrid approach оптимален: критичные features в real-time, стабильные в batch. Если модель показывает одинаковое качество с daily features -- batch дешевле и проще.

Мониторинг feature pipeline

Feature pipelines требуют специфичного мониторинга (не только Kafka lag).

Key Metrics

  1. Feature computation latency:

    histogram_quantile(0.99, rate(feature_computation_duration_seconds_bucket[5m]))

    Alert if p99 latency > 10 seconds (features too slow for real-time inference).

  2. Feature drift: Track distribution of feature values over time. Alert if mean/stddev shifts significantly.

    # Log feature statistics per batch
    batch_df.select(
        mean("total_spend_30d").alias("mean_spend"),
        stddev("total_spend_30d").alias("stddev_spend")
    ).show()
  3. Missing features: Count of customers with null features (data quality issue).

    SELECT COUNT(*) FROM features WHERE total_spend_30d IS NULL
  4. Feature store write failures: Track errors in foreachBatch sink.

    try:
        redis_client.hset(feature_key, mapping=feature_data)
    except Exception as e:
        write_errors_counter.inc()
        logger.error(f"Failed to write features: {e}")

Лабораторная работа: Customer Features Pipeline

Цель: Построить end-to-end feature pipeline от CDC событий до feature store.

Задание

  1. Setup: Убедитесь, что Debezium захватывает CDC из таблицы orders

  2. Compute features:

    • Реализуйте compute_customer_features() для вычисления:
      • order_count_30d
      • total_spend_30d
      • avg_order_value_30d
    • Используйте 30-day tumbling window
    • Добавьте watermark 1 hour
  3. Test with console sink:

    query = customer_features \
        .writeStream \
        .outputMode("update") \
        .format("console") \
        .start()
  4. Generate CDC events: Вставьте несколько заказов в PostgreSQL:

    INSERT INTO orders (customer_id, total, status, created_at)
    VALUES
        (1, 100.00, 'completed', NOW()),
        (1, 200.00, 'completed', NOW()),
        (2, 50.00, 'pending', NOW());
  5. Verify features: Проверьте, что в console output появились:

    • customer_id=1: order_count_30d=2, total_spend_30d=300.00, avg_order_value_30d=150.00
    • customer_id=2: order_count_30d=1, total_spend_30d=50.00, avg_order_value_30d=50.00
  6. (Bonus) Implement foreachBatch sink:

    • Запишите features в Redis (если Redis доступен)
    • Или симулируйте запись логированием: print(f"Writing features for customer {row.customer_id}")

Ожидаемый результат

-------------------------------------------
Batch: 1
-------------------------------------------
+-------------------------------------------+-------------------------------+----------------+----------------+-----------------------+
|window                                     |customer_id                    |order_count_30d |total_spend_30d |avg_order_value_30d    |
+-------------------------------------------+-------------------------------+----------------+----------------+-----------------------+
|{2026-01-02 00:00:00, 2026-02-01 00:00:00}|1                              |2               |300.00          |150.00                 |
|{2026-01-02 00:00:00, 2026-02-01 00:00:00}|2                              |1               |50.00           |50.00                  |
+-------------------------------------------+-------------------------------+----------------+----------------+-----------------------+

Что дальше?

Вы изучили, как строить real-time feature pipelines из CDC событий с PySpark. Вы знаете:

  • Типы ML features (point-in-time, aggregated, behavioral, derived)
  • Как вычислять customer behavior features с window aggregations
  • Как писать features в feature store с foreachBatch()
  • Когда использовать real-time features vs batch features

Следующие шаги:

  1. Feature versioning: Как версионировать features и избегать training-serving skew
  2. Feature monitoring: Практики drift detection и data quality checks
  3. Advanced patterns: Stream-stream joins для cross-entity features
  4. Production deployment: Как деплоить feature pipelines с monitoring и alerting

Это завершает Module 5 (Data Engineering with Python). Вы прошли путь от простого Python consumer до production-grade feature pipelines с CDC streams.

Ключевые выводы

  1. Real-time features критичны для ML use cases, где свежесть данных влияет на business metrics (fraud, recommendations)
  2. CDC события — отличный источник для features: полная история изменений, low latency
  3. Типы features: point-in-time (текущее состояние), aggregated (rolling windows), behavioral (паттерны), derived (комбинации)
  4. PySpark window aggregations с watermark — стандартный способ вычисления time-based features
  5. Feature store обеспечивает consistency между training и inference (Redis для online, Parquet для offline)
  6. foreachBatch pattern для записи PySpark features в external stores (Redis, DynamoDB)
  7. Feast — open-source feature store, но нет прямого PySpark connector (используйте dual write)
  8. Freshness vs Cost: Real-time features дороже batch features — используйте, когда свежесть критична
  9. Monitoring: Отслеживайте latency, drift, missing features, write failures
  10. Watermark critical: Без watermark late-arriving events будут потеряны

Check Your Understanding

Score: 0 of 0
Conceptual
Question 1 of 5. Почему batch-features (обновляемые раз в сутки) недостаточны для системы fraud detection?

Finished the lesson?

Mark it as complete to track your progress