Learning Platform
Глоссарий Troubleshooting
Урок 15.03 · 34 мин
Продвинутый
OpenLineageData LineageMarquezLineage EventsListener API

OpenLineage deep dive — automatic data lineage из коробки

OpenLineage — это главная фишка Airflow observability в 2024-2026 годах. Это open standard для emission data lineage events, который Airflow поддерживает через provider package (apache-airflow-providers-openlineage, стабилизирован с 2.6+).

Что делает OpenLineage уникальным: он эмитит lineage автоматически для standard operators без единой строки lineage code. PostgresOperator парсит ваш SQL, извлекает source tables, target tables, column-level dependencies — и шлёт это в backend (Marquez, DataHub, Atlan). Compliance/governance команды получают полный data flow граф вашей платформы бесплатно.

Этот урок — детальный разбор: как это работает изнутри, как ставить, как читать events, и какие production gotchas.


Что такое OpenLineage и зачем он нужен

Data lineage = граф зависимостей данных. Какая таблица откуда appearance, какой ETL её produces, какие dashboards её consume.

Старая boль (без lineage):

  • «Этот KPI на dashboard неправильный — почему?»
  • «Какая колонка в Snowflake читает из orders.csv?»
  • «GDPR — где personal data во всех системах?»
  • «Я хочу remove deprecated source, кто его всё ещё читает?»

Без lineage ответы — недели manual investigation. С lineage — query в граф.

OpenLineage — это стандарт для emit lineage events. Любая система, поддерживающая OL (Airflow, dbt, Spark, Flink, Trino) шлёт events в общий format → backend строит граф.

Отслеживание lineage данных в Spark Exposures в dbt: декларация downstream-консьюмеров
OpenLineage event flow в Airflow
PostgresOperator выполняетсяtask = PostgresOperator(task_id='transform', sql='INSERT INTO target SELECT FROM source'). Worker запускает execute().
Listener API hook
OpenLineage ListenerProvider 2.6+ регистрирует Listener (модуль 12). Listener получает on_task_instance_running / success / failed events для всех tasks.
extract metadata
Extractor для PostgresOperatorProvider содержит extractor классы для standard operators. PostgresExtractor использует SQL parser (sqlparse + openlineage-sql) — парсит SQL, извлекает source/target tables, columns, transformations.
OpenLineage Event (JSON)Сформирован OL event: {eventType, eventTime, run, job, inputs[], outputs[]}. Inputs: warehouse.public.source с schema facets. Outputs: warehouse.public.target. SQL transformation в job facet.
HTTP POST
Marquez backendПринимает OL events, строит граф lineage. UI показывает: datasets, jobs, runs, dependencies. PostgreSQL для storage. REST API для query (/api/v1/namespaces/.../lineage).

Установка provider

pip install 'apache-airflow-providers-openlineage>=1.7'

Конфигурация в airflow.cfg:

[openlineage]
# Куда отправлять events
transport = {"type": "http", "url": "http://marquez:5000", "endpoint": "/api/v1/lineage"}

# Namespace — group identifier для всех jobs из этого Airflow
namespace = airflow.prod-eu

# Disabled providers (если нужно)
disabled_for_operators = airflow.providers.amazon.aws.operators.s3.S3DeleteObjectsOperator

# Custom extractors (точка-разделённый Python path)
# extractors = my_company.airflow.MyCustomExtractor

Альтернатива — env vars (preferred для secrets):

OPENLINEAGE_URL=http://marquez:5000
OPENLINEAGE_ENDPOINT=/api/v1/lineage
OPENLINEAGE_API_KEY=...
AIRFLOW__OPENLINEAGE__NAMESPACE=airflow.prod-eu

После рестарта Airflow начинает emit events. Никаких изменений в DAG коде.


OpenLineage event format

OL event — это JSON по спецификации (openlineage.io/spec/). Полный пример для PostgresOperator:

{
  "eventType": "COMPLETE",
  "eventTime": "2026-05-12T10:23:45.123Z",
  "producer": "https://github.com/OpenLineage/OpenLineage/tree/1.13.0/integration/airflow",
  "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent",

  "run": {
    "runId": "a3f1b2c4-...",
    "facets": {
      "parent": {
        "run": { "runId": "dagrun-uuid" },
        "job": { "namespace": "airflow.prod-eu", "name": "etl_orders" }
      },
      "nominalTime": {
        "nominalStartTime": "2026-05-12T00:00:00Z",
        "nominalEndTime": "2026-05-13T00:00:00Z"
      }
    }
  },

  "job": {
    "namespace": "airflow.prod-eu",
    "name": "etl_orders.transform_orders",
    "facets": {
      "sql": {
        "query": "INSERT INTO analytics.orders_daily SELECT ... FROM raw.orders WHERE date = '2026-05-12'"
      },
      "ownership": {
        "owners": [{"name": "data-platform-team", "type": "TEAM"}]
      }
    }
  },

  "inputs": [
    {
      "namespace": "postgres://warehouse.example.com:5432",
      "name": "raw.orders",
      "facets": {
        "schema": {
          "fields": [
            {"name": "order_id", "type": "BIGINT"},
            {"name": "user_id", "type": "BIGINT"},
            {"name": "amount", "type": "NUMERIC"},
            {"name": "created_at", "type": "TIMESTAMP"}
          ]
        }
      }
    }
  ],

  "outputs": [
    {
      "namespace": "postgres://warehouse.example.com:5432",
      "name": "analytics.orders_daily",
      "facets": {
        "schema": {
          "fields": [
            {"name": "date", "type": "DATE"},
            {"name": "total_revenue", "type": "NUMERIC"},
            {"name": "order_count", "type": "BIGINT"}
          ]
        },
        "columnLineage": {
          "fields": {
            "total_revenue": {
              "inputFields": [
                {"namespace": "postgres://...", "name": "raw.orders", "field": "amount"}
              ],
              "transformationType": "AGGREGATION",
              "transformationDescription": "SUM(amount)"
            }
          }
        }
      }
    }
  ]
}

Ключевые поля

  • eventTypeSTART (когда task начался) или COMPLETE / FAIL (когда закончился). На один task instance — два events.
  • run.runId — UUID этого выполнения.
  • run.facets.parent — связь с parent DagRun (нужно для grouping в backend).
  • job.namespace + job.name — уникальный identifier task. Namespace из config, name из dag_id.task_id.
  • inputs[] / outputs[] — datasets. Каждый имеет namespace (typically connection URL) + name (typically schema.table).
  • facets — расширения. Schema, columnLineage, sql, ownership, dataQuality — стандартные facets.

Какие operators поддерживаются automatic

Provider 2.6+ имеет built-in extractors для:

OperatorЧто извлекается
PostgresOperator / MySQLOperator / SQLExecuteQueryOperatorSQL парсинг → source/target tables + column lineage
SnowflakeOperatorАналогично, плюс Snowflake-specific facets (warehouse, role)
BigQueryOperatorBigQuery jobs API → tables/queries/columns
PostgresToGCS / S3ToPostgres / прочие TransferSource/target datasets
S3CopyObjectOperatorBucket/key как datasets
GCSToBigQueryOperatorGCS file → BigQuery table
SparkSubmitOperatorSpark application metadata (через runtime hooks)
DbtCloudRunJobOperatordbt-job → models/sources
PythonOperatorТолько run start/complete без datasets (нужен custom extractor)
BashOperatorТолько run start/complete (нужен custom extractor)

Для PythonOperator / BashOperator и custom operators — нужен custom extractor (это урок 04).


SQL parsing — внутренности

Самая впечатляющая возможность OL Airflow — column-level lineage из SQL. Как это работает:

  1. SQL extracted из operator’s sql argument после Jinja templating.
  2. openlineage-sql crate (Rust под капотом) парсит SQL.
  3. AST analyzed: identifies SELECT clauses, JOIN, GROUP BY, INSERT/UPDATE targets.
  4. Column dependencies computed: total_revenue derived from amount via SUM.
  5. Schema fetched from operator’s hook (PostgresHook.get_records).

Supported SQL dialects: PostgreSQL, MySQL, Snowflake, BigQuery, Redshift, Spark SQL, generic ANSI.

Ограничения SQL parser

  • Complex CTEs иногда теряются (deep nesting).
  • Dynamic SQL (построенный из Python strings) не парсится — только literal SQL.
  • Stored procedures не tracked.
  • Triggers на DB не известны OL.

Для сложных кейсов — augment автоматический extract через custom extractor (урок 04).


Marquez backend setup

Marquez — open-source reference backend для OpenLineage. Простой setup для proof-of-concept:

# docker-compose.lineage.yaml
version: '3'
services:
  marquez-db:
    image: postgres:15
    environment:
      POSTGRES_DB: marquez
      POSTGRES_USER: marquez
      POSTGRES_PASSWORD: marquez
    volumes:
      - marquez-db-data:/var/lib/postgresql/data

  marquez:
    image: marquezproject/marquez:0.47.0
    ports:
      - 5000:5000     # API
      - 5001:5001     # Admin
    environment:
      MARQUEZ_DB_HOST: marquez-db
      MARQUEZ_DB_USER: marquez
      MARQUEZ_DB_PASSWORD: marquez
    depends_on:
      - marquez-db

  marquez-web:
    image: marquezproject/marquez-web:0.47.0
    ports:
      - 3000:3000
    environment:
      MARQUEZ_HOST: marquez
      MARQUEZ_PORT: 5000

volumes:
  marquez-db-data:

После docker compose up -d Marquez UI на http://localhost:3000.

Endpoint structure

# Receive OL event
POST http://marquez:5000/api/v1/lineage

# Query lineage граф (drill-down от dataset)
GET http://marquez:5000/api/v1/lineage?nodeId=dataset:postgres:warehouse:raw.orders&depth=3

# List jobs
GET http://marquez:5000/api/v1/namespaces/{ns}/jobs

Production scale-up

В production Marquez нужен:

  • Postgres backend с regular VACUUM (lineage events накапливаются)
  • Несколько replicas с LB (Marquez stateless)
  • Retention policy — archive events старше N месяцев

Альтернативы Marquez для production:

BackendЦенаКогда выбрать
MarquezFree, OSSSelf-hosted, нужен полный контроль
DataHubFree, OSS+ metadata management, ER diagrams, ML
AtlanCommercial SaaSEnterprise, no-ops setup
OpenMetadataFree + CloudModern UI, data discovery
AWS SageMaker LineageAWS-nativeЕсли уже на AWS
Microsoft PurviewAzure-nativeAzure-heavy stacks

Namespaces — как организовать

Namespace = logical grouping. Best practices:

# Один Airflow cluster
namespace = airflow.prod-eu

# Несколько Airflow clusters в одной организации
namespace = airflow.prod-eu
namespace = airflow.prod-us
namespace = airflow.staging

Datasets также имеют namespace — обычно URL connection:

postgres://warehouse.example.com:5432  →  raw.orders
s3://data-lake-prod                    →  /orders/2026-05-12/*.parquet
bigquery://my-project                  →  analytics.orders_daily
snowflake://xy12345.us-east-1          →  analytics.orders_daily

Это критично для cross-platform lineage. Если dbt также emit OL events с тем же namespace для Snowflake — backend объединит graphs. Это даёт unified lineage: Airflow → dbt → Snowflake → BI.


Listener API integration

OpenLineage provider использует Airflow Listener API (hookimpl, см. модуль 12):

# В провайдере (упрощённо):
class OpenLineageListener:
    @hookimpl
    def on_task_instance_running(self, previous_state, task_instance, session):
        extractor = self.extractor_manager.get_extractor(task_instance.task)
        metadata = extractor.extract()
        event = self.build_run_event("START", task_instance, metadata)
        self.transport.emit(event)

    @hookimpl
    def on_task_instance_success(self, previous_state, task_instance, session):
        extractor = self.extractor_manager.get_extractor(task_instance.task)
        metadata = extractor.extract_on_complete()  # после exec, имеет result info
        event = self.build_run_event("COMPLETE", task_instance, metadata)
        self.transport.emit(event)

    @hookimpl
    def on_task_instance_failed(self, previous_state, task_instance, session):
        ...

Это значит:

  • Один Listener на весь Airflow — события из всех DAGs идут через него.
  • Plugin registration — провайдер автоматически регистрирует Listener при installation.
  • Не нужно изменять DAGs — listeners работают глобально.

Также OL может работать через on_dag_run_running/success/failed hooks для DAG-level lineage (не только task-level).


Performance impact

Каждый task emit-ит ~2 events (START + COMPLETE). Размер event ~5-50 KB (с schemas). Transport — HTTP POST.

На production масштабе:

  • 100k TI/day = 200k events/day = ~2.3 events/sec average
  • Peak load (синхронный fan-out) = ~50-100 events/sec
  • Bandwidth = ~5-50 MB/min

Это lightweight, но не free. Особенности:

  • Если Marquez down — provider накапливает events в memory (с buffer) + retry. После N retries — drop events. Возможна потеря lineage.
  • Sync send блокирует task на ~100-500ms — добавляется к task duration.
  • Async transport (через [openlineage] transport = {"type": "http", "async": true}) не блокирует, но требует careful error handling.

Для production — обязательно мониторить:

  • openlineage.events.emitted — отправлено
  • openlineage.events.failed — failed (alert если spike)
  • Latency endpoint Marquez через OTel metrics

Production gotchas

1. Sensitive data в SQL

OL emit-ит SQL запросы как facets. Если ваш SQL содержит hardcoded secrets — SELECT password FROM users WHERE token = 'sk-...' — это утечёт в Marquez logs.

Fix: sanitize SQL extractor:

[openlineage]
disabled_for_operators = airflow.providers.snowflake.operators.SnowflakeOperator
# Или через extractor override с sanitization

2. PII в schemas

Schema facet содержит column names. Если column называется ssn, credit_card_number — это в lineage events. Под GDPR это может быть unwanted.

Fix: column-level filtering в OL Collector (если такой есть) или disable schema facet в провайдере.

3. Marquez DB размер

Events накапливаются. На 100k events/day Marquez DB растёт ~1-10 GB/month. Без cleanup — disk full.

Fix: retention policy:

-- Daily cleanup
DELETE FROM lineage_events WHERE created_at < now() - interval '90 days';
DELETE FROM job_versions WHERE created_at < now() - interval '90 days';

4. Naming conflicts

Если два Airflow clusters используют один Marquez с одним namespace — job names будут конфликтовать. Использовать unique namespace per cluster.

5. PythonOperator emit-ит lineage только если есть custom extractor

Common misconception: «OL автоматически работает для всех operators». Нет. PythonOperator emit только run START/COMPLETE без datasets. Для extracted lineage из PythonOperator — custom extractor (урок 04) или используйте inlets/outlets в task definition (manual lineage):

@task(
    inlets=[Dataset("s3://raw/orders/")],
    outlets=[Dataset("s3://processed/orders/")]
)
def transform_orders():
    ...

inlets/outlets emit-ятся как OL inputs/outputs autoматически.

6. dbt + Airflow

dbt также emit OL events (через dbt-ol wrapper). Если ваш DAG запускает dbt через BashOperator(bash_command="dbt-ol run") — dbt emit events напрямую, не через Airflow extractor. Это создаёт полный pipeline lineage: Airflow trigger → dbt models → Snowflake tables, всё связано через namespace.


Проверка знанийKnowledge check
Команда жалуется: 'Включили OpenLineage provider, видим events для PostgresOperator в Marquez, но наш main DAG использует custom PythonOperator с pandas — для него events есть, но без inputs/outputs (только run START/COMPLETE). Как добавить lineage для custom operator?'
ОтветAnswer
Есть три варианта по сложности: **(1) `inlets`/`outlets` на task definition (простейший)** — при определении task передать `inlets=[Dataset('s3://raw/orders/')]`, `outlets=[Dataset('postgres://warehouse/analytics.orders')]`. OL provider их автоматически emit как inputs/outputs. Работает для известных-заранее статичных datasets. **(2) Custom Extractor (полноценный)** — subclass `BaseExtractor`, метод `extract()` возвращает `OperatorLineage(inputs=[Dataset(...)], outputs=[Dataset(...)], job_facets={'sql': ...})`. Регистрируется через entry_points в setup.py или через config `extractors = my_module.MyExtractor`. Extractor может анализировать operator's args/kwargs для динамического inference. Это разбирается в уроке 14/04 (custom extractors). **(3) Manual emission (для edge cases)** — внутри PythonOperator вручную вызывать `openlineage.client.Client().emit(...)` с custom event. Не recommended (теряется correlation с run). **Best practice:** для известных source/target — inlets/outlets. Для сложной логики (динамические targets, parametrized SQL) — custom extractor. Для unmaintained third-party operators — extractor через monkey-patch.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. OpenLineage provider стабилизирован в Airflow начиная с какой версии?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 7