Change Data Feed и Streaming
В стандартном режиме Delta Lake отвечает на вопрос «каково текущее состояние таблицы?» через snapshot. Но для инкрементальных ETL-пайплайнов нужен ответ на другой вопрос: «что изменилось между версией N и версией M?»
Change Data Feed (CDF) — механизм, который записывает изменения (не только результат, но и тип изменения) в отдельные файлы, позволяя downstream-потребителям эффективно читать только дельту.
Механика CDF
Когда CDF включён, каждая операция записи (INSERT, UPDATE, DELETE, MERGE) помимо стандартных add/remove action записывает change records — специальные Parquet-файлы с метаданными об изменении.
Каждый change record содержит все колонки исходной таблицы плюс три служебных:
| Колонка | Тип | Описание |
|---|---|---|
_change_type | STRING | Тип изменения: insert, update_preimage, update_postimage, delete |
_commit_version | LONG | Номер версии коммита, в котором произошло изменение |
_commit_timestamp | TIMESTAMP | Timestamp коммита |
Включение CDF
CDF включается через table property:
-- При создании таблицы
CREATE TABLE orders (
order_id BIGINT,
city STRING,
amount DECIMAL(10, 2)
) TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true');
-- Для существующей таблицы
ALTER TABLE orders SET TBLPROPERTIES (
'delta.enableChangeDataFeed' = 'true'
);
from deltalake import DeltaTable
dt = DeltaTable("./orders")
# Включение CDF для существующей таблицы
dt.alter.set_table_properties({
"delta.enableChangeDataFeed": "true"
})
CDF записывает изменения только после включения. Ретроактивно восстановить изменения до включения CDF невозможно — для этого нужен полный diff между snapshot-ами.
Хранение CDF: директория _change_data/
CDF-файлы хранятся в поддиректории _change_data/ внутри таблицы:
my_table/
Корневая директория Delta-таблицы. Содержит данные (Parquet), transaction log (_delta_log/), и CDF-файлы (_change_data/).Важные детали:
- CDF-файлы — это отдельные Parquet-файлы, не дублирующие данные основной таблицы
- Для INSERT и DELETE на уровне целых файлов (без deletion vectors) CDF-файлы могут не создаваться — движок вычисляет изменения из
add/removeaction - CDF-файлы подчиняются VACUUM — удаляются, когда старше retention period
- В JSON-коммите появляется action
cdc— аналогadd, но указывающий на файл в_change_data/
Чтение CDF
По диапазону версий
from deltalake import DeltaTable
dt = DeltaTable("./orders")
# Чтение изменений между версиями 5 и 10
changes = dt.load_cdf(
starting_version=5,
ending_version=10
).read_all().to_pandas()
print(changes[["order_id", "city", "amount",
"_change_type", "_commit_version"]].head(10))
# Пример вывода:
# order_id city amount _change_type _commit_version
# 0 1001 Moscow 150.00 insert 5
# 1 1002 Berlin 75.50 insert 5
# 2 1001 Moscow 200.00 update_preimage 7
# 3 1001 Moscow 250.00 update_postimage 7
# 4 1003 Paris 30.00 delete 9
По диапазону timestamp
from deltalake import DeltaTable
dt = DeltaTable("./orders")
# Чтение изменений за период
changes = dt.load_cdf(
starting_timestamp="2025-01-01T00:00:00Z",
ending_timestamp="2025-01-31T23:59:59Z"
).read_all().to_pandas()
# Фильтрация по типу изменения
inserts = changes[changes["_change_type"] == "insert"]
updates = changes[changes["_change_type"] == "update_postimage"]
deletes = changes[changes["_change_type"] == "delete"]
print(f"За январь: {len(inserts)} вставок, "
f"{len(updates)} обновлений, {len(deletes)} удалений")
CDF в потоке данных
Операция: UPDATE orders SET amount = 250 WHERE order_id = 1001
Операция записи: INSERT, UPDATE, DELETE или MERGE. Движок определяет затронутые строки и тип изменения для каждой.Стандартный путь
Стандартный путь записи: старый файл помечается как remove, новый файл с обновлённой записью — как add в JSON-коммите.CDF-путь (если включён)
CDF-путь: дополнительно к стандартным action, записываются CDF-файлы в _change_data/ с preimage и postimage изменённых строк.Streaming Reads
Delta Lake поддерживает использование таблицы как streaming source — потребитель читает новые коммиты по мере их появления.
Spark Structured Streaming
# Spark Structured Streaming — для контекста
# (это Spark-specific API, не delta-rs)
stream = (spark.readStream
.format("delta")
.option("startingVersion", 5)
# или: .option("startingTimestamp", "2025-01-01")
.option("ignoreChanges", "true") # игнорировать UPDATE/DELETE
.load("./orders"))
# ignoreChanges=true: UPDATE/DELETE не вызывают ошибку,
# но postimage-строки обрабатываются как INSERT
# ignoreDeletes=true: DELETE не вызывают ошибку,
# но удалённые строки пропускаются
ignoreChanges и ignoreDeletes — компромиссные опции для streaming без CDF. С включённым CDF предпочтительнее использовать readChangeFeed = true, который передаёт все типы изменений downstream.
Streaming с CDF
# Spark Structured Streaming с CDF
stream = (spark.readStream
.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 5)
.load("./orders"))
# Каждый микробатч содержит колонки
# _change_type, _commit_version, _commit_timestamp
# Downstream может обрабатывать insert/update/delete по-разному
Инкрементальный ETL: CDF + Medallion Architecture
Типичный паттерн — использование CDF для инкрементального обновления downstream-таблиц в medallion architecture (Bronze → Silver → Gold):
Пример инкрементального ETL
from deltalake import DeltaTable, write_deltalake
import pyarrow.compute as pc
# Читаем изменения из Bronze (с последнего обработанного коммита)
bronze = DeltaTable("./bronze_orders")
last_processed_version = 42 # сохранённый checkpoint
changes = bronze.load_cdf(
starting_version=last_processed_version + 1
).read_all().to_pandas()
if len(changes) > 0:
# Фильтруем только новые записи для Silver
new_records = changes[
changes["_change_type"].isin(["insert", "update_postimage"])
].drop(columns=["_change_type", "_commit_version",
"_commit_timestamp"])
# Применяем трансформации
new_records["amount_usd"] = new_records["amount"] * 0.011
new_records["processed_at"] = "2025-03-27"
# Записываем в Silver
silver = DeltaTable("./silver_orders")
(silver.merge(
source=pa.Table.from_pandas(new_records),
predicate="target.order_id = source.order_id",
source_alias="source",
target_alias="target"
)
.when_matched_update_all()
.when_not_matched_insert_all()
.execute())
# Обновляем checkpoint
last_processed_version = bronze.version()
print(f"Обработано {len(new_records)} записей, "
f"новый checkpoint: {last_processed_version}")
Храните last_processed_version в отдельной Delta-таблице или внешнем хранилище (Redis, файл). Это позволяет инкрементальному ETL продолжить с точки остановки после сбоя — exactly-once семантика при идемпотентном MERGE.
Что дальше
В следующем уроке мы разберём Delta Kernel (новый фундамент для коннекторов), UniForm (автоматическая генерация Iceberg-метаданных) и экосистему движков.
Delta как source/sink в Structured Streaming