Что такое очередь
Очередь — это ADT с операциями:
enqueue(x)(он жеput,add) — добавить в хвост.dequeue()(он жеget,pop,poll) — снять с головы.peek()— посмотреть голову без снятия.
Принцип: FIFO — First In, First Out. Кто первый пришёл — первый ушёл. Классическая аналогия — очередь в магазине.
Голова (head) — где забирают. Хвост (tail) — куда кладут.
FIFO противоположен LIFO (стек). Применение — везде, где важен порядок обработки: задачи в worker pool, сообщения в message broker, BFS-обход, события в event loop.
Почему list — плохой выбор
Кажется логичным:
queue = []
queue.append(10) # enqueue right — O(1)
queue.append(20)
queue.append(30)
first = queue.pop(0) # dequeue left — но это O(n)!
Вторая операция — катастрофа. pop(0) убирает первый элемент list. Список — плотный массив, после удаления нужно сдвинуть все остальные элементы на одну позицию влево, чтобы сохранить непрерывность. Это O(n) на каждый вызов.
На очереди из N элементов суммарная стоимость N dequeue’ов — O(n^2). На 100k — секунды. На миллион — невозможно.
Замер:
import time
for N in [1000, 10_000, 100_000]:
q = list(range(N))
t0 = time.perf_counter()
while q:
q.pop(0)
t1 = time.perf_counter()
print(f"N={N:>7,} time={(t1-t0)*1000:>10.2f} ms")
Получится:
N= 1,000 time= 0.30 ms
N= 10,000 time= 17.50 ms # x60 от 1k
N=100,000 time= 1640.00 ms # x100 от 10k
При увеличении N в 10 раз время растёт в 100 раз — это квадратичность. На миллион это были бы сотни секунд.
Правильное решение: collections.deque
from collections import deque
queue = deque()
queue.append(10) # enqueue — O(1)
queue.append(20)
queue.append(30)
first = queue.popleft() # dequeue — O(1)!
deque (double-ended queue) поддерживает обе операции за O(1). Под капотом — block-linked list (модуль 6, урок 5).
Замер тот же:
import time
from collections import deque
for N in [1_000, 10_000, 100_000, 1_000_000]:
q = deque(range(N))
t0 = time.perf_counter()
while q:
q.popleft()
t1 = time.perf_counter()
print(f"N={N:>9,} time={(t1-t0)*1000:>8.2f} ms")
N= 1,000 time= 0.10 ms
N= 10,000 time= 1.00 ms
N= 100,000 time= 10.20 ms # линейно
N=1,000,000 time= 103.50 ms # линейно
Время растёт линейно с N — это и есть O(n) суммарно, O(1) на операцию. Сравните с list (где 100k уже занимает 1.6 секунды).
API deque для очереди
from collections import deque
q = deque()
# enqueue
q.append(item) # в хвост
q.appendleft(item) # в голову (необычная очередь, но возможно)
# dequeue
front = q.popleft() # с головы — FIFO
back = q.pop() # с хвоста — LIFO (стек)
# peek
if q:
head = q[0] # первый
tail = q[-1] # последний
# размер
print(len(q))
# проверка не пустоты
if q:
print("есть элементы")
Замечательно: deque покрывает и FIFO, и LIFO, и любой гибрид. Это универсальная двусторонняя очередь.
Сложности операций
Главное: list.pop(0) — O(n), deque.popleft — O(1). На больших объёмах эта разница превращается в проблему производительности.
Применение 1: worker pool
Главный сервер кладёт задачи в очередь, worker’ы забирают и обрабатывают:
from collections import deque
import threading
class TaskQueue:
def __init__(self):
self._q = deque()
self._lock = threading.Lock()
def put(self, task):
with self._lock:
self._q.append(task)
def get(self):
with self._lock:
if self._q:
return self._q.popleft()
return None
Для thread-safe варианта в Python есть queue.Queue (synchronized) или asyncio.Queue (для асинхронного кода). Под капотом обоих — deque.
Применение 2: BFS
Breadth-First Search обходит граф «слой за слоем». Очередь хранит вершины, которые надо посетить:
from collections import deque
def bfs(start, graph):
visited = {start}
queue = deque([start])
order = []
while queue:
node = queue.popleft() # снять с головы
order.append(node)
for neighbor in graph.get(node, []):
if neighbor not in visited:
visited.add(neighbor)
queue.append(neighbor) # положить в хвост
return order
graph = {
'A': ['B', 'C'],
'B': ['D', 'E'],
'C': ['F'],
'D': [],
'E': [],
'F': [],
}
print(bfs('A', graph)) # A, B, C, D, E, F (по уровням)
Если бы здесь была list.pop(0) — алгоритм O(V^2). С deque — корректно O(V+E). Это критично для больших графов (миллионы вершин).
Применение 3: rate limiting
Ограничение запросов «не более N в минуту» через скользящее окно:
import time
from collections import deque
class RateLimiter:
def __init__(self, max_requests, window_seconds):
self.max = max_requests
self.window = window_seconds
self.timestamps = deque()
def allow(self):
now = time.time()
# выкидываем устаревшие
while self.timestamps and self.timestamps[0] < now - self.window:
self.timestamps.popleft()
if len(self.timestamps) < self.max:
self.timestamps.append(now)
return True
return False
rl = RateLimiter(max_requests=5, window_seconds=10)
for _ in range(10):
print(rl.allow())
time.sleep(0.5)
popleft устаревших + append свежих — каждая O(1). На миллион запросов это работает за миллисекунды; на list было бы часами.
Применение 4: producer-consumer в asyncio
Асинхронный код в Python:
import asyncio
async def producer(queue, items):
for item in items:
await queue.put(item)
async def consumer(queue, name):
while True:
item = await queue.get()
print(f"{name}: {item}")
queue.task_done()
async def main():
q = asyncio.Queue()
producer_task = asyncio.create_task(producer(q, range(10)))
consumer_tasks = [asyncio.create_task(consumer(q, f"c{i}")) for i in range(3)]
await producer_task
await q.join()
for t in consumer_tasks:
t.cancel()
# asyncio.run(main())
asyncio.Queue — это FIFO + блокирование на пустой очереди (consumer ждёт). Под капотом — deque + futures для синхронизации.
Когда нужна priority queue вместо FIFO
Иногда задачи в очереди имеют приоритет: высокоприоритетные обслуживаются первыми. Это не FIFO, это priority queue. Реализуется через heap (модуль 11).
В Python готовая в heapq или queue.PriorityQueue. FIFO — это частный случай, когда приоритет = временная метка поступления.
Попробуй сам
Реализуйте StreamingMedian: добавляем поток чисел, в любой момент должны уметь сказать median (медиану) за O(log n). Это не очередь, но требует комбинации структур — deque для трекинга последних N, heap для медианы. Пока упрощённая версия — running average через скользящее окно:
from collections import deque
class StreamingStats:
def __init__(self, window=100):
self.window = window
self.q = deque()
self.sum = 0.0
def add(self, x):
self.q.append(x)
self.sum += x
if len(self.q) > self.window:
old = self.q.popleft()
self.sum -= old
def mean(self):
return self.sum / len(self.q) if self.q else 0.0
s = StreamingStats(window=10)
for i in range(20):
s.add(i * 2)
print(f"i={i} mean={s.mean():.2f}")
Каждая операция O(1). Окно не пересчитывается — поддерживается инкрементально.
Message queues и shared memory в Linux: FIFO-семантика IPC