使用Apache Spark构建机器学习管道

在这篇文章中,将探讨如何利用Apache SparkMLlib包来构建一个机器学习管道,用于预测汽车的价格。Apache Spark是一个强大的分布式数据处理框架,它提供了一个名为MLlib的机器学习库,支持多种机器学习任务,包括计算机视觉和自然语言处理。MLlib的API与其他机器学习包相似,易于上手,并且支持近实时操作。

Spark的安装与环境配置

首先,需要安装Spark。在Python环境中,可以使用pip包管理器快速安装Spark。此外,还可以在云笔记本如Google Colab、Kaggle笔记本和Databricks上设置Spark。以下是安装Spark的命令:

!pip install pyspark !pip install findspark import findspark findspark.init()

接下来,需要导入一些必要的库,包括pandas、matplotlib以及Spark的库:

import pandas as pd import matplotlib.pyplot as plt from pyspark import SparkContext, SparkConf from pyspark.sql import SparkSession

创建Spark上下文和会话

在进一步操作之前,需要创建一个Spark上下文,这是Spark应用程序的入口点,在这里定义配置和核心。

from pyspark import SparkContext, SparkConf sc = SparkSession.builder.master("local[*]").getOrCreate()

这里,"local[*]"表示创建一个本地会话,使用所有可用的核心。"getOrCreate"表示如果会话尚未创建,则创建一个新的会话。

数据加载与清洗

将加载美国汽车价格数据集,该数据集可以在Kaggle上免费获取。可以创建一个与数据集关联的笔记本或下载数据集。以下是加载数据的代码:

data = sc.read.csv('../input/cars-data/data.csv', inferSchema=True, header=True) data.show(5) data.printSchema()

"printSchema()"函数打印所有列及其数据类型,合理地描述了数据集的模式。接下来,进行数据的统计分析:

data.describe().toPandas().transpose()

在数据清洗步骤中,从数据集中移除所有冗余信息。例如,数据集中包含一些NA值,目标是删除这些NaN值。

def replace(column, value): return when(column!=value,column).otherwise(lit(None))

这个函数替换列和值,并返回与传递的值匹配的值。

data = data.withColumn("Market Category", replace(col("Market Category"),"N/A")) data.select([count(when(isnan(c)|col(c).isNull(),c)).alias(c) for c in data.columns]).show()

"Market Category"列最多有3742个空值,这意味着可以删除此列。

data = data.drop("Market Category") data = data.na.drop()

已经清洗了数据,现在可以开始构建机器学习模型了。

特征向量与模型训练

SparkMLlib支持将数据集以特征向量的形式处理。需要将常规的列式数据集转换为特征向量,以实现快速且更好的收敛。使用类向量装配器将数据框的列反转为特征向量系列。

from pyspark.ml.feature import VectorAssembler assembler = VectorAssembler(inputCols=["Engine HP","Engine Cylinders","Year","highway MPG","Number of Doors","city mpg","Popularity"], outputCol="Input Attributes")

不在数据上调用向量装配器,因为正在创建一个管道。只需要传递函数;管道将按需转换数据。

构建模型和管道

Spark提供了许多内置的机器学习模型,只需要导入并根据数据进行训练。将使用RandomForestRegressor作为模型,并稍后使用交叉验证进行训练。

from pyspark.ml.regression import RandomForestRegressor regressor_model = RandomForestRegressor(featuresCol='Input_Attributes', labelCol="MSRP")

"RandomForestRegressor"输入所有特征的组合和"labelCol"作为训练的输出特征。

from pyspark.ml import Pipeline pipeline = Pipeline(stages=[assembler, regressor])

管道将多个步骤和转换组合成单个步骤,按顺序调用。在通道中,需要列出序列,数据从左端进入,通过每个处理后从另一端出来。

交叉验证与模型调优

pyspark.ml.tuning类提供了所有用于模型调优的函数。在此之前,需要定义一个用于交叉验证期间超参数调优的超参数网格。

from pyspark.ml.tuning import ParamGridBuilder paramGrid = ParamGridBuilder().addGrid(regressor.numTrees,[100,500]).build()

在这里,参数k被numFolder替换。使用numFolder=3,意味着66%的数据将用于训练,其余将用于测试。

from pyspark.ml.tuning import CrossValidator crossval = CrossValidator(estimator=pipelineModel, estimatorParamMaps=paramGrid, evaluator=RegressionEvaluator(labelCol="MSRP"), numFolds=3)

到目前为止,已经创建了一个带有训练管道的交叉验证器。现在是训练管道的时候了。

数据分割与模型训练

将数据分割为训练和测试部分,80%用于模型训练,其余20%用于验证。

train_data, test_data = data.randomSplit([0.8,0.2], seed=133)

训练过程可能需要一些时间,这取决于机器使用的Spark核心。

cvModel = crossval.fit(train_data)

交叉验证得分和最佳拟合模型将给出参数调整后的最佳模型。

bestModel = cvModel.bestModel print(bestModel.stages)

最佳拟合模型具有numFeatures=7和numTree=500。

transform()方法用于预测。

prediction = cvModel.transform(test_data)

transform方法自动创建一个包含所有预测的预测列。

from pyspark.ml.evaluation import RegressionEvaluator rmse = eval.evaluate(pred, {eval.metricName:'rmse'}) eval = RegressionEvaluator(labelCol='MSRP') mae = eval.evaluate(pred, {eval.metricName:"mae"}) r2 = eval.evaluate(pred,{eval.metricName:'r2'}) print("RMSE: %.3f" % rmse) print("R2: %.3f" % r2) print("MAE: %.3f" % mae)
沪ICP备2024098111号-1
上海秋旦网络科技中心:上海市奉贤区金大公路8218号1幢 联系电话:17898875485