Learning Platform
Глоссарий Troubleshooting
Урок 08.02 · 25 мин
Начальный
queueFIFOdequelist-pitfallBFS

Что такое очередь

Очередь — это ADT с операциями:

  • enqueue(x) (он же put, add) — добавить в хвост.
  • dequeue() (он же get, pop, poll) — снять с головы.
  • peek() — посмотреть голову без снятия.

Принцип: FIFO — First In, First Out. Кто первый пришёл — первый ушёл. Классическая аналогия — очередь в магазине.

Очередь: добавление справа, удаление слева

Голова (head) — где забирают. Хвост (tail) — куда кладут.

HEADпервый ушёлdequeue снимает отсюда
10
20
30
40последний пришёл
TAILenqueue кладёт сюда

FIFO противоположен LIFO (стек). Применение — везде, где важен порядок обработки: задачи в worker pool, сообщения в message broker, BFS-обход, события в event loop.

Почему list — плохой выбор

Кажется логичным:

queue = []
queue.append(10)        # enqueue right — O(1)
queue.append(20)
queue.append(30)
first = queue.pop(0)    # dequeue left — но это O(n)!

Вторая операция — катастрофа. pop(0) убирает первый элемент list. Список — плотный массив, после удаления нужно сдвинуть все остальные элементы на одну позицию влево, чтобы сохранить непрерывность. Это O(n) на каждый вызов.

На очереди из N элементов суммарная стоимость N dequeue’ов — O(n^2). На 100k — секунды. На миллион — невозможно.

Замер:

import time

for N in [1000, 10_000, 100_000]:
    q = list(range(N))
    t0 = time.perf_counter()
    while q:
        q.pop(0)
    t1 = time.perf_counter()
    print(f"N={N:>7,}  time={(t1-t0)*1000:>10.2f} ms")

Получится:

N=  1,000   time=      0.30 ms
N= 10,000   time=     17.50 ms      # x60 от 1k
N=100,000   time=   1640.00 ms      # x100 от 10k

При увеличении N в 10 раз время растёт в 100 раз — это квадратичность. На миллион это были бы сотни секунд.

Правильное решение: collections.deque

from collections import deque

queue = deque()
queue.append(10)         # enqueue — O(1)
queue.append(20)
queue.append(30)
first = queue.popleft()  # dequeue — O(1)!

deque (double-ended queue) поддерживает обе операции за O(1). Под капотом — block-linked list (модуль 6, урок 5).

Замер тот же:

import time
from collections import deque

for N in [1_000, 10_000, 100_000, 1_000_000]:
    q = deque(range(N))
    t0 = time.perf_counter()
    while q:
        q.popleft()
    t1 = time.perf_counter()
    print(f"N={N:>9,}  time={(t1-t0)*1000:>8.2f} ms")
N=    1,000   time=    0.10 ms
N=   10,000   time=    1.00 ms
N=  100,000   time=   10.20 ms      # линейно
N=1,000,000   time=  103.50 ms      # линейно

Время растёт линейно с N — это и есть O(n) суммарно, O(1) на операцию. Сравните с list (где 100k уже занимает 1.6 секунды).

API deque для очереди

from collections import deque

q = deque()

# enqueue
q.append(item)          # в хвост
q.appendleft(item)      # в голову (необычная очередь, но возможно)

# dequeue
front = q.popleft()     # с головы — FIFO
back = q.pop()          # с хвоста — LIFO (стек)

# peek
if q:
    head = q[0]         # первый
    tail = q[-1]        # последний

# размер
print(len(q))

# проверка не пустоты
if q:
    print("есть элементы")

Замечательно: deque покрывает и FIFO, и LIFO, и любой гибрид. Это универсальная двусторонняя очередь.

Сложности операций

list vs deque для очереди

Главное: list.pop(0) — O(n), deque.popleft — O(1). На больших объёмах эта разница превращается в проблему производительности.

append (enqueue)list O(1), deque O(1)оба добавляют в хвост за константу
pop (LIFO, конец)list O(1), deque O(1)оба умеют забирать с хвоста
popleft / pop(0)list O(n)!, deque O(1)главное различие; на больших данных list даёт O(n^2) на полную очистку
appendleft / insert(0, x)list O(n), deque O(1)вставка в голову; list сдвигает всё
random access [i]list O(1), deque O(n)deque жертвует random access ради O(1) edge-операций

Применение 1: worker pool

Главный сервер кладёт задачи в очередь, worker’ы забирают и обрабатывают:

from collections import deque
import threading

class TaskQueue:
    def __init__(self):
        self._q = deque()
        self._lock = threading.Lock()

    def put(self, task):
        with self._lock:
            self._q.append(task)

    def get(self):
        with self._lock:
            if self._q:
                return self._q.popleft()
        return None

Для thread-safe варианта в Python есть queue.Queue (synchronized) или asyncio.Queue (для асинхронного кода). Под капотом обоих — deque.

Применение 2: BFS

Breadth-First Search обходит граф «слой за слоем». Очередь хранит вершины, которые надо посетить:

from collections import deque

def bfs(start, graph):
    visited = {start}
    queue = deque([start])
    order = []
    while queue:
        node = queue.popleft()    # снять с головы
        order.append(node)
        for neighbor in graph.get(node, []):
            if neighbor not in visited:
                visited.add(neighbor)
                queue.append(neighbor)  # положить в хвост
    return order

graph = {
    'A': ['B', 'C'],
    'B': ['D', 'E'],
    'C': ['F'],
    'D': [],
    'E': [],
    'F': [],
}
print(bfs('A', graph))   # A, B, C, D, E, F  (по уровням)

Если бы здесь была list.pop(0) — алгоритм O(V^2). С deque — корректно O(V+E). Это критично для больших графов (миллионы вершин).

Применение 3: rate limiting

Ограничение запросов «не более N в минуту» через скользящее окно:

import time
from collections import deque

class RateLimiter:
    def __init__(self, max_requests, window_seconds):
        self.max = max_requests
        self.window = window_seconds
        self.timestamps = deque()

    def allow(self):
        now = time.time()
        # выкидываем устаревшие
        while self.timestamps and self.timestamps[0] < now - self.window:
            self.timestamps.popleft()
        if len(self.timestamps) < self.max:
            self.timestamps.append(now)
            return True
        return False


rl = RateLimiter(max_requests=5, window_seconds=10)
for _ in range(10):
    print(rl.allow())
    time.sleep(0.5)

popleft устаревших + append свежих — каждая O(1). На миллион запросов это работает за миллисекунды; на list было бы часами.

Применение 4: producer-consumer в asyncio

Асинхронный код в Python:

import asyncio

async def producer(queue, items):
    for item in items:
        await queue.put(item)

async def consumer(queue, name):
    while True:
        item = await queue.get()
        print(f"{name}: {item}")
        queue.task_done()

async def main():
    q = asyncio.Queue()
    producer_task = asyncio.create_task(producer(q, range(10)))
    consumer_tasks = [asyncio.create_task(consumer(q, f"c{i}")) for i in range(3)]
    await producer_task
    await q.join()
    for t in consumer_tasks:
        t.cancel()

# asyncio.run(main())

asyncio.Queue — это FIFO + блокирование на пустой очереди (consumer ждёт). Под капотом — deque + futures для синхронизации.

Когда нужна priority queue вместо FIFO

Иногда задачи в очереди имеют приоритет: высокоприоритетные обслуживаются первыми. Это не FIFO, это priority queue. Реализуется через heap (модуль 11).

В Python готовая в heapq или queue.PriorityQueue. FIFO — это частный случай, когда приоритет = временная метка поступления.

Попробуй сам

Реализуйте StreamingMedian: добавляем поток чисел, в любой момент должны уметь сказать median (медиану) за O(log n). Это не очередь, но требует комбинации структур — deque для трекинга последних N, heap для медианы. Пока упрощённая версия — running average через скользящее окно:

from collections import deque

class StreamingStats:
    def __init__(self, window=100):
        self.window = window
        self.q = deque()
        self.sum = 0.0

    def add(self, x):
        self.q.append(x)
        self.sum += x
        if len(self.q) > self.window:
            old = self.q.popleft()
            self.sum -= old

    def mean(self):
        return self.sum / len(self.q) if self.q else 0.0


s = StreamingStats(window=10)
for i in range(20):
    s.add(i * 2)
    print(f"i={i} mean={s.mean():.2f}")

Каждая операция O(1). Окно не пересчитывается — поддерживается инкрементально.

Message queues и shared memory в Linux: FIFO-семантика IPC
Проверка знанийKnowledge check
Замер показывает: dequeue 100k элементов из list через pop(0) занимает 1.6 секунды; то же из deque через popleft — 10 ms. В 160 раз. Объясни, что делает эти операции такими разными по сложности.
ОтветAnswer
list — плотный массив указателей, непрерывный буфер. pop(0) удаляет первый элемент, после чего нужно сдвинуть все остальные N-1 указателей на одну позицию влево (memmove ~N*8 байт), чтобы массив остался непрерывным. Это O(n) на каждый pop(0); для N таких pop'ов суммарно O(n^2). При N=100k это 5*10^9 операций сдвига. deque — block-linked list блоков по 64 указателя. popleft декрементит leftindex; если leftindex ушёл за границу блока — удаляется пустой блок и leftblock переключается на следующий. Все операции O(1) на каждый popleft. Суммарно для N popleft'ов — O(N). Разница в 100k раз для N=100k превращается в видимые 160x на бенчмарке (из-за общего overhead Python разрыв немного меньше теоретического).

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Что такое FIFO?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5