使用Spark MLlib构建机器学习流水线

在本文中,将探讨如何利用Python中的Spark来实现机器学习。目标是构建一个在Spark中工作的回归模型流水线,并能够进行实时预测。将详细讨论Spark MLlib包,以便构建机器学习流水线。

最终目标是利用Spark机器学习API构建一个汽车价格预测器,并执行所有必要的步骤。将深入了解Spark的工作原理以及Spark的不同类和MLlib包。还将讨论流水线的工作方式以及与之相关的特征工程。

为什么选择Spark?

Spark是一个内存中分布式数据处理和计算工具,能够处理大量数据。与Hadoop Map-reduce相比,Spark的运行速度要快100倍,因为Map-reduce在磁盘上执行计算和数据处理,这使得它运行更慢。在Spark数据流流水线上构建的各种机器学习流水线可以实时运行。

SparkMLlib

Spark提供了一个单独的包来执行所有与机器学习相关的任务。还有各种第三方依赖项可以与Spark完美配合使用。Apache Mahout用于机器学习,在Hadoop生态系统中被使用。由于Spark是一个更新且定义良好的大数据处理工具,使用Spark依赖项来完成任务。

MLlib包的特点:

  • MLlib包含了所有中间处理和特征工程类所需的内容。
  • MLlib支持所有类型的监督学习和聚类算法。
  • MLlib可以应用于自然语言处理和计算机视觉。
  • MLlib的API与sklearn的API相似,因此对新学习者来说更容易适应。
  • 与实时流数据兼容。

目标

在本文中,将使用Apache Spark和Python构建一个汽车价格预测器流水线(一个回归任务)。

设置Spark环境

Python环境中安装Spark可以通过pip包管理器完成,尽管建议在任何云笔记本中设置Pyspark,例如Google Colab、Kaggle、Databricks等。

!pip install pyspark !pip install findspark import findspark findspark.init()

导入库:

import pandas as pd import matplotlib.pyplot as plt from pyspark.sql import SparkSession from pyspark import SparkContext, SparkConf

Spark上下文和会话:

Spark上下文是任何Spark应用程序的入口点。要在Spark上工作,加载数据,需要创建Spark上下文。这更像是创建一个类的实例。

from pyspark import SparkContext, SparkConf sc = SparkSession.builder.master("local[*]").getOrCreate()

local[*] → 使用所有可用的CPU核心创建会话。

getOrCreate → 如果尚未创建上下文,则创建一个新会话。

数据加载

正在导入公开可用的汽车价格预测数据集。可以下载数据集,或者创建一个云笔记本并导入数据集而无需下载。

data = sc.read.csv('cars-data/data.csv', inferSchema = True, header = True) data.show(5) data.printSchema()

printSchema() 打印数据集的结构。

inferSchema() 加载数据集的模式。

统计分析

Spark提供了内置的统计分析类。

data.describe().toPandas().transpose()

数据清洗

数据集中包含一些“N/A”字符串值,希望将它们替换为实际的NA值。

def replace(column, value): return when(column!=value,column).otherwise(lit(None))

replace函数接受一个列和值,并将匹配的值替换为None。

data = data.withColumn("Market Category", replace(col("Market Category"),"N/A"))

NULL值:

from pyspark.sql.functions import when,lit,count,isnan,col data.select([count(when(isnan(c)|col(c).isNull(),c)).alias(c) for c in data.columns]).show()

Market Category列中NULL或NaN值最多,这意味着Market Category在数据集中并不重要,因此可以自由地删除此列。

删除NaN值:

data = data.drop("Market Category") data = data.na.drop()

在这个阶段,已经清理了数据,现在开始特征工程过程。

print((data.count(), len(data.columns)))

Spark MLlib中的特征向量

Spark MLlib接受数据形式为特征向量。通过使用Spark MLlib类中的VectorAssembler将常规列转换为Spark特征向量。

from pyspark.ml.feature import VectorAssembler assembler = VectorAssembler(inputCols = ["Year","Engine HP","Engine Cylinders","Number of Doors","Popularity", "highway MPG","city mpg"], outputCol = "Input Attributes")

outputCol: 这是通过训练列生成的特征的名称。

模型和流水线

SparkMLlib提供了许多内置的机器学习模型和预处理单元。只需要调用它们并在上面工作。

from pyspark.ml.regression import RandomForestRegressor regressor = RandomForestRegressor(labelCol = "MSRP",featuresCol = 'Input Attributes')

featuresCol: 这是输入特征列的名称。

labelCol: 这是训练的标签列。

流水线是一个由多个步骤顺序执行的组合。数据从一边进入,从另一边出来。还可以将流水线从磁盘加载并保存到磁盘上。

from pyspark.ml import Pipeline pipeline = Pipeline(stages = [assembler,regressor]) pipeline.write().overwrite().save("pipeline_saved_model") pipelineModel = Pipeline.load('./pipeline_saved_model')

分割数据

将数据集分割为训练和测试部分。将在80%的数据上训练流水线,剩余的将用于测试。

data_train , data_test = data.randomSplit([0.8,0.2], seed = 123)

训练流水线

这个过程取决于系统性能,可能需要一些时间。

Model = PipelineModel.fit(train_data) from pyspark.ml.evaluation import RegressionEvaluator eval = RegressionEvaluator(labelCol = 'MSRP') rmse = eval.evaluate(pred, {eval.metricName:'rmse'}) r2 =eval.evaluate(pred,{eval.metricName:'r2'}) print("RMSE: %.2f" % rmse) print("R2: %.2f" % r2)
沪ICP备2024098111号-1
上海秋旦网络科技中心:上海市奉贤区金大公路8218号1幢 联系电话:17898875485