Learning Platform
Глоссарий Troubleshooting
Урок 11.04 · 24 мин
Продвинутый
microbatchparallelincrementalbackfill

Microbatch parallel: idempotency, retry per batch, —concurrent-batches

NOTE

Microbatch концепт, event_time/batch_size/begin/lookback, базовый run/backfill — материал dbt-ii/03. Если ещё не знаешь — сюда. Здесь — --concurrent-batches deep, isolation на warehouses без snapshot isolation, retry orchestration.

Краткая сводка для контекста: microbatch разбивает один dbt run incremental-модели на множество DELETE+INSERT-операций per partition. event_time+batch_size определяют, как делить, lookback — окно для late-arriving events, begin — нижняя граница backfill. Каждый batch атомарен и идемпотентен on its own. Этого достаточно — здесь сразу к advanced.

Microbatch: концепт и зачем появился (dbt II) Idempotency — MERGE INTO, atomic writes, deterministic paths (airflow-course)

—concurrent-batches: scheduler внутри

По умолчанию dbt-core выполняет batches последовательно даже если они независимы. Lookback=7 на таблице где каждый batch — 5 минут -> 35 минут wall-clock. С --concurrent-batches N появляется внутренний scheduler, который кладёт batches в task pool и забирает по N одновременно.

dbt run --select fct_events --concurrent-batches 8

Внутренности scheduler

В core/dbt/task/run.py есть MicrobatchModelRunner. Когда concurrent_batches > 1, runner:

  1. Строит list batch-задач из event_time_start..event_time_end shifted by batch_size.
  2. Создаёт ThreadPoolExecutor(max_workers=concurrent_batches).
  3. Submit каждый batch как future — каждый future это отдельный execute_batch(batch_meta) call.
  4. Собирает results через as_completed().
  5. После всех batches — мерж в batch_results для run_results.json.

Это отдельный thread pool в рамках одной model task, в добавок к глобальному threads pool в profile.

Взаимодействие с threads в profile

Это два разных уровня параллельности — junior часто путают:

  • threads: N в profiles.yml — сколько моделей dbt запускает одновременно. Глобальный pool task runner’а.
  • --concurrent-batches M — сколько batches одной microbatch model идут одновременно. Локальный pool внутри model task.

Эффективный max concurrency = threads × concurrent_batches (если все модели microbatch и одного размера). 10 microbatch-моделей, threads=4, concurrent_batches=4 -> до 16 одновременных queries на warehouse.

Deadlock scenarios

Параллельные DELETE+INSERT на одну таблицу могут deadlock даже без cross-batch dependencies:

  1. Lock escalation на не-partitioned таблицах. PostgreSQL/Redshift: DELETE с большой выборкой эскалирует row locks до table lock. Два batch-thread’а одновременно делают DELETE на разных partitions, обе пытаются получить table lock — deadlock detection kills один.
  2. Snowflake table locks при DDL-like operations. Если microbatch триггерит ALTER (например, on_schema_change='append_new_columns'), DDL берёт exclusive lock на таблицу. Параллельный batch с DML пытается прочитать metadata — wait или fail.
  3. Cross-resource contention. Два batch’а одновременно делают MERGE на двух разных моделях, но обе зависят от одной staging-таблицы. Staging захвачена locks от обоих writes — deadlock через shared resource.

Симптом в логах: 40P01 deadlock detected (Postgres) или ER_LOCK_DEADLOCK (MySQL-семейство). На Snowflake — Statement reached its statement or warehouse timeout без явного deadlock message, потому что Snowflake резолвит через queueing, а не detection.

Isolation на warehouses без snapshot isolation

Главный gotcha parallel microbatch: между DELETE и INSERT одного batch’а reader видит partial state на warehouses без snapshot isolation.

Какие warehouses имеют какую isolation

WarehouseDefault isolationReader sees mid-batch?
SnowflakeSnapshot (read-committed snapshot)Нет — txn-level snapshot
BigQuerySnapshot per queryНет в одном query, да между queries
RedshiftRead-committedДа — после DELETE и до INSERT
PostgresRead-committed (default)Да
DuckDBSerializable single-writerНет

Что видит downstream consumer

Сценарий: BI-dashboard каждые 30 секунд читает SELECT COUNT(*) FROM fct_events. dbt запускает microbatch с concurrent_batches=8, каждый batch ~ 5 минут. Между DELETE и INSERT для каждой партиции есть окно где её данных нет в таблице.

На Redshift или Postgres: пользователь refresh’нул dashboard ровно в это окно -> видит занижённый count. 8 batches × 5 минут окна = ~40 minutes total exposure window per run. На активных warehouse часть reads гарантированно попадут в дыру.

Защита: commit-per-batch с проверкой и retry

Naive подход — DELETE; INSERT; в одной транзакции. Это решает proboiлему частично:

BEGIN;
DELETE FROM fct_events WHERE event_date = '2026-05-15';
INSERT INTO fct_events SELECT ... WHERE event_date = '2026-05-15';
COMMIT;

На Redshift это даёт atomic switch — reader видит либо old, либо new state. Microbatch macro для Redshift в dbt-redshift обёртывает batch в BEGIN/COMMIT именно поэтому.

Но не на всех warehouses dbt-core это делает. Проверь adapter:

  • dbt-snowflake — не нужен explicit txn (snapshot isolation).
  • dbt-redshift — wraps в txn в materialization.
  • dbt-bigquery — нет txn, использует MERGE для atomic swap.
  • dbt-postgres — wraps в txn.

Если используешь custom microbatch macro на community-adapter — проверь, есть ли BEGIN/COMMIT. Если нет — добавляй.

Pattern: shadow table swap для read-heavy таблиц

Если read-traffic критичен и DELETE+INSERT окно неприемлемо, паттерн shadow table swap:

-- Custom pre/post hooks вокруг batch
{{ config(
    materialized='incremental',
    incremental_strategy='microbatch',
    pre_hook="CREATE TABLE IF NOT EXISTS {{ this }}_staging LIKE {{ this }}",
    post_hook="ALTER TABLE {{ this }} SWAP WITH {{ this }}_staging"
) }}

Каждый batch пишет в staging, после успеха — atomic SWAP. Reader всё время видит consistent state. Стоимость — двойной storage временно.

Retry orchestration через run_results

run_results.json содержит per-batch status. Этим пользуются orchestrators (Airflow, Prefect, Dagster) для surgical retry — re-run только failed batches.

Структура batch_results

{
  "results": [
    {
      "unique_id": "model.my_project.fct_events",
      "status": "partial success",
      "batch_results": {
        "successful": [
          ["2026-05-12T00:00:00", "2026-05-13T00:00:00"],
          ["2026-05-13T00:00:00", "2026-05-14T00:00:00"]
        ],
        "failed": [
          ["2026-05-14T00:00:00", "2026-05-15T00:00:00"]
        ],
        "skipped": []
      }
    }
  ]
}

failed — list of [start, end] tuples, не date strings. Это важно для retry — orchestrator passes их в --event-time-start/--event-time-end.

dbt-native retry: result:error+1

Простейший случай — встроенная selector:

dbt run --select fct_events
# 3 batches failed

dbt retry  # читает run_results.json, re-runs ТОЛЬКО failed batches

dbt retry парсит последний run_results.json, находит failed nodes и для microbatch-моделей — failed batches внутри node. Re-run происходит точечно.

Limitation: dbt retry не работает между runs если запускается из другой rabочей директории / другого CI job. run_results.json живёт в target/ — должен persist между runs.

Airflow pattern: parse run_results и dynamic retry

В production Airflow обычно делает свой retry — потому что dbt retry не интегрируется с Airflow’s task state.

# Airflow DAG (упрощённо)
from airflow.decorators import task
import json

@task
def run_microbatch_model(model: str):
    result = subprocess.run(
        ["dbt", "run", "--select", model, "--concurrent-batches", "8"],
        capture_output=True
    )
    with open("target/run_results.json") as f:
        results = json.load(f)
    
    for node_result in results["results"]:
        if node_result["status"] == "partial success":
            failed_batches = node_result["batch_results"]["failed"]
            # Trigger retry tasks для каждого failed batch
            for [start, end] in failed_batches:
                retry_batch.override(retries=3).expand(
                    start=[start], end=[end], model=[model]
                )

@task
def retry_batch(model: str, start: str, end: str):
    subprocess.run([
        "dbt", "run", "--select", model,
        "--event-time-start", start,
        "--event-time-end", end,
        "--concurrent-batches", "4"  # меньше concurrency на retry
    ], check=True)

Key insights production-orchestrator implementation:

  1. Меньше concurrency на retry — если batches упали из-за warehouse pressure, retry с тем же concurrency повторит проблему.
  2. Exponential backoff per retry attempt — между retry attempts wait 2^attempt * base_delay.
  3. Separate alerting на repeated failures — если 3 retry прошли и batch всё ещё падает, это not transient, нужен page on-call.
  4. DLQ для permanently failed batches — после max retries записать в dead-letter table для manual investigation.

Dagster pattern: asset materialization per batch

В Dagster microbatch fits naturally в asset model — каждый batch это independent partition asset:

from dagster import asset, DailyPartitionsDefinition

@asset(partitions_def=DailyPartitionsDefinition(start_date="2024-01-01"))
def fct_events(context):
    partition_date = context.partition_key
    subprocess.run([
        "dbt", "run", "--select", "fct_events",
        "--event-time-start", partition_date,
        "--event-time-end", f"{partition_date}T23:59:59"
    ], check=True)

Dagster берёт на себя partition tracking, retry semantics, observability. dbt становится execution layer per partition. Trade-off: теряешь dbt-native concurrent_batches — каждый Dagster run делает одну партицию.

Idempotency invariants

Re-run одного и того же batch’а должен давать бит-в-бит одинаковый результат. Иначе retry создаёт drift. Что ломает:

Non-deterministic SQL constructs

  1. current_timestamp(), now() в model SQL. Каждый retry получает новое время — другие значения в derived columns. Решение: dbt_invocation_start_time() (фиксируется на запуск dbt) или явный pass через var.

  2. RAND(), RANDOM(), UUID(). Очевидно non-deterministic. Если нужны — pre-compute в source и читай как column.

  3. ROW_NUMBER() OVER (ORDER BY non_unique_col). Если order by даёт ties, физический порядок rows не deterministic. Tie-break добавлением unique column: ORDER BY non_unique_col, id.

  4. Самореференция {{ this }} в derived columns. Если model читает себя для расчёта (например, max(id) + 1), retry batch’а после partial commit видит другой {{ this }} state. Используй event_time-based filtering, не self-reference.

Source data mutability

Если raw данные мутируют между batch execution и retry — batch retry получит другой input. Что делать:

  1. CDC streams append-only — не mutate, OK.
  2. OLTP snapshots — если source это nightly snapshot OLTP, retry через день может попасть на другой snapshot. Mitigation: фиксируй snapshot_id в source query: WHERE source_snapshot_id = '{{ var("snapshot_id") }}'.
  3. Soft-delete late-arriving updates — если source может update past records (например, order.status меняется через дни), microbatch не справится точно. Используй SCD-2 или explicit version columns.

Schema evolution

Если source schema меняется между batches, retry старого batch’а может упасть на mismatched columns. Защита: on_schema_change='fail' (не auto-evolve), и pin source schema через explicit select col1, col2, ... (не select *).

Real numbers: 5B-table проект, 200 batches параллельно

Реальный case с Snowflake X-Large (8 nodes, ~64GB compute per node):

  • Таблица: fct_events, 5.2B rows, partitioned by event_hour.
  • Strategy: microbatch, batch_size='hour', lookback=72 hours.
  • Run scope: lookback re-process = 72 batches.
  • Backfill scope: 90 days = 90 × 24 = 2160 batches.

Wall-clock измерения

ConfigWall-clockNotes
concurrent_batches=15.5 hoursSequential, single-cluster X-Large
concurrent_batches=852 minutesDefault DEFAULT_CONCURRENCY
concurrent_batches=1638 minutesDiminishing returns from queueing
concurrent_batches=3241 minutesWorse — query queue overflow
concurrent_batches=16 + multi-cluster (max=4)14 minutesReal parallelism
concurrent_batches=200 (peak setup)9 minutesSnowflake MAX_CONCURRENCY_LEVEL=200, MAX_CLUSTER_COUNT=8

Cost analysis

Wall-clock сокращение не = cost saving. Total compute = warehouse_size × wall_clock:

  • Single-cluster sequential 5.5h × X-Large (16/h)=16/h) = **88**
  • Single-cluster concurrent_batches=16, 38min × X-Large = $10.13
  • Multi-cluster max=4, concurrent_batches=16, 14min × (X-Large × 2.5 avg cluster fill) = $9.33
  • Peak setup 200 batches × 9min × multi-cluster max=8 × X-Large = $17.07

Sweet spot не “максимальный concurrent_batches”, а balance с warehouse sizing и cluster count. 200 параллельных batches с multi-cluster max=8 быстрее, но не дешевле — потому что multi-cluster billing считает каждый cluster отдельно.

Failure rate at scale

200 параллельных batches на 5B-table:

  • ~2-3 batches/run упали с transient warehouse errors (out-of-memory, statement timeout).
  • ~0.1% — это норма на scale, не bug. Orchestrator должен retry автоматически.
  • Если failure rate растёт до 5%+ — warehouse undersized, нужен upscale.

Lessons learned production case

  1. Не максимизируй concurrent_batches blindly. После определённой точки — diminishing returns от warehouse queueing.
  2. Multi-cluster даёт реальный speedup, но dvigaet cost equation.
  3. Always orchestrate retry — на 200+ batches failure inevitable, dbt-native retry insufficient.
  4. Monitoring per-batch latency p99 — если p99 растёт, скорее всего partition skew (одна дата = 100x больше rows).
  5. Backfill отдельно от incremental runs. Backfill jobs с aggressive concurrent_batches, incremental с conservative — потому что incremental runs in production hours conflicts с downstream reads.

Snowflake/BigQuery: microbatch и partitioning

Microbatch не делает physical partitioning сам — это logical batching. Чтобы DELETE per batch был быстрым, физически таблица должна быть partition by event_date/event_hour.

Snowflake: cluster keys.

ALTER TABLE fct_events CLUSTER BY (event_hour);

BigQuery: partition tables.

CREATE TABLE fct_events (...)
PARTITION BY DATE(event_timestamp)

В dbt-конфиге:

{{ config(
    materialized='incremental',
    incremental_strategy='microbatch',
    event_time='event_timestamp',
    batch_size='day',
    partition_by={'field': 'event_timestamp', 'data_type': 'timestamp', 'granularity': 'day'}
) }}

С этим DELETE WHERE event_date = 'X' использует partition pruning — Snowflake/BQ читают только конкретный partition, не всю таблицу.

DuckDB: native partitioning slabnoe, microbatch на DuckDB OK для обучения, но в production не подходит для 100B-таблиц.

Microbatch и unique_key — несовместимы

Важный момент: microbatch НЕ поддерживает unique_key. Это by design.

Логика: microbatch удаляет всю партицию и вставляет заново. Не происходит “merge” внутри партиции. Если в batch появилось два события с одним и тем же event_id, оба попадут в target.

-- ОШИБКА: dbt падает на parse
{{ config(
    materialized='incremental',
    incremental_strategy='microbatch',
    event_time='event_timestamp',
    batch_size='day',
    unique_key='event_id'  -- config error!
) }}

Решение для дедупликации:

  1. Дедуп в staging через qualify row_number() over (partition by event_id order by loaded_at desc) = 1.
  2. Дедуп в самой microbatch модели через qualify в SELECT.
{{ config(
    materialized='incremental',
    incremental_strategy='microbatch',
    event_time='event_timestamp',
    batch_size='day'
) }}

select
    *
from {{ source('app', 'events') }}
qualify row_number() over (partition by event_id order by loaded_at desc) = 1

Failure modes для microbatch

Production gotchas:

  1. event_time column must be NOT NULL — если в source приходит NULL event_timestamp, dbt не знает, в какой batch положить. Падение.

  2. Timezone mismatch — source timestamps в UTC, batch_size=‘day’ в local timezone. Batch boundaries не совпадают, dublirovanie данных в boundary partitions.

  3. lookback × concurrent_batches × table size — на больших lookback можно случайно сделать 100x rebuild. lookback=30, concurrent=10, batch_size=‘hour’ -> 720 hour-batches, parallel 10 -> warehouse on fire.

  4. DDL during run — если кто-то alter таблицу в середине microbatch run (добавил column), batches с разными schemas падают.

  5. Cost spike на backfill — забыли поставить --concurrent-batches, backfill 365 дней последовательно занимает 24 часа, $$$ continuously running warehouse.

WARNING

Самая распространённая ошибка — забыть про timezone. Source в UTC, dbt batches в local. Утренние events попадают в “вчерашнюю” партицию. Через neделю обнаруживаете 5% drift в metrics. Решение — DATE_TRUNC(‘day’, event_timestamp AT TIME ZONE ‘UTC’) явно везде.

Observability microbatch

В run_results.json есть batch_results per model:

{
  "unique_id": "model.my_project.fct_events",
  "batch_results": {
    "successful": [
      ["2026-05-12T00:00:00", "2026-05-13T00:00:00"],
      ["2026-05-13T00:00:00", "2026-05-14T00:00:00"]
    ],
    "failed": [
      ["2026-05-14T00:00:00", "2026-05-15T00:00:00"]
    ],
    "skipped": []
  }
}

Полезные паттерны:

  • Alert на failed batches — после run проверять batch_results.failed, если > 0 — alert.
  • Latency tracking — для каждого batch есть start/end. Можно построить distribution latency per partition.
  • Throughput chart — rows_affected per batch, понимаете data velocity.

Сравнение с другими стратегиями

Аспектappenddelete+insertmergemicrobatch
Idempotent retryНетPer rowPer rowPer batch
Backfillfull-refreshfull-refreshfull-refresh—event-time-start/end
Parallel batchesНетНетНет—concurrent-batches
Late arrivingНет лookbackЧерез lookback configЧерез lookback configlookback param
Big table supportХорошоOKOKОтлично (partitioned)
ComplexityНизкаяСредняяСредняяВысокая

Microbatch — overkill для маленьких таблиц. Для 100M+ строк partition-by event_time — это production standard.

Проверка знанийKnowledge check
У вас 50B-events таблица на Snowflake. Microbatch с batch_size='day', lookback=14, concurrent_batches=8 на X-Large warehouse. dbt run занимает 90 минут. Junior говорит: "увеличить concurrent_batches до 32 — будет в 4 раза быстрее". Что не так с этой логикой?
ОтветAnswer
Junior смотрит линейно, не учитывает constraints. Несколько проблем с "concurrent_batches=32": (1) Snowflake X-Large concurrent query limit (default 8). С concurrent_batches=32 -> 32 параллельных DELETE+INSERT operations. Из них 8 бегут, 24 ждут в очереди. Никакого выигрыша от concurrent_batches=32 vs 8 не будет — все остаются queued. (2) DELETE на одну таблицу с разных threads. Даже если warehouse мог бы 32 одновременных, DELETE creates table-level locks. Несколько DELETE statements на разные partitions параллельно на одной таблице — внутренне Snowflake может сериализовать через locks, особенно если cluster keys не идеальны. (3) Memory pressure. X-Large даёт фиксированный объём memory. 32 параллельных queries делят его. Каждая получает меньше — query performance деградирует. На больших aggregations это spilling to disk -> 5x медленнее. (4) Lookback=14 + concurrent_batches=32: всего 14 batches per run. Concurrent>14 ничего не даёт, потому что нет столько batches. Junior не учёл, что concurrent_batches ограничен количеством available batches. (5) Cost не оптимизируется. Increase concurrent_batches не уменьшает total compute time — warehouse работает то же время на total batches. Только перераспределяет throughput. Правильный анализ для ускорения 90-minute run: (a) Уменьшить batch_size — с day на hour. Это даст 14 days × 24 = 336 batches. С concurrent_batches=8 -> 42 параллельных waves × 5-7 min each = 30-40 min. Speedup 2-3x. (b) Upscale warehouse до 2X-Large. Каждая query получает 2x compute -> query time halved -> run в 2x быстрее (45 min). Cost grows but throughput too. (c) Multi-cluster warehouse MAX_CLUSTER_COUNT=4. Concurrent queries × 4. Concurrent_batches=8 теперь реально использует, не queued. (d) Уменьшить lookback с 14 до P99 lag (например, 3 days). 14->3 = 4.6x меньше batches -> 90 min -> 20 min. (e) Check partitioning — cluster keys by event_date присутствуют? Без них DELETE сканит всю 50B таблицу для каждого batch. Add proper clustering. Реальный grand plan: lookback=3, batch_size='hour', concurrent_batches=8, multi-cluster MAX=2, cluster keys на event_hour. Ожидаемый run time: 12-18 минут. Cost similar но throughput 5-8x. Это и есть senior-уровень оптимизации.

Резюме

  • Microbatch — стратегия для большых event-time partitioned таблиц.
  • --concurrent-batches N — параллельный run батчей. 3-5x speedup на нужных setup’ах.
  • Idempotent retry через result:error+1 — пере-запуск только failed batches.
  • Backfill через --event-time-start/--event-time-end — natural mechanism.
  • lookback — окно для late-arriving events, ставьте по P99 lag.
  • Не поддерживает unique_key — дедуп через qualify в самой модели.
  • Gotchas: timezone, lookback × concurrent multiplication, partitioning misconfig.
  • Production: 100B-таблицы -> batch_size=‘hour’, concurrent=8, lookback=3-7.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 6. Что даёт --concurrent-batches в microbatch?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5