在当今数据驱动的世界中,对于有志于成为数据科学家的人来说,掌握构建机器学习流水线的技能是必不可少的。本文将带了解如何使用PySpark来构建这些流水线,这是一个动手实践的文章,所以请准备好最喜欢的Python IDE,让开始吧!
思考一下,一个有抱负的数据科学家需要具备哪些技能才能在行业中获得一席之地?一个机器学习项目有很多动态组件需要在成功执行之前将它们联系起来。了解如何构建端到端的机器学习流水线是一个宝贵的资产。作为数据科学家(无论是有抱负的还是已经建立的),应该了解这些机器学习流水线是如何工作的。
简而言之,这是两个学科的融合——数据科学和软件工程。对于数据科学家来说,这两者是相辅相成的。这不仅仅是关于构建模型——还需要具备构建企业级系统的软件技能。
任何数据科学项目的首要(也是第一个)步骤是在构建任何机器学习模型之前理解数据。大多数数据科学初学者在这里绊倒——他们没有花足够的时间了解他们正在处理的数据。有一种倾向是急于构建模型——这是一个必须避免的谬误。
在本文中,将遵循这一原则。将在整个过程中遵循结构化的方法,以确保不会错过任何关键步骤。那么,首先,让花点时间了解将在这里使用的所有变量。将使用最近结束的印度对阵孟加拉国板球比赛的数据集。让看看数据集中的不同变量:
Batsman: 独特的击球手ID(整数)
Batsman_Name: 击球手的名称(字符串)
Bowler: 独特的投球手ID(整数)
Bowler_Name: 投球手的名称(字符串)
Commentary: 事件的广播描述(字符串)
Detail: 描述事件的另一个字符串,如击倒和额外的投球(字符串)
Dismissed: 被击倒的击球手的唯一ID(字符串)
Id: 唯一的行ID(字符串)
Isball: 投球是否合法(布尔值)
Isboundary: 击球手是否击中边界(二进制)
Iswicket: 击球手是否被击倒(二进制)
Over: 超数(双精度)
Runs: 那次投球的得分(整数)
Timestamp: 记录数据的时间(时间戳)
当启动Spark时,SparkSession变量在名为'spark'下适当可用。可以使用它来读取多种类型的文件,如CSV、JSON、TEXT等。这使能够将数据保存为Spark dataframe。
# 使用spark读取CSV文件
df = spark.read.csv('path_to_csv', header=True, inferSchema=True)
df.printSchema()
现在,不希望数据集中的所有列都被处理为字符串。那么,该怎么办呢?可以在Spark中为dataframe定义自定义模式。为此,需要创建一个StructType对象,它接受一个StructField列表。当然,应该用列名、列的数据类型以及是否允许特定列的空值来定义StructField。
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# 定义自定义模式
schema = StructType([
StructField("Batsman", IntegerType(), True),
StructField("Batsman_Name", StringType(), True),
# 其他字段...
])
df = spark.read.csv('path_to_csv', header=True, schema=schema)
在任何机器学习项目中,总是有一些列不需要用来解决问题。相信也遇到过这种困境,无论是在行业中还是在在线黑客马拉松中。
# 使用drop函数从数据中删除列
df = df.drop(*['column_to_drop1', 'column_to_drop2'])
与Pandas不同,Spark dataframe没有shape函数来检查数据的维度。可以使用以下代码来检查数据集的维度:
# 检查数据集的维度
rows, cols = df.count(), df.columns.size
print(f"Rows: {rows}, Columns: {cols}")
Spark的describe函数为提供了大多数统计结果,如平均值、计数、最小值、最大值和标准差。可以使用summary函数来获取数值变量的四分位数:
# 描述数据
df.describe().show()
df.summary().show()
很少遇到没有缺失值的数据集。能记得上次发生这种情况是什么时候吗?在构建任何机器学习模型之前,检查所有列中存在的缺失值数量是很重要的。
# 计算数据集中的空值计数
from pyspark.sql.functions import count, when, col, isnull
null_counts = df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).toPandas()
与Pandas不同,没有value_counts()函数在Spark dataframe中。可以使用groupBy函数来计算分类变量的唯一值计数:
# 计算列中唯一值的计数
value_counts = df.groupBy('column_name').count().show()
大多数机器学习算法只接受数值形式的数据。因此,将数据集中的任何分类变量转换为数字至关重要。
字符串索引类似于标签编码。它为每个类别分配一个唯一的整数值。0被分配给最频繁的类别,1被分配给下一个最频繁的值,依此类推。必须定义想要索引的输入列名和想要结果的输出列名:
from pyspark.ml.feature import StringIndexer
# 字符串索引
indexer = StringIndexer(inputCol="category_column", outputCol="indexed_column")
df = indexer.fit(df).transform(df)
独热编码是每个数据科学家都应该了解的概念。已经多次依赖它来处理缺失值。这是一个救星!
from pyspark.ml.feature import OneHotEncoderEstimator
# 独热编码
encoder = OneHotEncoderEstimator(inputCols=["indexed_column"], outputCols=["encoded_column"])
df = encoder.fit(df).transform(df)
向量装配器将给定的列列表组合成单个向量列。这通常用于数据探索和预处理步骤的最后阶段。在这个阶段,通常使用一些原始或转换后的特征来训练模型。
from pyspark.ml.feature import VectorAssembler
# 向量装配器
assembler = VectorAssembler(inputCols=["feature1", "feature2"], outputCol="features")
df = assembler.transform(df)
机器学习项目通常涉及数据预处理、特征提取、模型拟合和评估结果等步骤。需要按顺序对数据执行大量转换。正如可以想象的,跟踪它们可能会变成一项繁琐的任务。
这就是机器学习流水线的用武之地。流水线允许维护所有相关转换的数据流,这些转换是达到最终结果所必需的。需要定义流水线的阶段,它们作为Spark运行的命令链。在这里,每个阶段要么是一个转换器,要么是一个估计器。
顾名思义,转换器将一个dataframe转换为另一个dataframe,要么通过更新特定列的当前值(如将分类列转换为数值),要么通过使用定义的逻辑将其映射到其他值。
估计器实现了fit()方法对dataframe,并产生一个模型。例如,LogisticRegression是一个估计器,当调用fit()方法时,它训练一个分类模型。
from pyspark.ml import Pipeline
# 定义流水线
pipeline = Pipeline(stages=[indexer, encoder, assembler, logisticRegression])
pipelineModel = pipeline.fit(training_df)