在大数据领域,聚合操作是一项核心任务,它涉及到将大量数据集中并简化为更易于管理和分析的形式。聚合操作通常包括定义一个或多个键值或分组,并指定一个聚合函数来确定如何对列进行转换。如果输入多个值,聚合函数将为每个组生成一个结果。Spark提供了成熟且复杂的聚合功能,支持多种不同的用例和可能性。聚合通常用于获取数据的摘要,可以进行计数、求和以及计算数据的乘积。使用Spark,可以将任何类型的值聚合成集合、列表等。在“聚合到复杂类型”部分将看到这一点。
聚合操作可以分为以下几类:
简单聚合:最简单的分组是通过在select语句中使用聚合函数来获取给定数据帧的摘要。
分组聚合:“分组”允许指定多个键或聚合函数来转换列。
窗口函数:“窗口”提供了指定一个或多个键以及一个或多个聚合函数来转换值列的功能。然而,输入到聚合函数的行与当前行有一定的关联性。
所有这些Spark中的聚合都是通过内置函数实现的。
本文使用的是ApacheSpark3.0.3版本和Hadoop 2.7版本。可以在下载。同时,使用的是Eclipse Scala IDE,可以在下载。本文使用的是一个CSV数据文件,可以在找到。数据集包含以下列:station_id, name, lat, long, dockcount, landmark, 和 installation。这是自行车站的数据。
因为聚合操作涉及到使用聚合函数和窗口函数,所以这里导入所有函数。可以通过以下方式实现:
import org.apache.spark.sql.functions._
现在,将数据文件读入数据帧:
var df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("C:\Users\Dhanya\Downloads\201508_station_data.csv")
现在,准备进行一些聚合操作。从最简单的开始。最简单的聚合形式是汇总整个数据帧,结果将给出一个单行结果。例如,可以计算这个数据帧中的记录数,它将返回一个包含记录数的单行结果。
从数据帧开始,使用select()方法并应用count函数。还可以为摘要列添加一个hive别名。还可以为dockcount列的总和添加一个摘要列。还可以计算平均值。还有countDistinct()函数。在这里,计算landmark列的唯一值。countDistinct()将给出这个数据帧中唯一的landmark的数量。还有另一件事叫做approx_count_distinct()。当给出countDistinct()时,它会对不同的值进行分组并计数。当有一个包含数百万行的巨大数据集时会发生什么。countDistinct()函数将花费时间。在这种情况下,可以使用approx_count_distinct(),它将返回一个近似计数。它不是100%准确。当速度比准确性更重要时,可以使用它。当想要得到一个不同值集合的总和时,可以使用sumDistinct()函数。
这些函数可以这样实现:
df.select(
count("*").as("Count *"),
sum("dockcount").alias("Total Dock"),
avg("dockcount").alias("avg dock"),
countDistinct("landmark").alias("landmark count"),
approx_count_distinct("station_id").alias("app station"),
sumDistinct("station_id").alias("station_id")
).show()
select方法将返回一个新的数据帧,可以显示它。
还有很多其他的聚合函数,比如first()和last(),可以得到数据帧中的第一个和最后一个值。可以使用min()和max()函数分别得到最小值和最大值。
df.select(
first("station_id").alias("first"),
last("station_id").alias("last"),
min("dockcount").alias("min"),
max("dockcount").alias("max")
).show()
接下来,将使用selectExpr(),可以传递类似SQL的表达式。
df.selectExpr(
"mean(dockcount) as mean_count"
).show()
在这里,计算了dockcount列的平均值。
让看看其他聚合函数,如方差和标准差。众所周知,方差是平均值的平方差的平均值,标准差是方差的平方根。
df.select(
var_pop("dockcount"),
var_samp("dockcount"),
stddev_pop("dockcount"),
stddev_samp("dockcount")
).show()
偏度是数据分布偏离正态分布的程度,可能是正的也可能是负的。峰度是关于数据分布尾部的,用于在数据中找到异常值。
df.select(
skewness("dockcount"),
kurtosis("dockcount")
).show()
接下来,将看到协方差和相关性。协方差是衡量两个列或特征或变量彼此变化的程度。相关性是衡量它们彼此之间的关系程度。
df.select(
corr("station_id", "dockcount"),
covar_samp("station_id", "dockcount"),
covar_pop("station_id", "dockcount")
).show()
df.agg(collect_set("landmark"), collect_list("landmark")).show(false)
import org.apache.spark.sql.functions._
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
object demo extends App{
val conf = new SparkConf().setAppName("Demo").setMaster("local[1]")
val sc = new SparkContext(conf)
val spark = org.apache.spark.sql.SparkSession.builder.master("local[1]").appName("Demo").getOrCreate;
var df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("C:\Users\Dhanya\Downloads\201508_station_data.csv")
df.select(
count("*").as("Count *"),
sum("dockcount").alias("Total Dock"),
avg("dockcount").alias("avg dock"),
countDistinct("landmark").alias("landmark count"),
approx_count_distinct("station_id").alias("app station"),
sumDistinct("station_id").alias("station_id")
).show()
df.select(
first("station_id").alias("first"),
last("station_id").alias("last"),
min("dockcount").alias("min"),
max("dockcount").alias("max")
).show()
df.selectExpr(
"mean(dockcount) as mean_count"
).show()
df.select(
var_pop("dockcount"),
var_samp("dockcount"),
stddev_pop("dockcount"),
stddev_samp("dockcount")
).show()
df.select(
skewness("dockcount"),
kurtosis("dockcount")
).show()
df.select(
corr("station_id", "dockcount"),
covar_samp("station_id", "dockcount"),
covar_pop("station_id", "dockcount")
).show()
df.agg(collect_set("landmark"), collect_list("landmark")).show(false)
}