Learning Platform
Глоссарий Troubleshooting
Урок 19.02 · 25 мин
Начальный
streamingsliding-windowdequering-buffertumbling-windowsession-window

Что такое streaming окна

В batch processing мы видим все данные сразу. В streaming данные текут потоком, и мы не можем хранить всё. Главный приём — работать в окне: последние N событий или события за последние T секунд.

Три типа окон, которые встречаются повсеместно:

  1. Tumbling window — непересекающиеся блоки фиксированного размера. Например, «каждый час: count, sum». Окна не перекрываются.
  2. Sliding window — окно фиксированного размера, перемещающееся с каждым событием. Например, «количество кликов за последние 10 минут» — каждый момент времени.
  3. Session window — окно открыто, пока есть события; закрывается после паузы. Например, «пользовательская сессия» — действия до 30 минут простоя.
Три типа окон по времени

Tumbling — блоки, sliding — скользящие, session — динамические по активности.

tumbling0..60s | 60..120s | 120..180sнепересекающиеся блоки по 60 секунд
slidingt..t+60 каждое событиеокно 60s перемещается с каждым новым event
sessionevent...event(пауза 30s)...eventокно открыто, пока есть события; закрывается после 30s пустоты

Deque с maxlen — самое простое скользящее окно

Если окно — это «последние N событий», deque из collections с maxlen=N идеален:

from collections import deque

window = deque(maxlen=100)   # хранит последние 100 событий

for event in stream:
    window.append(event)
    # старый элемент автоматически удаляется при добавлении 101-го
    if len(window) == 100:
        process_window(window)

maxlen — это «ёмкость». Когда мы добавляем элемент в полный deque, самый старый автоматически удаляется. Это O(1) операция в обе стороны (deque реализован как ring buffer с двойной связью блоков).

Когда подходит: «последние N штук», нет привязки ко времени, фиксированное количество.

Ring buffer вручную

Если хочется явный контроль (например, нужен индекс «куда сейчас писать»):

class RingBuffer:
    def __init__(self, capacity):
        self.buffer = [None] * capacity
        self.capacity = capacity
        self.write_pos = 0
        self.size = 0

    def add(self, value):
        self.buffer[self.write_pos] = value
        self.write_pos = (self.write_pos + 1) % self.capacity
        if self.size < self.capacity:
            self.size += 1

    def __iter__(self):
        if self.size < self.capacity:
            return iter(self.buffer[:self.size])
        # окно полное — итерация с write_pos в правильном порядке
        return iter(
            list(self.buffer[self.write_pos:]) + list(self.buffer[:self.write_pos])
        )

Ring buffer на массиве — это деталь имплементации deque. Своя реализация полезна, когда нужен прямой доступ по позиции «N шагов назад» или интеграция с C-кодом (например, на массиве numpy).

Time-based sliding window

Окно «события за последние T секунд» сложнее: количество элементов плавает.

from collections import deque
import time

class TimeWindow:
    def __init__(self, duration_seconds):
        self.duration = duration_seconds
        self.events = deque()                   # (timestamp, payload)

    def add(self, event, ts=None):
        if ts is None:
            ts = time.time()
        self.events.append((ts, event))
        self._expire(ts)

    def _expire(self, now):
        cutoff = now - self.duration
        while self.events and self.events[0][0] < cutoff:
            self.events.popleft()

    def count(self):
        return len(self.events)

    def all(self):
        return [e for _, e in self.events]

Каждое добавление O(1) амортизированно (popleft несколько раз, но в среднем по 1 на событие). Окно автоматически держит только актуальные события.

Это паттерн используется в rate limiting («не более 100 запросов в минуту»), real-time дашбордах, anomaly detection.

Top-K в окне через heap

Часто нужно не просто «события за последние 10 минут», а «топ-100 самых активных пользователей в окне». Это сочетание sliding window и top-K (об этом будет урок 4):

from collections import deque, defaultdict
import heapq

class TopKWindow:
    def __init__(self, window_size, k):
        self.window = deque(maxlen=window_size)
        self.k = k
        self.counts = defaultdict(int)

    def add(self, user_id):
        # вытесняем старый если окно полное
        if len(self.window) == self.window.maxlen:
            old = self.window[0]
            self.counts[old] -= 1
            if self.counts[old] == 0:
                del self.counts[old]
        self.window.append(user_id)
        self.counts[user_id] += 1

    def top_k(self):
        return heapq.nlargest(self.k, self.counts.items(), key=lambda x: x[1])

heapq.nlargest(k, items, key) — O(n log k), не O(n log n) как полная сортировка. Удобно, когда k маленькое.

Сравнение типов окон

Когда какое окно использовать

Tumbling — для отчётов по периодам, sliding — для real-time, session — для пользовательских сценариев.

tumblingотчёты по часам/днямконкретные блоки времени, как в SQL GROUP BY date_trunc('hour', ts)
реализацияbatch с date_truncлегко в pandas или SQL
slidingrate limit, real-time alertsнужно ответить 'сколько событий за последние X' прямо сейчас
реализацияdeque + expireO(1) на добавление, O(1) на запрос count
sessionuser behavior, conversion funnelпривязка к активности пользователя, а не к фиксированному времени
реализацияdict[user_id] -> session_dataзакрытие сессии по timeout

Session window

Session — самый сложный тип окна, потому что границы динамические.

from collections import defaultdict
import time

class SessionTracker:
    def __init__(self, session_timeout=1800):       # 30 минут
        self.timeout = session_timeout
        self.sessions = defaultdict(list)
        self.last_activity = {}

    def track(self, user_id, event, ts=None):
        if ts is None:
            ts = time.time()

        # был ли пользователь активен недавно?
        if user_id in self.last_activity:
            if ts - self.last_activity[user_id] > self.timeout:
                # сессия истекла, закрываем
                self._close_session(user_id)

        self.sessions[user_id].append((ts, event))
        self.last_activity[user_id] = ts

    def _close_session(self, user_id):
        session = self.sessions.pop(user_id)
        # тут — финализация сессии: запись в БД, метрики
        print(f"session closed for {user_id}: {len(session)} events")
        del self.last_activity[user_id]

    def cleanup(self, now=None):
        """Закрыть все таймаут-сессии."""
        if now is None:
            now = time.time()
        expired = [u for u, t in self.last_activity.items()
                   if now - t > self.timeout]
        for u in expired:
            self._close_session(u)

В real-time системах cleanup вызывается периодически (cron или background task), чтобы закрывать сессии пользователей, которые перестали быть активными.

WARNING

В session windows важно не забывать про cleanup. Если ваш сервис работает 24/7 и пользователи иногда «исчезают» (например, закрыли вкладку), их сессии остаются в памяти навсегда. Без периодического cleanup memory leak неизбежен — особенно опасен в long-running демонах.

Memory analysis

Сколько памяти занимают окна на 1 миллион событий:

окноэлементов в RAMпамять
tumbling 1 hourагрегаты за окноKB на ключ агрегации
sliding maxlen=10000до 10000~ 1-10 МБ (зависит от размера события)
sliding 60s @ 1k req/s~60000~ 6-60 МБ
session (10k active users)10000 сессийсотни МБ если хранить полную историю

Tumbling выгоднее всего: храним только агрегаты, не сырые события. Session — самое дорогое: каждый активный пользователь — отдельная коллекция событий.

Замеры

Сравним добавление 1 млн событий в обычный list vs deque:

import time
from collections import deque

n = 1_000_000

# наивно: list.pop(0) — O(n)!
xs = []
t0 = time.time()
for i in range(n):
    xs.append(i)
    if len(xs) > 100:
        xs.pop(0)
print(f"list with pop(0): {time.time() - t0:.2f} s")

# правильно: deque
dq = deque(maxlen=100)
t0 = time.time()
for i in range(n):
    dq.append(i)
print(f"deque maxlen=100: {time.time() - t0:.2f} s")

Типичный результат: list — десятки секунд, deque — около 0.1 секунды. Разница в 100-1000 раз. На production это превращает скрипт «не вписался в SLA» в «работает за миллисекунды».

Попробуй сам

Реализуйте rate limiter «не более 10 запросов в секунду на пользователя»:

from collections import deque, defaultdict
import time

class RateLimiter:
    def __init__(self, max_requests, window_seconds):
        self.max = max_requests
        self.window = window_seconds
        self.requests = defaultdict(deque)

    def allow(self, user_id, now=None):
        if now is None:
            now = time.time()
        q = self.requests[user_id]
        # очищаем устаревшие
        cutoff = now - self.window
        while q and q[0] < cutoff:
            q.popleft()
        # проверяем лимит
        if len(q) >= self.max:
            return False
        q.append(now)
        return True

# тест
rl = RateLimiter(10, 1)
now = 0.0
for i in range(15):
    print(f"req {i+1}: allowed = {rl.allow('user1', now=now)}")
    now += 0.05    # 50ms между запросами

# первые 10 пропускает, остальные блокирует пока окно не сдвинется

Это рабочий rate limiter, который вы могли бы использовать в production. Достаточно скопировать.

Коллекции Python: deque — двусторонняя очередь
Проверка знанийKnowledge check
Почему list.pop(0) для удаления старых событий из окна — это antipattern, и почему deque.popleft() в десятки раз быстрее? Что происходит на уровне памяти?
ОтветAnswer
list.pop(0) — это O(n) операция. Под капотом Python list хранит элементы в непрерывном массиве указателей. Удаление первого элемента требует сдвига ВСЕХ остальных на одну ячейку влево — это O(n) memmove. На окне 1000 элементов это 1000 операций копирования за каждое удаление. На миллионе событий = 10^9 операций. Deque реализована как двусвязный список БЛОКОВ (каждый блок — массив ~64 элемента). popleft() меняет указатель на голову и не двигает данные — O(1). На уровне CPU: list.pop(0) делает memmove на 8000 байт (1000 указателей * 8), deque просто декрементирует size и обновляет head pointer — единицы тактов. На миллионе вызовов разница 100-1000x. Поэтому: для скользящего окна — deque, для стека — list (append/pop O(1)), для очереди — deque (popleft/append O(1)).

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. В чём разница между tumbling и sliding window?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5