使用AWS Kinesis Firehose流传输数据

在本教程中,将学习如何使用Python和AWS Kinesis Firehose流传输数据。将通过编写一个简单的Python客户端来实现这一目标,该客户端将使用put_recordput_record_batch函数分别发送单个记录和批量记录。

创建样本数据

首先,需要创建一些样本数据。访问并创建一个免费账户。然后,点击“Schemas”创建一个新的模式。将这个模式命名为SampleTempDataForTutorial

在Mockaroo中创建模式:

创建一个名为station的字段,并将其类型设置为State(缩写)。创建一个名为temp的字段,并将其设置为Number类型,最小值为1,最大值为100,并保留两位小数。

在Mockaroo中创建SampleTempDataForTutorial数据:

点击fx按钮并创建以下公式:

Python if random(0, 10) == 10: this = this + 1000 end if this > random(0, 100): format(this, 2) + 'F' elif this < random(0, 100): format(this, 2) + 'f' elif this > random(0, 75): format(this, 2) + 'c' else: format(this, 2) + 'C'

该公式随机生成温度值,并随机分配F、f、C或c后缀。注意,它还生成了一些超过1000度的无效温度。将在后续教程中使用这些异常数据来说明Kinesis Analytics。

点击“Apply”返回主屏幕。输入1000行,选择“Json”格式,并勾选数组复选框。点击“download”下载数据。

Python客户端(PyCharm)

这里假设使用的是PyCharm,可以使用任何喜欢的IDE或Python交互式解释器。首先,使用put_record命令将记录单独写入Firehose,然后使用put_record_batch命令批量写入记录。

启动PyCharm。假设已经安装了AWS Toolkit并配置了凭据。注意,这里使用的是默认开发凭据。

在生产软件中,应该使用适当的角色和凭据提供者,不要依赖于内置的AWS账户。

遵循最佳安全实践配置AWS资源:

  • AWS资源配置最佳实践
  • 凭据最佳实践

创建一个新的纯Python应用程序,命名为StreamingDataClient

在PyCharm中创建一个新的纯Python项目。

创建一个名为FireHoseClient.py的新文件,并导入Boto3和json。

使用为开发分配的AWS配置文件创建一个新的会话。

从会话中创建一个新的Firehose客户端。

使用默认AWS凭据创建会话:

import json import boto3 session = boto3.Session(profile_name='default') temperatureClient = session.client('firehose')

编写以下代码:

with open('sampleTempDataForTutorial.json') as json_file: observations = json.load(json_file) for observation in observations: print(observation) response = temperatureClient.put_record( DeliveryStreamName='temperatureStream', Record={'Data': json.dumps(observation)} ) print(response)

在前面的代码中,将文件作为json打开并加载到observations变量中。然后,遍历每个observation并将记录发送到Firehose,使用put_record方法。注意,当将数据添加到Record时,输出json中的记录。

应该在Python控制台中看到记录和响应。

导航到AWS控制台,然后转到S3存储桶。

应该看到写入到存储桶的记录。

打开文件以确保记录已转换为开尔文。

如果客户端连续生成数据,单独写入记录就足够了。但是,也可以使用put_record_batch方法批量写入数据,一次性写入Firehose。

用以下代码替换代码:

import json import boto3 session = boto3.Session(profile_name='default') temperatureClient = session.client('firehose') records = [] with open('sampleTempDataForTutorial.json') as json_file: observations = json.load(json_file) count = 1 for observation in observations: if count % 500 == 0: response = temperatureClient.put_record_batch( DeliveryStreamName='temperatureStream', Records=records ) print(response) print(len(records)) records.clear() record = {'Data': json.dumps(observation)} records.append(record) count += 1 if len(records) > 0: print(len(records)) response = temperatureClient.put_record_batch( DeliveryStreamName='temperatureStream', Records=records ) print(response)

在前面的代码中,创建了一个名为records的列表。还定义了一个名为count的计数器,并将其初始化为1。代码遍历观察结果。每个观察结果被写入一个记录,计数器递增。当计数器是500的倍数时,记录随后被写入Firehose。注意,Firehose允许的最大批处理大小为500条记录。在遍历所有观察结果后,任何剩余的记录都被写入Firehose。

使用put_record_batch方法将数据写入Firehose。与写入一条记录不同,将记录列表写入Firehose。

在执行代码之前,向Json数据文件中添加三条更多记录。

运行代码,应该在Python控制台中看到类似于以下输出。

导航到AWS控制台中的S3存储桶,应该看到写入到存储桶的数据集。

沪ICP备2024098111号-1
上海秋旦网络科技中心:上海市奉贤区金大公路8218号1幢 联系电话:17898875485