在本文中,将探讨如何利用Spark和Python构建一个机器学习管道,用于预测汽车的价格。Spark是一个内存分布式数据处理工具,它能够处理大量实时数据,并且比传统的基于磁盘的Map-Reduce技术快100倍。在Hadoop生态系统中,Apache Mahout曾被用于机器学习任务,但现在Spark提供了一个更先进的ML-lib包,用于快速实现机器学习任务。
ML-lib是Spark内置的机器学习库,支持几乎所有的机器学习任务,包括监督学习和无监督学习。它还支持计算机视觉和自然语言处理。ML-lib的API与sklearn的API相似,因此更容易上手。此外,ML-lib还能够对实时数据进行预测。
本文的目标是使用ApacheSpark和Python实现美国汽车价格预测(一个回归任务)。将经历以下步骤:在云端笔记本中设置Spark、加载数据、数据清洗、特征向量构建、模型训练和测试以及性能评估。
使用pip包管理器在Python环境中安装Spark非常简单。推荐使用任何云端笔记本,例如Google Colab、Kaggle或Databricks笔记本。以下是安装Spark的命令:
!pip install pyspark
!pip install findspark
接下来,需要导入必要的库,并创建Spark上下文和会话:
import findspark
findspark.init()
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
sc = SparkSession.builder.master("local[*]").getOrCreate()
在这里,创建了一个Spark上下文类,并验证了会话是否正在运行。
将使用Kaggle上的美国汽车价格预测数据集。可以下载数据集或使用Kaggle笔记本。以下是加载数据的代码:
data = sc.read.csv('../input/cars-data/data.csv', inferSchema=True, header=True)
data.show(5)
data.printSchema()
这里,使用read.csv方法加载数据,并显示前5行数据以及数据的模式。
数据中包含一些“N/A”值作为字符串,需要将它们替换为实际的NA值。以下是替换函数的代码:
from pyspark.sql.functions import when, lit, count, isnan, col
def replace(column, value):
return when(column!=value, column).otherwise(lit(None))
data = data.withColumn("Market Category", replace(col("Market Category"),"N/A"))
这个函数接受一个列和值,并将匹配的值替换为None。然后,统计每个列的空值数量,并删除包含空值的列。
为了训练模型,需要将常规的列数据转换为特征向量,以获得更好的收敛性。使用VectorAssembler类,可以将数据框的列转换为特征向量。以下是构建特征向量的代码:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["Year","Engine HP","Engine Cylinders",
"Number of Doors","highway MPG","city mpg","Popularity"],
outputCol="Input Attributes")
在这里,将“Year”、“Engine HP”、“Engine Cylinders”、“Number of Doors”、“highway MPG”、“city mpg”和“Popularity”这些列作为输入特征,并将其转换为特征向量。
Spark提供了内置的机器学习模型。将使用RandomForestRegressor作为回归任务,并使用交叉验证进行训练。以下是创建模型和管道的代码:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline
regressor = RandomForestRegressor(featuresCol='Input Attributes', labelCol="MSRP")
pipeline = Pipeline(stages=[assembler, regressor])
在这里,定义了一个随机森林回归器,并将其与特征向量组装器一起放入管道中。
定义了一个超参数空间,用于在交叉验证期间进行超参数调优。以下是参数调优和交叉验证的代码:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
paramGrid = ParamGridBuilder().addGrid(regressor.numTrees,[100,500]).build()
crossval = CrossValidator(estimator=pipelineModel,
estimatorParamMaps=paramGrid,
evaluator=RegressionEvaluator(labelCol="MSRP"),
numFolds=3)
在这里,创建了一个交叉验证器,并配备了训练管道,但尚未用数据拟合它。
将数据分割为训练和测试部分,其中80%用于模型训练,其余20%用于预测。以下是数据分割的代码:
train_data, test_data = data.randomSplit([0.8,0.2], seed=123)
在这里,将数据随机分割为训练集和测试集。
这个过程可能需要一些时间,因为它是实际的训练过程。以下是模型拟合的代码:
cvModel = crossval.fit(train_data)
from pyspark.ml.evaluation import RegressionEvaluator
eval = RegressionEvaluator(labelCol='MSRP')
rmse = eval.evaluate(pred, {eval.metricName:'rmse'})
mae = eval.evaluate(pred, {eval.metricName:"mae"})
r2 = eval.evaluate(pred,{eval.metricName:'r2'})