基于Spark的汽车价格预测模型

在本文中,将探讨如何利用Spark和Python构建一个机器学习管道,用于预测汽车的价格。Spark是一个内存分布式数据处理工具,它能够处理大量实时数据,并且比传统的基于磁盘的Map-Reduce技术快100倍。在Hadoop生态系统中,Apache Mahout曾被用于机器学习任务,但现在Spark提供了一个更先进的ML-lib包,用于快速实现机器学习任务。

ML-lib包的特点

ML-lib是Spark内置的机器学习库,支持几乎所有的机器学习任务,包括监督学习和无监督学习。它还支持计算机视觉和自然语言处理。ML-lib的API与sklearn的API相似,因此更容易上手。此外,ML-lib还能够对实时数据进行预测。

目标

本文的目标是使用ApacheSparkPython实现美国汽车价格预测(一个回归任务)。将经历以下步骤:在云端笔记本中设置Spark、加载数据、数据清洗、特征向量构建、模型训练和测试以及性能评估。

设置Spark

使用pip包管理器在Python环境中安装Spark非常简单。推荐使用任何云端笔记本,例如Google Colab、Kaggle或Databricks笔记本。以下是安装Spark的命令:

!pip install pyspark !pip install findspark

接下来,需要导入必要的库,并创建Spark上下文和会话:

import findspark findspark.init() from pyspark import SparkContext, SparkConf from pyspark.sql import SparkSession sc = SparkSession.builder.master("local[*]").getOrCreate()

在这里,创建了一个Spark上下文类,并验证了会话是否正在运行。

加载数据

将使用Kaggle上的美国汽车价格预测数据集。可以下载数据集或使用Kaggle笔记本。以下是加载数据的代码:

data = sc.read.csv('../input/cars-data/data.csv', inferSchema=True, header=True) data.show(5) data.printSchema()

这里,使用read.csv方法加载数据,并显示前5行数据以及数据的模式。

数据清洗

数据中包含一些“N/A”值作为字符串,需要将它们替换为实际的NA值。以下是替换函数的代码:

from pyspark.sql.functions import when, lit, count, isnan, col def replace(column, value): return when(column!=value, column).otherwise(lit(None)) data = data.withColumn("Market Category", replace(col("Market Category"),"N/A"))

这个函数接受一个列和值,并将匹配的值替换为None。然后,统计每个列的空值数量,并删除包含空值的列。

特征向量构建

为了训练模型,需要将常规的列数据转换为特征向量,以获得更好的收敛性。使用VectorAssembler类,可以将数据框的列转换为特征向量。以下是构建特征向量的代码:

from pyspark.ml.feature import VectorAssembler assembler = VectorAssembler(inputCols=["Year","Engine HP","Engine Cylinders", "Number of Doors","highway MPG","city mpg","Popularity"], outputCol="Input Attributes")

在这里,将“Year”、“Engine HP”、“Engine Cylinders”、“Number of Doors”、“highway MPG”、“city mpg”和“Popularity”这些列作为输入特征,并将其转换为特征向量。

创建模型和管道

Spark提供了内置的机器学习模型。将使用RandomForestRegressor作为回归任务,并使用交叉验证进行训练。以下是创建模型和管道的代码:

from pyspark.ml.regression import RandomForestRegressor from pyspark.ml import Pipeline regressor = RandomForestRegressor(featuresCol='Input Attributes', labelCol="MSRP") pipeline = Pipeline(stages=[assembler, regressor])

在这里,定义了一个随机森林回归器,并将其与特征向量组装器一起放入管道中。

参数调优和交叉验证

定义了一个超参数空间,用于在交叉验证期间进行超参数调优。以下是参数调优和交叉验证的代码:

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator paramGrid = ParamGridBuilder().addGrid(regressor.numTrees,[100,500]).build() crossval = CrossValidator(estimator=pipelineModel, estimatorParamMaps=paramGrid, evaluator=RegressionEvaluator(labelCol="MSRP"), numFolds=3)

在这里,创建了一个交叉验证器,并配备了训练管道,但尚未用数据拟合它。

数据分割

将数据分割为训练和测试部分,其中80%用于模型训练,其余20%用于预测。以下是数据分割的代码:

train_data, test_data = data.randomSplit([0.8,0.2], seed=123)

在这里,将数据随机分割为训练集和测试集。

模型拟合

这个过程可能需要一些时间,因为它是实际的训练过程。以下是模型拟合的代码:

cvModel = crossval.fit(train_data) from pyspark.ml.evaluation import RegressionEvaluator eval = RegressionEvaluator(labelCol='MSRP') rmse = eval.evaluate(pred, {eval.metricName:'rmse'}) mae = eval.evaluate(pred, {eval.metricName:"mae"}) r2 = eval.evaluate(pred,{eval.metricName:'r2'})
沪ICP备2024098111号-1
上海秋旦网络科技中心:上海市奉贤区金大公路8218号1幢 联系电话:17898875485