Learning Platform
Глоссарий Troubleshooting
Урок 08.05 · 28 мин
Начальный
backpressureBFSJSON-parserDAGstreaming

Зачем junior DE нужны эти структуры

В этом модуле мы прошли стек, очередь, deque, ring buffer. Звучит абстрактно. На практике в DE-системах они встречаются на каждом шагу. Я покажу 5 типичных применений в реальном коде.

Применение 1: backpressure через bounded queue

В streaming-системах (Kafka consumer, asyncio worker pool) у вас есть producer и consumer. Если producer быстрее consumer — данные накапливаются. Если очередь неограничена, рано или поздно OOM.

Решение: bounded queue — ограниченная по размеру очередь. Когда полна, producer блокируется или отбрасывает входящие. Это и есть backpressure:

import asyncio

async def producer(queue):
    for i in range(1_000_000):
        await queue.put(i)   # БЛОКИРУЕТСЯ, если queue полна
        # producer не может перегнать consumer

async def consumer(queue):
    while True:
        item = await queue.get()
        await process(item)
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=1000)   # bounded!
    await asyncio.gather(
        producer(queue),
        consumer(queue),
    )

asyncio.Queue(maxsize=1000) — backpressure встроен. Если consumer тормозит, producer ждёт на put — нагрузка автоматически распределяется по системе.

Без maxsize очередь была бы unbounded — producer бы перегружал consumer, накапливая миллионы задач в RAM. Это типичная ошибка junior, приводящая к OOM в продакшене.

Применение 2: BFS в DAG зависимостей

Airflow DAG, dbt model, Make build graph — везде есть зависимости. «Запустить X после Y» — это рёбра графа. Чтобы определить порядок выполнения, нужен topological sort через BFS:

from collections import deque

def topological_sort(graph):
    """Kahn algorithm — BFS-based topo sort."""
    # подсчёт in-degree каждой вершины
    in_degree = {node: 0 for node in graph}
    for node in graph:
        for neighbor in graph[node]:
            in_degree[neighbor] = in_degree.get(neighbor, 0) + 1

    # очередь вершин без зависимостей
    queue = deque([n for n, d in in_degree.items() if d == 0])
    order = []
    while queue:
        node = queue.popleft()
        order.append(node)
        for neighbor in graph[node]:
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                queue.append(neighbor)

    if len(order) != len(graph):
        raise ValueError("cycle detected")
    return order


graph = {
    'extract':  ['transform'],
    'transform': ['load_a', 'load_b'],
    'load_a':   ['report'],
    'load_b':   ['report'],
    'report':   [],
}
print(topological_sort(graph))
# ['extract', 'transform', 'load_a', 'load_b', 'report']

deque here критичен: BFS требует FIFO. На list это было бы O(n^2). На графах с миллионами зависимостей (большой Airflow или внутренний планировщик) — разница между ‘работает’ и ‘висит’.

Применение 3: JSON streaming parser через стек

Простой JSON-парсер для streaming: данные приходят по чанкам, обработка по одному токену.

class JSONStreamParser:
    """Упрощённый парсер; отслеживает структурную балансировку через стек."""

    def __init__(self):
        self.stack = []     # содержит '{' или '['
        self.depth = 0

    def feed(self, chunk):
        for ch in chunk:
            if ch in '{[':
                self.stack.append(ch)
                self.depth += 1
            elif ch in '}]':
                if not self.stack:
                    raise ValueError(f"unexpected {ch}")
                opener = self.stack.pop()
                if (opener == '{' and ch != '}') or (opener == '[' and ch != ']'):
                    raise ValueError(f"mismatch: {opener} ... {ch}")
                self.depth -= 1

    def is_complete(self):
        return self.depth == 0


p = JSONStreamParser()
p.feed('{"users": [')
print(p.is_complete())   # False
p.feed('1, 2, 3]')
print(p.is_complete())   # False
p.feed('}')
print(p.is_complete())   # True

Это упрощённая модель: реальный JSON-parser держит больше контекстов (внутри строки, внутри числа, после двоеточия). Но фундамент — стек открывающих скобок. То же для XML, SQL, любой структурированной грамматики.

Применение 4: SQL parser стэки выражений

SQL-парсер строит AST через shunting-yard algorithm (Dijkstra). Два стека: один для операндов, один для операторов:

def evaluate_simple(expr):
    """Упрощённый shunting-yard для +, -, *, /, ( )."""
    precedence = {'+': 1, '-': 1, '*': 2, '/': 2}
    operands = []
    operators = []

    def apply():
        op = operators.pop()
        b = operands.pop()
        a = operands.pop()
        if op == '+': operands.append(a + b)
        elif op == '-': operands.append(a - b)
        elif op == '*': operands.append(a * b)
        elif op == '/': operands.append(a / b)

    tokens = expr.replace('(', ' ( ').replace(')', ' ) ').split()
    for token in tokens:
        if token.isdigit():
            operands.append(int(token))
        elif token == '(':
            operators.append(token)
        elif token == ')':
            while operators and operators[-1] != '(':
                apply()
            operators.pop()  # выкидываем '('
        elif token in precedence:
            while (operators and operators[-1] != '('
                   and precedence.get(operators[-1], 0) >= precedence[token]):
                apply()
            operators.append(token)

    while operators:
        apply()
    return operands[0]


print(evaluate_simple("( 1 + 2 ) * ( 3 + 4 )"))  # 21
print(evaluate_simple("2 + 3 * 4"))               # 14

Все ANSI SQL-парсеры (PostgreSQL, MySQL, SQLite) используют похожий подход для выражений в WHERE и SELECT. Если вы пишете кастомный DSL для DE — стеки незаменимы.

Применение 5: streaming sliding window aggregation

Окно ‘последняя минута событий’ в realtime-аналитике. Через deque это становится скользящим окном с O(1) обновлением:

import time
from collections import deque

class SlidingWindow:
    """Скользящее окно по времени, с running sum."""

    def __init__(self, window_seconds):
        self.window = window_seconds
        self.events = deque()  # (timestamp, value)
        self.sum = 0.0

    def add(self, value):
        now = time.time()
        # выкидываем устаревшие
        while self.events and self.events[0][0] < now - self.window:
            _, old_v = self.events.popleft()
            self.sum -= old_v
        self.events.append((now, value))
        self.sum += value

    def aggregate(self):
        return {
            'count': len(self.events),
            'sum': self.sum,
            'avg': self.sum / len(self.events) if self.events else 0.0,
        }


sw = SlidingWindow(window_seconds=60)
for v in [1.0, 2.0, 3.0, 4.0, 5.0]:
    sw.add(v)
print(sw.aggregate())

Каждая add — O(1) amortized: popleft устаревших проходит каждый элемент один раз (когда добавляется и когда выкидывается). Это используется в Prometheus rate(), Grafana queries, любой rolling-метрике.

Сводная таблица: какая структура где

Какую структуру использовать в задаче DE

Шпаргалка по выбору. Все определяется операциями: что часто, что редко.

Producer-consumer с лимитомasyncio.Queue(maxsize=N) — backpressureвнутри deque + futures
BFS / topo sortcollections.deque — FIFOне list! list.pop(0) убьёт производительность
DFS / парсер / undolist или collections.deque — LIFOappend + pop с одного конца
Парсинг с балансом скобокlist as stackпростой LIFO
Скользящее окно по времениcollections.dequepopleft устаревших + append новых, O(1) amortized
Последние N значенийcollections.deque(maxlen=N)overwriting ring buffer бесплатно
Real-time audio bufferring buffer на C/Rustнет аллокаций в горячем пути
Streaming JSON token streamstack для контекстаодин стек, отслеживает open/close

Гайдлайны выбора

Простые правила:

  1. FIFO в Python: всегда collections.deque, никогда list.
  2. Bounded очередь с backpressure: asyncio.Queue(maxsize=N) для async, queue.Queue(maxsize=N) для threads.
  3. LIFO в Python: list (через append/pop) или deque — оба O(1).
  4. Скользящее окно: deque + running sum для O(1) aggregation.
  5. Last-N-events: deque(maxlen=N) — встроенный ring buffer.
  6. Topo sort / BFS на DAG: Kahn’s algorithm с deque.
  7. Парсер: stack для контекстов.

Главный навык — видеть очередь/стек в задаче. Когда видите ‘обработать в порядке поступления’ — это FIFO. ‘Откатиться к предыдущему состоянию’ — LIFO. ‘Последние N’ — ring buffer.

Попробуй сам

Реализуйте простой DAG-runner: вершины с зависимостями, выполнить в порядке topological order, параллельные ветви — последовательно (для упрощения):

from collections import deque

class DAGRunner:
    def __init__(self, dag):
        self.dag = dag
        self.results = {}

    def run(self):
        # подсчёт in-degree
        in_degree = {n: 0 for n in self.dag}
        for n in self.dag:
            for dep in self.dag[n]:
                in_degree[dep] = in_degree.get(dep, 0) + 1

        queue = deque([n for n, d in in_degree.items() if d == 0])
        while queue:
            task = queue.popleft()
            print(f"running {task}")
            self.results[task] = f"output_of_{task}"
            for dep in self.dag[task]:
                in_degree[dep] -= 1
                if in_degree[dep] == 0:
                    queue.append(dep)

        if len(self.results) != len(self.dag):
            raise ValueError("cycle detected")
        return self.results


dag = {
    'extract':   ['transform'],
    'transform': ['load_a', 'load_b'],
    'load_a':    ['report'],
    'load_b':    ['report'],
    'report':    [],
}
runner = DAGRunner(dag)
runner.run()

Это базовая модель Airflow/dbt: топологический порядок через BFS, deque для FIFO. На прикладном уровне добавляется параллелизм (запускать независимые задачи одновременно), retries, timeouts — но фундамент тот же.

Unix pipes: очередь данных между процессами
Проверка знанийKnowledge check
Объясни, что такое backpressure в streaming-системе и как именно bounded queue его реализует. Что произойдёт, если использовать unbounded очередь между producer и consumer?
ОтветAnswer
Backpressure — механизм, при котором быстрый компонент в pipeline замедляется до скорости медленного. В streaming-системе если producer (например, Kafka consumer) выдаёт 100k событий/сек, а consumer (downstream обработчик) обрабатывает только 10k/сек, без backpressure 90k событий/сек копятся в памяти. Bounded queue (asyncio.Queue(maxsize=N), queue.Queue(maxsize=N)) реализует это автоматически: когда очередь полна, метод put() блокируется до тех пор, пока consumer не освободит место через get(). Producer вынужден ждать — его скорость естественно подстраивается под consumer-а. Без maxsize (unbounded queue) producer не блокируется: события копятся в RAM, через минуту-десять процесс падает с OOM. Это типичная авария в Python-сервисах. Поэтому bounded queue — не опция, а архитектурная необходимость для production streaming.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 5. Что такое backpressure в streaming-системе?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5