Оптимизация размещения данных
Delta Lake хранит данные в Parquet-файлах, но сам по себе формат Parquet не решает задачу эффективного размещения данных по файлам. Без оптимизации запрос WHERE city = 'Moscow' может потребовать чтения всех файлов — даже тех, где нет ни одной записи с этим городом.
В первом уроке мы видели поле stats в action add. Этот урок — детальный разбор того, как Delta Lake использует статистику для data skipping, и какие инструменты оптимизации доступны: OPTIMIZE, Z-ORDER и Liquid Clustering.
File-Level Statistics
Каждый add action в transaction log содержит поле stats — JSON-строку с агрегированной статистикой по файлу:
{
"numRecords": 58247,
"minValues": {
"city": "Amsterdam",
"amount": 0.50,
"created_at": "2024-01-01T00:00:00Z"
},
"maxValues": {
"city": "Zurich",
"amount": 99999.99,
"created_at": "2024-12-31T23:59:59Z"
},
"nullCount": {
"city": 0,
"amount": 12,
"created_at": 0
}
}
add action → stats (JSON string)
Каждый add action в _delta_log/*.json содержит поле stats — JSON-строку с агрегированной статистикой по всем записям в этом Parquet-файле.По умолчанию Delta Lake собирает статистику по первым 32 колонкам таблицы. Это настраивается через свойство таблицы:
ALTER TABLE my_table SET TBLPROPERTIES (
'delta.dataSkippingNumIndexedCols' = '50'
);
Поместите колонки, по которым чаще всего фильтруете, в начало схемы. Статистика собирается по позиции колонки, а не по имени — первые 32 (по умолчанию) колонки получают min/max/nullCount.
Data Skipping
Data skipping — это механизм, который позволяет движку пропускать целые Parquet-файлы, не читая их содержимое. Движок читает min/max из transaction log и сравнивает с предикатами запроса:
Движок читает stats из transaction log
Движок читает stats из transaction log (из последнего checkpoint + JSON-коммитов). Для каждого файла проверяет: попадает ли значение 'Moscow' в диапазон [minValues.city, maxValues.city].Результат: 2 из 3 файлов пропущены (66% экономии I/O)
Из 3 файлов прочитан только 1 — экономия 66% I/O. Эффективность data skipping зависит от корреляции между значениями колонки и физическим расположением данных по файлам.Эффективность data skipping зависит от корреляции данных с файлами. Если записи с city = 'Moscow' разбросаны по всем файлам (wide min/max range), skipping не поможет. Задача инструментов оптимизации — сгруппировать похожие данные в одних файлах.
Data Skipping через delta-rs
from deltalake import DeltaTable
import pyarrow.dataset as ds
dt = DeltaTable("./orders")
# delta-rs автоматически применяет data skipping при фильтрации
# Движок читает stats из transaction log и пропускает файлы
dataset = dt.to_pyarrow_dataset()
table = dataset.to_table(
filter=ds.field("city") == "Moscow"
)
print(f"Прочитано записей: {len(table)}")
# Файлы, где min > 'Moscow' или max < 'Moscow', не читались
OPTIMIZE: Compaction мелких файлов
Streaming-инъекция, частые INSERT-ы и микро-батчи создают множество мелких файлов (small file problem). Каждый файл — это запись в transaction log, и при тысячах мелких файлов:
- Метаданные transaction log раздуваются
- Data skipping теряет эффективность (маленькие файлы ≈ узкие min/max диапазоны, но их слишком много)
- Overhead на открытие/закрытие файлов растёт
OPTIMIZE решает это через bin-packing — объединение мелких файлов в файлы целевого размера:
200+ мелких файлов → 3 оптимальных файла
OPTIMIZE — idempotent операция. Повторный запуск не переписывает уже оптимизированные файлы.
OPTIMIZE через delta-rs
from deltalake import DeltaTable
dt = DeltaTable("./orders")
# Compaction — объединение мелких файлов
result = dt.optimize.compact()
print(f"Файлов до: {result['numFilesAdded'] + result['numFilesRemoved']}")
print(f"Файлов добавлено: {result['numFilesAdded']}")
print(f"Файлов удалено: {result['numFilesRemoved']}")
print(f"Байт добавлено: {result['filesAdded']['totalSize']}")
print(f"Байт удалено: {result['filesRemoved']['totalSize']}")
OPTIMIZE не удаляет старые файлы физически — он создаёт remove action для старых файлов и add action для новых. Физическое удаление происходит при VACUUM (см. урок 03).
Z-ORDER: Multi-Column Co-Locality
Data skipping эффективен, когда данные физически сгруппированы по колонке фильтрации. Но как быть, если запросы фильтруют по нескольким колонкам (city AND date, user_id AND product_id)?
Z-ORDER использует Z-кривую (кривую Мортона) — space-filling curve, которая отображает многомерное пространство в одномерное, сохраняя co-locality: точки, близкие в N-мерном пространстве, остаются близкими на одномерной кривой.
Z-кривая: чередование битов координат → одномерный порядок с co-locality
Z-кривая (кривая Мортона) чередует биты координат: для 2D-точки (x, y) Z-значение = x₁y₁x₂y₂x₃y₃... Это сохраняет близость точек в многомерном пространстве при отображении на одномерную ось.Z-ORDER через delta-rs
from deltalake import DeltaTable
dt = DeltaTable("./orders")
# Z-ORDER по двум колонкам
result = dt.optimize.z_order(columns=["city", "order_date"])
print(f"Файлов переписано: {result['numFilesRemoved']}")
print(f"Файлов создано: {result['numFilesAdded']}")
Z-ORDER оптимален для 2-4 колонок. При большем количестве колонок эффективность Z-кривой падает — co-locality размывается в высокоразмерном пространстве. Если нужно оптимизировать под одну колонку, достаточно обычной сортировки.
Ограничения Z-ORDER
Z-ORDER требует полной перезаписи данных при каждом запуске — это дорого для больших таблиц. Нет инкрементальности: новые данные, вставленные после Z-ORDER, снова нарушают co-locality. Это фундаментальное ограничение, которое решает Liquid Clustering.
Liquid Clustering
Liquid Clustering — замена и партиционированию, и Z-ORDER. Доступен с Delta Lake 3.1.0 (open source), GA в Databricks Runtime 15.2+.
Ключевые отличия от Z-ORDER:
Hilbert Curve vs Z-Curve
Liquid Clustering использует Hilbert curve вместо Z-curve. Hilbert curve обеспечивает более равномерную co-locality в многомерном пространстве — при одинаковом количестве колонок, Hilbert curve даёт меньше «скачков» между далёкими точками, что улучшает data skipping.
Включение Liquid Clustering
-- При создании таблицы
CREATE TABLE orders (
order_id BIGINT,
city STRING,
order_date DATE,
amount DECIMAL(10, 2)
) CLUSTER BY (city, order_date);
-- Для существующей таблицы (Delta Lake 3.3+)
ALTER TABLE orders CLUSTER BY (city, order_date);
-- Изменение колонок кластеризации — без перезаписи данных
ALTER TABLE orders CLUSTER BY (city, product_category);
-- Удаление кластеризации
ALTER TABLE orders CLUSTER BY NONE;
from deltalake import DeltaTable
# Через delta-rs — DeltaTable API
# Создание таблицы с Liquid Clustering через SQL
# Кластеризация применяется при OPTIMIZE
dt = DeltaTable("./orders")
# OPTIMIZE для кластеризованной таблицы
# Перезаписывает только новые/изменённые данные
result = dt.optimize.compact()
Протокольные требования
Liquid Clustering требует:
- Writer V7 и Reader V1 (Delta Lake OSS) или Reader V3 (Databricks)
- Table features:
clusteringиdomainMetadata - Статистика по колонкам кластеризации (первые 32 колонки по умолчанию)
Liquid Clustering несовместим с Hive-style партиционированием и Z-ORDER. Нельзя включить кластеризацию на партиционированной таблице — сначала нужно создать новую таблицу и перенести данные.
Сравнение подходов
Практический пример: оптимизация таблицы заказов
from deltalake import DeltaTable, write_deltalake
import pyarrow as pa
import pyarrow.dataset as ds
# Допустим, у нас есть таблица с 500 мелкими файлами
dt = DeltaTable("./orders")
# Шаг 1: посмотрим текущее состояние
files = dt.files()
print(f"Количество файлов: {len(files)}")
# Шаг 2: OPTIMIZE (compaction)
compact_result = dt.optimize.compact()
print(f"Compaction: {compact_result['numFilesRemoved']} → "
f"{compact_result['numFilesAdded']} файлов")
# Шаг 3: Z-ORDER (если Liquid Clustering недоступен)
zorder_result = dt.optimize.z_order(columns=["city", "order_date"])
print(f"Z-ORDER: {zorder_result['numFilesRemoved']} файлов переписано")
# Шаг 4: проверим эффективность data skipping
dataset = dt.to_pyarrow_dataset()
# Без фильтра — все файлы
all_fragments = list(dataset.get_fragments())
print(f"Всего фрагментов: {len(all_fragments)}")
# С фильтром — data skipping в действии
filtered = dataset.get_fragments(
filter=ds.field("city") == "Moscow"
)
matched = list(filtered)
print(f"Фрагментов после skipping: {len(matched)}")
Рекомендация для новых таблиц: используйте Liquid Clustering вместо партиционирования и Z-ORDER. Liquid Clustering проще в обслуживании, дешевле по write amplification, и позволяет менять колонки кластеризации без перезаписи. Если ваш движок не поддерживает Liquid Clustering — используйте OPTIMIZE + Z-ORDER.
Что дальше
В следующем уроке мы разберём Change Data Feed — механизм отслеживания изменений в Delta-таблице для инкрементальных ETL-пайплайнов и streaming-обработки.