Реализация
Полный код pipeline. Сначала наивная версия (для сравнения), потом оптимизированная.
Наивная версия (как могут написать с нуля)
import time
class NaivePipeline:
"""Простое хранение всех событий + линейные проходы для запросов."""
def __init__(self, window_seconds=86400):
self.window = window_seconds
self.events = [] # list всех событий
def ingest(self, user_id, url, ts=None):
if ts is None:
ts = time.time()
self.events.append((ts, user_id, url))
def cleanup(self, now=None):
if now is None:
now = time.time()
cutoff = now - self.window
# сохраняем только свежие — O(n)
self.events = [e for e in self.events if e[0] >= cutoff]
def unique_users_count(self):
return len({e[1] for e in self.events}) # O(n)
def top_k_users(self, k):
from collections import Counter
c = Counter(e[1] for e in self.events) # O(n) каждый раз
return c.most_common(k)
def top_k_urls(self, k):
from collections import Counter
c = Counter(e[2] for e in self.events)
return c.most_common(k)
def clicks_per_minute(self, now=None):
if now is None:
now = time.time()
cutoff = now - 60
return sum(1 for e in self.events if e[0] >= cutoff) # O(n)
Каждый запрос — линейный проход по всем событиям. На миллионах записей это смерть. Но работает корректно — это полезно для сравнения.
Оптимизированная версия
import time
from collections import Counter, deque
class OptimizedPipeline:
"""Несколько индексов под разные запросы."""
def __init__(self, window_seconds=86400):
self.window = window_seconds
# индексы
self.last_seen = {} # user_id -> last_ts
self.user_counts = Counter() # user_id -> count
self.url_counts = Counter() # url -> count
self.events = deque() # (ts, user_id, url) для cleanup
self.minute_buckets = [0] * 60 # для clicks_per_minute
self.bucket_last_update = [0.0] * 60
def ingest(self, user_id, url, ts=None):
if ts is None:
ts = time.time()
# обновляем все индексы
self.last_seen[user_id] = ts
self.user_counts[user_id] += 1
self.url_counts[url] += 1
self.events.append((ts, user_id, url))
# bucketed counter
b = int(ts) % 60
if ts - self.bucket_last_update[b] >= 60:
# bucket устарел, обнуляем
self.minute_buckets[b] = 0
self.minute_buckets[b] += 1
self.bucket_last_update[b] = ts
def cleanup(self, now=None):
if now is None:
now = time.time()
cutoff = now - self.window
removed = 0
while self.events and self.events[0][0] < cutoff:
ts, user_id, url = self.events.popleft()
self.user_counts[user_id] -= 1
if self.user_counts[user_id] == 0:
del self.user_counts[user_id]
self.url_counts[url] -= 1
if self.url_counts[url] == 0:
del self.url_counts[url]
# last_seen: только если данное событие было последним
if self.last_seen.get(user_id) == ts:
del self.last_seen[user_id]
removed += 1
return removed
def unique_users_count(self):
return len(self.last_seen)
def top_k_users(self, k):
return self.user_counts.most_common(k)
def top_k_urls(self, k):
return self.url_counts.most_common(k)
def clicks_per_minute(self, now=None):
if now is None:
now = time.time()
cutoff_ts = now - 60
total = 0
for b in range(60):
if self.bucket_last_update[b] >= cutoff_ts:
total += self.minute_buckets[b]
return total
Каждый запрос — O(1) или O(N_unique * log k). На сотнях тысяч uniques — миллисекунды.
Benchmarks
Загрузим pipeline миллионом событий и замерим:
import time
import random
import string
import sys
import tracemalloc
def random_user():
return ''.join(random.choices(string.ascii_lowercase, k=8))
def random_url():
return f"/page/{random.randint(1, 1000)}"
def benchmark(pipeline_class, n_events, n_queries):
pipe = pipeline_class()
# генерируем фиксированные user/url, чтобы создать реалистичный распределение
users = [random_user() for _ in range(1000)]
urls = [random_url() for _ in range(500)]
tracemalloc.start()
# ingest
t0 = time.time()
for i in range(n_events):
pipe.ingest(random.choice(users), random.choice(urls))
t_ingest = time.time() - t0
snapshot = tracemalloc.take_snapshot()
mem = sum(stat.size for stat in snapshot.statistics('filename')) / 1024 / 1024
tracemalloc.stop()
# queries
t0 = time.time()
for _ in range(n_queries):
_ = pipe.unique_users_count()
_ = pipe.top_k_users(100)
_ = pipe.top_k_urls(1000)
_ = pipe.clicks_per_minute()
t_queries = time.time() - t0
return {
"ingest": t_ingest,
"queries": t_queries,
"memory_mb": mem,
}
print("naive (n=10_000, queries=10):")
print(benchmark(NaivePipeline, 10_000, 10))
print("\noptimized (n=10_000, queries=10):")
print(benchmark(OptimizedPipeline, 10_000, 10))
print("\noptimized (n=1_000_000, queries=100):")
print(benchmark(OptimizedPipeline, 1_000_000, 100))
Ожидаемые результаты
Типичные числа (зависят от машины):
naive (n=10_000, queries=10):
ingest: 0.01 s
queries: 3.5 s
memory: 8 MB
optimized (n=10_000, queries=10):
ingest: 0.03 s
queries: 0.002 s
memory: 4 MB
optimized (n=1_000_000, queries=100):
ingest: 3.0 s
queries: 0.5 s
memory: 300 MB
Анализ:
- На 10k events наивная версия делает 10 запросов за 3.5 секунды. Каждый запрос — линейный проход. Это 350 мс/запрос — недопустимо для real-time.
- Оптимизированная версия отвечает на те же 10 запросов за 2 миллисекунды — в 1750 раз быстрее.
- На миллионе событий — 100 запросов за 500 мс, т.е. 5 мс/запрос. SLA 100 мс выполняется с запасом.
- Память масштабируется линейно от числа событий (через
eventsdeque), но Counter и last_seen — только от уникальных users/urls.
Разбор: где экономия
unique_users_count. Наивная: построить set из всех событий O(n). Оптимизированная: len(self.last_seen) O(1). Это log(n)/1 раз быстрее на каждом запросе.
top_k_users. Наивная: Counter с нуля на каждый запрос O(n). Оптимизированная: Counter уже построен, most_common(k) O(N_unique * log k). При 1000 уникальных users на 1M событий это 1000x ускорение.
clicks_per_minute. Наивная: filter и count O(n). Оптимизированная: sum 60 buckets O(1). 1M / 60 = ~16000x ускорение.
Profiling
Куда уходит время в optimized.ingest? Запустим cProfile:
import cProfile
import pstats
pr = cProfile.Profile()
pr.enable()
pipe = OptimizedPipeline()
for _ in range(100_000):
pipe.ingest(random.choice(users), random.choice(urls))
pr.disable()
stats = pstats.Stats(pr).sort_stats('cumulative')
stats.print_stats(10)
Топ-функции обычно:
_lru_cache_wrapperилиCounter.__setitem__(счётчики)random.choice(это benchmark overhead)dict.__setitem__(last_seen)time.time(если без передачи ts)
Реальная работа ingest распределяется между Counter и dict, оба O(1).
Возможные оптимизации (если бы было нужно)
- Парсинг ts. Вместо time.time() (~50 нс на вызов) принимать ts от вызывающего — он уже его знает.
- Batch ingest. Принимать список событий за раз, обновлять Counter одним update() (быстрее, чем N инкрементов).
- Cython/Rust extensions. Если bottleneck в Python — переписать критические участки.
- Sharding. Несколько pipeline по hash(user_id), параллельная обработка.
Но 5 мс/запрос на 1M событий — это уже отличный результат. Дальше оптимизировать не нужно — bottleneck окажется в network/disk.
Главный урок: не оптимизируйте преждевременно. Сначала правильно выберите структуры данных — это даёт 100x-1000x. Потом, если нужно, профилируйте и точечно оптимизируйте. Микро-оптимизация на неправильной структуре — это полировка кода, который и так медленный.
Попробуй сам
Скопируйте обе версии (Naive и Optimized) и запустите benchmark на своей машине. Поиграйте с параметрами:
- n_events = 100, 1k, 10k, 100k, 1M — посмотрите, как ingest и queries масштабируются.
- n_queries = 10, 100, 1000 — увидите, как страдает naive при увеличении числа запросов.
- Измените window на 60 секунд (sliding window только 1 минута) — посмотрите, как cleanup освобождает память.
Это упражнение даст вам интуицию: сколько событий поместится на вашем железе, сколько запросов в секунду оно выдержит. Это «sense for scale» — главный навык DE.
cProfile и pstats: профилировщик Python в production