ELT-слой: staging, очистка и моделирование
В прошлом уроке мы спроектировали трёхслойный пайплайн: raw -> staging -> marts. Теперь реализуем его первые два перехода кодом. Прочитаем сырые файлы с объектного хранилища, построим staging-слой с очисткой и типизацией, а из него соберём первую бизнес-витрину. Весь код — на friendly SQL DuckDB, который курс разбирал в отдельном модуле; здесь он работает на реальной задаче.
Чтение raw: сырые файлы напрямую
Слой raw — это файлы на объектном хранилище, и DuckDB читает их без шага загрузки. Если данные на S3-совместимом хранилище, сначала нужен secret с доступом:
-- Доступ к S3-совместимому хранилищу (MinIO, R2, AWS S3)
CREATE SECRET raw_store (
TYPE s3,
KEY_ID 'минио_ключ',
SECRET 'минио_секрет',
ENDPOINT 'minio.internal:9000',
URL_STYLE 'path'
);
После этого помесячные Parquet-файлы поездок читаются одним globs-паттерном как единая таблица. DuckDB сам разбирает Hive-разметку путей year=.../month=... и добавляет year и month как колонки:
-- Все 28 месячных файлов читаются как одна таблица.
-- hive_partitioning извлекает year и month из путей.
SELECT count(*) AS total_trips,
min(year) AS from_year,
max(year) AS to_year
FROM read_parquet('s3://raw/trips/*/*/trips.parquet', hive_partitioning = true);
-- Результат:
-- total_trips | from_year | to_year
-- 41_207_563 | 2024 | 2026
Сорок один миллион строк, и мы ничего никуда не загружали — DuckDB просто прочитал файлы на месте. Колонки year и month появились из путей бесплатно. Справочник зон — это CSV, и его читает read_csv с автоматическим определением диалекта и типов:
-- CSV-sniffer сам определяет разделитель, типы и заголовок
SELECT * FROM read_csv('s3://raw/zones/zones.csv') LIMIT 3;
-- LocationID | Borough | Zone | service_zone
-- 1 | EWR | Newark Airport | EWR
-- 2 | Queens |Jamaica Bay | Boro Zone
-- 3 | Bronx | Allerton/Pelham... | Boro Zone
Видно, что CSV «грязноватый»: лишние пробелы в значениях, неровные регистры, имена колонок в смешанном стиле. Это нормальная сырая реальность — чинить её будем в staging.
Staging поездок: типизация и фильтрация мусора
Staging-объект для поездок решает одну задачу — сделать данные чистыми и предсказуемыми, один к одному с источником. Создадим его как VIEW в рабочей сессии: он не материализует данные, а просто описывает очистку поверх raw.
-- Staging поездок: единый нейминг, явные типы, отсев мусора
CREATE OR REPLACE VIEW stg_trips AS
SELECT
pickup_datetime::TIMESTAMP AS pickup_at,
dropoff_datetime::TIMESTAMP AS dropoff_at,
pu_location_id::INTEGER AS pickup_zone_id,
do_location_id::INTEGER AS dropoff_zone_id,
trip_distance::DECIMAL(8,2) AS distance_mi,
total_amount::DECIMAL(10,2) AS total_amount,
payment_type::INTEGER AS payment_type_id,
year,
month,
FROM read_parquet('s3://raw/trips/*/*/trips.parquet', hive_partitioning = true)
WHERE total_amount >= 0 -- отрицательная сумма — мусор
AND dropoff_datetime > pickup_datetime -- поездка не может кончиться раньше начала
AND trip_distance BETWEEN 0 AND 1000; -- абсурдные дистанции отсекаем
Что здесь происходит и почему именно так:
- Явные касты типов.
::TIMESTAMP,::DECIMAL,::INTEGER— мы не доверяем типам источника, а фиксируем их сами.DECIMALдля денег, а неDOUBLE: деньги нельзя хранить в плавающей точке, округление накопит ошибку. - Единый нейминг.
pu_location_idиз источника становитсяpickup_zone_id— все колонки приводятся к одному стилю, чтобы marts-слой не знал о причудах источника. - Отсев мусора в
WHERE. Отрицательные суммы, поездки с финишем раньше старта, дистанции в тысячи миль — это битые строки. Отсекаем их здесь, чтобы они не доползли до витрин. yearиmonthпрокидываются дальше — они понадобятся для партиционирования витрины.
Trailing comma после последней колонки month, — не опечатка: это friendly SQL, и добавление новой колонки в staging будет чистой однострочной правкой.
Staging зон: чиним грязный CSV
Справочник зон требует отдельной чистки — у CSV свои болячки. Staging-объект для зон чинит именно их:
-- Staging зон: убираем пробелы, нормализуем регистр, единый нейминг
CREATE OR REPLACE VIEW stg_zones AS
SELECT
LocationID::INTEGER AS zone_id,
trim(Borough) AS borough,
trim(Zone) AS zone_name,
lower(trim(service_zone)) AS service_zone,
FROM read_csv('s3://raw/zones/zones.csv');
trim() срезает лишние пробелы, lower() приводит service_zone к единому регистру. Снова — единый нейминг колонок и явный каст zone_id. После этого stg_zones — это чистый предсказуемый справочник, и marts-слою не нужно знать, что исходный CSV был грязным.
Граница staging-слоя строгая: здесь чистят и типизируют, но не считают бизнес-метрики. Соблазн посчитать выручку прямо в staging есть, но если поддаться — staging перестанет быть переиспользуемым и тестируемым. Когда stg_trips упадёт, должно быть ясно: проблема в данных источника. Когда упадёт витрина — проблема в бизнес-логике. Смешав слои, вы теряете эту диагностику.
Marts: сборка первой витрины
Теперь бизнес-слой. Витрина mart_daily_zone соединяет поездки со справочником зон и агрегирует метрики по дням и зонам — это и есть то, что нужно аналитикам. Здесь применяем friendly SQL: GROUP BY ALL сам выведет ключи группировки из неагрегатных колонок SELECT.
-- Витрина: выручка и метрики по дням и зонам посадки
CREATE OR REPLACE VIEW mart_daily_zone AS
SELECT
t.pickup_at::DATE AS trip_date,
z.borough,
z.zone_name,
count(*) AS trips,
sum(t.total_amount) AS revenue,
avg(t.distance_mi) AS avg_distance_mi,
avg(epoch(t.dropoff_at - t.pickup_at) / 60) AS avg_duration_min,
t.year,
t.month,
FROM stg_trips AS t
JOIN stg_zones AS z ON t.pickup_zone_id = z.zone_id
GROUP BY ALL;
GROUP BY ALL — это friendly SQL: DuckDB видит, что count, sum, avg — агрегаты, а trip_date, borough, zone_name, year, month — нет, и группирует ровно по последним. Не нужно вручную дублировать список колонок в GROUP BY и синхронизировать его при правках. epoch(dropoff_at - pickup_at) переводит интервал поездки в секунды, делёж на 60 даёт минуты.
Проверим витрину:
FROM mart_daily_zone
WHERE trip_date = '2026-04-15'
ORDER BY revenue DESC
LIMIT 4;
-- trip_date | borough | zone_name | trips | revenue | avg_distance_mi | avg_duration_min
-- 2026-04-15 | Manhattan | Midtown Center | 3812 | 71449.20 | 2.41 | 14.7
-- 2026-04-15 | Manhattan | Upper East Side | 2967 | 48122.05 | 1.98 | 12.3
-- 2026-04-15 | Queens | JFK Airport | 1044 | 47330.80 | 17.62 | 41.9
-- 2026-04-15 | Manhattan | Times Sq | 2580 | 44102.15 | 1.74 | 11.8
Витрина считается «насквозь»: запрос к mart_daily_zone разворачивается через stg_trips и stg_zones до чтения сырых Parquet и CSV. DuckDB строит из всей цепочки VIEW один план и оптимизирует его целиком — projection и filter pushdown проходят сквозь все слои до самих файлов.
ELT, а не ETL: где здесь буква T
Стоит зафиксировать, что мы только что сделали ELT, и почему это именно ELT.
Буква T (transform) живёт внутри DuckDB: чтение сырых файлов — это E и L, а staging и marts — это T, выраженный на SQL и исполняемый тем же движком. Не нужен отдельный Spark для трансформаций — DuckDB и читает, и считает. Витрины пока существуют как VIEW — это удобно для разработки, потому что каждый запрос пересчитывается от свежих данных. В следующих уроках мы материализуем тяжёлые расчёты и переведём mart_daily_zone на хранение в DuckLake с инкрементальными апдейтами.
Попробуй сам
Понадобится DuckDB 1.5.x. Если S3-хранилища нет, скачайте любой публичный помесячный Parquet-датасет (например, поездки такси) в локальную папку и читайте read_parquet по локальному globs-пути — логика идентична.
Задания:
- Прочитайте сырые Parquet одним globs-паттерном с
hive_partitioning = true. Убедитесь, что колонки разделов появились из путей. Посчитайте общее число строк. - Постройте
stg_tripsкакVIEWс явными кастами типов и фильтром мусора вWHERE. Сравнитеcount(*)до и после фильтра — сколько битых строк отсеялось. - Постройте
stg_zonesсtrim()иlower(). Проверьте на нескольких строках, что лишние пробелы ушли. - Соберите
mart_daily_zoneчерезJOINиGROUP BY ALL. Запустите витрину подEXPLAINи найдите в плане чтение исходных файлов — убедитесь, что цепочка VIEW схлопнулась в один сквозной план.
dbt: staging-слой как формализованное соглашение