Библиотека spark-testing-base
Обзор spark-testing-base
spark-testing-base — open-source библиотека от Holden Karau (автора «High Performance Spark»). Она предоставляет базовые классы и utilities для тестирования Spark-приложений:
| Возможность | Описание |
|---|---|
SharedSparkContext | Базовый класс с pre-configured SparkContext |
DataFrameSuiteBase | DataFrame assertion utilities |
StreamingSuiteBase | Utilities для тестирования DStream (legacy) |
RDD comparisons | Сравнение RDD с tolerance для float |
Репозиторий: github.com/holdenk/spark-testing-base
Python API: PySpark тесты
Установка
pip install spark-testing-base
DataFrameSuiteBase
from sparktesting.utils import assertDataFrameEqual
from pyspark.sql import SparkSession
class TestTransformations:
"""Тесты с spark-testing-base utilities."""
@classmethod
def setup_class(cls):
cls.spark = SparkSession.builder \
.master("local[2]") \
.appName("spark-testing-base-demo") \
.getOrCreate()
@classmethod
def teardown_class(cls):
cls.spark.stop()
def test_column_rename(self):
input_df = self.spark.createDataFrame(
[("alice", 25), ("bob", 30)],
["name", "age"]
)
result = input_df.withColumnRenamed("name", "full_name")
expected = self.spark.createDataFrame(
[("alice", 25), ("bob", 30)],
["full_name", "age"]
)
# Сравнение по содержимому (порядок строк не важен)
assertDataFrameEqual(result, expected)
Scala API: ScalaTest интеграция
Spark-testing-base изначально создавалась для Scala + ScalaTest. Если ваша команда пишет Spark на Scala, библиотека предоставляет удобные trait-ы:
// build.sbt
libraryDependencies += "com.holdenkarau" %% "spark-testing-base" % "4.0.0_2.0.0" % Test
// src/test/scala/TransformationSpec.scala
import com.holdenkarau.spark.testing.{DataFrameSuiteBase, SharedSparkContext}
import org.scalatest.funsuite.AnyFunSuite
class TransformationSpec extends AnyFunSuite with DataFrameSuiteBase {
test("filter active users") {
val input = spark.createDataFrame(Seq(
("alice", true),
("bob", false),
("carol", true)
)).toDF("name", "is_active")
val result = input.filter("is_active = true")
val expected = spark.createDataFrame(Seq(
("alice", true),
("carol", true)
)).toDF("name", "is_active")
assertDataFrameEquals(expected, result)
}
}
// Тестирование RDD с SharedSparkContext
import com.holdenkarau.spark.testing.SharedSparkContext
import org.scalatest.funsuite.AnyFunSuite
class RddSpec extends AnyFunSuite with SharedSparkContext {
test("word count") {
val input = sc.parallelize(Seq("hello world", "hello spark"))
val result = input
.flatMap(_.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
.collectAsMap()
assert(result("hello") == 2)
assert(result("spark") == 1)
}
}
Scala-примеры в этом уроке — read-only. Они показывают паттерны ScalaTest + spark-testing-base для команд, работающих с Scala. В нашем курсе мы фокусируемся на PySpark, но знание Scala-паттернов полезно при работе в JVM-heavy командах.
Сравнение: spark-testing-base vs нативный pytest
| Аспект | spark-testing-base | Нативный pytest + PySpark |
|---|---|---|
| SparkSession management | Встроенные базовые классы | conftest.py fixture (scope=“session”) |
| DataFrame assertions | assertDataFrameEqual (свой) | assertDataFrameEqual (PySpark 3.5+ встроенный) |
| API стиль | Class-based (наследование) | Function-based (fixtures, markers) |
| Scala support | Отличный (ScalaTest traits) | Не применимо |
| Python support | Базовый | Полный (monkeypatch, parametrize, plugins) |
| Поддержка | Менее активная | Активная (часть PySpark) |
| Зависимости | Дополнительная библиотека | Только pyspark + pytest |
| Spark version support | Может отставать от новых версий | Всегда совместимо |
Когда использовать каждый подход
spark-testing-base подходит, если:
- Проект на Scala с ScalaTest
- Нужны RDD assertions (редко в современных проектах)
- Уже используется в legacy codebase
Нативный pytest рекомендуется для новых проектов, если:
- Проект на PySpark (большинство современных проектов)
- Используете PySpark 3.5+ (встроенный
assertDataFrameEqual) - Хотите использовать всю экосистему pytest (markers, parametrize, fixtures, plugins)
- Не хотите зависеть от сторонней библиотеки
Миграция со spark-testing-base на нативный pytest
Если у вас legacy тесты на spark-testing-base, миграция на нативный pytest проста:
# ДО: spark-testing-base стиль
class TestPipeline:
@classmethod
def setup_class(cls):
cls.spark = SparkSession.builder.master("local[2]").getOrCreate()
def test_transform(self):
df = self.spark.createDataFrame(...)
...
# ПОСЛЕ: нативный pytest стиль
# conftest.py -- один файл на весь проект
# fixture spark доступна автоматически
def test_transform(spark): # spark -- fixture из conftest.py
df = spark.createDataFrame(...)
...
Преимущества миграции:
- Убирается boilerplate (setup_class / teardown_class)
- fixture
sparkдоступна во всех тестовых файлах автоматически - Можно добавить
parametrize,monkeypatch,tmp_pathи другие pytest-фичи
Что дальше?
В следующем уроке изучим Great Expectations — мощный фреймворк для валидации качества данных, который глубоко интегрируется с PySpark.