Apache Spark数据流处理与MongoDB存储

Apache Spark出现之前,Hadoop被用于批处理,但由于延迟较高,无法用于实时数据处理。处理大量数据需要近乎实时的处理能力。Apache Spark提供了一个灵活的流处理API,并支持多种数据源。

Apache Spark的流处理功能通过Discretized Streams(离散化流)这一新架构实现。Spark DStream(离散化流)是Spark Streaming的基本抽象,它代表了一个连续的数据流。Spark Streaming将数据离散化为微批处理,这些微批处理在内部是一系列RDDs。接收器并行接收数据,并将其缓冲到Spark工作节点的内存中。

Spark Streaming的特点包括动态负载均衡、容错、支持高级分析和MLlib、高吞吐量等。本文的目标是流式传输CSV文件到Spark SQL,创建临时表视图,并在Spark SQL上运行查询,使用write stream将数据写入控制台。

目标是将CSV文件作为Spark SQL进行流式传输。这与将CSV文件加载为Spark SQL完全不同。在流式传输过程中,如果在源CSV文件中添加了一些行,它将立即反映在Spark SQL中。即使在流式传输文件夹中添加了具有相同模式的新CSV文件,它也会自动加载到Spark SQL中。

源数据包含4列:站点代码、站点名称、日期和每小时降水量。必须将data.csv文件保存在源文件夹中,以便从中获取流式传输的文件,也可以选择流式传输单个文件。如果CSV文件中有任何更改,它将立即反映在Spark流中。

首先,需要创建一个Spark会话才能使用Spark。Spark会话定义了Spark节点的运行位置、使用的核心数等。

from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master("local[*]") \ .appName("myApp") \ .getOrCreate()

.master("local[*]")表示在本地运行Spark,*表示使用所有可用的核心。getOrCreate()会在appName指定的会话不存在时创建一个会话。

接下来,需要在Spark中创建一个模式(要流式传输的数据的结构),这可以使用Spark SQL API完成。

from pyspark.sql.types import FloatType, StructField, StructType, StringType schemaRain = StructType([ StructField("STATION", StringType(), True), StructField("STATION_NAME", StringType(), True), StructField("DATE", StringType(), True), StructField("HPCP", FloatType(), True) ])

参数True表示该值可以为空,使用相应的列数据类型。

在创建会话和模式之后,可以开始读取流。可以流式传输文件夹中的所有文件或单个文件。将CSV文件保存在名为files的文件夹中。

df = spark.readStream.schema(schemaRain).option("maxFilesPerTrigger", 1).csv("./files", header=True)

maxFilesPerTrigger参数表示每个微批处理中要考虑的新文件数量,默认值为1000。readStream用于读取流数据。

为了验证数据是否正在流式传输,可以打印df.isStreaming。

print(df.isStreaming)

writeStream用于写入流。在这种情况下,将流写入控制台,使用追加方法作为输出模式。

df.writeStream.format("console").outputMode("append").start().awaitTermination()

outputMode('append')表示新数据将追加到输出中。format("console")表示以控制台格式写入数据。awaitTermination()表示等待用户发出终止信号。

在Spark DataFrame中,将执行聚合操作。目标是计算不同站点的数量。

dfc = df.groupBy("STATION_NAME").count()

将dfc数据框架打印到控制台,因为它是一个流式数据框架。

dfc.writeStream.outputMode("complete").format("console").start().awaitTermination()

SQL查询需要聚合,使用所有工作节点的缓冲区来从其他节点访问数据。为了执行SQL查询,首先需要创建一个临时视图,它将充当表名。

df.createOrReplaceTempView("tempdf")

编写查询,选择所有站点名称和小时降水量,其中HPCP = 999.99。

dfclean = spark.sql("Select STATION_NAME, HPCP FROM tempdf where HPCP == '999.99'")

dfclean.writeStream.outputMode("append").format("console").start().awaitTermination()。如果在源文件夹中添加新文件或更新源CSV文件,结果将立即更改。

将流数据写入MongoDB。可以轻松地将在控制台中写入的流数据写入MongoDB。首先,需要在创建Spark会话时建立Spark和MongoDB之间的连接。

spark = SparkSession \ .builder \ .master("local[1]") \ .appName("myApp") \ .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/prcp.hpcp") \ .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/prcp.hpcp") \ .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \ .getOrCreate() def write_row(batch_df, batch_id): batch_df.write.format("mongo").mode("append").save() pass df.writeStream.foreachBatch(write_row).start().awaitTermination()
沪ICP备2024098111号-1
上海秋旦网络科技中心:上海市奉贤区金大公路8218号1幢 联系电话:17898875485