分享web开发知识

注册/登录|最近发布|今日推荐

主页 IT知识网页技术软件开发前端开发代码编程运营维护技术分享教程案例
当前位置:首页 > 运营维护

.Net Rpc服务调用记录3

发布时间:2023-09-06 02:23 | 责任编辑:彭小芳 | 关键词:暂无标签

在服务端定义了 IServer 和 ServerBase,负责服务端的启动、关闭等。Server 启动时需要:1. 开启端口监听;2. 注册服务;3. 通过注入的 IServerAddInsInitializer 接口,在启动时执行额外的逻辑。具体代码如下:

using GP.RPC.Message;
using GP.RPC.Route.Manager;
using GP.RPC.Server.SerivceType;
using GP.RPC.Server.ServerInitializer;
using GP.RPC.Server.ServiceExecutor;
using GP.RPC.Server.ServiceRegistion;
using GP.RPC.Transport.Receiver;
using GP.RPC.Transport.Sender;
using Microsoft.Extensions.DependencyModel;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace GP.RPC.Server.Abstract
{
    /// <summary>
    /// Base class for RPC servers. Drives the start-up sequence: start the
    /// transport, register the service, then run the add-in initializer.
    /// </summary>
    public abstract class ServerBase : IServer
    {
        /// <summary>
        /// Extra start-up logic executed at the end of <see cref="StartAsync"/>.
        /// Defaults to <c>NullServerAddInsInitializer.Instance</c>; assign a
        /// different implementation before calling <see cref="StartAsync"/>.
        /// </summary>
        public IServerAddInsInitializer ServerAddInsInitializer { get; set; }

        /// <summary>
        /// Service registration used during start-up.
        /// Defaults to <c>NullServiceRegistion.Instance</c>.
        /// </summary>
        public IServiceRegistion Registion { get; set; }

        /// <summary>
        /// Installs the default (null-object) collaborators so that both
        /// properties are usable even when nothing is injected.
        /// </summary>
        public ServerBase()
        {
            Registion = NullServiceRegistion.Instance;
            // The default is assigned here rather than inside StartAsync, so an
            // initializer injected by the caller is not overwritten right
            // before it is supposed to run.
            ServerAddInsInitializer = NullServerAddInsInitializer.Instance;
        }

        /// <summary>
        /// Starts the server: starts the transport on <paramref name="port"/>,
        /// registers the service for that port, then runs the add-in initializer.
        /// </summary>
        /// <param name="port">Port passed to the transport and to the registration.</param>
        /// <returns>A task that completes when all three steps have finished.</returns>
        public virtual async Task StartAsync(int port)
        {
            await StartCoreAsync(port);

            // Fall back to the null objects if a caller explicitly assigned null.
            var registion = Registion ?? NullServiceRegistion.Instance;
            await registion.RegisterAsync(port);

            var initializer = ServerAddInsInitializer ?? NullServerAddInsInitializer.Instance;
            initializer.Initialize();
        }

        /// <summary>Transport-specific start-up for the given port.</summary>
        /// <param name="port">Port passed through from <see cref="StartAsync"/>.</param>
        protected abstract Task StartCoreAsync(int port);

        /// <summary>Stops the server.</summary>
        public abstract void ShutDown();

        /// <summary>Prints information about the server.</summary>
        public abstract void Print();
    }
}
View Code

通信使用 DotNetty 完成,因此服务端的实际实现为 NettyServer 类,具体代码如下:

using DotNetty.Codecs;
using DotNetty.Handlers.Logging;
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Channels.Sockets;
using GP.RPC.IOC;
using GP.RPC.Message;
using GP.RPC.Server;
using GP.RPC.Server.Abstract;
using GP.RPC.Server.ServiceExecutor;
using GP.RPC.Server.ServiceRegistion;
using GP.RPC.Transport.Netty.Codec;
using GP.RPC.Transport.Netty.Handler;
using GP.RPC.Transport.Netty.Sender;
using GP.RPC.Transport.Receiver;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace GP.RPC.Transport.Netty.Server
{
    /// <summary>
    /// DotNetty-based implementation of <see cref="ServerBase"/>. Each received
    /// <c>RequestMessage</c> is handed to the service executor and the
    /// response is written back on the channel the request arrived on.
    /// </summary>
    public class NettyServer : ServerBase
    {
        private ServerBootstrap bootstrap;
        private MultithreadEventLoopGroup boss;
        private MultithreadEventLoopGroup worker;
        private IChannel bootstrapChannel;

        private readonly IMessageReceiver _receiver;
        private readonly IServiceExecutor _executor;

        /// <summary>
        /// Creates the server and subscribes to the receiver's <c>Received</c> event.
        /// </summary>
        /// <param name="receiver">Source of incoming messages.</param>
        /// <param name="executor">Executes the requested service call.</param>
        public NettyServer(
            IMessageReceiver receiver,
            IServiceExecutor executor)
        {
            _executor = executor;
            _receiver = receiver;
            _receiver.Received += Message_Received;
        }

        /// <summary>
        /// Requests closing of the listening channel and graceful shutdown of
        /// both event-loop groups. The request is not awaited (the method is
        /// synchronous), so it returns before shutdown has completed.
        /// Safe to call when the server was never started.
        /// </summary>
        public override void ShutDown()
        {
            ReleaseResourcesAsync();
        }

        /// <summary>
        /// Creates the event-loop groups, configures the channel pipeline and
        /// binds the listening channel to <paramref name="port"/>.
        /// </summary>
        /// <param name="port">TCP port to bind.</param>
        protected override async Task StartCoreAsync(int port)
        {
            boss = new MultithreadEventLoopGroup(1);
            worker = new MultithreadEventLoopGroup();

            bootstrap = new ServerBootstrap();
            bootstrap.Group(boss, worker)
                .Channel<TcpServerSocketChannel>()
                .Option(ChannelOption.SoBacklog, 100)
                .Handler(new LoggingHandler(LogLevel.INFO))
                .ChildHandler(new ActionChannelInitializer<ISocketChannel>(channel =>
                {
                    IChannelPipeline pipeline = channel.Pipeline;

                    // One handler per accepted channel, forwarding to the shared receiver.
                    var handler = new NettyServerHandler();
                    handler.AsyncServerReceiverHook += _receiver.OnReceived;

                    // 4-byte length prefix framing in both directions.
                    pipeline.AddLast(new LengthFieldPrepender(4));
                    pipeline.AddLast(new LengthFieldBasedFrameDecoder(int.MaxValue, 0, 4, 0, 4));
                    pipeline.AddLast(
                        IocManager.Instance.Resolve<NettyMessageEncoderAdapter>(),
                        IocManager.Instance.Resolve<NettyMessageDecoderAdapter>(),
                        handler);
                }));

            try
            {
                bootstrapChannel = await bootstrap.BindAsync(port);
            }
            catch
            {
                // Binding failed (e.g. the port is in use): release the
                // event-loop groups created above instead of leaking them,
                // then let the caller see the original bind error.
                var ignored = ReleaseResourcesAsync();
                throw;
            }
        }

        /// <summary>
        /// Handles a message raised by the receiver. Messages that are not a
        /// <c>ServerReceiverContext</c> carrying a <c>RequestMessage</c> are ignored.
        /// </summary>
        /// <param name="message">The object raised by the receiver.</param>
        private async Task Message_Received(object message)
        {
            var context = message as ServerReceiverContext;
            if (context == null)
            {
                return;
            }

            var request = context.Message as RequestMessage;
            if (request == null)
            {
                return;
            }

            var response = await _executor.ExecuteAsync(request);
            var sender = new NettySeverMessageSender(context.Context);
            await sender.SendAsync(null, response);
        }

        /// <summary>
        /// Writes the listening channel's id, open flag and active flag to the console.
        /// </summary>
        public override void Print()
        {
            if (bootstrapChannel == null)
            {
                // No channel exists before a successful bind; report that
                // instead of throwing a NullReferenceException.
                Console.WriteLine("Server is not started");
                return;
            }

            Console.WriteLine($"Channel id is{this.bootstrapChannel.Id} State is {this.bootstrapChannel.Open} Active is{this.bootstrapChannel.Active}");
        }

        /// <summary>
        /// Starts closing whichever of the channel and event-loop groups exist
        /// and returns a task covering all of those operations.
        /// </summary>
        private Task ReleaseResourcesAsync()
        {
            var pending = new List<Task>(3);

            if (bootstrapChannel != null)
            {
                pending.Add(bootstrapChannel.CloseAsync());
            }

            if (boss != null)
            {
                pending.Add(boss.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)));
            }

            if (worker != null)
            {
                pending.Add(worker.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)));
            }

            return Task.WhenAll(pending);
        }
    }
}
View Code

服务端接收到消息后,将消息解码为RequestMessage,并交给IServiceExecutor进行实际的服务调用;实现代码如下:

using System;
using System.Collections.Generic;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
using GP.RPC.Message;
using GP.RPC.Utilities;
using Microsoft.Extensions.DependencyModel;
using System.Linq;
using GP.RPC.Server.SerivceType;

namespace GP.RPC.Server.ServiceExecutor
{
    /// <summary>
    /// Executes a <c>RequestMessage</c> by resolving the service type and
    /// method via reflection, invoking it, and wrapping the outcome (value or
    /// exception) in a <c>ResponseMessage</c>.
    /// </summary>
    public class SimpleServiceExecutor : IServiceExecutor
    {
        private readonly IServiceInstanceFactory _factory;
        private readonly IServiceTypeManager _typeManager;

        /// <summary>Creates the executor.</summary>
        /// <param name="typeManager">Resolves a service type from its name.</param>
        /// <param name="factory">Creates the service instance to invoke.</param>
        public SimpleServiceExecutor(IServiceTypeManager typeManager, IServiceInstanceFactory factory)
        {
            _factory = factory;
            _typeManager = typeManager;
        }

        /// <summary>
        /// Resolves and invokes the service method described by
        /// <paramref name="request"/>.
        /// </summary>
        /// <param name="request">The call to execute; must not be null.</param>
        /// <returns>
        /// A successful response carrying the method's result, or a response
        /// built from the exception when resolution or invocation fails.
        /// </returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="request"/> is null (no request id exists to build a
        /// failure response from).
        /// </exception>
        public async Task<ResponseMessage> ExecuteAsync(RequestMessage request)
        {
            // Checked outside the try block: the catch handlers read
            // request.Id and would otherwise mask the real problem with a
            // NullReferenceException.
            if (request == null)
            {
                throw new ArgumentNullException(nameof(request));
            }

            try
            {
                ContractUtility.CheckEmptyOrNull(request.ServiceType, "ServiceType");
                ContractUtility.CheckEmptyOrNull(request.ServiceMethod, "ServiceMethod");

                // The service type is matched by name only.
                var type = GetServiceType(request.ServiceType);
                ContractUtility.Require(type != null, $"Not found matched ServiceType {request.ServiceType}");

                var parameterTypes = GetParameterTypes(request.Parameters);
                var method = GetServiceMethod(type, request.ServiceMethod, parameterTypes);
                ContractUtility.Require(method != null, $"Not found matched ServiceMethod {request.ServiceMethod} in SerivceType {request.ServiceType}");

                var result = await ExecuteCoreAsync(_factory.Create(type), method, method.ReturnType, request.Parameters);

                return new ResponseMessage
                {
                    RequestId = request.Id,
                    Success = true,
                    Result = result
                };
            }
            catch (TargetInvocationException ex) when (ex.InnerException != null)
            {
                // MethodInfo.Invoke wraps exceptions thrown by the service
                // method; report the service's own exception to the caller.
                return ResponseMessage.FromException(request.Id, ex.InnerException);
            }
            catch (Exception ex)
            {
                return ResponseMessage.FromException(request.Id, ex);
            }
        }

        /// <summary>Looks up the service type registered under <paramref name="serviceType"/>.</summary>
        protected Type GetServiceType(string serviceType)
        {
            return _typeManager.GetServiceType(serviceType);
        }

        /// <summary>
        /// Returns the runtime type of each argument. A null argument yields a
        /// null entry, because its type cannot be determined from the value.
        /// </summary>
        /// <param name="parameters">Call arguments; may be null or empty.</param>
        protected Type[] GetParameterTypes(object[] parameters)
        {
            if (parameters == null || parameters.Length == 0)
            {
                return Type.EmptyTypes;
            }

            return parameters.Select(p => p == null ? null : p.GetType()).ToArray();
        }

        /// <summary>
        /// Finds the non-generic method named <paramref name="serviceMethod"/>
        /// on <paramref name="type"/> that accepts the given argument types.
        /// </summary>
        /// <param name="type">Service type to search.</param>
        /// <param name="serviceMethod">Method name (case-sensitive).</param>
        /// <param name="parameterTypes">
        /// Argument types; a null entry stands for a null argument.
        /// </param>
        /// <returns>
        /// The matching method, or null when there is none, when it is
        /// generic, or when null arguments make the choice ambiguous.
        /// </returns>
        protected MethodInfo GetServiceMethod(Type type, string serviceMethod, Type[] parameterTypes)
        {
            // Type.GetMethod rejects null entries, so calls that pass a null
            // argument are resolved by name and per-parameter compatibility.
            var method = parameterTypes.Any(t => t == null)
                ? FindMethodAllowingNullArguments(type, serviceMethod, parameterTypes)
                : type.GetMethod(serviceMethod, parameterTypes);

            if (method == null || method.IsGenericMethod)
            {
                return null;
            }

            return method;
        }

        /// <summary>
        /// Invokes <paramref name="method"/> and, for <c>Task</c> /
        /// <c>Task&lt;T&gt;</c> return types, awaits completion.
        /// </summary>
        /// <returns>
        /// The return value for synchronous methods, null for <c>Task</c>,
        /// and the awaited result for <c>Task&lt;T&gt;</c>.
        /// </returns>
        protected async Task<object> ExecuteCoreAsync(object instance, MethodInfo method, Type returnType, object[] parameters)
        {
            var result = method.Invoke(instance, parameters);

            if (!IsAsync(returnType))
            {
                return result;
            }

            if (returnType == typeof(Task))
            {
                await ProcessAsyncTask((Task)result);
                return null;
            }

            // Await the Task<T> and return its value; returning the task
            // object itself would put a Task into the response message.
            var resultType = returnType.GenericTypeArguments[0];
            var pending = (Task)CallProcessAsyncTaskWithResult(resultType, result);
            await pending;

            return typeof(Task<>)
                .MakeGenericType(resultType)
                .GetProperty("Result")
                .GetValue(pending);
        }

        /// <summary>
        /// True when <paramref name="returnType"/> is exactly <c>Task</c> or a
        /// constructed <c>Task&lt;T&gt;</c>.
        /// </summary>
        protected bool IsAsync(Type returnType)
        {
            if (returnType == typeof(Task))
            {
                return true;
            }

            return returnType.GetTypeInfo().IsGenericType
                && returnType.GetGenericTypeDefinition() == typeof(Task<>);
        }

        /// <summary>Awaits <paramref name="actualTask"/>.</summary>
        protected async Task ProcessAsyncTask(Task actualTask)
        {
            await actualTask;
        }

        /// <summary>Awaits <paramref name="actualTask"/> and returns its result.</summary>
        protected async Task<T> ProcessAsyncTaskWithResult<T>(Task<T> actualTask)
        {
            return await actualTask;
        }

        /// <summary>
        /// Calls <see cref="ProcessAsyncTaskWithResult{T}"/> with
        /// <c>T</c> = <paramref name="taskReturnType"/> via reflection.
        /// </summary>
        /// <param name="taskReturnType">The <c>T</c> of the task.</param>
        /// <param name="actualTaskValue">The <c>Task&lt;T&gt;</c> instance.</param>
        /// <returns>The <c>Task&lt;T&gt;</c> returned by the generic helper, boxed as object.</returns>
        protected object CallProcessAsyncTaskWithResult(Type taskReturnType, object actualTaskValue)
        {
            // The helper is protected: NonPublic is required. BindingFlags.Instance
            // on its own matches nothing and makes GetMethod return null.
            return typeof(SimpleServiceExecutor)
                .GetMethod(nameof(ProcessAsyncTaskWithResult), BindingFlags.Instance | BindingFlags.NonPublic)
                .MakeGenericMethod(taskReturnType)
                .Invoke(this, new object[] { actualTaskValue });
        }

        /// <summary>
        /// Finds the single public, non-generic method with the given name
        /// whose parameters accept the argument types; null entries match any
        /// parameter that can hold null. Returns null when zero or several
        /// methods match.
        /// </summary>
        private static MethodInfo FindMethodAllowingNullArguments(Type type, string serviceMethod, Type[] argumentTypes)
        {
            var candidates = type.GetMethods()
                .Where(m => m.Name == serviceMethod
                    && !m.IsGenericMethod
                    && AcceptsArguments(m.GetParameters(), argumentTypes))
                .Take(2)
                .ToArray();

            return candidates.Length == 1 ? candidates[0] : null;
        }

        /// <summary>
        /// True when each parameter can receive the corresponding argument type.
        /// </summary>
        private static bool AcceptsArguments(ParameterInfo[] parameters, Type[] argumentTypes)
        {
            if (parameters.Length != argumentTypes.Length)
            {
                return false;
            }

            for (var i = 0; i < parameters.Length; i++)
            {
                var parameterType = parameters[i].ParameterType;
                var argumentType = argumentTypes[i];

                if (argumentType == null)
                {
                    // null fits reference types and Nullable<T> only.
                    var isNonNullableValueType =
                        parameterType.GetTypeInfo().IsValueType
                        && Nullable.GetUnderlyingType(parameterType) == null;
                    if (isNonNullableValueType)
                    {
                        return false;
                    }
                }
                else if (!parameterType.GetTypeInfo().IsAssignableFrom(argumentType.GetTypeInfo()))
                {
                    return false;
                }
            }

            return true;
        }
    }
}
View Code

服务调用完成后,生成相应消息ResponseMessage,并交给Server端的MessageSender,进行发送回客户端。由此服务端完成整个处理。

.Net Rpc服务调用记录3

原文地址:https://www.cnblogs.com/fisher3/p/10003290.html

知识推荐

我的编程学习网——分享web前端后端开发技术知识。 垃圾信息处理邮箱 tousu563@163.com 网站地图
icp备案号 闽ICP备2023006418号-8 不良信息举报平台 互联网安全管理备案 Copyright 2023 www.wodecom.cn All Rights Reserved