Learning Platform
Глоссарий Troubleshooting
Урок 18.07 · 22 мин
Продвинутый
OpenLineageLineageExtractorsStandard OperatorsData Governance

OpenLineage-aware patterns — standard operators, custom extractors, clean lineage

OpenLineage (модуль 14) — индустриальный стандарт для data lineage. С 2024 года это де-факто решение для tracking «откуда → куда» данные в pipelines. Apache Airflow provider apache-airflow-providers-openlineage (2.6+) автоматически emits OL events для standard operators — но только если ваши DAGs OpenLineage-aware.

Этот урок — конкретные patterns для maximizing automatic lineage emission: использовать standard operators везде где возможно, писать custom extractors для PythonOperator, design tasks так чтобы lineage был accurate и actionable.


Recap: что генерирует OL events

apache-airflow-providers-openlineage (часть default install с 2.7+) автоматически emits события для:

Operator typeLineage emission
SQL operators (PostgresOperator, SQLOperator, BigQueryOperator)Auto — parses SQL для source/target tables
S3 operators (S3CopyObjectOperator, S3FilesToS3Operator)Auto — bucket/key as datasets
Spark operators (SparkSubmitOperator)Auto — через Spark OL agent в Spark job
dbt operators (DbtRunOperator)Auto — dbt has native OL support
PythonOperator / @taskManual — нужен extractor или explicit emission
KubernetesPodOperatorPartial — basic info, нужны hints для actual datasets
BashOperatorNone — bash command opaque для Airflow

Strategy: use standard operators where possible, add extractors where not.

dbt Descriptions: документирование моделей и колонок
OpenLineage emission flow: operator → extractor → OL event → Marquez
Airflow task executesЛюбой task (standard SQL operator, custom operator, @task с inlets/outlets) запускается worker-ом. OL provider (apache-airflow-providers-openlineage) подписан на task lifecycle events через listener interface. На execute start/complete — trigger lineage extraction.
OL provider hooks on task lifecycle
Built-in extractors (SQL ops)OL provider includes extractors для SQLExecuteQueryOperator, PostgresOperator, BigQueryOperator. Парсит SQL через sqlparse — identifies source tables (FROM/JOIN), target tables (INSERT/UPDATE/MERGE), schema inference. Auto, без code changes — поэтому prefer standard operators.
Custom extractor (CustomOperator)BaseExtractor subclass с get_operator_classnames() returning class name. extract() возвращает TaskMetadata(inputs, outputs, run_facets, job_facets). Register через entry_points в setup.py или env var OPENLINEAGE_EXTRACTORS. Errors silently swallowed — always log в extractor.
inlets/outlets (declarative)@task(inlets=[Table('warehouse', 'staging', 'orders')], outlets=[Table(...)]) — declarative lineage без extractor class. OL provider reads inlets/outlets directly. Simplest path — best для PythonOperator/@task где SQL не parseable.
OpenLineageAdapter builds event
OL event (JSON)Standardized OpenLineage event schema: eventType (START/COMPLETE), eventTime, run (UUID), job (name, namespace), inputs (datasets с schema facets), outputs (datasets), facets (sql, ownership, schema). JSON serialized. Versioned schema (1.0+) — backward compatible.
HTTP POST к transport URL
Marquez backend (or DataHub/OL collector)Marquez — reference implementation OL backend. Postgres storage. REST API + UI. Получает events через HTTP, dedups runs (start/complete same run UUID), builds lineage graph. Retention: MARQUEZ_RETENTION_DAYS=180 — older events archived. DataHub — alternative backend с broader features.
UI visualization
Marquez graph: source → task → targetUI рендерит lineage graph. Click на dataset (snowflake:warehouse.dim.orders) — видишь все upstream sources и downstream consumers. Click на task — видишь SQL/code executed. Powers: impact analysis (deprecate column → affected dashboards), GDPR compliance (trace PII flow), root cause analysis.

Pattern 1: Prefer standard operators over @task

# ❌ Anti-pattern — PythonOperator для SQL work, lineage не captured
@task
def transform_orders():
    from airflow.providers.postgres.hooks.postgres import PostgresHook
    hook = PostgresHook(postgres_conn_id="warehouse")
    hook.run("INSERT INTO dim.orders SELECT * FROM staging.orders;")

OL provider видит transform_orders как opaque Python task — нет lineage между staging.orders и dim.orders.

# ✅ Better — SQLExecuteQueryOperator emits lineage автоматически
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

transform_orders = SQLExecuteQueryOperator(
    task_id="transform_orders"
    conn_id="warehouse"
    sql="""
    INSERT INTO dim.orders
    SELECT
        id, customer_id, amount, created_at
    FROM staging.orders
    WHERE created_at >= '{{ data_interval_start }}'
      AND created_at < '{{ data_interval_end }}';
    """,
)

OL provider parses SQL через sqlparse, identifies:

  • Inputs: staging.orders (source table)
  • Outputs: dim.orders (target table)
  • Generated Marquez graph: staging.orders → transform_orders → dim.orders

When PythonOperator is unavoidable

Real-world: business logic в Python (data transformations с pandas, ML predictions, custom validation). For these — нужны custom extractors.


Pattern 2: Custom extractors

Extractor — это класс, который knows как extract lineage info из specific operator’s params. OL provider scans for extractors при event emission.

# plugins/lineage/extractors.py
from openlineage.airflow.extractors.base import BaseExtractor, TaskMetadata
from openlineage.client.facet import (
    SchemaDatasetFacet, SchemaField,
    SqlJobFacet,
)
from openlineage.client.run import Dataset

class CustomEtlOperatorExtractor(BaseExtractor):
    """Extractor для нашего custom CustomEtlOperator."""

    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        return ["CustomEtlOperator"]

    def extract(self) -> TaskMetadata:
        # self.operator — instance нашего operator
        op = self.operator

        inputs = [
            Dataset(
                namespace=f"postgres://{op.source_conn_host}:{op.source_conn_port}"
                name=f"{op.source_database}.{op.source_table}"
                facets={
                    "schema": SchemaDatasetFacet(fields=[
                        SchemaField(name="id", type="bigint"),
                        SchemaField(name="amount", type="numeric"),
                    ])
                }
            )
        ]

        outputs = [
            Dataset(
                namespace=f"snowflake://{op.target_account}.snowflakecomputing.com"
                name=f"{op.target_database}.{op.target_schema}.{op.target_table}",
            )
        ]

        return TaskMetadata(
            name=f"{op.task_id}"
            inputs=inputs,
            outputs=outputs,
            run_facets={},
            job_facets={
                "sql": SqlJobFacet(query=op.transform_sql) if op.transform_sql else None,
            },
        )

Register extractor через entry point в setup.py или env var:

# setup.py
setup(
    ...
    entry_points={
        "openlineage.airflow.extractors": [
            "custom_etl = plugins.lineage.extractors:CustomEtlOperatorExtractor",
        ]
    }
)

Или env var:

OPENLINEAGE_EXTRACTORS=plugins.lineage.extractors.CustomEtlOperatorExtractor

Pattern 3: Manual lineage emission в @task

Для одноразовой custom logic можно emit lineage manually через OpenLineageAdapter:

from airflow.decorators import task
from openlineage.airflow.adapter import OpenLineageAdapter
from openlineage.client.run import Dataset

@task
def custom_transform(execution_date: str):
    # ... your business logic ...
    rows_processed = some_pandas_work()

    # Manually emit lineage
    from airflow.operators.python import get_current_context
    ctx = get_current_context()

    adapter = OpenLineageAdapter()
    adapter.emit_dataset_event(
        run_id=ctx["run_id"],
        job_name=f"{ctx['dag'].dag_id}.{ctx['task'].task_id}"
        inputs=[
            Dataset(namespace="s3://lake", name="raw/orders/2026-05-12.parquet"),
        ],
        outputs=[
            Dataset(namespace="s3://lake", name="processed/orders/2026-05-12.parquet"),
        ],
    )
    return rows_processed

Это менее clean чем extractor, но workable для one-off cases.


Pattern 4: outlets and inlets для explicit lineage

Airflow 2.4+ позволяет declarative lineage через inlets и outlets:

from airflow.lineage.entities import Table

@task(
    inlets=[Table(database="warehouse", schema="staging", name="orders")],
    outlets=[Table(database="warehouse", schema="prod", name="orders")],
)
def transform_orders():
    # ... custom logic ...
    pass

OL provider читает inlets/outlets и emits lineage events. Не требует extractor — declarative.

Production tip: inlets/outlets — best для cases где extractor too complex. Один-два line code instead 100-line extractor class.


Pattern 5: Datasets как explicit lineage

С Airflow 2.4+ Datasets (модуль 08) сами по себе создают lineage:

from airflow import Dataset

orders_dataset = Dataset("s3://lake/orders/")
processed_dataset = Dataset("s3://lake/processed/orders/")

@dag(...)
def producer():
    @task(outlets=[orders_dataset])
    def fetch_orders():
        ...
    fetch_orders()

@dag(schedule=[orders_dataset], ...)
def consumer():
    @task(outlets=[processed_dataset])
    def process():
        ...
    process()

OL provider видит Dataset references в outlets — emits dataset events. Plus dataset-driven scheduling — Marquez может visualize DAG triggers как lineage links.

В 3.x Datasets → Assets, но concept identical.


Pattern 6: Clean SQL для accurate parsing

OL provider парсит SQL через sqlparse. Чистый SQL → accurate lineage. Messy SQL → missed tables.

-- ❌ Hard для parser — dynamic SQL
EXECUTE (
    SELECT CASE WHEN month = 'jan' THEN 'INSERT INTO jan_data ...'
                ELSE 'INSERT INTO feb_data ...'
           END
);

-- ✅ Easy для parser — explicit
INSERT INTO jan_data
SELECT * FROM source.events WHERE month = 'jan';

Avoid:

  • Dynamic SQL via EXECUTE
  • Stored procedures (parser doesn’t see internal queries)
  • CTE chains 5+ levels deep (parser may struggle)

Prefer:

  • Plain INSERT/SELECT/MERGE
  • CTEs <= 3 levels
  • Explicit table names (avoid * joins без aliases)

Pattern 7: Design DAGs для clean lineage

DAG structure affects lineage clarity. Compare:

# ❌ Tangled — hard в Marquez visualize
@task
def do_everything():
    """Reads from 5 sources, writes to 3 targets — opaque."""
    df1 = read_orders()
    df2 = read_customers()
    df3 = read_products()
    df4 = read_returns()
    df5 = read_promotions()
    result = complex_transform(df1, df2, df3, df4, df5)
    write_dim_orders(result)
    write_fact_revenue(result)
    write_mart_analytics(result)

Single task → single OL run event → 5 inputs + 3 outputs в одном blob. Marquez shows один node со множеством arrows.

# ✅ Clean — каждый transformation explicit
@task(inlets=[Table("orders")], outlets=[Table("staging_orders")])
def stage_orders(): ...

@task(inlets=[Table("customers")], outlets=[Table("staging_customers")])
def stage_customers(): ...

@task(inlets=[Table("staging_orders"), Table("staging_customers")],
      outlets=[Table("dim_orders")])
def build_dim_orders(): ...

# 3 separate tasks → 3 OL events → clean graph в Marquez

В Marquez это shows как:

orders → stage_orders → staging_orders

                   build_dim_orders → dim_orders
customers → stage_customers → staging_customers ↑

Easier для:

  • Impact analysis (“what depends on dim_orders?”)
  • Root cause analysis (“orders broken — where did it go wrong?”)
  • Compliance reporting (“show me lineage of PII data”)

Production gotchas

Disable OpenLineage для sensitive DAGs. PII data lineage может leak metadata. Use [openlineage] disabled_for_operators = my.sensitive.Operator или DAG-level:

@dag(..., params={"openlineage_disabled": True})

Marquez storage growth. OL events накапливаются — Marquez DB растёт. Set retention via MARQUEZ_RETENTION_DAYS=180.

Extractor errors silently swallowed. Если extractor throws — OL provider logs warning, continues without event. Always log в extractor для debug:

def extract(self):
    try:
        return self._do_extract()
    except Exception as e:
        self.log.error(f"Extraction failed: {e}", exc_info=True)
        raise

Custom XCom backend и lineage. Если используете custom XCom backend (модуль 06, S3 storage), pre-S3 OL events не show XCom values. Может add facet с XCom storage location.

@task(outlets=...) overrides extractor. Если у operator есть extractor AND outlets manually set — outlets выигрывают. Choose one approach.

Operator inheriting from SQL base — auto lineage works. Если custom operator subclasses BaseSQLOperator — OL provider auto-handles. Не нужен extractor.

Spark OL agent отдельный. Для SparkSubmitOperator — install OL listener в Spark itself:

SparkSubmitOperator(
    ...
    packages="io.openlineage:openlineage-spark:1.20.0"
    conf={
        "spark.extraListeners": "io.openlineage.spark.agent.OpenLineageSparkListener",
        "spark.openlineage.transport.type": "http",
        "spark.openlineage.transport.url": "http://marquez:5000",
    },
)

Spark job emits OL events directly, не через Airflow provider.


Real-world lineage example

Marquez визуализация end-to-end orders pipeline:

postgres:orders (source)

extract_orders

s3:datalake/raw/orders/dt=2026-05-12/

transform_orders (Spark)

s3:datalake/processed/orders/dt=2026-05-12/

load_to_snowflake

snowflake:warehouse.dim.orders

build_revenue_mart

snowflake:warehouse.mart.daily_revenue

[Looker dashboard "Sales Overview"]

Каждое node — task с inputs/outputs. Click на dim.orders → видишь все downstream consumers, upstream sources. Click на task → видишь SQL/code что executed.

Это powers:

  • Impact analysis: “deprecate column X в orders → affected dashboards”
  • Compliance: “GDPR-delete user → trace where data flows”
  • Cost optimization: “find tables читаемые только одним DAG — может delete”
  • Onboarding: новый engineer открывает Marquez и видит data flow visual

Проверка знанийKnowledge check
Production team хочет maximize OpenLineage coverage для data governance compliance (GDPR — must trace where PII flows). У них есть 50 DAGs: 20 SQL pipelines (PostgresOperator), 10 Spark jobs (SparkSubmitOperator), 15 Python ETL (@task), 5 custom KubernetesPodOperator. Какие шаги дают best ROI для lineage coverage?
ОтветAnswer
Priority list для max OL coverage at minimum effort: (1) **Verify SQL pipelines (20 DAGs) auto-emit** — install apache-airflow-providers-openlineage если not already. Configure transport URL к Marquez. ZERO code changes нужны — SQL operators auto-emit. Cover 40% of work без effort. (2) **Spark jobs (10 DAGs)** — add `openlineage-spark` package в SparkSubmitOperator + spark.extraListeners config. OL agent в Spark emits события напрямую к Marquez (не через Airflow). 10 DAGs covered с 5-line config change каждый. (3) **Python ETL (15 DAGs) — самый высокий ROI** — для каждого add `inlets=[Table(...)]` и `outlets=[Table(...)]` в @task decorators. Declarative, decision per task: какие source tables читаются, какие destination tables writers. 5-10 min per task. Если task делает SQL queries — лучше переписать в SQLExecuteQueryOperator (still доступно в 2.x), auto-emission. (4) **Custom KubernetesPodOperator (5 DAGs)** — самый сложный. Two approaches: (a) inlets/outlets manual (if simple, fixed) — works для cases где pod always reads source-X writes target-Y; (b) Write custom extractor для CustomKubernetesPodOperator subclass — extract reads pod args/env для inferring inputs/outputs. ~1 day work per unique pod type. (5) **Enforce in code review** — checklist для new DAGs: 'inlets/outlets specified или standard operator used'. PR template question. (6) **Add governance dashboard в Marquez** — query 'tables containing PII columns' (via schema facets), trace downstream consumers. **Avoid**: trying full custom extractor для каждого PythonOperator — too much work, inlets/outlets sufficient для declarative cases. Migration в 3.x — OL provider works идентично, no changes нужны. Expected coverage: starting 40% (SQL auto), end at ~95% после 2-3 weeks of declarative inlets/outlets work. The 5% не covered — opaque PythonOperator с runtime-determined I/O — пометить как 'manual review required' для GDPR audit. Compliance team может live с этим — explicit acknowledgment > silent gaps.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Какой operator auto-emits OpenLineage events?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 8