随着数据量的不断增长,企业能够从数据中获得洞察,进而推动业务的发展。像Alexa、Siri和Google Home这样的个人助理,就是利用大数据和物联网技术来收集数据并提供答案。本文将探讨PySpark在大数据中的角色,以及它如何影响大数据生态系统,并将提供一些使用PySpark的实践经验。
由于对数据驱动技术的需求不断增加,数据增长迅速,需要使用快速的计算机来处理这些庞大的数据量。大数据的处理可以采用两种方式:使用一台功能强大的计算机,或者使用分布式计算机系统并行处理。由于单一计算机无法处理大量数据,因此后者成为主流选择。
并行计算意味着将处理任务的工作量分割,并独立运行。如果任何一个计算机节点(节点)无法处理数据,其他节点会接手任务。分布式系统集群是一组协同工作的节点(单台计算机),它们同步运行。开源项目如Hadoop项目和大数据工具如Apache Hive和Apache Spark,因为它们免费且完全透明,所以在大数据世界中占据主导地位。
大数据生态系统需要根据不同的需求使用各种工具。它需要云管理、数据处理工具、数据库、商业智能工具和编程语言。这些是所需的分类工具。
编程工具、数据库(NoSql和SQL)、商业智能工具、数据技术、分析和可视化、云技术,这些都是大数据生态系统中不可或缺的部分。
Hadoop生态系统包含相互支持的组件:数据摄入(Flume、Sqoop)、数据存储(HDFS、HBase)、数据处理与分析(Pig & Hive)、数据访问(Impala、Hue)。
Apache Spark为何受到青睐?Uber、Netflix、腾讯和阿里巴巴等公司都在运行Apache Spark操作。Spark是Apache Hadoop的强大补充。Spark更强大、更易于访问,并且能够在分布式系统中处理大量数据。
Spark是一个基于内存框架的开源应用程序,用于大规模分布式数据处理。以下是Apache Spark的一些顶级特性,使其广受欢迎。
1) 快速处理:Spark在磁盘数据排序方面创下了世界纪录。它在内存中处理数据的速度比Hadoop集群快100倍,快10倍。
2) 支持多种API:Spark应用程序可以用多种编程语言编写,如SCALA、JAVA、PYTHON、R、CLOJURE。Spark支持多种高级语言API,使其更易于使用。Spark Core是Spark的基础引擎;它执行大规模分布式和并行数据处理,其容错性意味着如果一个节点宕机,处理不会停止。它包含能够在集群中并行工作的各种元素。
3) 强大的库:它支持Map-Reduce函数;它促进了SQL和数据框架。它为机器学习任务提供MLlib,并为实时数据分析提供Spark Streaming。
4) 实时处理:Spark支持MapReduce,能够处理存储在Hadoop集群和HDFS文件中的数据。Spark Streaming能够处理实时数据。
5) 兼容性和部署:Spark能够轻松运行在Hadoop、Kubernetes、Mesos或任何云服务上。
Apache Spark运行时架构包含三个高级组件:Spark Driver、Cluster Manager和Executors。
使用Python与Apache Spark开始:Spark最初是用Scala编写的,可以编译成java字节码,但可以用Python代码通过py4j与JVM(java虚拟机)通信。因此,可以用Python编写Spark应用程序。
目标:启动Pyspark、初始化上下文和会话、创建RDD、应用转换和动作、缓存。
对于运行Spark,可以选择任何云笔记本,但强烈建议创建一个Databricks Spark集群;这是一个关于如何在Databricks上免费启动Spark集群的逐步指南。
PySpark是为Python制作的Spark API。在本文中,将使用PySpark初始化和操作Spark。
安装所需的包:
!pip install pyspark
!pip install findspark
Findspark添加了一个启动文件到当前IPython配置文件,并使环境准备好运行Spark。
创建Spark上下文和Spark会话。Spark会话需要SparkSQL和数据框架。SparkContext是Spark应用程序的入口点,它包含创建RDD(弹性分布式数据库)的函数,如parallelize()。
import findspark
findspark.init()
sc = SparkContext()
spark = SparkSession
.builder
.appName("Python Spark DataFrames basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
初始化Spark会话后,需要验证刚刚创建的Spark会话实例。
Spark RDDs是Spark中的基本数据抽象,使用函数式编程概念来处理RDD。RDD支持各种类型的文件,包括文本、序列文件、Avro、Parquet、Hadoop输入等。Cassandra、HBase、HDFS、Amazon S3等。RDD是不可变的,以维护数据完整性。RDD以分布式方式工作;RDD跨集群节点分区。RDD支持延迟评估,意味着在调用动作之前不会应用任何转换。
例如,通过调用函数parallelize在Spark中创建一个RDD。sc.parallelize接受数据列表并将其转换为RDD。
data = range(1,30)
print(data[0])
len(data)
xrangeRDD = sc.parallelize(data, 4)
xrangeRDD
RDD转换:正如讨论的,RDD是不可变的;当应用转换时,它会返回一个新的RDD。RDD是延迟评估的,意味着在生成新的RDD时不会进行任何计算。新的RDD将包含一系列转换规则。一旦调用动作,所有转换将被执行。
subRDD = xrangeRDD.map(lambda x: x-1)
filteredRDD = subRDD.filter(lambda x : x<10)
print(filteredRDD.collect())
filteredRDD.count()
import time
test = sc.parallelize(range(1,50000),4)
test.cache()
t1 = time.time()
count1 = test.count()
dt1 = time.time() - t1
print("dt1: ", dt1)
t2 = time.time()
count2 = test.count()
dt2 = time.time() - t2
print("dt2: ", dt2)