在当今数据爆炸的时代,构建能够处理海量数据的高效机器和平台变得尤为重要。几年前,当数据科学和机器学习还未成为热门话题时,人们习惯于在电子表格上进行简单的数据操作和分析。随着对数据的深入理解和计算机硬件效率的提升,新的平台逐渐取代了简单的平台,用于处理数据操作和模型构建任务。R/Python首先取代了Excel,成为数据处理任务的标准平台,因为它们能够处理更大规模的数据集。随后,像Spark这样的大数据处理平台应运而生,它是一个统一的并行数据处理引擎,能够在计算机集群上进行内存计算,更加高效地处理数十亿行和列的大数据。
本文旨在帮助快速解决在使用Spark处理大数据时可能遇到的瓶颈问题,并优化Spark作业。尽管Spark有自己的内部催化剂来优化作业和查询,但由于资源有限,可能会遇到与内存相关的问题,因此了解一些好的实践方法是非常有益的。本文假设已经具备使用Spark的先前经验。
本文不仅对数据科学家有益,对数据工程师也同样有帮助。将专注于如何通过以下方式优化Spark作业:
Spark应用程序由一个驱动进程和一组执行进程组成。驱动进程运行main()函数,是Spark应用程序的核心。它负责在执行器之间执行驱动程序的命令以完成任务。它持有SparkContext,这是Spark应用程序的入口点。工作节点包含执行器,负责实际执行驱动程序分配给它们的工作。集群管理器控制物理机器并为Spark应用程序分配资源。集群上可以同时运行多个Spark应用程序。
要配置集群上的Spark作业,主要有三个方面需要注意:执行器数量、执行器内存和核心数。执行器是为Spark应用程序在节点上启动的单个JVM进程,而核心是CPU的基本计算单元或执行器可以运行的并发任务。一个节点可以有多个执行器和核心。所有计算都需要一定量的内存来完成这些任务。可以通过传递所需值来控制这三个参数:
-executor-cores
, -num-executors
, -executor-memory
。
可能会认为核心数越多,可以同时执行的任务就越多。虽然这种理念是可行的,但它有一个限制。观察到,许多Spark应用程序在超过5个并发任务时表现不佳。这个数字来自于执行器的能力,而不是系统有多少核心。因此,即使机器有更多的核心,这个数字也保持不变。因此,将此设置为5以获得良好的HDFS吞吐量(通过设置-executor-cores
为5来提交Spark应用程序)是一个好主意。
当使用集群管理器运行Spark应用程序时,后台将运行多个Hadoop守护进程,如名称节点、数据节点、作业跟踪器和任务跟踪器(它们都有特定的工作要做,应该阅读)。因此,在指定-num-executors
时,需要确保为这些守护进程留出足够的核心(每个节点约1个核心),以便它们能够顺利运行。
还需要至少留出一个执行器给应用程序管理器,以便从资源管理器那里协商资源。还需要分配一些执行器内存来补偿其他一些杂项任务的开销内存。文献显示,将其分配为执行器内存的7-10%是一个不错的选择,但不应太低。
例如,假设正在一个有10个节点的集群上工作,每个节点有16个核心和64GB RAM。可以为每个执行器分配5个核心,并为Hadoop守护进程留出每个节点1个核心。因此,现在每个节点有15个可用核心。由于有10个节点,总共可用的核心数将是10×15=150。
现在可用的执行器数量=总核心数/每个执行器的核心数=150/5=30,但需要至少留出一个执行器给应用程序管理器,因此执行器的数量将是29。由于有10个节点,将每个节点有3(30/10)个执行器。每个执行器的内存将是每个节点的内存/每个节点的执行器数量=64/2=21GB。留出7%(约3GB)作为内存开销,将有18(21-3)GB的内存每个执行器。因此,最终参数将是:-executor-cores 5
, -num-executors 29
, -executor-memory 18 GB
。
像这样,可以计算分配这些参数的数学。尽管请注意,这只是分配这些参数的一种方式,作业可能在不同的值下得到调整,但重要的是要有结构化的方式来思考调整这些值,而不是盲目射击。
Spark提供两种类型的操作:动作和转换。转换(例如map、filter、groupBy等)从先前的一个构建一个新的RDD/DataFrame,而动作(例如head、show、write等)根据RDD/DataFrame计算结果,并将其返回给驱动程序或保存到外部存储系统。Spark以惰性方式执行所有这些操作。Spark中的惰性评估意味着实际执行不会发生,直到触发一个动作。在Spark DataFrame或RDD上运行的每个转换命令都存储在一个血统图中。
不建议在血统中链接大量的转换,特别是当希望用最少的资源处理大量数据时。相反,通过将中间结果写入HDFS(最好在HDFS中,而不是外部存储如S3,因为写入外部存储可能更慢)来打破血统。
当需要在Spark的执行器之间共享变量时,可以将其声明为广播变量。请注意,广播变量是只读的。广播变量特别适用于偏斜的连接。例如,如果尝试连接两个表,其中一个非常小,另一个非常大,那么将较小的表广播到工作节点的执行器上以避免网络开销是有意义的。
虽然Spark为数据选择了合理的默认值,但如果Spark作业耗尽内存或运行缓慢,可能是分区策略不佳。如果数据集很大,可以尝试重新分区(使用repartition方法)到更大的数量,以允许作业有更多的并行性。如果Spark UI中的任务不多,但每个任务完成得非常慢,这是一个好迹象。另一方面,如果没有那么多数据,但有很多分区,拥有太多分区的开销也可能导致作业变慢。可以使用coalesce方法而不是repartition方法重新分区到更小的数量,因为它更快,并且会尝试在同一台机器上合并分区,而不是再次重新洗牌数据。