DataflowPlan deep dive: nodes, optimizers, performance
In the previous lesson we walked through the whole MetricFlow pipeline, from YAML to SQL. Now we focus on its most important part: the DataflowPlan and its optimizers. This is the heart of MetricFlow and where most of its "magic" lives.
Understanding the DataflowPlan matters for:
- Debugging unexpected SQL in compiled queries.
- Performance optimization at scale (100+ metrics, 50+ semantic_models).
- Diagnosing cases where the optimizer doesn't help (you've hit a bug, or a metric is poorly written).
- Contributing to MetricFlow (Apache 2.0).
DataflowPlan: what it is
A DataflowPlan is a DAG of the operations that must run to resolve a metric query. Each node, a DataflowPlanNode, represents one operation.
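from dataclasses import dataclass
from typing import List, Tuple

# BaseDataflowPlanNode and NodeId are MetricFlow types (simplified view)
@dataclass
class DataflowPlan:
    sink_nodes: List[BaseDataflowPlanNode]  # final output
    nodes: List[BaseDataflowPlanNode]       # all nodes
    edges: List[Tuple[NodeId, NodeId]]      # parent -> child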
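It is a directed acyclic graph (DAG). Sources (semantic_models) are the root nodes; the output (the computed metric) is the sink node.
The full list of node types is in metricflow/dataflow/nodes.py in the repository. The eight main ones are source read, join, filter, aggregate, group-by, compute metrics, cumulative, and write.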
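To make the DAG properties concrete, here is a minimal sketch in Python. `SimplePlan` is a hypothetical stand-in that uses plain string ids instead of MetricFlow's node classes. It finds the root and sink nodes and checks acyclicity with Kahn's algorithm.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


# Hypothetical simplified plan: nodes are string ids, not MetricFlow node classes.
@dataclass(frozen=True)
class SimplePlan:
    nodes: Tuple[str, ...]
    edges: Tuple[Tuple[str, str], ...]  # (parent, child)

    def roots(self) -> List[str]:
        """Nodes with no incoming edge: the source reads."""
        children = {child for _, child in self.edges}
        return [n for n in self.nodes if n not in children]

    def sinks(self) -> List[str]:
        """Nodes with no outgoing edge: the final output."""
        parents = {parent for parent, _ in self.edges}
        return [n for n in self.nodes if n not in parents]

    def is_acyclic(self) -> bool:
        """Kahn's algorithm: a graph is a DAG iff every node can be peeled off."""
        indegree: Dict[str, int] = {n: 0 for n in self.nodes}
        children: Dict[str, List[str]] = {n: [] for n in self.nodes}
        for parent, child in self.edges:
            indegree[child] += 1
            children[parent].append(child)
        ready = [n for n, d in indegree.items() if d == 0]
        visited = 0
        while ready:
            node = ready.pop()
            visited += 1
            for child in children[node]:
                indegree[child] -= 1
                if indegree[child] == 0:
                    ready.append(child)
        return visited == len(self.nodes)


plan = SimplePlan(
    nodes=("read_orders", "read_customers", "join", "aggregate", "write"),
    edges=(
        ("read_orders", "join"),
        ("read_customers", "join"),
        ("join", "aggregate"),
        ("aggregate", "write"),
    ),
)
print(plan.roots())       # ['read_orders', 'read_customers']
print(plan.sinks())       # ['write']
print(plan.is_acyclic())  # True
```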
Example: walking through a real plan
Query: revenue by country and month.
# Pseudo-code MetricFlow trace
plan = DataflowPlan(
    sink_nodes=[write_node],
    nodes=[
        read_orders,
        read_customers,
        join_orders_customers,
        filter_recent,
        aggregate_revenue,
        group_by_country_month,
        compute_metric,
        write_node,
    ],
    edges=[
        (read_orders, join_orders_customers),
        (read_customers, join_orders_customers),
        (join_orders_customers, filter_recent),
        (filter_recent, aggregate_revenue),
        (aggregate_revenue, group_by_country_month),
        (group_by_country_month, compute_metric),
        (compute_metric, write_node),
    ],
)
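Any valid execution order of the plan is a topological order of its DAG. The sketch below uses the standard-library `graphlib` module (Python 3.9+) on the edges of the example above. Node names are plain strings rather than MetricFlow objects.

```python
from graphlib import TopologicalSorter  # stdlib, Python 3.9+

# Edges of the example plan as (parent, child) pairs of node names.
edges = [
    ("read_orders", "join_orders_customers"),
    ("read_customers", "join_orders_customers"),
    ("join_orders_customers", "filter_recent"),
    ("filter_recent", "aggregate_revenue"),
    ("aggregate_revenue", "group_by_country_month"),
    ("group_by_country_month", "compute_metric"),
    ("compute_metric", "write_node"),
]

# TopologicalSorter expects {node: set_of_predecessors}.
graph = {}
for parent, child in edges:
    graph.setdefault(child, set()).add(parent)
    graph.setdefault(parent, set())

execution_order = list(TopologicalSorter(graph).static_order())
print(execution_order)
```

The two reads have no predecessors, so they come first in either order, and a real engine could run them in parallel. Every later position is forced by the edges.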
Visualization:
ReadFromSemanticModel(orders)      ReadFromSemanticModel(customers)
             ↓                                    ↓
             └──────────────────┬─────────────────┘
                                ↓
          JoinToBaseSemanticModel(LEFT, on customer_id)
                                ↓
           WhereFilterNode(order_date >= '2025-01-01')
                                ↓
          AggregateMeasuresNode(SUM(amount) AS revenue)
                                ↓
           GroupByNode([country, metric_time__month])
                                ↓
                   ComputeMetricsNode(revenue)
                                ↓
                     WriteToResultDataFrame
This is the unoptimized plan. The optimizer then rewrites it.
Optimizer pipeline
MetricFlow has a multi-pass optimizer. Each pass is a separate optimizer:
optimizers = [
    SourceScanOptimizer(),           # column pruning, source pruning
    PredicatePushdownOptimizer(),    # push filters down to the source
    JoinReorderOptimizer(),          # join order for performance
    AggregateFusionOptimizer(),      # merge multiple aggregations
    SubqueryEliminationOptimizer(),  # flatten CTEs where possible
    ConstantFoldingOptimizer(),      # evaluate constant expressions
]
for optimizer in optimizers:
    plan = optimizer.optimize(plan)
Each optimizer takes a plan and returns a new plan without modifying its input (an immutable approach). The result is a functional pipeline.
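The sketch below is a minimal version of the immutable, functional pipeline idea. `ToyPlan`, `prune_columns` and `drop_true_filters` are hypothetical names chosen for illustration. MetricFlow's real optimizers are classes that rewrite DataflowPlan node graphs, not these functions.

```python
from dataclasses import dataclass, replace
from functools import reduce
from typing import Callable, Tuple


# Hypothetical toy plan: the columns read from one source and the filters applied.
@dataclass(frozen=True)
class ToyPlan:
    columns: Tuple[str, ...]
    filters: Tuple[str, ...]


# An optimizer here is any function ToyPlan -> ToyPlan that never mutates its input.
Optimizer = Callable[[ToyPlan], ToyPlan]


def prune_columns(plan: ToyPlan) -> ToyPlan:
    # Illustrative rule: keep only the columns this query is assumed to need.
    needed = {"customer_id", "amount", "order_date"}
    return replace(plan, columns=tuple(c for c in plan.columns if c in needed))


def drop_true_filters(plan: ToyPlan) -> ToyPlan:
    # Illustrative constant folding: a literal TRUE filter is a no-op.
    return replace(plan, filters=tuple(f for f in plan.filters if f != "TRUE"))


def run_pipeline(plan: ToyPlan, optimizers: Tuple[Optimizer, ...]) -> ToyPlan:
    # Same as: for opt in optimizers: plan = opt(plan)
    return reduce(lambda acc, opt: opt(acc), optimizers, plan)


original = ToyPlan(
    columns=("customer_id", "amount", "order_date", "status", "notes"),
    filters=("order_date >= '2025-01-01'", "TRUE"),
)
optimized = run_pipeline(original, (prune_columns, drop_true_filters))
print(optimized)
print(original)  # unchanged: each pass returned a new frozen object
```

Each pass returns a new object, so the unoptimized plan stays available. That is what makes it possible to compare the optimized and unoptimized versions when debugging.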
Optimizer 1: SourceScanOptimizer
Already covered in the previous lesson. Recap:
Before:
ReadFromSemanticModel(orders, columns=[all 50])
ReadFromSemanticModel(customers, columns=[all 20])
After:
ReadFromSemanticModel(orders, columns=[customer_id, amount, order_date])
ReadFromSemanticModel(customers, columns=[customer_id, country])
SQL impact:
-- Before (without optimizer):
SELECT * FROM fct_orders -- 50 columns
-- After:
SELECT customer_id, amount, order_date FROM fct_orders -- 3 columns
This reduces I/O and cost (scanned bytes in BigQuery, compute time in Snowflake).
When it doesn't work:
- Complex measure expressions that reference unpredictable columns (through variable substitution in Jinja).
- Adapter dialects where column pruning support is inconsistent.
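At its core, column pruning intersects each source's columns with the columns the query references: measures, dimensions, filters and join keys. The sketch below assumes, for simplicity, that a column name means the same field in every source that has it. `required_columns` and the table contents are hypothetical.

```python
from typing import Dict, Iterable, Set


def required_columns(
    source_columns: Dict[str, Set[str]],
    referenced: Iterable[str],
) -> Dict[str, Set[str]]:
    """Map each source to the subset of its columns the query references.

    Sources that contribute no referenced column are dropped (source pruning).
    Assumes a column name denotes the same field in every source that has it.
    """
    wanted = set(referenced)
    pruned: Dict[str, Set[str]] = {}
    for source, cols in source_columns.items():
        keep = cols & wanted
        if keep:
            pruned[source] = keep
    return pruned


sources = {
    "fct_orders": {"order_id", "customer_id", "amount", "order_date", "status", "channel"},
    "dim_customers": {"customer_id", "country", "email", "signup_date"},
    "dim_products": {"product_id", "category", "brand"},
}
# revenue = SUM(amount), filtered on order_date, grouped by country, joined on customer_id
referenced = ["amount", "order_date", "country", "customer_id"]
pruned = required_columns(sources, referenced)
for source, cols in pruned.items():
    print(f"SELECT {', '.join(sorted(cols))} FROM {source}")
```

`dim_products` contributes no referenced column, so it drops out of the plan entirely. That is source pruning.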
Optimizer 2: PredicatePushdownOptimizer
Idea: push WHERE filters as close to the source as possible.
Before:
ReadFromSemanticModel(orders) -> reads ALL orders
ReadFromSemanticModel(customers) -> reads ALL customers
Join
WhereFilterNode(order_date >= '2025-01-01') ← filter after the join
Aggregate
After:
ReadFromSemanticModel(orders, where=[order_date >= '2025-01-01']) ← filter at the source
ReadFromSemanticModel(customers)
Join
Aggregate
SQL impact:
-- Before:
SELECT ...
FROM (
  SELECT * FROM fct_orders
  LEFT JOIN dim_customers ON ...
) AS joined
WHERE order_date >= '2025-01-01'
-- After:
SELECT ...
FROM (
  SELECT * FROM fct_orders WHERE order_date >= '2025-01-01'
) AS orders
LEFT JOIN dim_customers ON ...
Fewer rows reach the join. With date filters, the warehouse can also prune partitions (BigQuery, Snowflake).
When it doesn't work:
- The filter references multiple semantic_models (a cross-join filter).
- The filter expression is non-deterministic.
- The filter applies to a computed dimension, not a source column.
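The sketch below shows a simplified pushdown rule. It assumes predicates have already been parsed into the set of columns they read; `Predicate` and `push_down` are hypothetical names. It also encodes a constraint that the list above does not spell out: with a LEFT JOIN, only filters on the row-preserving (left) side may be pushed down. Filtering `dim_customers` before the join would keep non-matching orders with NULL customer columns instead of dropping them, which changes the result.

```python
from dataclasses import dataclass
from typing import Dict, FrozenSet, List, Set, Tuple


@dataclass(frozen=True)
class Predicate:
    sql: str
    columns: FrozenSet[str]  # columns the predicate reads
    deterministic: bool = True


def push_down(
    predicates: List[Predicate],
    source_columns: Dict[str, Set[str]],
    base_source: str,
) -> Tuple[List[Predicate], List[Predicate]]:
    """Split predicates into (pushed into base_source's scan, kept after the join).

    Every other source is assumed to be LEFT JOINed onto base_source, so only
    predicates on the row-preserving base side are pushed down.
    """
    pushed: List[Predicate] = []
    remaining: List[Predicate] = []
    for pred in predicates:
        # Evaluable at the base scan only if the base has every column it reads
        # (a computed dimension exists in no source, so it never fits).
        fits_base = pred.columns <= source_columns[base_source]
        if pred.deterministic and fits_base:
            pushed.append(pred)
        else:
            remaining.append(pred)
    return pushed, remaining


sources = {
    "fct_orders": {"customer_id", "amount", "order_date"},
    "dim_customers": {"customer_id", "country"},
}
preds = [
    Predicate("order_date >= '2025-01-01'", frozenset({"order_date"})),
    Predicate("country = 'DE'", frozenset({"country"})),  # right side of LEFT JOIN
    Predicate("amount > 100 OR country = 'DE'", frozenset({"amount", "country"})),
    Predicate("RANDOM() < 0.1", frozenset(), deterministic=False),
    Predicate("order_month = '2025-01'", frozenset({"order_month"})),  # computed
]
pushed, remaining = push_down(preds, sources, base_source="fct_orders")
print([p.sql for p in pushed])
print([p.sql for p in remaining])
```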
Optimizer 3: JoinReorderOptimizer
Idea: reorder joins for better performance.
A LEFT JOIN B LEFT JOIN C
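If |A| = 10M rows, |B| = 100K rows, |C| = 1K rows:
Bad order: A -> B -> C
A LEFT JOIN B        # up to 10M × 100K candidate pairs (the join condition cuts this down)
result LEFT JOIN C   # the large intermediate result joined with 1K rows
Better order: C -> B -> A
C LEFT JOIN B        # 1K × 100K
result LEFT JOIN A   # a smaller intermediate result joined with the 10M rows
Caveat: reordering freely like this is only valid for INNER joins. A LEFT JOIN keeps every row of its left input, so C LEFT JOIN B LEFT JOIN A preserves the rows of C rather than those of A and generally returns a different result. Outer joins can be reordered only when the rewrite is proven equivalent.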
This is a classic database-optimizer problem. MetricFlow uses heuristics:
1. Smallest table first.
2. Joins on indexed/clustered columns first.
3. Filtered tables first.
In modern warehouses (Snowflake, BigQuery), the warehouse's own cost-based optimizer does much of this work. MetricFlow helps in edge cases.
When it doesn't work:
- When warehouse statistics are stale.
- In complex multi-stage queries.
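The heuristics can be expressed as a sort key. This is a hypothetical illustration, not MetricFlow code. `estimated_rows` stands in for warehouse statistics, and index/clustering information is not modeled. The reordering is applied only to inner joins, because INNER JOIN is commutative and associative and LEFT JOIN is not.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class JoinInput:
    name: str
    estimated_rows: int  # assumed to come from warehouse statistics
    filtered: bool       # True if a pushed-down predicate already shrinks it


def inner_join_order(tables: List[JoinInput]) -> List[str]:
    """Order INNER JOIN inputs: filtered tables first, then smallest first.

    Only valid for inner joins. For LEFT JOINs the row-preserving
    table must stay first.
    """
    # False sorts before True, so `not t.filtered` puts filtered tables first.
    ranked = sorted(tables, key=lambda t: (not t.filtered, t.estimated_rows))
    return [t.name for t in ranked]


tables = [
    JoinInput("A", 10_000_000, filtered=False),
    JoinInput("B", 100_000, filtered=False),
    JoinInput("C", 1_000, filtered=False),
]
print(inner_join_order(tables))  # ['C', 'B', 'A']
```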
Optimizer 4: AggregateFusionOptimizer
Merge multiple aggregate operations:
Before:
Read -> AggregateMeasures(SUM(amount))
Read -> AggregateMeasures(COUNT(*))
Read -> AggregateMeasures(AVG(amount))
Join all three
After:
Read -> AggregateMeasures(SUM, COUNT, AVG in a single pass)
SQL impact:
-- Before:
WITH revenue AS (SELECT customer_id, SUM(amount) AS r FROM orders GROUP BY 1),
order_count AS (SELECT customer_id, COUNT(*) AS c FROM orders GROUP BY 1),
avg_order AS (SELECT customer_id, AVG(amount) AS a FROM orders GROUP BY 1)
SELECT ... JOIN ... JOIN ...
-- After:
SELECT customer_id,
SUM(amount) AS r,
COUNT(*) AS c,
AVG(amount) AS a
FROM orders GROUP BY 1
One pass over the data instead of three, which gives a large speedup on big tables.
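Fusion comes down to bucketing aggregate requests by everything that must match for them to share a scan: the source table, the filter, and the group-by keys. The hypothetical sketch below emits SQL strings; `AggRequest` and `fuse` are illustrative names.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass(frozen=True)
class AggRequest:
    source: str
    group_by: Tuple[str, ...]
    expr: str        # e.g. "SUM(amount)"
    alias: str
    where: str = ""  # aggregates only fuse when their filters match too


def fuse(requests: List[AggRequest]) -> List[str]:
    """Emit one SELECT per (source, where, group_by) instead of one per aggregate."""
    buckets: Dict[Tuple[str, str, Tuple[str, ...]], List[AggRequest]] = defaultdict(list)
    for req in requests:
        buckets[(req.source, req.where, req.group_by)].append(req)

    statements = []
    for (source, where, keys), reqs in buckets.items():
        select_list = list(keys) + [f"{r.expr} AS {r.alias}" for r in reqs]
        sql = f"SELECT {', '.join(select_list)} FROM {source}"
        if where:
            sql += f" WHERE {where}"
        if keys:
            positions = ", ".join(str(i + 1) for i in range(len(keys)))
            sql += f" GROUP BY {positions}"
        statements.append(sql)
    return statements


requests = [
    AggRequest("orders", ("customer_id",), "SUM(amount)", "r"),
    AggRequest("orders", ("customer_id",), "COUNT(*)", "c"),
    AggRequest("orders", ("customer_id",), "AVG(amount)", "a"),
]
for stmt in fuse(requests):
    print(stmt)
```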
Optimizer 5: SubqueryEliminationOptimizer
Flatten unnecessary CTEs:
Before:
WITH cte1 AS (SELECT * FROM tbl WHERE x > 0),
cte2 AS (SELECT * FROM cte1 WHERE y < 10)
SELECT * FROM cte2
After:
SELECT * FROM tbl WHERE x > 0 AND y < 10
Snowflake and BigQuery handle CTEs well, but not always optimally, so the optimizer eliminates them where possible.
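In the simplest case, a chain of `SELECT * ... WHERE` CTEs, flattening means walking the chain down to the base table and AND-ing the filters together. In the hypothetical sketch below (`FilterCte` and `flatten` are illustrative names), each predicate is wrapped in parentheses so that an `OR` inside one of them keeps its meaning.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass(frozen=True)
class FilterCte:
    source: str           # a base table or another CTE name
    where: Optional[str]  # None means the CTE has no WHERE clause


def flatten(ctes: Dict[str, FilterCte], target: str) -> str:
    """Collapse a chain of `SELECT * FROM x WHERE p` CTEs into one query.

    Only pure SELECT * + WHERE CTEs are handled; joins, aggregation or
    projections would need different rewrite rules.
    """
    predicates: List[str] = []
    name = target
    while name in ctes:  # walk from the outermost CTE down to the base table
        cte = ctes[name]
        if cte.where:
            predicates.append(cte.where)
        name = cte.source
    predicates.reverse()  # innermost filter first, matching the original nesting
    sql = f"SELECT * FROM {name}"
    if predicates:
        # Parenthesize each predicate so an OR inside one cannot change the AND.
        sql += " WHERE " + " AND ".join(f"({p})" for p in predicates)
    return sql


ctes = {
    "cte1": FilterCte(source="tbl", where="x > 0"),
    "cte2": FilterCte(source="cte1", where="y < 10"),
}
print(flatten(ctes, "cte2"))  # SELECT * FROM tbl WHERE (x > 0) AND (y < 10)
```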
Optimizer 6: ConstantFoldingOptimizer
Evaluate constant expressions:
Before:
WHERE order_date >= '2025-01-01' AND TRUE
After:
WHERE order_date >= '2025-01-01'
Trivial, but it matters: some Jinja expressions resolve to constants only at compile time, not when the user writes them. The optimizer cleans this up.
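The sketch below is a minimal constant folder over a tiny boolean expression tree. Python `True`/`False` stand for SQL literals, and strings are opaque predicates. The rewrite rules (`x AND TRUE = x`, `x AND FALSE = FALSE`, `x OR TRUE = TRUE`, `x OR FALSE = x`) also hold under SQL's three-valued logic, so they are safe even when `x` is NULL. `And`, `Or` and `fold` are hypothetical names.

```python
from dataclasses import dataclass
from typing import Union


@dataclass(frozen=True)
class And:
    left: "Expr"
    right: "Expr"


@dataclass(frozen=True)
class Or:
    left: "Expr"
    right: "Expr"


# Leaves: Python bools for TRUE/FALSE literals, strings for opaque SQL predicates.
Expr = Union[bool, str, And, Or]


def fold(e: Expr) -> Expr:
    """Remove literal TRUE/FALSE from AND/OR trees, bottom-up."""
    if isinstance(e, And):
        left, right = fold(e.left), fold(e.right)
        if left is False or right is False:
            return False          # x AND FALSE = FALSE
        if left is True:
            return right          # TRUE AND x = x
        if right is True:
            return left           # x AND TRUE = x
        return And(left, right)
    if isinstance(e, Or):
        left, right = fold(e.left), fold(e.right)
        if left is True or right is True:
            return True           # x OR TRUE = TRUE
        if left is False:
            return right          # FALSE OR x = x
        if right is False:
            return left           # x OR FALSE = x
        return Or(left, right)
    return e                      # literal or opaque predicate


print(fold(And("order_date >= '2025-01-01'", True)))  # order_date >= '2025-01-01'
```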
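Where the optimizer falls short: known bottlenecks
- 100+ metrics (quadratic complexity).
- Multi-hop joins.
- Cumulative metrics on large data.
- Custom Jinja expressions.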
How to read a DataflowPlan for debugging
If your SL query is slow or generates unexpected SQL, debug the pipeline:
# 1. Get a DataflowPlan visualization
mf query --metrics revenue --group-by customers__country --explain --output-format=text
# This prints the plan:
# Node id=1: ReadFromSemanticModel(orders)
# Node id=2: ReadFromSemanticModel(customers)
# Node id=3: JoinToBaseSemanticModel(1, 2, on customer_id)
# Node id=4: ...
# 2. Compare optimized vs unoptimized
mf query --metrics revenue --explain --no-optimize  # without the optimizer
mf query --metrics revenue --explain                # with the optimizer
# Compare the differences. If there is no diff, the optimizer does nothing for your case.
# 3. Compiled SQL
mf query --metrics revenue --compile
# Shows the final SQL that will be sent to the warehouse.
This is the standard debugging workflow for SL performance issues.
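Comparing two plan dumps by eye gets tedious, and Python's standard `difflib` can do it for you. The sketch below assumes you have saved the text output of the unoptimized and optimized runs. The plan lines are illustrative and follow the format shown above; the exact output depends on your MetricFlow version. `plan_diff` is a hypothetical helper.

```python
import difflib


def plan_diff(unoptimized: str, optimized: str) -> str:
    """Unified diff of two plan dumps; an empty string means no change."""
    return "".join(
        difflib.unified_diff(
            unoptimized.splitlines(keepends=True),
            optimized.splitlines(keepends=True),
            fromfile="unoptimized",
            tofile="optimized",
        )
    )


before = (
    "Node id=1: ReadFromSemanticModel(orders)\n"
    "Node id=2: ReadFromSemanticModel(customers)\n"
    "Node id=3: JoinToBaseSemanticModel(1, 2)\n"
    "Node id=4: WhereFilterNode(order_date >= '2025-01-01')\n"
)
after = (
    "Node id=1: ReadFromSemanticModel(orders, where=order_date >= '2025-01-01')\n"
    "Node id=2: ReadFromSemanticModel(customers)\n"
    "Node id=3: JoinToBaseSemanticModel(1, 2)\n"
)
print(plan_diff(before, after) or "no diff: the optimizer changed nothing")
```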
Performance tuning: actionable steps
If your Semantic Layer is slow:
Step 1: Profile.
# Identify where the time goes
mf query --metrics revenue --time-it
# Illustrative output:
# Parse: 0.1s
# Build manifest: 0.5s
# Build DataflowPlan: 8s ← bottleneck
# Optimize: 2s
# Generate SQL: 0.3s
# Execute warehouse: 4s
Spending 8 seconds on building the DataflowPlan points to an architectural problem: the semantic graph is too big.
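With many stages, it helps to compute each stage's share of the total time. The sketch below assumes timings in the `Stage: Ns` form of the output above; real output from your tooling may look different. `parse_timings` and `bottleneck` are hypothetical helpers.

```python
import re
from typing import Dict, Tuple

# Matches lines like "Build DataflowPlan: 8s" (an optional leading "#" is allowed).
TIMING_LINE = re.compile(r"^#?\s*(?P<stage>[^:]+):\s*(?P<secs>\d+(?:\.\d+)?)s")


def parse_timings(text: str) -> Dict[str, float]:
    timings: Dict[str, float] = {}
    for line in text.splitlines():
        m = TIMING_LINE.match(line.strip())
        if m:
            timings[m.group("stage").strip()] = float(m.group("secs"))
    return timings


def bottleneck(timings: Dict[str, float]) -> Tuple[str, float]:
    """Return the slowest stage and its share of the total time."""
    stage = max(timings, key=timings.get)
    return stage, timings[stage] / sum(timings.values())


sample = """Parse: 0.1s
Build manifest: 0.5s
Build DataflowPlan: 8s
Optimize: 2s
Generate SQL: 0.3s
Execute warehouse: 4s
"""
stage, share = bottleneck(parse_timings(sample))
print(f"{stage}: {share:.0%} of total")
```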
Step 2: Audit semantic_models.
# How many semantic_model files do you have?
find semantic_models -name '*.yml' | wc -l
# If > 30, start partitioning by domain.
# How many entities are in the largest ones?
yq '.semantic_models[].entities | length' semantic_models/*.yml | sort -nr | head -5
# If a single semantic_model has > 15, split it.
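The same audit can be scripted with explicit thresholds: 30 semantic_models, and 15 entities per model, as above. The sketch below assumes the YAML files have already been parsed into dicts, for example with PyYAML's `yaml.safe_load`. `audit` is a hypothetical helper.

```python
from typing import Any, Dict, List

MAX_SEMANTIC_MODELS = 30     # thresholds from the text above
MAX_ENTITIES_PER_MODEL = 15


def audit(files: List[Dict[str, Any]]) -> List[str]:
    """Flag an oversized semantic layer.

    `files` are already-parsed YAML documents, each optionally holding
    a `semantic_models` list.
    """
    models = [m for doc in files for m in (doc.get("semantic_models") or [])]
    warnings: List[str] = []
    if len(models) > MAX_SEMANTIC_MODELS:
        warnings.append(f"{len(models)} semantic_models: consider domain partitioning")
    for m in models:
        n = len(m.get("entities") or [])
        if n > MAX_ENTITIES_PER_MODEL:
            warnings.append(f"{m.get('name', '?')}: {n} entities, consider splitting")
    return warnings


docs = [{"semantic_models": [{"name": "orders", "entities": [{"name": f"e{i}"} for i in range(16)]}]}]
print(audit(docs))
```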
Step 3: Saved queries.
Materialize frequently queried combinations:
saved_queries:
  - name: top_metrics_dashboard
    query_params:
      metrics: [revenue, customer_count, order_count]
      group_by:
        - "Dimension('customers__country')"
        - "TimeDimension('metric_time', 'month')"
    exports:
      - name: dashboard_metrics
        config:
          export_as: table
`dbt build` materializes it, and dashboards then query the mart instead of the SL.
Step 4: Denormalize.
If multi-hop joins are the problem:
-- staging/int_orders_enriched.sql
SELECT
    o.*,
    c.country,
    c.region,
    c.tier
FROM {{ ref('fct_orders') }} o
LEFT JOIN {{ ref('dim_customers') }} c ON o.customer_id = c.customer_id
The semantic_model points to the enriched model, so the join happens once at build time rather than on every query.
Step 5: Upgrade MetricFlow.
pip install --upgrade dbt-metricflow
New releases regularly include optimizer improvements.
Summary
- DataflowPlan: a DAG of operations with ~8 node types: Source, Join, Filter, Aggregate, GroupBy, Compute, Cumulative, Write.
- Optimizer pipeline: multi-pass. SourceScan, PredicatePushdown, JoinReorder, AggregateFusion, SubqueryElimination, ConstantFolding.
- SourceScanOptimizer: column pruning and source pruning. Large impact on BigQuery/Snowflake (fewer scanned bytes).
- PredicatePushdown: pushes filters to the source, which enables partition pruning.
- Limitations: 100+ metrics (quadratic complexity), multi-hop joins, cumulative metrics on large data, custom Jinja expressions.
- Debugging tools: mf query --explain, --no-optimize, --compile, --time-it.
- Performance tuning steps: profile, audit semantic_models, saved_queries, denormalize, upgrade MetricFlow.
- Source code in github.com/dbt-labs/metricflow/metricflow/plan_conversion/optimize/. Apache 2.0, open to contributions.
Next lesson: the v2 YAML spec (1.12) and migrating from v1.