Microbatch parallel: idempotency, retry per batch, —concurrent-batches
Microbatch концепт, event_time/batch_size/begin/lookback, базовый run/backfill — материал dbt-ii/03. Если ещё не знаешь — сюда. Здесь — --concurrent-batches deep, isolation на warehouses без snapshot isolation, retry orchestration.
Краткая сводка для контекста: microbatch разбивает один dbt run incremental-модели на множество DELETE+INSERT-операций per partition. event_time+batch_size определяют, как делить, lookback — окно для late-arriving events, begin — нижняя граница backfill. Каждый batch атомарен и идемпотентен on its own. Этого достаточно — здесь сразу к advanced.
—concurrent-batches: scheduler внутри
По умолчанию dbt-core выполняет batches последовательно даже если они независимы. Lookback=7 на таблице где каждый batch — 5 минут -> 35 минут wall-clock. С --concurrent-batches N появляется внутренний scheduler, который кладёт batches в task pool и забирает по N одновременно.
dbt run --select fct_events --concurrent-batches 8
Внутренности scheduler
В core/dbt/task/run.py есть MicrobatchModelRunner. Когда concurrent_batches > 1, runner:
- Строит list batch-задач из
event_time_start..event_time_endshifted bybatch_size. - Создаёт
ThreadPoolExecutor(max_workers=concurrent_batches). - Submit каждый batch как future — каждый future это отдельный
execute_batch(batch_meta)call. - Собирает results через
as_completed(). - После всех batches — мерж в
batch_resultsдля run_results.json.
Это отдельный thread pool в рамках одной model task, в добавок к глобальному threads pool в profile.
Взаимодействие с threads в profile
Это два разных уровня параллельности — junior часто путают:
threads: Nвprofiles.yml— сколько моделей dbt запускает одновременно. Глобальный pool task runner’а.--concurrent-batches M— сколько batches одной microbatch model идут одновременно. Локальный pool внутри model task.
Эффективный max concurrency = threads × concurrent_batches (если все модели microbatch и одного размера). 10 microbatch-моделей, threads=4, concurrent_batches=4 -> до 16 одновременных queries на warehouse.
Deadlock scenarios
Параллельные DELETE+INSERT на одну таблицу могут deadlock даже без cross-batch dependencies:
- Lock escalation на не-partitioned таблицах. PostgreSQL/Redshift: DELETE с большой выборкой эскалирует row locks до table lock. Два batch-thread’а одновременно делают DELETE на разных partitions, обе пытаются получить table lock — deadlock detection kills один.
- Snowflake table locks при DDL-like operations. Если microbatch триггерит ALTER (например,
on_schema_change='append_new_columns'), DDL берёт exclusive lock на таблицу. Параллельный batch с DML пытается прочитать metadata — wait или fail. - Cross-resource contention. Два batch’а одновременно делают MERGE на двух разных моделях, но обе зависят от одной staging-таблицы. Staging захвачена locks от обоих writes — deadlock через shared resource.
Симптом в логах: 40P01 deadlock detected (Postgres) или ER_LOCK_DEADLOCK (MySQL-семейство). На Snowflake — Statement reached its statement or warehouse timeout без явного deadlock message, потому что Snowflake резолвит через queueing, а не detection.
Isolation на warehouses без snapshot isolation
Главный gotcha parallel microbatch: между DELETE и INSERT одного batch’а reader видит partial state на warehouses без snapshot isolation.
Какие warehouses имеют какую isolation
| Warehouse | Default isolation | Reader sees mid-batch? |
|---|---|---|
| Snowflake | Snapshot (read-committed snapshot) | Нет — txn-level snapshot |
| BigQuery | Snapshot per query | Нет в одном query, да между queries |
| Redshift | Read-committed | Да — после DELETE и до INSERT |
| Postgres | Read-committed (default) | Да |
| DuckDB | Serializable single-writer | Нет |
Что видит downstream consumer
Сценарий: BI-dashboard каждые 30 секунд читает SELECT COUNT(*) FROM fct_events. dbt запускает microbatch с concurrent_batches=8, каждый batch ~ 5 минут. Между DELETE и INSERT для каждой партиции есть окно где её данных нет в таблице.
На Redshift или Postgres: пользователь refresh’нул dashboard ровно в это окно -> видит занижённый count. 8 batches × 5 минут окна = ~40 minutes total exposure window per run. На активных warehouse часть reads гарантированно попадут в дыру.
Защита: commit-per-batch с проверкой и retry
Naive подход — DELETE; INSERT; в одной транзакции. Это решает proboiлему частично:
BEGIN;
DELETE FROM fct_events WHERE event_date = '2026-05-15';
INSERT INTO fct_events SELECT ... WHERE event_date = '2026-05-15';
COMMIT;
На Redshift это даёт atomic switch — reader видит либо old, либо new state. Microbatch macro для Redshift в dbt-redshift обёртывает batch в BEGIN/COMMIT именно поэтому.
Но не на всех warehouses dbt-core это делает. Проверь adapter:
dbt-snowflake— не нужен explicit txn (snapshot isolation).dbt-redshift— wraps в txn в materialization.dbt-bigquery— нет txn, используетMERGEдля atomic swap.dbt-postgres— wraps в txn.
Если используешь custom microbatch macro на community-adapter — проверь, есть ли BEGIN/COMMIT. Если нет — добавляй.
Pattern: shadow table swap для read-heavy таблиц
Если read-traffic критичен и DELETE+INSERT окно неприемлемо, паттерн shadow table swap:
-- Custom pre/post hooks вокруг batch
{{ config(
materialized='incremental',
incremental_strategy='microbatch',
pre_hook="CREATE TABLE IF NOT EXISTS {{ this }}_staging LIKE {{ this }}",
post_hook="ALTER TABLE {{ this }} SWAP WITH {{ this }}_staging"
) }}
Каждый batch пишет в staging, после успеха — atomic SWAP. Reader всё время видит consistent state. Стоимость — двойной storage временно.
Retry orchestration через run_results
run_results.json содержит per-batch status. Этим пользуются orchestrators (Airflow, Prefect, Dagster) для surgical retry — re-run только failed batches.
Структура batch_results
{
"results": [
{
"unique_id": "model.my_project.fct_events",
"status": "partial success",
"batch_results": {
"successful": [
["2026-05-12T00:00:00", "2026-05-13T00:00:00"],
["2026-05-13T00:00:00", "2026-05-14T00:00:00"]
],
"failed": [
["2026-05-14T00:00:00", "2026-05-15T00:00:00"]
],
"skipped": []
}
}
]
}
failed — list of [start, end] tuples, не date strings. Это важно для retry — orchestrator passes их в --event-time-start/--event-time-end.
dbt-native retry: result:error+1
Простейший случай — встроенная selector:
dbt run --select fct_events
# 3 batches failed
dbt retry # читает run_results.json, re-runs ТОЛЬКО failed batches
dbt retry парсит последний run_results.json, находит failed nodes и для microbatch-моделей — failed batches внутри node. Re-run происходит точечно.
Limitation: dbt retry не работает между runs если запускается из другой rabочей директории / другого CI job. run_results.json живёт в target/ — должен persist между runs.
Airflow pattern: parse run_results и dynamic retry
В production Airflow обычно делает свой retry — потому что dbt retry не интегрируется с Airflow’s task state.
# Airflow DAG (упрощённо)
from airflow.decorators import task
import json
@task
def run_microbatch_model(model: str):
result = subprocess.run(
["dbt", "run", "--select", model, "--concurrent-batches", "8"],
capture_output=True
)
with open("target/run_results.json") as f:
results = json.load(f)
for node_result in results["results"]:
if node_result["status"] == "partial success":
failed_batches = node_result["batch_results"]["failed"]
# Trigger retry tasks для каждого failed batch
for [start, end] in failed_batches:
retry_batch.override(retries=3).expand(
start=[start], end=[end], model=[model]
)
@task
def retry_batch(model: str, start: str, end: str):
subprocess.run([
"dbt", "run", "--select", model,
"--event-time-start", start,
"--event-time-end", end,
"--concurrent-batches", "4" # меньше concurrency на retry
], check=True)
Key insights production-orchestrator implementation:
- Меньше concurrency на retry — если batches упали из-за warehouse pressure, retry с тем же concurrency повторит проблему.
- Exponential backoff per retry attempt — между retry attempts wait
2^attempt * base_delay. - Separate alerting на repeated failures — если 3 retry прошли и batch всё ещё падает, это not transient, нужен page on-call.
- DLQ для permanently failed batches — после max retries записать в dead-letter table для manual investigation.
Dagster pattern: asset materialization per batch
В Dagster microbatch fits naturally в asset model — каждый batch это independent partition asset:
from dagster import asset, DailyPartitionsDefinition
@asset(partitions_def=DailyPartitionsDefinition(start_date="2024-01-01"))
def fct_events(context):
partition_date = context.partition_key
subprocess.run([
"dbt", "run", "--select", "fct_events",
"--event-time-start", partition_date,
"--event-time-end", f"{partition_date}T23:59:59"
], check=True)
Dagster берёт на себя partition tracking, retry semantics, observability. dbt становится execution layer per partition. Trade-off: теряешь dbt-native concurrent_batches — каждый Dagster run делает одну партицию.
Idempotency invariants
Re-run одного и того же batch’а должен давать бит-в-бит одинаковый результат. Иначе retry создаёт drift. Что ломает:
Non-deterministic SQL constructs
-
current_timestamp(),now()в model SQL. Каждый retry получает новое время — другие значения в derived columns. Решение:dbt_invocation_start_time()(фиксируется на запуск dbt) или явный pass через var. -
RAND(),RANDOM(),UUID(). Очевидно non-deterministic. Если нужны — pre-compute в source и читай как column. -
ROW_NUMBER() OVER (ORDER BY non_unique_col). Если order by даёт ties, физический порядок rows не deterministic. Tie-break добавлением unique column:ORDER BY non_unique_col, id. -
Самореференция
{{ this }}в derived columns. Если model читает себя для расчёта (например, max(id) + 1), retry batch’а после partial commit видит другой{{ this }}state. Используйevent_time-based filtering, не self-reference.
Source data mutability
Если raw данные мутируют между batch execution и retry — batch retry получит другой input. Что делать:
- CDC streams append-only — не mutate, OK.
- OLTP snapshots — если source это nightly snapshot OLTP, retry через день может попасть на другой snapshot. Mitigation: фиксируй snapshot_id в source query:
WHERE source_snapshot_id = '{{ var("snapshot_id") }}'. - Soft-delete late-arriving updates — если source может update past records (например, order.status меняется через дни), microbatch не справится точно. Используй SCD-2 или explicit version columns.
Schema evolution
Если source schema меняется между batches, retry старого batch’а может упасть на mismatched columns. Защита: on_schema_change='fail' (не auto-evolve), и pin source schema через explicit select col1, col2, ... (не select *).
Real numbers: 5B-table проект, 200 batches параллельно
Реальный case с Snowflake X-Large (8 nodes, ~64GB compute per node):
- Таблица:
fct_events, 5.2B rows, partitioned byevent_hour. - Strategy: microbatch,
batch_size='hour', lookback=72 hours. - Run scope: lookback re-process = 72 batches.
- Backfill scope: 90 days = 90 × 24 = 2160 batches.
Wall-clock измерения
| Config | Wall-clock | Notes |
|---|---|---|
| concurrent_batches=1 | 5.5 hours | Sequential, single-cluster X-Large |
| concurrent_batches=8 | 52 minutes | Default DEFAULT_CONCURRENCY |
| concurrent_batches=16 | 38 minutes | Diminishing returns from queueing |
| concurrent_batches=32 | 41 minutes | Worse — query queue overflow |
| concurrent_batches=16 + multi-cluster (max=4) | 14 minutes | Real parallelism |
| concurrent_batches=200 (peak setup) | 9 minutes | Snowflake MAX_CONCURRENCY_LEVEL=200, MAX_CLUSTER_COUNT=8 |
Cost analysis
Wall-clock сокращение не = cost saving. Total compute = warehouse_size × wall_clock:
- Single-cluster sequential 5.5h × X-Large (88**
- Single-cluster concurrent_batches=16, 38min × X-Large = $10.13
- Multi-cluster max=4, concurrent_batches=16, 14min × (X-Large × 2.5 avg cluster fill) = $9.33
- Peak setup 200 batches × 9min × multi-cluster max=8 × X-Large = $17.07
Sweet spot не “максимальный concurrent_batches”, а balance с warehouse sizing и cluster count. 200 параллельных batches с multi-cluster max=8 быстрее, но не дешевле — потому что multi-cluster billing считает каждый cluster отдельно.
Failure rate at scale
200 параллельных batches на 5B-table:
- ~2-3 batches/run упали с transient warehouse errors (out-of-memory, statement timeout).
- ~0.1% — это норма на scale, не bug. Orchestrator должен retry автоматически.
- Если failure rate растёт до 5%+ — warehouse undersized, нужен upscale.
Lessons learned production case
- Не максимизируй concurrent_batches blindly. После определённой точки — diminishing returns от warehouse queueing.
- Multi-cluster даёт реальный speedup, но dvigaet cost equation.
- Always orchestrate retry — на 200+ batches failure inevitable, dbt-native retry insufficient.
- Monitoring per-batch latency p99 — если p99 растёт, скорее всего partition skew (одна дата = 100x больше rows).
- Backfill отдельно от incremental runs. Backfill jobs с aggressive concurrent_batches, incremental с conservative — потому что incremental runs in production hours conflicts с downstream reads.
Snowflake/BigQuery: microbatch и partitioning
Microbatch не делает physical partitioning сам — это logical batching. Чтобы DELETE per batch был быстрым, физически таблица должна быть partition by event_date/event_hour.
Snowflake: cluster keys.
ALTER TABLE fct_events CLUSTER BY (event_hour);
BigQuery: partition tables.
CREATE TABLE fct_events (...)
PARTITION BY DATE(event_timestamp)
В dbt-конфиге:
{{ config(
materialized='incremental',
incremental_strategy='microbatch',
event_time='event_timestamp',
batch_size='day',
partition_by={'field': 'event_timestamp', 'data_type': 'timestamp', 'granularity': 'day'}
) }}
С этим DELETE WHERE event_date = 'X' использует partition pruning — Snowflake/BQ читают только конкретный partition, не всю таблицу.
DuckDB: native partitioning slabnoe, microbatch на DuckDB OK для обучения, но в production не подходит для 100B-таблиц.
Microbatch и unique_key — несовместимы
Важный момент: microbatch НЕ поддерживает unique_key. Это by design.
Логика: microbatch удаляет всю партицию и вставляет заново. Не происходит “merge” внутри партиции. Если в batch появилось два события с одним и тем же event_id, оба попадут в target.
-- ОШИБКА: dbt падает на parse
{{ config(
materialized='incremental',
incremental_strategy='microbatch',
event_time='event_timestamp',
batch_size='day',
unique_key='event_id' -- config error!
) }}
Решение для дедупликации:
- Дедуп в staging через
qualify row_number() over (partition by event_id order by loaded_at desc) = 1. - Дедуп в самой microbatch модели через
qualifyв SELECT.
{{ config(
materialized='incremental',
incremental_strategy='microbatch',
event_time='event_timestamp',
batch_size='day'
) }}
select
*
from {{ source('app', 'events') }}
qualify row_number() over (partition by event_id order by loaded_at desc) = 1
Failure modes для microbatch
Production gotchas:
-
event_time column must be NOT NULL — если в source приходит NULL event_timestamp, dbt не знает, в какой batch положить. Падение.
-
Timezone mismatch — source timestamps в UTC, batch_size=‘day’ в local timezone. Batch boundaries не совпадают, dublirovanie данных в boundary partitions.
-
lookback × concurrent_batches × table size — на больших lookback можно случайно сделать 100x rebuild. lookback=30, concurrent=10, batch_size=‘hour’ -> 720 hour-batches, parallel 10 -> warehouse on fire.
-
DDL during run — если кто-то alter таблицу в середине microbatch run (добавил column), batches с разными schemas падают.
-
Cost spike на backfill — забыли поставить
--concurrent-batches, backfill 365 дней последовательно занимает 24 часа, $$$ continuously running warehouse.
Самая распространённая ошибка — забыть про timezone. Source в UTC, dbt batches в local. Утренние events попадают в “вчерашнюю” партицию. Через neделю обнаруживаете 5% drift в metrics. Решение — DATE_TRUNC(‘day’, event_timestamp AT TIME ZONE ‘UTC’) явно везде.
Observability microbatch
В run_results.json есть batch_results per model:
{
"unique_id": "model.my_project.fct_events",
"batch_results": {
"successful": [
["2026-05-12T00:00:00", "2026-05-13T00:00:00"],
["2026-05-13T00:00:00", "2026-05-14T00:00:00"]
],
"failed": [
["2026-05-14T00:00:00", "2026-05-15T00:00:00"]
],
"skipped": []
}
}
Полезные паттерны:
- Alert на failed batches — после run проверять
batch_results.failed, если > 0 — alert. - Latency tracking — для каждого batch есть start/end. Можно построить distribution latency per partition.
- Throughput chart — rows_affected per batch, понимаете data velocity.
Сравнение с другими стратегиями
| Аспект | append | delete+insert | merge | microbatch |
|---|---|---|---|---|
| Idempotent retry | Нет | Per row | Per row | Per batch |
| Backfill | full-refresh | full-refresh | full-refresh | —event-time-start/end |
| Parallel batches | Нет | Нет | Нет | —concurrent-batches |
| Late arriving | Нет лookback | Через lookback config | Через lookback config | lookback param |
| Big table support | Хорошо | OK | OK | Отлично (partitioned) |
| Complexity | Низкая | Средняя | Средняя | Высокая |
Microbatch — overkill для маленьких таблиц. Для 100M+ строк partition-by event_time — это production standard.
Резюме
- Microbatch — стратегия для большых event-time partitioned таблиц.
--concurrent-batches N— параллельный run батчей. 3-5x speedup на нужных setup’ах.- Idempotent retry через
result:error+1— пере-запуск только failed batches. - Backfill через
--event-time-start/--event-time-end— natural mechanism. - lookback — окно для late-arriving events, ставьте по P99 lag.
- Не поддерживает unique_key — дедуп через
qualifyв самой модели. - Gotchas: timezone, lookback × concurrent multiplication, partitioning misconfig.
- Production: 100B-таблицы -> batch_size=‘hour’, concurrent=8, lookback=3-7.