DataSource V2 API: Архитектура коннекторов
Apache Spark умеет читать Parquet, JSON, JDBC, Kafka, Iceberg, Delta Lake, Hudi. За каждым из них стоит коннектор. До Spark 2.4 все коннекторы писались через DataSource V1 — API, который прожил непростую жизнь и оброс таким количеством hacky workarounds, что в какой-то момент его стало невозможно расширять без взлома internals.
DataSource V2 (DSv2) — это архитектурная перестройка слоя коннекторов, представленная в Spark 2.3 в экспериментальном виде и стабилизированная начиная со Spark 3.0. В Spark 4.0 DSv2 — единственный правильный способ писать production-коннекторы. Iceberg, Delta Lake, Hudi, Kafka, Cassandra — все они используют именно этот API.
Этот урок — глубокое погружение в архитектуру DSv2: что означает каждый интерфейс, как они связаны, и почему каждое дизайн-решение принято именно так.
Почему V1 провалился
Чтобы понять V2, нужно понять, что сломалось в V1.
DataSource V1 строился вокруг двух трейтов: BaseRelation (описывает таблицу) и TableScan (читает данные). Коннектор возвращал RDD[Row], и Spark не имел возможности “заглянуть внутрь” — он не мог оптимизировать чтение, не мог сделать pushdown предикатов в нетривиальных случаях, не мог управлять партиционированием.
Конкретные проблемы V1:
Нет единого интерфейса для записи и чтения. Коннектор для чтения и коннектор для записи регистрировались по-разному. Добавить атомарный write + read для одного формата (как это нужно Iceberg) было невозможно без грязных хаков.
Нет поддержки streaming. Structured Streaming пришлось добавлять отдельными интерфейсами (Source, Sink), никак не связанными с batch-коннекторами. Один и тот же формат (скажем, CSV) требовал двух совершенно разных реализаций.
Catalog-слепота. V1-коннектор не знал ничего о каталоге. Это значит, partition discovery, schema evolution, таблицы с метаданными — всё делалось в обход Spark, на уровне filesystem или сторонних метастор.
Неуправляемый pushdown. PrunedFilteredScan позволял делать pushdown фильтров, но Spark не мог верифицировать, что коннектор реально применил фильтр — он был вынужден перепрогонять все предикаты поверх результата. Это приводило к двойному чтению и double-evaluation.
DSv2 решает все эти проблемы через четкую иерархию интерфейсов.
Архитектура DSv2: иерархия абстракций
DSv2 состоит из трех слоев: планирования, сканирования и чтения.
TableProvider: точка входа
TableProvider — это интерфейс, который Spark находит по имени формата через ServiceLoader. Когда вы пишете spark.read.format("myformat"), Spark ищет класс, зарегистрированный под этим именем в META-INF/services/org.apache.spark.sql.sources.DataSourceRegister (или через полное имя класса).
// org.apache.spark.sql.connector.catalog.TableProvider
trait TableProvider {
// Вывод схемы из options (без partition info)
def inferSchema(options: CaseInsensitiveStringMap): StructType
// Вывод разделения (для partition-aware форматов)
def inferPartitioning(options: CaseInsensitiveStringMap): Array[Transform] =
Array.empty
// Главный метод: создаёт объект Table
// schema + partitioning могут быть заданы явно (CREATE TABLE) или выведены
def getTable(
schema: StructType,
partitioning: Array[Transform],
properties: util.Map[String, String]): Table
// Может ли коннектор работать без явной схемы?
def supportsExternalMetadata(): Boolean = false
}
CaseInsensitiveStringMap передаёт все опции, указанные в .option("key", "value") или в TBLPROPERTIES. Если supportsExternalMetadata() возвращает false, Spark вызовет inferSchema перед getTable. Если true — коннектор сам знает схему из своих метаданных (как Iceberg, у которого схема в manifest-файлах).
Table и Capabilities
Table — это центральный объект, описывающий источник данных:
// org.apache.spark.sql.connector.catalog.Table
trait Table {
def name(): String
def schema(): StructType
def partitioning(): Array[Transform]
def properties(): util.Map[String, String]
// Что умеет этот коннектор?
def capabilities(): util.Set[TableCapability]
}
TableCapability — это enum-подобный набор флагов, объявляющий возможности коннектора. Ключевые значения:
| Capability | Значение |
|---|---|
BATCH_READ | Поддерживает batch-чтение |
BATCH_WRITE | Поддерживает batch-запись |
STREAMING_READ | Поддерживает micro-batch streaming |
STREAMING_WRITE | Поддерживает streaming-запись |
TRUNCATE | Поддерживает TRUNCATE TABLE |
OVERWRITE_BY_FILTER | Поддерживает INSERT OVERWRITE с предикатом |
OVERWRITE_DYNAMIC | Поддерживает динамический overwrite партиций |
ACCEPT_ANY_SCHEMA | Принимает любую схему (для sink-only коннекторов) |
Объявляя capabilities, коннектор говорит Spark, какие операции он поддерживает. Если коннектор не объявит BATCH_WRITE, попытка записи через df.write.format("myformat").save(...) выбросит AnalysisException на этапе анализа плана — до того, как будет создана хоть одна задача.
Для коннекторов, поддерживающих чтение и запись, Table должен реализовывать SupportsRead и SupportsWrite:
trait SupportsRead extends Table {
def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder
}
trait SupportsWrite extends Table {
def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder
}
ScanBuilder и фаза планирования
ScanBuilder — это объект на driver-стороне, живущий во время планирования запроса. Catalyst вызывает методы pushdown на ScanBuilder и только потом вызывает build(). Именно здесь происходит ключевая оптимизация DSv2.
// Минимальная реализация
trait ScanBuilder {
def build(): Scan
}
// Pushdown колонок (column pruning)
trait SupportsPushDownRequiredColumns extends ScanBuilder {
def pruneColumns(requiredSchema: StructType): Unit
}
// Pushdown фильтров
trait SupportsPushDownFilters extends ScanBuilder {
def pushFilters(filters: Array[Filter]): Array[Filter]
def pushedFilters(): Array[Filter]
}
// Pushdown агрегатов (Spark 3.2+)
trait SupportsPushDownAggregates extends ScanBuilder {
def pushAggregation(aggregation: Aggregation): Boolean
}
// Pushdown LIMIT
trait SupportsPushDownLimit extends ScanBuilder {
def pushLimit(limit: Int): Boolean
}
// Pushdown LIMIT + ORDER BY (топ-N)
trait SupportsPushDownTopN extends ScanBuilder {
def pushTopN(orders: Array[SortOrder], limit: Int): Boolean
}
Catalyst-правило V2ScanRelationPushDown в spark-sql модуле обходит логический план и, находя узел DataSourceV2Relation, пробует применить pushdown в следующем порядке:
- Aggregate pushdown (
SupportsPushDownAggregates.pushAggregation) - Filter pushdown (
SupportsPushDownFilters.pushFilters) - Column pruning (
SupportsPushDownRequiredColumns.pruneColumns) - TopN pushdown (
SupportsPushDownTopN.pushTopN) - Limit pushdown (
SupportsPushDownLimit.pushLimit)
Каждый pushdown модифицирует внутреннее состояние ScanBuilder. После всех pushdown вызывается build(), который возвращает Scan с уже зафиксированными параметрами.
SupportsPushDownFilters.pushFilters() принимает массив фильтров и должен вернуть массив фильтров, которые не были применены коннектором (post-scan filters). Spark применит их поверх результата. Если вы вернули пустой массив, Spark доверяет коннектору и не перепрогоняет фильтры. Если вернули все — коннектор их проигнорировал, и Spark перепрогонит. Распространённая ошибка: возвращать пустой массив, но фактически не применять фильтры — получаете неправильные результаты.
Scan, Batch, InputPartition: от планирования к выполнению
После ScanBuilder.build() Catalyst получает объект Scan. Это сериализуемый снэпшот всего, что нужно знать для чтения: схема после pruning, список файлов (или иное описание данных), параметры соединения.
trait Scan {
// Схема, которую реально будет читать ридер (после column pruning)
def readSchema(): StructType
// Для batch-чтения
def toBatch(): Batch
// Для micro-batch streaming
def toMicroBatchStream(checkpointLocation: String): MicroBatchStream
// Для continuous streaming
def toContinuousStream(checkpointLocation: String): ContinuousStream
}
Scan создаётся на driver и сериализуется в задачи. Поэтому он должен быть сериализуемым через Java Serialization (или Kryo, если настроено). Держите в Scan только примитивы и сериализуемые коллекции — никаких connection pools, никаких thread-local ресурсов.
Batch — интерфейс разбиения на физические части:
trait Batch {
// Вызывается на driver: планирует партиции
def planInputPartitions(): Array[InputPartition]
// Вызывается на driver, но экземпляр фабрики отправляется на executor
def createReaderFactory(): PartitionReaderFactory
}
planInputPartitions() вызывается один раз на driver. Возвращённый массив InputPartition сериализуется и отправляется executors. Количество партиций определяет degree of parallelism — сколько tasks будет запущено для этого скана.
InputPartition — просто marker-интерфейс:
trait InputPartition extends Serializable
Каждая конкретная реализация несёт данные, специфичные для коннектора: путь к файлу, byte range, Kafka topic + partition + offset range, SQL-запрос с ROWNUM, итд.
PartitionReaderFactory и PartitionReader: чтение на executor
PartitionReaderFactory создаётся на driver (в Batch.createReaderFactory()), сериализуется и отправляется на каждый executor. На executor он создаёт PartitionReader для конкретной партиции:
trait PartitionReaderFactory extends Serializable {
// Создаёт row-based ридер
def createReader(partition: InputPartition): PartitionReader[InternalRow]
// Создаёт columnar ридер (батчи Arrow/Parquet векторов)
// Вызывается только если supportColumnarReads() вернул true
def createColumnarReader(partition: InputPartition): PartitionReader[ColumnarBatch] =
throw new UnsupportedOperationException()
def supportColumnarReads(partition: InputPartition): Boolean = false
}
PartitionReader — это простой итератор с resource-management:
trait PartitionReader[T] extends Closeable {
def next(): Boolean // есть ли следующий элемент?
def get(): T // возвращает текущий элемент (InternalRow или ColumnarBatch)
def close(): Unit // освобождает ресурсы
}
Обратите внимание: PartitionReader[InternalRow], а не PartitionReader[Row]. InternalRow — это внутреннее представление Spark, более эффективное, чем публичный Row. При реализации ридера вы либо строите InternalRow вручную через GenericInternalRow, либо используете UnsafeRowWriter из org.apache.spark.sql.catalyst.expressions.codegen.
Как Catalyst видит DSv2-коннектор
С точки зрения Catalyst, подключённый DSv2-коннектор виден как узел DataSourceV2Relation в логическом плане. Этот узел несёт ссылку на Table и опции. Когда Catalyst переходит к физическому планированию, DataSourceV2Strategy преобразует DataSourceV2Relation в BatchScanExec (для batch) или MicroBatchScanExec (для streaming).
BatchScanExec — это физический оператор (SparkPlan), который держит финальный Scan и умеет создавать RDD из него через Batch.planInputPartitions() + Batch.createReaderFactory(). Именно этот оператор передаётся в WholeStageCodegenExec или выполняется как обычный RDD.compute().
== Physical Plan ==
*(1) BatchScan[id#0, name#1, amount#2]
class: com.mycompany.MyConnectorTable
filters: [isnotnull(amount#2), (amount#2 > 100)]
pushed filters: [IsNotNull(amount), GreaterThan(amount,100)]
ReadSchema: struct<id:int,name:string,amount:double>
В этом выводе explain() строки filters и pushed filters показывают: первая — какие фильтры Spark применяет поверх результата (post-scan), вторая — какие были переданы коннектору. Если вы правильно реализовали SupportsPushDownFilters, обе строки должны быть одинаковы.
V1 vs V2: принципиальные различия
| Аспект | DataSource V1 | DataSource V2 |
|---|---|---|
| Интерфейс | BaseRelation + TableScan | TableProvider + Table + ScanBuilder |
| Streaming | Отдельный API (Source/Sink) | Единый (MicroBatchStream, ContinuousStream) |
| Catalog | Нет поддержки | Интегрирован через CatalogTable |
| Pushdown | Ограниченный, без верификации | Декларативный, Spark верифицирует |
| Columnar | Нет | PartitionReaderFactory.createColumnarReader() |
| Транзакции | Нет | SupportsAtomicReplacement, SupportsOverwrite |
| Планирование партиций | RDD (неуправляемый) | planInputPartitions() на driver |
| Сериализация | RDD-based | Явная: InputPartition + Factory |
Как это использует Iceberg
Apache Iceberg — один из самых сложных DSv2-коннекторов. Вот как он использует иерархию интерфейсов:
IcebergSource реализует TableProvider и StagingTableCatalog. При вызове getTable() он обращается к Iceberg catalog (REST, Glue, Hive Metastore) и возвращает SparkTable, который несёт Table — нативный Iceberg-объект с manifest-файлами.
SparkScanBuilder реализует ScanBuilder + SupportsPushDownFilters + SupportsPushDownRequiredColumns + SupportsPushDownAggregates. При pushdown фильтров Iceberg преобразует Spark-фильтры в Iceberg-выражения и применяет их к manifest-файлам ещё на этапе планирования — до того, как любой executor прочитает хоть один байт данных.
SparkBatch.planInputPartitions() возвращает FileScanTask[] — по одной задаче на каждый data file (с учётом delete files и partition pruning). На большой Iceberg-таблице с тысячами файлов это может занять несколько секунд — всё на driver-стороне.
IcebergArrowReader (реализация PartitionReader[ColumnarBatch]) читает Parquet векторизованно через Arrow, возвращая ColumnarBatch вместо строк. Это возможно только через createColumnarReader() — в V1 такой возможности не было.
Попробуй сам
Откройте Spark shell и исследуйте, как Spark видит Iceberg-коннектор (если у вас есть Iceberg-таблица), или более простой DSv2-коннектор — Parquet-сканер:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("dsv2-exploration") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.local.type", "hadoop") \
.config("spark.sql.catalog.local.warehouse", "/tmp/iceberg-warehouse") \
.getOrCreate()
# Создаём Iceberg-таблицу через DSv2 Catalog
spark.sql("""
CREATE TABLE IF NOT EXISTS local.db.events (
id BIGINT,
event_type STRING,
amount DOUBLE,
ts TIMESTAMP
) USING iceberg
PARTITIONED BY (days(ts))
""")
# Вставляем данные
spark.sql("""
INSERT INTO local.db.events VALUES
(1, 'purchase', 99.9, TIMESTAMP '2024-01-15 10:00:00'),
(2, 'refund', 25.0, TIMESTAMP '2024-01-15 11:00:00'),
(3, 'purchase', 149.9, TIMESTAMP '2024-01-16 09:00:00'),
(4, 'purchase', 299.9, TIMESTAMP '2024-01-16 14:00:00')
""")
# Запрос с предикатом -- смотрим на pushed filters
df = spark.table("local.db.events").filter("amount > 100 AND event_type = 'purchase'")
# Iceberg pushdown в manifest-файлы виден в explain()
df.explain("formatted")
# Вывод покажет BatchScan с полем pushed filters
# pushed filters: [IsNotNull(amount), GreaterThan(amount,100), IsNotNull(event_type), EqualTo(event_type,purchase)]
# Все фильтры переданы Iceberg -- он применяет partition pruning + data file skipping
print("Количество прочитанных файлов (partition stats):")
spark.sql("SELECT * FROM local.db.events.files").show(truncate=False)
Для изучения стека вызовов DSv2 без Iceberg можно включить трассировку планирования:
# Посмотрим на план детально через explain("extended")
df2 = spark.read.parquet("/path/to/data").filter("amount > 100").select("id", "amount")
df2.explain("extended")
# В выводе найдите секцию Physical Plan:
# BatchScan[id#0, amount#2] parquet
# PushedFilters: [IsNotNull(amount), GreaterThan(amount,100.0)]
# PushedFilters применены в Parquet RowGroup-фильтрации через BloomFilter/statistics
Чтобы понять, сколько партиций создаёт коннектор, используйте df.rdd.getNumPartitions() после применения всех pushdown. Для Iceberg эта цифра = количество data files после partition pruning. Если файлов тысячи, а задач нужно меньше — используйте spark.sql.files.maxPartitionBytes (default 128MB) или Iceberg-специфичный read.split.target-size.
Производительность и failure modes
Планирование партиций как bottleneck. planInputPartitions() выполняется на driver однопоточно. Для таблиц с миллионом файлов (большие Iceberg/Delta-таблицы без компакции) это может занять десятки секунд. В production следите за метрикой “Planning time” в Spark UI — если она превышает время выполнения самых первых stage, это сигнал для компакции.
Сериализация InputPartition. Spark сериализует весь массив InputPartition и отправляет его в TaskScheduler. На 100,000 партиций это может быть 10-50MB сериализованных данных. Держите InputPartition компактными — только необходимые поля.
Нереализованный close(). Если PartitionReader.close() выбросит исключение или не закроет ресурс, executor получает resource leak. Задачи при ретрае создадут новый ридер, не закрыв старый. Это классический путь к Too many open files в production. Всегда оборачивайте close() в try-catch.
Pushdown без верификации данных. Если SupportsPushDownFilters.pushFilters() объявил, что применил фильтр (вернул пустой массив), но реально не применил — Spark не перепрогонит фильтр поверх результата. Вы получите неправильные ответы без ошибок. Это тихая корруция данных.
Лабораторная работа
Лабораторная даёт расширить Spark своими руками: вы пишете коннектор на Python Data Source API, регистрируете собственное правило Catalyst и подключаете кастомный каталог. Это превращает интерфейсы DSv2 из урока в работающий код.
cd labs/extending-spark
docker compose up -d
Полное описание и шаги проверки — в labs/extending-spark/README.md.