Learning Platform
Глоссарий Troubleshooting
Урок 20.03 · 35 мин
Начальный
implementationpythonbenchmarkstimeittracemallocoptimization

Реализация

Полный код pipeline. Сначала наивная версия (для сравнения), потом оптимизированная.

Наивная версия (как могут написать с нуля)

import time

class NaivePipeline:
    """Простое хранение всех событий + линейные проходы для запросов."""

    def __init__(self, window_seconds=86400):
        self.window = window_seconds
        self.events = []                # list всех событий

    def ingest(self, user_id, url, ts=None):
        if ts is None:
            ts = time.time()
        self.events.append((ts, user_id, url))

    def cleanup(self, now=None):
        if now is None:
            now = time.time()
        cutoff = now - self.window
        # сохраняем только свежие — O(n)
        self.events = [e for e in self.events if e[0] >= cutoff]

    def unique_users_count(self):
        return len({e[1] for e in self.events})         # O(n)

    def top_k_users(self, k):
        from collections import Counter
        c = Counter(e[1] for e in self.events)          # O(n) каждый раз
        return c.most_common(k)

    def top_k_urls(self, k):
        from collections import Counter
        c = Counter(e[2] for e in self.events)
        return c.most_common(k)

    def clicks_per_minute(self, now=None):
        if now is None:
            now = time.time()
        cutoff = now - 60
        return sum(1 for e in self.events if e[0] >= cutoff)   # O(n)

Каждый запрос — линейный проход по всем событиям. На миллионах записей это смерть. Но работает корректно — это полезно для сравнения.

Оптимизированная версия

import time
from collections import Counter, deque

class OptimizedPipeline:
    """Несколько индексов под разные запросы."""

    def __init__(self, window_seconds=86400):
        self.window = window_seconds

        # индексы
        self.last_seen = {}                 # user_id -> last_ts
        self.user_counts = Counter()        # user_id -> count
        self.url_counts = Counter()         # url -> count
        self.events = deque()               # (ts, user_id, url) для cleanup
        self.minute_buckets = [0] * 60      # для clicks_per_minute
        self.bucket_last_update = [0.0] * 60

    def ingest(self, user_id, url, ts=None):
        if ts is None:
            ts = time.time()

        # обновляем все индексы
        self.last_seen[user_id] = ts
        self.user_counts[user_id] += 1
        self.url_counts[url] += 1
        self.events.append((ts, user_id, url))

        # bucketed counter
        b = int(ts) % 60
        if ts - self.bucket_last_update[b] >= 60:
            # bucket устарел, обнуляем
            self.minute_buckets[b] = 0
        self.minute_buckets[b] += 1
        self.bucket_last_update[b] = ts

    def cleanup(self, now=None):
        if now is None:
            now = time.time()
        cutoff = now - self.window
        removed = 0
        while self.events and self.events[0][0] < cutoff:
            ts, user_id, url = self.events.popleft()
            self.user_counts[user_id] -= 1
            if self.user_counts[user_id] == 0:
                del self.user_counts[user_id]
            self.url_counts[url] -= 1
            if self.url_counts[url] == 0:
                del self.url_counts[url]
            # last_seen: только если данное событие было последним
            if self.last_seen.get(user_id) == ts:
                del self.last_seen[user_id]
            removed += 1
        return removed

    def unique_users_count(self):
        return len(self.last_seen)

    def top_k_users(self, k):
        return self.user_counts.most_common(k)

    def top_k_urls(self, k):
        return self.url_counts.most_common(k)

    def clicks_per_minute(self, now=None):
        if now is None:
            now = time.time()
        cutoff_ts = now - 60
        total = 0
        for b in range(60):
            if self.bucket_last_update[b] >= cutoff_ts:
                total += self.minute_buckets[b]
        return total

Каждый запрос — O(1) или O(N_unique * log k). На сотнях тысяч uniques — миллисекунды.

Benchmarks

Загрузим pipeline миллионом событий и замерим:

import time
import random
import string
import sys
import tracemalloc

def random_user():
    return ''.join(random.choices(string.ascii_lowercase, k=8))

def random_url():
    return f"/page/{random.randint(1, 1000)}"

def benchmark(pipeline_class, n_events, n_queries):
    pipe = pipeline_class()

    # генерируем фиксированные user/url, чтобы создать реалистичный распределение
    users = [random_user() for _ in range(1000)]
    urls = [random_url() for _ in range(500)]

    tracemalloc.start()

    # ingest
    t0 = time.time()
    for i in range(n_events):
        pipe.ingest(random.choice(users), random.choice(urls))
    t_ingest = time.time() - t0
    snapshot = tracemalloc.take_snapshot()
    mem = sum(stat.size for stat in snapshot.statistics('filename')) / 1024 / 1024
    tracemalloc.stop()

    # queries
    t0 = time.time()
    for _ in range(n_queries):
        _ = pipe.unique_users_count()
        _ = pipe.top_k_users(100)
        _ = pipe.top_k_urls(1000)
        _ = pipe.clicks_per_minute()
    t_queries = time.time() - t0

    return {
        "ingest": t_ingest,
        "queries": t_queries,
        "memory_mb": mem,
    }

print("naive (n=10_000, queries=10):")
print(benchmark(NaivePipeline, 10_000, 10))

print("\noptimized (n=10_000, queries=10):")
print(benchmark(OptimizedPipeline, 10_000, 10))

print("\noptimized (n=1_000_000, queries=100):")
print(benchmark(OptimizedPipeline, 1_000_000, 100))

Ожидаемые результаты

Типичные числа (зависят от машины):

naive (n=10_000, queries=10):
  ingest:    0.01 s
  queries:   3.5 s
  memory:    8 MB

optimized (n=10_000, queries=10):
  ingest:    0.03 s
  queries:   0.002 s
  memory:    4 MB

optimized (n=1_000_000, queries=100):
  ingest:    3.0 s
  queries:   0.5 s
  memory:    300 MB

Анализ:

  • На 10k events наивная версия делает 10 запросов за 3.5 секунды. Каждый запрос — линейный проход. Это 350 мс/запрос — недопустимо для real-time.
  • Оптимизированная версия отвечает на те же 10 запросов за 2 миллисекунды — в 1750 раз быстрее.
  • На миллионе событий — 100 запросов за 500 мс, т.е. 5 мс/запрос. SLA 100 мс выполняется с запасом.
  • Память масштабируется линейно от числа событий (через events deque), но Counter и last_seen — только от уникальных users/urls.

Разбор: где экономия

unique_users_count. Наивная: построить set из всех событий O(n). Оптимизированная: len(self.last_seen) O(1). Это log(n)/1 раз быстрее на каждом запросе.

top_k_users. Наивная: Counter с нуля на каждый запрос O(n). Оптимизированная: Counter уже построен, most_common(k) O(N_unique * log k). При 1000 уникальных users на 1M событий это 1000x ускорение.

clicks_per_minute. Наивная: filter и count O(n). Оптимизированная: sum 60 buckets O(1). 1M / 60 = ~16000x ускорение.

Profiling

Куда уходит время в optimized.ingest? Запустим cProfile:

import cProfile
import pstats

pr = cProfile.Profile()
pr.enable()

pipe = OptimizedPipeline()
for _ in range(100_000):
    pipe.ingest(random.choice(users), random.choice(urls))

pr.disable()
stats = pstats.Stats(pr).sort_stats('cumulative')
stats.print_stats(10)

Топ-функции обычно:

  1. _lru_cache_wrapper или Counter.__setitem__ (счётчики)
  2. random.choice (это benchmark overhead)
  3. dict.__setitem__ (last_seen)
  4. time.time (если без передачи ts)

Реальная работа ingest распределяется между Counter и dict, оба O(1).

Возможные оптимизации (если бы было нужно)

  • Парсинг ts. Вместо time.time() (~50 нс на вызов) принимать ts от вызывающего — он уже его знает.
  • Batch ingest. Принимать список событий за раз, обновлять Counter одним update() (быстрее, чем N инкрементов).
  • Cython/Rust extensions. Если bottleneck в Python — переписать критические участки.
  • Sharding. Несколько pipeline по hash(user_id), параллельная обработка.

Но 5 мс/запрос на 1M событий — это уже отличный результат. Дальше оптимизировать не нужно — bottleneck окажется в network/disk.

TIP

Главный урок: не оптимизируйте преждевременно. Сначала правильно выберите структуры данных — это даёт 100x-1000x. Потом, если нужно, профилируйте и точечно оптимизируйте. Микро-оптимизация на неправильной структуре — это полировка кода, который и так медленный.

Попробуй сам

Скопируйте обе версии (Naive и Optimized) и запустите benchmark на своей машине. Поиграйте с параметрами:

  1. n_events = 100, 1k, 10k, 100k, 1M — посмотрите, как ingest и queries масштабируются.
  2. n_queries = 10, 100, 1000 — увидите, как страдает naive при увеличении числа запросов.
  3. Измените window на 60 секунд (sliding window только 1 минута) — посмотрите, как cleanup освобождает память.

Это упражнение даст вам интуицию: сколько событий поместится на вашем железе, сколько запросов в секунду оно выдержит. Это «sense for scale» — главный навык DE.

cProfile и pstats: профилировщик Python в production
Проверка знанийKnowledge check
В оптимизированной версии у нас 5 индексных структур, обновляемых на каждый ingest. Это в 5 раз больше операций, чем в наивной версии, которая просто делает list.append. Почему оптимизированная всё равно быстрее в целом, и в чём фундаментальное отличие?
ОтветAnswer
На уровне ingest наивная действительно быстрее в 5 раз — но это микросекунды разницы. Все 5 операций в оптимизированной — O(1) с маленькой константой (Counter increment, dict set, deque append, int increment). Главное различие — в QUERIES. Наивная делает O(n) на КАЖДЫЙ запрос. Оптимизированная — O(1) или O(N_unique log k). При 1M событий и 100 запросах: naive делает 100 * 10^6 = 10^8 операций на queries, оптимизированная — 100 * 1000 * 7 = 7*10^5 (если 1000 unique users) — в 140 раз меньше. На ingest naive делает 10^6 операций, оптимизированная — 5 * 10^6 — в 5 раз больше. Но 5*10^6 ingest добавок << 10^8 queries economy. Чистый выигрыш огромный. Фундаментальное отличие: 'агрегировать постепенно' vs 'обрабатывать на каждом запросе'. Первое — pre-computation, цена платится один раз. Второе — recomputation, цена платится на каждый запрос. В системах с read >> write первый подход всегда выигрывает.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 5. Naive pipeline делает ingest за 0.01 секунды для 10k событий, а Optimized за 0.03 секунды. Naive в 3 раза быстрее на ingest. Почему мы всё равно выбираем Optimized?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4