Spark SQL在Python中的应用

Spark SQL是Spark内置的结构化数据处理模块,它使用SQL或类似SQL的数据框架API在Spark程序中查询结构化数据。它支持全局临时视图和临时视图,使用视图表和SQL查询来聚合和生成数据。它支持多种数据类型,如Parquet文件、JSON和HIVE表。Spark还使用Catalyst和Tungsten工具优化提供的SQL查询。使用它,可以运行SQL查询。在Spark SQL中,数据视图被视为表。

本文首先将理解为什么应该使用SparkSQL以及它如何在使用Spark时提供灵活性。本文是作为的一部分发布的。

目录

  • 目标
  • SparkSQL特点
  • Spark SQL优化
  • 设置
  • 初始化Spark会话
  • 加载数据和创建表视图
  • Spark SQL函数

目标

本文的目标是介绍Spark SQL的重要性和特点,以及如何在Python中设置Spark,加载数据文件作为数据框架,运行SQL查询和创建视图,以及创建Pandas UDF进行列操作。

SparkSQL特点

Spark SQL功能丰富,支持多种结构化数据,如Hive表、Pandas数据框架、Parquet文件等。可以使用Spark SQL以多种结构化数据格式写入数据,如Hive表、Parquet、JSON等。Spark SQL利用Spark RDD模型的可扩展性和其他优势。使用JDBC或ODBC等连接器,可以以标准方式连接。

Spark SQL优化

Spark SQL优化的主要目标是通过减少查询的时间和内存消耗来提高SQL查询的运行时性能,从而节省组织的时间和金钱。它支持基于规则和基于成本的查询优化。Catalyst,也称为Catalyst优化器,是Spark内置的查询优化器。可以向其中添加自定义优化技术和功能。Tungsten是一个基于成本的查询优化器,它使用树数据结构计算最低成本路径。它提高了CPU性能而不是IO。

设置

将使用Pandas将数据加载到数据框架中。可以将Pandas的数据框架加载到Spark中。安装所需的包需要互联网,因此建议使用任何云笔记本。

# 安装所需的包 !pip install pyspark !pip install findspark !pip install pyarrow==1.0.0 !pip install pandas !pip install numpy==1.19.5

安装完所有所需的包后,需要为Spark设置环境变量。

import findspark findspark.init()

初始化Spark会话

import pandas as pd from pyspark import SparkContext, SparkConf from pyspark.sql import SparkSession # 创建Spark上下文类 sc = SparkContext() # 创建Spark会话 spark = SparkSession \ .builder \ .appName("Python Spark DataFrames basic example") \ .config("spark.some.config.option", "some-value") \ .getOrCreate()

运行命令spark将返回运行中的Spark实例信息。

加载数据和创建表视图

在本节中,首先将CSV文件读入Pandas数据框架,然后将其读入Spark数据框架。要创建Spark数据框架,只需将Pandas数据框架转换为Spark数据框架。

# 使用pandas中的read_csv函数读取文件 mtcars = pd.read_csv('https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/mtcars.csv') mtcars.head() # 重命名列“Unnamed” mtcars.rename(columns={'Unnamed: 0':'name'}, inplace=True)

数据框架包含32个观测值(行)和11个变量。将数据框架加载到Spark中。使用createDataFrame函数将数据加载到Spark数据框架中。

sdf = spark.createDataFrame(mtcars)

打印加载的Spark数据框架的模式。创建表视图。表视图是Spark SQL运行查询所必需的。表视图被视为SQL表。有两种方式可以创建表视图。临时视图和全局视图。使用函数createTempView(),可以创建临时表视图。

sdf.createTempView("cars")

创建视图后,可以将cars视为Spark中的SQL表。

运行SQL查询和数据聚合

创建表视图后,可以运行类似于查询SQL表的查询。显示整个表。

spark.sql("SELECT * FROM cars").show()

选择特定表。显示特定列。

spark.sql("SELECT mpg FROM cars").show(5)

基本SQL过滤。基本过滤查询,以确定具有高里程和低缸数的卡片。

spark.sql("SELECT * FROM cars WHERE mpg > 20 AND cyl < 6").show(5)

聚合。聚合数据并按缸数分组。

spark.sql("SELECT count(*), cyl FROM cars GROUP BY cyl").show()

查询所有以“Merc”开头的汽车。

spark.sql("SELECT * FROM cars WHERE name LIKE 'Merc%'").show()

Pandas UDF在Spark中

用户定义函数(UDF)一次作用于一行,因此它遭受高序列化,因此许多数据管道使用UDF进行数据处理计算任务。通过使用@pandas_udf()装饰器注册常规Python函数,使其成为UDF。它可以应用于Spark数据框架以及Spark SQL。

# 导入Pandas UDF函数 from pyspark.sql.functions import pandas_udf, PandasUDFType

创建UDF。

@pandas_udf("float") def convert_wt(s: pd.Series) -> pd.Series: # 从英制转换为公制吨 return s * 0.45 spark.udf.register("convert_weight", convert_wt) @pandas_udf("float") def convert_mileage(s: pd.Series) -> pd.Series: return s * 0.425 # 在Spark中注册UDF spark.udf.register("convert_mileage", convert_mileage) spark.sql("SELECT *, wt AS imperial_weight, convert_weight(wt) AS weight_metric FROM cars").show()
沪ICP备2024098111号-1
上海秋旦网络科技中心:上海市奉贤区金大公路8218号1幢 联系电话:17898875485