PySpark,作为Apache Spark的Python接口,提供了一个强大的框架用于大规模数据处理。本文将通过实例探讨PySpark中的一些实用函数,以帮助读者更好地理解和应用这些功能。
PySpark是Apache Spark的PythonAPI,它允许开发者使用Python语言来开发Spark应用程序。PySpark不仅支持Spark Core的核心功能,还支持SparkSQL、DataFrame、Streaming、MLlib(机器学习库)等多种功能。
expr()函数是PySpark中的一个SQL函数,它允许执行类似SQL的表达式。这个函数接受一个字符串参数,执行其中的SQL命令。它使得能够使用那些在PySpark Column类型和pyspark.sql.functions API中不存在的SQL函数。例如,CASE WHEN语句。
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
# 创建Spark会话
spark = SparkSession.builder.appName("practice").getOrCreate()
# 创建数据
data = [("张三","北京",25, 58, "2022-08-01", 1),
("李四","上海",26,54,"2021-05-02", 2),
("王五","广州",24, 60, "2022-06-02", 3),
("赵六","深圳", 26,75,"2022-07-04", 4)]
columns= ["姓名","城市", "年龄", "体重", "会面日期", "偏移量"]
df_friends = spark.createDataFrame(data = data, schema = columns)
df_friends.show()
以下是expr()函数的一些实际应用示例:
可以使用expr()函数将姓名、年龄和城市列合并到一个新列中。
# 合并姓名、年龄和城市列
df_concat = df_friends.withColumn("姓名-年龄-城市", expr("姓名|| '-'|| 年龄 || '-' || 城市"))
df_concat.show()
可以使用CASE WHEN语句在expr()函数中添加一个新列,该列的值基于体重列的值。
# 根据体重判断是否需要锻炼
df_condition = df_friends.withColumn("锻炼需求", expr("CASE WHEN 体重 >= 60 THEN '是' WHEN 体重 < 55 THEN '否' ELSE '享受' END"))
df_condition.show()
可以使用expr()函数根据当前列的值来创建新列。例如,可以将下一次会面日期增加偏移量。
# 根据偏移量增加会面日期的月份
df_meetup = df_friends.withColumn("新的会面日期", expr("add_months(会面日期,偏移量)"))
df_meetup.show()
PySpark提供了lpad()和rpad()两个函数,用于在列值的左侧和右侧添加填充。
from pyspark.sql.functions import col, lpad, rpad
# 创建数据
data = [("河北",30000),("山东",50000),("河南",80000)]
columns= ["省份名称","省份人口"]
df_states = spark.createDataFrame(data = data, schema = columns)
df_states.show()
以下是lpad()和rpad()函数的一些实际应用示例:
可以使用lpad()函数在省份名称列的左侧添加填充。
# 左填充省份名称
df_states = df_states.withColumn('省份名称左填充', lpad(col("省份名称"), 10, '#'))
df_states.show(truncate=False)
可以使用rpad()函数在省份名称列的右侧添加填充。
# 右填充省份名称
df_states = df_states.withColumn('省份名称右填充', rpad(col("省份名称"), 10, '#'))
df_states.show(truncate=False)
如果列字符串长度超过填充字符串长度,返回的列值将被缩短。
# 缩短省份名称
df_states = df_states.withColumn('省份名称条件', lpad(col("省份名称"), 3, '#'))
df_states.show(truncate=False)
在PySpark中,repeat()函数用于重复列值。repeat(str,n)函数返回一个字符串,其中指定的字符串值被重复n次。
from pyspark.sql.functions import expr, repeat
# 创建数据
data = [("学生A",25, 80), ("学生B",26, 90),("学生C", 24, 85)]
columns= ["学生姓名", "学生年龄", "学生分数"]
df_students = spark.createDataFrame(data = data, schema = columns)
df_students.show()
以下是repeat()函数的一个实际应用示例:
可以使用repeat()函数将学生姓名列的值重复两次。
# 重复学生姓名
df_repeated = df_students.withColumn("学生姓名重复",(expr("repeat(学生姓名, 2)")))
df_repeated.show()
startswith()函数和endswith()函数用于检查DataFrame列值是否以指定的字符串开始或结束。这两个函数返回布尔值。
from pyspark.sql.functions import col
# 创建数据
data = [("学生A",25, 80), ("学生B",26, 90),("学生C", 24, 85), (None, 23, 87)]
columns= ["学生姓名", "学生年龄", "学生分数"]
df_students = spark.createDataFrame(data = data, schema = columns)
df_students.show()
以下是startswith()和endswith()函数的一些实际应用示例:
# 检查学生姓名是否以'学生A'开始
df_check_start = df_students.filter(col("学生姓名").startswith("学生A"))
df_check_start.show()
# 检查学生姓名是否以'学生C'结束
df_check_end = df_students.filter(col("学生姓名").endswith("学生C"))
df_check_end.show()