DataFrames: Создание и схема
Что такое DataFrame?
DataFrame — это распределённая коллекция данных, организованная в именованные столбцы. Если вы работали с pandas, концепция знакома: таблица с типизированными колонками. Но в отличие от pandas DataFrame, Spark DataFrame распределён по кластеру и может обрабатывать терабайты данных.
DataFrame построен поверх RDD (Resilient Distributed Dataset), но предоставляет высокоуровневый API, который Catalyst optimizer может оптимизировать. Именно поэтому DataFrame API — основной способ работы с данными в Spark начиная с версии 2.0.
Создание DataFrame
Из списка кортежей
Самый простой способ — передать список кортежей и список имён колонок:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("demo").getOrCreate()
data = [
("Alice", "Engineering", 75000),
("Bob", "Marketing", 55000),
("Carol", "Engineering", 80000),
]
df = spark.createDataFrame(data, ["name", "department", "salary"])
df.show()
+-----+-----------+------+
| name| department|salary|
+-----+-----------+------+
|Alice|Engineering| 75000|
| Bob| Marketing| 55000|
|Carol|Engineering| 80000|
+-----+-----------+------+
Из списка словарей
Словари автоматически определяют имена колонок:
data = [
{"name": "Alice", "age": 30, "city": "Moscow"},
{"name": "Bob", "age": 25, "city": "SPb"},
]
df = spark.createDataFrame(data)
df.printSchema()
root
|-- age: long (nullable = true)
|-- city: string (nullable = true)
|-- name: string (nullable = true)
Из объектов Row
Row позволяет создавать типизированные строки:
from pyspark.sql import Row
Employee = Row("name", "department", "salary")
data = [
Employee("Alice", "Engineering", 75000),
Employee("Bob", "Marketing", 55000),
]
df = spark.createDataFrame(data)
KnowledgeCheck: Какой из трёх способов создания DataFrame (кортежи, словари, Row) лучше подходит для production ETL и почему?
Ответ: В production ETL данные обычно загружаются из внешних источников (Parquet, CSV, Kafka) через spark.read, а не создаются вручную. createDataFrame используется для тестов и прототипирования. Из трёх способов кортежи с явной схемой предпочтительнее, так как вы контролируете типы и имена колонок.
Схема: Schema Inference vs StructType
Schema Inference (автоматическое определение)
Spark может сам определить типы данных, но для этого ему нужно прочитать данные дважды: один раз для определения типов, второй — для загрузки:
# Schema inference на CSV-файле
df = spark.read.option("header", True).option("inferSchema", True).csv("/data/employees.csv")
# Spark прочитает файл 2 раза: infer + load
Явная схема через StructType
Для production-кода всегда задавайте схему явно:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
schema = StructType([
StructField("name", StringType(), nullable=False),
StructField("department", StringType(), nullable=True),
StructField("salary", IntegerType(), nullable=False),
StructField("bonus", DoubleType(), nullable=True),
])
df = spark.createDataFrame(data, schema)
df.printSchema()
root
|-- name: string (nullable = false)
|-- department: string (nullable = true)
|-- salary: integer (nullable = false)
|-- bonus: double (nullable = true)
Эквивалент в Spark SQL:
CREATE TABLE employees (
name STRING NOT NULL,
department STRING,
salary INT NOT NULL,
bonus DOUBLE
) USING parquet;
StructField принимает три аргумента:
- name — имя колонки
- type — тип данных (StringType, IntegerType, LongType, DoubleType, BooleanType, TimestampType, и др.)
- nullable — допускает ли колонка NULL значения
Анти-паттерн: inferSchema=True в production
Никогда не используйте inferSchema=True на больших CSV-файлах в production. Это вызывает:
- Двойное чтение файла (extra I/O)
- Schema drift — если структура данных изменилась, Spark молча выведет неправильные типы
- Нестабильность — тип колонки может меняться от запуска к запуску (например,
"123"-> IntegerType в одном файле,"123.45"-> DoubleType в другом)
Используйте StructType с явными типами для всех production pipelines.
Lazy Evaluation: трансформации vs действия
Фундаментальное свойство Spark — lazy evaluation. Когда вы пишете трансформацию (select, filter, join), Spark не выполняет её сразу. Вместо этого он строит логический план — описание того, что нужно сделать.
Выполнение происходит только при вызове действия (action):
| Трансформации (lazy) | Действия (execute) |
|---|---|
select() | show() |
filter() / where() | collect() |
join() | count() |
groupBy() | write.parquet() |
withColumn() | first() / head() |
# Ни одна из этих строк не выполняет вычисления:
df2 = df.filter(df.salary > 60000) # lazy
df3 = df2.select("name", "salary") # lazy
df4 = df3.withColumn("tax", df3.salary * 0.13) # lazy
# Вычисление запускается ТОЛЬКО здесь:
df4.show() # action -- Spark строит план и выполняет всё разом
Зачем это нужно? Оптимизация. Catalyst optimizer видит весь план целиком и может:
- Объединить фильтры
- Перенести фильтрацию до join (predicate pushdown)
- Выбрать оптимальную стратегию join
- Убрать ненужные колонки (column pruning)
Вот как Catalyst видит простой запрос (explain() вывод):
== Physical Plan ==
*(1) Project [name#0, salary#2, (salary#2 * 0.13) AS tax#5]
+- *(1) Filter (salary#2 > 60000)
+- *(1) Scan ExistingRDD[name#0, department#1, salary#2]
Обратите внимание: фильтрация происходит до проекции — Catalyst автоматически оптимизировал порядок операций.
Spark4.0Spark 4.0: ANSI mode по умолчанию. Начиная с Spark 4.0, ANSI mode включён по умолчанию (spark.sql.ansi.enabled=true). Это означает, что неявное приведение типов (например, StringType к IntegerType) теперь вызывает ошибку вместо тихого преобразования или NULL. Всегда указывайте типы явно в StructType, чтобы избежать runtime exceptions.
Spark 4.0: VARIANT как первоклассный тип данных. Spark 4.0 ввёл VariantType как primitive Spark SQL type (не только Delta-фичу) для semi-structured данных JSON-like формы. В отличие от хранения JSON в StringType, VARIANT использует компактный binary layout с lazy parsing, columnar pruning и predicate pushdown — это даёт до 8x ускорение на nested и flat-схемах по сравнению со строковыми колонками. Поддерживается arbitrary nesting (массивы, maps), сохранение типов в глубоко вложенных полях, совместимость с Delta Lake, Structured Streaming и Spark Connect. Используйте функции parse_json, variant_get, try_variant_get, schema_of_variant для работы с VARIANT-колонками. Это устраняет необходимость выбирать между schema-on-write (StructType) и schema-on-read (StringType + JSON-парсинг) для динамических данных.
Schema Enforcement и nullable
Spark проверяет соответствие данных схеме при чтении. Если колонка помечена как nullable=False, а данные содержат NULL — Spark выбросит ошибку (в ANSI mode) или подставит значение по умолчанию.
schema = StructType([
StructField("id", IntegerType(), nullable=False),
StructField("name", StringType(), nullable=True),
])
# Это вызовет ошибку, если в данных есть NULL для id
df = spark.createDataFrame([(1, "Alice"), (None, "Bob")], schema)
KnowledgeCheck: Что произойдёт, если вы создадите DataFrame с 3 трансформациями (filter, select, withColumn) и не вызовете ни одного action?
Ответ: Ничего. Spark не выполнит ни одного вычисления. Он лишь построит логический план (DAG) из трёх трансформаций. Вычисление произойдёт только при вызове action (show, collect, count, write). Это и есть lazy evaluation — ключевое свойство, позволяющее Catalyst optimizer оптимизировать весь план целиком.
Что дальше?
В следующем уроке мы разберём трансформации — select, filter, withColumn, withColumnRenamed — операции, которые вы будете использовать в каждом Spark-приложении. Вы увидите, как одну и ту же логику можно выразить через PySpark API и Spark SQL.
Pandas vs Spark DataFrame: концептуальная разница