Learning Platform
Глоссарий Troubleshooting
Урок 12.02 · 30 мин
Продвинутый
Delta LakeACIDOptimistic Concurrency ControlConflict ResolutionSerializableWriteSerializableCoordinated CommitsLogStoreDynamoDB

ACID-транзакции и разрешение конфликтов

В предыдущем уроке мы разобрали структуру transaction log — JSON-коммиты, типы action, checkpoints. Но как Delta Lake обеспечивает ACID при одновременной записи нескольких клиентов в одну таблицу?

Delta Lake реализует optimistic concurrency control (OCC) — оптимистичное управление конкурентностью. Идея: каждый клиент работает так, будто он единственный, и только в момент коммита проверяет, не конфликтует ли его работа с другими.

Optimistic Concurrency Control: цикл Read-Validate-Commit

OCC: цикл Read → Validate → Commit
  1. READ: snapshot на версии N
Фаза 1: Read. Клиент читает текущее состояние таблицы через log replay. Запоминает номер прочитанной версии (readVersion). Все дальнейшие операции выполняются на этом snapshot.

Локальная обработка: создание Parquet-файлов, формирование actions

Клиент выполняет операцию (INSERT, UPDATE, DELETE, MERGE) локально: создаёт новые Parquet-файлы, формирует список add/remove actions. Это может занять секунды или минуты.
  1. VALIDATE: проверить конфликты с v(N+1)..v(N+K)
Фаза 2: Validate. Перед записью коммита клиент проверяет: появились ли новые коммиты (версия N+1, N+2...) пока он работал? Если да — нужно проверить конфликт между своими и чужими действиями.
Нет конфликта
  1. COMMIT: записать v(N+K+1).json
Фаза 3: Commit. Атомарная запись JSON-файла версии N+K+1 в _delta_log/. Способ атомарности зависит от storage backend (HDFS rename, S3 conditional put, ADLS ETag).
Конфликт

RETRY: перечитать → повторить операцию

Конфликт обнаружен: операции другого клиента затронули файлы, которые мы тоже читали или модифицировали. Клиент может retry: перечитать состояние, повторить операцию с нуля на новом snapshot.

Ключевой вопрос: что считается конфликтом? Не каждая параллельная запись конфликтует. Два INSERTа в разные партиции — не конфликт. UPDATE и DELETE одних и тех же строк — конфликт.

Алгоритм разрешения конфликтов

Delta Lake определяет конфликт на основе пересечения затронутых файлов и предикатов:

  1. Клиент A прочитал snapshot версии N, подготовил коммит
  2. Пока A работал, клиент B записал версии N+1, N+2
  3. A пытается записать N+3
  4. Validation: для каждого коммита B (N+1, N+2) — проверить:
  • Добавил ли B файлы в партиции, которые A читал для своей операции?
  • Удалил ли B файлы, которые A тоже удаляет или читал?
  • Изменил ли B метаданные или протокол?
Пример: два конкурентных писателя
Writer AWriter A читает snapshot версии 5, начинает INSERT в партицию date=2025-01-15. Создаёт новый Parquet-файл.
Writer BWriter B тоже читает snapshot версии 5, начинает INSERT в партицию date=2025-01-16. Создаёт свой Parquet-файл.
B коммитит первым → v6

A validates: B затронул date=‘01-16’, A — date=‘01-15’

A пытается коммитить. Validate: B добавил файлы в date=2025-01-16 (v6). A вставлял в date=2025-01-15. Партиции не пересекаются → нет конфликта.
B committedWriter B успешно записал версию 6. Добавил файл с данными за 2025-01-16.

A commits → v7 + (нет конфликта — разные партиции)

Нет конфликта: A коммитит как версию 7. Оба INSERTа в разные партиции — обе записи сохранены. Это преимущество WriteSerializable: disjoint appends не конфликтуют.

Матрица конфликтов

Не все комбинации операций конфликтуют. Delta Lake определяет конфликт на основе типа операции и пересечения затронутых данных:

Матрица конфликтов операций
Заголовок матрицы: строки — операция клиента A (txn), столбцы — операция клиента B (winning commit).
INSERTКлиент B выполнил INSERT — добавил новые файлы в таблицу.
DELETEКлиент B выполнил DELETE — пометил файлы как removed.
UPDATEКлиент B выполнил UPDATE — remove старых файлов + add новых.
OPTIMIZEКлиент B выполнил OPTIMIZE (compaction) — перепаковал файлы без изменения данных.
INSERTКлиент A выполняет INSERT.
+Два INSERTа — всегда успех. Каждый добавляет свои файлы, нет пересечений.
+INSERT (A) + DELETE (B) — нет конфликта. A добавляет новые файлы, B удаляет старые. Нет пересечения.
+INSERT (A) + UPDATE (B) — нет конфликта. A добавляет файлы, B обновляет существующие. Разные файлы.
+INSERT (A) + OPTIMIZE (B) — нет конфликта. A добавляет новые данные, B перепаковывает старые. Разные файлы.
DELETEКлиент A выполняет DELETE.
!DELETE (A) + INSERT (B) — может конфликтовать при Serializable, если B добавил файлы в партицию, которую A сканировал для DELETE. При WriteSerializable — нет конфликта.
НетDELETE (A) + DELETE (B) — конфликт, если оба пытаются удалить одни и те же файлы. Один из них уже не существует.
НетDELETE (A) + UPDATE (B) — конфликт, если B обновил файлы, которые A удаляет. Файлы A уже заменены.
НетDELETE (A) + OPTIMIZE (B) — конфликт. B перепаковал файлы, которые A удаляет. Файлы A больше не существуют.
UPDATEКлиент A выполняет UPDATE.
!UPDATE (A) + INSERT (B) — может конфликтовать при Serializable. При WriteSerializable — нет конфликта, если B не затронул файлы, которые A читал.
НетUPDATE (A) + DELETE (B) — конфликт, если B удалил файлы, которые A обновляет. A основывал свой UPDATE на данных, которых больше нет.
НетUPDATE (A) + UPDATE (B) — конфликт, если оба обновляют одни и те же файлы. Результат зависит от порядка — нарушение serializability.
НетUPDATE (A) + OPTIMIZE (B) — конфликт. B перепаковал файлы, A основывал update на старой раскладке файлов.
NOTE

«Те же файлы» — значит оба клиента затронули (read + modify) одни и те же Parquet-файлы. Если DELETE A и DELETE B удаляют строки из разных файлов — конфликта нет. Конфликт определяется на уровне файлов, не строк (без deletion vectors).

Isolation Levels: Serializable vs WriteSerializable

Delta Lake поддерживает два уровня изоляции:

Serializable vs WriteSerializable
SerializableСтрогий уровень: результат параллельного выполнения эквивалентен какому-то последовательному порядку. Проверяет и read-конфликты: если B добавил данные в партицию, которую A сканировал — конфликт. Максимальная корректность, максимум retry.
WriteSerializable (default)Менее строгий уровень (по умолчанию): проверяет только write-write конфликты. Два INSERTа никогда не конфликтуют. DELETE конфликтует с DELETE/UPDATE только если затрагивают одни и те же файлы. Меньше retry → выше throughput.

WriteSerializable — уровень по умолчанию. Он разрешает «phantom reads»: если клиент A выполняет DELETE WHERE amount > 1000, а клиент B параллельно вставляет новую строку с amount = 5000 — при WriteSerializable это не конфликт. Строка B не будет удалена, хотя подпадает под предикат A. При Serializable — это конфликт, и один из клиентов получит retry.

# Установка isolation level
from deltalake import DeltaTable

# При создании таблицы
# delta.isolationLevel = "WriteSerializable" (default)
# delta.isolationLevel = "Serializable"

dt = DeltaTable("./my_table")
config = dt.metadata().configuration
print(f"Isolation: {config.get('delta.isolationLevel', 'WriteSerializable (default)')}")
WARNING

Большинство ETL-пайплайнов не нуждаются в Serializable. WriteSerializable достаточен для append-only загрузки, batch UPDATE/MERGE (если один писатель на таблицу), и OPTIMIZE. Serializable нужен, если несколько клиентов конкурентно читают и пишут одну и ту же таблицу, и корректность зависит от snapshot isolation.

Атомарность коммита: LogStore

Атомарность записи JSON-коммита зависит от storage backend. Delta Lake абстрагирует это через LogStore:

LogStore: атомарность коммита по storage backend

LogStore: put-if-absent для JSON-коммитов

LogStore — абстракция в Delta Lake, которая обеспечивает put-if-absent семантику для записи коммитов. Конкретная реализация зависит от storage backend.
HDFS / Local FSAtomic rename: клиент записывает коммит во временный файл (temp-xxx.json), затем выполняет атомарный rename в целевое имя (00000000000000000042.json). Rename атомарен на POSIX-совместимых FS. Если файл уже существует — rename fail → конфликт.
Amazon S3S3 поддерживает conditional put (put-if-absent) через If-None-Match заголовок. До 2024 года S3 не гарантировал этого — использовался DynamoDB как координатор. С S3 conditional writes (2024) — нативная поддержка.
Azure ADLSAzure Data Lake Storage Gen2 поддерживает conditional write через ETag (If-None-Match). Атомарность гарантирована на уровне storage.
GCSGoogle Cloud Storage поддерживает conditional create через ifGenerationMatch=0. Операция успешна только если объект ещё не существует.
TIP

До 2024 года S3 не поддерживал conditional put напрямую. Delta Lake использовал DynamoDB как внешний координатор для обеспечения put-if-absent. С появлением S3 conditional writes (август 2024) — DynamoDB для базовой атомарности больше не нужен. Однако для Coordinated Commits (multi-cluster writes) DynamoDB по-прежнему используется — это другой уровень координации.

Coordinated Commits (Writer V7)

Стандартный OCC работает, когда все писатели могут видеть _delta_log/ на одном storage. Но что если:

  • Разные Spark-кластеры на разных облаках пишут в одну таблицу?
  • Storage не гарантирует мгновенную видимость новых файлов (eventual consistency)?
  • Нужны multi-table transactions?

Delta Lake 4.0 ввёл Coordinated Commits (Writer V7) — внешний координатор управляет всеми коммитами:

Coordinated Commits: внешний координатор
Cluster A (AWS)Spark-кластер в AWS. Вместо прямой записи в _delta_log/ — отправляет prepared commit координатору.
Cluster B (Azure)Spark-кластер в Azure. Тоже отправляет prepared commit координатору. Не пишет в _delta_log/ напрямую.
Cluster C (GCP)Spark-кластер в GCP. Все три кластера координируются через один внешний coordinator.
Prepared commits

Commit Coordinator (DynamoDB / CosmosDB)

Commit Coordinator — внешний сервис (DynamoDB, Azure CosmosDB), который сериализует все коммиты. Получает prepared commits от клиентов, проверяет конфликты, назначает версии, и backfills в _delta_log/.
Serialized commits + backfill
_delta_log/ (S3 / ADLS / GCS)Координатор записывает финальные JSON-коммиты в _delta_log/. Клиенты, которые не используют Coordinated Commits, могут читать таблицу как обычно — лог совместим.

Процесс:

  1. Клиент готовит коммит (Parquet-файлы + actions) — но не записывает JSON в _delta_log/
  2. Клиент отправляет prepared commit координатору
  3. Координатор сериализует коммиты: проверяет конфликты, назначает монотонную версию
  4. Координатор выполняет backfill: записывает JSON-коммит в _delta_log/
  5. Координатор уведомляет клиента об успехе
# Включение Coordinated Commits при создании таблицы (через Spark)
# delta.coordinatedCommits.commitCoordinator-preview = "dynamodb"
# delta.coordinatedCommits.commitCoordinatorConf-preview =
# '{"dynamoDBTableName": "delta_commits", "dynamoDBEndpoint": "..."}'

# Чтение таблицы с Coordinated Commits — прозрачно
dt = DeltaTable("s3://bucket/my_table")
print(dt.version()) # Работает как обычно
NOTE

Coordinated Commits — это table feature (Writer V7). Включение необратимо (до Delta Lake 4.0, где появился DROP FEATURE). Клиенты, не поддерживающие Writer V7, не смогут писать в такую таблицу, но читать могут — JSON-коммиты в _delta_log/ совместимы.

Практический пример: конкурентные операции

from deltalake import DeltaTable, write_deltalake
import pyarrow as pa

# Создаём таблицу
schema = pa.schema([
 ("id", pa.int64()),
 ("name", pa.string()),
 ("amount", pa.float64()),
 ("date", pa.string()),
])

data = pa.table({
 "id": [1, 2, 3],
 "name": ["Alice", "Bob", "Carol"],
 "amount": [100.0, 200.0, 300.0],
 "date": ["2025-01-15", "2025-01-15", "2025-01-16"],
})

write_deltalake("./demo_table", data, partition_by=["date"])

# Два "конкурентных" append в разные партиции — не конфликтуют
dt = DeltaTable("./demo_table")
print(f"Версия до: {dt.version()}")

new_data_a = pa.table({
 "id": [4], "name": ["Dave"],
 "amount": [400.0], "date": ["2025-01-17"],
})
write_deltalake(dt, new_data_a, mode="append")

new_data_b = pa.table({
 "id": [5], "name": ["Eve"],
 "amount": [500.0], "date": ["2025-01-18"],
})
write_deltalake(dt, new_data_b, mode="append")

dt.update_incremental()
print(f"Версия после: {dt.version()}")
print(f"Файлов: {len(dt.file_uris())}")

# История
for entry in dt.history():
 print(f" v{entry['version']}: {entry['operation']}")

Retry и convergence

При конфликте клиент выполняет retry:

  1. Перечитать текущее состояние таблицы (новый snapshot)
  2. Повторить операцию на новом snapshot
  3. Попытаться коммитить

Delta Lake автоматически выполняет retry (настраиваемое количество попыток). Для большинства сценариев convergence наступает за 1–2 retry. Проблемы возникают при:

  • Hot partitions: множество клиентов пишут в одну партицию → высокая вероятность конфликта
  • Long-running transactions: чем дольше операция, тем больше шанс, что кто-то другой записал новую версию
  • Full table scans + writes: Serializable + DELETE с широким предикатом + параллельные INSERTs → частые retry
WARNING

Если retry не помогает (постоянные конфликты), решения: разделить таблицу на несколько (по домену), перейти на append-only + downstream MERGE, или использовать Coordinated Commits для сериализации конкурентных записей.

Итоги

КонцепцияСуть
OCCRead → Validate → Commit. Оптимистичный: конфликт проверяется в момент коммита
Conflict resolutionНа уровне файлов: пересечение затронутых файлов между txn и winning commits
WriteSerializableDefault. Проверяет только write-write конфликты. Append + Append = OK
SerializableСтрогий. Проверяет read-write конфликты. INSERT в сканированную партицию = конфликт
LogStoreАбстракция put-if-absent: HDFS rename, S3 conditional put, ADLS ETag
Coordinated CommitsWriter V7. Внешний координатор (DynamoDB) сериализует коммиты из разных кластеров

В следующем уроке — time travel (чтение прошлых версий), детали checkpoint-механики, и VACUUM (очистка файлов, которые больше не нужны).

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Два процесса одновременно выполняют MERGE на одной Delta-таблице. Процесс A начал раньше (readVersion=5), процесс B коммитит первым (version=6). Что произойдёт с процессом A?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6