在处理大数据时,经常需要对数据框架中的特定行或列进行编辑。在Python中,这通常很容易实现,因为数据框架是可变的,提供了极大的灵活性。然而,在PySpark中,情况有所不同。PySpark的数据结构主要是RDD、DataFrame和GraphFrame,这些数据框架是不可变的,因此在行/列级别的处理上灵活性较低。
在编写PySpark代码时,必须坚持使用Spark的特性,而不是转向Python。这是因为,如果代码完全用Python编写,就会失去Spark特有的并行计算效率。当处理大量数据时,如果处理器遇到仅用Python编写的代码行,那么并行计算的优势就会丧失,所有数据都会被压缩到一个节点上,导致处理工作变得困难,甚至失败。
本文将展示一些在保持Spark并行处理能力的同时,灵活处理不可变数据框架中的行/列的方法。
对于列级别的操作,首先想到的解决方案是使用用户定义函数(UDF)。将列级别的处理代码写成Python函数,并将其作为Spark UDF调用。
例如,假设有一个结构化数据列,需要将其转换为字典格式。
from ast import literal_eval
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType, StringType, StructField
import pyspark.sql.functions as F
spark = SparkSession.builder.getOrCreate()
def structure_change_fn(struct_col):
dict_val = literal_eval(struct_col)
result = ['{"date":"' + str(val1) + '", "count":' + str(val2) + '}' for val1, val2 in dict_val.items()]
return result
udf_restructure = F.udf(structure_change_fn, StringType())
input_df = [("1","{'2021-06-01':300, '2021-06-02':400, '2021-06-03':300,'2021-06-04':500}")]
schema = StructType([
StructField("id", StringType(), True),
StructField("struct", StringType(), True)])
input_df = spark.createDataFrame(data=input_df, schema=schema)
result_df = input_df.withColumn('dict_val', udf_restructure('struct')).select('id', 'dict_val')
在这个例子中,定义了一个函数`structure_change_fn`,它将结构化数据列转换为字典格式。然后,使用Spark的UDF功能将这个Python函数应用到DataFrame的列上。
现在,考虑一个情况,即基于多个列的值派生出一列的值。
例如,每个班级的顶尖学生名字按科目给出,同时给出总体顶尖班级的名字。两者结合,动态地为每个记录生成所有3个班级的顶尖学生的名字。预期的输出是一个描述列,通过替换顶尖班级和顶尖学生的名字在描述文本中生成。因此,每个记录的描述都是定制的文本。
def desc_fn(desc_col, format_col0, format_col1, format_col2, format_class_col3):
get_class_value = '{0}' if format_class_col3=='A' else '{1}' if format_class_col3=='B' else '{2}'
desc_col_edit = get_class_value + desc_col
return desc_col_edit.format(format_col0, format_col1, format_col2, format_class_col3)
udf_desc_fn = F.udf(desc_fn, StringType())
inter_df = input_df.withColumn('description_temp', ' of Class {3} got the first mark')
final_df = inter_df.withColumn('Description', udf_desc_fn('description_temp', 'class_A_topper', 'class_B_topper', 'class_C_topper', 'overall_topper_class'))
在这个场景中,再次使用UDF。但是,这个UDF不仅仅使用一个列的值,而是使用多个列的值,并为每个列生成动态的描述值。这是通过在UDF内部编写Python字符串格式化函数来实现的。
现在,考虑一个情况,即有多个条件需要验证,并且结果描述列有很多可能性。
例如,对于每个用户,他们相对于三个条件(A、B和C)的状态都给出了。'1'表示条件匹配,而'-1'表示条件不匹配。需要生成一个合适的描述。描述有一个大列表的可能值,基于条件。
# 读取输入和配置文件作为数据框架
input_df.createOrReplaceTempView('input_tbl')
config_df.createOrReplaceTempView('config_tbl')
join_qry = """
SELECT input_tbl.user_id,
input_tbl.condition_A,
input_tbl.condition_B,
input_tbl.condition_C,
config_tbl.description
FROM input_tbl, config_tbl
WHERE input_tbl.condition_A*config_tbl.condition_A>0
AND input_tbl.condition_B*config_tbl.condition_B>0
AND input_tbl.condition_C*config_tbl.condition_C>0
"""
join_df = spark.sql(join_qry)