PySpark DataFrame 列操作详解

在大数据处理中,PySparkDataFrame扮演着至关重要的角色,它允许以类似于SQL表的方式操作数据。本文将介绍如何在PySpark DataFrame中执行列操作,包括创建新列、重命名列、选择列和创建列别名等。这些操作是进行PySpark练习时经常需要用到的。将从基本的库安装开始,创建SparkSession,并探索PySpark DataFrame列操作的各种方法。通过本文的学习,将能够轻松地进行PySpark DataFrame的操纵。建议按照顺序逐步阅读本文,因为每个部分都会参考前一部分的内容。

创建PySpark DataFrame中的新列

在本节中,将首先创建一个SparkSession,然后通过导入CSV文件创建一个新的DataFrame。之后,将使用.withColumn()方法创建一个新的列。

首先,需要安装必要的库:

!pip install pyspark

然后,导入库:

from pyspark.sql import SparkSession

创建Spark Session:

spark = SparkSession.builder.appName('PySparkColumn Ops').getOrCreate()

在这里,通过传递一个字符串给.appName()方法来给应用程序命名。接下来,使用.getOrCreate()方法,该方法会创建并实例化SparkSession到对象spark中。如果已经存在SparkSession,.getOrCreate()方法将使用现有的SparkSession,否则会创建一个新的。

读取数据集:

df = spark.read.csv('Fish.csv', sep = ',', inferSchema = True, header = True)

这里,导入了从Kaggle下载的Fish数据集。

检查导入的数据集:

df.show()

创建新列:

df = df.withColumn('Weight in Kg', df.Weight/1000)

假设想要一个新列,显示鱼的重量(以千克为单位)。在这里,使用了.withColumn()方法。在.withColumn()方法中,第一个参数是想要的新列名,第二个参数是想要的列值。这里,给出了新列名“Weight in Kg”,其值为列Weight除以1000,这将把重量值从克转换为千克。

检查更新后的DataFrame:

df.show()

重命名PySparkDataFrame中的现有列

在本节中,将使用.withColumnRenamed()方法重命名PySpark DataFrame中的现有列。

继续使用上一步更新后的DataFrame,并添加了一列鱼的重量(以千克为单位)。

检查当前的PySpark DataFrame:

df.show()

重命名列:

df = df.withColumnRenamed("Weight in Kg", "Weight in Kilograms")

更新了现有列名“Weight in Kg”,在前一节中创建的,更名为新的列名“Weight in Kilograms”。在这里,使用了.withColumnRenamed()方法来满足需求。.withColumnRenamed()方法接受两个参数,第一个是想要更新的现有列名,第二个是想要更改的新列名。

检查更新后的PySpark DataFrame:

df.show()

选择PySpark DataFrame中的一个或多个列

在本节中,将了解如何在PySpark DataFrame中选择列。要在选择PySpark DataFrame中的一个或多个列,将使用.select()方法。这个方法等同于SQL中的SELECT子句,可以一次性选择一个或多个列。

继续使用上一步更新后的DataFrame,并重命名了鱼的重量列(以千克为单位)。

检查当前的PySpark DataFrame:

df.show()

选择列:

df.select(df.Weight, df['Weight in Kilograms']).show()

在这里,使用了.select()方法从之前的PySparkDataFrame中选择“Weight”和“Weight in Kilogram”列。.select()方法接受任意数量的参数,每个参数都是以逗号分隔的字符串形式的列名。即使传递相同的列两次,.show()方法也会显示两次该列。

select方法返回一个PySpark DataFrame,因此在最后使用了.show()方法来显示PySpark DataFrame。

要创建列的别名,将使用.alias()方法。这个方法是SQL中AS关键字的等价物,用于创建别名。它为输出PySpark DataFrame的列提供了一个临时名称。

继续使用上一步更新后的DataFrame,并重命名了鱼的重量列(以千克为单位)。

df.show() df.select(df['Weight in Kilograms'].alias("Kilograms")).show()
沪ICP备2024098111号-1
上海秋旦网络科技中心:上海市奉贤区金大公路8218号1幢 联系电话:17898875485