Learning Platform
Глоссарий Troubleshooting
Урок 14.02 · 28 мин
Продвинутый
DataflowPlanSourceScanOptimizerMetricFlowquery optimization

DataflowPlan deep: nodes, optimizers, performance

В прошлом уроке мы прошли весь pipeline MetricFlow от YAML до SQL. Сейчас фокусируемся на самой важной части — DataflowPlan и его оптимизаторах. Это сердце MetricFlow и где лежит большая часть его «магии».

Понимание DataflowPlan важно для:

  1. Debugging unexpected SQL в compiled queries.
  2. Performance optimization при scale (100+ метрик, 50+ semantic_models).
  3. Поверить когда оптимизатор не работает (нашёл bug, плохо написанная metric).
  4. Contributions в MetricFlow (Apache 2.0).

Dict: hash function, open addressing, perturbation probe

DataflowPlan: что это

DataflowPlan — это DAG операций, которые должны быть выполнены для resolving metric query. Каждый узел — DataflowPlanNode — представляет одну операцию.

@dataclass
class DataflowPlan:
    sink_nodes: List[BaseDataflowPlanNode]  # финальный output
    nodes: List[BaseDataflowPlanNode]       # все узлы
    edges: List[Tuple[NodeId, NodeId]]      # parent -> child

Это directed acyclic graph (DAG). Источники (semantic_models) — root nodes. Output (computed metric) — sink node.

DataflowPlan: типы узлов

Полный list типов в metricflow/dataflow/nodes.py репозитория. Эти 8 — основные.


Пример: разбор реального plan

Query: revenue by country month.

# Pseudo-code MetricFlow trace
plan = DataflowPlan(
    sink_nodes=[write_node],
    nodes=[
        read_orders,
        read_customers,
        join_orders_customers,
        filter_recent,
        aggregate_revenue,
        group_by_country_month,
        compute_metric,
        write_node
    ],
    edges=[
        (read_orders, join_orders_customers),
        (read_customers, join_orders_customers),
        (join_orders_customers, filter_recent),
        (filter_recent, aggregate_revenue),
        (aggregate_revenue, group_by_country_month),
        (group_by_country_month, compute_metric),
        (compute_metric, write_node),
    ]
)

Visualization:

ReadFromSemanticModel(orders)       ReadFromSemanticModel(customers)
                  ↓                              ↓
                  └─────────┬───────────────────┘

                  JoinToBaseSemanticModel(LEFT, on customer_id)

                  WhereFilterNode(order_date >= '2025-01-01')

                  AggregateMeasuresNode(SUM(amount) AS revenue)

                  GroupByNode([country, metric_time__month])

                  ComputeMetricsNode(revenue)

                  WriteToResultDataFrame

Это unoptimized plan. Optimizer применяет changes.


Optimizer pipeline

MetricFlow has multi-pass optimizer. Каждый pass — отдельный оптимизатор:

optimizers = [
    SourceScanOptimizer,           # column pruning, source pruning
    PredicatePushdownOptimizer,    # filters down к source
    JoinReorderOptimizer,          # порядок joins для perf
    AggregateFusionOptimizer,      # merge multiple aggs
    SubqueryEliminationOptimizer,  # flatten CTEs где возможно
    ConstantFoldingOptimizer,      # eval constant expressions
]

for optimizer in optimizers:
    plan = optimizer.optimize(plan)

Каждый optimizer берёт plan, возвращает new plan (immutable approach). Это functional pipeline.


Optimizer 1: SourceScanOptimizer

Уже coverали в прошлом уроке. Recap:

Before:
  ReadFromSemanticModel(orders, columns=[all 50])
  ReadFromSemanticModel(customers, columns=[all 20])

After:
  ReadFromSemanticModel(orders, columns=[customer_id, amount, order_date])
  ReadFromSemanticModel(customers, columns=[customer_id, country])

SQL impact:

-- Before (without optimizer):
SELECT * FROM fct_orders   -- 50 columns

-- After:
SELECT customer_id, amount, order_date FROM fct_orders   -- 3 columns

Снижение I/O и cost (BigQuery scanned bytes, Snowflake compute time).

Когда не работает:

  • Сложные measure expressions, которые reference непредсказуемые columns (через variable substitution в Jinja).
  • Adapter dialects, где column pruning fragmented.

Optimizer 2: PredicatePushdownOptimizer

Идея: push WHERE filters как можно ближе к source.

Before:
  ReadFromSemanticModel(orders) -> reads ALL orders
  ReadFromSemanticModel(customers) -> reads ALL customers
  Join
  WhereFilterNode(order_date >= '2025-01-01')   ← filter после join
  Aggregate

After:
  ReadFromSemanticModel(orders, where=[order_date >= '2025-01-01'])   ← filter в source
  ReadFromSemanticModel(customers)
  Join
  Aggregate

SQL impact:

-- Before:
SELECT ...
FROM (
  SELECT * FROM fct_orders
  LEFT JOIN dim_customers ON ...
)
WHERE order_date >= '2025-01-01'

-- After:
SELECT ...
FROM (
  SELECT * FROM fct_orders WHERE order_date >= '2025-01-01'
)
LEFT JOIN dim_customers ON ...

Снижение rows на join. Warehouse может partition prune (BigQuery, Snowflake) при date filters.

Когда не работает:

  • Filter references multiple semantic_models (cross-join filter).
  • Filter expression non-deterministic.
  • Filter применяется к computed dimension (не source column).

Optimizer 3: JoinReorderOptimizer

Идея: переставить joins для better performance.

A LEFT JOIN B LEFT JOIN C

Если |A| = 10M rows, |B| = 100K rows, |C| = 1K rows:

Bad order: A -> B -> C
  A LEFT JOIN B    # 10M × 100K logical (но JOIN reduces)
  result LEFT JOIN C   # final result × 1K

Better order: C -> B -> A
  C LEFT JOIN B    # 1K × 100K
  result LEFT JOIN A   # smaller intermediate joined с 10M

Это classic database optimizer problem. MetricFlow uses heuristics:

1. Smallest table first.
2. Joins through indexes/clustered columns first.
3. Filtered tables первое.

В современных warehouses (Snowflake, BigQuery) cost-based optimizer внутри warehouse делает много этой работы. MetricFlow помогает в edge cases.

Когда не работает:

  • Когда warehouse statistics не actual (out of date).
  • При complex multi-stage queries.

Optimizer 4: AggregateFusionOptimizer

Merge multiple aggregate операции:

Before:
  Read -> AggregateMeasures(SUM(amount))
  Read -> AggregateMeasures(COUNT(*))
  Read -> AggregateMeasures(AVG(amount))
  Join all three

After:
  Read -> AggregateMeasures(SUM, COUNT, AVG в одном проходе)

SQL impact:

-- Before:
WITH revenue AS (SELECT customer_id, SUM(amount) AS r FROM orders GROUP BY 1),
     order_count AS (SELECT customer_id, COUNT(*) AS c FROM orders GROUP BY 1),
     avg_order AS (SELECT customer_id, AVG(amount) AS a FROM orders GROUP BY 1)
SELECT ... JOIN ... JOIN ...

-- After:
SELECT customer_id, 
       SUM(amount) AS r,
       COUNT(*) AS c,
       AVG(amount) AS a
FROM orders GROUP BY 1

Один pass над данными вместо трёх. Huge speedup на big data.


Optimizer 5: SubqueryEliminationOptimizer

Flatten unnecessary CTEs:

Before:
  WITH cte1 AS (SELECT * FROM tbl WHERE x > 0),
       cte2 AS (SELECT * FROM cte1 WHERE y < 10)
  SELECT * FROM cte2

After:
  SELECT * FROM tbl WHERE x > 0 AND y < 10

Snowflake / BigQuery handle CTEs хорошо, но not always optimally. Eliminate где possible.


Optimizer 6: ConstantFoldingOptimizer

Evaluate constant expressions:

Before:
  WHERE order_date >= '2025-01-01' AND TRUE

After:
  WHERE order_date >= '2025-01-01'

Trivial, но важно — некоторые Jinja expressions resolve до constants только во время compilation, не во время user write. Optimizer cleans up.


Где optimizer не работает: known bottlenecks

Optimizer limitations

Как читать DataflowPlan для debugging

Если ваш SL query slow или generates unexpected SQL, debug pipeline:

# 1. Get DataflowPlan visualization
mf query --metrics revenue --group_by customers__country --explain --output-format=text

# Это выведет plan:
# Node id=1: ReadFromSemanticModel(orders)
# Node id=2: ReadFromSemanticModel(customers)
# Node id=3: JoinToBaseSemanticModel(1, 2, on customer_id)
# Node id=4: ...

# 2. Compare optimized vs unoptimized
mf query --metrics revenue --explain --no-optimize   # без optimizer
mf query --metrics revenue --explain                  # с optimizer

# Compare differences. Если no diff — optimizer не работает на ваш case.

# 3. Compiled SQL
mf query --metrics revenue --compile
# Видеть final SQL, который пойдёт в warehouse.

Это standard debugging workflow для SL performance issues.


Performance tuning: actionable steps

Если ваш SL slow:

Step 1: Profile.

# Identify где время идёт
mf query --metrics revenue --time-it

# Output:
# Parse: 0.1s
# Build manifest: 0.5s
# Build DataflowPlan: 8s    ← bottleneck
# Optimize: 2s
# Generate SQL: 0.3s
# Execute warehouse: 4s

8 секунд DataflowPlan building -> architectural problem (semantic graph too big).

Step 2: Audit semantic_models.

# Сколько у вас semantic_models?
ls semantic_models/**/*.yml | wc -l
# Если > 30, начинайте domain partitioning.

# Сколько entities в самой большой?
yq '.semantic_models[].entities | length' semantic_models/*.yml | sort -nr | head -5
# Если > 15 в одной semantic_model — split.

Step 3: Saved queries.

Часто-запрашиваемые combinations -> materialize:

saved_queries:
  - name: top_metrics_dashboard
    query_params:
      metrics: [revenue, customer_count, order_count]
      group_by: [customers__country, metric_time__month]
    exports:
      - name: dashboard_metrics
        config:
          export_as: table

Dbt build материализует. Dashboards query mart, не SL.

Step 4: Denormalize.

Если multi-hop joins проблема:

-- staging/int_orders_enriched.sql
SELECT 
    o.*,
    c.country,
    c.region,
    c.tier
FROM {{ ref('fct_orders') }} o
LEFT JOIN {{ ref('dim_customers') }} c ON o.customer_id = c.customer_id

Semantic_model points to enriched model. Joins happen once при build, не при каждом query.

Step 5: Upgrade MetricFlow.

pip install --upgrade dbt-metricflow

Optimizer improvements каждый release.


Проверка знанийKnowledge check
Команда работает с SL и видит, что compiled SQL для `revenue by month` содержит CROSS JOIN с UNNEST для time spine, что значительно замедляет query. Что это и как fix?
ОтветAnswer
Это **time spine handling** в MetricFlow — мощный, но потенциально expensive механизм. Понимание — критично для SL performance. **Что такое time spine:** MetricFlow генерирует metric series через **time spine** — таблицу всех временных значений в требуемой granularity (например, все дни в году, все месяцы и т.д.). Это нужно для: 1. **Cumulative metrics** — running totals требуют все периоды, чтобы добавлять. 2. **Fill gaps** — если данные отсутствуют за период, спайн вставляет NULL/0. 3. **Comparison metrics** — растущая base. **Generated SQL (типичный):** ```sql WITH time_spine AS ( SELECT date_day FROM ( SELECT MIN(metric_time) AS min_date, MAX(metric_time) AS max_date FROM ... ) CROSS JOIN UNNEST(GENERATE_DATE_ARRAY(min_date, max_date, INTERVAL '1 day')) ), metric_values AS ( SELECT metric_time__month, SUM(amount) AS revenue FROM ... GROUP BY 1 ) SELECT ts.date_day AS metric_time, COALESCE(mv.revenue, 0) AS revenue FROM time_spine ts LEFT JOIN metric_values mv ON DATE_TRUNC('month', ts.date_day) = mv.metric_time__month ``` Cross join с UNNEST для generation date series. На больших time ranges (несколько лет, daily granularity) это значительный overhead. **Почему может быть slow:** 1. **GENERATE_DATE_ARRAY** на large ranges (5 лет daily = 1825 rows) sometimes plan poorly в warehouse. 2. **CROSS JOIN UNNEST** — некоторые adapters delegate к expensive path. 3. **JOIN на time computed value** (DATE_TRUNC) — не использует partition pruning. 4. **Если query не нужен time spine** (всегда есть data) — pure overhead. **Solution 1: Pre-built time spine table.** MetricFlow позволяет explicit time spine таблицу: ```yaml # semantic_models/_models.yml models: - name: time_spine_daily config: materialized: table semantic_model: defaults: agg_time_dimension: date_day time_spine: standard_granularity_column: date_day dimensions: - name: date_day type: time type_params: time_granularity: day ``` И SQL для time_spine_daily: ```sql -- models/time_spine_daily.sql SELECT date_day FROM ( SELECT DATE '2020-01-01' + INTERVAL n DAY AS date_day FROM UNNEST(GENERATE_ARRAY(0, 365*10)) AS n ) ``` Материализованная таблица. MetricFlow видит её и использует JOIN вместо генерации в каждом query. Дополнительная оптимизация: cluster таблицу by `date_day` (Snowflake) или partition by `DATE_TRUNC(date_day, MONTH)` (BigQuery). JOIN использует partition pruning. **Solution 2: Granularity matching.** Если query is monthly aggregation, MetricFlow не должен генерировать daily time spine с JOIN to daily metric. Setup: ```yaml semantic_models: - name: orders defaults: agg_time_dimension: order_date dimensions: - name: order_date type: time type_params: time_granularity: day time_granularity_levels: [day, week, month, quarter, year] ``` И соответствующая time_spine с multiple levels: ```yaml time_spine: standard_granularity_column: date_day custom_granularities: - name: month column: month_start ``` Данные в time spine уже aggregated по month. JOIN — простой по pre-computed month column. **Solution 3: Disable time spine когда не нужен.** Для простых queries без cumulative metrics и без gap-filling — time spine overhead. Можно явно отключить: Пока (на 2026) это не fully supported, но planned для future MetricFlow releases. На данный момент workaround — материализовать metric через saved_query без time dimension в group_by: ```yaml saved_queries: - name: revenue_by_month query_params: metrics: [revenue] group_by: [metric_time__month] exports: - name: monthly_revenue_export config: export_as: table # материализуется в таблицу ``` Query эту таблицу, не SL. Никакого time spine overhead. **Solution 4: Optimize generation.** Для одиночных queries без changing time spine — MetricFlow позволяет caching time spine results. Каждый mf query reuse cached spine. ```bash # Configuration export METRICFLOW_TIME_SPINE_CACHE_TTL=3600 # 1 час mf query --metrics revenue ``` Not all setups support, но если работает — sniff. **Debugging:** ```bash mf query --metrics revenue --explain --time-it ``` Искать в plan: ``` Node id=N: GenerateTimeSpineNode Node id=N+1: JoinTimeSpineToMetricNode ``` Если видите эти узлы — time spine в плане. Compare runtime с и без custom time_spine. Если drama difference — solve через pre-built table. **Главный урок:** MetricFlow optimizer хорош, но не покрывает все scenarios. **time spine** — мощный feature, но генерирует expensive SQL. Senior должен знать когда it's overhead, и как fix через explicit time_spine table, materialized saved_queries, или granularity matching. Performance SL требует understanding **что MetricFlow генерирует** и **почему**.

Проверка знанийKnowledge check
Senior изучает source code metricflow/plan_conversion/optimize/. Видит файл predicate_pushdown_optimizer.py. Какой алгоритм оптимизации этот файл имплементирует и какие edge cases он не покрывает?
ОтветAnswer
PredicatePushdownOptimizer — это **classic database optimization**, адаптированный под MetricFlow. Реализация в metricflow/plan_conversion/optimize/predicate_pushdown_optimizer.py. **Базовый алгоритм:** 1. **Walk DataflowPlan от sink назад к sources.** 2. **Для каждого WhereFilterNode определить:** - Какие columns используются в фильтре. - К какому source(ам) принадлежат эти columns. - Можно ли push filter к ReadFromSemanticModel напрямую. 3. **Если все columns в одном source** -> push filter к ReadNode. 4. **Если columns в нескольких sources** -> keep filter после Join, не push. 5. **Если есть aggregations между filter и source** -> cannot push (filter referencing aggregate values). **Pseudo-code:** ```python class PredicatePushdownOptimizer: def optimize(self, plan: DataflowPlan) -> DataflowPlan: for filter_node in plan.find_nodes(WhereFilterNode): columns_used = analyze_columns(filter_node.predicate) sources = trace_sources(columns_used, plan) if len(sources) == 1 and not has_aggregation_between(filter_node, sources[0]): # Push filter к ReadFromSemanticModel read_node = sources[0] new_read_node = read_node.with_filter(filter_node.predicate) plan = plan.replace_node(read_node, new_read_node) plan = plan.remove_node(filter_node) return plan ``` **Что pushdown даёт:** Before: ```sql SELECT ... FROM ( SELECT * FROM fct_orders LEFT JOIN dim_customers ON ... ) WHERE order_date не меньше '2025-01-01' -- filter после join ``` After pushdown: ```sql SELECT ... FROM ( SELECT * FROM fct_orders WHERE order_date не меньше '2025-01-01' -- filter в source ) LEFT JOIN dim_customers ON ... ``` Warehouse impact: - **Partition pruning**: на BigQuery/Snowflake с partitioned tables, filter на partition column в source -> warehouse скипает partitions. Огромный speedup. - **Smaller join input**: less rows для join. Less compute. - **Smaller intermediate result**: less memory. **Edge cases которые НЕ обрабатываются (или плохо):** **1. Filter referencing computed column.** ```yaml dimensions: - name: order_year expr: EXTRACT(YEAR FROM order_date) ``` Query: `WHERE order_year = 2025`. Problem: `order_year` is computed expression, не raw column в fct_orders. Optimizer cannot simply rewrite `WHERE order_year = 2025` в source как `WHERE EXTRACT(YEAR FROM order_date) = 2025` — потому что: - Warehouse не может partition prune на EXTRACT result. - Computed expression не indexed. Result: optimizer keeps filter после source read. Limit performance gain. **Решение для user:** добавить raw filter: ```yaml # В Query where: ["order_date не меньше '2025-01-01' AND order_date < '2026-01-01'"] # Вместо: where: ["order_year = 2025"] ``` Optimizer теперь может push raw date filter. **2. Multi-source filter.** ```yaml where: ["o.amount > 100 AND c.country = 'USA'"] ``` Filter touches both `orders.amount` и `customers.country`. Optimizer не может push к одному source (нужны оба). Filter keeps после join. Workaround: разделить filter на per-source parts: ```yaml where: - "orders.amount > 100" # pushable to orders - "customers.country = 'USA'" # pushable to customers ``` Optimizer обрабатывает каждый отдельно, оба push успешно. **3. Filter с OR conditions.** ```sql WHERE order_date не меньше '2025-01-01' OR customer_segment = 'VIP' ``` OR conditions нельзя cleanly push, потому что каждая половина OR может относиться к разному source. Standard optimizers не пробуют partial pushdown с OR. Workaround: переписать в UNION ALL: ```sql SELECT ... FROM ... WHERE order_date не меньше '2025-01-01' UNION ALL SELECT ... FROM ... WHERE order_date < '2025-01-01' AND customer_segment = 'VIP' ``` Manual rewrite, не Optimizer automatic. **4. Filter on aggregate result.** Classic: ```sql SELECT customer_id, SUM(amount) AS total FROM orders GROUP BY customer_id HAVING SUM(amount) > 1000 -- filter on aggregate ``` HAVING нельзя push к source — фильтр на post-aggregation. Optimizer correctly keeps это после aggregation. NOT a bug. **5. Subquery в WHERE.** ```sql WHERE customer_id IN (SELECT id FROM dim_customers WHERE country = 'USA') ``` Subquery в WHERE — optimizer iterates через subquery, но не unification с outer plan. Может или может не optimize correctly. Depends на implementation. **6. Adapter-specific predicate support.** Not all warehouses support все predicate types. Postgres has different syntax for date filtering than Snowflake. Optimizer needs adapter-specific code paths. **Что senior должен делать:** 1. **Знать ограничения optimizer.** Не writing queries которые нельзя optimize (multi-source filters, OR conditions, computed column filters). 2. **Write filters explicitly для pushdown.** Use raw columns (`order_date не меньше ...`) вместо computed (`order_year = 2025`). 3. **Debug через --explain.** Видеть optimized plan, проверять что filter действительно pushed к source. 4. **Profile compiled SQL.** Final SQL show если push happened. `WHERE` в outer query — push failed. `WHERE` в source CTE — push succeeded. 5. **Contributing fixes.** Если найдёте edge case — open issue или PR. MetricFlow Apache 2.0. **Главный урок:** **classic database optimizations** работают в MetricFlow, но **edge cases важны**. Senior должен понимать какие patterns optimizer-friendly, какие нет. Это знание daily improves SL queries performance.

Итого

  1. DataflowPlan — DAG операций, ~8 типов узлов. Source, Join, Filter, Aggregate, GroupBy, Compute, Cumulative, Write.
  2. Optimizer pipeline — multi-pass. SourceScan, PredicatePushdown, JoinReorder, AggregateFusion, SubqueryElimination, ConstantFolding.
  3. SourceScanOptimizer — column pruning, source pruning. Огромный impact на BigQuery/Snowflake (less scanned bytes).
  4. PredicatePushdown — push filters к source. Partition pruning unlocked.
  5. Limitations на 100+ метрик (quadratic complexity), multi-hop joins, cumulative on large data, custom Jinja expressions.
  6. Debugging tools: mf query --explain, --no-optimize, --compile, --time-it.
  7. Performance tuning steps: profile, audit semantic_models, saved_queries, denormalize, upgrade MetricFlow.
  8. Source code в github.com/dbt-labs/metricflow/metricflow/plan_conversion/optimize/. Apache 2.0, можно contribute.

Следующий урок — v2 YAML spec (1.12) и миграция с v1.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Какие типы узлов составляют DataflowPlan?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4