Windows环境下的Structured Streaming设置指南

在初次尝试设置Structured Streaming环境时,经常面临配置所需环境的挑战。虽然有许多在线教程指导如何设置,但大多数教程都要求安装虚拟机和Ubuntu操作系统,然后通过修改bash文件来设置所有必需的文件。这种方法虽然可行,但并不适用于所有人。使用虚拟机时,如果机器内存较低,可能需要等待很长时间,并且由于内存延迟问题,过程可能会卡住。因此,为了更简便的操作方式,将展示如何在Windows操作系统上设置Structured Streaming。

使用的工具

在设置过程中,将使用以下工具:

环境变量

请注意,已将所有文件添加到C驱动器中。同时,文件的命名应与在线安装的文件保持一致。在安装这些文件的过程中,需要设置环境变量。安装过程中,请参考这些图片以获得无忧的体验。最后一张图片是系统变量中的Path。

所需文件

所需文件链接:

安装Kafka

第一步是在系统中安装Kafka。为此,需要访问这个链接:。首先需要安装Java 8并设置环境变量。可以从链接中获得所有指令。完成Java安装后,必须安装zookeeper。已经将zookeeper文件添加到Google Drive中。可以随时使用它,或者按照链接中给出的所有指令操作。如果正确安装了zookeeper并设置了环境变量,当以管理员身份在命令提示符中运行zkserver时,将看到以下输出。接下来,按照链接中的指令安装Kafka,并使用指定的命令运行它。

.binwindowskafka-server-start.bat .configserver.properties

一旦一切设置完成,尝试创建一个主题并检查它是否正常工作。如果它工作正常,那么就完成了Kafka的安装。

安装Spark

在这一步中,安装Spark。基本上可以按照这个链接在Windows机器上设置Spark:。在其中一个步骤中,它会要求设置winutils文件。为了方便起见,已经在共享的驱动器链接中添加了该文件。在名为Hadoop的文件夹中,只需将该文件夹放在C驱动器上并按照图片中所示设置环境变量。强烈建议使用添加到Google Drive中的Spark文件。一个主要原因是,为了流式传输数据,需要手动设置一个Structured Streaming环境。在情况下,设置并修改了所有必需的东西,并在测试了很多之后。如果想全新设置,随时可以这样做。如果设置不正确,在Pyspark中流式传输数据时可能会遇到这样的错误:

Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".

一旦有了Spark,现在可以从CSV文件中流式传输所需的数据,使用Kafka主题作为生产者,并通过Kafka主题在消费者中获取它。主要使用Jupyter Notebook,所以在这个教程中使用了一个笔记本。

Jupyter Notebook中安装库

在笔记本中,首先需要安装一些库:

  • pip install pyspark
  • pip install Kafka
  • pip install py4j

Structured Streaming与Pyspark的工作原理

有一个CSV文件,其中包含想要流式传输的数据。让继续使用经典的Iris数据集。现在,如果想要流式传输iris数据,需要使用Kafka作为生产者。创建一个Kafka主题,将iris数据流式传输到该主题,消费者可以从该主题检索数据框架。

生产者代码以流式传输iris数据

import pandas as pd from kafka import KafkaProducer from datetime import datetime import time import random import numpy as np # pip install kafka-python KAFKA_TOPIC_NAME_CONS = "Topic" KAFKA_BOOTSTRAP_SERVERS_CONS = 'localhost:9092' if __name__ == "__main__": print("Kafka Producer Application Started ... ") kafka_producer_obj = KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS_CONS, value_serializer=lambda x: x.encode('utf-8')) filepath = "IRIS.csv" flower_df = pd.read_csv(filepath) flower_df['order_id'] = np.arange(len(flower_df)) flower_list = flower_df.to_dict(orient="records") message_list = [] message = None for message in flower_list: message_fields_value_list = [] message_fields_value_list.append(message["order_id"]) message_fields_value_list.append(message["sepal_length"]) message_fields_value_list.append(message["sepal_width"]) message_fields_value_list.append(message["petal_length"]) message_fields_value_list.append(message["petal_width"]) message_fields_value_list.append(message["species"]) message = ','.join(str(v) for v in message_fields_value_list) print("Message Type: ", type(message)) print("Message: ", message) kafka_producer_obj.send(KAFKA_TOPIC_NAME_CONS, message) time.sleep(1) print("Kafka Producer Application Completed. ")

要启动生产者,需要在Windows命令提示符中以管理员身份运行zkserver,然后在Kafka目录的命令提示符中启动Kafka:.binwindowskafka-server-start.bat .configserver.properties。如果收到一个“没有代理”的错误,这意味着Kafka没有正确运行。

现在,让检查消费者是否运行正常。在新的笔记本中运行以下代码。

from pyspark.sql import SparkSession from pyspark.sql.functions import * from pyspark.ml.feature import Normalizer, StandardScaler import random import time kafka_topic_name = "Topic" kafka_bootstrap_servers = 'localhost:9092' spark = SparkSession \ .builder \ .appName("Structured Streaming ") \ .master("local[*]") \ .getOrCreate() spark.sparkContext.setLogLevel("ERROR") # 构建一个从主题读取的流式DataFrame flower_df = spark \ .readStream \ .format("kafka") \ .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \ .option("subscribe", kafka_topic_name) \ .option("startingOffsets", "latest") \ .load() flower_df1 = flower_df.selectExpr("CAST(value AS STRING)", "timestamp") flower_schema_string = "order_id INT,sepal_length DOUBLE,sepal_length DOUBLE,sepal_length DOUBLE,sepal_length DOUBLE,species STRING" flower_df2 = flower_df1 \ .select(from_csv(col("value"), flower_schema_string) \ .alias("flower"), "timestamp") flower_df3 = flower_df2.select("flower.*", "timestamp") flower_df3.createOrReplaceTempView("flower_find") song_find_text = spark.sql("SELECT * FROM flower_find") flower_agg_write_stream = song_find_text \ .writeStream \ .trigger(processingTime='5 seconds') \ .outputMode("append") \ .option("truncate", "false") \ .format("memory") \ .queryName("testedTable") \ .start() flower_agg_write_stream.awaitTermination(1)

运行后,应该获得类似的输出。正如看到的,运行了一些查询并检查数据是否正在流式传输。第一次计数是5,几秒钟后计数增加到14,这证实了数据正在流式传输。

沪ICP备2024098111号-1
上海秋旦网络科技中心:上海市奉贤区金大公路8218号1幢 联系电话:17898875485