在本文中,将深入了解Apache Spark及其Python接口PySpark。将探讨其核心特性、差异以及在处理大数据时所带来的优势。文章后半部分,将通过PySpark进行一些基础的数据剖析,以便更好地理解其语法和语义。
分布式计算是处理大数据工作负载的关键,它允许处理传统工具和方法难以处理的大量数据。这种数据的“大”通常体现在三个维度:数据量(Volume)、数据流速(Velocity)和数据种类(Variety)。
正如下面的图表所示,Pandas(一个广泛用于数据科学社区的Python库,用于操作表格数据)在数据量较小时,其操作速度比PySpark快得多。然而,随着数据量的增加,PySpark开始表现得更好,存在一个临界点,超过这个点后,Pandas将无法处理大数据。将探讨Pandas为何会“内存不足”,以及Spark如何通过这个问题。
Pandas通过将数据整体加载到机器的内存中来操作数据。因此,如果使用的设备内存为32GB,当数据大小接近32GB时,Pandas的性能会下降,并且计算机上将没有足够的内存用于其他事项。一旦超过32GB,它将耗尽内存,因为它无法在可用内存中加载整个数据块。
Spark通过使用称为集群的机器/节点网络来克服这个问题。因此,大型任务被分成几个子任务,并将这些子任务分配给各个节点。节点执行子任务,每个节点的单独输出被聚合以获得最终输出。
让通过一个简单的类比来理解这一点——假设和朋友被数学老师挑战,在一分钟内解决10个加法运算。在第一次尝试中,和朋友开始加法,都只能加前6个数字,然后时间就到了。(为了简单起见,6个数字在60秒内加完,即每10秒加1个数字)。所以,在第二次尝试中,制定了一个新的策略。当朋友从开始加数字时,从末尾开始加数字。在50秒结束时,朋友找到了前5个数字的和,找到了最后5个数字的和。在最后10秒内,将两个和相加,找到了所有十个数字的和,从而给数学老师留下了深刻的印象。
Spark因其在大规模数据处理中的快速性能而闻名。这是因为它在机器集群的分布式内存(RAM)中保存和加载数据。RAM的处理速度远高于硬盘。当数据不适合放入RAM时,它要么被重新计算,要么被溢出到硬盘。当数据完全存储在内存中时,Spark的处理速度比Hadoop快100倍,当部分数据溢出到硬盘时,速度比Hadoop快10倍。
在PySpark中,不可变性意味着当尝试对对象进行更改时,不会修改对象,而是创建一个新的对象引用。不可变性与DAG(有向无环图,稍后解释)一起帮助Spark实现容错能力,使其能够在发生故障时恢复。(与不可变对象交互的语法略有不同,稍后将通过代码和示例进行解释)
Spark支持两种类型的操作——转换(Transformations)和动作(Actions)。转换是接受对象作为输入并产生一个或多个对象作为输出的函数。它们不会改变输入对象。动作是对对象执行的操作,执行计算并将结果返回给驱动程序。
延迟评估意味着在触发动作之前,应用于对象的转换不会执行。当触发动作时,Spark会创建一个DAG(有向无环图),包含所有需要执行的操作,并找到执行所需操作的最佳路径,从而节省资源并提供最佳结果。
这可以通过一个类比来理解。假设数学老师,也就是班主任,想知道隔壁班的Jim同学是否在场。去了隔壁班询问Jim是否在场。老师还想知道同一个班的Pam同学是否在场。再次去了隔壁班,询问Pam是否在场。假设这个过程发生了几次,因为他想知道Michael、Dwight和Jan的出勤情况。与其立即去隔壁班执行给定的任务,本可以等待一个包含所有需要知道出勤情况的学生名单,从而节省大量时间和资源。
iris_df = spark.read.csv('dbfs:/FileStore/shared_uploads/irisdataset.csv', header="true", inferSchema="true")
iris_df.show()
iris_df.show(5)
iris_df.columns
iris_df.dtypes
iris_df.describe().show()
iris_df[iris_df.sepal_length > 3].show()
iris_df.groupby('species').count().show()
iris_df.groupby('species').mean('sepal_length').show()
iris_df.groupby('species').mean('sepal_length','petal_length').show()
windowSpecAgg = Window.partitionBy("species")
from pyspark.sql.functions import col,avg,sum,min,max,row_number
iris_df.withColumn("row",row_number().over(windowSpec))
.withColumn("avg", avg(col("sepal_length")).over(windowSpecAgg))
.withColumn("sum", sum(col("sepal_length")).over(windowSpecAgg))
.withColumn("min", min(col("sepal_length")).over(windowSpecAgg))
.withColumn("max", max(col("sepal_length")).over(windowSpecAgg))
.where(col("row")==1).select("species","avg","sum","min","max")
.show()