分享web开发知识

注册/登录|最近发布|今日推荐

主页 IT知识网页技术软件开发前端开发代码编程运营维护技术分享教程案例
当前位置:首页 > 前端开发

NetCore下搭建websocket集群方案

发布时间:2023-09-06 02:25责任编辑:苏小强关键词:websocket

介绍

最近在做一个基于netcore的实时消息服务。最初选用的是ASP.NET Core SignalR,但是后来发现目前它并没有支持IOS的客户端,所以自己只好又基于websocket重新搭建了一套服务。

因为前期已经使用了SignalR,所以我直接在原本的项目里面重新扩展了一套自定义websocket服务。

在网上有一篇博文介绍了如何在Asp.net Core中使用中间件来管理websocket,我的大部分代码也是参考这篇文章。在这儿贴个链接

在Asp.net Core中使用中间件来管理websocket

自定义WebSocket 中间件

要阅读ASP.NET Core中的WebSockets支持,可以在此处查看。如果你的项目跟我一样,已经使用了Signalr,那么你不需要在安装Microsoft.AspNetCore.WebSockets包,否则在项目开始前,

需要安装此Nuget包。现在你可以自定义你自己的中间件了。

/// <summary> ???/// websocket 协议扩展中间件 ???/// </summary> ???public class CustomWebSocketMiddlewarr ???{ ???????private readonly RequestDelegate _next; ???????public CustomWebSocketMiddlewarr(RequestDelegate next) ???????{ ???????????_next = next; ???????} ???????public async Task Invoke(HttpContext context, ICustomWebSocketFactory wsFactory, ICustomWebSocketMessageHandler wsmHandler) ???????{ ????????????if (context.WebSockets.IsWebSocketRequest) ???????????????{ ???????????????????string ConId = context.Request.Query["sign"]; ???????????????????if (!string.IsNullOrEmpty(ConId)) ???????????????????{ ???????????????????????WebSocket webSocket = await context.WebSockets.AcceptWebSocketAsync(); ???????????????????????CustomWebSocket userWebSocket = new CustomWebSocket() ???????????????????????{ ???????????????????????????WebSocket = webSocket, ???????????????????????????ConId = ConId ???????????????????????}; ???????????????????????wsFactory.Add(userWebSocket); ???????????????????//await wsmHandler.SendInitialMessages(userWebSocket); ???????????????????await Listen(context, userWebSocket, wsFactory, wsmHandler); ???????????????????????????????????????????} ???????????????} ???????????????else ???????????????{ ???????????????????context.Response.StatusCode = 400; ???????????????} ???????????????????????await _next(context); ???????}     //监听客户端发送过来的消息 ???????private async Task Listen(HttpContext context, CustomWebSocket userWebSocket, ICustomWebSocketFactory wsFactory, ICustomWebSocketMessageHandler wsmHandler) ???????{ ???????????WebSocket webSocket = userWebSocket.WebSocket; ???????????var buffer = new byte[1024 * 4]; ???????????WebSocketReceiveResult result = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None); ???????????while (!result.CloseStatus.HasValue) ???????????{ ???????????????await wsmHandler.HandleMessage(result, buffer, userWebSocket, wsFactory); ???????????????buffer = new byte[1024 * 4]; ???????????????result = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None); ???????????} ???????????wsFactory.Remove(userWebSocket.ConId); ???????????await webSocket.CloseAsync(result.CloseStatus.Value, result.CloseStatusDescription, CancellationToken.None); ???????} ???}

在自定义的中间件中,首先判断是否是websocket请求,如果是的话,在查看是否有对应的sign标识,满足条件后进入后续的处理环节。

简单讲解一下这里面的处理逻辑。因为我的项目中同时存在Signalr,而Signalr也会使用到websocket协议。但是Signalr的websocket请求传入的参数是id,所以我在这儿自定义了一个参数sign为了和Signalr

做区分。那么这个sign是做什么用的呢? 其实sign是前端传过来的唯一标识,和此次连接对应,也可以理解为Signalr里面的connectionId。然后会把标识和对应websocket类到存入到一个list集合中。即代码

中的  wsFactory.Add(userWebSocket)。

CustomWebSocket是一个包含WebSocket和标识的类:

public ?class CustomWebSocket ???{ ???????????????public string ConId { get; set; } ???????public WebSocket WebSocket { get; set; } ???}

然后定义了一个Websocket工厂类,用来存取连接到服务的Websocket实例。

//接口
public ?interface ICustomWebSocketFactory ???{ ???????void Add(CustomWebSocket uws); ???????void Remove(string conId); ???????List<CustomWebSocket> All(); ???????List<CustomWebSocket> Others(CustomWebSocket client); ???????CustomWebSocket Client(string conId); ???}
  

具体实现

public class CustomWebSocketFactory: ICustomWebSocketFactory ???{ ???????List<CustomWebSocket> List; ???????public CustomWebSocketFactory() ???????{ ???????????List = new List<CustomWebSocket>(); ???????} ???????public void Add(CustomWebSocket uws) ???????{ ???????????List.Add(uws); ???????} ???????public void Remove(string conId) ???????{ ???????????List.Remove(Client(conId)); ??????????????????} ???????public List<CustomWebSocket> All() ???????{ ???????????return List; ???????} ??????????????public List<CustomWebSocket> Others(CustomWebSocket client) ???????{ ???????????return List.Where(c => c.ConId != client.ConId).ToList(); ???????} ???????public CustomWebSocket Client(string conId) ???????{ ???????????var uws= List.FirstOrDefault(c => c.ConId == conId); ???????????return uws; ???????} ???}

可以看到最终我们存取websocket都是通过list来进行,所以在注入的时候一定要注意。注入成单例模式。

services.AddSingleton<ICustomWebSocketFactory, CustomWebSocketFactory>();

CustomWebSocketMessageHandle包含有关消息处理的逻辑(发送,接收)
public interface ICustomWebSocketMessageHandler ???{ ???????Task SendInitialMessages(CustomWebSocket userWebSocket); ???????Task HandleMessage(WebSocketReceiveResult result, byte[] buffer, CustomWebSocket userWebSocket, ICustomWebSocketFactory wsFactory); ???????Task SendMessageInfo(string conId, object data, ICustomWebSocketFactory wsFactory); ???}public ??class CustomWebSocketMessageHandler:ICustomWebSocketMessageHandler ???{ ???????public async Task SendInitialMessages(CustomWebSocket userWebSocket) ???????{ ???????????WebSocket webSocket = userWebSocket.WebSocket; ???????????var msg = new CustomWebSocketMessage ???????????{ ???????????????MessagDateTime = DateTime.Now, ???????????????Type = WSMessageType.连接响应 ???????????}; ???????????string serialisedMessage = JsonConvert.SerializeObject(msg); ???????????byte[] bytes = Encoding.ASCII.GetBytes(serialisedMessage); ???????????await webSocket.SendAsync(new ArraySegment<byte>(bytes, 0, bytes.Length), WebSocketMessageType.Text, true, CancellationToken.None); ???????} ???????/// <summary> ???????/// 推送消息到客户端 ???????/// </summary> ???????/// <returns></returns> ???????public async Task SendMessageInfo(string conId,object data, ICustomWebSocketFactory wsFactory) ???????{ ???????????var uws = wsFactory.Client(conId); ???????????CustomWebSocketMessage message = new CustomWebSocketMessage(); ???????????message.DataInfo = data; ???????????message.Type = WSMessageType.任务数量; ???????????message.MessagDateTime = DateTime.Now; ???????????if (uws == null) ???????????{ ???????????????//广播到其他集群节点 ???????????????var listpush = new List<PushMsg>(); ???????????????var push = new PushMsg() ???????????????{ ???????????????????sendjsonMsg = new WebSocketFanoutDto() ???????????????????{ ???????????????????????conId = conId, ???????????????????????data = message ???????????????????}, ???????????????????exchangeName = "saas.reltimewsmes.exchange", ???????????????????sendEnum = SendEnum.订阅模式 ???????????????}; ???????????????listpush.Add(push); ???????????????BTRabbitMQManage.PushMessageAsync(listpush); ???????????????return; ???????????} ??????????????????????var mesbuffer = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message)); ???????????var mescount = Encoding.UTF8.GetByteCount(JsonConvert.SerializeObject(message)); ??????????await uws.WebSocket.SendAsync(new ArraySegment<byte>(mesbuffer, 0, mescount), WebSocketMessageType.Text, true, CancellationToken.None); ???????} ???????/// <summary> ???????/// 处理接收到的客户端信息 ???????/// </summary> ???????/// <param name="result"></param> ???????/// <param name="buffer"></param> ???????/// <param name="userWebSocket"></param> ???????/// <param name="wsFactory"></param> ???????/// <returns></returns> ???????public async Task HandleMessage(WebSocketReceiveResult result, byte[] buffer, CustomWebSocket userWebSocket, ICustomWebSocketFactory wsFactory) ???????{ ???????????string msg = Encoding.UTF8.GetString(buffer); ???????????try ???????????{ ???????????????var message = JsonConvert.DeserializeObject<CustomWebSocketMessage>(msg); ???????????????if (message.Type == WSMessageType.用户信息) ???????????????{ ???????????????????var logdto = JsonConvert.DeserializeObject<LoginInfoDto>(message.DataInfo.ToJsonString()); ???????????????????await InitUserInfo(logdto, userWebSocket, wsFactory); ???????????????} ??????????????????????????} ???????????catch (Exception e) ???????????{ ???????????????var exbuffer = Encoding.UTF8.GetBytes(e.Message); ???????????????var excount = Encoding.UTF8.GetByteCount(e.Message); ???????????????await userWebSocket.WebSocket.SendAsync(new ArraySegment<byte>(exbuffer, 0, excount), result.MessageType, result.EndOfMessage, CancellationToken.None); ???????????} ???????} ???????/// <summary> ???????/// 初始化用户连接关系 ???????/// </summary> ???????/// <param name="dto"></param> ???????/// <param name="userWebSocket"></param> ???????/// <param name="wsFactory"></param> ???????/// <returns></returns> ???????private async Task InitUserInfo(LoginInfoDto dto, CustomWebSocket userWebSocket, ICustomWebSocketFactory wsFactory) ???????{ ???????????if (dto.userId == 0) ???????????????return; ???????????var contectid = userWebSocket.ConId; ???????????var key = ""; ???????????if (dto.tenantId.HasValue) ???????????????key += "T_" + dto.userId + "_" + dto.tenantId + "_" + "tenant_"; ???????????if (dto.bankId.HasValue) ???????????????key += "B_" + dto.userId + "_" + dto.bankId + "_" + "bank_"; ???????????key += dto.fromeType; ???????????//添加缓存 ???????????CacheInstace<string>.GetRedisInstanceDefaultMemery().AddOrUpdate(key, contectid, r => ???????????{ ???????????????r = contectid; ???????????????return r; ???????????}); ???????????CacheInstace<string>.GetRedisInstanceDefaultMemery().Expire(key, new TimeSpan(12, 0, 0)); ??????????????????} ??????????}
在这里面,推送消息到客户端的时候,如果未找到标识对应的Websocket对象,则将消息广播到所有的集群节点上。我们知道Signalr里面的集群实现通过redis来做的,但在此处,因为
我项目里面已经搭建了Rabbitmq的高可用集群,所以我直接通过Rabbitmq来进行广播。这样不管我是在集群的那个节点上来推送消息,都可以保证消息被正确推送到客户端。
关于广播消息的订阅实现:
 public class WebSocketFanoutDto ???{ ???????public string conId { get; set; } ???????public CustomWebSocketMessage data { get; set; } ???} public class FanoutMesConsume : IMessageConsume ???{ ???????public void Consume(string message) ???????{ ???????????var condto = JsonConvert.DeserializeObject<WebSocketFanoutDto>(message); ???????????var wsFactory = IOCManage.ServiceProvider.GetService<ICustomWebSocketFactory>(); ???????????var uws = wsFactory.Client(condto.conId); ???????????if (uws != null) ???????????{ ???????????????//发送消息 ???????????????var mesbuffer = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(condto.data)); ???????????????var mescount = Encoding.UTF8.GetByteCount(JsonConvert.SerializeObject(condto.data)); ???????????????uws.WebSocket.SendAsync(new ArraySegment<byte>(mesbuffer, 0, mescount), WebSocketMessageType.Text, true, CancellationToken.None); ???????????} ???????} ???}

最后在扩展类里面添加消息监视和注入Websocket中间件。

当然不要忘记 消息处理类的依赖注入

services.AddSingleton<ICustomWebSocketMessageHandler, CustomWebSocketMessageHandler>();
 public static IApplicationBuilder UseCustomWebSocketManager(this IApplicationBuilder app) ???????{ ???????????//添加针对分布式集群的消息监视 ???????????RabbitMQManage.Subscribe<FanoutMesConsume>(new MesArgs() ???????????{ ???????????????exchangeName = "reltimewsmes.exchange", ???????????????sendEnum = SendEnum.订阅模式 ???????????}); ???????????return app.UseMiddleware<CustomWebSocketMiddlewarr>(); ???????}

至此这个框架搭建完成,最后在startup类中注入。

关于Rabbitmq的使用,发送和接收是我基于easynetq封装的一个帮助类,大家可以自行实现。

这里面最主要的逻辑就是每一个websocket实例都有一个对应的标识,然后在连接成功后,前端会发送用户信息,后端服务再把用户信息和连接标识关联。这样如果想推送信息到某个用户的话,就可以通过

用户信息来找到用户对应的连接信息。至于为什么整个流程会这么复杂的,就一言难尽(我能怎么办,我也很绝望啊)。大多数时候大家都可以直接通过token认证来绑定用户和socket连接。

目前还有几个问题一个广播消息的时候,发送消息方也会收到这个消息,这挺尴尬,目前我还没想到太好的解决办法。

第二个是采用单例list字段存储连接的websocket实例,少的时候还好,如果多的话,感觉可能会存在堆栈溢出的问题,但没实际测试过,所以目前还不知道最大的连接数多少。

NetCore下搭建websocket集群方案

原文地址:https://www.cnblogs.com/dandan123/p/10059026.html

知识推荐

我的编程学习网——分享web前端后端开发技术知识。 垃圾信息处理邮箱 tousu563@163.com 网站地图
icp备案号 闽ICP备2023006418号-8 不良信息举报平台 互联网安全管理备案 Copyright 2023 www.wodecom.cn All Rights Reserved