在现代软件开发中,跨进程通信是一个常见的需求。随着.NET 5的推出,开发者有了更多选择来构建高性能的通信解决方案。本文将介绍一个基于.NET 5和SignalR库的跨进程通信解决方案,它支持RPC调用、异步响应和流媒体传输。
该解决方案提供了以下功能:
解决方案基于SignalR库,主要使用WebSocket作为传输类型,并以长轮询作为备选。支持以下特性:
SignalRBaseHubServerLib提供了服务器端的基础设施,而SignalRBaseHubClientLib包含基础类HubClient用于客户端。DtoLib定义了服务器和客户端之间共享的请求和响应数据传输对象(DTO)。AsyncAutoResetEventLib提供了用于流媒体的异步版本的自动重置事件。SignalRBaseHubServerLib提供了流媒体接口(IStreamingDataProvider
服务器端创建一个从RpcAndStreamingHub
using System.Linq;
using System.Threading.Tasks;
using System.Text;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Logging;
using SignalRBaseHubServerLib;
using MessageProviderLib;
using RemoteInterfaces;
using RemoteImplementations;
namespace SignalRSvc.Hubs
{
public class AHub : RpcAndStreamingHub
{
static AHub()
{
RegisterPerCall();
RegisterPerSession();
RegisterSingleton(new RemoteCall3(5));
}
public AHub(ILoggerFactory loggerFactory)
: base(loggerFactory, MessageEventProvider.Instance, (logger, isDirectCall, requestId, clientId, interfaceName, methodName, methodArgs) =>
{
// ...
}, (logger, isDirectCall, requestId, clientId, interfaceName, methodName, methodArgs, result, duration, exception) =>
{
// ...
})
{
}
public async Task ProcessMessage(Message[] args)
{
StringBuilder sbClients = new();
StringBuilder sbData = new();
if (args != null && args.Length > 0)
{
sbClients.Append("Clients: ");
foreach (var clientId in args.Select(dto => dto.ClientId).Distinct())
sbClients.Append($"\n{clientId} ");
sbData.Append("--> Data: ");
foreach (var dto in args)
sbData.Append($"\n{dto.Data} ");
}
else
{
sbClients.Append("No clients");
sbData.Append("No data available");
}
// Send message to all clients
await Clients.All.SendAsync("ReceiveMessage", sbClients.ToString(), sbData.ToString());
return args;
}
}
}
RPC接口在客户端和服务器端注册。服务器提供实现该接口的类。客户端调用RpcAsync()方法,传入接口和方法名称作为参数,以及被调用方法本身的参数。然后服务器端执行该方法,客户端获取结果。如果需要,可以实现将结果广播给其他客户端。服务器支持单例、每次调用和每次会话实例模型,类似于WCF中的模型。
在客户端,创建HubClient类的实例,并调用其方法RegisterInterface
using var hubClient = await new HubClient(url, loggerFactory, $"Client-{Guid.NewGuid()}", null, (logger, isOneWay, request, result, duration, exception) =>
{
// ...
})
.RegisterInterface()
.RegisterInterface()
.RegisterInterface()
.StartConnectionAsync(retryIntervalMs: 1000, numOfAttempts: 15);
然后执行远程调用:
var str = await hubClient.RpcAsync("IRemoteCall1", "Echo", "some text");
var ret3 = (Ret3)await hubClient.RpcAsync("IRemoteCall3", "GetIdAndParam");
此功能需要在服务器端使用与RPC案例相同的派生自RpcAndStreamingHub
hubClient.Connection.On("ReceiveMessage", (s0, s1) => logger.LogInformation($"ReceiveMessage: {s0} {s1}"));
然后使用InvokeAsync()调用服务器上的远程方法:
var jarr = (JArray)await hubClient.InvokeAsync("ProcessMessage", new[] { new Message { ... }, new Message { ... } });
类MessageEventProvider : StreamingDataProvider
hubClient.Subscribe(arg => logger.LogInformation($"Stream: {arg}"));