使用Azure流分析和SQL数据库处理事件中心数据

在本教程中,将学习如何使用Azure流分析服务处理来自事件中心的数据,并将这些数据写入Azure SQL数据库。首先,需要创建一个Azure SQL服务器数据库来接收数据。请按照以下步骤操作:

创建Azure SQL服务器数据库

首先,登录到Azure门户并搜索“SQL数据库”。然后点击“创建”按钮。接下来,选择一个订阅和资源组,例如使用之前文章中提到的“data-streaming-example”资源组。

接下来,输入数据库名称,例如“weather-db”,因为这是本教程将要处理的数据。然后,选择一个服务器。如果没有服务器,请单击“新建”。一个新的创建窗口将打开。使用一个在Azure中唯一的服务器名称,例如“example-weather-server”。

最好选择与之前文章相同的位置,例如西欧。同时,创建一个管理员用户名和密码。通常,这些信息应该是安全的,但在这个例子中,可以选择一些简单的东西,比如名字和“Welcome123!@#”。

接受服务器设置后,屏幕将返回到数据库创建窗口以配置计算和存储层。这些设置指示用户可以存储多少数据以及数据库可以使用多少核心或数据事务单位(DTUs)。核心或DTUs越多,数据库速度越快,但成本越高。无服务器数据库也是一个选项,在这种情况下,用户按使用量付费并获得动态扩展,但价格动态变化。最低层级——基础DTU层级——有5个DTUs,可以存储最多2GB的数据。这一层级对于大多数测试来说已经足够,并且每月成本大约为5美元,因此请选择这一层级。

回到数据库创建窗口,选择存储冗余。选择“本地冗余”,因为它是最不昂贵的。如果服务器发生故障,数据仍然可以在同一个数据中心的另一台服务器上可用。但是,如果整个数据中心出现问题,数据将不可用。

接下来是网络、安全性、附加设置,如初始数据源和校对,最后是标签。跳过所有这些,转到“审查+创建”选项卡,然后点击“创建”按钮。审查应该看起来像这样:

或者,使用Azure CLI,首先创建服务器,然后创建数据库,使用以下命令:

az sql server create –resource-group "data-streaming-example" –name "example-weather-server" –admin-user sander –admin-password "Welcome123!@#" –location "West Europe" az sql db create –resource-group "data-streaming-example" –server "example-weather-server" –name "weather-db" –tier Basic –capacity 5 –max-size "2GB"

不要忘记在管理员密码中转义感叹号。

注意:创建可能需要大约15分钟。

探索Azure流分析

Azure流分析是一个无服务器服务,用于在云中分析实时数据。它使用SQL语法,并可以与自定义的C#和JavaScript扩展。

由于其原生集成,将数据从事件中心移动到流分析变得非常容易。实际上,在之前文章中的天气事件中心,有一个“从事件中启用实时洞察”按钮在“处理数据”选项中,直接从菜单中。从那里,可以尝试一些查询并立即创建一个流分析作业。但是,本教程将通过不同的方式创建一个流分析作业。

创建流分析作业

要在Azure门户中创建一个流分析作业,请搜索该术语,然后点击“创建+”。这个操作会打开一个创建窗口,这个窗口非常明显的小。输入作业名称、订阅、资源组和位置。使用像“evhub-stream-analytics”这样的名称,并使用与事件中心相同的位置,例如西欧。

需要解释两个属性:托管环境和流单元。托管环境指定在云中(通常选项)还是在边缘设备上运行流分析。边缘设备通常是本地的物联网(IoT)设备,需要一个网关连接到Azure。本教程使用云。

流单元的数量指示作业可以同时计算多少数据。更多的流单元意味着更多的内存和计算能力。由于流分析是无服务器的,用户按使用量付费,并且随着使用的流单元数量增加,计费也会增加。默认的三个对于本文的用例来说是可以的。

另一个选项是在用户选择的存储帐户中保存私有数据,如连接字符串。本教程不需要这个,所以不要打开这个选项。

创建窗口现在应该看起来像这个屏幕截图:

或者,使用Azure CLI,使用此命令:

az stream-analytics job create –job-name "evhub-stream-analytics" –resource-group "data-streaming-example" –location "West Europe" –compatibility-level "1.2"

该命令需要在CLI中安装“streaming-analytics”扩展。在Azure门户中运行此命令会提示安装它,所以只需确认即可。

要在自动化环境中运行此命令,请使用:

az extension add –name stream-analytics

添加事件中心输入

流分析作业最初处于停止模式。因此,在启动它之前,首先向作业添加输入和输出。

只需导航到“输入”从刀片菜单添加一个输入并点击“+添加流输入”。此时,用户可以添加三个输入,Blob存储/ADLS Gen2(Azure数据湖存储Gen2)、事件中心或IoT中心。选择事件中心,并打开一个新的刀片以选择事件中心。本教程只有一个事件中心,默认设置。如果有更多,请选择正确的一个。

然后,提供一个输入别名,例如“evhub-weather-input”。值得检查输入格式,它有CSV、Avro、其他(XML)和JSON。默认是JSON,之前文章的代码正在将JSON发送到事件中心,所以应该是可以的。请注意,原始CSV也是可用的,并且在流分析中仍然有现成的支持。

添加SQL数据库输出

接下来,将SQL数据库作为输出添加。只需转到“输出”,点击“添加”按钮,并选择“SQL数据库”。将输出命名为“sql-weather-output”。然后,从下拉菜单中选择资源组和数据库,并输入一个表名,例如“WeatherData”。

给流分析作业访问SQL服务器需要额外的工作。要正确执行此操作,请参见Microsoft的文档。本教程采取了一点捷径。

首先,在分析作业中找到“Managed Identity”并确保它处于打开状态。然后,转到SQL服务器和菜单中的“Azure Active Directory”选项。点击“设置管理员”,找到分析作业的名称,选择它并保存。然后,转到“防火墙和虚拟网络”并选择“是”以“允许Azure服务和资源访问此服务器”。这避免了将分析作业的IP地址列入安全白名单。最后,点击“+添加客户端IP”,然后保存。

还将“WeatherData”表添加到Azure SQL实例中。使用Azure门户或SQL Server Management Studio连接,如果服务器防火墙包括客户端IP。使用Azure SQL创建窗口中的凭据登录。

表应该如下所示:

CREATE TABLE [dbo].[WeatherData]( [id] [int] IDENTITY(1,1) NOT NULL, [date] nvarchar(256) NULL, [city] nvarchar(256) NULL, [description] nvarchar(256) NULL, [EventProcessedUtcTime] datetime2 NULL, [PartitionId] [int] NULL, [EventEnqueuedUtcTime] datetime2 NULL )

通过点击“测试”按钮来测试输入和输出是否配置正确。如果一开始失败,请等待几分钟然后重试。

查询流分析

流分析使用类似SQL的查询从源查询数据。在菜单的“查询”选项中设置这个。本教程使用一个简单的查询(如下)。流分析添加了“EventProcessedUtcTime”、“PartitionId”和“EventEnqueuedUtcTime”。

SELECT [date], city, [description], EventProcessedUtcTime, PartitionId, EventEnqueuedUtcTime INTO [sql-weather-output] FROM [evhub-weather-input]

可选地,用户可以使用聚合。流分析有一组特定的“窗口函数”,指定了分组的时间段。

查看分析操作

现在,返回到概述窗口并启动流分析作业。要么立即开始,要么安排一个开始时间。

启动和停止作业可能需要一两分钟。一旦它运行,启动第一篇文章中的Python应用程序,数据应该出现在Azure SQL表中。

沪ICP备2024098111号-1
上海秋旦网络科技中心:上海市奉贤区金大公路8218号1幢 联系电话:17898875485