Learning Platform
Глоссарий Troubleshooting
Урок 20.01 · 25 мин
Начальный
capstonedesignclickstreamdata-pipelinerequirements

Постановка задачи

Вы пришли в стартап, который запускает analytics-as-a-service для веб-сайтов. Сервис должен принимать поток кликов (clickstream) от клиентов и в реальном времени отвечать на запросы:

  • сколько уникальных пользователей кликали за последний час;
  • какие 100 пользователей самые активные за последние 24 часа;
  • сколько раз каждый URL был кликнут (top-1000 URLs);
  • скользящий count кликов в минуту.

Целевые показатели:

  • Нагрузка: 1 миллион событий в час (~280 событий в секунду).
  • Latency запросов: меньше 100 мс на любой запрос.
  • Память: укладываться в 8 ГБ.
  • Полная история не нужна, только текущее окно (24 часа max).

Это типичные требования для real-time dashboard или продукта вроде Google Analytics, Mixpanel, Heap.

Что мы построим

In-memory pipeline на Python. Архитектура:

  1. Ingestion — поток событий через генератор (имитация Kafka consumer).
  2. In-memory state — несколько структур данных под разные запросы.
  3. Query API — функции для ответа на 4 типа запросов.
  4. Cleanup — фоновая задача для очистки старых данных.

Это не production-ready решение (без durability, без distribution), но это правильное решение для понимания DSA в реальной задаче.

Архитектура pipeline

Каждый класс запроса — своя структура данных. Данные общие, индексы разные.

ingestionevent streamгенератор событий (user_id, url, timestamp)
state updateобновляем все 4 структурыкаждое событие распределяется по индексам
index 1set unique usersдля unique count за окно
index 2Counter user_idдля top-K активных
index 3Counter urlдля top-1000 URLs
index 4deque timestampsдля sliding window count
cleanupпериодическийудалить события старше 24 часов из всех индексов
query API4 функцииотвечают за миллисекунды из in-memory индексов

Размышление перед кодом

Прежде чем писать, прикинем размеры.

Сколько событий в окне. 1 млн в час, окно 24 часа = 24 миллиона событий. Если хранить каждое — 24M записей в RAM. При 200 байт на запись — 4.8 ГБ. Влезаем в 8 ГБ.

Сколько уникальных user_id. Зависит от аудитории. Допустим 100 тысяч активных пользователей в день. Counter[user_id] — это 100k записей по ~80 байт ~ 8 МБ. Тривиально.

Сколько уникальных URL. Зависит от сайта. Допустим 50 тысяч. Counter[url] — 50k * ~100 байт (URL + count) = 5 МБ. Тривиально.

Главная проблема — 24 миллиона событий. Если хранить их как dict — это сотни МБ. Как tuple — десятки МБ. Лучше — не хранить вообще, только агрегаты.

Но для unique users в скользящем окне нам нужны event_id, привязанные к timestamp. Хитрость: храним только два индекса:

  1. last_seen[user_id] -> timestamp — словарь «когда видели в последний раз». Проверка unique = просто len(last_seen). При cleanup — удалить из last_seen старше cutoff.
  2. events_timeline — deque (timestamp, user_id). Для cleanup: вычёркиваем старые из last_seen.

Это снижает память с 24M записей до 100k unique users + deque timestamps.

Спецификация структур

from collections import deque, Counter, defaultdict

class ClickPipeline:
    def __init__(self, window_seconds=86400):
        self.window = window_seconds

        # для unique users count
        self.last_seen = {}                # user_id -> last_ts

        # для top-K активных
        self.user_counts = Counter()       # user_id -> num_clicks

        # для top-1000 URLs
        self.url_counts = Counter()        # url -> num_clicks

        # для cleanup и sliding window count
        self.events = deque()              # (ts, user_id, url)

Каждое событие обновляет ВСЕ индексы. Каждый запрос читает ОДИН индекс. Это классический pattern «несколько индексов по одним данным» — как в реляционной БД.

Запросы

Реализуем 4 функции:

  1. unique_users_in_window() — len(last_seen) с фильтрацией.
  2. top_k_users(k=100) — heapq.nlargest на user_counts.
  3. top_k_urls(k=1000) — heapq.nlargest на url_counts.
  4. clicks_per_minute(now) — sliding window count.

Каждый запрос должен работать за O(k log k) или O(log n + k) — миллисекунды на 100k записей.

Cleanup

Главная сложность — поддерживать актуальность данных. Без cleanup всё растёт неограниченно.

Cleanup алгоритм: периодически (раз в минуту):

  1. Вычислить cutoff = now - window.
  2. Из events поппнуть всё с ts < cutoff.
  3. Для каждого вытолкнутого: декрементировать user_counts, url_counts. Если count стал 0 — удалить ключ из Counter.
  4. Из last_seen удалить user_id, чей last_ts < cutoff.

Это O(removed_events). На потоке 280 ev/s за минуту это ~16k событий — миллисекунды.

TIP

Cleanup-on-write vs cleanup-on-read vs periodic cleanup — три стратегии. On-write проще: при каждом event сразу очищаем устаревшие. Но это удваивает работу на event. Periodic (раз в минуту) — лучший компромисс при высоком write rate.

Что измерим

После реализации замерим:

  • Throughput: сколько событий в секунду может обработать pipeline.
  • Latency: время выполнения каждого типа запроса.
  • Memory: реальное потребление RAM через tracemalloc.
  • Сравнение наивной vs оптимизированной: одна и та же логика, но первая использует list+sort везде, вторая — правильные структуры.

Цель — увидеть на конкретных числах, что выбор структуры даёт 100x-1000x ускорение.

План работ

Следующие два урока — реализация:

  • Урок 2: выбор структур, обоснование, дизайн API.
  • Урок 3: код, benchmarks, сравнение версий.
  • Урок 4: reflection, что узнали, куда дальше.

Это не самостоятельная домашка с пустого листа — мы пройдём её вместе. Цель — увидеть полную картину: от требований до измерений.

Попробуй сам

Перед тем как читать следующий урок, попробуйте мысленно ответить:

  1. Если бы вам поручили эту задачу, какие три критических ошибки джуны делают, и как избежать?
  2. Что бы вы выбрали для top-1000 URLs: sorted(url_counts.items())[:1000] или heapq.nlargest(1000, url_counts.items(), key=lambda x: x[1])? Почему?
  3. Какую структуру вы бы взяли для tracking unique users за 24 часа?

Подумайте 5-10 минут. Потом переходите ко второму уроку capstone — сравним ваш план с разобранным.

pandas: DataFrame как in-memory batch-processing движок
Проверка знанийKnowledge check
Какая самая распространённая ошибка джунов при дизайне real-time event pipeline, и как это связано с тем, что мы прошли в модуле 18?
ОтветAnswer
Самая частая ошибка — хранить все события в RAM как list (например, list of dicts), а на каждый запрос делать линейный проход по всему списку с фильтрацией и сортировкой. Это O(n) на каждый запрос плюс O(n) памяти на полное хранение. При 24M событий и 100 запросах в секунду это 2.4 миллиарда операций — серверный таймаут. Связь с модулем 17: правильный подход — НЕ хранить события целиком, а агрегировать в специальные индексы под каждый класс запросов. Counter для частот (O(1) update, O(K) на top-K через nlargest). deque для sliding count (O(1) update, O(1) на count). dict для last_seen (O(1) update, O(N_unique) на count). Каждая структура решает свой класс запросов. Память O(unique_users + active_events), не O(all_events). Это разница между 'сервис работает' и 'OOM каждые 10 минут'.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 5. Для real-time pipeline с 1M событий в час, какая САМАЯ распространённая ошибка джунов?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4