Apache Spark是一个广泛用于大规模数据处理的快速且通用的引擎。相较于传统的数据处理软件,它拥有多方面的优势。以下是Apache Spark的主要优势:
Apache Spark的运行速度大约是传统MapReduce作业的100倍。
它支持多种编程语言,如Java、Scala、Python、R等,并提供了用于SQL查询、机器学习和图处理应用的库。其并行分布式处理、容错、可扩展性和内存计算功能使其更加强大。
Apache Spark几乎可以在任何环境中运行。
接下来,将探讨如何使用PySpark来操作数据框。PySpark是Apache Spark的Python接口。将通过小示例/问题陈述来学习如何使用PySpark操作数据框。首先,将编写代码并查看输出;然后在输出下方,将解释代码。
将使用Google Colaboratory,这是Google提供的丰富的编码环境。也可以在本地系统上安装Apache Spark。(安装指南:如何在本地系统上安装Apache Spark)
!pip install pyspark
上述Python代码在Google Colaboratory中安装并导入了pyspark。
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
需要创建一个Spark会话才能使用数据框。上述代码正是这样做的。
PS 1. 将csv文件加载到数据框中
df = spark.read.csv("StudentsPerformance.csv", header=True, inferSchema=True)
df.show(5)
header参数表示否将第一行视为列的标题。True意味着希望第一行作为标题。inferSchema参数告诉Spark函数推断数据中列的数据类型(浮点数、整数等)。.show(5)以整洁且易于阅读的格式打印/输出数据框。括号内的5表示打算看到的行数,本例中为5。
.columns帮助查看数据框的列名。
df.columns
printSchema帮助查看DataFrame列的数据类型。
df.printSchema()
nullable=true表示该字段不能包含null值。
PS 2. 选择输出math score列的一些值/行
df.select('math score').show(5)
PS 3: 选择并输出math score、reading score和writing score列的一些值/行
df.select('math score', 'reading score', 'writing score').show(5)
PS 4: 通过将math scores乘以2(当前给出的是100分制)来创建一个新列
df.withColumn("Math_Score_200", 2 * df["math score"]).show(5)
Math_Score_200是创建的新列的名称,其值是math score列的值的两倍,即2 * df["math score"]。现在分数是200分制而不是100分制。
PS 5: 重命名parental level of education列
df.withColumnRenamed("parental level of education", "Parental_Education_Status").show(5)
parental level of education是旧列名,Parental_Education_Status是新列名。
PS 6: 按reading score值升序排序数据框
df.orderBy('reading score').show(5)
注意:默认是升序。所以ascending=True是可选的。要将数据框按降序排列,需要输入df.orderBy(‘reading score’, ascending=False)。
PS 7: 删除race/ethnicity列
df.drop('race/ethnicity').show(5)
PS 8: 显示所有不同的父母教育水平
df.select('parental level of education').distinct().collect()
distinct()输出列中所有不同的值。collect()输出生成的所有数据/输出。
PS 9: 计算每个性别的阅读分数总和
df.select('gender', 'reading score').groupBy('gender').sum('reading score').show()
使用.select选择所需的列,然后使用.groupBy按性别分组,并使用.sum对每个组/类别的阅读分数求和。注意聚合列的名称是sum(reading score)。
PS 10: 过滤数据框,其中阅读分数大于90
df.count()
首先,检查原始数据框的总行数。如上输出所见,为1000。
df.filter(df['reading score'] > 90).count()
使用filter函数选择阅读分数大于90的值,然后计算行数。可以看到行数少了很多,因此它过滤并仅获取了阅读分数> 90的行。
PS 11: 1. 将gender列转换为大写2. 获取阅读分数列中的最低分数
from pyspark.sql import functions
pyspark.sql模块的functions对于对数据框的不同列执行各种操作非常有用。
print(dir(functions))
可以使用dir检查functions模块中有哪些功能/函数。
help(functions.upper)
可以使用help检查特定函数的作用。
from pyspark.sql.functions import upper, col, min
导入了任务所需的函数。
df.select(min(col('reading score'))).show()
计算了reading score列的最小值。
df.select(col('gender'), upper(col('gender'))).show(5)
将gender转换为大写。
PS 12: 重命名列名并永久保存
df = df.withColumnRenamed("parental level of education", "Parental_Education_Status") \
.withColumnRenamed("test preparation course", "Test_Preparation_Course") \
.withColumnRenamed("math score", "Math_Score") \
.withColumnRenamed("reading score", "Reading_Score") \
.withColumnRenamed("writing score", "Writing_Score")
df.show()
到目前为止,所做的更改都是临时的。要永久保留更改,需要将更改分配给同一个数据框,即df=df.withColumnsRenamed(..),或者如果想将更改存储在不同的数据框中,需要将它们分配给不同的数据框,即df_new=df.withColumnsRenamed(..)。
PS 13: 将数据框保存为.csv文件
df.write.csv("table_formed_2.csv", header=True)
将数据框保存为名为table_formed_2.csv的文件。header=True表示数据框有标题。
PS 14: 在单行代码/查询中执行多个转换
df.select(df['parental level of education'], df['lunch'], df['math score']) \
.filter(df['lunch'] == 'standard') \
.groupBy('parental level of education') \
.sum('math score') \
.withColumnRenamed("sum(math score)", "math score") \
.orderBy('math score', ascending=False) \
.show()
注意如何在数据框上执行不同的操作/转换,一个接一个。首先,使用.select选择所需的列。其次,使用.filter选择lunch类型为standard。第三和第四,对每个父母教育水平的math score求和。第五,将聚合列sum(math score)重命名为math score。第六,按math score对结果进行排序。第七,使用.show()输出结果。
PS 15: 创建一个名为DATES的单列数据框,其中包含一个随机日期和时间信息。同时创建另一个列,与最初选择的日期相隔5天。
from pyspark.sql.functions import to_date, to_timestamp, date_add
导入所需的函数。
df2 = spark.createDataFrame([('2012-11-12 11:00:03',)], ['DATES'])
df2.show()
创建包含日期和时间(格式:YYYY-dd-MM HH:mm:ss)的单行数据框DATES。
df3 = df2.select(to_date(col('DATES'), 'yyyy-dd-MM'), to_timestamp(col('DATES'), 'yyyy-dd-MM HH:mm:ss'))
renamed_cols = ['DATE', 'TIMESTAMP']
df4 = df3.toDF(*renamed_cols)
df4.show()
首先创建df3数据框,其中包含两个列,一个仅包含日期信息,另一个包含日期和时间信息。在后一种情况下,使用了to_timestamp函数。然后,从df3创建了df4数据框,这次添加了列名DATE和TIMESTAMP。
df4.select(col('TIMESTAMP'), date_add(col('TIMESTAMP'), 5)).show(1, truncate=False)