Learning Platform
Глоссарий Troubleshooting
Урок 15.04 · 28 мин
Продвинутый
Custom ExtractorOpenLineageLineage CodePluginentry_points

Custom OpenLineage extractors — lineage для своих operators

В предыдущем уроке мы видели, что standard operators (PostgresOperator, BigQueryOperator, и т.п.) поддерживаются автоматически — extractors уже включены в provider. Но реальный production Airflow содержит десятки custom operators: внутренние ETL operators, integration с проприетарными системами, обёртки над third-party APIs.

Без custom extractor lineage для них пустой — только run start/complete без inputs/outputs. Это превращает Marquez граф в дырки.

Этот урок — как написать custom extractor правильно: интерфейс, регистрация, real-world примеры, и pitfalls.


Data lineage в dbt — та же концепция, другой инструмент

Когда нужен custom extractor

СценарийРешение
PythonOperator с known inputs/outputsinlets/outlets в task definition (легко)
PythonOperator с динамическими inputs/outputsCustom extractor (extractor читает kwargs)
Custom operator (внутренний MyDataLakeOperator)Custom extractor
Third-party operator без extractorCustom extractor (через monkey-patch или PR в provider)
Wrapper over CLI tool (BashOperator(bash_command="my-tool ..."))Custom extractor с CLI args parsing

Custom extractor — это plugin для OpenLineage, который Airflow подгружает автоматически.


Базовый интерфейс: BaseExtractor

Provider предоставляет abstract class:

# airflow.providers.openlineage.extractors.base
from abc import ABC, abstractmethod
from airflow.providers.openlineage.extractors.base import (
    BaseExtractor,
    OperatorLineage,
)
from openlineage.client.facet import (
    SchemaDatasetFacet,
    SchemaField,
    SqlJobFacet,
)
from openlineage.client.run import Dataset


class MyExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        """Список full class paths operators, которые этот extractor обрабатывает."""
        return ["my_company.operators.MyDataLakeOperator"]

    def extract(self) -> OperatorLineage | None:
        """
        Вызывается ДО task execute() (на on_task_instance_running).
        Возвращает lineage metadata из доступных полей operator.

        self.operator — instance operator
        """
        op = self.operator
        return OperatorLineage(
            inputs=[
                Dataset(
                    namespace=f"s3://{op.source_bucket}"
                    name=op.source_path,
                    facets={}
                )
            ],
            outputs=[
                Dataset(
                    namespace=f"snowflake://{op.snowflake_account}"
                    name=f"{op.target_database}.{op.target_schema}.{op.target_table}"
                    facets={}
                )
            ],
            job_facets={
                "sql": SqlJobFacet(query=op.sql)
            } if op.sql else {},
        )

    def extract_on_complete(self, task_instance) -> OperatorLineage | None:
        """
        Вызывается ПОСЛЕ execute() (на on_task_instance_success).
        Имеет доступ к results через task_instance.xcom_pull или operator state.

        Используется для: фактические row counts, partition info, error details.
        Не обязателен — если возвращает None, используется результат extract().
        """
        return None

Lifecycle

on_task_instance_running:

  extract()  →  OperatorLineage  →  START event с inputs/outputs

task.execute()
  ↓ (success)
on_task_instance_success:

  extract_on_complete()  →  OperatorLineage с runtime data  →  COMPLETE event

extract() обязателен, extract_on_complete() опционален — используется когда нужны actual results (row count, output schema после INSERT, error message).


Real example 1: Custom S3-to-Snowflake operator

Допустим, есть внутренний operator:

# my_company/operators/s3_to_snowflake.py
class S3ToSnowflakeOperator(BaseOperator):
    template_fields = ("source_bucket", "source_prefix", "target_table", "stage")

    def __init__(
        self,
        source_bucket: str,
        source_prefix: str,
        target_table: str,      # формат: db.schema.table
        stage: str,
        file_format: str = "PARQUET",
        snowflake_conn_id: str = "snowflake_default",
        **kwargs
    ):
        super().__init__(**kwargs)
        self.source_bucket = source_bucket
        self.source_prefix = source_prefix
        self.target_table = target_table
        self.stage = stage
        self.file_format = file_format
        self.snowflake_conn_id = snowflake_conn_id

    def execute(self, context):
        hook = SnowflakeHook(snowflake_conn_id=self.snowflake_conn_id)
        sql = f"""
            COPY INTO {self.target_table}
            FROM @{self.stage}/{self.source_prefix}
            FILE_FORMAT = (TYPE = '{self.file_format}')
        """
        result = hook.run(sql)
        # result содержит rows_loaded
        return {"rows_loaded": result.get("rows_loaded", 0)}

Extractor для него:

# my_company/openlineage_extractors/s3_to_snowflake.py
from airflow.providers.openlineage.extractors.base import (
    BaseExtractor, OperatorLineage
)
from openlineage.client.facet import (
    SqlJobFacet, OutputStatisticsOutputDatasetFacet
)
from openlineage.client.run import Dataset


class S3ToSnowflakeExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        return ["my_company.operators.s3_to_snowflake.S3ToSnowflakeOperator"]

    def extract(self) -> OperatorLineage:
        op = self.operator

        # Получим Snowflake account из connection
        from airflow.hooks.base import BaseHook
        conn = BaseHook.get_connection(op.snowflake_conn_id)
        snowflake_namespace = f"snowflake://{conn.host}"

        # Parse target_table "db.schema.table"
        parts = op.target_table.split(".")
        if len(parts) != 3:
            raise ValueError(f"Invalid target_table: {op.target_table}")

        target_dataset = Dataset(
            namespace=snowflake_namespace,
            name=op.target_table,  # "ANALYTICS.PUBLIC.ORDERS"
        )

        source_dataset = Dataset(
            namespace=f"s3://{op.source_bucket}"
            name=op.source_prefix,
        )

        return OperatorLineage(
            inputs=[source_dataset],
            outputs=[target_dataset],
            job_facets={
                "sql": SqlJobFacet(
                    query=(
                        f"COPY INTO {op.target_table} "
                        f"FROM @{op.stage}/{op.source_prefix} "
                        f"FILE_FORMAT = (TYPE = '{op.file_format}')"
                    )
                )
            },
        )

    def extract_on_complete(self, task_instance) -> OperatorLineage | None:
        """
        После execute() извлекаем rows_loaded из XCom return value.
        """
        result = task_instance.xcom_pull(task_ids=task_instance.task_id)
        if not result:
            return None

        # Заполним extract() результат + добавим OutputStatistics
        base = self.extract()
        base.outputs[0].facets["outputStatistics"] = (
            OutputStatisticsOutputDatasetFacet(
                rowCount=result.get("rows_loaded", 0),
                size=None
            )
        )
        return base

Регистрация extractor

Чтобы Airflow подгрузил extractor, нужен один из трёх способов.

Способ 1: Config [openlineage] extractors

[openlineage]
extractors = my_company.openlineage_extractors.s3_to_snowflake.S3ToSnowflakeExtractor

Можно перечислять comma-separated:

extractors = my_company.extractors.S3ToSnowflakeExtractor,my_company.extractors.DataLakeOpExtractor

Требование: модуль должен быть в PYTHONPATH — установлен как pip package или скопирован в worker/scheduler containers.

Способ 2: Entry points (preferred для distributed)

В pyproject.toml или setup.py вашего internal package:

# pyproject.toml
[project]
name = "my-company-airflow"
version = "0.1.0"

[project.entry-points."airflow.providers.openlineage.extractors"]
s3_to_snowflake = "my_company.openlineage_extractors.s3_to_snowflake:S3ToSnowflakeExtractor"
data_lake_op = "my_company.openlineage_extractors.data_lake:DataLakeExtractor"

После pip install my-company-airflow extractors автоматически discovered. Не требуется config.

Способ 3: ExtractorManager.add_extractor (programmatic)

Для testing/dev:

from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter
OpenLineageAdapter.add_extractor(S3ToSnowflakeExtractor)

Не используется в production.


Real example 2: PythonOperator с dynamic datasets

PythonOperator выполняет произвольный Python код. Extractor не может «прочитать» бизнес-логику. Но если функция следует convention — можно extract.

# Convention: функция возвращает {"inputs": [...], "outputs": [...]}
@task
def process_data(year: int, month: int):
    sources = [f"s3://raw/year={year}/month={month}/*.parquet"]
    targets = [f"s3://processed/year={year}/month={month}/"]

    # Реальная работа
    df = pd.read_parquet(sources[0])
    df.to_parquet(targets[0])

    # Return value — для lineage extractor + downstream
    return {
        "inputs": sources,
        "outputs": targets,
        "rows_processed": len(df)
    }

Extractor:

class ConventionPythonExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        return [
            "airflow.operators.python.PythonOperator",
            "airflow.decorators.python._PythonDecoratedOperator",
        ]

    def extract(self) -> OperatorLineage | None:
        """
        В extract() мы не знаем results — execute() ещё не было.
        Можем только проанализировать operator metadata (callable, op_kwargs).
        """
        op = self.operator

        # Проверим, есть ли annotation @lineage_aware
        callable_func = getattr(op, "python_callable", None)
        if callable_func is None:
            return None
        if not getattr(callable_func, "__lineage_aware__", False):
            return None  # Только функции, явно помеченные

        # Pre-extract from op_kwargs (если возможно)
        op_kwargs = op.op_kwargs or {}
        inputs = op_kwargs.get("_lineage_inputs", [])
        outputs = op_kwargs.get("_lineage_outputs", [])

        return OperatorLineage(
            inputs=[self._build_dataset(uri) for uri in inputs],
            outputs=[self._build_dataset(uri) for uri in outputs],
        )

    def extract_on_complete(self, task_instance) -> OperatorLineage | None:
        """
        После execute() читаем return value через XCom.
        """
        result = task_instance.xcom_pull(task_ids=task_instance.task_id)
        if not isinstance(result, dict):
            return None
        if "inputs" not in result or "outputs" not in result:
            return None

        return OperatorLineage(
            inputs=[self._build_dataset(uri) for uri in result["inputs"]],
            outputs=[self._build_dataset(uri) for uri in result["outputs"]],
        )

    @staticmethod
    def _build_dataset(uri: str) -> Dataset:
        """
        s3://bucket/path → namespace=s3://bucket, name=/path
        postgres://host/db.schema.table → namespace=postgres://host, name=db.schema.table
        """
        if uri.startswith("s3://"):
            parts = uri[5:].split("/", 1)
            return Dataset(namespace=f"s3://{parts[0]}", name=parts[1] if len(parts) > 1 else "")
        elif uri.startswith("postgres://"):
            # Similar parsing
            ...
        else:
            return Dataset(namespace="unknown", name=uri)

Convention для users:

# В DAG
@task
def process_data():
    # ... работа ...
    return {
        "inputs": ["s3://raw/orders/2026-05-12.parquet"],
        "outputs": ["s3://processed/orders/2026-05-12.parquet"],
    }

Это convention-based lineage — баланс между «zero changes for users» и «полная automation».


Real example 3: Facets для data quality

OL events могут содержать dataQuality facets — результаты validation, ассertions:

from openlineage.client.facet import (
    DataQualityAssertionsDatasetFacet,
    Assertion,
)

class GreatExpectationsExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        return ["great_expectations_provider.operators.great_expectations.GreatExpectationsOperator"]

    def extract_on_complete(self, task_instance) -> OperatorLineage:
        op = self.operator
        # Pull GE результат
        ge_result = task_instance.xcom_pull(task_ids=task_instance.task_id)

        assertions = []
        for r in ge_result.get("results", []):
            assertions.append(Assertion(
                assertion=r["expectation_config"]["expectation_type"],
                success=r["success"],
                column=r["expectation_config"]["kwargs"].get("column"),
            ))

        dataset = Dataset(
            namespace=...,
            name=...,
            facets={
                "dataQualityAssertions": DataQualityAssertionsDatasetFacet(
                    assertions=assertions
                )
            }
        )
        return OperatorLineage(outputs=[dataset])

Marquez/DataHub визуализируют dataQualityAssertions — на dataset показывается история DQ checks. Это связывает orchestration с data quality, что необходимо для data trust.


Schema facet — column-level info

Самый полезный facet — schema. Без него Marquez знает только название таблицы. С ним — структура:

from openlineage.client.facet import SchemaDatasetFacet, SchemaField

def _build_target_dataset_with_schema(target_table_name, snowflake_hook):
    # Pull schema от Snowflake INFORMATION_SCHEMA
    cols = snowflake_hook.get_records(f"""
        SELECT column_name, data_type
        FROM information_schema.columns
        WHERE table_schema = '{schema}' AND table_name = '{table}'
        ORDER BY ordinal_position
    """)

    return Dataset(
        namespace=...,
        name=target_table_name,
        facets={
            "schema": SchemaDatasetFacet(
                fields=[
                    SchemaField(name=name, type=dtype)
                    for name, dtype in cols
                ]
            )
        }
    )

Schema fetch добавляет latency (DB query на каждый extract). Для high-throughput pipelines — рассмотрите кеширование (Redis TTL 1 hour).


Column-level lineage manual

Если operator не SQL-based, но вы знаете column mapping:

from openlineage.client.facet import (
    ColumnLineageDatasetFacet,
    ColumnLineageDatasetFacetFieldsAdditional,
    ColumnLineageDatasetFacetFieldsAdditionalInputFields,
)

column_lineage = ColumnLineageDatasetFacet(
    fields={
        "total_revenue": ColumnLineageDatasetFacetFieldsAdditional(
            inputFields=[
                ColumnLineageDatasetFacetFieldsAdditionalInputFields(
                    namespace="postgres://warehouse"
                    name="raw.orders"
                    field="amount",
                )
            ],
            transformationType="AGGREGATION"
            transformationDescription="SUM(amount)",
        ),
        "order_date": ColumnLineageDatasetFacetFieldsAdditional(
            inputFields=[
                ColumnLineageDatasetFacetFieldsAdditionalInputFields(
                    namespace="postgres://warehouse"
                    name="raw.orders"
                    field="created_at",
                )
            ],
            transformationType="DIRECT"
            transformationDescription="DATE(created_at)",
        ),
    }
)

target_dataset = Dataset(
    namespace=...,
    name=...,
    facets={
        "schema": schema_facet,
        "columnLineage": column_lineage,
    }
)

Это даёт Marquez UI column-level zoom — пользователь может видеть, что total_revenue derived from amount через SUM.


Testing extractor

Extractor — обычный Python класс. Test через unittest:

# tests/test_s3_to_snowflake_extractor.py
from unittest.mock import MagicMock
import pytest

from my_company.openlineage_extractors.s3_to_snowflake import S3ToSnowflakeExtractor
from my_company.operators.s3_to_snowflake import S3ToSnowflakeOperator


def test_extract_basic():
    op = S3ToSnowflakeOperator(
        task_id="test"
        source_bucket="raw-data"
        source_prefix="orders/2026-05-12/"
        target_table="ANALYTICS.PUBLIC.ORDERS"
        stage="MY_STAGE"
        file_format="PARQUET",
    )

    # Mock connection lookup
    with patch("airflow.hooks.base.BaseHook.get_connection") as mock_conn:
        mock_conn.return_value = MagicMock(host="xy12345.us-east-1")

        extractor = S3ToSnowflakeExtractor(op)
        result = extractor.extract()

    assert len(result.inputs) == 1
    assert result.inputs[0].namespace == "s3://raw-data"
    assert result.inputs[0].name == "orders/2026-05-12/"

    assert len(result.outputs) == 1
    assert result.outputs[0].namespace == "snowflake://xy12345.us-east-1"
    assert result.outputs[0].name == "ANALYTICS.PUBLIC.ORDERS"

    assert "sql" in result.job_facets


def test_extract_on_complete():
    """Test rowCount facet после execute."""
    op = S3ToSnowflakeOperator(...)
    extractor = S3ToSnowflakeExtractor(op)

    ti_mock = MagicMock()
    ti_mock.xcom_pull.return_value = {"rows_loaded": 12345}

    result = extractor.extract_on_complete(ti_mock)

    assert result.outputs[0].facets["outputStatistics"].rowCount == 12345

Production gotchas

1. Errors в extract() ломают task

Если extract() бросает exception — task instance тоже падает (since OL — listener в task lifecycle). Логи:

ERROR - Failed to emit OpenLineage event: ValueError: ...

Fix: wrap всё в try/except, log warning, возвращать None:

def extract(self) -> OperatorLineage | None:
    try:
        return self._extract_impl()
    except Exception as e:
        self.log.warning(f"OL extract failed: {e}", exc_info=True)
        return None

Лучше потерять lineage event, чем production task.

2. Connection lookup в extract()

BaseHook.get_connection() делает DB query. Для каждой task это +50-100ms. Cache connection info:

class MyExtractor(BaseExtractor):
    _conn_cache = {}

    @classmethod
    def _get_conn_host(cls, conn_id):
        if conn_id not in cls._conn_cache:
            cls._conn_cache[conn_id] = BaseHook.get_connection(conn_id).host
        return cls._conn_cache[conn_id]

Cache per-worker (worker процессы не делят память — каждый worker имеет свой cache).

3. Schema fetch — heavy

DB query для schema каждый task → DB overhead. Альтернативы:

  • Cache schema в Redis (TTL 1 hour)
  • Schema fetch только в extract_on_complete() (один раз per task instance)
  • Schema fetch только для критичных datasets

4. Extractor НЕ для side effects

# BAD
def extract(self):
    requests.post("https://my-metadata.com/log", json=...)  # NO!
    return OperatorLineage(...)

Extractor должен быть pure — никаких side effects кроме чтения metadata. Side effects = unpredictable failures, retries, perf issues.

5. Несовместимость версий

Versioning provider’ов и openlineage-python lib критичен. apache-airflow-providers-openlineage==1.7.0 имеет фиксированные deps на openlineage-python>=1.13.0. Если вы pin старую version openlineage-python в constraints — extractor API меняется → break.

Best practice: не pin openlineage-python отдельно. Allow provider управлять deps.

6. Slow extract() блокирует task start

START event emit-ится до task.execute(). Если extract() занимает 1-2s — это добавляется к каждой task latency.

Fix: profile extractor, async transport, или extract-on-complete-only (если нет critical info для START).


Проверка знанийKnowledge check
Команда добавила custom extractor для DataLakeOperator. Через час получают alerts: 50% TI этого operator failed с 'TypeError: argument of type NoneType is not iterable' в extractor logs. Tasks которые раньше работали — теперь крашатся. Что произошло, как safe fix?
ОтветAnswer
**Root cause:** extractor raise exception в OL Listener hook (`on_task_instance_running`). Когда listener фейлится, task lifecycle ломается — TI помечается failed. Это происходит **до** task.execute(), поэтому реальной работы не выполняется. Скорее всего в extractor.extract() есть `op.some_field.split(...)` где `some_field` иногда None (например, optional argument operator). **Safe fix immediate:** обернуть весь body extractor в try/except, возвращать None при exception, log warning. OL потеряет события для проблемных tasks, но production не сломается. ```python def extract(self): try: return self._extract_impl() except Exception as e: self.log.warning(f'OL extract failed: {e}', exc_info=True) return None ``` **Proper fix:** обработать edge case в extractor — проверить `if op.some_field is None: return OperatorLineage(inputs=[], outputs=[])`. Добавить unit test с None argument для regression. **Долгосрочно:** monitoring `openlineage.events.failed` через OTel, alerting при rate > 1%. Также — code review всех custom extractors с requirement 'never raise'. Принцип: **observability never breaks production**.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Custom extractor наследуется от BaseExtractor. Какие методы критичны?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 7