Apache Kafka生产者和消费者应用程序

本文将介绍如何使用C#和Scala编写Apache Kafka生产者和消费者应用程序。这些应用程序可以操作相同的数据,并确保不同语言编写的应用程序之间的互操作性。将使用Avro schema进行数据的序列化和反序列化,这是推荐的做法,以确保不同Kafka生产者/消费者之间的数据一致性。

先决条件

要运行代码,需要安装Zookeeper和Kafka。可以在找到在本地Windows上安装的适当指导。Zookeeper和Kafka被安装为Windows服务,但也可以作为控制台应用程序运行。在代码示例中,将它们作为控制台应用程序运行,使用适当的命令文件:

  1. Start_Zookeeper.cmd
  2. Start_Kafka.cmd
文件名中的前导数字表示启动顺序 - 应首先启动Zookeeper。将Zookeeper版本3.5.5安装到目录C:\zookeeper-3.5.5,将Kafka版本2.12-2.3.0安装到目录C:\kafka_2.12-2.3.0。这些目录被启动cmd文件使用。因此,如果有其他目录,则应在cmd文件中进行相应的更改。在安装中,包含Kafka日志和Zookeeper数据的文件夹是C:\kafka_2.12-2.3.0kafka-logs和C:\kafka_2.12-2.3.0zookeeper-data。这些目录可以在下次运行Zookeeper-Kafka之前删除,以删除先前的数据。

注意:计算机上的Java更新可能会更改Zookeeper和Kafka使用的JAVA_HOME环境变量,它们可能停止工作。

设计

专门设计的类用于生产和消费消息,在案例中,这些消息构成了字符串键值对。值是按照Avro schema在JSON格式中构建的对象。这是标准且推荐的方法,确保了不同Kafka生产者/消费者之间的互操作性。例如,在案例中,使用C#和Scala应用程序生产和消费相同类型的对象。通常,Avro schema可以从获得(有关它的详细信息,可以在Sacha Barber的优秀文章中找到)。这个服务器应该对所有Kafka提供者和消费者可用。Kafka Schema Registry通过REST API为其客户端提供服务。特别是,特定格式的HTTP POST请求会导致将Avro schema传输给客户端。Confluent.Kafka库的适当类在后台与Kafka Schema Registry通信,无需任何额外的代码。应安装并维护Kafka Schema Registry。

在不同的Kafka客户端之间使用统一的Avro对象schema始终是有益的。但在一些相对简单的情况下,标准的Kafka Schema Registry是一个过度的解决方案,schema可以从更简单的服务器获得,或者简单地从文件中获得。本文的代码提供了使用简单的HTTP GET请求或从本地JSON文件读取Avro schema。

提供了Confluent.Kafka生产者和消费者的包装器。消费者包装器允许Kafka客户端订阅消息并使用给定的回调处理它们。生产者包装器提供了将消息发送到Kafka的方法。这两个包装器的构造函数以定制的方式读取Avro schema(从某个Web服务器或文件)。

在C#和Scala环境中,应用程序的组织方式类似。首先,提供Avro schema访问和Kafka配置的数据(为了简单起见,硬编码)。然后创建消费者并调用其异步方法以开始消息消费。这个方法采用消息处理回调作为其参数。在此准备之后,创建生产者并定期调用其发送消息的方法。如上所述,消息在Kafka中以键值对的形式放置。键是字符串类型,值是标准Avro类型GenericRecord。

访问Schema

对于C#和Scala解决方案,Avro schema文件schema.json位于解决方案根目录下的wwwroot目录中。它可以直接通过文件系统访问,也可以通过简单的Web服务器访问。为了测试后一种方式,在Windows系统上使用了IISExpress。应该将访问schema文件的网站添加到IISExpress配置文件C:\Users\Igorl\Documents\IISExpress\config\applicationhost.config中。适当的片段放置在文件SiteForIISExpressConfig.txt中。应该将此文件的内容复制到IISExpress配置文件的章节中。然后应该启动IISExpress以供此网站使用。这可以通过在C#和Scala解决方案的主目录中运行startIIS.cmd文件来完成。

注意:C#Scala解决方案中切换到使用IISExpress进行schema访问,请将硬编码布尔常量isFromLocalFile的值更改为false(代码中适当行已用注释//1标记)。

C#项目

KafkaHelperLib项目为Confluent.Kafka生产者和消费者提供包装器。这个项目是使用.NET Standard开发的,因此可以从.NET Core和Framework项目中引用。该项目包含类KafkaConsumer和KafkaProducer。它们序列化/反序列化众所周知的GenericRecord Avro对象作为Kafka数据对的值。类RecordConfig将其构造函数参数schemaRegistryUrl作为schema访问字符串。其public方法GetSchemaRegistryClient()返回ISchemaRegistryClient接口的对象。通常,类Confluent.SchemaRegistry.CachedSchemaRegistryClient : ISchemaRegistryClient, ...用作ISchemaRegistryClient接口的实现。在简化案例中,创建了"stub"类SchemaRegistryClient作为ISchemaRegistryClient接口的实现。KafkaConsumer和KafkaProducer类的构造函数分别使用类RecordConfig创建Avro序列化和反序列化对象。

项目HttpClient也构建为.NET Standard,可供.NET Core和Framework使用。它来自一些旧解决方案,只是为了实现HTTP GET调用以从自定义Web服务器接收Avro schema。

测试应用程序

KafkaApp是一个.NET Core应用程序。它将Kafka生产者和消费者组合在一起。该应用程序为消费的对象提供了一个回调。这个回调简单地将对象的内容打印到控制台。生产者将根据Avro schema创建的对象发送到Kafka。

Scala解决方案

Scala解决方案通常与C#解决方案类似,但在这里,GenericRecord对象被序列化为/反序列化为Array[Byte],以便使用标准的ByteArraySerializer和ByteArrayDeserializer类。Scala应用程序也将消费的Kafka对打印到其控制台。为了区分由C#和Scala生产的对象,后者创建时带有负的Id字段。

沪ICP备2024098111号-1
上海秋旦网络科技中心:上海市奉贤区金大公路8218号1幢 联系电话:17898875485