dbt-clickhouse: 4 инкрементальные стратегии и кластерный режим
dbt-clickhouse — официальный адаптер dbt для ClickHouse. Адаптер поддерживает все стандартные materializations dbt (table, view, incremental, snapshot), а также специфичные для ClickHouse возможности: 4 инкрементальные стратегии, кластерный режим и distributed_incremental materialization.
Стратегия legacy (полная копия + EXCHANGE)
Стратегия legacy — поведение по умолчанию для инкрементальных моделей. При каждом dbt run создаётся новая таблица с полной копией данных, затем производится атомарный swap через EXCHANGE TABLES.
{{ config(
materialized='incremental',
engine='MergeTree()',
order_by='(user_id, event_time)',
unique_key='event_id',
incremental_strategy='legacy'
) }}
SELECT
event_id,
user_id,
event_time,
action
FROM {{ source('app', 'events') }}
{% if is_incremental() %}
WHERE event_time > (SELECT max(event_time) FROM {{ this }})
{% endif %}
Механизм работы:
- dbt создаёт временную таблицу
__dbt_tmp - Выполняет INSERT со всеми строками (новые + незатронутые)
- Выполняет
EXCHANGE TABLES target AND __dbt_tmp— атомарный swap - DROP старой таблицы (теперь
__dbt_tmp)
Стратегия legacy выполняет полную копию таблицы при каждом dbt run. На таблицах в сотни ГБ это приводит к таймаутам и огромной I/O нагрузке. Используйте delete+insert или append для больших таблиц.
Когда использовать: Таблицы небольшого размера (до нескольких ГБ), когда нужна максимальная безопасность от частичных обновлений.
Стратегия delete+insert (lightweight delete + INSERT)
Стратегия delete+insert использует Lightweight DELETE для удаления старых версий строк перед INSERT новых. Это значительно быстрее legacy для больших таблиц.
{{ config(
materialized='incremental',
engine='MergeTree()',
order_by='(user_id, session_date)',
unique_key='session_id',
incremental_strategy='delete+insert',
partition_by='toYYYYMM(session_date)'
) }}
SELECT
session_id,
user_id,
session_date,
events_count,
revenue
FROM {{ source('raw', 'sessions') }}
{% if is_incremental() %}
WHERE session_date >= (SELECT max(session_date) FROM {{ this }}) - INTERVAL 1 DAY
{% endif %}
Механизм работы:
- dbt определяет строки для обновления по
unique_key - Выполняет
DELETE FROM target WHERE unique_key IN (...)(Lightweight DELETE) - Выполняет
INSERT INTO target SELECT ...с новыми данными
Важно: Lightweight DELETE не переписывает части немедленно — помечает строки как удалённые. Физическая очистка происходит при следующем merge.
Когда использовать: Большие таблицы с точечными обновлениями, когда дубликаты недопустимы. Рекомендован для большинства production-сценариев.
Стратегия append (только INSERT)
Стратегия append выполняет только INSERT новых строк без удаления старых. Самая быстрая стратегия — дупликаты возможны.
{{ config(
materialized='incremental',
engine='MergeTree()',
order_by='(user_id, event_time)',
incremental_strategy='append'
) }}
SELECT
event_id,
user_id,
event_time,
action
FROM {{ source('app', 'events') }}
{% if is_incremental() %}
WHERE event_time > (SELECT max(event_time) FROM {{ this }})
{% endif %}
Механизм работы:
- dbt добавляет только новые строки (
INSERT INTO target SELECT ...) - Старые строки не удаляются и не обновляются
Когда использовать: Append-only данные (логи, события), когда дубликаты допустимы или обрабатываются на уровне аналитики.
Стратегия append — оптимальный выбор для таблиц событий (event logs, metrics), где каждая строка является уникальным событием с уникальным timestamp. Используйте event_time > (SELECT max(event_time) FROM this) как фильтр инкрементальности.
Стратегия insert_overwrite (перезапись партиций)
Стратегия insert_overwrite перезаписывает целые партиции, а не отдельные строки. Безопасный вариант при работе с партиционированными таблицами.
{{ config(
materialized='incremental',
engine='MergeTree()',
order_by='(user_id, report_date)',
unique_key='(user_id, report_date)',
incremental_strategy='insert_overwrite',
partition_by='toYYYYMM(report_date)'
) }}
SELECT
user_id,
report_date,
total_events,
total_revenue
FROM {{ source('aggregated', 'daily_stats') }}
{% if is_incremental() %}
WHERE report_date >= (SELECT max(report_date) FROM {{ this }}) - INTERVAL 7 DAY
{% endif %}
Механизм работы:
- dbt определяет затронутые партиции по
partition_by - Выполняет
DROP PARTITIONдля затронутых партиций - Выполняет
INSERT INTO target SELECT ...для всех строк в этих партициях
Когда использовать: Агрегированные таблицы с партиционированием по дате, где нужно пересчитывать данные за последние N дней (например, daily aggregates, обновляемые несколько раз в день).
Сравнительная таблица стратегий
| Стратегия | Механизм | Скорость | Дупликаты | Рекомендация |
|---|---|---|---|---|
legacy | Полная копия + EXCHANGE | Медленная на больших таблицах | Нет | Только для малых таблиц |
delete+insert | Lightweight DELETE + INSERT | Быстрая | Нет | Рекомендована по умолчанию |
append | Только INSERT | Максимальная | Возможны | Append-only логи и события |
insert_overwrite | DROP PARTITION + INSERT | Быстрая для партиций | Нет | Партиционированные агрегаты |
Кластерный режим
dbt-clickhouse поддерживает кластерный режим: все DDL-операции (CREATE TABLE, ALTER TABLE) автоматически выполняются с ON CLUSTER. Для включения укажите cluster в профиле dbt.
profiles.yml:
clickhouse_profile:
target: prod
outputs:
prod:
type: clickhouse
host: clickhouse-node-01.internal
port: 8123
schema: analytics
user: dbt_user
password: "{{ env_var('DBT_CLICKHOUSE_PASSWORD') }}"
cluster: my_cluster # имя кластера из remote_servers конфига
distributed_by_random: true # INSERT через Distributed table
С настройкой cluster: my_cluster dbt автоматически:
- Добавляет
ON CLUSTER my_clusterк CREATE TABLE, DROP TABLE, ALTER TABLE - Создаёт Distributed таблицы для каждой модели
- Управляет репликацией схемы через все узлы кластера
distributed_incremental materialization
Для distributed tables существует отдельная materialization distributed_incremental — не путайте с обычной incremental.
{{ config(
materialized='distributed_incremental',
engine='MergeTree()',
order_by='(user_id, event_time)',
unique_key='event_id',
incremental_strategy='delete+insert',
dist_local_suffix='_local' -- суффикс для локальных шардовых таблиц
) }}
SELECT event_id, user_id, event_time, action
FROM {{ source('app', 'events') }}
{% if is_incremental() %}
WHERE event_time > (SELECT max(event_time) FROM {{ this }})
{% endif %}
distributed_incremental — отдельная materialization для distributed tables. Не путайте с обычной incremental с cluster: в профиле. Используйте distributed_incremental когда нужно работать с Distributed engine напрямую (сквозные запросы через все шарды).
Отключение ON CLUSTER для конкретных моделей
Иногда нужно отключить ON CLUSTER для отдельной модели (например, для staging-таблиц только на одном узле):
{{ config(
materialized='incremental',
engine='MergeTree()',
order_by='id',
disable_on_cluster=true -- отключает ON CLUSTER для этой модели
) }}
SELECT id, name FROM {{ source('staging', 'temp_data') }}
Ключевые выводы
delete+insert— рекомендована по умолчанию. Использует Lightweight DELETE + INSERT, не делает полную копию, подходит для большинства production-сценариев с обновляемыми данными.legacyопасна для больших таблиц. Полная копия при каждом dbt run создаёт огромную I/O нагрузку. На таблицах в сотни ГБ приводит к таймаутам.append— максимальная скорость для логов. Только INSERT без удалений. Оптимальна для append-only данных, где дубли недопустимы или обрабатываются аналитикой.- Кластерный режим через
cluster:в профиле. Все DDL автоматически получаютON CLUSTER— не требует изменений в моделях. distributed_incremental— отдельная materialization. Предназначена для работы с Distributed engine, отличается от обычнойincrementalс кластерным профилем.