OpenLineage-aware patterns — standard operators, custom extractors, clean lineage
OpenLineage (module 14) is the industry standard for data lineage. Since 2024 it has been the de facto way to track where data comes from and where it goes in pipelines. The Apache Airflow provider apache-airflow-providers-openlineage (Airflow 2.7+) automatically emits OL events for standard operators, but only if your DAGs are OpenLineage-aware.
This lesson covers concrete patterns for maximizing automatic lineage emission: use standard operators wherever possible, write custom extractors (or declare lineage manually) where the provider cannot see inside a task, and design tasks so that the lineage is accurate and actionable.
Recap: what generates OL events
apache-airflow-providers-openlineage (available since Airflow 2.7) handles operators as follows:
| Operator type | Lineage emission |
|---|---|
| SQL operators (SQLExecuteQueryOperator, PostgresOperator, BigQueryInsertJobOperator) | Auto: source/target tables derived from the SQL |
| S3 operators (S3CopyObjectOperator, S3FileTransformOperator) | Auto: bucket/key as datasets |
| Spark operators (SparkSubmitOperator) | Auto, via the OpenLineage Spark listener inside the Spark job |
| dbt operators (DbtCloudRunJobOperator) | Auto: dbt has OpenLineage integration |
| PythonOperator / @task | Manual: needs an extractor, inlets/outlets, or explicit emission |
| KubernetesPodOperator | Partial: basic job info only; actual datasets need hints |
| BashOperator | None: the bash command is opaque to Airflow |
Strategy: use standard operators where possible, add extractors where not.
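To apply this strategy to an existing DAG, you can first sort its tasks by expected coverage. The sketch below is a hypothetical helper, not part of the provider. LINEAGE_SUPPORT simply transcribes the table above, and _PythonDecoratedOperator is assumed to be the class name behind @task. In a real DAG you could build the input as {t.task_id: type(t).__name__ for t in dag.tasks}.

```python
# Expected lineage coverage per operator class, transcribed from the table above.
# A simplification: real coverage also depends on provider versions and arguments.
LINEAGE_SUPPORT = {
    "SQLExecuteQueryOperator": "auto",
    "PostgresOperator": "auto",
    "S3CopyObjectOperator": "auto",
    "SparkSubmitOperator": "auto",
    "PythonOperator": "manual",
    "_PythonDecoratedOperator": "manual",  # assumed class name behind @task
    "KubernetesPodOperator": "partial",
    "BashOperator": "none",
}


def audit_lineage(task_operators: dict[str, str]) -> dict[str, list[str]]:
    """Group task_ids by expected lineage coverage; unknown classes go to 'unknown'."""
    report: dict[str, list[str]] = {}
    for task_id, operator_name in sorted(task_operators.items()):
        level = LINEAGE_SUPPORT.get(operator_name, "unknown")
        report.setdefault(level, []).append(task_id)
    return report


if __name__ == "__main__":
    tasks = {
        "transform_orders": "SQLExecuteQueryOperator",
        "notify": "BashOperator",
        "score": "PythonOperator",
    }
    print(audit_lineage(tasks))
    # {'none': ['notify'], 'manual': ['score'], 'auto': ['transform_orders']}
```

Tasks that land in "manual", "partial" or "none" are the candidates for the patterns that follow.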
Pattern 1: Prefer standard operators over @task
```python
# ❌ Anti-pattern: SQL work inside a Python task, lineage is not captured
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook

@task
def transform_orders():
    hook = PostgresHook(postgres_conn_id="warehouse")
    hook.run("INSERT INTO dim.orders SELECT * FROM staging.orders;")
```
The OL provider sees transform_orders as an opaque Python task, so there is no lineage between staging.orders and dim.orders.
```python
# ✅ Better: SQLExecuteQueryOperator emits lineage automatically
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

transform_orders = SQLExecuteQueryOperator(
    task_id="transform_orders",
    conn_id="warehouse",
    sql="""
        INSERT INTO dim.orders
        SELECT
            id, customer_id, amount, created_at
        FROM staging.orders
        WHERE created_at >= '{{ data_interval_start }}'
          AND created_at <  '{{ data_interval_end }}';
    """,
)
```
The OL provider parses the SQL with openlineage-sql (a Rust-based SQL parser) and identifies:
- Inputs: staging.orders (source table)
- Outputs: dim.orders (target table)
- Resulting Marquez graph: staging.orders → transform_orders → dim.orders
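Here is what "identifies inputs and outputs" means in practice for the statement above. This is a deliberately naive regex sketch, not how openlineage-sql works (that is a real SQL parser). It breaks on CTEs, subqueries, quoted identifiers and comments, which is exactly why the provider uses a parser.

```python
import re

# Naive patterns: a target after INSERT INTO, sources after FROM / JOIN.
_TARGET = re.compile(r"\bINSERT\s+INTO\s+([\w.]+)", re.IGNORECASE)
_SOURCES = re.compile(r"\b(?:FROM|JOIN)\s+([\w.]+)", re.IGNORECASE)


def naive_table_lineage(sql: str) -> tuple[list[str], list[str]]:
    """Return (inputs, outputs) table names found by simple pattern matching."""
    outputs = list(dict.fromkeys(_TARGET.findall(sql)))
    inputs = [t for t in dict.fromkeys(_SOURCES.findall(sql)) if t not in outputs]
    return inputs, outputs


SQL = """
INSERT INTO dim.orders
SELECT id, customer_id, amount, created_at
FROM staging.orders
WHERE created_at >= '{{ data_interval_start }}'
  AND created_at <  '{{ data_interval_end }}';
"""

if __name__ == "__main__":
    print(naive_table_lineage(SQL))  # (['staging.orders'], ['dim.orders'])
```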
When PythonOperator is unavoidable
In the real world, some business logic lives in Python: pandas transformations, ML predictions, custom validation. For these cases you need custom extractors or one of the manual patterns below.
Pattern 2: Custom extractors
An extractor is a class that knows how to extract lineage information from a specific operator's parameters. When the OL provider emits events for a task, it looks up the extractor registered for that operator's class name.
```python
# plugins/lineage/extractors.py
from __future__ import annotations

from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
# v1 client classes; newer openlineage-python versions also ship facet_v2/event_v2 equivalents
from openlineage.client.facet import SchemaDatasetFacet, SchemaField, SqlJobFacet
from openlineage.client.run import Dataset


class CustomEtlOperatorExtractor(BaseExtractor):
    """Extractor for our own CustomEtlOperator (its attributes below are hypothetical)."""

    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        # Matched against the operator's class name
        return ["CustomEtlOperator"]

    def _execute_extraction(self) -> OperatorLineage | None:
        # self.operator is the task's operator instance
        op = self.operator

        inputs = [
            Dataset(
                namespace=f"postgres://{op.source_conn_host}:{op.source_conn_port}",
                name=f"{op.source_database}.{op.source_table}",
                facets={
                    "schema": SchemaDatasetFacet(fields=[
                        SchemaField(name="id", type="bigint"),
                        SchemaField(name="amount", type="numeric"),
                    ])
                },
            )
        ]
        outputs = [
            Dataset(
                namespace=f"snowflake://{op.target_account}.snowflakecomputing.com",
                name=f"{op.target_database}.{op.target_schema}.{op.target_table}",
            )
        ]

        # Attach the SQL facet only when there is SQL; facet values must not be None
        job_facets = {}
        if op.transform_sql:
            job_facets["sql"] = SqlJobFacet(query=op.transform_sql)

        return OperatorLineage(
            inputs=inputs,
            outputs=outputs,
            run_facets={},
            job_facets=job_facets,
        )
```
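The provider wraps the returned inputs and outputs into an OpenLineage RunEvent for the task run. The stdlib-only sketch below builds a roughly equivalent event as a plain dict so you can see its shape. Field names follow the OpenLineage spec. The namespaces, names and producer URI are hypothetical, and the spec's schemaURL field is left out.

```python
import json
import uuid
from datetime import datetime, timezone


def build_run_event(
    job_namespace: str,
    job_name: str,
    inputs: list[tuple[str, str]],
    outputs: list[tuple[str, str]],
    event_type: str = "COMPLETE",
    producer: str = "https://example.com/hypothetical-producer",
) -> dict:
    """Build a minimal OpenLineage-style RunEvent as a dict (schemaURL omitted)."""
    return {
        "eventType": event_type,
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "run": {"runId": str(uuid.uuid4())},
        "job": {"namespace": job_namespace, "name": job_name},
        "inputs": [{"namespace": ns, "name": name} for ns, name in inputs],
        "outputs": [{"namespace": ns, "name": name} for ns, name in outputs],
        "producer": producer,
    }


if __name__ == "__main__":
    event = build_run_event(
        job_namespace="default",
        job_name="orders_dag.custom_etl",
        inputs=[("postgres://pg-host:5432", "shop.public.orders")],
        outputs=[("snowflake://acme.snowflakecomputing.com", "analytics.dim.orders")],
    )
    print(json.dumps(event, indent=2))
```

Job names of the form dag_id.task_id mirror how the provider names task jobs.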
Register the extractor in the [openlineage] section of the Airflow config. Several extractors can be listed, separated by semicolons. Airflow adds the plugins folder to sys.path, so the import path starts below it:
```ini
[openlineage]
extractors = lineage.extractors.CustomEtlOperatorExtractor
```
Or via an environment variable:
```shell
AIRFLOW__OPENLINEAGE__EXTRACTORS=lineage.extractors.CustomEtlOperatorExtractor
```
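Conceptually, registration gives the provider a lookup from operator class name to extractor. The sketch below models that lookup and the "errors are logged, not raised" behavior described under Production gotchas. ExtractorSketch, build_registry and extract_for are hypothetical names for illustration, not the provider's internals.

```python
import logging
from typing import Any, Optional

log = logging.getLogger("lineage_sketch")


class ExtractorSketch:
    """Minimal stand-in for an extractor base class."""

    def __init__(self, operator: Any) -> None:
        self.operator = operator

    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        raise NotImplementedError

    def extract(self) -> Optional[dict]:
        raise NotImplementedError


def build_registry(extractors: list[type]) -> dict[str, type]:
    """Map each operator class name to the extractor that handles it."""
    registry: dict[str, type] = {}
    for extractor in extractors:
        for name in extractor.get_operator_classnames():
            registry[name] = extractor
    return registry


def extract_for(operator: Any, registry: dict[str, type]) -> Optional[dict]:
    """Look up by class name; log and swallow extractor errors, returning None."""
    extractor = registry.get(type(operator).__name__)
    if extractor is None:
        return None
    try:
        return extractor(operator).extract()
    except Exception:
        log.warning("Extractor %s failed", extractor.__name__, exc_info=True)
        return None


# Usage with a dummy operator and extractor
class CustomEtlOperator:
    source_table = "staging.orders"


class CustomEtlOperatorExtractor(ExtractorSketch):
    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        return ["CustomEtlOperator"]

    def extract(self) -> Optional[dict]:
        return {"inputs": [self.operator.source_table]}


registry = build_registry([CustomEtlOperatorExtractor])
print(extract_for(CustomEtlOperator(), registry))  # {'inputs': ['staging.orders']}
```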
Pattern 3: Manual lineage emission in @task
For one-off custom logic you can emit lineage manually: build a RunEvent yourself and send it through the provider's OpenLineageAdapter.
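```python
import uuid
from datetime import datetime, timezone

from airflow.decorators import task
from airflow.operators.python import get_current_context
from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter
from openlineage.client.run import Dataset, Job, Run, RunEvent, RunState


@task
def custom_transform():
    ctx = get_current_context()
    ds = ctx["ds"]  # logical date as YYYY-MM-DD

    # ... your business logic (some_pandas_work is a placeholder) ...
    rows_processed = some_pandas_work()

    # Manually emit lineage. This is a separate OL run, not linked to the
    # task run that the provider emits on its own.
    event = RunEvent(
        eventType=RunState.COMPLETE,
        eventTime=datetime.now(timezone.utc).isoformat(),
        run=Run(runId=str(uuid.uuid4())),
        job=Job(
            namespace="default",  # assumption: match your [openlineage] namespace
            name=f"{ctx['dag'].dag_id}.{ctx['task'].task_id}.manual",
        ),
        producer="https://example.com/orders-pipeline",  # hypothetical producer URI
        inputs=[Dataset(namespace="s3://lake", name=f"raw/orders/{ds}.parquet")],
        outputs=[Dataset(namespace="s3://lake", name=f"processed/orders/{ds}.parquet")],
    )
    OpenLineageAdapter().emit(event)
    return rows_processed
```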
This is less clean than an extractor, but workable for one-off cases.
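With the HTTP transport, emitting an event comes down to a JSON POST to the lineage backend. If you need to emit from outside Airflow, a stdlib-only sketch looks like this. It assumes Marquez's /api/v1/lineage endpoint and the http://marquez:5000 address used elsewhere in this lesson, and the event fields are placeholders. The request is only built; nothing is sent unless you call urlopen.

```python
import json
import urllib.request
import uuid
from datetime import datetime, timezone


def build_lineage_request(base_url: str, event: dict) -> urllib.request.Request:
    """Prepare a POST of one OpenLineage event to Marquez's lineage endpoint."""
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/api/v1/lineage",
        data=json.dumps(event).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


event = {
    "eventType": "COMPLETE",
    "eventTime": datetime.now(timezone.utc).isoformat(),
    "run": {"runId": str(uuid.uuid4())},
    "job": {"namespace": "default", "name": "orders_dag.custom_transform.manual"},
    "inputs": [{"namespace": "s3://lake", "name": "raw/orders/2026-05-12.parquet"}],
    "outputs": [{"namespace": "s3://lake", "name": "processed/orders/2026-05-12.parquet"}],
    "producer": "https://example.com/hypothetical-producer",
}

request = build_lineage_request("http://marquez:5000/", event)
# To actually send it (requires a reachable Marquez):
# urllib.request.urlopen(request, timeout=10)
```

Going through OpenLineageAdapter instead reuses the transport, namespace and secret masking already configured for the provider.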
Pattern 4: inlets and outlets for explicit lineage
Airflow lets you declare a task's lineage through inlets and outlets:
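```python
from airflow.decorators import task
from airflow.lineage.entities import Table

# Table takes keyword arguments only; cluster is used as the OL namespace.
# The host and port here are hypothetical.
@task(
    inlets=[Table(cluster="postgres://warehouse-host:5432", database="warehouse", name="staging.orders")],
    outlets=[Table(cluster="postgres://warehouse-host:5432", database="warehouse", name="prod.orders")],
)
def transform_orders():
    # ... custom logic ...
    pass
```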
The OL provider reads inlets/outlets and emits lineage events from them. No extractor is required: the lineage is declarative.
Production tip: inlets/outlets are the best fit when an extractor would be too complex. You write one or two lines of code instead of a 100-line extractor class.
Pattern 5: Datasets as explicit lineage
Since Airflow 2.4, Datasets (module 08) create lineage on their own:
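```python
import pendulum
from airflow import Dataset
from airflow.decorators import dag, task

orders_dataset = Dataset("s3://lake/orders/")
processed_dataset = Dataset("s3://lake/processed/orders/")

@dag(schedule="@daily", start_date=pendulum.datetime(2026, 1, 1, tz="UTC"), catchup=False)
def producer():
    @task(outlets=[orders_dataset])
    def fetch_orders():
        ...

    fetch_orders()

# Runs whenever producer updates orders_dataset
@dag(schedule=[orders_dataset], start_date=pendulum.datetime(2026, 1, 1, tz="UTC"), catchup=False)
def consumer():
    @task(outlets=[processed_dataset])
    def process():
        ...

    process()

producer()
consumer()
```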
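The OL provider sees the Dataset references in outlets and reports them as datasets in its events. On top of that, dataset-driven scheduling connects the producer and consumer DAGs, and Marquez can visualize these DAG triggers as lineage links.
In Airflow 3.x, Datasets were renamed to Assets, but the concept is the same.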
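To report an Airflow Dataset, the provider must map its URI to an OpenLineage namespace and name. The sketch below shows that mapping for two schemes. It assumes the OpenLineage naming conventions: for S3, namespace s3://bucket and the object path as name; for Postgres, namespace postgres://host:port and database.schema.table as name. It is an illustration, not the provider's converter, and it rejects other schemes.

```python
from urllib.parse import urlparse


def to_ol_dataset(uri: str) -> tuple[str, str]:
    """Map an Airflow Dataset URI to an (OpenLineage namespace, name) pair."""
    parsed = urlparse(uri)
    if parsed.scheme == "s3":
        # Bucket becomes the namespace, the object path becomes the name
        return f"s3://{parsed.netloc}", parsed.path.lstrip("/")
    if parsed.scheme == "postgres":
        # Expected path: /database/schema/table; default port 5432 if missing
        parts = [p for p in parsed.path.split("/") if p]
        if len(parts) != 3:
            raise ValueError(f"expected /database/schema/table in {uri!r}")
        return f"postgres://{parsed.hostname}:{parsed.port or 5432}", ".".join(parts)
    raise ValueError(f"unsupported scheme {parsed.scheme!r} in {uri!r}")


if __name__ == "__main__":
    print(to_ol_dataset("s3://lake/orders/"))
    # ('s3://lake', 'orders/')
    print(to_ol_dataset("postgres://pg-host/warehouse/staging/orders"))
    # ('postgres://pg-host:5432', 'warehouse.staging.orders')
```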
Pattern 6: Clean SQL for accurate parsing
The OL provider parses SQL with openlineage-sql. Clean SQL yields accurate lineage; messy SQL leads to missed tables.
```sql
-- ❌ Hard for the parser: dynamic SQL
EXECUTE (
    SELECT CASE WHEN month = 'jan' THEN 'INSERT INTO jan_data ...'
                ELSE 'INSERT INTO feb_data ...'
           END
);

-- ✅ Easy for the parser: explicit
INSERT INTO jan_data
SELECT * FROM source.events WHERE month = 'jan';
```
Avoid:
- Dynamic SQL via EXECUTE
- Stored procedures (the parser doesn't see the queries inside them)
- CTE chains 5+ levels deep (the parser may struggle)

Prefer:
- Plain INSERT/SELECT/MERGE
- CTEs at most 3 levels deep
- Explicit table names; avoid SELECT * over joins and joins without aliases
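These rules can be approximated as a pre-merge check with a few regexes. The heuristic below is a hypothetical helper, not part of OpenLineage. It flags EXECUTE, CALL (stored procedures), SELECT * in statements that also contain a JOIN, and top-level WITH clauses that define more than three CTEs. The CTE check counts CTE definitions as a stand-in for chain depth. Expect false positives and negatives; it does not replace the parser.

```python
import re

_CHECKS = [
    ("dynamic SQL (EXECUTE)", re.compile(r"\bEXEC(?:UTE)?\b", re.IGNORECASE)),
    ("stored procedure call (CALL)", re.compile(r"\bCALL\b", re.IGNORECASE)),
]
_SELECT_STAR = re.compile(r"\bSELECT\s+\*", re.IGNORECASE)
_JOIN = re.compile(r"\bJOIN\b", re.IGNORECASE)
_WITH = re.compile(r"^\s*WITH\b", re.IGNORECASE)
_CTE_DEF = re.compile(r"\b\w+\s+AS\s*\(", re.IGNORECASE)  # "name AS ("


def lint_sql(sql: str, max_ctes: int = 3) -> list[str]:
    """Return human-readable warnings for constructs that hurt lineage parsing."""
    findings = [label for label, pattern in _CHECKS if pattern.search(sql)]
    if _SELECT_STAR.search(sql) and _JOIN.search(sql):
        findings.append("SELECT * combined with JOIN")
    if _WITH.search(sql):
        n_ctes = len(_CTE_DEF.findall(sql))
        if n_ctes > max_ctes:
            findings.append(f"{n_ctes} CTEs (prefer at most {max_ctes})")
    return findings


if __name__ == "__main__":
    print(lint_sql("EXECUTE (SELECT 1);"))  # ['dynamic SQL (EXECUTE)']
    print(lint_sql("INSERT INTO jan_data SELECT * FROM source.events WHERE month = 'jan';"))  # []
```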
Pattern 7: Design DAGs for clean lineage
DAG structure affects how clear the lineage is. Compare:
```python
# ❌ Tangled: hard to read in Marquez
@task
def do_everything():
    """Reads from 5 sources, writes to 3 targets: opaque to lineage."""
    df1 = read_orders()
    df2 = read_customers()
    df3 = read_products()
    df4 = read_returns()
    df5 = read_promotions()
    result = complex_transform(df1, df2, df3, df4, df5)
    write_dim_orders(result)
    write_fact_revenue(result)
    write_mart_analytics(result)
```
One task produces one OL run with 5 inputs and 3 outputs in a single blob. Marquez shows one node with many arrows.
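```python
# ✅ Clean: each transformation is explicit
from airflow.decorators import task
from airflow.lineage.entities import Table

def wh(name: str) -> Table:
    # Hypothetical helper: Table needs keyword args, including cluster (host assumed)
    return Table(cluster="postgres://warehouse-host:5432", database="warehouse", name=name)

@task(inlets=[wh("orders")], outlets=[wh("staging_orders")])
def stage_orders(): ...

@task(inlets=[wh("customers")], outlets=[wh("staging_customers")])
def stage_customers(): ...

@task(inlets=[wh("staging_orders"), wh("staging_customers")],
      outlets=[wh("dim_orders")])
def build_dim_orders(): ...

# 3 separate tasks → 3 OL runs → a clean graph in Marquez
```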
In Marquez this shows up as:
```text
orders    → stage_orders    → staging_orders    ─┐
                                                 ├→ build_dim_orders → dim_orders
customers → stage_customers → staging_customers ─┘
```
This makes the following easier:
- Impact analysis (“what depends on dim_orders?”)
- Root cause analysis (“orders broken — where did it go wrong?”)
- Compliance reporting (“show me lineage of PII data”)
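Once each task declares its own inlets and outlets, impact analysis is a plain graph walk. The sketch below reproduces the three tasks above as a task -> (inputs, outputs) mapping. It answers "what depends on orders?" with a breadth-first search. This is an in-memory illustration; Marquez answers the same question from its own lineage graph.

```python
from collections import deque

# task_id -> (input datasets, output datasets), mirroring the inlets/outlets above
TASKS = {
    "stage_orders": (["orders"], ["staging_orders"]),
    "stage_customers": (["customers"], ["staging_customers"]),
    "build_dim_orders": (["staging_orders", "staging_customers"], ["dim_orders"]),
}


def downstream(dataset: str, tasks: dict) -> tuple[set[str], set[str]]:
    """Return (affected tasks, affected datasets) reachable from `dataset`."""
    readers: dict[str, list[str]] = {}
    for task_id, (inputs, _outputs) in tasks.items():
        for name in inputs:
            readers.setdefault(name, []).append(task_id)

    affected_tasks: set[str] = set()
    affected_datasets: set[str] = set()
    queue = deque([dataset])
    while queue:
        current = queue.popleft()
        for task_id in readers.get(current, []):
            if task_id in affected_tasks:
                continue
            affected_tasks.add(task_id)
            for out in tasks[task_id][1]:
                if out not in affected_datasets:
                    affected_datasets.add(out)
                    queue.append(out)
    return affected_tasks, affected_datasets


if __name__ == "__main__":
    print(downstream("orders", TASKS))
```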
Production gotchas
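Disable OpenLineage for sensitive DAGs. Lineage of PII data can leak metadata. Exclude operators with [openlineage] disabled_for_operators = my.sensitive.Operator (semicolon-separated full import paths). Alternatively, set [openlineage] selective_enable = True so that only explicitly enabled DAGs and tasks emit events:
```python
from airflow.providers.openlineage.utils.selective_enable import disable_lineage, enable_lineage

enable_lineage(orders_dag)        # hypothetical DAG object: emits lineage
disable_lineage(pii_export_task)  # hypothetical task inside it: no lineage
```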
Marquez storage growth. OL events accumulate and the Marquez DB keeps growing. Configure a retention period (for example, 180 days) using the data retention options of your Marquez version.
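Extractor errors are silently swallowed. If an extractor throws, the OL provider logs a warning and continues without lineage for that task. Always log inside the extractor so failures are easy to debug:
```python
def _execute_extraction(self):
    try:
        return self._do_extract()  # hypothetical helper holding the actual logic
    except Exception:
        self.log.exception("Extraction failed for task %s", self.operator.task_id)
        raise
```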
Custom XCom backend and lineage. If you use a custom XCom backend (module 06, S3 storage), OL events do not show XCom values or where they are stored. If you need that, you can attach a custom facet with the XCom storage location.
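@task(outlets=...) vs. an extractor. If an operator has an extractor and you also set inlets/outlets manually, the provider uses the extractor's result and falls back to inlets/outlets only when extraction yields no datasets. Choose one approach.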
Operators inheriting from SQLExecuteQueryOperator get auto lineage. If a custom operator subclasses SQLExecuteQueryOperator, the OL provider handles it automatically (as long as the underlying hook supports OpenLineage), so no extractor is needed.
The Spark OL agent is separate. For SparkSubmitOperator, install the OpenLineage listener in Spark itself:
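```python
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

transform_orders = SparkSubmitOperator(
    task_id="transform_orders",
    application="/opt/jobs/transform_orders.py",  # hypothetical job path
    # The artifact suffix must match the cluster's Scala version (2.12 or 2.13)
    packages="io.openlineage:openlineage-spark_2.12:1.20.0",
    conf={
        "spark.extraListeners": "io.openlineage.spark.agent.OpenLineageSparkListener",
        "spark.openlineage.transport.type": "http",
        "spark.openlineage.transport.url": "http://marquez:5000",
    },
)
```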
The Spark job emits OL events directly, not through the Airflow provider.
Real-world lineage example
Marquez visualization of an end-to-end orders pipeline:
```text
postgres:orders (source)
    ↓
extract_orders
    ↓
s3:datalake/raw/orders/dt=2026-05-12/
    ↓
transform_orders (Spark)
    ↓
s3:datalake/processed/orders/dt=2026-05-12/
    ↓
load_to_snowflake
    ↓
snowflake:warehouse.dim.orders
    ↓
build_revenue_mart
    ↓
snowflake:warehouse.mart.daily_revenue
    ↓
[Looker dashboard "Sales Overview"]
```
Each node is either a dataset or a task that links its inputs to its outputs. Click dim.orders to see all downstream consumers and upstream sources; click a task to see the SQL or code it executed.
This powers:
- Impact analysis: “deprecate column X in orders → which dashboards are affected?”
- Compliance: “GDPR-delete a user → trace where their data flows”
- Cost optimization: “find tables read by only one DAG; candidates for deletion”
- Onboarding: a new engineer opens Marquez and sees the data flow visually
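Compliance and root-cause questions run the same graph walk in the other direction. The sketch below covers the pipeline above and lists every upstream dataset of the revenue mart, nearest first. Each job maps to hypothetical (inputs, outputs) dataset labels: the dt= partitions are dropped and the Looker dashboard is left out.

```python
from collections import deque

PIPELINE = {
    "extract_orders": (["postgres:orders"], ["s3:datalake/raw/orders"]),
    "transform_orders": (["s3:datalake/raw/orders"], ["s3:datalake/processed/orders"]),
    "load_to_snowflake": (["s3:datalake/processed/orders"], ["snowflake:warehouse.dim.orders"]),
    "build_revenue_mart": (["snowflake:warehouse.dim.orders"], ["snowflake:warehouse.mart.daily_revenue"]),
}


def upstream(dataset: str, jobs: dict) -> list[str]:
    """Return all datasets that feed `dataset`, nearest first (breadth-first)."""
    producers: dict[str, list[str]] = {}
    for job, (_inputs, outputs) in jobs.items():
        for name in outputs:
            producers.setdefault(name, []).append(job)

    result: list[str] = []
    seen = {dataset}
    queue = deque([dataset])
    while queue:
        current = queue.popleft()
        for job in producers.get(current, []):
            for source in jobs[job][0]:
                if source not in seen:
                    seen.add(source)
                    result.append(source)
                    queue.append(source)
    return result


if __name__ == "__main__":
    for name in upstream("snowflake:warehouse.mart.daily_revenue", PIPELINE):
        print(name)
    # snowflake:warehouse.dim.orders
    # s3:datalake/processed/orders
    # s3:datalake/raw/orders
    # postgres:orders
```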