分享web开发知识

注册/登录|最近发布|今日推荐

主页 IT知识网页技术软件开发前端开发代码编程运营维护技术分享教程案例
当前位置:首页 > 代码编程

Netty5 序列化方式(Jboss Marshalling)

发布时间:2023-09-06 01:08责任编辑:蔡小小关键词:暂无标签


Netty作为高性能的底层通讯工具,被很多开发框架应用在底层。今天来说说常用的序列化工具之一:JBoss的Marshalling。


直接上代码,Marshalling的工厂类

packagecom.netty.serialize.marshalling;importio.netty.handler.codec.marshalling.*;importorg.jboss.marshalling.MarshallerFactory;importorg.jboss.marshalling.Marshalling;importorg.jboss.marshalling.MarshallingConfiguration;/***Createdbysdcon2017/8/28.*/publicclassMarshallingCodeCFactory{/***解码*@return*/publicstaticMarshallingDecoderbuildMarshallingDecoder(){//首先通过Marshalling工具类的精通方法获取Marshalling实例对象参数serial标识创建的是java序列化工厂对象。finalMarshallerFactorymarshallerFactory=Marshalling.getProvidedMarshallerFactory("serial");//创建了MarshallingConfiguration对象,配置了版本号为5finalMarshallingConfigurationconfiguration=newMarshallingConfiguration();configuration.setVersion(5);//根据marshallerFactory和configuration创建providerUnmarshallerProviderprovider=newDefaultUnmarshallerProvider(marshallerFactory,configuration);//构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度MarshallingDecoderdecoder=newMarshallingDecoder(provider,1024);returndecoder;}/***编码*@return*/publicstaticMarshallingEncoderbuildMarshallingEncoder(){finalMarshallerFactorymarshallerFactory=Marshalling.getProvidedMarshallerFactory("serial");finalMarshallingConfigurationconfiguration=newMarshallingConfiguration();configuration.setVersion(5);MarshallerProviderprovider=newDefaultMarshallerProvider(marshallerFactory,configuration);//构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组MarshallingEncoderencoder=newMarshallingEncoder(provider);returnencoder;}}

以上就是Marshalling的序列化方式。Marshalling自带编解码器,已经处理了半包问题,所以不用担心传输过程中出现半包。


服务端的Server实现:

packagecom.netty.serialize.server;importcom.netty.serialize.coder.MsgDecoder;importcom.netty.serialize.coder.MsgEncoder;importcom.netty.serialize.handler.ServerHandler;importcom.netty.serialize.marshalling.MarshallingCodeCFactory;importio.netty.bootstrap.ServerBootstrap;importio.netty.channel.*;importio.netty.channel.nio.NioEventLoopGroup;importio.netty.channel.socket.SocketChannel;importio.netty.channel.socket.nio.NioServerSocketChannel;/***Createdbysdcon2017/8/26.*/publicclassMsgServer{publicvoidbind(intport)throwsException{EventLoopGroupbossGroup=newNioEventLoopGroup();EventLoopGroupworkerGroup=newNioEventLoopGroup();try{ServerBootstrapsb=newServerBootstrap();sb.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)//.childHandler(newChildChannelHandler()).option(ChannelOption.SO_BACKLOG,1024).childHandler(newChannelInitializer<SocketChannel>(){@OverrideprotectedvoidinitChannel(SocketChannelchannel)throwsException{channel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());channel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());channel.pipeline().addLast(newServerHandler());}}).childOption(ChannelOption.SO_KEEPALIVE,true);ChannelFuturecf=sb.bind(port).sync();System.out.println("服务端已启动");cf.channel().closeFuture().sync();}finally{bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}publicstaticclassChildChannelHandlerextendsChannelInitializer{protectedvoidinitChannel(Channelchannel)throwsException{channel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());channel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());channel.pipeline().addLast(newServerHandler());}}publicstaticvoidmain(String[]args){try{newMsgServer().bind(8080);}catch(Exceptione){e.printStackTrace();}}}
packagecom.netty.serialize.handler;importcom.netty.serialize.message.Message;importcom.netty.serialize.message.MsgHeader;importio.netty.channel.ChannelHandlerAdapter;importio.netty.channel.ChannelHandlerContext;/***用于测试服务端实现的*Createdbysdcon2017/8/29.*/publicclassServerHandlerextendsChannelHandlerAdapter{@OverridepublicvoidchannelActive(ChannelHandlerContextctx)throwsException{super.channelActive(ctx);//System.out.println("active");}@OverridepublicvoidexceptionCaught(ChannelHandlerContextctx,Throwablecause)throwsException{cause.printStackTrace();//ctx.close();super.exceptionCaught(ctx,cause);}@OverridepublicvoidchannelRead(ChannelHandlerContextctx,Objectmsg)throwsException{MessagenewMsg=(Message)msg;//StringmsgStrClient=(String)msg;System.out.println("获取客户端里的内容:"+newMsg);Messagemessage=newMessage();StringmsgStr="客户端接受到通知";MsgHeaderheader=newMsgHeader();header.setStartTag(newByte("0"));header.setCmdCode("1234".getBytes());header.setLength(msgStr.length());header.setVersion("11".getBytes());message.setBody(msgStr);message.setHeader(header);ctx.writeAndFlush(message);}}


客户端的实现:

packagecom.netty.serialize.client;importcom.netty.serialize.coder.MsgDecoder;importcom.netty.serialize.coder.MsgEncoder;importcom.netty.serialize.handler.ClientHandler;importcom.netty.serialize.handler.ServerHandler;importcom.netty.serialize.marshalling.MarshallingCodeCFactory;importcom.netty.serialize.message.Message;importcom.netty.serialize.message.MsgHeader;importio.netty.bootstrap.Bootstrap;importio.netty.channel.*;importio.netty.channel.nio.NioEventLoopGroup;importio.netty.channel.socket.SocketChannel;importio.netty.channel.socket.nio.NioSocketChannel;/***Createdbysdcon2017/8/26.*/publicclassMsgClient{publicvoidconnect(Stringip,intport)throwsException{EventLoopGroupworkerGroup=newNioEventLoopGroup();//Messagemessage=newMessage();//StringmsgStr="我想发送一条消息";//MsgHeaderheader=newMsgHeader();//header.setStartTag(newByte("0"));//header.setCmdCode("1234".getBytes());//header.setLength(msgStr.length());//header.setVersion("11".getBytes());////message.setBody(msgStr);//message.setHeader(header);try{Bootstrapbs=newBootstrap();bs.group(workerGroup).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY,true)//.handler(newChildChannelHandler());ChannelFuturef=bs.connect(ip,port).sync();//写入消息//f.channel().writeAndFlush(message).sync();f.channel().closeFuture().sync();}finally{workerGroup.shutdownGracefully();}}publicstaticclassChildChannelHandlerextendsChannelInitializer{protectedvoidinitChannel(Channelchannel)throwsException{channel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());channel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());channel.pipeline().addLast(newClientHandler());}}publicstaticvoidmain(String[]args){try{newMsgClient().connect("127.0.0.1",8080);}catch(Exceptione){e.printStackTrace();}}}
packagecom.netty.serialize.handler;importcom.netty.serialize.message.Message;importcom.netty.serialize.message.MsgHeader;importio.netty.channel.*;importio.netty.util.ReferenceCountUtil;/***Createdbysdcon2017/8/29.*/publicclassClientHandlerextendsChannelHandlerAdapter{@OverridepublicvoidchannelActive(ChannelHandlerContextctx)throwsException{Messagemessage=newMessage();StringmsgStr="我想发送一条消息";MsgHeaderheader=newMsgHeader();header.setStartTag(newByte("0"));header.setCmdCode("1234".getBytes());header.setLength(msgStr.length());header.setVersion("11".getBytes());message.setBody(msgStr);message.setHeader(header);ctx.writeAndFlush(message).addListener(newChannelFutureListener(){publicvoidoperationComplete(ChannelFuturechannelFuture)throwsException{if(channelFuture.isSuccess()){//dosthSystem.out.println("成功发送到服务端消息");}else{//dosthSystem.out.println("失败服务端消息失败");}}});//ctx.writeAndFlush(message);}@OverridepublicvoidexceptionCaught(ChannelHandlerContextctx,Throwablecause)throwsException{//ctx.close();super.exceptionCaught(ctx,cause);}@OverridepublicvoidchannelRead(ChannelHandlerContextctx,Objectmsg)throwsException{try{MessagenewMsg=(Message)msg;System.out.println("收到服务端的内容"+newMsg);}finally{ReferenceCountUtil.release(msg);}}}


传输的POJO的类,是自定义的封装好的信息。

packagecom.netty.serialize.message;importjava.io.Serializable;/***Createdbysdcon2017/8/26.*/publicclassMessageimplementsSerializable{/****/privatestaticfinallongserialVersionUID=4923081103118853877L;privateMsgHeaderheader;privateObjectbody;//检验和//privatebytecrcCode;//publicbytegetCrcCode(){//returncrcCode;//}////publicvoidsetCrcCode(bytecrcCode){//this.crcCode=crcCode;//}publicMsgHeadergetHeader(){returnheader;}publicvoidsetHeader(MsgHeaderheader){this.header=header;}publicObjectgetBody(){returnbody;}publicvoidsetBody(Objectbody){this.body=body;}@OverridepublicStringtoString(){return"Message{"+"header="+header+",body="+body+//",crcCode="+crcCode+‘}‘;}}
packagecom.netty.serialize.message;importjava.io.Serializable;importjava.util.Arrays;/***Createdbysdcon2017/8/26.*/publicclassMsgHeaderimplementsSerializable{/****/privatestaticfinallongserialVersionUID=4923081103118853877L;//固定头privatebytestartTag;//命令码,4位privatebyte[]cmdCode;//版本2位privatebyte[]version;privateintlength;publicbyte[]getVersion(){returnversion;}publicvoidsetVersion(byte[]version){this.version=version;}publicbyte[]getCmdCode(){returncmdCode;}publicvoidsetCmdCode(byte[]cmdCode){this.cmdCode=cmdCode;}publicbytegetStartTag(){returnstartTag;}publicvoidsetStartTag(bytestartTag){this.startTag=startTag;}publicintgetLength(){returnlength;}publicvoidsetLength(intlength){this.length=length;}@OverridepublicStringtoString(){return"MsgHeader{"+"startTag="+startTag+",cmdCode="+Arrays.toString(cmdCode)+",version="+Arrays.toString(version)+",length="+length+‘}‘;}}


到此就完事了。下面是我使用的netty和marshalling的版本,换成其他版本我不清楚会不会有什么错误。

<!-- JUnit, test scope only -->
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.11</version>
    <scope>test</scope>
</dependency>

<!-- Netty -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>5.0.0.Alpha2</version>
</dependency>

<!-- JBoss Marshalling (serial implementation) -->
<dependency>
    <groupId>org.jboss.marshalling</groupId>
    <artifactId>jboss-marshalling-serial</artifactId>
    <version>2.0.0.Beta2</version>
</dependency>



Netty5 序列化方式(Jboss Marshalling)

原文地址:http://shangdc.blog.51cto.com/10093778/1962121

知识推荐

我的编程学习网——分享web前端后端开发技术知识。 垃圾信息处理邮箱 tousu563@163.com 网站地图
icp备案号 闽ICP备2023006418号-8 不良信息举报平台 互联网安全管理备案 Copyright 2023 www.wodecom.cn All Rights Reserved