Learning Platform
Глоссарий Troubleshooting
Урок 12.03 · 16 мин
Продвинутый
MLlibFeature EngineeringVectorAssemblerCrossValidatorHyperparameter TuningModel Evaluation

MLlib: feature engineering и обучение моделей

Feature Engineering в MLlib

В предыдущем уроке мы построили базовый Pipeline с StringIndexer и VectorAssembler. Теперь разберём полный арсенал feature transformers в MLlib.

VectorAssembler: сборка feature vector

VectorAssembler — самый используемый transformer. Он собирает несколько столбцов (numeric, boolean, vector) в один вектор features, который принимают все ML-алгоритмы:

from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["age", "fare", "pclass", "family_size"],
    outputCol="features",
    handleInvalid="skip"  # пропустить строки с null
)

# До: | age | fare  | pclass | family_size |
# После: | features              |
#        | [22.0, 7.25, 3, 1]   |

StandardScaler и MinMaxScaler: нормализация

Многие алгоритмы (LogisticRegression, SVM) чувствительны к масштабу features:

from pyspark.ml.feature import StandardScaler, MinMaxScaler

# StandardScaler: mean=0, std=1 (z-score normalization)
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaled_features",
    withStd=True,
    withMean=True  # осторожно: densifies sparse vectors
)

# MinMaxScaler: [0, 1] range
min_max = MinMaxScaler(
    inputCol="features",
    outputCol="scaled_features"
)
TIP

Когда нужна нормализация? StandardScaler обязателен для алгоритмов, использующих расстояния или градиентный спуск: LogisticRegression, LinearSVM, KMeans. Для tree-based моделей (RandomForest, GBT) нормализация не нужна — деревья работают с порогами, а не расстояниями.

StringIndexer и OneHotEncoder: категориальные features

from pyspark.ml.feature import StringIndexer, OneHotEncoder

# StringIndexer: "economy" -> 0.0, "business" -> 1.0, "first" -> 2.0
# Порядок определяется частотой (самый частый = 0)
indexer = StringIndexer(
    inputCol="cabin_class",
    outputCol="class_index",
    handleInvalid="keep"  # новые категории -> отдельный индекс
)

# OneHotEncoder: 0.0 -> [1,0,0], 1.0 -> [0,1,0], 2.0 -> [0,0,1]
encoder = OneHotEncoder(
    inputCols=["class_index"],
    outputCols=["class_onehot"],
    dropLast=True  # k-1 столбец (избегаем multicollinearity)
)

Bucketizer: бинаризация непрерывных features

from pyspark.ml.feature import Bucketizer

# Возраст -> категории
age_bucketizer = Bucketizer(
    splits=[0, 12, 18, 35, 55, 100],  # child, teen, young, middle, senior
    inputCol="age",
    outputCol="age_bucket"
)
# 22 -> bucket 2 (young), 54 -> bucket 3 (middle), 8 -> bucket 0 (child)

Текстовые features: Tokenizer, HashingTF, IDF

from pyspark.ml.feature import Tokenizer, HashingTF, IDF

# Tokenizer: строку -> массив слов
tokenizer = Tokenizer(inputCol="description", outputCol="words")

# HashingTF: слова -> sparse feature vector (bag of words)
hashing_tf = HashingTF(
    inputCol="words",
    outputCol="raw_features",
    numFeatures=1000  # размер хеш-таблицы
)

# IDF: TF-IDF weighting (уменьшает вес частых слов)
idf = IDF(inputCol="raw_features", outputCol="text_features")

End-to-End Training с CrossValidator

Продолжим пример из Урока 03 — добавим оценку модели и подбор гиперпараметров.

Оценка модели

from pyspark.ml.evaluation import (
    MulticlassClassificationEvaluator,
    BinaryClassificationEvaluator
)

# Binary classification metrics
binary_eval = BinaryClassificationEvaluator(
    labelCol="survived",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"  # AUC-ROC
)

auc = binary_eval.evaluate(predictions)
print(f"AUC-ROC: {auc:.4f}")

# Multiclass metrics
multi_eval = MulticlassClassificationEvaluator(
    labelCol="survived",
    predictionCol="prediction"
)

accuracy = multi_eval.evaluate(predictions, {multi_eval.metricName: "accuracy"})
f1 = multi_eval.evaluate(predictions, {multi_eval.metricName: "f1"})
precision = multi_eval.evaluate(predictions, {multi_eval.metricName: "weightedPrecision"})
recall = multi_eval.evaluate(predictions, {multi_eval.metricName: "weightedRecall"})

print(f"Accuracy:  {accuracy:.4f}")
print(f"F1-Score:  {f1:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall:    {recall:.4f}")

CrossValidator: подбор гиперпараметров

CrossValidator комбинирует Pipeline с grid search и k-fold cross-validation:

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier

# Pipeline с feature engineering + модель
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="survived"
)

pipeline = Pipeline(stages=[sex_indexer, assembler, rf])

# Grid search: перебор комбинаций параметров
param_grid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [50, 100, 200]) \
    .addGrid(rf.maxDepth, [3, 5, 10]) \
    .addGrid(rf.minInstancesPerNode, [1, 5]) \
    .build()
# 3 x 3 x 2 = 18 комбинаций

# CrossValidator: 3-fold CV x 18 комбинаций = 54 обучения
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=param_grid,
    evaluator=BinaryClassificationEvaluator(labelCol="survived"),
    numFolds=3,
    parallelism=4  # параллельно обучать 4 модели
)

# fit() запускает полный grid search
cv_model = cv.fit(train_df)

# Лучшая модель
best_model = cv_model.bestModel
print(f"Best numTrees: {best_model.stages[-1].getNumTrees}")
print(f"Best AUC: {max(cv_model.avgMetrics):.4f}")
CrossValidator Flow
ParamGrid: 18 комбинаций
K-Fold Cross-Validation× 18 params
Fold 1: train(66%) / validate(33%)
Fold 2: train(66%) / validate(33%)
Fold 3: train(66%) / validate(33%)
Average metrics across 3 folds
Select params with best average
Retrain on full training data
WARNING

Вычислительная стоимость CrossValidator. В нашем примере 18 комбинаций x 3 фолда = 54 полных обучения Pipeline. На больших данных это может занять часы. Используйте parallelism для параллельного обучения и начинайте с грубой сетки параметров, затем уточняйте диапазон вокруг лучших значений.

Сравнение RandomForest и LogisticRegression

from pyspark.ml.classification import (
    LogisticRegression,
    RandomForestClassifier,
    GBTClassifier
)

# LogisticRegression -- быстрый, интерпретируемый
lr = LogisticRegression(maxIter=100, regParam=0.01)

# RandomForest -- устойчив к переобучению, нет нормализации
rf = RandomForestClassifier(numTrees=100, maxDepth=5)

# GBT (Gradient-Boosted Trees) -- часто лучшее качество
gbt = GBTClassifier(maxIter=50, maxDepth=5, stepSize=0.1)
АлгоритмПлюсыМинусыКогда использовать
LogisticRegressionБыстрый, интерпретируемый, вероятностиЛинейная границаBaseline, интерпретируемость
RandomForestУстойчив, не нужна нормализацияМедленнее на inferenceСредние данные, feature importance
GBTClassifierЧасто лучшее качествоДолго обучается, overfittingМаксимальное качество

Decision Matrix: MLlib vs Альтернативы

Когда использовать MLlib, а когда — другие инструменты:

КритерийMLlib (Spark)scikit-learnPyTorch/TensorFlow
Размер данных>10GB, distributed<10GB, single machineЛюбой (GPU)
Тип задачиКлассика MLКлассика MLDeep Learning
Скорость прототипаМедленныйБыстрыйСредний
Production pipelineSpark PipelineОтдельный stepServing framework
Алгоритмы~30 базовых~200+Любые (custom)
ИнтеграцияSpark ETL + MLStandaloneStandalone
TIP

Оптимальная стратегия: прототипируйте на scikit-learn (pandas sample), затем переводите на MLlib, когда данные перестают помещаться в память одной машины. Для deep learning (NLP, CV) используйте PyTorch/TensorFlow + Spark только для data preprocessing.

WARNING

MLlib в maintenance mode. С 2023 года новые алгоритмы в MLlib не добавляются. Существующие API стабильны и поддерживаются, но для cutting-edge ML (gradient boosting — XGBoost/LightGBM, transformers) используйте внешние библиотеки с Spark интеграцией.

Проверка знанийKnowledge check
Зачем нужен CrossValidator и какова его вычислительная стоимость? Как уменьшить время подбора?
ОтветAnswer
CrossValidator комбинирует grid search гиперпараметров с k-fold cross-validation для надёжной оценки модели. Стоимость = (количество комбинаций в ParamGrid) x (numFolds) полных обучений Pipeline. В примере: 18 комбинаций x 3 фолда = 54 обучения. Уменьшить время можно: (1) увеличить parallelism для параллельного обучения; (2) начать с грубой сетки и уточнять; (3) использовать TrainValidationSplit вместо CrossValidator (один split вместо k фолдов); (4) уменьшить numFolds (2 вместо 3).
Проверка знанийKnowledge check
Когда следует использовать StandardScaler, а когда нормализация не нужна?
ОтветAnswer
StandardScaler обязателен для алгоритмов, чувствительных к масштабу features: LogisticRegression, LinearSVM, KMeans (используют расстояния или градиентный спуск). Без нормализации feature с большим диапазоном (например, fare: 0-500) доминирует над feature с малым диапазоном (pclass: 1-3). Для tree-based моделей (RandomForest, GBTClassifier) нормализация НЕ нужна -- деревья принимают решения по порогам отдельных features, а не по расстояниям.
Проверка знанийKnowledge check
В каких случаях MLlib предпочтительнее scikit-learn? Назовите ключевой критерий выбора.
ОтветAnswer
Ключевой критерий -- размер данных. MLlib предпочтительнее когда данные не помещаются в память одной машины (>10GB): MLlib распределяет обучение по executor кластера. Второй критерий -- интеграция с Spark pipeline: если данные уже в Spark (ETL), то MLlib позволяет объединить предобработку и обучение в один Pipeline без перемещения данных. scikit-learn быстрее для прототипирования на малых данных. Для deep learning (NLP, CV) ни один из них -- используйте PyTorch/TensorFlow.

Что дальше?

В следующем уроке мы разберём NVIDIA RAPIDS и GPU-ускорение — как прозрачно ускорить Spark-обработку на GPU без изменения кода.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 6. Зачем нужен StandardScaler для LogisticRegression, но НЕ нужен для RandomForest?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 3