Apache Spark是一个能够快速处理大规模数据集的数据处理框架,它还可以在多台计算机上分布数据处理任务,无论是独立运行还是与其他分布式计算工具协同工作。它是一个为大数据和机器学习设计的极速统一分析引擎。为了支持Python与Spark的结合,Apache Spark社区发布了一个工具PySpark。通过PySpark,用户可以在Python编程语言中操作RDD(弹性分布式数据集)。
Spark Core:Apache Spark提供的所有功能都建立在Spark Core之上。它管理所有基本的I/O功能,用于任务调度和故障恢复。Spark Core内嵌了一个特殊的集合叫做RDD。RDD是Spark的抽象之一,它负责在集群的所有节点之间划分数据,并将其作为单一单元保存在集群的内存池中。RDD上有两种操作:
Spark SQL:Spark SQL组件是一个分布式的结构化数据处理框架。Spark SQL能够访问结构化和半结构化信息,它还支持在流数据和历史数据上的强大、交互式的分析应用。DataFrame和SQL提供了一种访问各种数据源的通用方式。其主要特点是成本基础优化器和查询中间故障容忍。
Spark Streaming:它是核心Spark API的附加组件,允许对实时数据流进行可扩展、高吞吐量、容错的流处理。Spark Streaming将实时数据分组为小批次,然后将其传递给批处理系统进行处理。它还提供了容错特性。
Spark GraphX:Spark中的GraphX是一个用于图和图并行执行的API。它是一个网络图分析引擎和数据存储。在图中,聚类、分类、遍历、搜索和路径查找也是可能的。
SparkR:SparkR提供了一个分布式数据帧实现。它支持在大型数据集上进行选择、过滤、聚合等操作。
Spark MLlib:Spark MLlib用于在Apache Spark中执行机器学习。MLlib由流行的算法和工具组成。Spark中的MLlib是一个可扩展的机器学习库,它讨论了高质量算法和高速。机器学习算法如回归、分类、聚类、模式挖掘和协同过滤。MLlib中也存在低级别的机器学习原语,如通用梯度下降优化算法。
Spark.ml是Spark的主要机器学习API。该库提供了一个基于DataFrame构建ML管道的高级API。SparkMLlib工具如下:
ML算法构成了MLlib的核心。这些包括常见的学习算法,如分类、回归、聚类和协同过滤。MLlib标准化了API,使得将多个算法组合成一个单一的管道或工作流程变得更加容易。关键概念是管道API,其中管道概念受到scikit-learn项目的启发。
转换器(Transformer):转换器是一个可以将一个DataFrame转换为另一个DataFrame的算法。技术上,转换器实现了一个transform()方法,它将一个DataFrame转换为另一个,通常通过添加一个或多个列。例如:
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
categoricalColumns = ['job', 'marital', 'education', 'default', 'housing', 'loan']
stages = []
for categoricalCol in categoricalColumns:
stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + 'Indexer')
encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "Vec"])
stages += [stringIndexer, encoder]
label_stringIdx = StringIndexer(inputCol='deposit', outputCol='label')
stages += [label_stringIdx]
numericColumns = ['age', 'balance', 'duration']
assemblerInputs = [c + "Vec" for c in categoricalColumns] + numericColumns
Vassembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [Vassembler]
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df)
df = pipelineModel.transform(df)
selectedCols = ['label', 'features'] + cols
df = df.select(selectedCols)
特征化包括特征提取、转换、降维和选择。特征提取是从原始数据中提取特征。特征转换包括缩放、更新或修改特征。特征选择涉及从大量特征中选择必要的子集。
管道:管道将多个转换器和估计器链接在一起以指定ML工作流程。它还提供了构建、评估和调整ML管道的工具。在机器学习中,通常需要运行一系列算法来处理和学习数据。MLlib将这样的工作流程表示为管道,它由一系列管道阶段(转换器和估计器)组成,按特定顺序运行。
数据帧:与RDD相比,数据帧提供了更用户友好的API。基于数据帧的MLlib API为ML算法和多种语言提供了统一的API。数据帧促进了实际的ML管道,特别是特征转换。
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('mlearnsample').getOrCreate()
df = spark.read.csv('loan_bank.csv', header=True, inferSchema=True)
df.printSchema()
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol='features', labelCol='label')
lrModel = lr.fit(train)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator()
print('Test Area Under ROC', evaluator.evaluate(predictions))
predictions = lrModel.transform(test)
predictions.select('age', 'label', 'rawPrediction', 'prediction').show()
mllib.linalg
是MLlib工具,用于线性代数。