Learning Platform
Глоссарий Troubleshooting
Урок 05.03 · 22 мин
Начальный
ThreadsRace conditionsMutexAtomicMemory ordering

Синхронизация — race conditions, mutex, semaphore, atomic, memory ordering

Самая зловредная багофабрика в многопоточном программировании — race conditions. Тест прошёл 99 раз — всё ок, на 100-й — упало. Запустили в продакшене — работает, но раз в неделю баланс пользователя становится отрицательным. Причина: два потока одновременно прочитали состояние, оба изменили, оба записали — одно изменение потерялось.

В этом уроке разберём, что такое race condition на железном уровне, зачем нужны мьютексы, что такое atomic-операции (fetch_add, compare-and-swap), и зачем существует ужасная штука под названием memory ordering, которую регулярно ломают на собеседованиях.

Для junior data engineer это нужно не только в системном программировании. Те же race conditions есть в БД (отсюда транзакции и SELECT FOR UPDATE), в распределённых системах (отсюда консенсус-алгоритмы), даже в SQL-скриптах. Понимая race conditions на железе, легче понимать их везде.

Разрешение коллизий в hash-таблицах: тот же принцип конкурентного доступа

Race condition: что физически происходит

Возьмём канонический пример. Два потока инкрементируют общий счётчик 1 миллион раз каждый. Ожидаемый результат: 2 миллиона. На практике почти всегда меньше.

// Псевдокод
int counter = 0;
void worker() {
    for (int i = 0; i < 1000000; i++) {
        counter++;  // Race condition!
    }
}
// thread1: worker()
// thread2: worker()
// Ожидаем counter == 2000000, получаем 1234567 или подобное

Почему? Потому что counter++ — это не одна операция, а три:

counter++ -- три операции на железе
1. LOADПрочитать значение counter из RAM в регистр CPU (RAX). Mov rax, [counter]
2. INCИнкрементировать значение в регистре. add rax, 1
3. STOREЗаписать новое значение обратно в RAM. mov [counter], rax

Между этими тремя операциями планировщик может переключить поток. Или другой CPU параллельно сделает то же самое.

Гонка: оба потока загрузили 5, оба записали 6
Время t1Thread 1: LOAD counter (читает 5)
Время t2Thread 2: LOAD counter (читает тоже 5! потому что T1 ещё не сохранил)
Время t3Thread 1: INC -> 6 в регистре. counter в RAM всё ещё 5
Время t4Thread 2: INC -> 6 в регистре. counter в RAM всё ещё 5
Время t5Thread 1: STORE 6 в counter. counter == 6
Время t6Thread 2: STORE 6 в counter. counter всё ещё 6, ИНКРЕМЕНТ ПОТЕРЯН

Должно было стать 7. Стало 6. Один инкремент потерян. На миллион итераций таких пропусков сотни тысяч.

# Воспроизведём руками:
cat > /tmp/race.py << 'EOF'
import threading
counter = 0
def worker():
    global counter
    for _ in range(100000):
        counter += 1
threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Expected: 400000, got: {counter}")
EOF
python3 /tmp/race.py
# В CPython 3.13 + free-threading (GIL отключен) -- увидите потерю инкрементов
# В обычном CPython с GIL -- получите 400000 (GIL играет роль глобального мьютекса)

Кстати, в Python с GIL race conditions есть всё равно — но в более узких сценариях (между байткод-инструкциями). И в Python 3.13+ с no-GIL режимом классические race вернулись.


Mutex: эксклюзивная защита

Самый базовый инструмент — mutex (mutual exclusion — взаимное исключение). Только один поток может «держать» мьютекс в каждый момент. Остальные ждут.

Mutex -- эксклюзивный доступ к ресурсу
Thread 1Захватил мьютекс. Выполняет critical section: read-modify-write counter. Другие потоки ждут
critical sectionКод между lock и unlock. Сюда попадает только один поток. counter++ выполняется атомарно с точки зрения других потоков
Thread 2 waitПопытался lock, но lock уже захвачен. Поток заблокирован, ядро переводит его в state S (sleep). Не тратит CPU
Thread 3 waitТоже ждёт мьютекс. В очереди ожидания
T1 unlockОсвобождает мьютекс. Ядро будит одного из ждущих (порядок зависит от реализации -- FIFO или sometimes LIFO)
T2 wakes, locksT2 разбужен, захватил мьютекс, выполняет своё counter++
# Правильная версия с мьютексом:
import threading
counter = 0
lock = threading.Lock()
def worker():
    global counter
    for _ in range(100000):
        with lock:  # захват мьютекса; with автоматически освобождает
            counter += 1

threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(counter)  # Гарантированно 400000

Под капотом в Linux — futex. Если мьютекс свободен — захват через atomic CAS в userspace (один lock-инструкция CPU, ~5 ns). Если занят — syscall futex(FUTEX_WAIT), ядро переводит поток в sleep, ждёт пока owner разбудит через futex(FUTEX_WAKE). Это «fast path» (uncontested) и «slow path» (contested).

# Глянем futex-вызовы при использовании Lock в Python:
strace -e futex -c python3 /tmp/race.py 2>&1 | tail -5
# В колонке "calls" -- сколько futex_wait/futex_wake было
# Чем больше contention -- тем больше syscalls -- тем медленнее

Semaphore: счётчик доступов

Semaphore — обобщение мьютекса. Это счётчик: сколько потоков могут одновременно быть в critical section. У мьютекса счётчик 1, у semaphore — N.

# Пример: ограничиваем число одновременных HTTP-запросов до 10
import threading, requests
sem = threading.Semaphore(10)

def fetch(url):
    with sem:  # acquire, потом release в конце with
        # Только 10 потоков одновременно здесь
        return requests.get(url).text

Semaphore нужен, когда ресурс физически ограничен в количестве (например, max 10 одновременных соединений к API), но не один-в-один.

Binary semaphore — семафор со счётчиком 1. Концептуально равен мьютексу, но есть тонкая разница: семафор могут release разные потоки (тот, кто acquire, не обязан release). Мьютекс должен быть unlock тем же потоком, который lock.

NOTE

Самая частая ошибка с семафорами — забыть release. Если поток упал между acquire и release, счётчик не вернётся, скоро все потоки заблокируются навсегда. Используйте with-блок (Python) или RAII (C++/Rust) для автоматического release.


Read-Write lock: множество читателей, один писатель

RWLock — оптимизация мьютекса для случая, когда читателей много, писатель редкий. Множество потоков могут одновременно read_lock (читать). Только один поток может write_lock (писать), и в это время никто не читает.

RWLock -- много readers, один writer
Reader 1Захватил read lock. Может читать данные. Не блокирует других читателей
Reader 2Захватил read lock одновременно. Параллельно читает
Reader 3Тоже читает. Все три не мешают друг другу
Writer waitХочет write lock. Ждёт, пока ВСЕ читатели не отпустят свои read locks. Только после этого получит эксклюзивный доступ
Writer writesВсе читатели отпустили. Writer захватил эксклюзивный lock. Новые читатели ждут, пока writer не закончит

Подходит для кэшей, конфигов, реестров — где 99% доступа read, 1% write. Не подходит, если writers тоже частые — мьютекс будет проще и быстрее.

В Python — threading.RLock это reentrant lock (тот же поток может lock несколько раз), а не RWLock. RWLock есть в сторонних библиотеках или в asyncio (asyncio.Lock).


Atomic: lock-free для простых операций

Для простых случаев — инкремент счётчика, флаг, указатель — можно обойтись без мьютекса. CPU предоставляет atomic-инструкции, которые выполняют read-modify-write одной неделимой операцией.

Atomic instructions на x86
lock xaddAtomic fetch-and-add. lock-prefix блокирует cache line на время операции. На современных CPU это L1-cache-only -- быстрая операция, ~10-30 ns
lock cmpxchgCompare-and-Swap. Если значение в памяти == expected, заменить на new. Атомарно. Базис большинства lock-free алгоритмов
lock or/andAtomic bitwise operations. Для флагов
Без mutexAtomic-операции не требуют futex/syscall. Полностью userspace, на L1-cache. Идеально для тривиальных операций
Лимит: одна операцияAtomic работает только для одной операции (инкремент, CAS). Если нужно несколько действий вместе -- нужен мьютекс или сложные lock-free алгоритмы

В Python (CPython) atomic-операции скрыты, но через C-расширения доступны:

# Через cffi/ctypes можно использовать atomic_load/store
# Или через стандартную мультипроцессинг.Value (хотя там Lock)
# Самый простой пример -- multiprocessing.Value с lock=False:
from multiprocessing import Value
counter = Value('i', 0, lock=False)
# Доступ через counter.value -- не атомарный по умолчанию в Python

В C++:

#include <atomic>
std::atomic<int> counter{0};
// Поток 1 и 2 могут одновременно делать:
counter.fetch_add(1);
// Никаких race conditions. И никаких мьютексов.

Когда выбирать atomic vs mutex:

  • Простой счётчик, флаг — atomic, в 10x быстрее.
  • Несколько связанных операций (увеличить counter И обновить indexes) — mutex.
  • Lock-free структуры данных (queue, stack) — atomic CAS в цикле — сложно, но без блокировок.

Memory ordering: главный шок-эффект

Здесь начинается то, что многих обескураживает. CPU и компилятор могут переставлять операции для оптимизации, при условии что результат «для этого потока» не меняется. Но «для другого потока» — может меняться драматически.

Канонический пример:

int x = 0, y = 0;
int r1, r2;

// Thread 1:
x = 1;
r1 = y;

// Thread 2:
y = 1;
r2 = x;

Что мы ожидаем? Хотя бы один из r1, r2 должен быть 1 (нельзя оба нулевыми — либо T1 прошёл первым, либо T2). А на самом деле возможна ситуация r1 == 0 && r2 == 0. Как? CPU может переставить x = 1 и r1 = y в Thread 1 (потому что это разные адреса памяти, для этого потока порядок неважен), аналогично в Thread 2. И тогда оба читают раньше, чем записывают.

Memory reordering -- CPU и компилятор переставляют операции
ПрограммаЧто вы написали в коде: x=1; r1=y; в одном потоке. y=1; r2=x; в другом
КомпиляторМожет переставить инструкции для оптимизации регистров и cache
CPUOut-of-order execution: CPU выполняет инструкции в любом удобном порядке, лишь бы результат 'для этой ниточки исполнения' не отличался
Memory barrier (fence)Инструкция, заставляющая упорядочить операции памяти. mfence на x86, dmb на ARM. Тормозит, но гарантирует видимость
atomic/volatileВ языке: atomic с memory_order_acquire/release добавляет barriers автоматически

Чтобы решить проблему, нужно явно сказать компилятору и CPU: «после этой операции все предыдущие должны быть видны». Это и есть memory barrier (или fence).

В большинстве языков высокого уровня это скрыто: mutex.unlock() или atomic.store(value, memory_order_release) ставят правильные барьеры. Вам обычно не нужно думать про барьеры — если используете стандартные мьютексы и atomic с дефолтным memory_order_seq_cst (sequential consistency).

Memory orderings (в C++/Rust):

Уровни memory ordering -- от слабого к сильному
relaxedНикаких гарантий порядка. Только атомарность отдельной операции. Самый быстрый, самый опасный. Можно для счётчиков статистики
acquireПри чтении: все последующие чтения/записи не могут быть переставлены раньше этого. Используется в lock() операциях
releaseПри записи: все предыдущие чтения/записи не могут быть переставлены позже этого. Используется в unlock() операциях
acq_relДля read-modify-write операций (fetch_add, CAS). Сочетает acquire (на read) + release (на write)
seq_cstSequential consistency. Самая сильная гарантия: все операции на всех потоках видны в одинаковом глобальном порядке. По умолчанию в std::atomic

Для junior data engineer достаточно знать: используйте mutex или seq_cst atomic, не пытайтесь хитрить с relaxed/acquire/release, пока не разберётесь в memory model досконально. Цена «оптимизации» здесь — баги, которые проявляются раз в год на продакшене.


Реальный пример: producer-consumer

Классическая задача: один поток (producer) кладёт элементы в очередь, другой (consumer) забирает. Как реализовать без race conditions?

Вариант 1: мьютекс + условная переменная (condition variable):

import threading
from collections import deque

queue = deque()
lock = threading.Lock()
not_empty = threading.Condition(lock)

def producer():
    for i in range(100):
        with lock:
            queue.append(i)
            not_empty.notify()  # будит одного consumer

def consumer():
    while True:
        with lock:
            while not queue:
                not_empty.wait()  # отпускает lock и спит
            item = queue.popleft()
        process(item)

Вариант 2: thread-safe queue (готовая реализация):

from queue import Queue
q = Queue()
# producer:
q.put(item)
# consumer:
item = q.get()  # блокируется, если пусто

queue.Queue внутри использует мьютекс + condition variables, всё корректно. Для большинства задач это правильный выбор — не изобретайте велосипед.

Вариант 3: lock-free queue через atomic — сложно, не для общего применения. Используется в high-performance системах (LMAX Disruptor, kafka producer), где каждая микросекунда важна.

Конкуррентность в сетевых серверах: producer-consumer через event loop

Практика: посмотрите futex-активность в реальной программе

# Запустите многопоточную программу с интенсивной синхронизацией:
cat > /tmp/many-locks.py << 'EOF'
import threading
lock = threading.Lock()
counter = 0
def w():
    global counter
    for _ in range(100000):
        with lock:
            counter += 1
threads = [threading.Thread(target=w) for _ in range(8)]
for t in threads: t.start()
for t in threads: t.join()
print(counter)
EOF

# Замерим futex calls:
strace -e futex -c python3 /tmp/many-locks.py 2>&1 | tail -10
# Колонка "calls" -- может быть сотни тысяч futex_wait
# Это и есть стоимость contention -- много syscall'ов

Если в strace видите много FUTEX_WAIT — ваш код страдает от lock contention. Lock-удержание слишком длинное, потоки толкаются. Решения:

  1. Уменьшите critical section (минимум кода под lock).
  2. Используйте atomic вместо mutex, если возможно.
  3. Сделайте lock-free структуру данных.
  4. Шардируйте состояние (каждый поток имеет свой counter, в конце суммируем).

Попробуй сам

# 1. Спровоцируйте race condition:
cat > /tmp/race-c.c << 'EOF'
#include <pthread.h>
#include <stdio.h>
long counter = 0;
void* w(void* a) {
    for (long i = 0; i < 10000000; i++) counter++;
    return NULL;
}
int main() {
    pthread_t t[4];
    for (int i = 0; i < 4; i++) pthread_create(&t[i], NULL, w, NULL);
    for (int i = 0; i < 4; i++) pthread_join(t[i], NULL);
    printf("Expected: 40000000, got: %ld\n", counter);
}
EOF
gcc -O2 /tmp/race-c.c -lpthread -o /tmp/race-c
/tmp/race-c
# Получите число < 40000000 -- race condition подтверждена

# 2. Исправьте через mutex (добавьте pthread_mutex_lock/unlock вокруг counter++)
# Замерьте время с -O2: race-версия в 10-50x быстрее но даёт неверный результат

# 3. Исправьте через __sync_fetch_and_add (atomic):
# Получите правильный результат, и быстрее чем с mutex

# 4. Замерьте futex-syscalls для версии с mutex:
strace -e futex -c /tmp/race-c-mutex 2>&1 | tail -5

Проверка знанийKnowledge check
Junior пишет: 'У меня многопоточный код, который считает суммы в общую переменную. Тест показывает, что иногда сумма неправильная -- то 99500, то 99000 вместо ожидаемых 100000. В чём может быть дело и какие 4 разных решения существуют?'
ОтветAnswer
Это классический race condition на инкременте. Операция sum += value не атомарна -- она состоит из LOAD (прочитать sum), ADD (прибавить value), STORE (записать обратно). Между LOAD и STORE другой поток может вклиниться, прочитать старое значение, и оба потока перезапишут друг друга -- одно прибавление потеряется. Четыре решения: 1. Mutex/Lock. Оборачиваем sum += value в lock/unlock. Самое прямолинейное и надёжное. Минус: каждая операция -- захват лока, контент при высокой нагрузке -> CPU тратится на futex_wait. Подходит, когда обновление сложное (не только +=). 2. Atomic. На C/C++ -- std::atomic с fetch_add. На Java -- AtomicLong. На Rust -- AtomicI64 с Ordering::Relaxed (для счётчиков relaxed безопасен). Не требует syscall, использует lock-prefix CPU инструкцию. В 5-10 раз быстрее mutex для простых случаев. Минус: только для одной операции -- если нужно изменить несколько связанных полей, atomic не поможет. 3. Шардирование (sharding/thread-local). Каждый поток имеет свой локальный counter. В конце работы суммируем все локальные. Никакой синхронизации в горячем пути. Идеально для аккумуляторов и метрик. Минус: нужна финализация (сбор результатов), и память на N counters. 4. Lock-free структуры. Использовать готовые concurrent collections -- ConcurrentHashMap, AtomicReference с CAS-циклом, или из библиотек типа Disruptor (LMAX). Сложно реализовать самим (вы скорее всего сделаете баг), используйте готовое. Бонус (Python-специфика): в Python с GIL обычный counter += 1 АТОМАРЕН по байткоду (одна BINARY_OP инструкция?). Но в 3.13+ no-GIL режиме -- нет, race condition появляется. В CPython вообще полагаться на GIL для синхронизации плохая практика -- используйте threading.Lock в любом случае.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 6. Почему counter++ в двух потоках без синхронизации может терять инкременты?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4