PySpark 实用函数解析

PySpark,作为Apache Spark的Python接口,提供了一个强大的框架用于大规模数据处理。本文将通过实例探讨PySpark中的一些实用函数,以帮助读者更好地理解和应用这些功能。

PySpark 简介

PySpark是Apache Spark的PythonAPI,它允许开发者使用Python语言来开发Spark应用程序。PySpark不仅支持Spark Core的核心功能,还支持SparkSQL、DataFrame、Streaming、MLlib(机器学习库)等多种功能。

expr() 函数

expr()函数是PySpark中的一个SQL函数,它允许执行类似SQL的表达式。这个函数接受一个字符串参数,执行其中的SQL命令。它使得能够使用那些在PySpark Column类型和pyspark.sql.functions API中不存在的SQL函数。例如,CASE WHEN语句。

from pyspark.sql import SparkSession from pyspark.sql.functions import expr # 创建Spark会话 spark = SparkSession.builder.appName("practice").getOrCreate() # 创建数据 data = [("张三","北京",25, 58, "2022-08-01", 1), ("李四","上海",26,54,"2021-05-02", 2), ("王五","广州",24, 60, "2022-06-02", 3), ("赵六","深圳", 26,75,"2022-07-04", 4)] columns= ["姓名","城市", "年龄", "体重", "会面日期", "偏移量"] df_friends = spark.createDataFrame(data = data, schema = columns) df_friends.show()

以下是expr()函数的一些实际应用示例:

可以使用expr()函数将姓名、年龄和城市列合并到一个新列中。

# 合并姓名、年龄和城市列 df_concat = df_friends.withColumn("姓名-年龄-城市", expr("姓名|| '-'|| 年龄 || '-' || 城市")) df_concat.show()

可以使用CASE WHEN语句在expr()函数中添加一个新列,该列的值基于体重列的值。

# 根据体重判断是否需要锻炼 df_condition = df_friends.withColumn("锻炼需求", expr("CASE WHEN 体重 >= 60 THEN '是' WHEN 体重 < 55 THEN '否' ELSE '享受' END")) df_condition.show()

可以使用expr()函数根据当前列的值来创建新列。例如,可以将下一次会面日期增加偏移量。

# 根据偏移量增加会面日期的月份 df_meetup = df_friends.withColumn("新的会面日期", expr("add_months(会面日期,偏移量)")) df_meetup.show()

填充函数

PySpark提供了lpad()和rpad()两个函数,用于在列值的左侧和右侧添加填充。

from pyspark.sql.functions import col, lpad, rpad # 创建数据 data = [("河北",30000),("山东",50000),("河南",80000)] columns= ["省份名称","省份人口"] df_states = spark.createDataFrame(data = data, schema = columns) df_states.show()

以下是lpad()和rpad()函数的一些实际应用示例:

可以使用lpad()函数在省份名称列的左侧添加填充。

# 左填充省份名称 df_states = df_states.withColumn('省份名称左填充', lpad(col("省份名称"), 10, '#')) df_states.show(truncate=False)

可以使用rpad()函数在省份名称列的右侧添加填充。

# 右填充省份名称 df_states = df_states.withColumn('省份名称右填充', rpad(col("省份名称"), 10, '#')) df_states.show(truncate=False)

如果列字符串长度超过填充字符串长度,返回的列值将被缩短。

# 缩短省份名称 df_states = df_states.withColumn('省份名称条件', lpad(col("省份名称"), 3, '#')) df_states.show(truncate=False)

repeat() 函数

PySpark中,repeat()函数用于重复列值。repeat(str,n)函数返回一个字符串,其中指定的字符串值被重复n次。

from pyspark.sql.functions import expr, repeat # 创建数据 data = [("学生A",25, 80), ("学生B",26, 90),("学生C", 24, 85)] columns= ["学生姓名", "学生年龄", "学生分数"] df_students = spark.createDataFrame(data = data, schema = columns) df_students.show()

以下是repeat()函数的一个实际应用示例:

可以使用repeat()函数将学生姓名列的值重复两次。

# 重复学生姓名 df_repeated = df_students.withColumn("学生姓名重复",(expr("repeat(学生姓名, 2)"))) df_repeated.show()

startswith()函数和endswith()函数用于检查DataFrame列值是否以指定的字符串开始或结束。这两个函数返回布尔值。

from pyspark.sql.functions import col # 创建数据 data = [("学生A",25, 80), ("学生B",26, 90),("学生C", 24, 85), (None, 23, 87)] columns= ["学生姓名", "学生年龄", "学生分数"] df_students = spark.createDataFrame(data = data, schema = columns) df_students.show()

以下是startswith()和endswith()函数的一些实际应用示例:

# 检查学生姓名是否以'学生A'开始 df_check_start = df_students.filter(col("学生姓名").startswith("学生A")) df_check_start.show() # 检查学生姓名是否以'学生C'结束 df_check_end = df_students.filter(col("学生姓名").endswith("学生C")) df_check_end.show()
  • 使用expr()函数在PySpark中使用SQL类表达式合并列。
  • 在expr()函数中将列名作为字符串传递。
  • 使用列值在表达式中创建新列。
  • 使用repeat()函数多次重复列值。
  • 检查列值是否以特定单词开始或结束。
沪ICP备2024098111号-1
上海秋旦网络科技中心:上海市奉贤区金大公路8218号1幢 联系电话:17898875485