Learning Platform
Глоссарий Troubleshooting
Урок 03.01 · 12 мин
Начальный
DataFrameSchemaStructTypeLazy EvaluationcreateDataFrame

DataFrames: Создание и схема

Что такое DataFrame?

DataFrame — это распределённая коллекция данных, организованная в именованные столбцы. Если вы работали с pandas, концепция знакома: таблица с типизированными колонками. Но в отличие от pandas DataFrame, Spark DataFrame распределён по кластеру и может обрабатывать терабайты данных.

DataFrame построен поверх RDD (Resilient Distributed Dataset), но предоставляет высокоуровневый API, который Catalyst optimizer может оптимизировать. Именно поэтому DataFrame API — основной способ работы с данными в Spark начиная с версии 2.0.

Создание DataFrame

Из списка кортежей

Самый простой способ — передать список кортежей и список имён колонок:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("demo").getOrCreate()

data = [
    ("Alice", "Engineering", 75000),
    ("Bob", "Marketing", 55000),
    ("Carol", "Engineering", 80000),
]

df = spark.createDataFrame(data, ["name", "department", "salary"])
df.show()
+-----+-----------+------+
| name| department|salary|
+-----+-----------+------+
|Alice|Engineering| 75000|
|  Bob|  Marketing| 55000|
|Carol|Engineering| 80000|
+-----+-----------+------+

Из списка словарей

Словари автоматически определяют имена колонок:

data = [
    {"name": "Alice", "age": 30, "city": "Moscow"},
    {"name": "Bob", "age": 25, "city": "SPb"},
]

df = spark.createDataFrame(data)
df.printSchema()
root
 |-- age: long (nullable = true)
 |-- city: string (nullable = true)
 |-- name: string (nullable = true)

Из объектов Row

Row позволяет создавать типизированные строки:

from pyspark.sql import Row

Employee = Row("name", "department", "salary")

data = [
    Employee("Alice", "Engineering", 75000),
    Employee("Bob", "Marketing", 55000),
]

df = spark.createDataFrame(data)
TIP

KnowledgeCheck: Какой из трёх способов создания DataFrame (кортежи, словари, Row) лучше подходит для production ETL и почему?

Ответ: В production ETL данные обычно загружаются из внешних источников (Parquet, CSV, Kafka) через spark.read, а не создаются вручную. createDataFrame используется для тестов и прототипирования. Из трёх способов кортежи с явной схемой предпочтительнее, так как вы контролируете типы и имена колонок.

Схема: Schema Inference vs StructType

Schema Inference (автоматическое определение)

Spark может сам определить типы данных, но для этого ему нужно прочитать данные дважды: один раз для определения типов, второй — для загрузки:

# Schema inference на CSV-файле
df = spark.read.option("header", True).option("inferSchema", True).csv("/data/employees.csv")

# Spark прочитает файл 2 раза: infer + load

Явная схема через StructType

Для production-кода всегда задавайте схему явно:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

schema = StructType([
    StructField("name", StringType(), nullable=False),
    StructField("department", StringType(), nullable=True),
    StructField("salary", IntegerType(), nullable=False),
    StructField("bonus", DoubleType(), nullable=True),
])

df = spark.createDataFrame(data, schema)
df.printSchema()
root
 |-- name: string (nullable = false)
 |-- department: string (nullable = true)
 |-- salary: integer (nullable = false)
 |-- bonus: double (nullable = true)

Эквивалент в Spark SQL:

CREATE TABLE employees (
    name STRING NOT NULL,
    department STRING,
    salary INT NOT NULL,
    bonus DOUBLE
) USING parquet;

StructField принимает три аргумента:

  • name — имя колонки
  • type — тип данных (StringType, IntegerType, LongType, DoubleType, BooleanType, TimestampType, и др.)
  • nullable — допускает ли колонка NULL значения
WARNING

Анти-паттерн: inferSchema=True в production

Никогда не используйте inferSchema=True на больших CSV-файлах в production. Это вызывает:

  1. Двойное чтение файла (extra I/O)
  2. Schema drift — если структура данных изменилась, Spark молча выведет неправильные типы
  3. Нестабильность — тип колонки может меняться от запуска к запуску (например, "123" -> IntegerType в одном файле, "123.45" -> DoubleType в другом)

Используйте StructType с явными типами для всех production pipelines.

Lazy Evaluation: трансформации vs действия

Фундаментальное свойство Spark — lazy evaluation. Когда вы пишете трансформацию (select, filter, join), Spark не выполняет её сразу. Вместо этого он строит логический план — описание того, что нужно сделать.

Выполнение происходит только при вызове действия (action):

Трансформации (lazy)Действия (execute)
select()show()
filter() / where()collect()
join()count()
groupBy()write.parquet()
withColumn()first() / head()
# Ни одна из этих строк не выполняет вычисления:
df2 = df.filter(df.salary > 60000)          # lazy
df3 = df2.select("name", "salary")          # lazy
df4 = df3.withColumn("tax", df3.salary * 0.13)  # lazy

# Вычисление запускается ТОЛЬКО здесь:
df4.show()  # action -- Spark строит план и выполняет всё разом

Зачем это нужно? Оптимизация. Catalyst optimizer видит весь план целиком и может:

  • Объединить фильтры
  • Перенести фильтрацию до join (predicate pushdown)
  • Выбрать оптимальную стратегию join
  • Убрать ненужные колонки (column pruning)

Вот как Catalyst видит простой запрос (explain() вывод):

== Physical Plan ==
*(1) Project [name#0, salary#2, (salary#2 * 0.13) AS tax#5]
+- *(1) Filter (salary#2 > 60000)
   +- *(1) Scan ExistingRDD[name#0, department#1, salary#2]

Обратите внимание: фильтрация происходит до проекции — Catalyst автоматически оптимизировал порядок операций.

Spark4.0

Spark 4.0: ANSI mode по умолчанию. Начиная с Spark 4.0, ANSI mode включён по умолчанию (spark.sql.ansi.enabled=true). Это означает, что неявное приведение типов (например, StringType к IntegerType) теперь вызывает ошибку вместо тихого преобразования или NULL. Всегда указывайте типы явно в StructType, чтобы избежать runtime exceptions.

Spark4.0

Spark 4.0: VARIANT как первоклассный тип данных. Spark 4.0 ввёл VariantType как primitive Spark SQL type (не только Delta-фичу) для semi-structured данных JSON-like формы. В отличие от хранения JSON в StringType, VARIANT использует компактный binary layout с lazy parsing, columnar pruning и predicate pushdown — это даёт до 8x ускорение на nested и flat-схемах по сравнению со строковыми колонками. Поддерживается arbitrary nesting (массивы, maps), сохранение типов в глубоко вложенных полях, совместимость с Delta Lake, Structured Streaming и Spark Connect. Используйте функции parse_json, variant_get, try_variant_get, schema_of_variant для работы с VARIANT-колонками. Это устраняет необходимость выбирать между schema-on-write (StructType) и schema-on-read (StringType + JSON-парсинг) для динамических данных.

Schema Enforcement и nullable

Spark проверяет соответствие данных схеме при чтении. Если колонка помечена как nullable=False, а данные содержат NULL — Spark выбросит ошибку (в ANSI mode) или подставит значение по умолчанию.

schema = StructType([
    StructField("id", IntegerType(), nullable=False),
    StructField("name", StringType(), nullable=True),
])

# Это вызовет ошибку, если в данных есть NULL для id
df = spark.createDataFrame([(1, "Alice"), (None, "Bob")], schema)
TIP

KnowledgeCheck: Что произойдёт, если вы создадите DataFrame с 3 трансформациями (filter, select, withColumn) и не вызовете ни одного action?

Ответ: Ничего. Spark не выполнит ни одного вычисления. Он лишь построит логический план (DAG) из трёх трансформаций. Вычисление произойдёт только при вызове action (show, collect, count, write). Это и есть lazy evaluation — ключевое свойство, позволяющее Catalyst optimizer оптимизировать весь план целиком.

Что дальше?

В следующем уроке мы разберём трансформации — select, filter, withColumn, withColumnRenamed — операции, которые вы будете использовать в каждом Spark-приложении. Вы увидите, как одну и ту же логику можно выразить через PySpark API и Spark SQL.

Pandas vs Spark DataFrame: концептуальная разница

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Почему в production-пайплайнах рекомендуется использовать явную схему StructType вместо inferSchema=True?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 8