在初次尝试设置Structured Streaming环境时,经常面临配置所需环境的挑战。虽然有许多在线教程指导如何设置,但大多数教程都要求安装虚拟机和Ubuntu操作系统,然后通过修改bash文件来设置所有必需的文件。这种方法虽然可行,但并不适用于所有人。使用虚拟机时,如果机器内存较低,可能需要等待很长时间,并且由于内存延迟问题,过程可能会卡住。因此,为了更简便的操作方式,将展示如何在Windows操作系统上设置Structured Streaming。
在设置过程中,将使用以下工具:
请注意,已将所有文件添加到C驱动器中。同时,文件的命名应与在线安装的文件保持一致。在安装这些文件的过程中,需要设置环境变量。安装过程中,请参考这些图片以获得无忧的体验。最后一张图片是系统变量中的Path。
所需文件链接:
第一步是在系统中安装Kafka。为此,需要访问这个链接:。首先需要安装Java 8并设置环境变量。可以从链接中获得所有指令。完成Java安装后,必须安装zookeeper。已经将zookeeper文件添加到Google Drive中。可以随时使用它,或者按照链接中给出的所有指令操作。如果正确安装了zookeeper并设置了环境变量,当以管理员身份在命令提示符中运行zkserver时,将看到以下输出。接下来,按照链接中的指令安装Kafka,并使用指定的命令运行它。
.binwindowskafka-server-start.bat .configserver.properties
一旦一切设置完成,尝试创建一个主题并检查它是否正常工作。如果它工作正常,那么就完成了Kafka的安装。
在这一步中,安装Spark。基本上可以按照这个链接在Windows机器上设置Spark:。在其中一个步骤中,它会要求设置winutils文件。为了方便起见,已经在共享的驱动器链接中添加了该文件。在名为Hadoop的文件夹中,只需将该文件夹放在C驱动器上并按照图片中所示设置环境变量。强烈建议使用添加到Google Drive中的Spark文件。一个主要原因是,为了流式传输数据,需要手动设置一个Structured Streaming环境。在情况下,设置并修改了所有必需的东西,并在测试了很多之后。如果想全新设置,随时可以这样做。如果设置不正确,在Pyspark中流式传输数据时可能会遇到这样的错误:
Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".
一旦有了Spark,现在可以从CSV文件中流式传输所需的数据,使用Kafka主题作为生产者,并通过Kafka主题在消费者中获取它。主要使用Jupyter Notebook,所以在这个教程中使用了一个笔记本。
在笔记本中,首先需要安装一些库:
pip install pyspark
pip install Kafka
pip install py4j
有一个CSV文件,其中包含想要流式传输的数据。让继续使用经典的Iris数据集。现在,如果想要流式传输iris数据,需要使用Kafka作为生产者。创建一个Kafka主题,将iris数据流式传输到该主题,消费者可以从该主题检索数据框架。
import pandas as pd
from kafka import KafkaProducer
from datetime import datetime
import time
import random
import numpy as np
# pip install kafka-python
KAFKA_TOPIC_NAME_CONS = "Topic"
KAFKA_BOOTSTRAP_SERVERS_CONS = 'localhost:9092'
if __name__ == "__main__":
print("Kafka Producer Application Started ... ")
kafka_producer_obj = KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS_CONS,
value_serializer=lambda x: x.encode('utf-8'))
filepath = "IRIS.csv"
flower_df = pd.read_csv(filepath)
flower_df['order_id'] = np.arange(len(flower_df))
flower_list = flower_df.to_dict(orient="records")
message_list = []
message = None
for message in flower_list:
message_fields_value_list = []
message_fields_value_list.append(message["order_id"])
message_fields_value_list.append(message["sepal_length"])
message_fields_value_list.append(message["sepal_width"])
message_fields_value_list.append(message["petal_length"])
message_fields_value_list.append(message["petal_width"])
message_fields_value_list.append(message["species"])
message = ','.join(str(v) for v in message_fields_value_list)
print("Message Type: ", type(message))
print("Message: ", message)
kafka_producer_obj.send(KAFKA_TOPIC_NAME_CONS, message)
time.sleep(1)
print("Kafka Producer Application Completed. ")
要启动生产者,需要在Windows命令提示符中以管理员身份运行zkserver,然后在Kafka目录的命令提示符中启动Kafka:.binwindowskafka-server-start.bat .configserver.properties
。如果收到一个“没有代理”的错误,这意味着Kafka没有正确运行。
现在,让检查消费者是否运行正常。在新的笔记本中运行以下代码。
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import Normalizer, StandardScaler
import random
import time
kafka_topic_name = "Topic"
kafka_bootstrap_servers = 'localhost:9092'
spark = SparkSession \
.builder \
.appName("Structured Streaming ") \
.master("local[*]") \
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
# 构建一个从主题读取的流式DataFrame
flower_df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
.option("subscribe", kafka_topic_name) \
.option("startingOffsets", "latest") \
.load()
flower_df1 = flower_df.selectExpr("CAST(value AS STRING)", "timestamp")
flower_schema_string = "order_id INT,sepal_length DOUBLE,sepal_length DOUBLE,sepal_length DOUBLE,sepal_length DOUBLE,species STRING"
flower_df2 = flower_df1 \
.select(from_csv(col("value"), flower_schema_string) \
.alias("flower"), "timestamp")
flower_df3 = flower_df2.select("flower.*", "timestamp")
flower_df3.createOrReplaceTempView("flower_find")
song_find_text = spark.sql("SELECT * FROM flower_find")
flower_agg_write_stream = song_find_text \
.writeStream \
.trigger(processingTime='5 seconds') \
.outputMode("append") \
.option("truncate", "false") \
.format("memory") \
.queryName("testedTable") \
.start()
flower_agg_write_stream.awaitTermination(1)
运行后,应该获得类似的输出。正如看到的,运行了一些查询并检查数据是否正在流式传输。第一次计数是5,几秒钟后计数增加到14,这证实了数据正在流式传输。