Apache Spark性能优化技巧

在处理大数据时,最大的挑战不是完成任务,而是以最少的资源和时间完成任务。Apache Spark以其出色的灵活性,可以帮助优化代码,实现资源的最大化利用。本文将讨论每个数据工程初学者都应该了解的8个Spark性能优化技巧。这些技巧大多是简单的,可以替换无意中使用的低效代码。还有一些是需要对现有代码进行的小调整,以成为Spark的超级用户。让立即开始吧!

如果是完全的初学者,对Spark及其基本组件一无所知,建议先阅读以下文章:

目录

  1. 不要收集数据
  2. 持久化是关键
  3. 避免使用Groupbykey
  4. 使用累加器进行聚合
  5. 广播大变量
  6. 明智地使用分区
  7. 重新分区数据
  8. 不要重新分区数据 - 合并它

1. 不要收集数据

作为数据工程师初学者,从处理小数据开始,习惯使用一些命令,并在处理大数据时继续使用它们。其中一个命令就是Spark中的collect()操作。当调用collect操作时,结果会返回到驱动节点。这可能看起来无害,但如果处理的是大量数据,那么驱动节点很容易耗尽内存。

df = spark.read.csv("/FileStore/tables/train.csv", header=True) df.collect()

应该使用什么替代方案呢?一个很好的方法是使用take()操作。它扫描找到的第一个分区并返回结果。例如,如果只是想感受一下数据,可以使用take(1)获取一行数据。

df.take(1)

这比使用collect()高效得多!

2. 持久化是关键

当开始使用Spark时,学到的第一件事就是Spark是一个惰性求值器,这是一件好事。但如果不谨慎操作,这也可能是灾难的开始。意思是,假设对RDD执行了一些转换。现在每次对RDD调用操作时,Spark都会重新计算RDD及其所有依赖项。这可能非常昂贵。

l1 = [1, 2, 3, 4] rdd1 = sc.parallelize(l1) rdd2 = rdd1.map(lambda x: x*x) rdd3 = rdd2.map(lambda x: x+2) print(rdd3.count()) print(rdd3.collect())

当调用count()时,所有转换都执行了,任务在0.1秒内完成。当调用collect()时,所有的转换再次被调用,任务仍然需要0.1秒来完成。那么如何摆脱这个恶性循环呢?持久化!

from pyspark import StorageLevel rdd3.persist(StorageLevel.MEMORY_AND_DISK)

在之前的代码中,只需要在最终的RDD上持久化。这样,当第一次对RDD调用操作时,生成的最终数据将存储在集群中。现在,任何后续对同一RDD的操作都会快得多,因为已经存储了先前的结果。

注意 - 这里,将数据持久化到内存和磁盘中。但是还有其他选项可以持久化数据。

3. 避免使用Groupbykey

当开始数据工程之旅时,一定会遇到词频统计的例子。

text_list = ["这是一个示例句子", "这是另一个示例句子", "示例为一个示例测试"] rdd = sc.parallelize(text_list) rdd_word = rdd.flatMap(lambda x: x.split(" ")) rdd_pair = rdd_word.map(lambda x: (x, 1)) rdd_group = rdd_pair.groupByKey() rdd_group_count = rdd_group.map(lambda x:(x[0], len(x[1]))) rdd_group_count.collect()

但为什么要在这里提到呢?这是突出groupbykey()转换在处理配对RDD时的低效性的最佳方式。Groupbykey在网络中混洗键值对,然后组合它们。对于更大的数据,混洗将更加夸张。那么如何应对呢?使用reducebykey()

rdd_reduce = rdd_pair.reduceByKey(lambda x,y: x+y) rdd_reduce.collect()

这导致在网络上混洗的数据量大大降低。

4. 使用累加器进行聚合

假设想聚合一些值。这可以通过使用变量作为计数器的简单编程来完成。

file = sc.textFile("/FileStore/tables/log.txt") warningCount = 0 def extractWarning(line): global warningCount if ("WARNING" in line): warningCount +=1 lines = file.flatMap(lambda x: x.split(",")) lines.foreach(extractWarning) warningCount

但这里有一个陷阱。当尝试在驱动节点上查看结果时,得到的是0值。这是因为当代码在工作节点上实现时,变量变为该节点的局部变量。这意味着更新的值没有发送回驱动节点。为了克服这个问题,使用累加器。

warningCount = sc.accumulator(0) def extractWarning(line): global warningCount if ("WARNING" in line): warningCount +=1 lines = file.flatMap(lambda x: x.split(",")) lines.foreach(extractWarning) warningCount.value # 输出 4

使用累加器时需要记住的一点是,工作节点只能写入累加器。但只有驱动节点可以读取值。

5. 广播大变量

就像累加器一样,Spark还有另一个共享变量叫做广播变量。它们只用于读取目的,并且在集群的所有工作节点中被缓存。这在不得不将一个大的查找表发送到所有节点时非常有用。

country = {"IND":"India","USA":"United States of America","SA":"South Africa"} broadcaster = sc.broadcast(country) userData = [("Johnny","USA"),("Faf","SA"),("Sachin","IND")] rdd_data = sc.parallelize(userData) def convert(code): return broadcaster.value[code] output = rdd_data.map(lambda x: (x[0], convert(x[1]))) output.collect()

6. 明智地使用分区

Spark的一个基石是其并行处理数据的能力。Spark将数据分成几个分区,每个分区包含完整数据的一个子集。例如,如果一个数据框架包含10,000行,并且有10个分区,那么每个分区将有1000行。

df.rdd.getNumPartitions()

但是这个数字是可调的,应该调整以获得更好的优化。选择太少的分区,有很多资源闲置。选择太多的分区,有很多小分区频繁地混洗数据,这可能变得非常低效。那么正确的数字是什么呢?

根据Spark,128 MB是应该打包到单个分区的最大字节数。所以,如果有128000 MB的数据,应该有1000个分区。但这个数字并不严格,正如在下一个提示中将看到的。

7. 重新分区数据

Spark应用程序中的分区数量需要调整。如果开始时有100个分区,可能需要将它们减少到50个。但为什么要这样做呢?

import numpy as np l1 = np.arange(13) rdd = sc.parallelize(l1) print(rdd.glom().collect()) print(rdd.repartition(10).glom().collect())

repartition()将数据调整到定义的分区数量时,它必须在网络中混洗完整的数据。尽管在增加分区时这种过度混洗是不可避免的,但当减少分区数量时,有一种更好的方法。

在最后一个提示中,讨论了减少分区数量时使用repartition不是最好的方法。为什么呢?

print(rdd.glom().collect()) print(rdd.repartition(4).glom().collect()) print(rdd.coalesce(4).glom().collect())
沪ICP备2024098111号-1
上海秋旦网络科技中心:上海市奉贤区金大公路8218号1幢 联系电话:17898875485