Что такое ring buffer
Ring buffer (он же circular buffer) — это очередь, размещённая в массиве фиксированного размера, индексы которой «оборачиваются» по модулю. Когда tail доходит до конца массива, он начинает заново с 0. Buffer выглядит как «кольцо», отсюда и название.
Это самая компактная и предсказуемая реализация FIFO для случаев, когда максимальный размер заранее известен.
head и tail двигаются по кругу. Между ними — занятые элементы. Снаружи — свободные ячейки.
Операции:
- enqueue(x): записать в
data[tail], потомtail = (tail + 1) % capacity. - dequeue(): прочитать
data[head], потомhead = (head + 1) % capacity. - full?:
size == capacity. - empty?:
size == 0.
Все операции — O(1) без аллокаций, без сдвигов. Фиксированная память. Никакого pointer chasing.
Реализация на Python
class RingBuffer:
def __init__(self, capacity):
self._data = [None] * capacity
self._capacity = capacity
self._head = 0
self._tail = 0
self._size = 0
def enqueue(self, x):
if self._size == self._capacity:
raise OverflowError("ring buffer is full")
self._data[self._tail] = x
self._tail = (self._tail + 1) % self._capacity
self._size += 1
def dequeue(self):
if self._size == 0:
raise IndexError("ring buffer is empty")
x = self._data[self._head]
self._data[self._head] = None # для GC: не держать ссылку
self._head = (self._head + 1) % self._capacity
self._size -= 1
return x
def peek(self):
if self._size == 0:
raise IndexError("ring buffer is empty")
return self._data[self._head]
def __len__(self):
return self._size
def __iter__(self):
for i in range(self._size):
yield self._data[(self._head + i) % self._capacity]
rb = RingBuffer(5)
rb.enqueue(10)
rb.enqueue(20)
rb.enqueue(30)
print(list(rb)) # [10, 20, 30]
print(rb.dequeue()) # 10
rb.enqueue(40)
rb.enqueue(50)
rb.enqueue(60) # tail обернулся
print(list(rb)) # [20, 30, 40, 50, 60]
Заметьте: после enqueue(60) tail прошёл от 4 до 0 (по модулю 6). Это работает прозрачно для пользователя.
Что значит overflow и две стратегии
Когда буфер полон, есть два варианта:
-
Reject: новый элемент отвергается, исключение или возврат False. Так делает
queue.Queue(maxsize=N)без block=True. -
Overwrite: новый элемент пишется поверх самого старого. Старый молча выкидывается. Так делает
collections.deque(maxlen=N).
class OverwritingRing:
def enqueue(self, x):
self._data[self._tail] = x
self._tail = (self._tail + 1) % self._capacity
if self._size < self._capacity:
self._size += 1
else:
# буфер полон, head двигается вместе с tail
self._head = (self._head + 1) % self._capacity
Овервраит — поведение «трекинг последних N». Reject — поведение «строгая очередь с пределом».
Применение 1: audio streaming
Звук с микрофона приходит фиксированной скоростью (44100 sample/sec). Программа должна обрабатывать с такой же скоростью. Если на мгновение задержка — данные надо где-то «удержать», но не накапливать бесконечно.
Ring buffer фиксированного размера (например, 1 секунда * 44100 = 44100 sample) — идеально:
- Микрофон пишет в буфер (producer).
- Обработчик читает (consumer).
- При обгоне consumer’а producer мгновенно перезаписывает старое — последняя секунда всегда свежая.
Это так работает в ALSA на Linux, CoreAudio на macOS, WASAPI на Windows. Кольцевой буфер — фундамент real-time audio.
Применение 2: лог-буфер в kernel
Ядро Linux пишет dmesg в kernel ring buffer (__log_buf в kernel/printk/printk.c). Размер — конфигурируется при компиляции (CONFIG_LOG_BUF_SHIFT, обычно 1 MB). Когда буфер заполняется — старые записи выкидываются.
Тот же приём: journald, systemd-journald, varnishlog — все стримящие логи используют ring buffer для ограничения памяти.
Применение 3: lock-free SPSC очередь
Single-Producer-Single-Consumer ring buffer — один из самых популярных lock-free паттернов. Если у вас ровно один producer и ровно один consumer, индексы head и tail могут обновляться атомарно без mutex:
# псевдокод — Python с GIL не нужен, но в C/Rust это популярно
class SPSCRing:
def __init__(self, capacity):
self._data = [None] * capacity
self._capacity = capacity
self._head = AtomicInt(0) # обновляет только consumer
self._tail = AtomicInt(0) # обновляет только producer
def try_enqueue(self, x): # вызывает только producer
t = self._tail.load()
next_t = (t + 1) % self._capacity
if next_t == self._head.load():
return False # буфер полон
self._data[t] = x
self._tail.store(next_t)
return True
def try_dequeue(self): # вызывает только consumer
h = self._head.load()
if h == self._tail.load():
return None # буфер пуст
x = self._data[h]
self._head.store((h + 1) % self._capacity)
return x
Без блокировок, без syscall’ов — только атомарные операции. Это фундамент high-frequency trading, network packet processing, real-time audio.
Tradeoff: memory vs flexibility
Ring — пред-выделенная память, без аллокаций. Deque — динамическая память, всегда впрок.
Главный аргумент в пользу ring buffer — отсутствие аллокаций в горячем пути. Для embedded, real-time, low-latency систем это критично. Для обычного Python-приложения deque удобнее (не надо беспокоиться о capacity).
Реализация maxlen-семантики через ring
Если вы хотите «последние N значений», в Python готова:
from collections import deque
last_100 = deque(maxlen=100)
Внутри это блок-linked, но семантика идентична overwriting ring buffer. Для real-time C-кода ring buffer был бы выбран напрямую.
Производительность
import time
from collections import deque
class RingBuffer:
def __init__(self, capacity):
self._data = [None] * capacity
self._capacity = capacity
self._head = 0
self._tail = 0
self._size = 0
def push(self, x):
if self._size == self._capacity:
self._head = (self._head + 1) % self._capacity
else:
self._size += 1
self._data[self._tail] = x
self._tail = (self._tail + 1) % self._capacity
def pop(self):
if self._size == 0:
return None
x = self._data[self._head]
self._head = (self._head + 1) % self._capacity
self._size -= 1
return x
N = 1_000_000
# ring buffer
rb = RingBuffer(1024)
t0 = time.perf_counter()
for i in range(N):
rb.push(i)
if i % 2:
rb.pop()
t1 = time.perf_counter()
print(f"ring buffer: {(t1-t0)*1000:.2f} ms")
# deque
dq = deque(maxlen=1024)
t0 = time.perf_counter()
for i in range(N):
dq.append(i)
if i % 2:
dq.popleft()
t1 = time.perf_counter()
print(f"deque: {(t1-t0)*1000:.2f} ms")
Чистый Python ring buffer медленнее deque, потому что deque реализована на C. Но на C сравнение даст обратный результат — ring buffer обычно быстрее, потому что одна арифметическая операция вместо block-linked обхода.
Попробуй сам
Реализуйте трекер «последних N измерений» через overwriting ring buffer для метрик сервиса:
class MetricRing:
"""Хранит последние N значений, считает min/max/avg за O(N)."""
def __init__(self, capacity):
self._data = [None] * capacity
self._capacity = capacity
self._head = 0
self._tail = 0
self._size = 0
# running sum для O(1) avg
self._sum = 0.0
def add(self, x):
if self._size == self._capacity:
old = self._data[self._head]
self._sum -= old
self._head = (self._head + 1) % self._capacity
else:
self._size += 1
self._data[self._tail] = x
self._sum += x
self._tail = (self._tail + 1) % self._capacity
def avg(self):
if self._size == 0:
return 0.0
return self._sum / self._size
def min(self):
return min(self._data[(self._head + i) % self._capacity]
for i in range(self._size))
def max(self):
return max(self._data[(self._head + i) % self._capacity]
for i in range(self._size))
mr = MetricRing(5)
for x in [10, 20, 30, 40, 50, 60, 70]:
mr.add(x)
print(f"after {x}: avg={mr.avg():.2f}, min={mr.min()}, max={mr.max()}")
Заметьте: avg — O(1) через running sum. min/max — O(N) полный проход, но N маленькое (capacity ring buffer). Если нужна O(1) min/max на скользящем окне — это monotonic deque алгоритм, более сложная техника.