Spark SQL是Spark内置的结构化数据处理模块,它使用SQL或类似SQL的数据框架API在Spark程序中查询结构化数据。它支持全局临时视图和临时视图,使用视图表和SQL查询来聚合和生成数据。它支持多种数据类型,如Parquet文件、JSON和HIVE表。Spark还使用Catalyst和Tungsten工具优化提供的SQL查询。使用它,可以运行SQL查询。在Spark SQL中,数据视图被视为表。
本文首先将理解为什么应该使用SparkSQL以及它如何在使用Spark时提供灵活性。本文是作为的一部分发布的。
本文的目标是介绍Spark SQL的重要性和特点,以及如何在Python中设置Spark,加载数据文件作为数据框架,运行SQL查询和创建视图,以及创建Pandas UDF进行列操作。
Spark SQL功能丰富,支持多种结构化数据,如Hive表、Pandas数据框架、Parquet文件等。可以使用Spark SQL以多种结构化数据格式写入数据,如Hive表、Parquet、JSON等。Spark SQL利用Spark RDD模型的可扩展性和其他优势。使用JDBC或ODBC等连接器,可以以标准方式连接。
Spark SQL优化的主要目标是通过减少查询的时间和内存消耗来提高SQL查询的运行时性能,从而节省组织的时间和金钱。它支持基于规则和基于成本的查询优化。Catalyst,也称为Catalyst优化器,是Spark内置的查询优化器。可以向其中添加自定义优化技术和功能。Tungsten是一个基于成本的查询优化器,它使用树数据结构计算最低成本路径。它提高了CPU性能而不是IO。
将使用Pandas将数据加载到数据框架中。可以将Pandas的数据框架加载到Spark中。安装所需的包需要互联网,因此建议使用任何云笔记本。
# 安装所需的包
!pip install pyspark
!pip install findspark
!pip install pyarrow==1.0.0
!pip install pandas
!pip install numpy==1.19.5
安装完所有所需的包后,需要为Spark设置环境变量。
import findspark
findspark.init()
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
# 创建Spark上下文类
sc = SparkContext()
# 创建Spark会话
spark = SparkSession \
.builder \
.appName("Python Spark DataFrames basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
运行命令spark
将返回运行中的Spark实例信息。
在本节中,首先将CSV文件读入Pandas数据框架,然后将其读入Spark数据框架。要创建Spark数据框架,只需将Pandas数据框架转换为Spark数据框架。
# 使用pandas中的read_csv函数读取文件
mtcars = pd.read_csv('https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/mtcars.csv')
mtcars.head()
# 重命名列“Unnamed”
mtcars.rename(columns={'Unnamed: 0':'name'}, inplace=True)
数据框架包含32个观测值(行)和11个变量。将数据框架加载到Spark中。使用createDataFrame
函数将数据加载到Spark数据框架中。
sdf = spark.createDataFrame(mtcars)
打印加载的Spark数据框架的模式。创建表视图。表视图是Spark SQL运行查询所必需的。表视图被视为SQL表。有两种方式可以创建表视图。临时视图和全局视图。使用函数createTempView()
,可以创建临时表视图。
sdf.createTempView("cars")
创建视图后,可以将cars
视为Spark中的SQL表。
创建表视图后,可以运行类似于查询SQL表的查询。显示整个表。
spark.sql("SELECT * FROM cars").show()
选择特定表。显示特定列。
spark.sql("SELECT mpg FROM cars").show(5)
基本SQL过滤。基本过滤查询,以确定具有高里程和低缸数的卡片。
spark.sql("SELECT * FROM cars WHERE mpg > 20 AND cyl < 6").show(5)
聚合。聚合数据并按缸数分组。
spark.sql("SELECT count(*), cyl FROM cars GROUP BY cyl").show()
查询所有以“Merc”开头的汽车。
spark.sql("SELECT * FROM cars WHERE name LIKE 'Merc%'").show()
用户定义函数(UDF)一次作用于一行,因此它遭受高序列化,因此许多数据管道使用UDF进行数据处理计算任务。通过使用@pandas_udf()
装饰器注册常规Python函数,使其成为UDF。它可以应用于Spark数据框架以及Spark SQL。
# 导入Pandas UDF函数
from pyspark.sql.functions import pandas_udf, PandasUDFType
创建UDF。
@pandas_udf("float")
def convert_wt(s: pd.Series) -> pd.Series:
# 从英制转换为公制吨
return s * 0.45
spark.udf.register("convert_weight", convert_wt)
@pandas_udf("float")
def convert_mileage(s: pd.Series) -> pd.Series:
return s * 0.425
# 在Spark中注册UDF
spark.udf.register("convert_mileage", convert_mileage)
spark.sql("SELECT *, wt AS imperial_weight, convert_weight(wt) AS weight_metric FROM cars").show()