Что такое streaming окна
В batch processing мы видим все данные сразу. В streaming данные текут потоком, и мы не можем хранить всё. Главный приём — работать в окне: последние N событий или события за последние T секунд.
Три типа окон, которые встречаются повсеместно:
- Tumbling window — непересекающиеся блоки фиксированного размера. Например, «каждый час: count, sum». Окна не перекрываются.
- Sliding window — окно фиксированного размера, перемещающееся с каждым событием. Например, «количество кликов за последние 10 минут» — каждый момент времени.
- Session window — окно открыто, пока есть события; закрывается после паузы. Например, «пользовательская сессия» — действия до 30 минут простоя.
Tumbling — блоки, sliding — скользящие, session — динамические по активности.
Deque с maxlen — самое простое скользящее окно
Если окно — это «последние N событий», deque из collections с maxlen=N идеален:
from collections import deque
window = deque(maxlen=100) # хранит последние 100 событий
for event in stream:
window.append(event)
# старый элемент автоматически удаляется при добавлении 101-го
if len(window) == 100:
process_window(window)
maxlen — это «ёмкость». Когда мы добавляем элемент в полный deque, самый старый автоматически удаляется. Это O(1) операция в обе стороны (deque реализован как ring buffer с двойной связью блоков).
Когда подходит: «последние N штук», нет привязки ко времени, фиксированное количество.
Ring buffer вручную
Если хочется явный контроль (например, нужен индекс «куда сейчас писать»):
class RingBuffer:
def __init__(self, capacity):
self.buffer = [None] * capacity
self.capacity = capacity
self.write_pos = 0
self.size = 0
def add(self, value):
self.buffer[self.write_pos] = value
self.write_pos = (self.write_pos + 1) % self.capacity
if self.size < self.capacity:
self.size += 1
def __iter__(self):
if self.size < self.capacity:
return iter(self.buffer[:self.size])
# окно полное — итерация с write_pos в правильном порядке
return iter(
list(self.buffer[self.write_pos:]) + list(self.buffer[:self.write_pos])
)
Ring buffer на массиве — это деталь имплементации deque. Своя реализация полезна, когда нужен прямой доступ по позиции «N шагов назад» или интеграция с C-кодом (например, на массиве numpy).
Time-based sliding window
Окно «события за последние T секунд» сложнее: количество элементов плавает.
from collections import deque
import time
class TimeWindow:
def __init__(self, duration_seconds):
self.duration = duration_seconds
self.events = deque() # (timestamp, payload)
def add(self, event, ts=None):
if ts is None:
ts = time.time()
self.events.append((ts, event))
self._expire(ts)
def _expire(self, now):
cutoff = now - self.duration
while self.events and self.events[0][0] < cutoff:
self.events.popleft()
def count(self):
return len(self.events)
def all(self):
return [e for _, e in self.events]
Каждое добавление O(1) амортизированно (popleft несколько раз, но в среднем по 1 на событие). Окно автоматически держит только актуальные события.
Это паттерн используется в rate limiting («не более 100 запросов в минуту»), real-time дашбордах, anomaly detection.
Top-K в окне через heap
Часто нужно не просто «события за последние 10 минут», а «топ-100 самых активных пользователей в окне». Это сочетание sliding window и top-K (об этом будет урок 4):
from collections import deque, defaultdict
import heapq
class TopKWindow:
def __init__(self, window_size, k):
self.window = deque(maxlen=window_size)
self.k = k
self.counts = defaultdict(int)
def add(self, user_id):
# вытесняем старый если окно полное
if len(self.window) == self.window.maxlen:
old = self.window[0]
self.counts[old] -= 1
if self.counts[old] == 0:
del self.counts[old]
self.window.append(user_id)
self.counts[user_id] += 1
def top_k(self):
return heapq.nlargest(self.k, self.counts.items(), key=lambda x: x[1])
heapq.nlargest(k, items, key) — O(n log k), не O(n log n) как полная сортировка. Удобно, когда k маленькое.
Сравнение типов окон
Tumbling — для отчётов по периодам, sliding — для real-time, session — для пользовательских сценариев.
Session window
Session — самый сложный тип окна, потому что границы динамические.
from collections import defaultdict
import time
class SessionTracker:
def __init__(self, session_timeout=1800): # 30 минут
self.timeout = session_timeout
self.sessions = defaultdict(list)
self.last_activity = {}
def track(self, user_id, event, ts=None):
if ts is None:
ts = time.time()
# был ли пользователь активен недавно?
if user_id in self.last_activity:
if ts - self.last_activity[user_id] > self.timeout:
# сессия истекла, закрываем
self._close_session(user_id)
self.sessions[user_id].append((ts, event))
self.last_activity[user_id] = ts
def _close_session(self, user_id):
session = self.sessions.pop(user_id)
# тут — финализация сессии: запись в БД, метрики
print(f"session closed for {user_id}: {len(session)} events")
del self.last_activity[user_id]
def cleanup(self, now=None):
"""Закрыть все таймаут-сессии."""
if now is None:
now = time.time()
expired = [u for u, t in self.last_activity.items()
if now - t > self.timeout]
for u in expired:
self._close_session(u)
В real-time системах cleanup вызывается периодически (cron или background task), чтобы закрывать сессии пользователей, которые перестали быть активными.
В session windows важно не забывать про cleanup. Если ваш сервис работает 24/7 и пользователи иногда «исчезают» (например, закрыли вкладку), их сессии остаются в памяти навсегда. Без периодического cleanup memory leak неизбежен — особенно опасен в long-running демонах.
Memory analysis
Сколько памяти занимают окна на 1 миллион событий:
| окно | элементов в RAM | память |
|---|---|---|
| tumbling 1 hour | агрегаты за окно | KB на ключ агрегации |
| sliding maxlen=10000 | до 10000 | ~ 1-10 МБ (зависит от размера события) |
| sliding 60s @ 1k req/s | ~60000 | ~ 6-60 МБ |
| session (10k active users) | 10000 сессий | сотни МБ если хранить полную историю |
Tumbling выгоднее всего: храним только агрегаты, не сырые события. Session — самое дорогое: каждый активный пользователь — отдельная коллекция событий.
Замеры
Сравним добавление 1 млн событий в обычный list vs deque:
import time
from collections import deque
n = 1_000_000
# наивно: list.pop(0) — O(n)!
xs = []
t0 = time.time()
for i in range(n):
xs.append(i)
if len(xs) > 100:
xs.pop(0)
print(f"list with pop(0): {time.time() - t0:.2f} s")
# правильно: deque
dq = deque(maxlen=100)
t0 = time.time()
for i in range(n):
dq.append(i)
print(f"deque maxlen=100: {time.time() - t0:.2f} s")
Типичный результат: list — десятки секунд, deque — около 0.1 секунды. Разница в 100-1000 раз. На production это превращает скрипт «не вписался в SLA» в «работает за миллисекунды».
Попробуй сам
Реализуйте rate limiter «не более 10 запросов в секунду на пользователя»:
from collections import deque, defaultdict
import time
class RateLimiter:
def __init__(self, max_requests, window_seconds):
self.max = max_requests
self.window = window_seconds
self.requests = defaultdict(deque)
def allow(self, user_id, now=None):
if now is None:
now = time.time()
q = self.requests[user_id]
# очищаем устаревшие
cutoff = now - self.window
while q and q[0] < cutoff:
q.popleft()
# проверяем лимит
if len(q) >= self.max:
return False
q.append(now)
return True
# тест
rl = RateLimiter(10, 1)
now = 0.0
for i in range(15):
print(f"req {i+1}: allowed = {rl.allow('user1', now=now)}")
now += 0.05 # 50ms между запросами
# первые 10 пропускает, остальные блокирует пока окно не сдвинется
Это рабочий rate limiter, который вы могли бы использовать в production. Достаточно скопировать.
Коллекции Python: deque — двусторонняя очередь