Постановка задачи
Вы пришли в стартап, который запускает analytics-as-a-service для веб-сайтов. Сервис должен принимать поток кликов (clickstream) от клиентов и в реальном времени отвечать на запросы:
- сколько уникальных пользователей кликали за последний час;
- какие 100 пользователей самые активные за последние 24 часа;
- сколько раз каждый URL был кликнут (top-1000 URLs);
- скользящий count кликов в минуту.
Целевые показатели:
- Нагрузка: 1 миллион событий в час (~280 событий в секунду).
- Latency запросов: меньше 100 мс на любой запрос.
- Память: укладываться в 8 ГБ.
- Полная история не нужна, только текущее окно (24 часа max).
Это типичные требования для real-time dashboard или продукта вроде Google Analytics, Mixpanel, Heap.
Что мы построим
In-memory pipeline на Python. Архитектура:
- Ingestion — поток событий через генератор (имитация Kafka consumer).
- In-memory state — несколько структур данных под разные запросы.
- Query API — функции для ответа на 4 типа запросов.
- Cleanup — фоновая задача для очистки старых данных.
Это не production-ready решение (без durability, без distribution), но это правильное решение для понимания DSA в реальной задаче.
Каждый класс запроса — своя структура данных. Данные общие, индексы разные.
Размышление перед кодом
Прежде чем писать, прикинем размеры.
Сколько событий в окне. 1 млн в час, окно 24 часа = 24 миллиона событий. Если хранить каждое — 24M записей в RAM. При 200 байт на запись — 4.8 ГБ. Влезаем в 8 ГБ.
Сколько уникальных user_id. Зависит от аудитории. Допустим 100 тысяч активных пользователей в день. Counter[user_id] — это 100k записей по ~80 байт ~ 8 МБ. Тривиально.
Сколько уникальных URL. Зависит от сайта. Допустим 50 тысяч. Counter[url] — 50k * ~100 байт (URL + count) = 5 МБ. Тривиально.
Главная проблема — 24 миллиона событий. Если хранить их как dict — это сотни МБ. Как tuple — десятки МБ. Лучше — не хранить вообще, только агрегаты.
Но для unique users в скользящем окне нам нужны event_id, привязанные к timestamp. Хитрость: храним только два индекса:
last_seen[user_id] -> timestamp— словарь «когда видели в последний раз». Проверка unique = просто len(last_seen). При cleanup — удалить из last_seen старше cutoff.events_timeline— deque (timestamp, user_id). Для cleanup: вычёркиваем старые из last_seen.
Это снижает память с 24M записей до 100k unique users + deque timestamps.
Спецификация структур
from collections import deque, Counter, defaultdict
class ClickPipeline:
def __init__(self, window_seconds=86400):
self.window = window_seconds
# для unique users count
self.last_seen = {} # user_id -> last_ts
# для top-K активных
self.user_counts = Counter() # user_id -> num_clicks
# для top-1000 URLs
self.url_counts = Counter() # url -> num_clicks
# для cleanup и sliding window count
self.events = deque() # (ts, user_id, url)
Каждое событие обновляет ВСЕ индексы. Каждый запрос читает ОДИН индекс. Это классический pattern «несколько индексов по одним данным» — как в реляционной БД.
Запросы
Реализуем 4 функции:
unique_users_in_window()— len(last_seen) с фильтрацией.top_k_users(k=100)— heapq.nlargest на user_counts.top_k_urls(k=1000)— heapq.nlargest на url_counts.clicks_per_minute(now)— sliding window count.
Каждый запрос должен работать за O(k log k) или O(log n + k) — миллисекунды на 100k записей.
Cleanup
Главная сложность — поддерживать актуальность данных. Без cleanup всё растёт неограниченно.
Cleanup алгоритм: периодически (раз в минуту):
- Вычислить cutoff = now - window.
- Из
eventsпоппнуть всё сts < cutoff. - Для каждого вытолкнутого: декрементировать user_counts, url_counts. Если count стал 0 — удалить ключ из Counter.
- Из
last_seenудалить user_id, чейlast_ts < cutoff.
Это O(removed_events). На потоке 280 ev/s за минуту это ~16k событий — миллисекунды.
Cleanup-on-write vs cleanup-on-read vs periodic cleanup — три стратегии. On-write проще: при каждом event сразу очищаем устаревшие. Но это удваивает работу на event. Periodic (раз в минуту) — лучший компромисс при высоком write rate.
Что измерим
После реализации замерим:
- Throughput: сколько событий в секунду может обработать pipeline.
- Latency: время выполнения каждого типа запроса.
- Memory: реальное потребление RAM через
tracemalloc. - Сравнение наивной vs оптимизированной: одна и та же логика, но первая использует list+sort везде, вторая — правильные структуры.
Цель — увидеть на конкретных числах, что выбор структуры даёт 100x-1000x ускорение.
План работ
Следующие два урока — реализация:
- Урок 2: выбор структур, обоснование, дизайн API.
- Урок 3: код, benchmarks, сравнение версий.
- Урок 4: reflection, что узнали, куда дальше.
Это не самостоятельная домашка с пустого листа — мы пройдём её вместе. Цель — увидеть полную картину: от требований до измерений.
Попробуй сам
Перед тем как читать следующий урок, попробуйте мысленно ответить:
- Если бы вам поручили эту задачу, какие три критических ошибки джуны делают, и как избежать?
- Что бы вы выбрали для top-1000 URLs: sorted(url_counts.items())[:1000] или heapq.nlargest(1000, url_counts.items(), key=lambda x: x[1])? Почему?
- Какую структуру вы бы взяли для tracking unique users за 24 часа?
Подумайте 5-10 минут. Потом переходите ко второму уроку capstone — сравним ваш план с разобранным.
pandas: DataFrame как in-memory batch-processing движок