Apache Spark与PySpark数据处理

Apache Spark是一个广泛用于大规模数据处理的快速且通用的引擎。相较于传统的数据处理软件,它拥有多方面的优势。以下是Apache Spark的主要优势:

速度

Apache Spark的运行速度大约是传统MapReduce作业的100倍。

易用性

它支持多种编程语言,如Java、Scala、Python、R等,并提供了用于SQL查询、机器学习和图处理应用的库。其并行分布式处理、容错、可扩展性和内存计算功能使其更加强大。

平台无关性

Apache Spark几乎可以在任何环境中运行。

Apache Spark的组件

接下来,将探讨如何使用PySpark来操作数据框。PySpark是Apache Spark的Python接口。将通过小示例/问题陈述来学习如何使用PySpark操作数据框。首先,将编写代码并查看输出;然后在输出下方,将解释代码。

将使用Google Colaboratory,这是Google提供的丰富的编码环境。也可以在本地系统上安装Apache Spark。(安装指南:如何在本地系统上安装Apache Spark)

!pip install pyspark

上述Python代码在Google Colaboratory中安装并导入了pyspark。

from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate()

需要创建一个Spark会话才能使用数据框。上述代码正是这样做的。

PS 1. 将csv文件加载到数据框中

df = spark.read.csv("StudentsPerformance.csv", header=True, inferSchema=True) df.show(5)

header参数表示否将第一行视为列的标题。True意味着希望第一行作为标题。inferSchema参数告诉Spark函数推断数据中列的数据类型(浮点数、整数等)。.show(5)以整洁且易于阅读的格式打印/输出数据框。括号内的5表示打算看到的行数,本例中为5。

.columns帮助查看数据框的列名。

df.columns

printSchema帮助查看DataFrame列的数据类型。

df.printSchema()

nullable=true表示该字段不能包含null值。

PS 2. 选择输出math score列的一些值/行

df.select('math score').show(5)

PS 3: 选择并输出math score、reading score和writing score列的一些值/行

df.select('math score', 'reading score', 'writing score').show(5)

PS 4: 通过将math scores乘以2(当前给出的是100分制)来创建一个新列

df.withColumn("Math_Score_200", 2 * df["math score"]).show(5)

Math_Score_200是创建的新列的名称,其值是math score列的值的两倍,即2 * df["math score"]。现在分数是200分制而不是100分制。

PS 5: 重命名parental level of education列

df.withColumnRenamed("parental level of education", "Parental_Education_Status").show(5)

parental level of education是旧列名,Parental_Education_Status是新列名。

PS 6: 按reading score值升序排序数据框

df.orderBy('reading score').show(5)

注意:默认是升序。所以ascending=True是可选的。要将数据框按降序排列,需要输入df.orderBy(‘reading score’, ascending=False)。

PS 7: 删除race/ethnicity列

df.drop('race/ethnicity').show(5)

PS 8: 显示所有不同的父母教育水平

df.select('parental level of education').distinct().collect()

distinct()输出列中所有不同的值。collect()输出生成的所有数据/输出。

PS 9: 计算每个性别的阅读分数总和

df.select('gender', 'reading score').groupBy('gender').sum('reading score').show()

使用.select选择所需的列,然后使用.groupBy按性别分组,并使用.sum对每个组/类别的阅读分数求和。注意聚合列的名称是sum(reading score)。

PS 10: 过滤数据框,其中阅读分数大于90

df.count()

首先,检查原始数据框的总行数。如上输出所见,为1000。

df.filter(df['reading score'] > 90).count()

使用filter函数选择阅读分数大于90的值,然后计算行数。可以看到行数少了很多,因此它过滤并仅获取了阅读分数> 90的行。

PS 11: 1. 将gender列转换为大写2. 获取阅读分数列中的最低分数

from pyspark.sql import functions

pyspark.sql模块的functions对于对数据框的不同列执行各种操作非常有用。

print(dir(functions))

可以使用dir检查functions模块中有哪些功能/函数。

help(functions.upper)

可以使用help检查特定函数的作用。

from pyspark.sql.functions import upper, col, min

导入了任务所需的函数。

df.select(min(col('reading score'))).show()

计算了reading score列的最小值。

df.select(col('gender'), upper(col('gender'))).show(5)

将gender转换为大写。

PS 12: 重命名列名并永久保存

df = df.withColumnRenamed("parental level of education", "Parental_Education_Status") \ .withColumnRenamed("test preparation course", "Test_Preparation_Course") \ .withColumnRenamed("math score", "Math_Score") \ .withColumnRenamed("reading score", "Reading_Score") \ .withColumnRenamed("writing score", "Writing_Score") df.show()

到目前为止,所做的更改都是临时的。要永久保留更改,需要将更改分配给同一个数据框,即df=df.withColumnsRenamed(..),或者如果想将更改存储在不同的数据框中,需要将它们分配给不同的数据框,即df_new=df.withColumnsRenamed(..)。

PS 13: 将数据框保存为.csv文件

df.write.csv("table_formed_2.csv", header=True)

将数据框保存为名为table_formed_2.csv的文件。header=True表示数据框有标题。

PS 14: 在单行代码/查询中执行多个转换

df.select(df['parental level of education'], df['lunch'], df['math score']) \ .filter(df['lunch'] == 'standard') \ .groupBy('parental level of education') \ .sum('math score') \ .withColumnRenamed("sum(math score)", "math score") \ .orderBy('math score', ascending=False) \ .show()

注意如何在数据框上执行不同的操作/转换,一个接一个。首先,使用.select选择所需的列。其次,使用.filter选择lunch类型为standard。第三和第四,对每个父母教育水平的math score求和。第五,将聚合列sum(math score)重命名为math score。第六,按math score对结果进行排序。第七,使用.show()输出结果。

PS 15: 创建一个名为DATES的单列数据框,其中包含一个随机日期和时间信息。同时创建另一个列,与最初选择的日期相隔5天。

from pyspark.sql.functions import to_date, to_timestamp, date_add

导入所需的函数。

df2 = spark.createDataFrame([('2012-11-12 11:00:03',)], ['DATES']) df2.show()

创建包含日期和时间(格式:YYYY-dd-MM HH:mm:ss)的单行数据框DATES。

df3 = df2.select(to_date(col('DATES'), 'yyyy-dd-MM'), to_timestamp(col('DATES'), 'yyyy-dd-MM HH:mm:ss')) renamed_cols = ['DATE', 'TIMESTAMP'] df4 = df3.toDF(*renamed_cols) df4.show()

首先创建df3数据框,其中包含两个列,一个仅包含日期信息,另一个包含日期和时间信息。在后一种情况下,使用了to_timestamp函数。然后,从df3创建了df4数据框,这次添加了列名DATE和TIMESTAMP。

df4.select(col('TIMESTAMP'), date_add(col('TIMESTAMP'), 5)).show(1, truncate=False)
  • Apache Spark的基本介绍及其优势。
  • 使用PySpark对数据框执行不同的转换,并提供适当的解释。
沪ICP备2024098111号-1
上海秋旦网络科技中心:上海市奉贤区金大公路8218号1幢 联系电话:17898875485