Загрузка и моделирование данных в Iceberg
Кластер развёрнут, схема iceberg.retail создана. Теперь наполняем платформу данными. В этом уроке мы создаём Iceberg-таблицы RetailScope, загружаем в них данные, выстраиваем слои raw и marts и настраиваем обслуживание таблиц. Это превращает пустой каталог в работающий lakehouse.
Ключевая мысль: данные не сваливают в одну таблицу и не оставляют как попало. Их раскладывают слоями и поддерживают в форме. Иначе lakehouse за несколько недель эксплуатации деградирует до медленной свалки файлов.
Слоистая модель данных
В уроке постановки мы решили: данные раскладываются по слоям. Применим это.
Слой raw фиксирует данные в lakehouse в исходной форме. Минимум преобразований; задача слоя — чтобы данные были в Iceberg, под контролем, с историей. Слой marts — денормализованные витрины под конкретную аналитику. Они строятся запросами поверх raw и справочников; именно с marts работают потребители.
Зачем разделять, а не строить витрины прямо из источников: воспроизводимость (raw — фиксированная точка, витрину можно пересобрать), понятный lineage (видно, из чего собрана витрина), переиспользование (один raw кормит много витрин). Это та же логика слоёв, что и в ELT-подходе.
dbt: модели как слои raw/staging/martsСоздание Iceberg-таблицы фактов
Создаём центральную таблицу слоя raw — позиции продаж. Здесь применяем решение о партиционировании по дню.
CREATE TABLE iceberg.retail.raw_sales (
sale_id BIGINT,
store_id INTEGER,
product_id INTEGER,
customer_id BIGINT,
quantity INTEGER,
unit_price DECIMAL(10,2),
discount DECIMAL(10,2),
sale_ts TIMESTAMP(6)
)
WITH (
format = 'PARQUET',
partitioning = ARRAY['day(sale_ts)']
);
Разберём WITH. format = 'PARQUET' — колоночный формат: колоночное чтение, сжатие, статистика блоков. partitioning = ARRAY['day(sale_ts)'] — day()-transform: партиция определяется днём из sale_ts, отдельной колонки-даты не нужно. Это hidden partitioning Iceberg — запросы фильтруют по sale_ts, а Trino сам выводит партицию.
Iceberg-таблица создаётся как формат версии v2 по умолчанию — этого достаточно: v2 поддерживает row-level операции (MERGE, UPDATE, DELETE) через position delete files. Версия v3 с типом VARIANT и прочими новшествами для RetailScope не нужна.
Загрузка данных: CTAS и INSERT
Данные в Iceberg грузят SQL-запросами. Два основных способа.
CREATE TABLE AS SELECT (CTAS) — создать таблицу сразу с данными. Чтобы проект был воспроизводим без внешних файлов, наполним raw из встроенного tpch (его lineitem структурно похож на позиции продаж):
CREATE TABLE iceberg.retail.raw_sales
WITH (
format = 'PARQUET',
partitioning = ARRAY['day(sale_ts)']
)
AS
SELECT
l.orderkey * 10 + l.linenumber AS sale_id,
CAST(l.suppkey % 50 + 1 AS INTEGER) AS store_id,
CAST(l.partkey AS INTEGER) AS product_id,
o.custkey AS customer_id,
CAST(l.quantity AS INTEGER) AS quantity,
l.extendedprice / l.quantity AS unit_price,
l.extendedprice * l.discount AS discount,
CAST(l.shipdate AS TIMESTAMP) AS sale_ts
FROM tpch.sf10.lineitem l
JOIN tpch.sf10.orders o ON l.orderkey = o.orderkey;
INSERT INTO — добавить данные в существующую таблицу. Так грузят инкременты — новые партиции по мере поступления:
INSERT INTO iceberg.retail.raw_sales
SELECT ... FROM <источник свежих данных>
WHERE sale_ts >= TIMESTAMP '2026-05-01 00:00:00';
Каждый INSERT создаёт новый snapshot таблицы и новые файлы данных. Запомним этот факт — он определяет, зачем нужно обслуживание.
Метаданные Iceberg: что внутри таблицы
Iceberg-таблица — это файлы данных плюс дерево метаданных. Trino показывает метаданные через метадату-таблицы — обращение к ним даёт точную картину состояния таблицы.
-- История изменений: каждый snapshot — это запись
SELECT snapshot_id, made_current_at, operation
FROM iceberg.retail."raw_sales$snapshots"
ORDER BY made_current_at;
-- Файлы данных: размер и число записей каждого
SELECT file_path, record_count, file_size_in_bytes
FROM iceberg.retail."raw_sales$files"
ORDER BY file_size_in_bytes;
-- Партиции: сколько данных в каждой
SELECT partition, record_count, file_count
FROM iceberg.retail."raw_sales$partitions"
ORDER BY record_count DESC;
$snapshots — журнал версий: каждый CTAS, INSERT, MERGE добавляет snapshot. $files — список файлов данных с размерами; именно по нему ловят проблему мелких файлов. $partitions — сводка по партициям; по ней видно перекос данных между днями.
Снапшоты дают time travel — запрос к историческому состоянию таблицы:
-- Состояние таблицы на конкретный snapshot
SELECT count(*) FROM iceberg.retail.raw_sales
FOR VERSION AS OF 6219498723451234567;
-- Состояние на момент времени
SELECT count(*) FROM iceberg.retail.raw_sales
FOR TIMESTAMP AS OF TIMESTAMP '2026-05-10 00:00:00 UTC';
Time travel — это не отдельная фича, а прямое следствие snapshot-модели Iceberg: каждое состояние сохранено как версия, и любую можно прочитать.
Как Iceberg видит запись: дерево метаданных
Чтобы понимать обслуживание, надо знать, что Iceberg-таблица — это не просто папка с Parquet-файлами. Это многоуровневое дерево метаданных, и каждый INSERT добавляет в него новый слой.
Когда Trino планирует запрос к Iceberg-таблице, он спускается по этому дереву: catalog даёт корневой metadata file, тот указывает на текущий snapshot, snapshot — на манифесты, манифесты перечисляют файлы данных вместе со статистикой. Уже на уровне манифестов, не открывая ни одного файла данных, Trino отсекает то, что не нужно запросу. Это объясняет два факта. Первый: партиционирование и статистика работают, потому что они записаны в метаданных, а не вычисляются на лету. Второй: каждый INSERT создаёт новые data files, новый манифест и новый snapshot — дерево растёт, и без expire_snapshots старые ветки накапливаются бесконечно. Обслуживание — это, по сути, подрезание этого дерева.
Слой marts: витрина
Построим первую витрину — ежедневная выручка по магазину и товару. Это таблица слоя marts, собранная из raw_sales:
CREATE TABLE iceberg.retail.mart_sales_daily
WITH (
format = 'PARQUET',
partitioning = ARRAY['month(sale_date)']
)
AS
SELECT
CAST(sale_ts AS DATE) AS sale_date,
store_id,
product_id,
count(*) AS sale_lines,
sum(quantity) AS units_sold,
sum(quantity * unit_price - discount) AS net_revenue
FROM iceberg.retail.raw_sales
GROUP BY CAST(sale_ts AS DATE), store_id, product_id;
Витрина уже агрегирована (по дню, магазину, товару) — она кратно меньше raw_sales и заточена под типичные аналитические запросы. Партиционирование здесь по месяцу (month()): витрина мельче фактов, и помесячных партиций достаточно. Полноценную денормализацию — добавление в витрину названий товаров и магазинов из PostgreSQL — мы сделаем в уроке 4, когда подключим федеративный слой.
Слой marts в RetailScope обновляется тем же SQL, что его создал: периодически таблица пересобирается через CREATE OR REPLACE TABLE AS SELECT, либо свежие данные добавляются через INSERT. Такая пересборка витрин по расписанию — задача оркестратора (Airflow, dbt-trino из модуля 15). Trino — это движок, который исполняет SQL витрины; расписанием и зависимостями между витринами управляет инструмент над ним.
Обслуживание таблиц
Каждый INSERT создаёт новые файлы и новый snapshot. Без обслуживания накапливаются две проблемы: мелкие файлы (каждая вставка добавляет мелкие файлы — сканы замедляются) и разрастание истории (старые снапшоты держат старые файлы — место и метаданные растут). Iceberg даёт процедуры обслуживания через ALTER TABLE ... EXECUTE.
| Процедура | Что делает | Когда применять |
|---|---|---|
optimize | Компактит мелкие файлы в крупные | Регулярно, обычно по свежим партициям |
expire_snapshots | Удаляет старые снапшоты и их файлы | Регулярно, оставляя нужный retention |
remove_orphan_files | Удаляет файлы вне всех снапшотов | Периодически, реже остальных |
-- Компакция: переписать мелкие файлы свежей партиции в крупные
ALTER TABLE iceberg.retail.raw_sales EXECUTE optimize
WHERE sale_ts >= TIMESTAMP '2026-05-01 00:00:00';
-- Удалить снапшоты старше срока хранения
ALTER TABLE iceberg.retail.raw_sales EXECUTE expire_snapshots(retention_threshold => '7d');
-- Удалить файлы, не связанные ни с одним снапшотом
ALTER TABLE iceberg.retail.raw_sales EXECUTE remove_orphan_files(retention_threshold => '7d');
optimize обычно гоняют по фильтру WHERE на свежие партиции — компактить всю таблицу с миллиардом строк каждый раз дорого и не нужно: старые партиции уже скомпактированы. expire_snapshots хранит историю в разумных рамках; учтите, что после истечения снапшота time travel к нему станет недоступен. remove_orphan_files подчищает файлы, оставшиеся от прерванных операций.
Обслуживание Iceberg-таблиц — не разовое действие при создании, а постоянный процесс. raw_sales активно пополняется, и без регулярных optimize и expire_snapshots платформа деградирует постепенно и незаметно: запросы становятся всё медленнее, место в хранилище растёт, нагрузка на catalog увеличивается. Эти процедуры ставят на расписание оркестратором с первого дня эксплуатации, а не «когда станет медленно».
Попробуй сам
Наполните lakehouse RetailScope данными и поработайте с обслуживанием.
- Создайте таблицу
iceberg.retail.raw_salesчерез CTAS изtpch.sf10запросом из урока. Проверьте число строк:SELECT count(*) FROM iceberg.retail.raw_sales. - Посмотрите
raw_sales$filesиraw_sales$partitions: сколько файлов, как данные разложены по дням, есть ли перекос между партициями. - Сделайте 3-4 отдельных небольших
INSERT(например, по разным узким диапазонамsale_ts). Снова откройте$filesи$snapshots: как выросло число файлов и снапшотов? - Выполните
ALTER TABLE ... EXECUTE optimizeпо затронутой партиции и сравните$filesдо и после — изменились ли число и размер файлов. - Постройте витрину
mart_sales_dailyзапросом из урока. Сравнитеcount(*)витрины иraw_sales— насколько витрина компактнее. СделайтеFOR TIMESTAMP AS OFк раннему моменту и убедитесь, что time travel возвращает прошлое состояние.
Цель — увидеть полный цикл: загрузка, рост метаданных от вставок, компакция, построение витрины, time travel.