在本教程中,将学习如何使用Python和AWS Kinesis Firehose流传输数据。将通过编写一个简单的Python客户端来实现这一目标,该客户端将使用put_record
和put_record_batch
函数分别发送单个记录和批量记录。
首先,需要创建一些样本数据。访问并创建一个免费账户。然后,点击“Schemas”创建一个新的模式。将这个模式命名为SampleTempDataForTutorial
。
在Mockaroo中创建模式:
创建一个名为station
的字段,并将其类型设置为State
(缩写)。创建一个名为temp
的字段,并将其设置为Number
类型,最小值为1,最大值为100,并保留两位小数。
在Mockaroo中创建SampleTempDataForTutorial
数据:
点击fx
按钮并创建以下公式:
Python
if random(0, 10) == 10:
this = this + 1000
end
if this > random(0, 100):
format(this, 2) + 'F'
elif this < random(0, 100):
format(this, 2) + 'f'
elif this > random(0, 75):
format(this, 2) + 'c'
else:
format(this, 2) + 'C'
该公式随机生成温度值,并随机分配F、f、C或c后缀。注意,它还生成了一些超过1000度的无效温度。将在后续教程中使用这些异常数据来说明Kinesis Analytics。
点击“Apply”返回主屏幕。输入1000行,选择“Json”格式,并勾选数组复选框。点击“download”下载数据。
这里假设使用的是PyCharm,可以使用任何喜欢的IDE或Python交互式解释器。首先,使用put_record
命令将记录单独写入Firehose,然后使用put_record_batch
命令批量写入记录。
启动PyCharm。假设已经安装了AWS Toolkit并配置了凭据。注意,这里使用的是默认开发凭据。
在生产软件中,应该使用适当的角色和凭据提供者,不要依赖于内置的AWS账户。
遵循最佳安全实践配置AWS资源:
创建一个新的纯Python应用程序,命名为StreamingDataClient
。
在PyCharm中创建一个新的纯Python项目。
创建一个名为FireHoseClient.py
的新文件,并导入Boto3和json。
使用为开发分配的AWS配置文件创建一个新的会话。
从会话中创建一个新的Firehose客户端。
使用默认AWS凭据创建会话:
import json
import boto3
session = boto3.Session(profile_name='default')
temperatureClient = session.client('firehose')
编写以下代码:
with open('sampleTempDataForTutorial.json') as json_file:
observations = json.load(json_file)
for observation in observations:
print(observation)
response = temperatureClient.put_record(
DeliveryStreamName='temperatureStream',
Record={'Data': json.dumps(observation)}
)
print(response)
在前面的代码中,将文件作为json打开并加载到observations
变量中。然后,遍历每个observation
并将记录发送到Firehose,使用put_record
方法。注意,当将数据添加到Record时,输出json中的记录。
应该在Python控制台中看到记录和响应。
导航到AWS控制台,然后转到S3存储桶。
应该看到写入到存储桶的记录。
打开文件以确保记录已转换为开尔文。
如果客户端连续生成数据,单独写入记录就足够了。但是,也可以使用put_record_batch
方法批量写入数据,一次性写入Firehose。
用以下代码替换代码:
import json
import boto3
session = boto3.Session(profile_name='default')
temperatureClient = session.client('firehose')
records = []
with open('sampleTempDataForTutorial.json') as json_file:
observations = json.load(json_file)
count = 1
for observation in observations:
if count % 500 == 0:
response = temperatureClient.put_record_batch(
DeliveryStreamName='temperatureStream',
Records=records
)
print(response)
print(len(records))
records.clear()
record = {'Data': json.dumps(observation)}
records.append(record)
count += 1
if len(records) > 0:
print(len(records))
response = temperatureClient.put_record_batch(
DeliveryStreamName='temperatureStream',
Records=records
)
print(response)
在前面的代码中,创建了一个名为records
的列表。还定义了一个名为count
的计数器,并将其初始化为1。代码遍历观察结果。每个观察结果被写入一个记录,计数器递增。当计数器是500的倍数时,记录随后被写入Firehose。注意,Firehose允许的最大批处理大小为500条记录。在遍历所有观察结果后,任何剩余的记录都被写入Firehose。
使用put_record_batch
方法将数据写入Firehose。与写入一条记录不同,将记录列表写入Firehose。
在执行代码之前,向Json数据文件中添加三条更多记录。
运行代码,应该在Python控制台中看到类似于以下输出。
导航到AWS控制台中的S3存储桶,应该看到写入到存储桶的数据集。