使用Spark进行机器学习

在本文中,将探讨如何利用Spark进行机器学习。之前的文章讨论了Spark数据库、安装以及在Python中使用Spark的基础知识。如果还未阅读,可以通过以下链接访问。

本文将主要讨论如何使用PySpark实现机器学习模型,并构建一个回归模型,结合交叉验证和参数调优。众所周知,Spark是一个内存中数据处理工具,能够以分布式方式处理PB级别的数据。使用SparkMLlib包,可以在如此庞大的数据量上实现机器学习模型的分布式处理。

传统的机器学习模型实现方式是借助Apache Mahout,但这种方式最终被证明是缓慢且不够灵活的。Spark机器学习管道可以与实时数据以及流数据结合使用,并且它使用内存中计算来加速处理过程。

Spark最棒的一点是它提供了多种内置的机器学习包,使其更加多功能。这些内置的机器学习包被称为Apache Spark中的MLlib。

MLlib的特点

Spark为处理所有与机器学习相关的任务提供了一个完全不同的包。也可以将第三方库与Spark结合使用。以下是Spark的一些重要特点:

MLlib的API与scikit-learn API非常相似,不需要单独学习它。MLlib提供了各种类型的机器学习重建模型。Spark的MLlib支持计算机视觉和自然语言处理。它可以在实时数据以及分布式数据系统上实现。

机器学习的实现步骤

在Spark中实现机器学习管道需要多个阶段。Spark仅支持以特征向量形式的数据。为了说明,将使用Python和Spark构建一个回归模型。

使用pip包管理Python可以轻松安装Spark。建议在基于云的笔记本上设置Spark,因为安装Spark在本地计算机上可能需要一些时间。

!pip install pyspark !pip install findspark

安装后,需要导入一些基本库以启动Spark集群。

import pandas as pd import matplotlib.pyplot as plt from pyspark import SparkContext, SparkConf from pyspark.sql import SparkSession

成功安装后,需要创建一个Spark上下文和会话。创建Spark上下文可以被理解为为特定项目创建一个集群。它是Spark集群的入口点,并保存Spark集群的配置,比如需要使用的核心数和数据流的位置等。

from pyspark import SparkConf, SparkContext scou = SparkSession.builder.master("local[*]").getOrCreate()

"local[*]"定义了要使用的可用CPU核心数。"getOrCreate"创建一个新的会话,如果已定义的会话尚未创建。

在本文中,将使用Kaggle上公开可用的美国汽车价格数据集。如果使用云笔记本,则不需要下载数据集。

data = sc.read.csv('../input/cars/data_car.csv', inferSchema=True, header=True)

"inferSchema=True"保留了数据集的模式。"read.csv"将CSV文件读入Spark。

Spark提供了一些基本方法,以便查看加载数据的基本统计信息。

data.describe().toPandas().transpose()

数据清洗是机器生命周期中最重要的步骤。删除不需要的行和列。在这一步中,从数据集中删除所有无用、重复的信息。删除冗余数据可以提高整体模型性能和准确性。

Spark仅支持特征向量数据格式来处理机器学习任务。特征向量有助于Spark更快地进行推断。

from pyspark.ml.feature import VectorAssembler

将使用VectorAssembler类将DataFrame转换为特征向量系列。

Spark提供了不同的内置机器学习模型。需要导入并简单地在数据上训练它们。

from pyspark.ml.regression import RandomForestRegressor

将使用随机森林回归器作为回归模型。

Spark中的pyspark.ml.tuning类提供了所有可用于模型调优的函数。在执行超参数调优之前,需要定义一个参数网格,用于超参数调优和交叉验证。

from pyspark.ml.tuning import ParamGridBuilder paramGrid = ParamGridBuilder().addGrid(regressor.numTrees,[100,1500]).build() cvModel = crossval.fit(train_data) prediction = cvModel.transform(test_data) from pyspark.ml.evaluation import RegressionEvaluator
沪ICP备2024098111号-1
上海秋旦网络科技中心:上海市奉贤区金大公路8218号1幢 联系电话:17898875485