Learning Platform
Глоссарий Troubleshooting
Урок 12.02 · 14 мин
Средний
MLlibPipelineTransformerEstimatorClassificationMachine Learning

MLlib: основы Pipeline API

MLlib: встроенная ML-библиотека Spark

MLlib (Machine Learning Library) — встроенная библиотека машинного обучения Spark. С версии 2.0 MLlib работает на уровне DataFrame API (пакет pyspark.ml), заменив устаревший RDD-based API (pyspark.mllib).

Ключевое преимущество MLlib: распределённое обучение на кластере. Когда данных больше, чем помещается в память одной машины, scikit-learn не справится — MLlib распределяет вычисления по executor.

# Современный API (DataFrame-based) -- используйте этот
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler

# Legacy API (RDD-based) -- НЕ используйте
# from pyspark.mllib.classification import LogisticRegressionWithSGD

Core Abstractions

MLlib Pipeline API построен на трёх абстракциях:

Transformer

Transformer принимает DataFrame и возвращает новый DataFrame с добавленным столбцом. Метод transform():

from pyspark.ml.feature import StringIndexer, VectorAssembler

# StringIndexer: "male"/"female" -> 0.0/1.0
indexer = StringIndexer(inputCol="sex", outputCol="sex_index")
# VectorAssembler: собирает несколько столбцов в один вектор
assembler = VectorAssembler(
    inputCols=["age", "fare", "sex_index", "pclass"],
    outputCol="features"
)

# Trained model -- тоже Transformer
# model.transform(df) добавляет столбец "prediction"

Estimator

Estimator обучается на данных и возвращает Transformer (модель). Метод fit():

from pyspark.ml.classification import LogisticRegression

# LogisticRegression -- Estimator
lr = LogisticRegression(
    featuresCol="features",
    labelCol="survived",
    maxIter=100
)

# fit() обучает модель на данных -> возвращает Model (Transformer)
model = lr.fit(train_df)

# model.transform() делает предсказания
predictions = model.transform(test_df)

Pipeline

Pipeline — цепочка stages (Transformer и Estimator), выполняемых последовательно:

Pipeline Stages

Pipeline.fit(train_df) → PipelineModel.transform(test_df) → predictions

Stages (fit)
StringIndex(Estimator)
VectorAssembler(Transformer)
StandardScaler(Estimator)
LogisticRegress.(Estimator)
fit() → Model / pass-through
PipelineModel (transform)
IndexModel(Transform)
VectorAssembler(Transformer)
ScalerModel(Transformer)
LR Model(Transformer)

End-to-End пример: классификация пассажиров

Построим pipeline классификации: предскажем выживание пассажиров на основе возраста, класса и стоимости билета.

1. Загрузка данных

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MLlibPipeline").getOrCreate()

# Данные о пассажирах (Titanic-style)
data = spark.createDataFrame([
    (1, "male",   22, 1, 7.25,   0),   # 3 класс, молодой, дешёвый -- не выжил
    (2, "female", 38, 1, 71.28,  1),   # 1 класс, женщина -- выжила
    (3, "female", 26, 3, 7.92,   1),   # 3 класс, женщина -- выжила
    (4, "male",   35, 1, 53.10,  0),   # 1 класс, мужчина -- не выжил
    (5, "male",   28, 3, 8.05,   0),   # 3 класс, мужчина -- не выжил
    (6, "female", 27, 2, 11.13,  1),   # 2 класс, женщина -- выжила
    (7, "male",   54, 1, 51.86,  0),   # 1 класс, старший -- не выжил
    (8, "female", 2,  3, 21.07,  1),   # 3 класс, ребёнок -- выжила
    (9, "male",   14, 3, 11.24,  0),   # 3 класс, подросток -- не выжил
    (10, "female", 58, 1, 26.55, 1),   # 1 класс, женщина -- выжила
], ["id", "sex", "age", "pclass", "fare", "survived"])

# Train/test split (80/20)
train_df, test_df = data.randomSplit([0.8, 0.2], seed=42)

2. Построение Pipeline

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression

# Stage 1: Кодирование пола (string -> numeric)
sex_indexer = StringIndexer(
    inputCol="sex",
    outputCol="sex_index",
    handleInvalid="keep"
)

# Stage 2: Сборка features в вектор
assembler = VectorAssembler(
    inputCols=["age", "fare", "sex_index", "pclass"],
    outputCol="features"
)

# Stage 3: Модель классификации
lr = LogisticRegression(
    featuresCol="features",
    labelCol="survived",
    maxIter=100,
    regParam=0.01
)

# Pipeline: последовательность stages
pipeline = Pipeline(stages=[sex_indexer, assembler, lr])

3. Обучение и предсказание

# fit() обучает ВСЮ цепочку на train данных
# StringIndexer.fit() -> StringIndexerModel
# VectorAssembler -- просто transform (не требует fit)
# LogisticRegression.fit() -> LogisticRegressionModel
model = pipeline.fit(train_df)

# transform() применяет ВСЮ цепочку к test данным
predictions = model.transform(test_df)
predictions.select("id", "sex", "age", "survived", "prediction", "probability").show()
# +---+------+---+--------+----------+--------------------+
# | id|   sex|age|survived|prediction|         probability|
# +---+------+---+--------+----------+--------------------+
# |  2|female| 38|       1|       1.0|[0.12, 0.88]       |
# |  7|  male| 54|       0|       0.0|[0.91, 0.09]       |
# +---+------+---+--------+----------+--------------------+
TIP

Pipeline = reproducibility. Вся предобработка (encoding, scaling, assembly) запечатана в Pipeline. При сохранении модели сохраняется и вся цепочка — на новых данных не нужно вручную повторять feature engineering. Это критически важно для production ML.

Model Persistence

MLlib поддерживает сохранение и загрузку моделей и pipeline:

from pyspark.ml import PipelineModel

# Сохранение обученного pipeline (включая все transformers)
model.save("/models/titanic-pipeline-v1")

# Загрузка (на другом кластере, в другом приложении)
loaded_model = PipelineModel.load("/models/titanic-pipeline-v1")

# Предсказание с загруженной моделью
new_predictions = loaded_model.transform(new_data)

Spark сохраняет:

  • Metadata каждого stage (параметры, тип)
  • Обученные параметры (weights, vocabulary, mappings)
  • Порядок stages в pipeline

Формат совместим между Spark-кластерами одной major version.

WARNING

Anti-pattern: ручная предобработка вне Pipeline. Если вы делаете feature engineering отдельно от Pipeline (например, pandas preprocessing перед MLlib), то при inference на новых данных нужно повторить те же шаги. Это частый источник training-serving skew. Всегда включайте предобработку в Pipeline stages.

Проверка знанийKnowledge check
В чём разница между Transformer и Estimator в MLlib? Приведите по одному примеру каждого.
ОтветAnswer
Transformer принимает DataFrame и возвращает новый DataFrame с добавленным столбцом (метод transform()). Пример: VectorAssembler -- собирает несколько числовых столбцов в один вектор features. Estimator обучается на данных и возвращает Transformer/Model (метод fit()). Пример: LogisticRegression -- fit() обучает модель на train данных, возвращает LogisticRegressionModel, который уже является Transformer (его transform() добавляет столбец prediction). Обученная модель -- это всегда Transformer.
Проверка знанийKnowledge check
Почему feature engineering нужно включать в Pipeline, а не делать отдельно?
ОтветAnswer
Если feature engineering (StringIndexer, VectorAssembler, Scaler) выполняется отдельно от Pipeline, при inference на новых данных нужно повторить те же шаги с теми же параметрами. Это создаёт risk training-serving skew -- несоответствие предобработки между training и production. Pipeline инкапсулирует все stages: при model.save() сохраняются и transformers, и их обученные параметры. При PipelineModel.load().transform(new_data) вся цепочка воспроизводится автоматически.

Что дальше?

В следующем уроке мы углубимся в feature engineering и обучение моделей — VectorAssembler, StandardScaler, OneHotEncoder, CrossValidator для подбора гиперпараметров, и когда MLlib предпочтительнее scikit-learn.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 6. Какова разница между Transformer и Estimator в MLlib Pipeline API?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 3