Синхронизация — race conditions, mutex, semaphore, atomic, memory ordering
Самая зловредная багофабрика в многопоточном программировании — race conditions. Тест прошёл 99 раз — всё ок, на 100-й — упало. Запустили в продакшене — работает, но раз в неделю баланс пользователя становится отрицательным. Причина: два потока одновременно прочитали состояние, оба изменили, оба записали — одно изменение потерялось.
В этом уроке разберём, что такое race condition на железном уровне, зачем нужны мьютексы, что такое atomic-операции (fetch_add, compare-and-swap), и зачем существует ужасная штука под названием memory ordering, которую регулярно ломают на собеседованиях.
Для junior data engineer это нужно не только в системном программировании. Те же race conditions есть в БД (отсюда транзакции и SELECT FOR UPDATE), в распределённых системах (отсюда консенсус-алгоритмы), даже в SQL-скриптах. Понимая race conditions на железе, легче понимать их везде.
Race condition: что физически происходит
Возьмём канонический пример. Два потока инкрементируют общий счётчик 1 миллион раз каждый. Ожидаемый результат: 2 миллиона. На практике почти всегда меньше.
// Псевдокод
int counter = 0;
void worker() {
for (int i = 0; i < 1000000; i++) {
counter++; // Race condition!
}
}
// thread1: worker()
// thread2: worker()
// Ожидаем counter == 2000000, получаем 1234567 или подобное
Почему? Потому что counter++ — это не одна операция, а три:
Между этими тремя операциями планировщик может переключить поток. Или другой CPU параллельно сделает то же самое.
Должно было стать 7. Стало 6. Один инкремент потерян. На миллион итераций таких пропусков сотни тысяч.
# Воспроизведём руками:
cat > /tmp/race.py << 'EOF'
import threading
counter = 0
def worker():
global counter
for _ in range(100000):
counter += 1
threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Expected: 400000, got: {counter}")
EOF
python3 /tmp/race.py
# В CPython 3.13 + free-threading (GIL отключен) -- увидите потерю инкрементов
# В обычном CPython с GIL -- получите 400000 (GIL играет роль глобального мьютекса)
Кстати, в Python с GIL race conditions есть всё равно — но в более узких сценариях (между байткод-инструкциями). И в Python 3.13+ с no-GIL режимом классические race вернулись.
Mutex: эксклюзивная защита
Самый базовый инструмент — mutex (mutual exclusion — взаимное исключение). Только один поток может «держать» мьютекс в каждый момент. Остальные ждут.
# Правильная версия с мьютексом:
import threading
counter = 0
lock = threading.Lock()
def worker():
global counter
for _ in range(100000):
with lock: # захват мьютекса; with автоматически освобождает
counter += 1
threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(counter) # Гарантированно 400000
Под капотом в Linux — futex. Если мьютекс свободен — захват через atomic CAS в userspace (один lock-инструкция CPU, ~5 ns). Если занят — syscall futex(FUTEX_WAIT), ядро переводит поток в sleep, ждёт пока owner разбудит через futex(FUTEX_WAKE). Это «fast path» (uncontested) и «slow path» (contested).
# Глянем futex-вызовы при использовании Lock в Python:
strace -e futex -c python3 /tmp/race.py 2>&1 | tail -5
# В колонке "calls" -- сколько futex_wait/futex_wake было
# Чем больше contention -- тем больше syscalls -- тем медленнее
Semaphore: счётчик доступов
Semaphore — обобщение мьютекса. Это счётчик: сколько потоков могут одновременно быть в critical section. У мьютекса счётчик 1, у semaphore — N.
# Пример: ограничиваем число одновременных HTTP-запросов до 10
import threading, requests
sem = threading.Semaphore(10)
def fetch(url):
with sem: # acquire, потом release в конце with
# Только 10 потоков одновременно здесь
return requests.get(url).text
Semaphore нужен, когда ресурс физически ограничен в количестве (например, max 10 одновременных соединений к API), но не один-в-один.
Binary semaphore — семафор со счётчиком 1. Концептуально равен мьютексу, но есть тонкая разница: семафор могут release разные потоки (тот, кто acquire, не обязан release). Мьютекс должен быть unlock тем же потоком, который lock.
Самая частая ошибка с семафорами — забыть release. Если поток упал между acquire и release, счётчик не вернётся, скоро все потоки заблокируются навсегда. Используйте with-блок (Python) или RAII (C++/Rust) для автоматического release.
Read-Write lock: множество читателей, один писатель
RWLock — оптимизация мьютекса для случая, когда читателей много, писатель редкий. Множество потоков могут одновременно read_lock (читать). Только один поток может write_lock (писать), и в это время никто не читает.
Подходит для кэшей, конфигов, реестров — где 99% доступа read, 1% write. Не подходит, если writers тоже частые — мьютекс будет проще и быстрее.
В Python — threading.RLock это reentrant lock (тот же поток может lock несколько раз), а не RWLock. RWLock есть в сторонних библиотеках или в asyncio (asyncio.Lock).
Atomic: lock-free для простых операций
Для простых случаев — инкремент счётчика, флаг, указатель — можно обойтись без мьютекса. CPU предоставляет atomic-инструкции, которые выполняют read-modify-write одной неделимой операцией.
В Python (CPython) atomic-операции скрыты, но через C-расширения доступны:
# Через cffi/ctypes можно использовать atomic_load/store
# Или через стандартную мультипроцессинг.Value (хотя там Lock)
# Самый простой пример -- multiprocessing.Value с lock=False:
from multiprocessing import Value
counter = Value('i', 0, lock=False)
# Доступ через counter.value -- не атомарный по умолчанию в Python
В C++:
#include <atomic>
std::atomic<int> counter{0};
// Поток 1 и 2 могут одновременно делать:
counter.fetch_add(1);
// Никаких race conditions. И никаких мьютексов.
Когда выбирать atomic vs mutex:
- Простой счётчик, флаг — atomic, в 10x быстрее.
- Несколько связанных операций (увеличить counter И обновить indexes) — mutex.
- Lock-free структуры данных (queue, stack) — atomic CAS в цикле — сложно, но без блокировок.
Memory ordering: главный шок-эффект
Здесь начинается то, что многих обескураживает. CPU и компилятор могут переставлять операции для оптимизации, при условии что результат «для этого потока» не меняется. Но «для другого потока» — может меняться драматически.
Канонический пример:
int x = 0, y = 0;
int r1, r2;
// Thread 1:
x = 1;
r1 = y;
// Thread 2:
y = 1;
r2 = x;
Что мы ожидаем? Хотя бы один из r1, r2 должен быть 1 (нельзя оба нулевыми — либо T1 прошёл первым, либо T2). А на самом деле возможна ситуация r1 == 0 && r2 == 0. Как? CPU может переставить x = 1 и r1 = y в Thread 1 (потому что это разные адреса памяти, для этого потока порядок неважен), аналогично в Thread 2. И тогда оба читают раньше, чем записывают.
Чтобы решить проблему, нужно явно сказать компилятору и CPU: «после этой операции все предыдущие должны быть видны». Это и есть memory barrier (или fence).
В большинстве языков высокого уровня это скрыто: mutex.unlock() или atomic.store(value, memory_order_release) ставят правильные барьеры. Вам обычно не нужно думать про барьеры — если используете стандартные мьютексы и atomic с дефолтным memory_order_seq_cst (sequential consistency).
Memory orderings (в C++/Rust):
Для junior data engineer достаточно знать: используйте mutex или seq_cst atomic, не пытайтесь хитрить с relaxed/acquire/release, пока не разберётесь в memory model досконально. Цена «оптимизации» здесь — баги, которые проявляются раз в год на продакшене.
Реальный пример: producer-consumer
Классическая задача: один поток (producer) кладёт элементы в очередь, другой (consumer) забирает. Как реализовать без race conditions?
Вариант 1: мьютекс + условная переменная (condition variable):
import threading
from collections import deque
queue = deque()
lock = threading.Lock()
not_empty = threading.Condition(lock)
def producer():
for i in range(100):
with lock:
queue.append(i)
not_empty.notify() # будит одного consumer
def consumer():
while True:
with lock:
while not queue:
not_empty.wait() # отпускает lock и спит
item = queue.popleft()
process(item)
Вариант 2: thread-safe queue (готовая реализация):
from queue import Queue
q = Queue()
# producer:
q.put(item)
# consumer:
item = q.get() # блокируется, если пусто
queue.Queue внутри использует мьютекс + condition variables, всё корректно. Для большинства задач это правильный выбор — не изобретайте велосипед.
Вариант 3: lock-free queue через atomic — сложно, не для общего применения. Используется в high-performance системах (LMAX Disruptor, kafka producer), где каждая микросекунда важна.
Конкуррентность в сетевых серверах: producer-consumer через event loopПрактика: посмотрите futex-активность в реальной программе
# Запустите многопоточную программу с интенсивной синхронизацией:
cat > /tmp/many-locks.py << 'EOF'
import threading
lock = threading.Lock()
counter = 0
def w():
global counter
for _ in range(100000):
with lock:
counter += 1
threads = [threading.Thread(target=w) for _ in range(8)]
for t in threads: t.start()
for t in threads: t.join()
print(counter)
EOF
# Замерим futex calls:
strace -e futex -c python3 /tmp/many-locks.py 2>&1 | tail -10
# Колонка "calls" -- может быть сотни тысяч futex_wait
# Это и есть стоимость contention -- много syscall'ов
Если в strace видите много FUTEX_WAIT — ваш код страдает от lock contention. Lock-удержание слишком длинное, потоки толкаются. Решения:
- Уменьшите critical section (минимум кода под lock).
- Используйте atomic вместо mutex, если возможно.
- Сделайте lock-free структуру данных.
- Шардируйте состояние (каждый поток имеет свой counter, в конце суммируем).
Попробуй сам
# 1. Спровоцируйте race condition:
cat > /tmp/race-c.c << 'EOF'
#include <pthread.h>
#include <stdio.h>
long counter = 0;
void* w(void* a) {
for (long i = 0; i < 10000000; i++) counter++;
return NULL;
}
int main() {
pthread_t t[4];
for (int i = 0; i < 4; i++) pthread_create(&t[i], NULL, w, NULL);
for (int i = 0; i < 4; i++) pthread_join(t[i], NULL);
printf("Expected: 40000000, got: %ld\n", counter);
}
EOF
gcc -O2 /tmp/race-c.c -lpthread -o /tmp/race-c
/tmp/race-c
# Получите число < 40000000 -- race condition подтверждена
# 2. Исправьте через mutex (добавьте pthread_mutex_lock/unlock вокруг counter++)
# Замерьте время с -O2: race-версия в 10-50x быстрее но даёт неверный результат
# 3. Исправьте через __sync_fetch_and_add (atomic):
# Получите правильный результат, и быстрее чем с mutex
# 4. Замерьте futex-syscalls для версии с mutex:
strace -e futex -c /tmp/race-c-mutex 2>&1 | tail -5