MLlib: feature engineering и обучение моделей
Feature Engineering в MLlib
В предыдущем уроке мы построили базовый Pipeline с StringIndexer и VectorAssembler. Теперь разберём полный арсенал feature transformers в MLlib.
VectorAssembler: сборка feature vector
VectorAssembler — самый используемый transformer. Он собирает несколько столбцов (numeric, boolean, vector) в один вектор features, который принимают все ML-алгоритмы:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(
inputCols=["age", "fare", "pclass", "family_size"],
outputCol="features",
handleInvalid="skip" # пропустить строки с null
)
# До: | age | fare | pclass | family_size |
# После: | features |
# | [22.0, 7.25, 3, 1] |
StandardScaler и MinMaxScaler: нормализация
Многие алгоритмы (LogisticRegression, SVM) чувствительны к масштабу features:
from pyspark.ml.feature import StandardScaler, MinMaxScaler
# StandardScaler: mean=0, std=1 (z-score normalization)
scaler = StandardScaler(
inputCol="features",
outputCol="scaled_features",
withStd=True,
withMean=True # осторожно: densifies sparse vectors
)
# MinMaxScaler: [0, 1] range
min_max = MinMaxScaler(
inputCol="features",
outputCol="scaled_features"
)
Когда нужна нормализация? StandardScaler обязателен для алгоритмов, использующих расстояния или градиентный спуск: LogisticRegression, LinearSVM, KMeans. Для tree-based моделей (RandomForest, GBT) нормализация не нужна — деревья работают с порогами, а не расстояниями.
StringIndexer и OneHotEncoder: категориальные features
from pyspark.ml.feature import StringIndexer, OneHotEncoder
# StringIndexer: "economy" -> 0.0, "business" -> 1.0, "first" -> 2.0
# Порядок определяется частотой (самый частый = 0)
indexer = StringIndexer(
inputCol="cabin_class",
outputCol="class_index",
handleInvalid="keep" # новые категории -> отдельный индекс
)
# OneHotEncoder: 0.0 -> [1,0,0], 1.0 -> [0,1,0], 2.0 -> [0,0,1]
encoder = OneHotEncoder(
inputCols=["class_index"],
outputCols=["class_onehot"],
dropLast=True # k-1 столбец (избегаем multicollinearity)
)
Bucketizer: бинаризация непрерывных features
from pyspark.ml.feature import Bucketizer
# Возраст -> категории
age_bucketizer = Bucketizer(
splits=[0, 12, 18, 35, 55, 100], # child, teen, young, middle, senior
inputCol="age",
outputCol="age_bucket"
)
# 22 -> bucket 2 (young), 54 -> bucket 3 (middle), 8 -> bucket 0 (child)
Текстовые features: Tokenizer, HashingTF, IDF
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
# Tokenizer: строку -> массив слов
tokenizer = Tokenizer(inputCol="description", outputCol="words")
# HashingTF: слова -> sparse feature vector (bag of words)
hashing_tf = HashingTF(
inputCol="words",
outputCol="raw_features",
numFeatures=1000 # размер хеш-таблицы
)
# IDF: TF-IDF weighting (уменьшает вес частых слов)
idf = IDF(inputCol="raw_features", outputCol="text_features")
End-to-End Training с CrossValidator
Продолжим пример из Урока 03 — добавим оценку модели и подбор гиперпараметров.
Оценка модели
from pyspark.ml.evaluation import (
MulticlassClassificationEvaluator,
BinaryClassificationEvaluator
)
# Binary classification metrics
binary_eval = BinaryClassificationEvaluator(
labelCol="survived",
rawPredictionCol="rawPrediction",
metricName="areaUnderROC" # AUC-ROC
)
auc = binary_eval.evaluate(predictions)
print(f"AUC-ROC: {auc:.4f}")
# Multiclass metrics
multi_eval = MulticlassClassificationEvaluator(
labelCol="survived",
predictionCol="prediction"
)
accuracy = multi_eval.evaluate(predictions, {multi_eval.metricName: "accuracy"})
f1 = multi_eval.evaluate(predictions, {multi_eval.metricName: "f1"})
precision = multi_eval.evaluate(predictions, {multi_eval.metricName: "weightedPrecision"})
recall = multi_eval.evaluate(predictions, {multi_eval.metricName: "weightedRecall"})
print(f"Accuracy: {accuracy:.4f}")
print(f"F1-Score: {f1:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
CrossValidator: подбор гиперпараметров
CrossValidator комбинирует Pipeline с grid search и k-fold cross-validation:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
# Pipeline с feature engineering + модель
rf = RandomForestClassifier(
featuresCol="features",
labelCol="survived"
)
pipeline = Pipeline(stages=[sex_indexer, assembler, rf])
# Grid search: перебор комбинаций параметров
param_grid = ParamGridBuilder() \
.addGrid(rf.numTrees, [50, 100, 200]) \
.addGrid(rf.maxDepth, [3, 5, 10]) \
.addGrid(rf.minInstancesPerNode, [1, 5]) \
.build()
# 3 x 3 x 2 = 18 комбинаций
# CrossValidator: 3-fold CV x 18 комбинаций = 54 обучения
cv = CrossValidator(
estimator=pipeline,
estimatorParamMaps=param_grid,
evaluator=BinaryClassificationEvaluator(labelCol="survived"),
numFolds=3,
parallelism=4 # параллельно обучать 4 модели
)
# fit() запускает полный grid search
cv_model = cv.fit(train_df)
# Лучшая модель
best_model = cv_model.bestModel
print(f"Best numTrees: {best_model.stages[-1].getNumTrees}")
print(f"Best AUC: {max(cv_model.avgMetrics):.4f}")
Вычислительная стоимость CrossValidator. В нашем примере 18 комбинаций x 3 фолда = 54 полных обучения Pipeline. На больших данных это может занять часы. Используйте parallelism для параллельного обучения и начинайте с грубой сетки параметров, затем уточняйте диапазон вокруг лучших значений.
Сравнение RandomForest и LogisticRegression
from pyspark.ml.classification import (
LogisticRegression,
RandomForestClassifier,
GBTClassifier
)
# LogisticRegression -- быстрый, интерпретируемый
lr = LogisticRegression(maxIter=100, regParam=0.01)
# RandomForest -- устойчив к переобучению, нет нормализации
rf = RandomForestClassifier(numTrees=100, maxDepth=5)
# GBT (Gradient-Boosted Trees) -- часто лучшее качество
gbt = GBTClassifier(maxIter=50, maxDepth=5, stepSize=0.1)
| Алгоритм | Плюсы | Минусы | Когда использовать |
|---|---|---|---|
| LogisticRegression | Быстрый, интерпретируемый, вероятности | Линейная граница | Baseline, интерпретируемость |
| RandomForest | Устойчив, не нужна нормализация | Медленнее на inference | Средние данные, feature importance |
| GBTClassifier | Часто лучшее качество | Долго обучается, overfitting | Максимальное качество |
Decision Matrix: MLlib vs Альтернативы
Когда использовать MLlib, а когда — другие инструменты:
| Критерий | MLlib (Spark) | scikit-learn | PyTorch/TensorFlow |
|---|---|---|---|
| Размер данных | >10GB, distributed | <10GB, single machine | Любой (GPU) |
| Тип задачи | Классика ML | Классика ML | Deep Learning |
| Скорость прототипа | Медленный | Быстрый | Средний |
| Production pipeline | Spark Pipeline | Отдельный step | Serving framework |
| Алгоритмы | ~30 базовых | ~200+ | Любые (custom) |
| Интеграция | Spark ETL + ML | Standalone | Standalone |
Оптимальная стратегия: прототипируйте на scikit-learn (pandas sample), затем переводите на MLlib, когда данные перестают помещаться в память одной машины. Для deep learning (NLP, CV) используйте PyTorch/TensorFlow + Spark только для data preprocessing.
MLlib в maintenance mode. С 2023 года новые алгоритмы в MLlib не добавляются. Существующие API стабильны и поддерживаются, но для cutting-edge ML (gradient boosting — XGBoost/LightGBM, transformers) используйте внешние библиотеки с Spark интеграцией.
Что дальше?
В следующем уроке мы разберём NVIDIA RAPIDS и GPU-ускорение — как прозрачно ускорить Spark-обработку на GPU без изменения кода.