Apache Spark与RDD入门指南

随着数据量的不断增长,企业能够从数据中获得洞察,进而推动业务的发展。像Alexa、Siri和Google Home这样的个人助理,就是利用大数据和物联网技术来收集数据并提供答案。本文将探讨PySpark在大数据中的角色,以及它如何影响大数据生态系统,并将提供一些使用PySpark的实践经验。

由于对数据驱动技术的需求不断增加,数据增长迅速,需要使用快速的计算机来处理这些庞大的数据量。大数据的处理可以采用两种方式:使用一台功能强大的计算机,或者使用分布式计算机系统并行处理。由于单一计算机无法处理大量数据,因此后者成为主流选择。

并行计算意味着将处理任务的工作量分割,并独立运行。如果任何一个计算机节点(节点)无法处理数据,其他节点会接手任务。分布式系统集群是一组协同工作的节点(单台计算机),它们同步运行。开源项目如Hadoop项目和大数据工具如Apache Hive和Apache Spark,因为它们免费且完全透明,所以在大数据世界中占据主导地位。

大数据生态系统需要根据不同的需求使用各种工具。它需要云管理、数据处理工具、数据库、商业智能工具和编程语言。这些是所需的分类工具。

编程工具、数据库(NoSql和SQL)、商业智能工具、数据技术、分析和可视化、云技术,这些都是大数据生态系统中不可或缺的部分。

Hadoop生态系统包含相互支持的组件:数据摄入(Flume、Sqoop)、数据存储(HDFS、HBase)、数据处理与分析(Pig & Hive)、数据访问(Impala、Hue)。

Apache Spark为何受到青睐?Uber、Netflix、腾讯和阿里巴巴等公司都在运行Apache Spark操作。Spark是Apache Hadoop的强大补充。Spark更强大、更易于访问,并且能够在分布式系统中处理大量数据。

Spark是一个基于内存框架的开源应用程序,用于大规模分布式数据处理。以下是Apache Spark的一些顶级特性,使其广受欢迎。

1) 快速处理:Spark在磁盘数据排序方面创下了世界纪录。它在内存中处理数据的速度比Hadoop集群快100倍,快10倍。

2) 支持多种API:Spark应用程序可以用多种编程语言编写,如SCALA、JAVA、PYTHON、R、CLOJURE。Spark支持多种高级语言API,使其更易于使用。Spark Core是Spark的基础引擎;它执行大规模分布式和并行数据处理,其容错性意味着如果一个节点宕机,处理不会停止。它包含能够在集群中并行工作的各种元素。

3) 强大的库:它支持Map-Reduce函数;它促进了SQL和数据框架。它为机器学习任务提供MLlib,并为实时数据分析提供Spark Streaming。

4) 实时处理:Spark支持MapReduce,能够处理存储在Hadoop集群和HDFS文件中的数据。Spark Streaming能够处理实时数据。

5) 兼容性和部署:Spark能够轻松运行在Hadoop、Kubernetes、Mesos或任何云服务上。

Apache Spark运行时架构包含三个高级组件:Spark Driver、Cluster Manager和Executors。

使用Python与Apache Spark开始:Spark最初是用Scala编写的,可以编译成java字节码,但可以用Python代码通过py4j与JVM(java虚拟机)通信。因此,可以用Python编写Spark应用程序。

目标:启动Pyspark、初始化上下文和会话、创建RDD、应用转换和动作、缓存。

对于运行Spark,可以选择任何云笔记本,但强烈建议创建一个Databricks Spark集群;这是一个关于如何在Databricks上免费启动Spark集群的逐步指南。

PySpark是为Python制作的Spark API。在本文中,将使用PySpark初始化和操作Spark。

安装所需的包:

!pip install pyspark !pip install findspark

Findspark添加了一个启动文件到当前IPython配置文件,并使环境准备好运行Spark。

创建Spark上下文和Spark会话。Spark会话需要SparkSQL和数据框架。SparkContext是Spark应用程序的入口点,它包含创建RDD(弹性分布式数据库)的函数,如parallelize()。

import findspark findspark.init() sc = SparkContext() spark = SparkSession .builder .appName("Python Spark DataFrames basic example") .config("spark.some.config.option", "some-value") .getOrCreate()

初始化Spark会话后,需要验证刚刚创建的Spark会话实例。

Spark RDDs是Spark中的基本数据抽象,使用函数式编程概念来处理RDD。RDD支持各种类型的文件,包括文本、序列文件、Avro、Parquet、Hadoop输入等。Cassandra、HBase、HDFS、Amazon S3等。RDD是不可变的,以维护数据完整性。RDD以分布式方式工作;RDD跨集群节点分区。RDD支持延迟评估,意味着在调用动作之前不会应用任何转换。

例如,通过调用函数parallelize在Spark中创建一个RDD。sc.parallelize接受数据列表并将其转换为RDD。

data = range(1,30) print(data[0]) len(data) xrangeRDD = sc.parallelize(data, 4) xrangeRDD

RDD转换:正如讨论的,RDD是不可变的;当应用转换时,它会返回一个新的RDD。RDD是延迟评估的,意味着在生成新的RDD时不会进行任何计算。新的RDD将包含一系列转换规则。一旦调用动作,所有转换将被执行。

subRDD = xrangeRDD.map(lambda x: x-1) filteredRDD = subRDD.filter(lambda x : x<10) print(filteredRDD.collect()) filteredRDD.count() import time test = sc.parallelize(range(1,50000),4) test.cache() t1 = time.time() count1 = test.count() dt1 = time.time() - t1 print("dt1: ", dt1) t2 = time.time() count2 = test.count() dt2 = time.time() - t2 print("dt2: ", dt2)
沪ICP备2024098111号-1
上海秋旦网络科技中心:上海市奉贤区金大公路8218号1幢 联系电话:17898875485