PySpark数据框架处理

在处理大数据时,经常需要对数据框架中的特定行或列进行编辑。在Python中,这通常很容易实现,因为数据框架是可变的,提供了极大的灵活性。然而,在PySpark中,情况有所不同。PySpark的数据结构主要是RDD、DataFrame和GraphFrame,这些数据框架是不可变的,因此在行/列级别的处理上灵活性较低。

在编写PySpark代码时,必须坚持使用Spark的特性,而不是转向Python。这是因为,如果代码完全用Python编写,就会失去Spark特有的并行计算效率。当处理大量数据时,如果处理器遇到仅用Python编写的代码行,那么并行计算的优势就会丧失,所有数据都会被压缩到一个节点上,导致处理工作变得困难,甚至失败。

本文将展示一些在保持Spark并行处理能力的同时,灵活处理不可变数据框架中的行/列的方法。

1. 单列复杂条件/表达式

对于列级别的操作,首先想到的解决方案是使用用户定义函数(UDF)。将列级别的处理代码写成Python函数,并将其作为Spark UDF调用。

例如,假设有一个结构化数据列,需要将其转换为字典格式。

from ast import literal_eval from pyspark.sql import SparkSession, DataFrame from pyspark.sql.types import StructType, StringType, StructField import pyspark.sql.functions as F spark = SparkSession.builder.getOrCreate() def structure_change_fn(struct_col): dict_val = literal_eval(struct_col) result = ['{"date":"' + str(val1) + '", "count":' + str(val2) + '}' for val1, val2 in dict_val.items()] return result udf_restructure = F.udf(structure_change_fn, StringType()) input_df = [("1","{'2021-06-01':300, '2021-06-02':400, '2021-06-03':300,'2021-06-04':500}")] schema = StructType([ StructField("id", StringType(), True), StructField("struct", StringType(), True)]) input_df = spark.createDataFrame(data=input_df, schema=schema) result_df = input_df.withColumn('dict_val', udf_restructure('struct')).select('id', 'dict_val')

在这个例子中,定义了一个函数`structure_change_fn`,它将结构化数据列转换为字典格式。然后,使用Spark的UDF功能将这个Python函数应用到DataFrame的列上。

2. 多列值 - 简单条件

现在,考虑一个情况,即基于多个列的值派生出一列的值。

例如,每个班级的顶尖学生名字按科目给出,同时给出总体顶尖班级的名字。两者结合,动态地为每个记录生成所有3个班级的顶尖学生的名字。预期的输出是一个描述列,通过替换顶尖班级和顶尖学生的名字在描述文本中生成。因此,每个记录的描述都是定制的文本。

def desc_fn(desc_col, format_col0, format_col1, format_col2, format_class_col3): get_class_value = '{0}' if format_class_col3=='A' else '{1}' if format_class_col3=='B' else '{2}' desc_col_edit = get_class_value + desc_col return desc_col_edit.format(format_col0, format_col1, format_col2, format_class_col3) udf_desc_fn = F.udf(desc_fn, StringType()) inter_df = input_df.withColumn('description_temp', ' of Class {3} got the first mark') final_df = inter_df.withColumn('Description', udf_desc_fn('description_temp', 'class_A_topper', 'class_B_topper', 'class_C_topper', 'overall_topper_class'))

在这个场景中,再次使用UDF。但是,这个UDF不仅仅使用一个列的值,而是使用多个列的值,并为每个列生成动态的描述值。这是通过在UDF内部编写Python字符串格式化函数来实现的。

3. 多列值 - 复杂条件

现在,考虑一个情况,即有多个条件需要验证,并且结果描述列有很多可能性。

例如,对于每个用户,他们相对于三个条件(A、B和C)的状态都给出了。'1'表示条件匹配,而'-1'表示条件不匹配。需要生成一个合适的描述。描述有一个大列表的可能值,基于条件。

# 读取输入和配置文件作为数据框架 input_df.createOrReplaceTempView('input_tbl') config_df.createOrReplaceTempView('config_tbl') join_qry = """ SELECT input_tbl.user_id, input_tbl.condition_A, input_tbl.condition_B, input_tbl.condition_C, config_tbl.description FROM input_tbl, config_tbl WHERE input_tbl.condition_A*config_tbl.condition_A>0 AND input_tbl.condition_B*config_tbl.condition_B>0 AND input_tbl.condition_C*config_tbl.condition_C>0 """ join_df = spark.sql(join_qry)
沪ICP备2024098111号-1
上海秋旦网络科技中心:上海市奉贤区金大公路8218号1幢 联系电话:17898875485