Зачем junior DE нужны эти структуры
В этом модуле мы прошли стек, очередь, deque, ring buffer. Звучит абстрактно. На практике в DE-системах они встречаются на каждом шагу. Я покажу 5 типичных применений в реальном коде.
Применение 1: backpressure через bounded queue
В streaming-системах (Kafka consumer, asyncio worker pool) у вас есть producer и consumer. Если producer быстрее consumer — данные накапливаются. Если очередь неограничена, рано или поздно OOM.
Решение: bounded queue — ограниченная по размеру очередь. Когда полна, producer блокируется или отбрасывает входящие. Это и есть backpressure:
import asyncio
async def producer(queue):
for i in range(1_000_000):
await queue.put(i) # БЛОКИРУЕТСЯ, если queue полна
# producer не может перегнать consumer
async def consumer(queue):
while True:
item = await queue.get()
await process(item)
queue.task_done()
async def main():
queue = asyncio.Queue(maxsize=1000) # bounded!
await asyncio.gather(
producer(queue),
consumer(queue),
)
asyncio.Queue(maxsize=1000) — backpressure встроен. Если consumer тормозит, producer ждёт на put — нагрузка автоматически распределяется по системе.
Без maxsize очередь была бы unbounded — producer бы перегружал consumer, накапливая миллионы задач в RAM. Это типичная ошибка junior, приводящая к OOM в продакшене.
Применение 2: BFS в DAG зависимостей
Airflow DAG, dbt model, Make build graph — везде есть зависимости. «Запустить X после Y» — это рёбра графа. Чтобы определить порядок выполнения, нужен topological sort через BFS:
from collections import deque
def topological_sort(graph):
"""Kahn algorithm — BFS-based topo sort."""
# подсчёт in-degree каждой вершины
in_degree = {node: 0 for node in graph}
for node in graph:
for neighbor in graph[node]:
in_degree[neighbor] = in_degree.get(neighbor, 0) + 1
# очередь вершин без зависимостей
queue = deque([n for n, d in in_degree.items() if d == 0])
order = []
while queue:
node = queue.popleft()
order.append(node)
for neighbor in graph[node]:
in_degree[neighbor] -= 1
if in_degree[neighbor] == 0:
queue.append(neighbor)
if len(order) != len(graph):
raise ValueError("cycle detected")
return order
graph = {
'extract': ['transform'],
'transform': ['load_a', 'load_b'],
'load_a': ['report'],
'load_b': ['report'],
'report': [],
}
print(topological_sort(graph))
# ['extract', 'transform', 'load_a', 'load_b', 'report']
deque here критичен: BFS требует FIFO. На list это было бы O(n^2). На графах с миллионами зависимостей (большой Airflow или внутренний планировщик) — разница между ‘работает’ и ‘висит’.
Применение 3: JSON streaming parser через стек
Простой JSON-парсер для streaming: данные приходят по чанкам, обработка по одному токену.
class JSONStreamParser:
"""Упрощённый парсер; отслеживает структурную балансировку через стек."""
def __init__(self):
self.stack = [] # содержит '{' или '['
self.depth = 0
def feed(self, chunk):
for ch in chunk:
if ch in '{[':
self.stack.append(ch)
self.depth += 1
elif ch in '}]':
if not self.stack:
raise ValueError(f"unexpected {ch}")
opener = self.stack.pop()
if (opener == '{' and ch != '}') or (opener == '[' and ch != ']'):
raise ValueError(f"mismatch: {opener} ... {ch}")
self.depth -= 1
def is_complete(self):
return self.depth == 0
p = JSONStreamParser()
p.feed('{"users": [')
print(p.is_complete()) # False
p.feed('1, 2, 3]')
print(p.is_complete()) # False
p.feed('}')
print(p.is_complete()) # True
Это упрощённая модель: реальный JSON-parser держит больше контекстов (внутри строки, внутри числа, после двоеточия). Но фундамент — стек открывающих скобок. То же для XML, SQL, любой структурированной грамматики.
Применение 4: SQL parser стэки выражений
SQL-парсер строит AST через shunting-yard algorithm (Dijkstra). Два стека: один для операндов, один для операторов:
def evaluate_simple(expr):
"""Упрощённый shunting-yard для +, -, *, /, ( )."""
precedence = {'+': 1, '-': 1, '*': 2, '/': 2}
operands = []
operators = []
def apply():
op = operators.pop()
b = operands.pop()
a = operands.pop()
if op == '+': operands.append(a + b)
elif op == '-': operands.append(a - b)
elif op == '*': operands.append(a * b)
elif op == '/': operands.append(a / b)
tokens = expr.replace('(', ' ( ').replace(')', ' ) ').split()
for token in tokens:
if token.isdigit():
operands.append(int(token))
elif token == '(':
operators.append(token)
elif token == ')':
while operators and operators[-1] != '(':
apply()
operators.pop() # выкидываем '('
elif token in precedence:
while (operators and operators[-1] != '('
and precedence.get(operators[-1], 0) >= precedence[token]):
apply()
operators.append(token)
while operators:
apply()
return operands[0]
print(evaluate_simple("( 1 + 2 ) * ( 3 + 4 )")) # 21
print(evaluate_simple("2 + 3 * 4")) # 14
Все ANSI SQL-парсеры (PostgreSQL, MySQL, SQLite) используют похожий подход для выражений в WHERE и SELECT. Если вы пишете кастомный DSL для DE — стеки незаменимы.
Применение 5: streaming sliding window aggregation
Окно ‘последняя минута событий’ в realtime-аналитике. Через deque это становится скользящим окном с O(1) обновлением:
import time
from collections import deque
class SlidingWindow:
"""Скользящее окно по времени, с running sum."""
def __init__(self, window_seconds):
self.window = window_seconds
self.events = deque() # (timestamp, value)
self.sum = 0.0
def add(self, value):
now = time.time()
# выкидываем устаревшие
while self.events and self.events[0][0] < now - self.window:
_, old_v = self.events.popleft()
self.sum -= old_v
self.events.append((now, value))
self.sum += value
def aggregate(self):
return {
'count': len(self.events),
'sum': self.sum,
'avg': self.sum / len(self.events) if self.events else 0.0,
}
sw = SlidingWindow(window_seconds=60)
for v in [1.0, 2.0, 3.0, 4.0, 5.0]:
sw.add(v)
print(sw.aggregate())
Каждая add — O(1) amortized: popleft устаревших проходит каждый элемент один раз (когда добавляется и когда выкидывается). Это используется в Prometheus rate(), Grafana queries, любой rolling-метрике.
Сводная таблица: какая структура где
Шпаргалка по выбору. Все определяется операциями: что часто, что редко.
Гайдлайны выбора
Простые правила:
- FIFO в Python: всегда
collections.deque, никогдаlist. - Bounded очередь с backpressure:
asyncio.Queue(maxsize=N)для async,queue.Queue(maxsize=N)для threads. - LIFO в Python:
list(через append/pop) или deque — оба O(1). - Скользящее окно: deque + running sum для O(1) aggregation.
- Last-N-events:
deque(maxlen=N)— встроенный ring buffer. - Topo sort / BFS на DAG: Kahn’s algorithm с deque.
- Парсер: stack для контекстов.
Главный навык — видеть очередь/стек в задаче. Когда видите ‘обработать в порядке поступления’ — это FIFO. ‘Откатиться к предыдущему состоянию’ — LIFO. ‘Последние N’ — ring buffer.
Попробуй сам
Реализуйте простой DAG-runner: вершины с зависимостями, выполнить в порядке topological order, параллельные ветви — последовательно (для упрощения):
from collections import deque
class DAGRunner:
def __init__(self, dag):
self.dag = dag
self.results = {}
def run(self):
# подсчёт in-degree
in_degree = {n: 0 for n in self.dag}
for n in self.dag:
for dep in self.dag[n]:
in_degree[dep] = in_degree.get(dep, 0) + 1
queue = deque([n for n, d in in_degree.items() if d == 0])
while queue:
task = queue.popleft()
print(f"running {task}")
self.results[task] = f"output_of_{task}"
for dep in self.dag[task]:
in_degree[dep] -= 1
if in_degree[dep] == 0:
queue.append(dep)
if len(self.results) != len(self.dag):
raise ValueError("cycle detected")
return self.results
dag = {
'extract': ['transform'],
'transform': ['load_a', 'load_b'],
'load_a': ['report'],
'load_b': ['report'],
'report': [],
}
runner = DAGRunner(dag)
runner.run()
Это базовая модель Airflow/dbt: топологический порядок через BFS, deque для FIFO. На прикладном уровне добавляется параллелизм (запускать независимые задачи одновременно), retries, timeouts — но фундамент тот же.
Unix pipes: очередь данных между процессами