在这篇文章中,将探讨如何利用Apache Spark的MLlib包来构建一个机器学习管道,用于预测汽车的价格。Apache Spark是一个强大的分布式数据处理框架,它提供了一个名为MLlib的机器学习库,支持多种机器学习任务,包括计算机视觉和自然语言处理。MLlib的API与其他机器学习包相似,易于上手,并且支持近实时操作。
首先,需要安装Spark。在Python环境中,可以使用pip包管理器快速安装Spark。此外,还可以在云笔记本如Google Colab、Kaggle笔记本和Databricks上设置Spark。以下是安装Spark的命令:
!pip install pyspark
!pip install findspark
import findspark
findspark.init()
接下来,需要导入一些必要的库,包括pandas、matplotlib以及Spark的库:
import pandas as pd
import matplotlib.pyplot as plt
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
在进一步操作之前,需要创建一个Spark上下文,这是Spark应用程序的入口点,在这里定义配置和核心。
from pyspark import SparkContext, SparkConf
sc = SparkSession.builder.master("local[*]").getOrCreate()
这里,"local[*]"表示创建一个本地会话,使用所有可用的核心。"getOrCreate"表示如果会话尚未创建,则创建一个新的会话。
将加载美国汽车价格数据集,该数据集可以在Kaggle上免费获取。可以创建一个与数据集关联的笔记本或下载数据集。以下是加载数据的代码:
data = sc.read.csv('../input/cars-data/data.csv', inferSchema=True, header=True)
data.show(5)
data.printSchema()
"printSchema()"函数打印所有列及其数据类型,合理地描述了数据集的模式。接下来,进行数据的统计分析:
data.describe().toPandas().transpose()
在数据清洗步骤中,从数据集中移除所有冗余信息。例如,数据集中包含一些NA值,目标是删除这些NaN值。
def replace(column, value):
return when(column!=value,column).otherwise(lit(None))
这个函数替换列和值,并返回与传递的值匹配的值。
data = data.withColumn("Market Category", replace(col("Market Category"),"N/A"))
data.select([count(when(isnan(c)|col(c).isNull(),c)).alias(c) for c in data.columns]).show()
"Market Category"列最多有3742个空值,这意味着可以删除此列。
data = data.drop("Market Category")
data = data.na.drop()
已经清洗了数据,现在可以开始构建机器学习模型了。
SparkMLlib支持将数据集以特征向量的形式处理。需要将常规的列式数据集转换为特征向量,以实现快速且更好的收敛。使用类向量装配器将数据框的列反转为特征向量系列。
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["Engine HP","Engine Cylinders","Year","highway MPG","Number of Doors","city mpg","Popularity"], outputCol="Input Attributes")
不在数据上调用向量装配器,因为正在创建一个管道。只需要传递函数;管道将按需转换数据。
Spark提供了许多内置的机器学习模型,只需要导入并根据数据进行训练。将使用RandomForestRegressor作为模型,并稍后使用交叉验证进行训练。
from pyspark.ml.regression import RandomForestRegressor
regressor_model = RandomForestRegressor(featuresCol='Input_Attributes', labelCol="MSRP")
"RandomForestRegressor"输入所有特征的组合和"labelCol"作为训练的输出特征。
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[assembler, regressor])
管道将多个步骤和转换组合成单个步骤,按顺序调用。在通道中,需要列出序列,数据从左端进入,通过每个处理后从另一端出来。
pyspark.ml.tuning类提供了所有用于模型调优的函数。在此之前,需要定义一个用于交叉验证期间超参数调优的超参数网格。
from pyspark.ml.tuning import ParamGridBuilder
paramGrid = ParamGridBuilder().addGrid(regressor.numTrees,[100,500]).build()
在这里,参数k被numFolder替换。使用numFolder=3,意味着66%的数据将用于训练,其余将用于测试。
from pyspark.ml.tuning import CrossValidator
crossval = CrossValidator(estimator=pipelineModel, estimatorParamMaps=paramGrid, evaluator=RegressionEvaluator(labelCol="MSRP"), numFolds=3)
到目前为止,已经创建了一个带有训练管道的交叉验证器。现在是训练管道的时候了。
将数据分割为训练和测试部分,80%用于模型训练,其余20%用于验证。
train_data, test_data = data.randomSplit([0.8,0.2], seed=133)
训练过程可能需要一些时间,这取决于机器使用的Spark核心。
cvModel = crossval.fit(train_data)
交叉验证得分和最佳拟合模型将给出参数调整后的最佳模型。
bestModel = cvModel.bestModel
print(bestModel.stages)
最佳拟合模型具有numFeatures=7和numTree=500。
transform()方法用于预测。
prediction = cvModel.transform(test_data)
transform方法自动创建一个包含所有预测的预测列。
from pyspark.ml.evaluation import RegressionEvaluator
rmse = eval.evaluate(pred, {eval.metricName:'rmse'})
eval = RegressionEvaluator(labelCol='MSRP')
mae = eval.evaluate(pred, {eval.metricName:"mae"})
r2 = eval.evaluate(pred,{eval.metricName:'r2'})
print("RMSE: %.3f" % rmse)
print("R2: %.3f" % r2)
print("MAE: %.3f" % mae)