Learning Platform
Глоссарий Troubleshooting
Урок 12.01 · 30 мин
Продвинутый
DataSource V2TableProviderScanBuilderBatchInputPartitionPartitionReader

DataSource V2 API: Архитектура коннекторов

Apache Spark умеет читать Parquet, JSON, JDBC, Kafka, Iceberg, Delta Lake, Hudi. За каждым из них стоит коннектор. До Spark 2.4 все коннекторы писались через DataSource V1 — API, который прожил непростую жизнь и оброс таким количеством hacky workarounds, что в какой-то момент его стало невозможно расширять без взлома internals.

DataSource V2 (DSv2) — это архитектурная перестройка слоя коннекторов, представленная в Spark 2.3 в экспериментальном виде и стабилизированная начиная со Spark 3.0. В Spark 4.0 DSv2 — единственный правильный способ писать production-коннекторы. Iceberg, Delta Lake, Hudi, Kafka, Cassandra — все они используют именно этот API.

Этот урок — глубокое погружение в архитектуру DSv2: что означает каждый интерфейс, как они связаны, и почему каждое дизайн-решение принято именно так.

Почему V1 провалился

Чтобы понять V2, нужно понять, что сломалось в V1.

DataSource V1 строился вокруг двух трейтов: BaseRelation (описывает таблицу) и TableScan (читает данные). Коннектор возвращал RDD[Row], и Spark не имел возможности “заглянуть внутрь” — он не мог оптимизировать чтение, не мог сделать pushdown предикатов в нетривиальных случаях, не мог управлять партиционированием.

Конкретные проблемы V1:

Нет единого интерфейса для записи и чтения. Коннектор для чтения и коннектор для записи регистрировались по-разному. Добавить атомарный write + read для одного формата (как это нужно Iceberg) было невозможно без грязных хаков.

Нет поддержки streaming. Structured Streaming пришлось добавлять отдельными интерфейсами (Source, Sink), никак не связанными с batch-коннекторами. Один и тот же формат (скажем, CSV) требовал двух совершенно разных реализаций.

Catalog-слепота. V1-коннектор не знал ничего о каталоге. Это значит, partition discovery, schema evolution, таблицы с метаданными — всё делалось в обход Spark, на уровне filesystem или сторонних метастор.

Неуправляемый pushdown. PrunedFilteredScan позволял делать pushdown фильтров, но Spark не мог верифицировать, что коннектор реально применил фильтр — он был вынужден перепрогонять все предикаты поверх результата. Это приводило к двойному чтению и double-evaluation.

DSv2 решает все эти проблемы через четкую иерархию интерфейсов.

Архитектура DSv2: иерархия абстракций

DSv2 состоит из трех слоев: планирования, сканирования и чтения.

Иерархия интерфейсов DataSource V2
TableProviderTableProvider -- точка входа. Spark вызывает inferSchema() и getTable() для получения метаданных. Обычно это класс с именем, совпадающим с 'format' в spark.read.format(...)
getTable(schema, partitioning, options)
TableTable -- описание таблицы: имя, схема, разделение, capabilities. Реализует SupportsRead и/или SupportsWrite в зависимости от того, что поддерживает коннектор
newScanBuilder()
newWriteBuilder()
ScanBuilderScanBuilder -- фаза планирования чтения. Здесь происходит pushdown: фильтры, колонки, агрегаты, лимит. build() возвращает финальный Scan
WriteBuilderWriteBuilder -- фаза планирования записи. buildForBatch() или buildForStreaming() возвращают BatchWrite/StreamingWrite
build()
ScanScan -- сериализуемое описание скана. Возвращает readSchema() (после pushdown), toBatch() или toMicroBatchStream(). Сериализуется и передается на executors
toBatch()
BatchBatch -- на driver: planInputPartitions() делит данные на части. На executor: createReaderFactory() создаёт фабрику ридеров
planInputPartitions()
createReaderFactory()
InputPartition[]InputPartition -- сериализуемое описание одной партиции: путь к файлу, offset range, часть HTTP-запроса. Сериализуется и отправляется на конкретный executor
PartitionReaderFactoryPartitionReaderFactory -- фабрика ридеров на стороне executor. createReader(partition) или createColumnarReader(partition). Один экземпляр фабрики на Task
createReader(partition)
PartitionReader[T]PartitionReader -- читает данные одной партиции. next()/get() итерирует по InternalRow или ColumnarBatch. Живёт в одном Task на одном executor

TableProvider: точка входа

TableProvider — это интерфейс, который Spark находит по имени формата через ServiceLoader. Когда вы пишете spark.read.format("myformat"), Spark ищет класс, зарегистрированный под этим именем в META-INF/services/org.apache.spark.sql.sources.DataSourceRegister (или через полное имя класса).

// org.apache.spark.sql.connector.catalog.TableProvider
trait TableProvider {
  // Вывод схемы из options (без partition info)
  def inferSchema(options: CaseInsensitiveStringMap): StructType

  // Вывод разделения (для partition-aware форматов)
  def inferPartitioning(options: CaseInsensitiveStringMap): Array[Transform] =
    Array.empty

  // Главный метод: создаёт объект Table
  // schema + partitioning могут быть заданы явно (CREATE TABLE) или выведены
  def getTable(
    schema: StructType,
    partitioning: Array[Transform],
    properties: util.Map[String, String]): Table

  // Может ли коннектор работать без явной схемы?
  def supportsExternalMetadata(): Boolean = false
}

CaseInsensitiveStringMap передаёт все опции, указанные в .option("key", "value") или в TBLPROPERTIES. Если supportsExternalMetadata() возвращает false, Spark вызовет inferSchema перед getTable. Если true — коннектор сам знает схему из своих метаданных (как Iceberg, у которого схема в manifest-файлах).

Table и Capabilities

Table — это центральный объект, описывающий источник данных:

// org.apache.spark.sql.connector.catalog.Table
trait Table {
  def name(): String
  def schema(): StructType
  def partitioning(): Array[Transform]
  def properties(): util.Map[String, String]

  // Что умеет этот коннектор?
  def capabilities(): util.Set[TableCapability]
}

TableCapability — это enum-подобный набор флагов, объявляющий возможности коннектора. Ключевые значения:

CapabilityЗначение
BATCH_READПоддерживает batch-чтение
BATCH_WRITEПоддерживает batch-запись
STREAMING_READПоддерживает micro-batch streaming
STREAMING_WRITEПоддерживает streaming-запись
TRUNCATEПоддерживает TRUNCATE TABLE
OVERWRITE_BY_FILTERПоддерживает INSERT OVERWRITE с предикатом
OVERWRITE_DYNAMICПоддерживает динамический overwrite партиций
ACCEPT_ANY_SCHEMAПринимает любую схему (для sink-only коннекторов)

Объявляя capabilities, коннектор говорит Spark, какие операции он поддерживает. Если коннектор не объявит BATCH_WRITE, попытка записи через df.write.format("myformat").save(...) выбросит AnalysisException на этапе анализа плана — до того, как будет создана хоть одна задача.

Для коннекторов, поддерживающих чтение и запись, Table должен реализовывать SupportsRead и SupportsWrite:

trait SupportsRead extends Table {
  def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder
}

trait SupportsWrite extends Table {
  def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder
}

ScanBuilder и фаза планирования

ScanBuilder — это объект на driver-стороне, живущий во время планирования запроса. Catalyst вызывает методы pushdown на ScanBuilder и только потом вызывает build(). Именно здесь происходит ключевая оптимизация DSv2.

// Минимальная реализация
trait ScanBuilder {
  def build(): Scan
}

// Pushdown колонок (column pruning)
trait SupportsPushDownRequiredColumns extends ScanBuilder {
  def pruneColumns(requiredSchema: StructType): Unit
}

// Pushdown фильтров
trait SupportsPushDownFilters extends ScanBuilder {
  def pushFilters(filters: Array[Filter]): Array[Filter]
  def pushedFilters(): Array[Filter]
}

// Pushdown агрегатов (Spark 3.2+)
trait SupportsPushDownAggregates extends ScanBuilder {
  def pushAggregation(aggregation: Aggregation): Boolean
}

// Pushdown LIMIT
trait SupportsPushDownLimit extends ScanBuilder {
  def pushLimit(limit: Int): Boolean
}

// Pushdown LIMIT + ORDER BY (топ-N)
trait SupportsPushDownTopN extends ScanBuilder {
  def pushTopN(orders: Array[SortOrder], limit: Int): Boolean
}

Catalyst-правило V2ScanRelationPushDown в spark-sql модуле обходит логический план и, находя узел DataSourceV2Relation, пробует применить pushdown в следующем порядке:

  1. Aggregate pushdown (SupportsPushDownAggregates.pushAggregation)
  2. Filter pushdown (SupportsPushDownFilters.pushFilters)
  3. Column pruning (SupportsPushDownRequiredColumns.pruneColumns)
  4. TopN pushdown (SupportsPushDownTopN.pushTopN)
  5. Limit pushdown (SupportsPushDownLimit.pushLimit)

Каждый pushdown модифицирует внутреннее состояние ScanBuilder. После всех pushdown вызывается build(), который возвращает Scan с уже зафиксированными параметрами.

WARNING

SupportsPushDownFilters.pushFilters() принимает массив фильтров и должен вернуть массив фильтров, которые не были применены коннектором (post-scan filters). Spark применит их поверх результата. Если вы вернули пустой массив, Spark доверяет коннектору и не перепрогоняет фильтры. Если вернули все — коннектор их проигнорировал, и Spark перепрогонит. Распространённая ошибка: возвращать пустой массив, но фактически не применять фильтры — получаете неправильные результаты.

Scan, Batch, InputPartition: от планирования к выполнению

После ScanBuilder.build() Catalyst получает объект Scan. Это сериализуемый снэпшот всего, что нужно знать для чтения: схема после pruning, список файлов (или иное описание данных), параметры соединения.

trait Scan {
  // Схема, которую реально будет читать ридер (после column pruning)
  def readSchema(): StructType

  // Для batch-чтения
  def toBatch(): Batch

  // Для micro-batch streaming
  def toMicroBatchStream(checkpointLocation: String): MicroBatchStream

  // Для continuous streaming
  def toContinuousStream(checkpointLocation: String): ContinuousStream
}

Scan создаётся на driver и сериализуется в задачи. Поэтому он должен быть сериализуемым через Java Serialization (или Kryo, если настроено). Держите в Scan только примитивы и сериализуемые коллекции — никаких connection pools, никаких thread-local ресурсов.

Batch — интерфейс разбиения на физические части:

trait Batch {
  // Вызывается на driver: планирует партиции
  def planInputPartitions(): Array[InputPartition]

  // Вызывается на driver, но экземпляр фабрики отправляется на executor
  def createReaderFactory(): PartitionReaderFactory
}

planInputPartitions() вызывается один раз на driver. Возвращённый массив InputPartition сериализуется и отправляется executors. Количество партиций определяет degree of parallelism — сколько tasks будет запущено для этого скана.

InputPartition — просто marker-интерфейс:

trait InputPartition extends Serializable

Каждая конкретная реализация несёт данные, специфичные для коннектора: путь к файлу, byte range, Kafka topic + partition + offset range, SQL-запрос с ROWNUM, итд.

PartitionReaderFactory и PartitionReader: чтение на executor

PartitionReaderFactory создаётся на driver (в Batch.createReaderFactory()), сериализуется и отправляется на каждый executor. На executor он создаёт PartitionReader для конкретной партиции:

trait PartitionReaderFactory extends Serializable {
  // Создаёт row-based ридер
  def createReader(partition: InputPartition): PartitionReader[InternalRow]

  // Создаёт columnar ридер (батчи Arrow/Parquet векторов)
  // Вызывается только если supportColumnarReads() вернул true
  def createColumnarReader(partition: InputPartition): PartitionReader[ColumnarBatch] =
    throw new UnsupportedOperationException()

  def supportColumnarReads(partition: InputPartition): Boolean = false
}

PartitionReader — это простой итератор с resource-management:

trait PartitionReader[T] extends Closeable {
  def next(): Boolean     // есть ли следующий элемент?
  def get(): T            // возвращает текущий элемент (InternalRow или ColumnarBatch)
  def close(): Unit       // освобождает ресурсы
}

Обратите внимание: PartitionReader[InternalRow], а не PartitionReader[Row]. InternalRow — это внутреннее представление Spark, более эффективное, чем публичный Row. При реализации ридера вы либо строите InternalRow вручную через GenericInternalRow, либо используете UnsafeRowWriter из org.apache.spark.sql.catalyst.expressions.codegen.

Жизненный цикл объектов DSv2: driver vs executor
DRIVERПланирование и разбиениеВсё на этом уровне выполняется в SparkContext/SparkSession процессе. Никаких сетевых вызовов к данным -- только метаданные
getTable()TableProvider.getTable(): Spark вызывает это один раз при анализе плана. Здесь происходит подключение к метастору, чтение схемы, проверка прав доступа
ScanBuilderScanBuilder + pushdown: Catalyst применяет все pushdown оптимизации. После этого -- build(), получаем финальный Scan с зафиксированными параметрами
planInputPartitions()Batch.planInputPartitions(): Коннектор решает на сколько частей разбить данные. Результат -- массив InputPartition, который будет сериализован и отправлен executors
сериализация InputPartition[] + PartitionReaderFactory
EXECUTOR (N параллельных tasks)Чтение данныхКаждый executor получает свою InputPartition и экземпляр PartitionReaderFactory. Чтение происходит параллельно
createReader()PartitionReaderFactory.createReader(partition): создаёт ридер для конкретной партиции. Здесь открываются соединения, файловые хэндлы, HTTP-сессии
next() / get()PartitionReader.next()/get(): итерирует строки или батчи. Каждый get() возвращает InternalRow -- не делайте new InternalRow на каждый вызов, переиспользуйте объект (mutable row)
close()PartitionReader.close(): вызывается Spark в finally-блоке после исчерпания данных или при ошибке. Здесь закрываем соединения, освобождаем ресурсы

Как Catalyst видит DSv2-коннектор

С точки зрения Catalyst, подключённый DSv2-коннектор виден как узел DataSourceV2Relation в логическом плане. Этот узел несёт ссылку на Table и опции. Когда Catalyst переходит к физическому планированию, DataSourceV2Strategy преобразует DataSourceV2Relation в BatchScanExec (для batch) или MicroBatchScanExec (для streaming).

BatchScanExec — это физический оператор (SparkPlan), который держит финальный Scan и умеет создавать RDD из него через Batch.planInputPartitions() + Batch.createReaderFactory(). Именно этот оператор передаётся в WholeStageCodegenExec или выполняется как обычный RDD.compute().

== Physical Plan ==
*(1) BatchScan[id#0, name#1, amount#2]
     class: com.mycompany.MyConnectorTable
     filters: [isnotnull(amount#2), (amount#2 > 100)]
     pushed filters: [IsNotNull(amount), GreaterThan(amount,100)]
     ReadSchema: struct<id:int,name:string,amount:double>

В этом выводе explain() строки filters и pushed filters показывают: первая — какие фильтры Spark применяет поверх результата (post-scan), вторая — какие были переданы коннектору. Если вы правильно реализовали SupportsPushDownFilters, обе строки должны быть одинаковы.

V1 vs V2: принципиальные различия

АспектDataSource V1DataSource V2
ИнтерфейсBaseRelation + TableScanTableProvider + Table + ScanBuilder
StreamingОтдельный API (Source/Sink)Единый (MicroBatchStream, ContinuousStream)
CatalogНет поддержкиИнтегрирован через CatalogTable
PushdownОграниченный, без верификацииДекларативный, Spark верифицирует
ColumnarНетPartitionReaderFactory.createColumnarReader()
ТранзакцииНетSupportsAtomicReplacement, SupportsOverwrite
Планирование партицийRDD (неуправляемый)planInputPartitions() на driver
СериализацияRDD-basedЯвная: InputPartition + Factory

Как это использует Iceberg

Apache Iceberg — один из самых сложных DSv2-коннекторов. Вот как он использует иерархию интерфейсов:

IcebergSource реализует TableProvider и StagingTableCatalog. При вызове getTable() он обращается к Iceberg catalog (REST, Glue, Hive Metastore) и возвращает SparkTable, который несёт Table — нативный Iceberg-объект с manifest-файлами.

SparkScanBuilder реализует ScanBuilder + SupportsPushDownFilters + SupportsPushDownRequiredColumns + SupportsPushDownAggregates. При pushdown фильтров Iceberg преобразует Spark-фильтры в Iceberg-выражения и применяет их к manifest-файлам ещё на этапе планирования — до того, как любой executor прочитает хоть один байт данных.

SparkBatch.planInputPartitions() возвращает FileScanTask[] — по одной задаче на каждый data file (с учётом delete files и partition pruning). На большой Iceberg-таблице с тысячами файлов это может занять несколько секунд — всё на driver-стороне.

IcebergArrowReader (реализация PartitionReader[ColumnarBatch]) читает Parquet векторизованно через Arrow, возвращая ColumnarBatch вместо строк. Это возможно только через createColumnarReader() — в V1 такой возможности не было.

Попробуй сам

Откройте Spark shell и исследуйте, как Spark видит Iceberg-коннектор (если у вас есть Iceberg-таблица), или более простой DSv2-коннектор — Parquet-сканер:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("dsv2-exploration") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local.type", "hadoop") \
    .config("spark.sql.catalog.local.warehouse", "/tmp/iceberg-warehouse") \
    .getOrCreate()

# Создаём Iceberg-таблицу через DSv2 Catalog
spark.sql("""
    CREATE TABLE IF NOT EXISTS local.db.events (
        id BIGINT,
        event_type STRING,
        amount DOUBLE,
        ts TIMESTAMP
    ) USING iceberg
    PARTITIONED BY (days(ts))
""")

# Вставляем данные
spark.sql("""
    INSERT INTO local.db.events VALUES
    (1, 'purchase', 99.9, TIMESTAMP '2024-01-15 10:00:00'),
    (2, 'refund', 25.0, TIMESTAMP '2024-01-15 11:00:00'),
    (3, 'purchase', 149.9, TIMESTAMP '2024-01-16 09:00:00'),
    (4, 'purchase', 299.9, TIMESTAMP '2024-01-16 14:00:00')
""")

# Запрос с предикатом -- смотрим на pushed filters
df = spark.table("local.db.events").filter("amount > 100 AND event_type = 'purchase'")

# Iceberg pushdown в manifest-файлы виден в explain()
df.explain("formatted")

# Вывод покажет BatchScan с полем pushed filters
# pushed filters: [IsNotNull(amount), GreaterThan(amount,100), IsNotNull(event_type), EqualTo(event_type,purchase)]
# Все фильтры переданы Iceberg -- он применяет partition pruning + data file skipping
print("Количество прочитанных файлов (partition stats):")
spark.sql("SELECT * FROM local.db.events.files").show(truncate=False)

Для изучения стека вызовов DSv2 без Iceberg можно включить трассировку планирования:

# Посмотрим на план детально через explain("extended")
df2 = spark.read.parquet("/path/to/data").filter("amount > 100").select("id", "amount")
df2.explain("extended")

# В выводе найдите секцию Physical Plan:
# BatchScan[id#0, amount#2] parquet
#   PushedFilters: [IsNotNull(amount), GreaterThan(amount,100.0)]
#   PushedFilters применены в Parquet RowGroup-фильтрации через BloomFilter/statistics
TIP

Чтобы понять, сколько партиций создаёт коннектор, используйте df.rdd.getNumPartitions() после применения всех pushdown. Для Iceberg эта цифра = количество data files после partition pruning. Если файлов тысячи, а задач нужно меньше — используйте spark.sql.files.maxPartitionBytes (default 128MB) или Iceberg-специфичный read.split.target-size.

Производительность и failure modes

Планирование партиций как bottleneck. planInputPartitions() выполняется на driver однопоточно. Для таблиц с миллионом файлов (большие Iceberg/Delta-таблицы без компакции) это может занять десятки секунд. В production следите за метрикой “Planning time” в Spark UI — если она превышает время выполнения самых первых stage, это сигнал для компакции.

Сериализация InputPartition. Spark сериализует весь массив InputPartition и отправляет его в TaskScheduler. На 100,000 партиций это может быть 10-50MB сериализованных данных. Держите InputPartition компактными — только необходимые поля.

Нереализованный close(). Если PartitionReader.close() выбросит исключение или не закроет ресурс, executor получает resource leak. Задачи при ретрае создадут новый ридер, не закрыв старый. Это классический путь к Too many open files в production. Всегда оборачивайте close() в try-catch.

Pushdown без верификации данных. Если SupportsPushDownFilters.pushFilters() объявил, что применил фильтр (вернул пустой массив), но реально не применил — Spark не перепрогонит фильтр поверх результата. Вы получите неправильные ответы без ошибок. Это тихая корруция данных.

Apache Iceberg: архитектура таблиц
Проверка знанийKnowledge check
Коннектор реализует SupportsPushDownFilters и в pushFilters() применяет все переданные фильтры к SQL-запросу к внешней БД. Метод возвращает пустой массив. В ходе тестирования вы обнаруживаете, что запрос вида df.filter("amount > 100").count() иногда возвращает строки с amount = 100 (включительно). В чём проблема и как её диагностировать?
ОтветAnswer
Проблема: коннектор неправильно транслирует Spark-фильтр GreaterThan(amount, 100) в SQL. Скорее всего, используется >= вместо >. Поскольку pushFilters() вернул пустой массив, Spark не перепрогоняет фильтр и доверяет коннектору -- тихая ошибка, не исключение. Диагностика: 1) Включите логирование SQL-запросов на уровне коннектора -- посмотрите, что реально отправляется в БД. 2) Временно верните из pushFilters() все фильтры обратно (не применяйте pushdown) -- если результат изменился, проблема в трансляции фильтра. 3) Используйте df.explain("formatted") и найдите строку "Filters" vs "PushedFilters" -- если Filters пусто, Spark не перепрогоняет. 4) Добавьте явный post-scan фильтр: df.filter("amount > 100").filter("amount > 100").count() -- второй filter применит Spark, и цифра изменится. Исправление: найдите в коде маппинг Spark-фильтров в SQL и исправьте GreaterThan -> ">", убедитесь что есть unit-тест для каждого типа фильтра (EqualTo, GreaterThan, LessThan, In, IsNull, IsNotNull, Not, And, Or).

Лабораторная работа

Лабораторная даёт расширить Spark своими руками: вы пишете коннектор на Python Data Source API, регистрируете собственное правило Catalyst и подключаете кастомный каталог. Это превращает интерфейсы DSv2 из урока в работающий код.

cd labs/extending-spark
docker compose up -d

Полное описание и шаги проверки — в labs/extending-spark/README.md.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. ScanBuilder.pushFilters() принял массив [EqualTo(category, 'A'), Contains(name, 'Pro')] и вернул [Contains(name, 'Pro')]. Что произойдёт с фильтром EqualTo(category, 'A') при выполнении запроса?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5