Анализ каждого запроса
Прежде чем писать код, проанализируем каждый из 4 запросов отдельно.
Запрос 1: уникальные пользователи в окне
«Сколько уникальных user_id кликали за последние T секунд?»
Варианты:
- Set всех user_id в окне. O(1) на проверку, O(N_unique) память. Но нужно поддерживать
setактуальным — удалять user_id, чьи последние события устарели. - dict[user_id] -> last_ts. Аналогично set, но с timestamp последнего события. Удобнее для cleanup: «если last_seen[u] < cutoff — удалить».
- HyperLogLog. Approximate cardinality с памятью O(log log N). Точность ~2%. Хорошо для очень больших данных.
Выбираем dict[user_id] -> last_ts. Точно, простая логика cleanup. Память ~10-50 МБ для 100k uniques.
Запрос 2: top-K активных пользователей
«Какие 100 пользователей самые активные?»
Варианты:
- Counter[user_id] + nlargest. O(1) на инкремент, O(N_unique * log K) на top-K. Стандарт.
- dict + SortedList. O(log N) на update, O(K) на top-K. Лучше, если top-K делается часто.
- min-heap размера K в потоке. O(log K) на event, O(K) на top-K. Но не учитывает декремент при cleanup.
Выбираем Counter + nlargest для простоты. Top-K вызывается «иногда» (раз в секунду максимум), не на каждое событие — accuracy достижима за O(N log K) при N=100k = ~700k операций, миллисекунды.
Запрос 3: top-K URLs
Аналогично Запросу 2, только ключ — URL. Та же структура: Counter + nlargest.
Запрос 4: clicks per minute (sliding window count)
«Сколько кликов за последнюю минуту?»
Варианты:
- deque(timestamps). Append на каждое событие O(1). При запросе — popleft устаревших, len(deque). Точно, но требует cleanup на каждый запрос или периодически.
- Bucketed counter. Массив из 60 ячеек, каждая — счёт за конкретную секунду. O(1) на update, O(1) на count (sum of 60). Менее точно (округление до секунды), но дёшево.
Выбираем bucketed counter для clicks_per_minute — нам не нужны индивидуальные timestamps, нужна только агрегация. Память 60*8 байт = 480 байт. Идеально.
Каждая структура под свою задачу. Память умножается на каждое событие, но через индексы.
API
Опишем интерфейс класса перед реализацией:
class ClickPipeline:
def __init__(self, window_seconds=86400):
"""Создать pipeline с указанным окном (по умолчанию 24 часа)."""
def ingest(self, user_id: str, url: str, ts: float = None) -> None:
"""Принять одно событие. Обновить все индексы. O(1) на событие."""
def cleanup(self, now: float = None) -> int:
"""Удалить устаревшие данные из всех индексов. Возвращает число удалённых событий."""
def unique_users_count(self) -> int:
"""Количество уникальных пользователей в окне. O(1)."""
def top_k_users(self, k: int) -> list[tuple[str, int]]:
"""Top-K пользователей по числу событий. O(N_users * log K)."""
def top_k_urls(self, k: int) -> list[tuple[str, int]]:
"""Top-K URL по числу кликов. O(N_urls * log K)."""
def clicks_per_minute(self, now: float = None) -> int:
"""Количество кликов за последние 60 секунд. O(1)."""
API явный, без скрытых обещаний. Каждый метод имеет документированную сложность.
Cleanup strategy
Главный вопрос: когда и как чистить устаревшие данные?
Стратегия 1: cleanup-on-write. При каждом ingest очищаем устаревшие.
- Плюс: всегда актуальное состояние.
- Минус: удваивает работу на каждый event. На 280 ev/s это не критично, но при 100k ev/s — заметно.
Стратегия 2: cleanup-on-read. При каждом запросе чистим, потом отвечаем.
- Плюс: пишем дёшево.
- Минус: при редких запросах состояние сильно устаревает; первый запрос за час делает O(n) cleanup.
Стратегия 3: periodic cleanup. Раз в минуту фоновый поток вызывает cleanup().
- Плюс: write и read дёшевы. Чистка amortизированно O(1) на событие.
- Минус: max stale lag = 1 минута.
Выбираем стратегию 3. Для real-time analytics 1 минута stale acceptable. Для критичных метрик (rate limit) — стратегия 1.
Тонкости
Что хранить в events deque. Просто (ts, user_id, url). Tuple, не dict — экономия памяти 3x.
Bucketed counter — реализация. Массив длины 60. Индекс = int(ts) % 60. При записи в bucket важно сначала очистить, если последнее обновление этого bucket было больше 60 секунд назад — тогда там устаревшие данные. Это решается через last_update[60].
Thread safety. В нашем pipeline нет — это single-threaded демонстрация. В продакшене нужен threading.Lock или, лучше, акторная модель (один поток на pipeline, очередь на ingest).
Counter с нулями. При cleanup декрементируем user_counts[u]. Когда user_counts[u] == 0, удалить ключ из Counter — иначе будут «фантомные» ключи с count=0, занимающие память.
self.user_counts[u] -= 1
if self.user_counts[u] == 0:
del self.user_counts[u]
Это типовой паттерн для maintained counters.
Что НЕ оптимизируем
В этом capstone мы не делаем:
- Persistence. Состояние теряется при рестарте. В продакшене — checkpoint в Redis/SQLite каждые N секунд.
- Distribution. Один процесс, одна машина. В продакшене — Kafka + множество consumer-ов с partition-based sharding.
- Approximation. Точные данные. В продакшене для очень больших cardinalities — HyperLogLog, Count-Min Sketch.
Эти темы — продвинутый курс. Здесь мы сосредотачиваемся на правильном выборе базовых структур.
Замеры памяти на бумаге
Прикинем итоговую память:
last_seendict: 100k entries * ~100 байт = 10 МБ.user_countsCounter: то же, 10 МБ.url_countsCounter: 50k * 150 байт = 7.5 МБ.eventsdeque: 24M tuples * 100 байт = 2.4 ГБ.clicks_buckets: 60 * 8 байт = 480 байт.
Итого: ~2.5 ГБ. В основном это events deque. Можем сэкономить, если хранить там только тимштамп (для cleanup), а не полные события:
eventsdeque: 24M * 24 байта (tuple с одним float) = 576 МБ.
Это уже укладывается в 1 ГБ. С учётом Python overhead — около 2 ГБ. Цель «уложиться в 8 ГБ» достигнута с запасом.
В реальности 24M tuples в Python — это всё равно ощутимо: pymalloc fragmentation, GC overhead. На production такие объёмы обычно держат в Redis/Memcache или в специализированных структурах (Numpy array для timestamps). Для capstone это приемлемо.
Альтернатива: minimal state
Если совсем хочется экономить память, можно отказаться от events deque вовсе и хранить только агрегаты + dict[user_id] -> last_ts. Cleanup тогда основан только на last_ts:
- Для user_counts: при cleanup перебрать last_seen, для u с
last_ts < cutoff— удалить из user_counts и last_seen. - Для url_counts: проблема — мы потеряли информацию, какие URLs кликнул этот user. Нужен либо отдельный индекс user -> urls (память), либо не делать cleanup для url_counts (накопление).
Trade-off: меньше памяти, но cleanup сложнее или менее точный. Для упрощения держим events deque.
Попробуй сам
Не запуская код, прикиньте:
- Если бы у нас было 10 млн событий в час (вместо 1 млн), какие структуры стали бы критическими по памяти?
- Если бы запросы делались 10 000 раз в секунду, какую стратегию cleanup вы бы выбрали?
- Если уникальных user_id миллион, поместится ли Counter в RAM (8 ГБ доступно)?
Эти размышления — половина работы DE. Реальные системы строятся через такие прикидки до написания кода.
Heap file и индексы PostgreSQL: несколько индексов по одной таблице