PySpark 数据预处理教程

在本教程系列中,将深入探讨如何使用PySpark进行数据预处理。本文作为系列的第一篇,将详细介绍PySpark的DataFrame结构,并展示如何执行从启动PySpark会话到使用PySpark进行数据预处理的一般操作。

启动PySpark会话

首先,需要从pyspark.sql包中导入SparkSession,并设置一个带有名称的Spark会话。以下是启动PySpark会话的代码示例:

from pyspark.sql import SparkSession data_spark = SparkSession.builder.appName('DataFrame_article').getOrCreate()

在上述代码中,首先导入了SparkSession,然后使用getOrCreate()builder函数创建了一个Spark会话,并将其存储在一个变量中。

读取数据集

接下来,将读取数据集,并查看PySpark如何以不同的方式和函数读取它。以下是读取CSV文件的代码示例:

data_spark.read.option('header', 'true').csv('/content/sample_data/california_housing_train.csv')

在输出中,可以看到返回了一个DataFrame对象,显示了列名和相应的列类型。为了查看整个数据集,包括列和记录,可以使用show()方法。

检查列的数据类型

现在,将检查数据集中的列包含的数据类型,并检查这些列是否包含空值。可以使用printSchema()函数来实现这一点。

df_pyspark = data_spark.read.option('header', 'true').csv('/content/sample_data/california_housing_train.csv') df_pyspark.printSchema()

通过printSchema()函数,可以注意到它返回了与列和它们的数据类型相关的大量信息。但是,可以看到每个列都显示为字符串类型,这并不正确。为了解决这个问题,可以添加一个参数inferSchema=True来改变默认设置。

列索引和选择

首先,让看看如何获取每个列的名称,以便可以执行列索引和其他操作。以下是获取列名的代码示例:

df_pyspark.columns

通过columns对象,可以看到数据集中所有列的名称。现在,让了解如何选择列。例如,如果只想从数据集中提取total_rooms列,可以使用select函数。

描述函数

现在,让看看PySpark中的describe()函数如何工作。知道pandas的describe函数,PySpark的describe()函数的作用也是一样的。

df_pyspark.describe().show()

describe函数的结果中,可以找到数据集的以下详细信息:记录总数、列值的平均值、标准差、最小值和最大值。

在PySpark DataFrame中添加列

现在,让学习如何在PySpark的DataFrame中创建一个新列。可以使用withColumn()函数来实现这一点。

df_pyspark = df_pyspark.withColumn('Updated longitude', df_pyspark['longitude']+1.2)

在上述输出中,可以看到新列已更新到DataFrame中,名为“Updated longitude”。

在PySparkDataFrame中删除列

已经学会了如何添加列,现在将学习如何删除特定的列,因为在数据预处理流程中,需要从数据集中删除不相关的列。

df_pyspark.drop('Updated longitude').show()

在输出中,可以看到“Updated longitude”列已从数据集中删除。只需在参数中提供列名即可从数据集中删除该列。

在学会了添加新列和删除不相关列之后,将看看如何使用withColumnRenamed()函数重命名列。

df_pyspark.withColumnRenamed('population', 'population per capita').show()

从上述输出中,可以看到“population”列已重命名为“population per capita”。

沪ICP备2024098111号-1
上海秋旦网络科技中心:上海市奉贤区金大公路8218号1幢 联系电话:17898875485