在数据分析和探索的过程中,处理缺失值是一个至关重要的环节。尤其是在处理大规模数据集时,数据工程师必须掌握处理数据集中NA/缺失值的技能。本文是关于PySpark系列文章的第二篇,如果对PySpark的DataFrame基础不了解,建议先阅读之前的文章《使用Python入门PySpark》和《使用PySpark的DataFrame进行数据预处理》。
将使用pyspark.sql包来启动SparkSession,以便可以访问来自pyspark.sql的Spark对象。
from pyspark.sql import SparkSession
null_spark = SparkSession.builder.appName('使用PySpark处理缺失值').getOrCreate()
请注意,已经在PySpark系列博客的第一篇文章《使用PySpark入门》中详细覆盖了上述代码段,因此在继续本文之前,请访问该文章。
为了本文,使用了一个虚拟数据集,以便从头开始更容易理解操作。将读取相同的数据集。
df_null_pyspark = null_spark.read.csv('/content/part2.csv', header=True, inferSchema=True)
read.csv()函数专门负责在PySpark中读取CSV格式的数据。第一个参数是数据集的完整路径。第二个参数Header,当标志为True时,将列名作为列标题。第三个参数infer schema,当标志为True时,将显示每列的原始数据类型。
使用show()函数查看数据集的前20行(如果存在)。这个函数与pandas中的head()函数非常相似。
df_null_pyspark.show()
将使用虚拟数据集来处理本文中的缺失值。
在开始删除包含NULL值的列之前,让向介绍一个函数,它可以让知道哪个列有NULL值,哪个列没有。这个函数是printSchema(),它的工作方式与pandas中的“describe”函数相同。
df_null_pyspark.printSchema()
从输出中可以看到,在数据集列名后面,可以看到nullable = True,这意味着该列中有一些NULL值。
要删除数据集中的NULL(NA)值,只需使用NA.drop()函数,它将删除所有包含至少一个NULL值的行。
df_null_pyspark.na.drop().show()
在上面的输出中,可以看到包含NULL值的行已经被删除。
之前看到了如何从行中移除NULL值,也看到它删除了包含至少一个NULL值的完整行。那么,能否在一定程度上控制它,以便仅在满足某些条件时才删除NULL值呢?答案是肯定的!可以这样做,让讨论如何实现这一点。
这个参数是决定在什么条件下跳过NULL值或删除它们的一种方式。在使用这个参数时,有两个选项,让记下它们:
df_null_pyspark.na.drop(how="any").show()
df_null_pyspark.na.drop(how="all").show()
如“any”选项中讨论的,如果一行中有多个NULL值,则会删除该行,否则行将保持不变。
如“all”选项中讨论的,它只会在一行中的所有值都是NULL时才删除NULL值,否则不会有任何变化,即没有行会被删除。
在这个参数中,设置了特定行中最小非NULL值的阈值。例如,如果将阈值设置为2,那么只有当总NULL值超过2时,该行才会被删除,否则该行不会被删除。
df_null_pyspark.na.drop(thresh=2).show()
在这个输出中,可以看到最后一行因为总共有3个NULL值而超过了阈值,所以被删除了,而其他行的NULL值等于或少于2,所以它们不会被删除。
这个参数让想起了pandas,因为这个参数的功能与从数据集中提取特定列的功能相同。在这里,也将了解如何从完整数据集中提取特定列的子集。
df_null_pyspark.na.drop(how='any', subset=['Experience (in years)']).show()
在上面的输出中,可以看到“Experience (in years)”列中的NULL值被成功删除,而其他列中的NULL值没有被删除,因为使用了subset参数。
这个参数将负责填充数据集中存在的缺失(NULL)值,这些值出现在NA.fill()函数中。该函数的第一个参数是需要填充缺失/NULL值的值。第二个参数是想要执行此插补的列/列的名称,这是完全可选的,如果不考虑它,那么插补将在整个数据集上执行。
df_null_pyspark.na.fill('缺失值', 'Employee Name').show()
在上面的输出中,可以看到使用了两个选项,即填充值以及对特定列进行操作,并得到了预期的结果。
注意:如果想要对多个列执行上述操作,只需要以列表数据类型传递这些列的名称。
这是一种更专业的方式来处理缺失值,即使用均值/中位数/众数来填充NULL值,这取决于数据集的领域。在这里,将使用PySpark库中的Imputer函数来使用均值/中位数/众数功能。
from pyspark.ml.feature import Imputer
imputer = Imputer(
inputCols=['Age of Employee', 'Experience (in years)', 'Salary (per month - $)'],
outputCols=["{}_imputed".format(a) for a in ['Age of Employee', 'Experience (in years)', 'Salary (per month - $)']]
).setStrategy("mean")
代码分解:这里有很多内容,让分解一下。首先,从PySpark的ml.feature库中调用了Imputer函数。然后使用Imputer对象定义了输入列和输出列。在输入列中,给出了需要插补的列的名称,输出列是插补后的列。最后,设置了插补值的策略(这里是均值),但也可以根据数据集使用中位数或众数。
现在已经使用Imputer对象在NULL值的位置插补了均值,但要看到变化,需要同时使用fit-transform方法。
imputer.fit(df_null_pyspark).transform(df_null_pyspark).show()