Iceberg materialization, S3 + Glue handoff, and failure modes of external materializations
In the middle-level course you saw external_parquet, a minimal external materialization for DuckDB. Now we move into production-grade territory: the internals of the dbt-iceberg package, a custom S3 + Glue materialization for a real analytics-to-data-lake handoff, the failure modes of external materializations (orphan files, eventual consistency, retry idempotency), and a cleanup pattern built on post-hooks.
Iceberg materialization from the dbt-iceberg package
Apache Iceberg is a table format for data lakes that supports ACID transactions, schema evolution, and time travel on top of Parquet files. The dbt-iceberg package adds materialized='iceberg' to dbt projects.
What Iceberg adds over raw Parquet
Iceberg materialization implementation pattern
A simplified structure of the iceberg materialization (default adapter) from dbt-iceberg:
{% materialization iceberg, default %}
{%- set target_relation = this.incorporate(type='iceberg') -%}
{%- set catalog = config.require('catalog') -%}
{%- set partition_by = config.get('partition_by', []) -%}
{%- set table_properties = config.get('table_properties', {}) -%}
{# 1. Check adapter capability — Iceberg support #}
{% if not adapter.capabilities.iceberg_supported %}
{{ exceptions.raise_compiler_error("Iceberg not supported on adapter " ~ adapter.type()) }}
{% endif %}
{%- set existing_relation = load_cached_relation(this) -%}
{{ run_hooks(pre_hooks, inside_transaction=False) }}
{% if existing_relation is none %}
{# Initial CREATE: Iceberg DDL with partitioning #}
{% call statement('main') -%}
CREATE TABLE {{ target_relation }}
{% if partition_by %}
PARTITION BY ({{ partition_by | join(', ') }})
{% endif %}
WITH (
format = 'PARQUET',
catalog = '{{ catalog }}'
{% for k, v in table_properties.items() %}
, {{ k }} = '{{ v }}'
{% endfor %}
)
AS (
{{ sql }}
)
{%- endcall %}
{% else %}
{# Existing table: INSERT OVERWRITE for full refresh, MERGE for incremental #}
{% if should_full_refresh() %}
{% call statement('main') -%}
INSERT OVERWRITE TABLE {{ target_relation }}
SELECT * FROM ({{ sql }})
{%- endcall %}
{% else %}
{%- set unique_key = config.get('unique_key') -%}
{% call statement('main') -%}
MERGE INTO {{ target_relation }} AS target
USING ({{ sql }}) AS source
ON target.{{ unique_key }} = source.{{ unique_key }}
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
{%- endcall %}
{% endif %}
{% endif %}
{{ run_hooks(post_hooks, inside_transaction=False) }}
{{ adapter.cache_added(target_relation) }}
{{ return({'relations': [target_relation]}) }}
{% endmaterialization %}
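The three branches of the materialization above (first run, full refresh, incremental merge) can be restated as a small Python function. This is only a sketch for reasoning about the control flow: `choose_statement` and its parameters are hypothetical names, and partitioning and table properties are left out. It also makes one gap visible: when `unique_key` is not configured, the Jinja version would render `target.None`, so the sketch raises instead.

```python
from typing import Optional


def choose_statement(relation: str, sql: str, exists: bool,
                     full_refresh: bool, unique_key: Optional[str]) -> str:
    """Return the SQL the materialization would run for the given state."""
    if not exists:
        # First run: create the table from the model query
        return f"CREATE TABLE {relation} AS ({sql})"
    if full_refresh:
        # Existing table plus --full-refresh: replace all rows
        return f"INSERT OVERWRITE TABLE {relation} SELECT * FROM ({sql})"
    if unique_key is None:
        # MERGE needs a join key; fail early instead of rendering "target.None"
        raise ValueError("unique_key is required for incremental MERGE")
    return (
        f"MERGE INTO {relation} AS target USING ({sql}) AS source "
        f"ON target.{unique_key} = source.{unique_key} "
        "WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *"
    )
```

A guard of the same shape in the Jinja macro (`config.require('unique_key')` in the incremental branch) would give the same early failure.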
Adapter capabilities: which adapters support Iceberg
adapter.capabilities is an enum-like dataclass that describes what the adapter supports. In core/dbt/adapters/contracts/adapter.py:
@dataclass
class AdapterCapabilities:
    schema_metadata_by_relations: Optional[Capability] = None
    materialized_view_support: Optional[Capability] = None
    iceberg_supported: Optional[Capability] = None  # Iceberg-specific
    table_last_modified_metadata: Optional[Capability] = None
    ...
In Jinja, adapter.capabilities.iceberg_supported returns True/False/None for the current adapter. The Iceberg materialization uses this flag to fail fast: if the model runs on an adapter without support, the user gets a clear compiler error before any SQL runs, rather than a runtime SQL failure.
Adapter packages set their capabilities in impl.py:
# dbt-snowflake/dbt/adapters/snowflake/impl.py
class SnowflakeAdapter(SQLAdapter):
    Capabilities = AdapterCapabilities(
        iceberg_supported=True,  # Snowflake 7.x+ native Iceberg
        materialized_view_support=True,
    )
DuckDB added Iceberg in 1.10+, and dbt-duckdb 1.10+ exposes the capability. For Postgres and Redshift, iceberg_supported=False.
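The fail-fast check can be illustrated outside dbt. The sketch below uses a hypothetical `Capabilities` dataclass and a hypothetical `require_iceberg` helper (neither is dbt API). It treats an undeclared capability (`None`) the same as `False`, which mirrors what `{% if not adapter.capabilities.iceberg_supported %}` does in Jinja.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Capabilities:
    # None means "the adapter did not declare this capability"
    iceberg_supported: Optional[bool] = None
    materialized_view_support: Optional[bool] = None


def require_iceberg(adapter_type: str, caps: Capabilities) -> None:
    """Raise before any SQL runs unless Iceberg support is explicitly declared."""
    if caps.iceberg_supported is not True:
        raise RuntimeError(f"Iceberg not supported on adapter {adapter_type}")
```

Calling `require_iceberg("postgres", Capabilities(iceberg_supported=False))` raises, while an adapter that declares `iceberg_supported=True` passes silently.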
Schema evolution handling
Iceberg allows metadata-only ALTER operations on a table. A custom materialization can handle schema changes like this:
{% if existing_relation is not none and on_schema_change == 'sync_all_columns' %}
{%- set source_columns = get_columns_in_query(sql) -%}
{%- set target_columns = adapter.get_columns_in_relation(existing_relation) -%}
{%- set new_columns = diff_columns(source_columns, target_columns) -%}
{%- set removed_columns = diff_columns(target_columns, source_columns) -%}
{% for col in new_columns %}
{% call statement('add_col_' ~ loop.index) %}
ALTER TABLE {{ existing_relation }} ADD COLUMN {{ col.name }} {{ col.data_type }}
{% endcall %}
{% endfor %}
{% for col in removed_columns %}
{% call statement('drop_col_' ~ loop.index) %}
ALTER TABLE {{ existing_relation }} DROP COLUMN {{ col.name }}
{% endcall %}
{% endfor %}
{% endif %}
On Iceberg, ALTER TABLE ADD/DROP COLUMN is a metadata operation: it is instant and does not require rewriting data. This is the key advantage over raw Parquet, where a schema change means rewriting every file.
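The column diff behind sync_all_columns is simple enough to show in isolation. The sketch below assumes columns are given as name-to-type dictionaries; `plan_schema_sync` is a hypothetical helper and does not reproduce dbt's `diff_columns` macro. It only shows which ALTER statements fall out of comparing the query schema with the table schema.

```python
from typing import Dict, List


def plan_schema_sync(table: str, source: Dict[str, str],
                     target: Dict[str, str]) -> List[str]:
    """Return ALTER statements that make `target` match `source` (name -> type)."""
    statements = []
    for name, data_type in source.items():
        if name not in target:
            # Column appears in the model query but not in the table yet
            statements.append(f"ALTER TABLE {table} ADD COLUMN {name} {data_type}")
    for name in target:
        if name not in source:
            # Column was dropped from the model query
            statements.append(f"ALTER TABLE {table} DROP COLUMN {name}")
    return statements
```

Type changes on an existing column are deliberately not handled here. They need their own policy and are not always metadata-only.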
Custom S3 + Glue materialization
A real production use case: an analytics-to-data-lake handoff. A dbt project on DuckDB/Snowflake produces aggregated data, writes it to S3 as Parquet, and registers it in the AWS Glue catalog. The downstream consumers are Athena, Spark, and EMR.
Architecture
Implementation
{% materialization s3_glue_table, adapter='duckdb' %}
{%- set s3_location = config.require('s3_location') -%}
{%- set glue_database = config.require('glue_database') -%}
{%- set glue_table = config.get('glue_table', this.identifier) -%}
{%- set partition_by = config.get('partition_by', []) -%}
{%- set retention_days = config.get('retention_days', 90) -%}
{%- set existing_relation = load_cached_relation(this) -%}
{%- set target_relation = this.incorporate(type='view') -%}
{{ run_hooks(pre_hooks, inside_transaction=False) }}
{# Phase 1: Generate timestamped S3 path #}
{%- set run_partition = run_started_at.strftime('%Y-%m-%d-%H%M%S') -%}
{%- set timestamped_path = s3_location ~ '/_writes/' ~ run_partition -%}
{# Phase 2: COPY data to S3 into the timestamped path (for idempotency / retry) #}
{% set copy_options = ['FORMAT PARQUET', 'COMPRESSION ZSTD'] %}
{% if partition_by %}
{% do copy_options.append('PARTITION_BY (' ~ partition_by | join(', ') ~ ')') %}
{% do copy_options.append('OVERWRITE_OR_IGNORE') %}
{% endif %}
{{ log("Writing Parquet to " ~ timestamped_path, info=True) }}
{% call statement('write_parquet') -%}
COPY ({{ sql }}) TO '{{ timestamped_path }}' (
{{ copy_options | join(', ') }}
)
{%- endcall %}
{# Phase 3: Atomic swap: move timestamped path to current/ via S3 manifest or Glue partition update #}
{%- set current_path = s3_location ~ '/current' -%}
{# Update Glue table location to point to new path #}
{% do register_glue_table(
glue_database=glue_database,
glue_table=glue_table,
s3_path=timestamped_path,
columns=get_column_schema_from_query(sql),
partition_by=partition_by
) %}
{# Phase 4: Create view wrapper for downstream dbt models #}
{% call statement('create_view') -%}
CREATE OR REPLACE VIEW {{ target_relation }} AS
SELECT * FROM read_parquet('{{ timestamped_path }}/**/*.parquet'
{%- if partition_by -%}
, hive_partitioning=true
{%- endif -%}
)
{%- endcall %}
{# Phase 5: Clean up old _writes/ partitions (retention policy) #}
{% do gc_old_s3_writes(s3_location, retention_days) %}
{{ run_hooks(post_hooks, inside_transaction=False) }}
{{ adapter.cache_added(target_relation) }}
{{ return({'relations': [target_relation]}) }}
{% endmaterialization %}
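Phases 1 and 2 of the materialization above reduce to string assembly, which is easy to check on its own. The following sketch rebuilds the timestamped path and the COPY statement in Python. `build_copy_statement` is a hypothetical helper, and stripping a trailing slash from `s3_location` is an added assumption to avoid a double slash in the path.

```python
from datetime import datetime
from typing import List


def build_copy_statement(sql: str, s3_location: str, run_started_at: datetime,
                         partition_by: List[str]) -> str:
    """Render the DuckDB COPY statement for one run's timestamped path."""
    run_partition = run_started_at.strftime("%Y-%m-%d-%H%M%S")
    path = f"{s3_location.rstrip('/')}/_writes/{run_partition}"
    options = ["FORMAT PARQUET", "COMPRESSION ZSTD"]
    if partition_by:
        # Partitioned writes produce a Hive-style directory tree under `path`
        options.append(f"PARTITION_BY ({', '.join(partition_by)})")
        options.append("OVERWRITE_OR_IGNORE")
    return f"COPY ({sql}) TO '{path}' ({', '.join(options)})"
```

Because the path depends only on `run_started_at`, every statement in one dbt invocation targets the same directory, and a retry (a new invocation) targets a new one.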
Glue registration via a Python step
dbt-duckdb supports Python pre/post hooks. register_glue_table is a helper that makes a boto3 call under the hood:
# dbt_project/macros/python_helpers/glue_helpers.py
def register_glue_table(
    glue_database: str,
    glue_table: str,
    s3_path: str,
    columns: list,
    partition_by: list,
):
    """Create or update a Glue external table that points at s3_path."""
    import boto3
    glue = boto3.client('glue')
    # Build the column schema for Glue; partition columns are declared separately.
    # map_duckdb_to_glue_type is assumed to be defined elsewhere in this module.
    glue_columns = [
        {'Name': col.name, 'Type': map_duckdb_to_glue_type(col.data_type)}
        for col in columns if col.name not in partition_by
    ]
    partition_keys = [
        {'Name': col, 'Type': 'string'}  # partition keys are registered as string here
        for col in partition_by
    ]
    # The same TableInput is used for both update and create
    table_input = {
        'Name': glue_table,
        'StorageDescriptor': {
            'Location': s3_path,
            'InputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat',
            'OutputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat',
            'Columns': glue_columns,
            'SerdeInfo': {
                'SerializationLibrary': 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe',
            },
        },
        'PartitionKeys': partition_keys,
        'TableType': 'EXTERNAL_TABLE',
        'Parameters': {'classification': 'parquet'},
    }
    try:
        # Update the existing table
        glue.update_table(DatabaseName=glue_database, TableInput=table_input)
    except glue.exceptions.EntityNotFoundException:
        # First run: the table does not exist yet, so create it
        glue.create_table(DatabaseName=glue_database, TableInput=table_input)
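The helper above calls `map_duckdb_to_glue_type` without showing it. A minimal version might look like the sketch below. The mapping table is an assumption that covers only common scalar types (Glue uses Hive type names), and nested types such as STRUCT or LIST are rejected rather than guessed.

```python
import re

# Assumed mapping; extend it for the types your models actually produce
_DUCKDB_TO_GLUE = {
    "BOOLEAN": "boolean", "TINYINT": "tinyint", "SMALLINT": "smallint",
    "INTEGER": "int", "BIGINT": "bigint", "FLOAT": "float", "DOUBLE": "double",
    "VARCHAR": "string", "DATE": "date", "TIMESTAMP": "timestamp", "BLOB": "binary",
}


def map_duckdb_to_glue_type(duckdb_type: str) -> str:
    """Translate a DuckDB type name into the Hive-style name Glue expects."""
    normalized = duckdb_type.strip().upper()
    # DECIMAL(p, s) keeps its precision and scale
    match = re.fullmatch(r"(?:DECIMAL|NUMERIC)\((\d+),\s*(\d+)\)", normalized)
    if match:
        return f"decimal({match.group(1)},{match.group(2)})"
    try:
        return _DUCKDB_TO_GLUE[normalized]
    except KeyError:
        # Fail loudly: a wrong type in Glue surfaces later as an Athena read error
        raise ValueError(f"No Glue type mapping for DuckDB type {duckdb_type!r}") from None
```

Raising on unknown types keeps a bad mapping from silently reaching the catalog, where it would only show up when a downstream query fails.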
From Jinja it is called through dbt-duckdb's Python integration:
{% macro register_glue_table(glue_database, glue_table, s3_path, columns, partition_by) %}
{% do adapter.execute_python(
module='macros.python_helpers.glue_helpers',
function='register_glue_table',
args={
'glue_database': glue_database,
'glue_table': glue_table,
's3_path': s3_path,
'columns': columns,
'partition_by': partition_by,
}
) %}
{% endmacro %}
adapter.execute_python is available in dbt-duckdb through PythonJobHelper. On Snowflake, the equivalent goes through UDFs / stored procedures. On BigQuery, it goes through an external API call (there is no native Python execution in the SQL context).
Failure modes of external materializations
External materializations have unique failure modes that warehouse-internal materializations do not have.
Failure mode 1: Orphan files
Scenario: COPY finishes successfully and the files are written to S3. Then CREATE EXTERNAL TABLE fails (for example, insufficient Glue permissions or missing AWS credentials). The materialization aborts.
Result: the files exist in S3, but the Glue table is not registered. Downstream Athena queries do not see the data. The files are orphans.
Storage cost: orphan files keep incurring S3 storage charges. They accumulate with every failed run.
Mitigation:
{# Wrap critical operations in try/catch-style error handling via {% set _ = ... %} #}
{% materialization s3_with_orphan_protection, adapter='duckdb' %}
...
{%- set s3_write_succeeded = false -%}
{% call statement('write_parquet') %}
COPY ({{ sql }}) TO '{{ timestamped_path }}' (FORMAT PARQUET);
{% endcall %}
{%- set s3_write_succeeded = true -%}
{# If Glue registration fails — explicit cleanup S3 files #}
{%- set glue_success = false -%}
{% if s3_write_succeeded %}
{% do register_glue_table(...) %}
{%- set glue_success = true -%}
{% endif %}
{% if s3_write_succeeded and not glue_success %}
{{ log("Glue registration failed — cleaning up orphan S3 files at " ~ timestamped_path, info=True) }}
{% do cleanup_s3_path(timestamped_path) %}
{{ exceptions.raise_compiler_error("Glue registration failed") }}
{% endif %}
...
{% endmaterialization %}
Note: Jinja has no native try/except. Real error handling is done in a Python step or in external orchestration (Airflow with retries + cleanup).
Failure mode 2: Eventual consistency on S3
S3 has been strongly consistent since December 2020: PUT/DELETE/LIST results are immediately visible. Before that it was eventually consistent.
The modern problem is multi-region replication. If the write goes to us-east-1 and the read happens in eu-west-1 through a replicated bucket, there is replication lag (on the order of minutes).
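Since the cleanup branch needs real exception handling, it is easier to express in the Python step. The sketch below shows the ordering under the assumption that the write, registration, and cleanup steps are passed in as callables. `write_then_register` is a hypothetical name; in practice the callables would wrap the COPY, the Glue call, and an S3 delete.

```python
from typing import Callable


def write_then_register(write: Callable[[], None],
                        register: Callable[[], None],
                        cleanup: Callable[[], None]) -> None:
    """Run write, then register; if register fails, remove the written files."""
    write()  # if this raises, nothing was registered and the caller may retry
    try:
        register()
    except Exception:
        # Files exist but the catalog does not know about them: delete them
        cleanup()
        raise  # re-raise so the run is still reported as failed
```

Re-raising after cleanup matters: the orchestrator should still see the failure and decide whether to retry.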
COPY -> s3://my-bucket-us/path/data.parquet (writes to us-east-1)
|
v (replication lag ~30s-5min)
read_parquet('s3://my-bucket-eu/path/data.parquet') # may not find the file
Mitigation:
- Same-region reads / writes: a dbt project in region X writes to a bucket in region X.
- Versioning-aware reads: read_parquet with metadata_consistency=true (DuckDB 1.x, experimental).
- Retry logic in a post-hook:
{% macro verify_parquet_readable(path, max_retries=5, sleep_seconds=30) %}
{% for attempt in range(max_retries) %}
{%- set verify_sql -%}
SELECT COUNT(*) FROM read_parquet('{{ path }}/**/*.parquet') LIMIT 1
{%- endset -%}
{%- set result = run_query(verify_sql) -%}
{% if result is not none and result | length > 0 %}
{{ log("Verified parquet readable at " ~ path, info=True) }}
{{ return(true) }}
{% endif %}
{{ log("Attempt " ~ (attempt + 1) ~ " failed, sleeping " ~ sleep_seconds ~ "s", info=True) }}
{% do modules.time.sleep(sleep_seconds) %}
{% endfor %}
{{ exceptions.raise_compiler_error("Parquet at " ~ path ~ " not readable after " ~ max_retries ~ " attempts") }}
{% endmacro %}
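The same retry loop is shorter in Python, where a failed read can be caught instead of aborting the run. This is a sketch under stated assumptions: `retry_until_readable` is a hypothetical helper, `check` stands in for whatever probe you use (for example a `read_parquet` count), and the `sleep` parameter exists only so the loop can be tested without waiting.

```python
import time
from typing import Callable


def retry_until_readable(check: Callable[[], bool], max_retries: int = 5,
                         sleep_seconds: float = 30.0,
                         sleep: Callable[[float], None] = time.sleep) -> int:
    """Call check() until it returns True; return the attempt that succeeded."""
    for attempt in range(1, max_retries + 1):
        try:
            if check():
                return attempt
        except Exception:
            pass  # treat a failed read (file not replicated yet) as "not ready"
        if attempt < max_retries:
            sleep(sleep_seconds)  # no point sleeping after the final attempt
    raise TimeoutError(f"not readable after {max_retries} attempts")
```

With the defaults this waits at most four sleeps of 30 seconds, which covers the lower end of the replication lag described above. Longer lags need a larger `max_retries`.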
Failure mode 3: Retry idempotency
Scenario: dbt run fails during COPY (network issue, OOM, timeout). Airflow retries dbt run. What happens?
Without idempotency:
- Retry 1: COPY ... TO 's3://bucket/path/data.parquet'. DuckDB writes a partial file.
- Retry 2: the same COPY. DuckDB writes the full file over it. A reader that was reading during the retry may have received corrupt data.
With a timestamped path (our materialization):
- Retry 1: COPY ... TO 's3://bucket/_writes/2026-05-19-040523/...'. It fails.
- Retry 2: a new run_started_at = 2026-05-19-040712. It writes to a different path: 's3://bucket/_writes/2026-05-19-040712/...'. The old partial path remains but is not used (the Glue table does not point to it).
- After cleanup by gc_old_s3_writes, the old partial path is deleted.
This is idempotency through explicit versioning. Failed retries do not corrupt the current state.
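A small in-memory model makes the guarantee concrete. `VersionedStore` below is a hypothetical stand-in for the S3 paths plus the Glue table location. It is not real S3 behaviour, only the bookkeeping: the pointer moves after a complete write and never before.

```python
from typing import Dict, List, Optional


class VersionedStore:
    """In-memory stand-in for S3 run paths plus a catalog pointer."""

    def __init__(self) -> None:
        self.paths: Dict[str, List[str]] = {}
        self.current: Optional[str] = None

    def run(self, run_id: str, rows: List[str],
            fail_after: Optional[int] = None) -> None:
        path = f"_writes/{run_id}"
        self.paths[path] = []
        for i, row in enumerate(rows):
            if fail_after is not None and i >= fail_after:
                raise IOError("simulated crash during COPY")
            self.paths[path].append(row)
        # The pointer moves only after the whole write succeeded
        self.current = path
```

After a crashed run the partial path is still present in `paths`, which is exactly the leftover that the GC step later removes.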
Failure mode 4: Schema mismatch on read
Scenario: Run 1 writes Parquet with columns [a, b, c]. In Run 2 the SELECT changes and it writes [a, b, c, d]. Old files have 3 columns, new files have 4. A reader using read_parquet('**/*.parquet') gets an inconsistent schema.
DuckDB behavior:
- Default: the schema comes from the first file and the other files are read against it, which produces an error such as column d not found in first file.
- With union_by_name=true: the schema is the union of all files, and missing values are NULL. This is slower, because schema discovery has to inspect every file.
Iceberg solves this: the schema is stored in the metadata layer, not in the files. ALTER TABLE ADD COLUMN is an atomic metadata operation, and old files automatically return NULL for the new column.
Raw Parquet mitigation:
- Each run writes to a new path instead of appending to an existing one. The view switches to the new path atomically.
- Alternatively, use partition_by so that old files stay immutable (for example, partition by date, so old dates are never touched).
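The union-by-name behaviour can be mimicked in a few lines of Python to show why it needs a pass over every file before any row is returned. This is an illustration of the semantics only, with rows as dictionaries and "files" as lists of rows. `union_by_name` here is a hypothetical function, not DuckDB's implementation.

```python
from typing import Any, Dict, List


def union_by_name(files: List[List[Dict[str, Any]]]) -> List[Dict[str, Any]]:
    """Combine rows from several 'files', filling missing columns with None."""
    columns: List[str] = []
    for rows in files:  # schema discovery pass touches every file
        for row in rows:
            for name in row:
                if name not in columns:
                    columns.append(name)
    # Second pass: project every row onto the unified column list
    return [{name: row.get(name) for name in columns}
            for rows in files for row in rows]
```

For an old file with columns a, b, c and a new file with a, b, c, d, every old row comes back with `d` set to `None`, which corresponds to the NULL fill described above.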
Cleanup pattern: GC via post-hooks
gc_old_s3_writes is a macro that deletes versioned paths older than the retention period:
{% macro gc_old_s3_writes(s3_location, retention_days) %}
{%- set cutoff = (modules.datetime.datetime.now() - modules.datetime.timedelta(days=retention_days)).strftime('%Y-%m-%d-%H%M%S') -%}
{{ log("Garbage collecting S3 writes older than " ~ cutoff ~ " under " ~ s3_location, info=True) }}
{% do adapter.execute_python(
module='macros.python_helpers.s3_helpers',
function='delete_s3_objects_older_than',
args={
's3_prefix': s3_location ~ '/_writes/',
'cutoff_iso': cutoff,
}
) %}
{% endmacro %}
Python helper:
def delete_s3_objects_older_than(s3_prefix: str, cutoff_iso: str):
    """Delete objects whose run-timestamp directory sorts before cutoff_iso."""
    import boto3
    from urllib.parse import urlparse

    parsed = urlparse(s3_prefix)  # e.g. s3://bucket/base/_writes/
    bucket = parsed.netloc
    # Normalize so the prefix always ends with exactly one slash
    prefix = parsed.path.strip('/') + '/'
    s3 = boto3.client('s3')
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get('Contents', []):
            # Key format: <prefix>2026-05-19-040523/...
            timestamp_part = obj['Key'][len(prefix):].split('/', 1)[0]
            # Zero-padded timestamps compare correctly as plain strings
            if timestamp_part < cutoff_iso:
                s3.delete_object(Bucket=bucket, Key=obj['Key'])
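The selection rule in the helper is worth testing without touching S3. The sketch below isolates it as a pure function. `select_expired_keys` is a hypothetical name, and skipping objects that sit directly under the prefix (outside any run directory) is an added safety assumption.

```python
from typing import Iterable, List


def select_expired_keys(keys: Iterable[str], prefix: str, cutoff: str) -> List[str]:
    """Return keys under `prefix` whose run directory sorts before `cutoff`."""
    prefix = prefix.strip("/") + "/"
    expired = []
    for key in keys:
        if not key.startswith(prefix):
            continue
        run_dir, sep, _ = key[len(prefix):].partition("/")
        # Skip objects that do not sit inside a run directory
        if sep and run_dir < cutoff:
            expired.append(key)
    return expired
```

String comparison is enough here because the `%Y-%m-%d-%H%M%S` format is zero-padded and ordered from the most significant field to the least.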
Alternative: S3 Lifecycle Policy
Instead of Jinja GC, you can use an S3 Lifecycle Policy on the bucket:
{
  "Rules": [{
    "ID": "DeleteOldWrites",
    "Status": "Enabled",
    "Filter": {"Prefix": "_writes/"},
    "Expiration": {"Days": 90}
  }]
}
S3 itself deletes objects older than 90 days. It is free (S3 lifecycle is a free feature) and runs asynchronously, so it does not block dbt. Production recommendation: a lifecycle policy for bulk GC, Jinja GC only for surgical cleanups.
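If the policy is managed from code rather than the console, the rule can be built as a plain dictionary and passed to boto3. The sketch below only builds the configuration. `build_lifecycle_config` is a hypothetical helper, it uses the `Filter` form of the rule, and the bucket name in the commented call is a placeholder.

```python
from typing import Any, Dict


def build_lifecycle_config(prefix: str, days: int) -> Dict[str, Any]:
    """Build an S3 lifecycle configuration that expires objects under prefix."""
    if days < 1:
        raise ValueError("days must be a positive integer")
    return {
        "Rules": [{
            "ID": "DeleteOldWrites",
            "Status": "Enabled",
            "Filter": {"Prefix": prefix},
            "Expiration": {"Days": days},
        }]
    }

# Applying it (not executed here; requires boto3 and AWS credentials):
# boto3.client("s3").put_bucket_lifecycle_configuration(
#     Bucket="my-bucket",
#     LifecycleConfiguration=build_lifecycle_config("_writes/", 90),
# )
```

The prefix must match the full key prefix inside the bucket, so a model that writes under a base path needs that base path included in front of `_writes/`.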
References to dbt-iceberg / dbt-glue source
- dbt-iceberg/dbt/include/iceberg/macros/materializations/: package source.
- dbt-snowflake/dbt/include/snowflake/macros/materializations/table.sql::create_iceberg_table: Snowflake native Iceberg.
- dbt-duckdb/dbt/include/duckdb/macros/materializations/external.sql: the built-in external materialization for DuckDB.
- dbt-athena/dbt/include/athena/macros/materializations/: the adapter that reads from S3 + Glue.
- core/dbt/adapters/contracts/adapter.py::AdapterCapabilities: capability flags.
Key takeaways
- The Iceberg materialization provides ACID, metadata-only schema evolution, and snapshot isolation on top of Parquet. Check support via adapter.capabilities.iceberg_supported. With raw Parquet, a schema change means rewriting everything.
- S3 + Glue analytics handoff is a real use case: dbt writes Parquet to S3, registers it in the Glue catalog, and downstream Athena/EMR consume it. Glue registration runs through a Python boto3 step.
- Orphan files: a failed run can leave S3 files without a Glue registration. Mitigation: explicit cleanup on partial failure, or an S3 lifecycle policy for bulk GC.
- Eventual consistency in multi-region S3 replication: a read from another region may not see a write immediately. Use same-region reads / writes or retry logic in a verification post-hook.
- Retry idempotency through timestamped paths: each run writes to _writes/<timestamp>/ and does not overwrite existing data. A failed retry does not corrupt the current state.
- Cleanup pattern: an S3 lifecycle policy for bulk GC (free, async), and Jinja GC through adapter.execute_python + boto3 for surgical cleanup.