Learning Platform
Глоссарий Troubleshooting
Урок 12.05 · 30 мин
Продвинутый
Delta LakeChange Data FeedCDFStreamingIncremental ETL_change_type_change_dataMedallion Architecture

Change Data Feed и Streaming

В стандартном режиме Delta Lake отвечает на вопрос «каково текущее состояние таблицы?» через snapshot. Но для инкрементальных ETL-пайплайнов нужен ответ на другой вопрос: «что изменилось между версией N и версией M?»

Change Data Feed (CDF) — механизм, который записывает изменения (не только результат, но и тип изменения) в отдельные файлы, позволяя downstream-потребителям эффективно читать только дельту.

Механика CDF

Когда CDF включён, каждая операция записи (INSERT, UPDATE, DELETE, MERGE) помимо стандартных add/remove action записывает change records — специальные Parquet-файлы с метаданными об изменении.

CDF: типы изменений (_change_type)
insertНовая запись добавлена в таблицу. CDF-файл содержит полную запись с _change_type='insert'. Генерируется при INSERT и при INSERT-части MERGE.
update_preimageСостояние записи ДО обновления. CDF-файл содержит старые значения всех колонок. Генерируется при UPDATE и при UPDATE-части MERGE.
update_postimageСостояние записи ПОСЛЕ обновления. CDF-файл содержит новые значения всех колонок. Всегда идёт в паре с update_preimage.
deleteЗапись удалена из таблицы. CDF-файл содержит полную удалённую строку с _change_type='delete'. Генерируется при DELETE и при DELETE-части MERGE.

Каждый change record содержит все колонки исходной таблицы плюс три служебных:

КолонкаТипОписание
_change_typeSTRINGТип изменения: insert, update_preimage, update_postimage, delete
_commit_versionLONGНомер версии коммита, в котором произошло изменение
_commit_timestampTIMESTAMPTimestamp коммита

Включение CDF

CDF включается через table property:

-- При создании таблицы
CREATE TABLE orders (
 order_id BIGINT,
 city STRING,
 amount DECIMAL(10, 2)
) TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true');

-- Для существующей таблицы
ALTER TABLE orders SET TBLPROPERTIES (
 'delta.enableChangeDataFeed' = 'true'
);
from deltalake import DeltaTable

dt = DeltaTable("./orders")
# Включение CDF для существующей таблицы
dt.alter.set_table_properties({
 "delta.enableChangeDataFeed": "true"
})
NOTE

CDF записывает изменения только после включения. Ретроактивно восстановить изменения до включения CDF невозможно — для этого нужен полный diff между snapshot-ами.

Хранение CDF: директория _change_data/

CDF-файлы хранятся в поддиректории _change_data/ внутри таблицы:

Структура хранения CDF

my_table/

Корневая директория Delta-таблицы. Содержит данные (Parquet), transaction log (_delta_log/), и CDF-файлы (_change_data/).
Данные (Parquet)Стандартные Parquet-файлы с текущим состоянием данных. Не изменяются CDF — это те же файлы, что были бы без CDF.
_delta_log/Transaction log. cdc action в JSON-коммитах указывает на CDF-файлы в _change_data/ — аналогично add action для data-файлов.
_change_data/Отдельная директория с Parquet-файлами изменений. Каждый файл содержит записи с _change_type, _commit_version, _commit_timestamp. Файлы подчиняются VACUUM — удаляются, когда старше retention period.

Важные детали:

  • CDF-файлы — это отдельные Parquet-файлы, не дублирующие данные основной таблицы
  • Для INSERT и DELETE на уровне целых файлов (без deletion vectors) CDF-файлы могут не создаваться — движок вычисляет изменения из add/remove action
  • CDF-файлы подчиняются VACUUM — удаляются, когда старше retention period
  • В JSON-коммите появляется action cdc — аналог add, но указывающий на файл в _change_data/

Чтение CDF

По диапазону версий

from deltalake import DeltaTable

dt = DeltaTable("./orders")

# Чтение изменений между версиями 5 и 10
changes = dt.load_cdf(
 starting_version=5,
 ending_version=10
).read_all().to_pandas()

print(changes[["order_id", "city", "amount",
 "_change_type", "_commit_version"]].head(10))

# Пример вывода:
# order_id city amount _change_type _commit_version
# 0 1001 Moscow 150.00 insert 5
# 1 1002 Berlin 75.50 insert 5
# 2 1001 Moscow 200.00 update_preimage 7
# 3 1001 Moscow 250.00 update_postimage 7
# 4 1003 Paris 30.00 delete 9

По диапазону timestamp

from deltalake import DeltaTable

dt = DeltaTable("./orders")

# Чтение изменений за период
changes = dt.load_cdf(
 starting_timestamp="2025-01-01T00:00:00Z",
 ending_timestamp="2025-01-31T23:59:59Z"
).read_all().to_pandas()

# Фильтрация по типу изменения
inserts = changes[changes["_change_type"] == "insert"]
updates = changes[changes["_change_type"] == "update_postimage"]
deletes = changes[changes["_change_type"] == "delete"]

print(f"За январь: {len(inserts)} вставок, "
 f"{len(updates)} обновлений, {len(deletes)} удалений")

CDF в потоке данных

CDF: поток записи изменений

Операция: UPDATE orders SET amount = 250 WHERE order_id = 1001

Операция записи: INSERT, UPDATE, DELETE или MERGE. Движок определяет затронутые строки и тип изменения для каждой.

Стандартный путь

Стандартный путь записи: старый файл помечается как remove, новый файл с обновлённой записью — как add в JSON-коммите.
add + remove в _delta_log/JSON-коммит содержит remove action для старого файла и add action для нового файла с обновлённой записью.

CDF-путь (если включён)

CDF-путь: дополнительно к стандартным action, записываются CDF-файлы в _change_data/ с preimage и postimage изменённых строк.
cdc action + файл в _change_data/В JSON-коммит добавляется cdc action, указывающий на Parquet-файл в _change_data/. Файл содержит две строки: update_preimage (amount=200) и update_postimage (amount=250).

Streaming Reads

Delta Lake поддерживает использование таблицы как streaming source — потребитель читает новые коммиты по мере их появления.

Spark Structured Streaming

# Spark Structured Streaming — для контекста
# (это Spark-specific API, не delta-rs)
stream = (spark.readStream
 .format("delta")
 .option("startingVersion", 5)
 # или: .option("startingTimestamp", "2025-01-01")
 .option("ignoreChanges", "true") # игнорировать UPDATE/DELETE
 .load("./orders"))

# ignoreChanges=true: UPDATE/DELETE не вызывают ошибку,
# но postimage-строки обрабатываются как INSERT
# ignoreDeletes=true: DELETE не вызывают ошибку,
# но удалённые строки пропускаются
NOTE

ignoreChanges и ignoreDeletes — компромиссные опции для streaming без CDF. С включённым CDF предпочтительнее использовать readChangeFeed = true, который передаёт все типы изменений downstream.

Streaming с CDF

# Spark Structured Streaming с CDF
stream = (spark.readStream
 .format("delta")
 .option("readChangeFeed", "true")
 .option("startingVersion", 5)
 .load("./orders"))

# Каждый микробатч содержит колонки
# _change_type, _commit_version, _commit_timestamp
# Downstream может обрабатывать insert/update/delete по-разному

Инкрементальный ETL: CDF + Medallion Architecture

Типичный паттерн — использование CDF для инкрементального обновления downstream-таблиц в medallion architecture (Bronze → Silver → Gold):

Инкрементальный ETL с CDF: Medallion Architecture
Bronze (Raw)Raw-данные из источника. Append-only, все записи сохраняются без трансформации. CDF включён для отслеживания новых батчей.
CDF: insert
Silver (Cleaned)Очищенные и трансформированные данные. Читает CDF из Bronze (только insert), применяет трансформации, делает MERGE в Silver. CDF включён для передачи в Gold.
CDF: insert + update
Gold (Business)Бизнес-агрегаты и витрины. Читает CDF из Silver (insert + update_postimage), инкрементально обновляет агрегаты. Не нужно пересчитывать всю таблицу.
Без CDFБез CDF каждый уровень должен читать ПОЛНЫЙ snapshot предыдущего уровня и вычислять разницу. Для таблиц на терабайты — это часы на каждый батч.
С CDFС CDF каждый уровень читает ТОЛЬКО изменения. Silver читает только новые/изменённые записи из Bronze. Gold обновляет только затронутые агрегаты. Минуты вместо часов.

Пример инкрементального ETL

from deltalake import DeltaTable, write_deltalake
import pyarrow.compute as pc

# Читаем изменения из Bronze (с последнего обработанного коммита)
bronze = DeltaTable("./bronze_orders")
last_processed_version = 42 # сохранённый checkpoint

changes = bronze.load_cdf(
 starting_version=last_processed_version + 1
).read_all().to_pandas()

if len(changes) > 0:
 # Фильтруем только новые записи для Silver
 new_records = changes[
 changes["_change_type"].isin(["insert", "update_postimage"])
 ].drop(columns=["_change_type", "_commit_version",
 "_commit_timestamp"])

 # Применяем трансформации
 new_records["amount_usd"] = new_records["amount"] * 0.011
 new_records["processed_at"] = "2025-03-27"

 # Записываем в Silver
 silver = DeltaTable("./silver_orders")
 (silver.merge(
 source=pa.Table.from_pandas(new_records),
 predicate="target.order_id = source.order_id",
 source_alias="source",
 target_alias="target"
 )
 .when_matched_update_all()
 .when_not_matched_insert_all()
 .execute())

 # Обновляем checkpoint
 last_processed_version = bronze.version()
 print(f"Обработано {len(new_records)} записей, "
 f"новый checkpoint: {last_processed_version}")
TIP

Храните last_processed_version в отдельной Delta-таблице или внешнем хранилище (Redis, файл). Это позволяет инкрементальному ETL продолжить с точки остановки после сбоя — exactly-once семантика при идемпотентном MERGE.

Что дальше

В следующем уроке мы разберём Delta Kernel (новый фундамент для коннекторов), UniForm (автоматическая генерация Iceberg-метаданных) и экосистему движков.

Delta как source/sink в Structured Streaming

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. UPDATE меняет 100 строк в Delta-таблице с включённым CDF. Какие записи появятся в _change_data/?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6