ACID-транзакции и разрешение конфликтов
В предыдущем уроке мы разобрали структуру transaction log — JSON-коммиты, типы action, checkpoints. Но как Delta Lake обеспечивает ACID при одновременной записи нескольких клиентов в одну таблицу?
Delta Lake реализует optimistic concurrency control (OCC) — оптимистичное управление конкурентностью. Идея: каждый клиент работает так, будто он единственный, и только в момент коммита проверяет, не конфликтует ли его работа с другими.
Optimistic Concurrency Control: цикл Read-Validate-Commit
- READ: snapshot на версии N
Локальная обработка: создание Parquet-файлов, формирование actions
Клиент выполняет операцию (INSERT, UPDATE, DELETE, MERGE) локально: создаёт новые Parquet-файлы, формирует список add/remove actions. Это может занять секунды или минуты.- VALIDATE: проверить конфликты с v(N+1)..v(N+K)
- COMMIT: записать v(N+K+1).json
RETRY: перечитать → повторить операцию
Конфликт обнаружен: операции другого клиента затронули файлы, которые мы тоже читали или модифицировали. Клиент может retry: перечитать состояние, повторить операцию с нуля на новом snapshot.Ключевой вопрос: что считается конфликтом? Не каждая параллельная запись конфликтует. Два INSERTа в разные партиции — не конфликт. UPDATE и DELETE одних и тех же строк — конфликт.
Алгоритм разрешения конфликтов
Delta Lake определяет конфликт на основе пересечения затронутых файлов и предикатов:
- Клиент A прочитал snapshot версии N, подготовил коммит
- Пока A работал, клиент B записал версии N+1, N+2
- A пытается записать N+3
- Validation: для каждого коммита B (N+1, N+2) — проверить:
- Добавил ли B файлы в партиции, которые A читал для своей операции?
- Удалил ли B файлы, которые A тоже удаляет или читал?
- Изменил ли B метаданные или протокол?
A validates: B затронул date=‘01-16’, A — date=‘01-15’
A пытается коммитить. Validate: B добавил файлы в date=2025-01-16 (v6). A вставлял в date=2025-01-15. Партиции не пересекаются → нет конфликта.A commits → v7 + (нет конфликта — разные партиции)
Нет конфликта: A коммитит как версию 7. Оба INSERTа в разные партиции — обе записи сохранены. Это преимущество WriteSerializable: disjoint appends не конфликтуют.Матрица конфликтов
Не все комбинации операций конфликтуют. Delta Lake определяет конфликт на основе типа операции и пересечения затронутых данных:
«Те же файлы» — значит оба клиента затронули (read + modify) одни и те же Parquet-файлы. Если DELETE A и DELETE B удаляют строки из разных файлов — конфликта нет. Конфликт определяется на уровне файлов, не строк (без deletion vectors).
Isolation Levels: Serializable vs WriteSerializable
Delta Lake поддерживает два уровня изоляции:
WriteSerializable — уровень по умолчанию. Он разрешает «phantom reads»: если клиент A выполняет DELETE WHERE amount > 1000, а клиент B параллельно вставляет новую строку с amount = 5000 — при WriteSerializable это не конфликт. Строка B не будет удалена, хотя подпадает под предикат A. При Serializable — это конфликт, и один из клиентов получит retry.
# Установка isolation level
from deltalake import DeltaTable
# При создании таблицы
# delta.isolationLevel = "WriteSerializable" (default)
# delta.isolationLevel = "Serializable"
dt = DeltaTable("./my_table")
config = dt.metadata().configuration
print(f"Isolation: {config.get('delta.isolationLevel', 'WriteSerializable (default)')}")
Большинство ETL-пайплайнов не нуждаются в Serializable. WriteSerializable достаточен для append-only загрузки, batch UPDATE/MERGE (если один писатель на таблицу), и OPTIMIZE. Serializable нужен, если несколько клиентов конкурентно читают и пишут одну и ту же таблицу, и корректность зависит от snapshot isolation.
Атомарность коммита: LogStore
Атомарность записи JSON-коммита зависит от storage backend. Delta Lake абстрагирует это через LogStore:
LogStore: put-if-absent для JSON-коммитов
LogStore — абстракция в Delta Lake, которая обеспечивает put-if-absent семантику для записи коммитов. Конкретная реализация зависит от storage backend.До 2024 года S3 не поддерживал conditional put напрямую. Delta Lake использовал DynamoDB как внешний координатор для обеспечения put-if-absent. С появлением S3 conditional writes (август 2024) — DynamoDB для базовой атомарности больше не нужен. Однако для Coordinated Commits (multi-cluster writes) DynamoDB по-прежнему используется — это другой уровень координации.
Coordinated Commits (Writer V7)
Стандартный OCC работает, когда все писатели могут видеть _delta_log/ на одном storage. Но что если:
- Разные Spark-кластеры на разных облаках пишут в одну таблицу?
- Storage не гарантирует мгновенную видимость новых файлов (eventual consistency)?
- Нужны multi-table transactions?
Delta Lake 4.0 ввёл Coordinated Commits (Writer V7) — внешний координатор управляет всеми коммитами:
Commit Coordinator (DynamoDB / CosmosDB)
Commit Coordinator — внешний сервис (DynamoDB, Azure CosmosDB), который сериализует все коммиты. Получает prepared commits от клиентов, проверяет конфликты, назначает версии, и backfills в _delta_log/.Процесс:
- Клиент готовит коммит (Parquet-файлы + actions) — но не записывает JSON в
_delta_log/ - Клиент отправляет prepared commit координатору
- Координатор сериализует коммиты: проверяет конфликты, назначает монотонную версию
- Координатор выполняет backfill: записывает JSON-коммит в
_delta_log/ - Координатор уведомляет клиента об успехе
# Включение Coordinated Commits при создании таблицы (через Spark)
# delta.coordinatedCommits.commitCoordinator-preview = "dynamodb"
# delta.coordinatedCommits.commitCoordinatorConf-preview =
# '{"dynamoDBTableName": "delta_commits", "dynamoDBEndpoint": "..."}'
# Чтение таблицы с Coordinated Commits — прозрачно
dt = DeltaTable("s3://bucket/my_table")
print(dt.version()) # Работает как обычно
Coordinated Commits — это table feature (Writer V7). Включение необратимо (до Delta Lake 4.0, где появился DROP FEATURE). Клиенты, не поддерживающие Writer V7, не смогут писать в такую таблицу, но читать могут — JSON-коммиты в _delta_log/ совместимы.
Практический пример: конкурентные операции
from deltalake import DeltaTable, write_deltalake
import pyarrow as pa
# Создаём таблицу
schema = pa.schema([
("id", pa.int64()),
("name", pa.string()),
("amount", pa.float64()),
("date", pa.string()),
])
data = pa.table({
"id": [1, 2, 3],
"name": ["Alice", "Bob", "Carol"],
"amount": [100.0, 200.0, 300.0],
"date": ["2025-01-15", "2025-01-15", "2025-01-16"],
})
write_deltalake("./demo_table", data, partition_by=["date"])
# Два "конкурентных" append в разные партиции — не конфликтуют
dt = DeltaTable("./demo_table")
print(f"Версия до: {dt.version()}")
new_data_a = pa.table({
"id": [4], "name": ["Dave"],
"amount": [400.0], "date": ["2025-01-17"],
})
write_deltalake(dt, new_data_a, mode="append")
new_data_b = pa.table({
"id": [5], "name": ["Eve"],
"amount": [500.0], "date": ["2025-01-18"],
})
write_deltalake(dt, new_data_b, mode="append")
dt.update_incremental()
print(f"Версия после: {dt.version()}")
print(f"Файлов: {len(dt.file_uris())}")
# История
for entry in dt.history():
print(f" v{entry['version']}: {entry['operation']}")
Retry и convergence
При конфликте клиент выполняет retry:
- Перечитать текущее состояние таблицы (новый snapshot)
- Повторить операцию на новом snapshot
- Попытаться коммитить
Delta Lake автоматически выполняет retry (настраиваемое количество попыток). Для большинства сценариев convergence наступает за 1–2 retry. Проблемы возникают при:
- Hot partitions: множество клиентов пишут в одну партицию → высокая вероятность конфликта
- Long-running transactions: чем дольше операция, тем больше шанс, что кто-то другой записал новую версию
- Full table scans + writes: Serializable + DELETE с широким предикатом + параллельные INSERTs → частые retry
Если retry не помогает (постоянные конфликты), решения: разделить таблицу на несколько (по домену), перейти на append-only + downstream MERGE, или использовать Coordinated Commits для сериализации конкурентных записей.
Итоги
| Концепция | Суть |
|---|---|
| OCC | Read → Validate → Commit. Оптимистичный: конфликт проверяется в момент коммита |
| Conflict resolution | На уровне файлов: пересечение затронутых файлов между txn и winning commits |
| WriteSerializable | Default. Проверяет только write-write конфликты. Append + Append = OK |
| Serializable | Строгий. Проверяет read-write конфликты. INSERT в сканированную партицию = конфликт |
| LogStore | Абстракция put-if-absent: HDFS rename, S3 conditional put, ADLS ETag |
| Coordinated Commits | Writer V7. Внешний координатор (DynamoDB) сериализует коммиты из разных кластеров |
В следующем уроке — time travel (чтение прошлых версий), детали checkpoint-механики, и VACUUM (очистка файлов, которые больше не нужны).