OpenLineage deep dive — automatic data lineage из коробки
OpenLineage — это главная фишка Airflow observability в 2024-2026 годах. Это open standard для emission data lineage events, который Airflow поддерживает через provider package (apache-airflow-providers-openlineage, стабилизирован с 2.6+).
Что делает OpenLineage уникальным: он эмитит lineage автоматически для standard operators без единой строки lineage code. PostgresOperator парсит ваш SQL, извлекает source tables, target tables, column-level dependencies — и шлёт это в backend (Marquez, DataHub, Atlan). Compliance/governance команды получают полный data flow граф вашей платформы бесплатно.
Этот урок — детальный разбор: как это работает изнутри, как ставить, как читать events, и какие production gotchas.
Что такое OpenLineage и зачем он нужен
Data lineage = граф зависимостей данных. Какая таблица откуда appearance, какой ETL её produces, какие dashboards её consume.
Старая boль (без lineage):
- «Этот KPI на dashboard неправильный — почему?»
- «Какая колонка в Snowflake читает из orders.csv?»
- «GDPR — где personal data во всех системах?»
- «Я хочу remove deprecated source, кто его всё ещё читает?»
Без lineage ответы — недели manual investigation. С lineage — query в граф.
OpenLineage — это стандарт для emit lineage events. Любая система, поддерживающая OL (Airflow, dbt, Spark, Flink, Trino) шлёт events в общий format → backend строит граф.
Отслеживание lineage данных в Spark Exposures в dbt: декларация downstream-консьюмеровУстановка provider
pip install 'apache-airflow-providers-openlineage>=1.7'
Конфигурация в airflow.cfg:
[openlineage]
# Куда отправлять events
transport = {"type": "http", "url": "http://marquez:5000", "endpoint": "/api/v1/lineage"}
# Namespace — group identifier для всех jobs из этого Airflow
namespace = airflow.prod-eu
# Disabled providers (если нужно)
disabled_for_operators = airflow.providers.amazon.aws.operators.s3.S3DeleteObjectsOperator
# Custom extractors (точка-разделённый Python path)
# extractors = my_company.airflow.MyCustomExtractor
Альтернатива — env vars (preferred для secrets):
OPENLINEAGE_URL=http://marquez:5000
OPENLINEAGE_ENDPOINT=/api/v1/lineage
OPENLINEAGE_API_KEY=...
AIRFLOW__OPENLINEAGE__NAMESPACE=airflow.prod-eu
После рестарта Airflow начинает emit events. Никаких изменений в DAG коде.
OpenLineage event format
OL event — это JSON по спецификации (openlineage.io/spec/). Полный пример для PostgresOperator:
{
"eventType": "COMPLETE",
"eventTime": "2026-05-12T10:23:45.123Z",
"producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.0/integration/airflow",
"schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent",
"run": {
"runId": "a3f1b2c4-...",
"facets": {
"parent": {
"run": { "runId": "dagrun-uuid" },
"job": { "namespace": "airflow.prod-eu", "name": "etl_orders" }
},
"nominalTime": {
"nominalStartTime": "2026-05-12T00:00:00Z",
"nominalEndTime": "2026-05-13T00:00:00Z"
}
}
},
"job": {
"namespace": "airflow.prod-eu",
"name": "etl_orders.transform_orders",
"facets": {
"sql": {
"query": "INSERT INTO analytics.orders_daily SELECT ... FROM raw.orders WHERE date = '2026-05-12'"
},
"ownership": {
"owners": [{"name": "data-platform-team", "type": "TEAM"}]
}
}
},
"inputs": [
{
"namespace": "postgres://warehouse.example.com:5432",
"name": "raw.orders",
"facets": {
"schema": {
"fields": [
{"name": "order_id", "type": "BIGINT"},
{"name": "user_id", "type": "BIGINT"},
{"name": "amount", "type": "NUMERIC"},
{"name": "created_at", "type": "TIMESTAMP"}
]
}
}
}
],
"outputs": [
{
"namespace": "postgres://warehouse.example.com:5432",
"name": "analytics.orders_daily",
"facets": {
"schema": {
"fields": [
{"name": "date", "type": "DATE"},
{"name": "total_revenue", "type": "NUMERIC"},
{"name": "order_count", "type": "BIGINT"}
]
},
"columnLineage": {
"fields": {
"total_revenue": {
"inputFields": [
{"namespace": "postgres://...", "name": "raw.orders", "field": "amount"}
],
"transformationType": "AGGREGATION",
"transformationDescription": "SUM(amount)"
}
}
}
}
}
]
}
Ключевые поля
eventType—START(когда task начался) илиCOMPLETE/FAIL(когда закончился). На один task instance — два events.run.runId— UUID этого выполнения.run.facets.parent— связь с parent DagRun (нужно для grouping в backend).job.namespace+job.name— уникальный identifier task. Namespace из config, name из dag_id.task_id.inputs[]/outputs[]— datasets. Каждый имеет namespace (typically connection URL) + name (typically schema.table).facets— расширения. Schema, columnLineage, sql, ownership, dataQuality — стандартные facets.
Какие operators поддерживаются automatic
Provider 2.6+ имеет built-in extractors для:
| Operator | Что извлекается |
|---|---|
| PostgresOperator / MySQLOperator / SQLExecuteQueryOperator | SQL парсинг → source/target tables + column lineage |
| SnowflakeOperator | Аналогично, плюс Snowflake-specific facets (warehouse, role) |
| BigQueryOperator | BigQuery jobs API → tables/queries/columns |
| PostgresToGCS / S3ToPostgres / прочие Transfer | Source/target datasets |
| S3CopyObjectOperator | Bucket/key как datasets |
| GCSToBigQueryOperator | GCS file → BigQuery table |
| SparkSubmitOperator | Spark application metadata (через runtime hooks) |
| DbtCloudRunJobOperator | dbt-job → models/sources |
| PythonOperator | Только run start/complete без datasets (нужен custom extractor) |
| BashOperator | Только run start/complete (нужен custom extractor) |
Для PythonOperator / BashOperator и custom operators — нужен custom extractor (это урок 04).
SQL parsing — внутренности
Самая впечатляющая возможность OL Airflow — column-level lineage из SQL. Как это работает:
- SQL extracted из operator’s
sqlargument после Jinja templating. openlineage-sqlcrate (Rust под капотом) парсит SQL.- AST analyzed: identifies SELECT clauses, JOIN, GROUP BY, INSERT/UPDATE targets.
- Column dependencies computed:
total_revenuederived fromamountviaSUM. - Schema fetched from operator’s hook (
PostgresHook.get_records).
Supported SQL dialects: PostgreSQL, MySQL, Snowflake, BigQuery, Redshift, Spark SQL, generic ANSI.
Ограничения SQL parser
- Complex CTEs иногда теряются (deep nesting).
- Dynamic SQL (построенный из Python strings) не парсится — только literal SQL.
- Stored procedures не tracked.
- Triggers на DB не известны OL.
Для сложных кейсов — augment автоматический extract через custom extractor (урок 04).
Marquez backend setup
Marquez — open-source reference backend для OpenLineage. Простой setup для proof-of-concept:
# docker-compose.lineage.yaml
version: '3'
services:
marquez-db:
image: postgres:15
environment:
POSTGRES_DB: marquez
POSTGRES_USER: marquez
POSTGRES_PASSWORD: marquez
volumes:
- marquez-db-data:/var/lib/postgresql/data
marquez:
image: marquezproject/marquez:0.47.0
ports:
- 5000:5000 # API
- 5001:5001 # Admin
environment:
MARQUEZ_DB_HOST: marquez-db
MARQUEZ_DB_USER: marquez
MARQUEZ_DB_PASSWORD: marquez
depends_on:
- marquez-db
marquez-web:
image: marquezproject/marquez-web:0.47.0
ports:
- 3000:3000
environment:
MARQUEZ_HOST: marquez
MARQUEZ_PORT: 5000
volumes:
marquez-db-data:
После docker compose up -d Marquez UI на http://localhost:3000.
Endpoint structure
# Receive OL event
POST http://marquez:5000/api/v1/lineage
# Query lineage граф (drill-down от dataset)
GET http://marquez:5000/api/v1/lineage?nodeId=dataset:postgres:warehouse:raw.orders&depth=3
# List jobs
GET http://marquez:5000/api/v1/namespaces/{ns}/jobs
Production scale-up
В production Marquez нужен:
- Postgres backend с regular VACUUM (lineage events накапливаются)
- Несколько replicas с LB (Marquez stateless)
- Retention policy — archive events старше N месяцев
Альтернативы Marquez для production:
| Backend | Цена | Когда выбрать |
|---|---|---|
| Marquez | Free, OSS | Self-hosted, нужен полный контроль |
| DataHub | Free, OSS | + metadata management, ER diagrams, ML |
| Atlan | Commercial SaaS | Enterprise, no-ops setup |
| OpenMetadata | Free + Cloud | Modern UI, data discovery |
| AWS SageMaker Lineage | AWS-native | Если уже на AWS |
| Microsoft Purview | Azure-native | Azure-heavy stacks |
Namespaces — как организовать
Namespace = logical grouping. Best practices:
# Один Airflow cluster
namespace = airflow.prod-eu
# Несколько Airflow clusters в одной организации
namespace = airflow.prod-eu
namespace = airflow.prod-us
namespace = airflow.staging
Datasets также имеют namespace — обычно URL connection:
postgres://warehouse.example.com:5432 → raw.orders
s3://data-lake-prod → /orders/2026-05-12/*.parquet
bigquery://my-project → analytics.orders_daily
snowflake://xy12345.us-east-1 → analytics.orders_daily
Это критично для cross-platform lineage. Если dbt также emit OL events с тем же namespace для Snowflake — backend объединит graphs. Это даёт unified lineage: Airflow → dbt → Snowflake → BI.
Listener API integration
OpenLineage provider использует Airflow Listener API (hookimpl, см. модуль 12):
# В провайдере (упрощённо):
class OpenLineageListener:
@hookimpl
def on_task_instance_running(self, previous_state, task_instance, session):
extractor = self.extractor_manager.get_extractor(task_instance.task)
metadata = extractor.extract()
event = self.build_run_event("START", task_instance, metadata)
self.transport.emit(event)
@hookimpl
def on_task_instance_success(self, previous_state, task_instance, session):
extractor = self.extractor_manager.get_extractor(task_instance.task)
metadata = extractor.extract_on_complete() # после exec, имеет result info
event = self.build_run_event("COMPLETE", task_instance, metadata)
self.transport.emit(event)
@hookimpl
def on_task_instance_failed(self, previous_state, task_instance, session):
...
Это значит:
- Один Listener на весь Airflow — события из всех DAGs идут через него.
- Plugin registration — провайдер автоматически регистрирует Listener при installation.
- Не нужно изменять DAGs — listeners работают глобально.
Также OL может работать через on_dag_run_running/success/failed hooks для DAG-level lineage (не только task-level).
Performance impact
Каждый task emit-ит ~2 events (START + COMPLETE). Размер event ~5-50 KB (с schemas). Transport — HTTP POST.
На production масштабе:
- 100k TI/day = 200k events/day = ~2.3 events/sec average
- Peak load (синхронный fan-out) = ~50-100 events/sec
- Bandwidth = ~5-50 MB/min
Это lightweight, но не free. Особенности:
- Если Marquez down — provider накапливает events в memory (с buffer) + retry. После N retries — drop events. Возможна потеря lineage.
- Sync send блокирует task на ~100-500ms — добавляется к task duration.
- Async transport (через
[openlineage] transport = {"type": "http", "async": true}) не блокирует, но требует careful error handling.
Для production — обязательно мониторить:
openlineage.events.emitted— отправленоopenlineage.events.failed— failed (alert если spike)- Latency endpoint Marquez через OTel metrics
Production gotchas
1. Sensitive data в SQL
OL emit-ит SQL запросы как facets. Если ваш SQL содержит hardcoded secrets — SELECT password FROM users WHERE token = 'sk-...' — это утечёт в Marquez logs.
Fix: sanitize SQL extractor:
[openlineage]
disabled_for_operators = airflow.providers.snowflake.operators.SnowflakeOperator
# Или через extractor override с sanitization
2. PII в schemas
Schema facet содержит column names. Если column называется ssn, credit_card_number — это в lineage events. Под GDPR это может быть unwanted.
Fix: column-level filtering в OL Collector (если такой есть) или disable schema facet в провайдере.
3. Marquez DB размер
Events накапливаются. На 100k events/day Marquez DB растёт ~1-10 GB/month. Без cleanup — disk full.
Fix: retention policy:
-- Daily cleanup
DELETE FROM lineage_events WHERE created_at < now() - interval '90 days';
DELETE FROM job_versions WHERE created_at < now() - interval '90 days';
4. Naming conflicts
Если два Airflow clusters используют один Marquez с одним namespace — job names будут конфликтовать. Использовать unique namespace per cluster.
5. PythonOperator emit-ит lineage только если есть custom extractor
Common misconception: «OL автоматически работает для всех operators». Нет. PythonOperator emit только run START/COMPLETE без datasets. Для extracted lineage из PythonOperator — custom extractor (урок 04) или используйте inlets/outlets в task definition (manual lineage):
@task(
inlets=[Dataset("s3://raw/orders/")],
outlets=[Dataset("s3://processed/orders/")]
)
def transform_orders():
...
inlets/outlets emit-ятся как OL inputs/outputs autoматически.
6. dbt + Airflow
dbt также emit OL events (через dbt-ol wrapper). Если ваш DAG запускает dbt через BashOperator(bash_command="dbt-ol run") — dbt emit events напрямую, не через Airflow extractor. Это создаёт полный pipeline lineage: Airflow trigger → dbt models → Snowflake tables, всё связано через namespace.