Azure Stream Analytics与Azure Function集成指南

在本文中,将探讨如何将Azure Function设置为Azure Stream Analytics作业的输出作业拓扑。这听起来是否很有趣?在之前的文章中,已经介绍了什么是Azure Stream Analytics作业以及如何通过门户和Visual Studio创建它。如果还没有阅读那些文章,强烈建议阅读它们。现在,让开始本文吧。

正如之前提到的,在本文中,将:

  • 使用现有的Azure Stream Analytics作业
  • 创建一个新的Azure Function应用
  • 将新创建的Azure Function设置为流分析作业的输出作业拓扑
  • 监控从Stream Analytics作业流向Azure Function的数据

开始使用Azure Function

将开始创建一个Azure Function。首先,登录到Azure门户并点击“创建资源”图标,然后搜索“Function App”。

要创建Azure Function应用程序,需要登录到Azure门户,点击“创建资源”图标,然后搜索“Function App”。在接下来的屏幕上,提供以下信息:

  • 应用名称
  • 订阅
  • 资源组
  • 操作系统
  • 托管计划
  • 位置
  • 运行时堆栈
  • 存储
  • 应用程序洞察

在这里,消耗计划托管计划允许按执行次数付费,而应用服务计划允许拥有预定义的容量。对于运行时堆栈,将使用.NET,但可以自由选择想要的任何内容。

创建完成后,应该能够在“Function Apps”部分看到它。

现在让转到Visual Studio并为Azure Function创建一个新解决方案。

Azure FunctionApp项目类型

现在可以右键单击新创建的项目并添加一个新的HttpTrigger函数。暂时将访问权限设置为匿名。将函数命名为“GetData”。目前,只是从Stream Analytics作业获取数据并检查长度。

using Microsoft.Azure.WebJobs; using Microsoft.Azure.WebJobs.Extensions.Http; using Microsoft.Extensions.Logging; using Newtonsoft.Json; using System.Net; using System.Net.Http; using System.Threading.Tasks; namespace ml.IoTPlatform.AzureFunctions { public static class GetData { [FunctionName("GetData")] public static async Task Run( [HttpTrigger(AuthorizationLevel.Anonymous, "post")] HttpRequestMessage req, ILogger log) { log.LogInformation($"GetData function triggered with Uri {req.RequestUri}"); string content = await req.Content.ReadAsStringAsync(); log.LogInformation($"String content is {content}"); dynamic data = JsonConvert.DeserializeObject(content); log.LogInformation($"Data count is {data?.Count}"); if (data?.ToString()?.Length > 262144) { return new HttpResponseMessage(HttpStatusCode.RequestEntityTooLarge); } return req.CreateResponse(HttpStatusCode.OK, "Success"); } } }

如所见,目前没有做太多事情,只是接收数据作为HttpRequestMessage,然后读取内容为req.Content.ReadAsStringAsync(),然后反序列化对象。如果没有执行此步骤,可能会收到错误:“没有MediaTypeFormatter可用于从具有媒体类型‘application/octet-stream’的内容中读取类型为‘Object’的对象。”

还检查实体长度,如果它太大,发送一个带有状态码413的HttpResponseMessage。

要发布Azure Function应用程序,只需右键单击项目并点击发布,然后设置发布目标,选择现有的Azure Function应用,记得之前创建了一个吗?发布完成后,可以进入Function App并查看Function。还可以使用一些虚拟数据进行测试。

让回到AzureStream Analytics,因为已经成功配置了Azure Function App。

在之前的文章中,使用Visual Studio创建了一个Azure Stream Analytics作业解决方案,现在让打开该解决方案并为Azure Function配置新的输出。

还需要在Script.asaql文件中进行一些更改以支持新创建的输出。

WITH BasicOutput AS ( SELECT messageId, deviceId, temperature, humidity, pressure, pointInfo, IoTHub, MAX(EventEnqueuedUtcTime) AS EventEnqueuedUtcTime, EventProcessedUtcTime, PartitionId FROM Input TIMESTAMP BY EventEnqueuedUtcTime GROUP BY TUMBLINGWINDOW(second, 10), messageId, deviceId, temperature, humidity, pressure, pointInfo, IoTHub, EventEnqueuedUtcTime, EventProcessedUtcTime, PartitionId ) SELECT * INTO SQLServerOutput FROM BasicOutput SELECT * INTO AzureFunctionOutput FROM BasicOutput

完成后,只需点击“提交到Azure”按钮,如果对此部分有任何疑问,请阅读之前关于此主题的帖子。现在让再次登录到门户并查看所有输出、输入和查询是否已发布。

酷!干得好,看起来它已经发布了。现在如果点击AzureFunctionOutput,可能会收到一个警告:“请确保在启动ASA作业之前将Azure Functions的最小TLS版本设置为1.0”。宁愿将这视为一个错误而不是警告,因为如果不进行这些更改,Azure Stream Analytics作业将不会写入Azure Function。所以这非常重要,花了很长时间才找到问题的根源,可以在这里看到关于这个问题的回答。

所以只需转到Azure Function App并点击平台功能 -> SSL -> 最小TLS版本。

一旦完成了上述所有步骤,就可以开始Stream Analytics作业了,请确保MXChip连接到电源,以便设备可以开始发送数据。

现在让登录到SQL Server数据库并运行以下查询,以确保从设备获取数据。

SELECT TOP (1000) [Id], [messageId], [deviceId], [temperature], [humidity], [pressure], [pointInfo], [IoTHub], [EventEnqueuedUtcTime], [EventProcessedUtcTime], [PartitionId] FROM [dbo].[StreamData] order by [EventEnqueuedUtcTime] desc

要检查Azure Function输出,可以返回到Azure Function并点击函数并使用监视选项。

在本文中,学习了如何:

  • 使用Azure Stream Analytics作业
  • 创建Azure Function应用
  • 在Visual Studio中创建Azure Function App解决方案
  • 编写HttpTrigger函数并将其发布到Azure Function应用
  • 将Azure Function App设置为Azure Stream Analytics作业的输出作业拓扑
  • 在另一个解决方案中使用创建的包
沪ICP备2024098111号-1
上海秋旦网络科技中心:上海市奉贤区金大公路8218号1幢 联系电话:17898875485