Learning Platform
Глоссарий Troubleshooting
Урок 11.02 · 12 мин
Средний
Integration TestingTestcontainerspytestCI/CDTest Pyramid

Интеграционное тестирование Spark-приложений

Unit vs Integration: где граница?

В предыдущем уроке мы тестировали чистые функции — трансформации DataFrame без внешних зависимостей. Но production pipeline взаимодействует с реальными системами:

АспектUnit-тестIntegration-тест
Внешние системыНет (mock/fixture)Да (реальные или контейнеры)
СкоростьСекундыМинуты
Что проверяетЛогика трансформацииСвязка Spark + система
SparkSessionМинимальная конфигурацияРеальная конфигурация
Запуск в CIКаждый commitПри merge/nightly

Интеграционные тесты проверяют:

  • Чтение из реальных источников — Kafka, PostgreSQL, S3-compatible storage
  • Запись в реальные sinks — Delta Lake, Iceberg, JDBC
  • Catalog операции — создание таблиц, temporary views, schema evolution
  • Window functions и агрегации — проверка корректности на данных с edge cases

Тестирование с реальным SparkSession

Catalog операции

def test_create_and_query_temp_view(spark):
    data = [("alice", "engineering", 95000),
            ("bob", "engineering", 87000),
            ("carol", "marketing", 72000)]
    df = spark.createDataFrame(data, ["name", "dept", "salary"])
    df.createOrReplaceTempView("employees")

    result = spark.sql("""
        SELECT dept, AVG(salary) as avg_salary
        FROM employees
        GROUP BY dept
        ORDER BY avg_salary DESC
    """)

    rows = result.collect()
    assert rows[0].dept == "engineering"
    assert rows[0].avg_salary == 91000.0

Window functions

Window functions — частый источник багов. Тестируйте edge cases:

from pyspark.sql.functions import row_number, dense_rank
from pyspark.sql.window import Window


def test_window_ranking_with_ties(spark):
    """Проверяем поведение dense_rank при одинаковых значениях."""
    data = [("alice", 100), ("bob", 100), ("carol", 90)]
    df = spark.createDataFrame(data, ["name", "score"])

    window = Window.orderBy(df.score.desc())
    result = df.withColumn("rank", dense_rank().over(window))

    ranks = {row.name: row.rank for row in result.collect()}

    # alice и bob -- rank 1 (одинаковый score)
    assert ranks["alice"] == 1
    assert ranks["bob"] == 1
    # carol -- rank 2 (не 3! dense_rank не пропускает)
    assert ranks["carol"] == 2

Testcontainers: реальные сервисы в контейнерах

Testcontainers — библиотека, которая запускает Docker-контейнеры для тестов. Вместо mock — реальный PostgreSQL, Kafka, MinIO:

pip install testcontainers[postgres,kafka]

PostgreSQL + JDBC

import pytest

from testcontainers.postgres import PostgresContainer


@pytest.fixture(scope="module")
def postgres():
    """Запускает PostgreSQL в Docker для тестов."""
    with PostgresContainer("postgres:16") as pg:
        yield pg


def test_spark_reads_from_postgres(spark, postgres):
    # Подготовка: создаём таблицу и данные
    import psycopg2
    conn = psycopg2.connect(postgres.get_connection_url())
    cur = conn.cursor()
    cur.execute("""
        CREATE TABLE orders (
            id SERIAL PRIMARY KEY,
            customer VARCHAR(100),
            amount DECIMAL(10,2)
        )
    """)
    cur.execute("INSERT INTO orders (customer, amount) VALUES ('alice', 150.00)")
    cur.execute("INSERT INTO orders (customer, amount) VALUES ('bob', 230.50)")
    conn.commit()
    conn.close()

    # Тест: Spark читает из PostgreSQL
    jdbc_url = postgres.get_connection_url().replace("postgresql", "jdbc:postgresql")
    df = spark.read \
        .format("jdbc") \
        .option("url", jdbc_url) \
        .option("dbtable", "orders") \
        .option("user", postgres.username) \
        .option("password", postgres.password) \
        .option("driver", "org.postgresql.Driver") \
        .load()

    assert df.count() == 2
    total = df.agg({"amount": "sum"}).collect()[0][0]
    assert float(total) == 380.50

Kafka

from testcontainers.kafka import KafkaContainer


@pytest.fixture(scope="module")
def kafka():
    with KafkaContainer("confluentinc/cp-kafka:7.6.0") as k:
        yield k


def test_spark_reads_from_kafka(spark, kafka):
    from kafka import KafkaProducer
    import json

    # Пишем сообщения в Kafka
    producer = KafkaProducer(
        bootstrap_servers=kafka.get_bootstrap_server(),
        value_serializer=lambda v: json.dumps(v).encode("utf-8")
    )
    for i in range(10):
        producer.send("test-topic", {"id": i, "value": f"msg-{i}"})
    producer.flush()

    # Spark читает из Kafka (batch mode для тестов)
    df = spark.read \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka.get_bootstrap_server()) \
        .option("subscribe", "test-topic") \
        .option("startingOffsets", "earliest") \
        .load()

    assert df.count() == 10
WARNING

Testcontainers требует Docker. Убедитесь, что Docker daemon запущен на CI-машине. В GitHub Actions используйте runner с Docker support (ubuntu-latest). В корпоративных CI проверьте политику Docker-in-Docker.

pytest markers: разделение тестов

Интеграционные тесты медленнее unit-тестов. Разделяйте их с помощью pytest markers:

# conftest.py или pyproject.toml
# [tool.pytest.ini_options]
# markers = [
#     "integration: marks tests as integration (deselect with '-m \"not integration\"')",
#     "slow: marks tests as slow (deselect with '-m \"not slow\"')",
# ]
import pytest


@pytest.mark.integration
def test_postgres_read(spark, postgres):
    """Интеграционный тест -- требует Docker."""
    ...


@pytest.mark.slow
def test_large_dataset_aggregation(spark):
    """Медленный тест -- запускать только в nightly CI."""
    ...

Запуск:

# Только быстрые unit-тесты
pytest tests/ -m "not integration and not slow"

# Только интеграционные
pytest tests/ -m "integration"

# Всё (полный прогон)
pytest tests/

CI/CD тестовая пирамида

Тестовая пирамида для Spark pipeline:

Пирамида тестирования
E2E (rare)
Полный pipeline на реальном кластере
Запуск: вручную / release
Integration (module)
Spark + DB/Kafka/S3
Запуск: merge to main / nightly
Unit (fast)
Чистые функции, local SparkSession
Запуск: каждый commit / pre-commit

GitHub Actions пример

# .github/workflows/spark-tests.yml
name: Spark Tests

on: [push, pull_request]

jobs:
  unit-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      - uses: actions/setup-java@v4
        with:
          java-version: "17"
          distribution: "temurin"
      - run: pip install pyspark==4.0.0 pytest pytest-cov
      - run: pytest tests/ -m "not integration" --cov=src

  integration-tests:
    runs-on: ubuntu-latest
    if: github.event_name == 'pull_request'
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      - uses: actions/setup-java@v4
        with:
          java-version: "17"
          distribution: "temurin"
      - run: pip install pyspark==4.0.0 pytest testcontainers[postgres,kafka]
      - run: pytest tests/ -m "integration"
Проверка знанийKnowledge check
Когда использовать Testcontainers вместо mock для внешних источников? Приведите 2 сценария.
ОтветAnswer
Testcontainers нужны, когда mock не покрывает реальное поведение: (1) Тестирование JDBC чтения с реальным PostgreSQL -- проверяет SQL pushdown, типы данных, connection pooling, что невозможно с mock. (2) Тестирование Kafka consumer с реальным брокером -- проверяет сериализацию, offset management, consumer group поведение. Mock подходит для unit-тестов трансформаций, но integration-тесты с Testcontainers ловят баги на границе Spark и внешних систем (неправильные типы, encoding, network timeouts).
Проверка знанийKnowledge check
Как организовать тестовую пирамиду для Spark pipeline? Какие тесты запускать на каждом этапе?
ОтветAnswer
Тестовая пирамида: (1) Unit-тесты (основание, 70-80% тестов) -- чистые функции-трансформации с local SparkSession, запуск на каждый commit, секунды. (2) Integration-тесты (середина, 15-25%) -- Spark + реальные системы через Testcontainers, запуск при merge/PR или nightly, минуты. (3) E2E-тесты (вершина, 5-10%) -- полный pipeline на реальном кластере, запуск при release или вручную. Разделение через pytest markers: -m 'not integration' для быстрого CI, полный прогон на merge.

Что дальше?

В следующем уроке рассмотрим библиотеку spark-testing-base — готовые utilities для тестирования Spark, их сравнение с нативным pytest-подходом.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. В чём ключевое отличие интеграционного теста от unit-теста для Spark?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 8