Custom OpenLineage extractors — lineage для своих operators
В предыдущем уроке мы видели, что standard operators (PostgresOperator, BigQueryOperator, и т.п.) поддерживаются автоматически — extractors уже включены в provider. Но реальный production Airflow содержит десятки custom operators: внутренние ETL operators, integration с проприетарными системами, обёртки над third-party APIs.
Без custom extractor lineage для них пустой — только run start/complete без inputs/outputs. Это превращает Marquez граф в дырки.
Этот урок — как написать custom extractor правильно: интерфейс, регистрация, real-world примеры, и pitfalls.
Data lineage в dbt — та же концепция, другой инструмент
Когда нужен custom extractor
| Сценарий | Решение |
|---|---|
| PythonOperator с known inputs/outputs | inlets/outlets в task definition (легко) |
| PythonOperator с динамическими inputs/outputs | Custom extractor (extractor читает kwargs) |
Custom operator (внутренний MyDataLakeOperator) | Custom extractor |
| Third-party operator без extractor | Custom extractor (через monkey-patch или PR в provider) |
Wrapper over CLI tool (BashOperator(bash_command="my-tool ...")) | Custom extractor с CLI args parsing |
Custom extractor — это plugin для OpenLineage, который Airflow подгружает автоматически.
Базовый интерфейс: BaseExtractor
Provider предоставляет abstract class:
# airflow.providers.openlineage.extractors.base
from abc import ABC, abstractmethod
from airflow.providers.openlineage.extractors.base import (
BaseExtractor,
OperatorLineage,
)
from openlineage.client.facet import (
SchemaDatasetFacet,
SchemaField,
SqlJobFacet,
)
from openlineage.client.run import Dataset
class MyExtractor(BaseExtractor):
@classmethod
def get_operator_classnames(cls) -> list[str]:
"""Список full class paths operators, которые этот extractor обрабатывает."""
return ["my_company.operators.MyDataLakeOperator"]
def extract(self) -> OperatorLineage | None:
"""
Вызывается ДО task execute() (на on_task_instance_running).
Возвращает lineage metadata из доступных полей operator.
self.operator — instance operator
"""
op = self.operator
return OperatorLineage(
inputs=[
Dataset(
namespace=f"s3://{op.source_bucket}"
name=op.source_path,
facets={}
)
],
outputs=[
Dataset(
namespace=f"snowflake://{op.snowflake_account}"
name=f"{op.target_database}.{op.target_schema}.{op.target_table}"
facets={}
)
],
job_facets={
"sql": SqlJobFacet(query=op.sql)
} if op.sql else {},
)
def extract_on_complete(self, task_instance) -> OperatorLineage | None:
"""
Вызывается ПОСЛЕ execute() (на on_task_instance_success).
Имеет доступ к results через task_instance.xcom_pull или operator state.
Используется для: фактические row counts, partition info, error details.
Не обязателен — если возвращает None, используется результат extract().
"""
return None
Lifecycle
on_task_instance_running:
↓
extract() → OperatorLineage → START event с inputs/outputs
task.execute()
↓ (success)
on_task_instance_success:
↓
extract_on_complete() → OperatorLineage с runtime data → COMPLETE event
extract() обязателен, extract_on_complete() опционален — используется когда нужны actual results (row count, output schema после INSERT, error message).
Real example 1: Custom S3-to-Snowflake operator
Допустим, есть внутренний operator:
# my_company/operators/s3_to_snowflake.py
class S3ToSnowflakeOperator(BaseOperator):
template_fields = ("source_bucket", "source_prefix", "target_table", "stage")
def __init__(
self,
source_bucket: str,
source_prefix: str,
target_table: str, # формат: db.schema.table
stage: str,
file_format: str = "PARQUET",
snowflake_conn_id: str = "snowflake_default",
**kwargs
):
super().__init__(**kwargs)
self.source_bucket = source_bucket
self.source_prefix = source_prefix
self.target_table = target_table
self.stage = stage
self.file_format = file_format
self.snowflake_conn_id = snowflake_conn_id
def execute(self, context):
hook = SnowflakeHook(snowflake_conn_id=self.snowflake_conn_id)
sql = f"""
COPY INTO {self.target_table}
FROM @{self.stage}/{self.source_prefix}
FILE_FORMAT = (TYPE = '{self.file_format}')
"""
result = hook.run(sql)
# result содержит rows_loaded
return {"rows_loaded": result.get("rows_loaded", 0)}
Extractor для него:
# my_company/openlineage_extractors/s3_to_snowflake.py
from airflow.providers.openlineage.extractors.base import (
BaseExtractor, OperatorLineage
)
from openlineage.client.facet import (
SqlJobFacet, OutputStatisticsOutputDatasetFacet
)
from openlineage.client.run import Dataset
class S3ToSnowflakeExtractor(BaseExtractor):
@classmethod
def get_operator_classnames(cls) -> list[str]:
return ["my_company.operators.s3_to_snowflake.S3ToSnowflakeOperator"]
def extract(self) -> OperatorLineage:
op = self.operator
# Получим Snowflake account из connection
from airflow.hooks.base import BaseHook
conn = BaseHook.get_connection(op.snowflake_conn_id)
snowflake_namespace = f"snowflake://{conn.host}"
# Parse target_table "db.schema.table"
parts = op.target_table.split(".")
if len(parts) != 3:
raise ValueError(f"Invalid target_table: {op.target_table}")
target_dataset = Dataset(
namespace=snowflake_namespace,
name=op.target_table, # "ANALYTICS.PUBLIC.ORDERS"
)
source_dataset = Dataset(
namespace=f"s3://{op.source_bucket}"
name=op.source_prefix,
)
return OperatorLineage(
inputs=[source_dataset],
outputs=[target_dataset],
job_facets={
"sql": SqlJobFacet(
query=(
f"COPY INTO {op.target_table} "
f"FROM @{op.stage}/{op.source_prefix} "
f"FILE_FORMAT = (TYPE = '{op.file_format}')"
)
)
},
)
def extract_on_complete(self, task_instance) -> OperatorLineage | None:
"""
После execute() извлекаем rows_loaded из XCom return value.
"""
result = task_instance.xcom_pull(task_ids=task_instance.task_id)
if not result:
return None
# Заполним extract() результат + добавим OutputStatistics
base = self.extract()
base.outputs[0].facets["outputStatistics"] = (
OutputStatisticsOutputDatasetFacet(
rowCount=result.get("rows_loaded", 0),
size=None
)
)
return base
Регистрация extractor
Чтобы Airflow подгрузил extractor, нужен один из трёх способов.
Способ 1: Config [openlineage] extractors
[openlineage]
extractors = my_company.openlineage_extractors.s3_to_snowflake.S3ToSnowflakeExtractor
Можно перечислять comma-separated:
extractors = my_company.extractors.S3ToSnowflakeExtractor,my_company.extractors.DataLakeOpExtractor
Требование: модуль должен быть в PYTHONPATH — установлен как pip package или скопирован в worker/scheduler containers.
Способ 2: Entry points (preferred для distributed)
В pyproject.toml или setup.py вашего internal package:
# pyproject.toml
[project]
name = "my-company-airflow"
version = "0.1.0"
[project.entry-points."airflow.providers.openlineage.extractors"]
s3_to_snowflake = "my_company.openlineage_extractors.s3_to_snowflake:S3ToSnowflakeExtractor"
data_lake_op = "my_company.openlineage_extractors.data_lake:DataLakeExtractor"
После pip install my-company-airflow extractors автоматически discovered. Не требуется config.
Способ 3: ExtractorManager.add_extractor (programmatic)
Для testing/dev:
from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter
OpenLineageAdapter.add_extractor(S3ToSnowflakeExtractor)
Не используется в production.
Real example 2: PythonOperator с dynamic datasets
PythonOperator выполняет произвольный Python код. Extractor не может «прочитать» бизнес-логику. Но если функция следует convention — можно extract.
# Convention: функция возвращает {"inputs": [...], "outputs": [...]}
@task
def process_data(year: int, month: int):
sources = [f"s3://raw/year={year}/month={month}/*.parquet"]
targets = [f"s3://processed/year={year}/month={month}/"]
# Реальная работа
df = pd.read_parquet(sources[0])
df.to_parquet(targets[0])
# Return value — для lineage extractor + downstream
return {
"inputs": sources,
"outputs": targets,
"rows_processed": len(df)
}
Extractor:
class ConventionPythonExtractor(BaseExtractor):
@classmethod
def get_operator_classnames(cls) -> list[str]:
return [
"airflow.operators.python.PythonOperator",
"airflow.decorators.python._PythonDecoratedOperator",
]
def extract(self) -> OperatorLineage | None:
"""
В extract() мы не знаем results — execute() ещё не было.
Можем только проанализировать operator metadata (callable, op_kwargs).
"""
op = self.operator
# Проверим, есть ли annotation @lineage_aware
callable_func = getattr(op, "python_callable", None)
if callable_func is None:
return None
if not getattr(callable_func, "__lineage_aware__", False):
return None # Только функции, явно помеченные
# Pre-extract from op_kwargs (если возможно)
op_kwargs = op.op_kwargs or {}
inputs = op_kwargs.get("_lineage_inputs", [])
outputs = op_kwargs.get("_lineage_outputs", [])
return OperatorLineage(
inputs=[self._build_dataset(uri) for uri in inputs],
outputs=[self._build_dataset(uri) for uri in outputs],
)
def extract_on_complete(self, task_instance) -> OperatorLineage | None:
"""
После execute() читаем return value через XCom.
"""
result = task_instance.xcom_pull(task_ids=task_instance.task_id)
if not isinstance(result, dict):
return None
if "inputs" not in result or "outputs" not in result:
return None
return OperatorLineage(
inputs=[self._build_dataset(uri) for uri in result["inputs"]],
outputs=[self._build_dataset(uri) for uri in result["outputs"]],
)
@staticmethod
def _build_dataset(uri: str) -> Dataset:
"""
s3://bucket/path → namespace=s3://bucket, name=/path
postgres://host/db.schema.table → namespace=postgres://host, name=db.schema.table
"""
if uri.startswith("s3://"):
parts = uri[5:].split("/", 1)
return Dataset(namespace=f"s3://{parts[0]}", name=parts[1] if len(parts) > 1 else "")
elif uri.startswith("postgres://"):
# Similar parsing
...
else:
return Dataset(namespace="unknown", name=uri)
Convention для users:
# В DAG
@task
def process_data():
# ... работа ...
return {
"inputs": ["s3://raw/orders/2026-05-12.parquet"],
"outputs": ["s3://processed/orders/2026-05-12.parquet"],
}
Это convention-based lineage — баланс между «zero changes for users» и «полная automation».
Real example 3: Facets для data quality
OL events могут содержать dataQuality facets — результаты validation, ассertions:
from openlineage.client.facet import (
DataQualityAssertionsDatasetFacet,
Assertion,
)
class GreatExpectationsExtractor(BaseExtractor):
@classmethod
def get_operator_classnames(cls) -> list[str]:
return ["great_expectations_provider.operators.great_expectations.GreatExpectationsOperator"]
def extract_on_complete(self, task_instance) -> OperatorLineage:
op = self.operator
# Pull GE результат
ge_result = task_instance.xcom_pull(task_ids=task_instance.task_id)
assertions = []
for r in ge_result.get("results", []):
assertions.append(Assertion(
assertion=r["expectation_config"]["expectation_type"],
success=r["success"],
column=r["expectation_config"]["kwargs"].get("column"),
))
dataset = Dataset(
namespace=...,
name=...,
facets={
"dataQualityAssertions": DataQualityAssertionsDatasetFacet(
assertions=assertions
)
}
)
return OperatorLineage(outputs=[dataset])
Marquez/DataHub визуализируют dataQualityAssertions — на dataset показывается история DQ checks. Это связывает orchestration с data quality, что необходимо для data trust.
Schema facet — column-level info
Самый полезный facet — schema. Без него Marquez знает только название таблицы. С ним — структура:
from openlineage.client.facet import SchemaDatasetFacet, SchemaField
def _build_target_dataset_with_schema(target_table_name, snowflake_hook):
# Pull schema от Snowflake INFORMATION_SCHEMA
cols = snowflake_hook.get_records(f"""
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_schema = '{schema}' AND table_name = '{table}'
ORDER BY ordinal_position
""")
return Dataset(
namespace=...,
name=target_table_name,
facets={
"schema": SchemaDatasetFacet(
fields=[
SchemaField(name=name, type=dtype)
for name, dtype in cols
]
)
}
)
Schema fetch добавляет latency (DB query на каждый extract). Для high-throughput pipelines — рассмотрите кеширование (Redis TTL 1 hour).
Column-level lineage manual
Если operator не SQL-based, но вы знаете column mapping:
from openlineage.client.facet import (
ColumnLineageDatasetFacet,
ColumnLineageDatasetFacetFieldsAdditional,
ColumnLineageDatasetFacetFieldsAdditionalInputFields,
)
column_lineage = ColumnLineageDatasetFacet(
fields={
"total_revenue": ColumnLineageDatasetFacetFieldsAdditional(
inputFields=[
ColumnLineageDatasetFacetFieldsAdditionalInputFields(
namespace="postgres://warehouse"
name="raw.orders"
field="amount",
)
],
transformationType="AGGREGATION"
transformationDescription="SUM(amount)",
),
"order_date": ColumnLineageDatasetFacetFieldsAdditional(
inputFields=[
ColumnLineageDatasetFacetFieldsAdditionalInputFields(
namespace="postgres://warehouse"
name="raw.orders"
field="created_at",
)
],
transformationType="DIRECT"
transformationDescription="DATE(created_at)",
),
}
)
target_dataset = Dataset(
namespace=...,
name=...,
facets={
"schema": schema_facet,
"columnLineage": column_lineage,
}
)
Это даёт Marquez UI column-level zoom — пользователь может видеть, что total_revenue derived from amount через SUM.
Testing extractor
Extractor — обычный Python класс. Test через unittest:
# tests/test_s3_to_snowflake_extractor.py
from unittest.mock import MagicMock
import pytest
from my_company.openlineage_extractors.s3_to_snowflake import S3ToSnowflakeExtractor
from my_company.operators.s3_to_snowflake import S3ToSnowflakeOperator
def test_extract_basic():
op = S3ToSnowflakeOperator(
task_id="test"
source_bucket="raw-data"
source_prefix="orders/2026-05-12/"
target_table="ANALYTICS.PUBLIC.ORDERS"
stage="MY_STAGE"
file_format="PARQUET",
)
# Mock connection lookup
with patch("airflow.hooks.base.BaseHook.get_connection") as mock_conn:
mock_conn.return_value = MagicMock(host="xy12345.us-east-1")
extractor = S3ToSnowflakeExtractor(op)
result = extractor.extract()
assert len(result.inputs) == 1
assert result.inputs[0].namespace == "s3://raw-data"
assert result.inputs[0].name == "orders/2026-05-12/"
assert len(result.outputs) == 1
assert result.outputs[0].namespace == "snowflake://xy12345.us-east-1"
assert result.outputs[0].name == "ANALYTICS.PUBLIC.ORDERS"
assert "sql" in result.job_facets
def test_extract_on_complete():
"""Test rowCount facet после execute."""
op = S3ToSnowflakeOperator(...)
extractor = S3ToSnowflakeExtractor(op)
ti_mock = MagicMock()
ti_mock.xcom_pull.return_value = {"rows_loaded": 12345}
result = extractor.extract_on_complete(ti_mock)
assert result.outputs[0].facets["outputStatistics"].rowCount == 12345
Production gotchas
1. Errors в extract() ломают task
Если extract() бросает exception — task instance тоже падает (since OL — listener в task lifecycle). Логи:
ERROR - Failed to emit OpenLineage event: ValueError: ...
Fix: wrap всё в try/except, log warning, возвращать None:
def extract(self) -> OperatorLineage | None:
try:
return self._extract_impl()
except Exception as e:
self.log.warning(f"OL extract failed: {e}", exc_info=True)
return None
Лучше потерять lineage event, чем production task.
2. Connection lookup в extract()
BaseHook.get_connection() делает DB query. Для каждой task это +50-100ms. Cache connection info:
class MyExtractor(BaseExtractor):
_conn_cache = {}
@classmethod
def _get_conn_host(cls, conn_id):
if conn_id not in cls._conn_cache:
cls._conn_cache[conn_id] = BaseHook.get_connection(conn_id).host
return cls._conn_cache[conn_id]
Cache per-worker (worker процессы не делят память — каждый worker имеет свой cache).
3. Schema fetch — heavy
DB query для schema каждый task → DB overhead. Альтернативы:
- Cache schema в Redis (TTL 1 hour)
- Schema fetch только в
extract_on_complete()(один раз per task instance) - Schema fetch только для критичных datasets
4. Extractor НЕ для side effects
# BAD
def extract(self):
requests.post("https://my-metadata.com/log", json=...) # NO!
return OperatorLineage(...)
Extractor должен быть pure — никаких side effects кроме чтения metadata. Side effects = unpredictable failures, retries, perf issues.
5. Несовместимость версий
Versioning provider’ов и openlineage-python lib критичен. apache-airflow-providers-openlineage==1.7.0 имеет фиксированные deps на openlineage-python>=1.13.0. Если вы pin старую version openlineage-python в constraints — extractor API меняется → break.
Best practice: не pin openlineage-python отдельно. Allow provider управлять deps.
6. Slow extract() блокирует task start
START event emit-ится до task.execute(). Если extract() занимает 1-2s — это добавляется к каждой task latency.
Fix: profile extractor, async transport, или extract-on-complete-only (если нет critical info для START).