
Using Google Protobuf in Netty

Published: 2023-09-06 01:42 | Editor: 董明明

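The code comes from Chapter 8 of 《Netty权威指南》 (Netty: The Definitive Guide), with comments added. Note that the proto classes used here are the ones generated in the earlier article "Google Protobuf入门与使用" (Getting Started with Google Protobuf); refer to that article for how to use the protobuf code generation tool.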

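In this example, the decoder ProtobufVarint32FrameDecoder and the encoder ProtobufVarint32LengthFieldPrepender already solve the half-packet problem. When testing, you can comment them out to reproduce the TCP sticky-packet/half-packet problem that occurs when Protobuf is used in Netty without framing.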
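To make the length prefix concrete, here is a minimal sketch in plain Java of the Base 128 varint encoding that such a prepender writes in front of each message. The class and method names (`Varint32LengthPrefix`, `encodeVarint32`, `frame`) are hypothetical and chosen only for illustration; this is not Netty's implementation. It reproduces the figure from the Netty Javadoc quoted in the server code, where a 300-byte body gets the two-byte prefix 0xAC02.

```java
import java.io.ByteArrayOutputStream;

/** Illustrative helper (hypothetical name); not part of Netty or protobuf. */
final class Varint32LengthPrefix {

    /** Encodes an int as a Base 128 varint: 7 bits per byte, lowest group first. */
    static byte[] encodeVarint32(int value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream(5);
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80); // continuation bit set: more bytes follow
            value >>>= 7;
        }
        out.write(value); // last byte, continuation bit clear
        return out.toByteArray();
    }

    /** Returns length prefix + body, i.e. the "AFTER ENCODE" layout. */
    static byte[] frame(byte[] body) {
        byte[] prefix = encodeVarint32(body.length);
        byte[] framed = new byte[prefix.length + body.length];
        System.arraycopy(prefix, 0, framed, 0, prefix.length);
        System.arraycopy(body, 0, framed, prefix.length, body.length);
        return framed;
    }

    public static void main(String[] args) {
        byte[] framed = frame(new byte[300]);
        // prints "prefix=AC02 total=302"
        System.out.printf("prefix=%02X%02X total=%d%n",
                framed[0] & 0xFF, framed[1] & 0xFF, framed.length);
    }
}
```

Because the receiver reads the prefix first, it knows exactly how many bytes belong to the next message, no matter how TCP splits or merges the stream.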

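Working with protobuf also shows how simple it is to use in Netty: encoding, decoding, and the half-packet problem are all handled just by adding the relevant handlers to the pipeline. It also makes cross-language remote service calls easy, since protobuf itself supports multiple languages.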

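One drawback does show up in practice: the objects being encoded and decoded must be the specific classes that protobuf generates. In other words, you have to write a .proto file and then run protoc to generate source files for the target language, which is a bit of a hassle (though honestly not much). Is there a more convenient way? protostuff, covered in a later article, solves this problem.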

Server

SubReqServer.java

package cn.xpleaf.subscribe;

import cn.xpleaf.protobuf.SubscribeReqProto;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class SubReqServer {

    public void bind(int port) throws Exception {
        // Configure the server's NIO thread groups
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                // Add a logging handler
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // ProtobufVarint32FrameDecoder handles Protobuf half-packets
                        ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                        // ProtobufDecoder takes a com.google.protobuf.MessageLite argument.
                        // It tells the decoder which class to decode into, because the target
                        // type cannot be determined from the byte array alone
                        // (the server parses client requests, so this is Req)
                        ch.pipeline().addLast(new ProtobufDecoder(SubscribeReqProto.SubscribeReq.getDefaultInstance()));
                        /**
                         * Javadoc taken from the Netty source; used for Protobuf half-packet handling
                         *
                         * An encoder that prepends the Google Protocol Buffers
                         * <a href="https://developers.google.com/protocol-buffers/docs/encoding?csw=1#varints">Base
                         * 128 Varints</a> integer length field. For example:
                         * <pre>
                         * BEFORE ENCODE (300 bytes)       AFTER ENCODE (302 bytes)
                         * +---------------+               +--------+---------------+
                         * | Protobuf Data |-------------->| Length | Protobuf Data |
                         * |  (300 bytes)  |               | 0xAC02 |  (300 bytes)  |
                         * +---------------+               +--------+---------------+
                         * </pre>
                         */
                        ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                        // ProtobufEncoder removes the need to encode SubscribeResp by hand
                        ch.pipeline().addLast(new ProtobufEncoder());
                        // Add the business-logic handler
                        ch.pipeline().addLast(new SubReqServerHandler());
                    }
                });
            // Bind the port and wait synchronously for success
            ChannelFuture f = b.bind(port).sync();
            // Wait until the server's listening channel is closed
            f.channel().closeFuture().sync();
        } finally {
            // Shut down gracefully and release thread pool resources
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                // Fixed: the original parsed `port` itself instead of the argument
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException e) {
                // Keep the default port
            }
        }
        new SubReqServer().bind(port);
    }
}

SubReqServerHandler.java

package cn.xpleaf.subscribe;

import cn.xpleaf.protobuf.SubscribeReqProto;
import cn.xpleaf.protobuf.SubscribeRespProto;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class SubReqServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * ProtobufDecoder has already decoded the message, so the received subscribe
     * request can be used directly. The user name is validated, and if it passes,
     * a response is built and sent back to the client. Because ProtobufEncoder is
     * in the pipeline, SubscribeRespProto.SubscribeResp needs no manual encoding.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        SubscribeReqProto.SubscribeReq req = (SubscribeReqProto.SubscribeReq) msg;
        String username = req.getUserName();
        if ("xpleaf".equalsIgnoreCase(username)) {
            System.out.println("Service accept client subscribe req : [" + req.toString() + "]");
            ctx.writeAndFlush(resp(req.getSubReqID()));
        }
    }

    /**
     * Builds a SubscribeRespProto.SubscribeResp object.
     *
     * @param subReqID ID of the request being answered
     * @return the response message
     */
    private SubscribeRespProto.SubscribeResp resp(int subReqID) {
        SubscribeRespProto.SubscribeResp.Builder builder = SubscribeRespProto.SubscribeResp.newBuilder();
        builder.setSubReqID(subReqID);
        builder.setRespCode(0);
        builder.setDesc("Netty book order succeed, 3 days later, sent to the designated address");
        return builder.build();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // On exception, log it and close the connection
        cause.printStackTrace();
        ctx.close();
    }
}

Client

SubReqClient.java

package cn.xpleaf.subscribe;

import cn.xpleaf.protobuf.SubscribeRespProto;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class SubReqClient {

    public void connect(String host, int port) throws Exception {
        // Configure the client's NIO thread group
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                // Set the TCP connect timeout
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // ProtobufVarint32FrameDecoder handles Protobuf half-packets
                        ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                        // ProtobufDecoder takes a com.google.protobuf.MessageLite argument.
                        // It tells the decoder which class to decode into, because the target
                        // type cannot be determined from the byte array alone
                        // (the client parses server responses, so this is Resp)
                        ch.pipeline().addLast(new ProtobufDecoder(SubscribeRespProto.SubscribeResp.getDefaultInstance()));
                        /**
                         * Javadoc taken from the Netty source; used for Protobuf half-packet handling
                         *
                         * An encoder that prepends the Google Protocol Buffers
                         * <a href="https://developers.google.com/protocol-buffers/docs/encoding?csw=1#varints">Base
                         * 128 Varints</a> integer length field. For example:
                         * <pre>
                         * BEFORE ENCODE (300 bytes)       AFTER ENCODE (302 bytes)
                         * +---------------+               +--------+---------------+
                         * | Protobuf Data |-------------->| Length | Protobuf Data |
                         * |  (300 bytes)  |               | 0xAC02 |  (300 bytes)  |
                         * +---------------+               +--------+---------------+
                         * </pre>
                         */
                        ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                        // ProtobufEncoder removes the need to encode SubscribeReq by hand
                        ch.pipeline().addLast(new ProtobufEncoder());
                        // Add the business-logic handler
                        ch.pipeline().addLast(new SubReqClientHandler());
                    }
                });
            // Start the asynchronous connect and wait for it to complete
            ChannelFuture f = b.connect(host, port).sync();
            // Wait until the client channel is closed
            f.channel().closeFuture().sync();
        } finally {
            // Shut down gracefully and release the NIO thread group
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                // Fixed: the original parsed `port` itself instead of the argument
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException e) {
                // Keep the default port
            }
        }
        new SubReqClient().connect("localhost", port);
    }
}

SubReqClientHandler.java

package cn.xpleaf.subscribe;

import java.util.ArrayList;
import java.util.List;

import cn.xpleaf.protobuf.SubscribeReqProto;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class SubReqClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        // Queue ten requests, then flush them to the socket in one go
        for (int i = 0; i < 10; i++) {
            ctx.write(subReq(i));
        }
        ctx.flush();
    }

    /**
     * Builds a SubscribeReqProto.SubscribeReq object.
     *
     * @param i request ID
     * @return the request message
     */
    private SubscribeReqProto.SubscribeReq subReq(int i) {
        SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq.newBuilder();
        builder.setSubReqID(i);
        builder.setUserName("xpleaf");
        builder.setProductName("Netty Book For Protobuf");
        List<String> address = new ArrayList<>();
        address.add("NanJing YuHuaTai");
        address.add("BeiJing LiuLiChange");
        address.add("ShenZhen HongShuLin");
        builder.addAllAddress(address);
        return builder.build();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("Service accept server subscribe response : [" + msg + "]");
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
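The client handler writes ten requests and flushes once, so several messages may reach the server in a single read, or one message may be split across two reads. The following is a rough sketch, in plain Java, of the idea behind a varint32 frame decoder: buffer incoming bytes and emit a message only when its length prefix and full body have arrived. `VarintFrameSplitter` and `feed` are hypothetical names for illustration; this is not how Netty's ProtobufVarint32FrameDecoder is actually written, only the same framing idea under simplified assumptions (byte arrays instead of ByteBuf).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in that illustrates length-prefixed frame splitting; not Netty code. */
final class VarintFrameSplitter {

    private byte[] pending = new byte[0]; // bytes received but not yet consumed

    /** Appends one chunk as read from the socket and returns every frame that is now complete. */
    List<byte[]> feed(byte[] chunk) {
        byte[] merged = Arrays.copyOf(pending, pending.length + chunk.length);
        System.arraycopy(chunk, 0, merged, pending.length, chunk.length);
        pending = merged;

        List<byte[]> frames = new ArrayList<>();
        int pos = 0;
        while (pos < pending.length) {
            // Decode the varint32 length prefix that starts at pos
            int length = 0;
            int shift = 0;
            int i = pos;
            boolean prefixDone = false;
            while (i < pending.length && shift < 35) {
                int b = pending[i++] & 0xFF;
                length |= (b & 0x7F) << shift;
                shift += 7;
                if ((b & 0x80) == 0) {
                    prefixDone = true;
                    break;
                }
            }
            if ((!prefixDone && shift >= 35) || length < 0) {
                throw new IllegalStateException("malformed varint32 length prefix");
            }
            if (!prefixDone || pending.length - i < length) {
                break; // half packet: wait for more bytes
            }
            frames.add(Arrays.copyOfRange(pending, i, i + length));
            pos = i + length; // sticky packet: keep going, another frame may follow
        }
        pending = Arrays.copyOfRange(pending, pos, pending.length);
        return frames;
    }

    public static void main(String[] args) {
        // Three frames ("a", "bb", "ccc"), each with a one-byte length prefix
        byte[] stream = {1, 'a', 2, 'b', 'b', 3, 'c', 'c', 'c'};
        VarintFrameSplitter splitter = new VarintFrameSplitter();
        // First read carries two whole frames plus part of the third
        System.out.println(splitter.feed(Arrays.copyOfRange(stream, 0, 7)).size()); // prints 2
        // Second read completes the third frame
        System.out.println(splitter.feed(Arrays.copyOfRange(stream, 7, 9)).size()); // prints 1
    }
}
```

Without such a step, the bytes of one read would be handed to ProtobufDecoder as if they were exactly one message, which is why removing the frame decoder and prepender from the pipelines makes the problem visible.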

Test

The server prints the following:

Service accept client subscribe req : [subReqID: 0
userName: "xpleaf"
productName: "Netty Book For Protobuf"
address: "NanJing YuHuaTai"
address: "BeiJing LiuLiChange"
address: "ShenZhen HongShuLin"
]
Service accept client subscribe req : [subReqID: 1
userName: "xpleaf"
productName: "Netty Book For Protobuf"
address: "NanJing YuHuaTai"
address: "BeiJing LiuLiChange"
address: "ShenZhen HongShuLin"
]
Service accept client subscribe req : [subReqID: 2
userName: "xpleaf"
productName: "Netty Book For Protobuf"
address: "NanJing YuHuaTai"
address: "BeiJing LiuLiChange"
address: "ShenZhen HongShuLin"
]
Service accept client subscribe req : [subReqID: 3
userName: "xpleaf"
productName: "Netty Book For Protobuf"
address: "NanJing YuHuaTai"
address: "BeiJing LiuLiChange"
address: "ShenZhen HongShuLin"
]
Service accept client subscribe req : [subReqID: 4
userName: "xpleaf"
productName: "Netty Book For Protobuf"
address: "NanJing YuHuaTai"
address: "BeiJing LiuLiChange"
address: "ShenZhen HongShuLin"
]
Service accept client subscribe req : [subReqID: 5
userName: "xpleaf"
productName: "Netty Book For Protobuf"
address: "NanJing YuHuaTai"
address: "BeiJing LiuLiChange"
address: "ShenZhen HongShuLin"
]
Service accept client subscribe req : [subReqID: 6
userName: "xpleaf"
productName: "Netty Book For Protobuf"
address: "NanJing YuHuaTai"
address: "BeiJing LiuLiChange"
address: "ShenZhen HongShuLin"
]
Service accept client subscribe req : [subReqID: 7
userName: "xpleaf"
productName: "Netty Book For Protobuf"
address: "NanJing YuHuaTai"
address: "BeiJing LiuLiChange"
address: "ShenZhen HongShuLin"
]
Service accept client subscribe req : [subReqID: 8
userName: "xpleaf"
productName: "Netty Book For Protobuf"
address: "NanJing YuHuaTai"
address: "BeiJing LiuLiChange"
address: "ShenZhen HongShuLin"
]
Service accept client subscribe req : [subReqID: 9
userName: "xpleaf"
productName: "Netty Book For Protobuf"
address: "NanJing YuHuaTai"
address: "BeiJing LiuLiChange"
address: "ShenZhen HongShuLin"
]

The client prints the following:

Service accept server subscribe response : [subReqID: 0
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
Service accept server subscribe response : [subReqID: 1
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
Service accept server subscribe response : [subReqID: 2
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
Service accept server subscribe response : [subReqID: 3
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
Service accept server subscribe response : [subReqID: 4
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
Service accept server subscribe response : [subReqID: 5
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
Service accept server subscribe response : [subReqID: 6
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
Service accept server subscribe response : [subReqID: 7
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
Service accept server subscribe response : [subReqID: 8
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]
Service accept server subscribe response : [subReqID: 9
respCode: 0
desc: "Netty book order succeed, 3 days later, sent to the designated address"
]


Original article: http://blog.51cto.com/xpleaf/2071715
