MLlib: основы Pipeline API
MLlib: встроенная ML-библиотека Spark
MLlib (Machine Learning Library) — встроенная библиотека машинного обучения Spark. С версии 2.0 MLlib работает на уровне DataFrame API (пакет pyspark.ml), заменив устаревший RDD-based API (pyspark.mllib).
Ключевое преимущество MLlib: распределённое обучение на кластере. Когда данных больше, чем помещается в память одной машины, scikit-learn не справится — MLlib распределяет вычисления по executor.
# Современный API (DataFrame-based) -- используйте этот
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
# Legacy API (RDD-based) -- НЕ используйте
# from pyspark.mllib.classification import LogisticRegressionWithSGD
Core Abstractions
MLlib Pipeline API построен на трёх абстракциях:
Transformer
Transformer принимает DataFrame и возвращает новый DataFrame с добавленным столбцом. Метод transform():
from pyspark.ml.feature import StringIndexer, VectorAssembler
# StringIndexer: "male"/"female" -> 0.0/1.0
indexer = StringIndexer(inputCol="sex", outputCol="sex_index")
# VectorAssembler: собирает несколько столбцов в один вектор
assembler = VectorAssembler(
inputCols=["age", "fare", "sex_index", "pclass"],
outputCol="features"
)
# Trained model -- тоже Transformer
# model.transform(df) добавляет столбец "prediction"
Estimator
Estimator обучается на данных и возвращает Transformer (модель). Метод fit():
from pyspark.ml.classification import LogisticRegression
# LogisticRegression -- Estimator
lr = LogisticRegression(
featuresCol="features",
labelCol="survived",
maxIter=100
)
# fit() обучает модель на данных -> возвращает Model (Transformer)
model = lr.fit(train_df)
# model.transform() делает предсказания
predictions = model.transform(test_df)
Pipeline
Pipeline — цепочка stages (Transformer и Estimator), выполняемых последовательно:
Pipeline.fit(train_df) → PipelineModel.transform(test_df) → predictions
End-to-End пример: классификация пассажиров
Построим pipeline классификации: предскажем выживание пассажиров на основе возраста, класса и стоимости билета.
1. Загрузка данных
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("MLlibPipeline").getOrCreate()
# Данные о пассажирах (Titanic-style)
data = spark.createDataFrame([
(1, "male", 22, 1, 7.25, 0), # 3 класс, молодой, дешёвый -- не выжил
(2, "female", 38, 1, 71.28, 1), # 1 класс, женщина -- выжила
(3, "female", 26, 3, 7.92, 1), # 3 класс, женщина -- выжила
(4, "male", 35, 1, 53.10, 0), # 1 класс, мужчина -- не выжил
(5, "male", 28, 3, 8.05, 0), # 3 класс, мужчина -- не выжил
(6, "female", 27, 2, 11.13, 1), # 2 класс, женщина -- выжила
(7, "male", 54, 1, 51.86, 0), # 1 класс, старший -- не выжил
(8, "female", 2, 3, 21.07, 1), # 3 класс, ребёнок -- выжила
(9, "male", 14, 3, 11.24, 0), # 3 класс, подросток -- не выжил
(10, "female", 58, 1, 26.55, 1), # 1 класс, женщина -- выжила
], ["id", "sex", "age", "pclass", "fare", "survived"])
# Train/test split (80/20)
train_df, test_df = data.randomSplit([0.8, 0.2], seed=42)
2. Построение Pipeline
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
# Stage 1: Кодирование пола (string -> numeric)
sex_indexer = StringIndexer(
inputCol="sex",
outputCol="sex_index",
handleInvalid="keep"
)
# Stage 2: Сборка features в вектор
assembler = VectorAssembler(
inputCols=["age", "fare", "sex_index", "pclass"],
outputCol="features"
)
# Stage 3: Модель классификации
lr = LogisticRegression(
featuresCol="features",
labelCol="survived",
maxIter=100,
regParam=0.01
)
# Pipeline: последовательность stages
pipeline = Pipeline(stages=[sex_indexer, assembler, lr])
3. Обучение и предсказание
# fit() обучает ВСЮ цепочку на train данных
# StringIndexer.fit() -> StringIndexerModel
# VectorAssembler -- просто transform (не требует fit)
# LogisticRegression.fit() -> LogisticRegressionModel
model = pipeline.fit(train_df)
# transform() применяет ВСЮ цепочку к test данным
predictions = model.transform(test_df)
predictions.select("id", "sex", "age", "survived", "prediction", "probability").show()
# +---+------+---+--------+----------+--------------------+
# | id| sex|age|survived|prediction| probability|
# +---+------+---+--------+----------+--------------------+
# | 2|female| 38| 1| 1.0|[0.12, 0.88] |
# | 7| male| 54| 0| 0.0|[0.91, 0.09] |
# +---+------+---+--------+----------+--------------------+
Pipeline = reproducibility. Вся предобработка (encoding, scaling, assembly) запечатана в Pipeline. При сохранении модели сохраняется и вся цепочка — на новых данных не нужно вручную повторять feature engineering. Это критически важно для production ML.
Model Persistence
MLlib поддерживает сохранение и загрузку моделей и pipeline:
from pyspark.ml import PipelineModel
# Сохранение обученного pipeline (включая все transformers)
model.save("/models/titanic-pipeline-v1")
# Загрузка (на другом кластере, в другом приложении)
loaded_model = PipelineModel.load("/models/titanic-pipeline-v1")
# Предсказание с загруженной моделью
new_predictions = loaded_model.transform(new_data)
Spark сохраняет:
- Metadata каждого stage (параметры, тип)
- Обученные параметры (weights, vocabulary, mappings)
- Порядок stages в pipeline
Формат совместим между Spark-кластерами одной major version.
Anti-pattern: ручная предобработка вне Pipeline. Если вы делаете feature engineering отдельно от Pipeline (например, pandas preprocessing перед MLlib), то при inference на новых данных нужно повторить те же шаги. Это частый источник training-serving skew. Всегда включайте предобработку в Pipeline stages.
Что дальше?
В следующем уроке мы углубимся в feature engineering и обучение моделей — VectorAssembler, StandardScaler, OneHotEncoder, CrossValidator для подбора гиперпараметров, и когда MLlib предпочтительнее scikit-learn.