Интеграционное тестирование Spark-приложений
Unit vs Integration: где граница?
В предыдущем уроке мы тестировали чистые функции — трансформации DataFrame без внешних зависимостей. Но production pipeline взаимодействует с реальными системами:
| Аспект | Unit-тест | Integration-тест |
|---|---|---|
| Внешние системы | Нет (mock/fixture) | Да (реальные или контейнеры) |
| Скорость | Секунды | Минуты |
| Что проверяет | Логика трансформации | Связка Spark + система |
| SparkSession | Минимальная конфигурация | Реальная конфигурация |
| Запуск в CI | Каждый commit | При merge/nightly |
Интеграционные тесты проверяют:
- Чтение из реальных источников — Kafka, PostgreSQL, S3-compatible storage
- Запись в реальные sinks — Delta Lake, Iceberg, JDBC
- Catalog операции — создание таблиц, temporary views, schema evolution
- Window functions и агрегации — проверка корректности на данных с edge cases
Тестирование с реальным SparkSession
Catalog операции
def test_create_and_query_temp_view(spark):
data = [("alice", "engineering", 95000),
("bob", "engineering", 87000),
("carol", "marketing", 72000)]
df = spark.createDataFrame(data, ["name", "dept", "salary"])
df.createOrReplaceTempView("employees")
result = spark.sql("""
SELECT dept, AVG(salary) as avg_salary
FROM employees
GROUP BY dept
ORDER BY avg_salary DESC
""")
rows = result.collect()
assert rows[0].dept == "engineering"
assert rows[0].avg_salary == 91000.0
Window functions
Window functions — частый источник багов. Тестируйте edge cases:
from pyspark.sql.functions import row_number, dense_rank
from pyspark.sql.window import Window
def test_window_ranking_with_ties(spark):
"""Проверяем поведение dense_rank при одинаковых значениях."""
data = [("alice", 100), ("bob", 100), ("carol", 90)]
df = spark.createDataFrame(data, ["name", "score"])
window = Window.orderBy(df.score.desc())
result = df.withColumn("rank", dense_rank().over(window))
ranks = {row.name: row.rank for row in result.collect()}
# alice и bob -- rank 1 (одинаковый score)
assert ranks["alice"] == 1
assert ranks["bob"] == 1
# carol -- rank 2 (не 3! dense_rank не пропускает)
assert ranks["carol"] == 2
Testcontainers: реальные сервисы в контейнерах
Testcontainers — библиотека, которая запускает Docker-контейнеры для тестов. Вместо mock — реальный PostgreSQL, Kafka, MinIO:
pip install testcontainers[postgres,kafka]
PostgreSQL + JDBC
import pytest
from testcontainers.postgres import PostgresContainer
@pytest.fixture(scope="module")
def postgres():
"""Запускает PostgreSQL в Docker для тестов."""
with PostgresContainer("postgres:16") as pg:
yield pg
def test_spark_reads_from_postgres(spark, postgres):
# Подготовка: создаём таблицу и данные
import psycopg2
conn = psycopg2.connect(postgres.get_connection_url())
cur = conn.cursor()
cur.execute("""
CREATE TABLE orders (
id SERIAL PRIMARY KEY,
customer VARCHAR(100),
amount DECIMAL(10,2)
)
""")
cur.execute("INSERT INTO orders (customer, amount) VALUES ('alice', 150.00)")
cur.execute("INSERT INTO orders (customer, amount) VALUES ('bob', 230.50)")
conn.commit()
conn.close()
# Тест: Spark читает из PostgreSQL
jdbc_url = postgres.get_connection_url().replace("postgresql", "jdbc:postgresql")
df = spark.read \
.format("jdbc") \
.option("url", jdbc_url) \
.option("dbtable", "orders") \
.option("user", postgres.username) \
.option("password", postgres.password) \
.option("driver", "org.postgresql.Driver") \
.load()
assert df.count() == 2
total = df.agg({"amount": "sum"}).collect()[0][0]
assert float(total) == 380.50
Kafka
from testcontainers.kafka import KafkaContainer
@pytest.fixture(scope="module")
def kafka():
with KafkaContainer("confluentinc/cp-kafka:7.6.0") as k:
yield k
def test_spark_reads_from_kafka(spark, kafka):
from kafka import KafkaProducer
import json
# Пишем сообщения в Kafka
producer = KafkaProducer(
bootstrap_servers=kafka.get_bootstrap_server(),
value_serializer=lambda v: json.dumps(v).encode("utf-8")
)
for i in range(10):
producer.send("test-topic", {"id": i, "value": f"msg-{i}"})
producer.flush()
# Spark читает из Kafka (batch mode для тестов)
df = spark.read \
.format("kafka") \
.option("kafka.bootstrap.servers", kafka.get_bootstrap_server()) \
.option("subscribe", "test-topic") \
.option("startingOffsets", "earliest") \
.load()
assert df.count() == 10
Testcontainers требует Docker. Убедитесь, что Docker daemon запущен на CI-машине. В GitHub Actions используйте runner с Docker support (ubuntu-latest). В корпоративных CI проверьте политику Docker-in-Docker.
pytest markers: разделение тестов
Интеграционные тесты медленнее unit-тестов. Разделяйте их с помощью pytest markers:
# conftest.py или pyproject.toml
# [tool.pytest.ini_options]
# markers = [
# "integration: marks tests as integration (deselect with '-m \"not integration\"')",
# "slow: marks tests as slow (deselect with '-m \"not slow\"')",
# ]
import pytest
@pytest.mark.integration
def test_postgres_read(spark, postgres):
"""Интеграционный тест -- требует Docker."""
...
@pytest.mark.slow
def test_large_dataset_aggregation(spark):
"""Медленный тест -- запускать только в nightly CI."""
...
Запуск:
# Только быстрые unit-тесты
pytest tests/ -m "not integration and not slow"
# Только интеграционные
pytest tests/ -m "integration"
# Всё (полный прогон)
pytest tests/
CI/CD тестовая пирамида
Тестовая пирамида для Spark pipeline:
GitHub Actions пример
# .github/workflows/spark-tests.yml
name: Spark Tests
on: [push, pull_request]
jobs:
unit-tests:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: "3.11"
- uses: actions/setup-java@v4
with:
java-version: "17"
distribution: "temurin"
- run: pip install pyspark==4.0.0 pytest pytest-cov
- run: pytest tests/ -m "not integration" --cov=src
integration-tests:
runs-on: ubuntu-latest
if: github.event_name == 'pull_request'
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: "3.11"
- uses: actions/setup-java@v4
with:
java-version: "17"
distribution: "temurin"
- run: pip install pyspark==4.0.0 pytest testcontainers[postgres,kafka]
- run: pytest tests/ -m "integration"
Что дальше?
В следующем уроке рассмотрим библиотеку spark-testing-base — готовые utilities для тестирования Spark, их сравнение с нативным pytest-подходом.