在本教程系列中,将深入探讨如何使用PySpark进行数据预处理。本文作为系列的第一篇,将详细介绍PySpark的DataFrame结构,并展示如何执行从启动PySpark会话到使用PySpark进行数据预处理的一般操作。
首先,需要从pyspark.sql
包中导入SparkSession
,并设置一个带有名称的Spark会话。以下是启动PySpark会话的代码示例:
from pyspark.sql import SparkSession
data_spark = SparkSession.builder.appName('DataFrame_article').getOrCreate()
在上述代码中,首先导入了SparkSession
,然后使用getOrCreate()
和builder
函数创建了一个Spark会话,并将其存储在一个变量中。
接下来,将读取数据集,并查看PySpark如何以不同的方式和函数读取它。以下是读取CSV文件的代码示例:
data_spark.read.option('header', 'true').csv('/content/sample_data/california_housing_train.csv')
在输出中,可以看到返回了一个DataFrame对象,显示了列名和相应的列类型。为了查看整个数据集,包括列和记录,可以使用show()
方法。
现在,将检查数据集中的列包含的数据类型,并检查这些列是否包含空值。可以使用printSchema()
函数来实现这一点。
df_pyspark = data_spark.read.option('header', 'true').csv('/content/sample_data/california_housing_train.csv')
df_pyspark.printSchema()
通过printSchema()
函数,可以注意到它返回了与列和它们的数据类型相关的大量信息。但是,可以看到每个列都显示为字符串类型,这并不正确。为了解决这个问题,可以添加一个参数inferSchema=True
来改变默认设置。
首先,让看看如何获取每个列的名称,以便可以执行列索引和其他操作。以下是获取列名的代码示例:
df_pyspark.columns
通过columns
对象,可以看到数据集中所有列的名称。现在,让了解如何选择列。例如,如果只想从数据集中提取total_rooms
列,可以使用select
函数。
现在,让看看PySpark中的describe()
函数如何工作。知道pandas的describe
函数,PySpark的describe()
函数的作用也是一样的。
df_pyspark.describe().show()
从describe
函数的结果中,可以找到数据集的以下详细信息:记录总数、列值的平均值、标准差、最小值和最大值。
现在,让学习如何在PySpark的DataFrame中创建一个新列。可以使用withColumn()
函数来实现这一点。
df_pyspark = df_pyspark.withColumn('Updated longitude', df_pyspark['longitude']+1.2)
在上述输出中,可以看到新列已更新到DataFrame中,名为“Updated longitude”。
已经学会了如何添加列,现在将学习如何删除特定的列,因为在数据预处理流程中,需要从数据集中删除不相关的列。
df_pyspark.drop('Updated longitude').show()
在输出中,可以看到“Updated longitude”列已从数据集中删除。只需在参数中提供列名即可从数据集中删除该列。
在学会了添加新列和删除不相关列之后,将看看如何使用withColumnRenamed()
函数重命名列。
df_pyspark.withColumnRenamed('population', 'population per capita').show()
从上述输出中,可以看到“population”列已重命名为“population per capita”。