Netty: Transferring Objects with JDK Serialization Encoding and Decoding

Published: 2023-09-06 01:26 | Editor: Mr. Gu | Keywords: none

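JDK serialization needs no extra libraries: a class only has to implement Serializable. However, the serialized byte stream can only be deserialized by Java, so it is not cross-language. The stream is also relatively large and serialization is not very efficient, so JDK serialization is rarely used in RPC. This article is for learning purposes only.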
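To make the points above concrete, here is a minimal sketch using only the JDK. The class names JdkSerializationDemo and Point are hypothetical and do not appear in the original article. It round-trips a small object and prints the serialized size, which is much larger than the 8 bytes of int data because the stream also carries class metadata.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical demo class for illustration; not part of the original article.
final class JdkSerializationDemo {

    // Implementing Serializable is the only requirement for JDK serialization.
    static final class Point implements Serializable {
        private static final long serialVersionUID = 1L;
        final int x;
        final int y;

        Point(int x, int y) {
            this.x = x;
            this.y = y;
        }
    }

    // Serializes an object into a byte array with ObjectOutputStream.
    static byte[] serialize(Object obj) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    // Restores an object from a byte array with ObjectInputStream.
    static Object deserialize(byte[] bytes) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] bytes = serialize(new Point(3, 4));
        Point copy = (Point) deserialize(bytes);
        System.out.println("restored: (" + copy.x + ", " + copy.y + ")");
        // Two ints hold 8 bytes of data; the rest of the stream is header and class metadata.
        System.out.println("serialized size: " + bytes.length + " bytes");
    }
}
```

The exact size depends on the class and field names, so the sketch prints it rather than assuming a value.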

Codecs:

    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToByteEncoder;
    import io.netty.handler.codec.MessageToMessageDecoder;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.util.List;

    // Decodes one complete frame (a ByteBuf) into a Java object.
    public class JdkDecoder extends MessageToMessageDecoder<ByteBuf> {
        @Override
        protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
            // Copy the readable bytes of the frame into a byte array
            final int length = byteBuf.readableBytes();
            final byte[] b = new byte[length];
            byteBuf.getBytes(byteBuf.readerIndex(), b, 0, length);
            // try-with-resources closes the stream even if readObject() throws
            try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(b))) {
                list.add(ois.readObject());
            }
        }
    }

    // Encodes a Java object into the outbound ByteBuf.
    public class JdkEncoder extends MessageToByteEncoder<Object> {
        @Override
        protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(o);
                oos.flush();
            }
            byteBuf.writeBytes(bos.toByteArray());
        }
    }

---

Transfer object:

    import java.io.Serializable;
    import java.util.Date;
    import java.util.List;

    public class Person implements Serializable {
        private int age;
        private String name;
        private boolean man;
        private List<String> list;
        private Date birth;
        private Person son;

        public int getAge() {
            return age;
        }

        public void setAge(int age) {
            this.age = age;
        }

        // ... (remaining getters and setters are elided in the original)

        @Override
        public String toString() {
            return "Person{" +
                    "age=" + age +
                    ", name='" + name + '\'' +
                    ", man=" + man +
                    ", list=" + list +
                    ", birth=" + birth +
                    ", son=" + son +
                    '}';
        }
    }

---

Server:

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
    import io.netty.handler.codec.LengthFieldPrepender;

    public class EchoServer {

        public static void main(String[] args) {
            new EchoServer().bind(8080);
        }

        public void bind(int port) {
            // Configure the server's thread groups: one accepts client connections,
            // the other handles SocketChannel network reads and writes
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                // ServerBootstrap is a helper class for starting the NIO server
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 100)
                        //.handler(new LoggingHandler(LogLevel.INFO))
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                // Split the inbound stream into frames using the 2-byte length field, then strip that field
                                ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
                                //ch.pipeline().addLast(new MsgpackDecoder());
                                ch.pipeline().addLast(new JdkDecoder());
                                // Prepend 2 bytes holding the message length to each outbound message
                                ch.pipeline().addLast(new LengthFieldPrepender(2));
                                //ch.pipeline().addLast(new MsgpackEncoder());
                                ch.pipeline().addLast(new JdkEncoder());
                                ch.pipeline().addLast(new EchoServerHandler());
                            }
                        });
                // Bind the port; sync() blocks until the bind succeeds.
                // ChannelFuture is used for notification callbacks of asynchronous operations
                ChannelFuture future = bootstrap.bind(port).sync();
                System.out.println("server started");
                // Wait until the server's listening port is closed
                future.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                System.out.println("server shutting down");
                // Release thread resources
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }

    public class EchoServerHandler extends ChannelInboundHandlerAdapter {

        int count = 0;

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            if (msg instanceof Person) {
                Person p = (Person) msg;
                System.out.println(p.toString());
            } else {
                System.out.println("The server received(" + count++ + "): " + msg);
            }
            ctx.writeAndFlush(msg); // send asynchronously
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
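The pipeline relies on a 2-byte length prefix to separate messages on the TCP stream. The following plain-Java sketch shows the wire format that LengthFieldPrepender(2) and LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2) agree on. It deliberately does not use Netty, and the class and method names (LengthPrefixFramingDemo, frame, unframe) are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java illustration of the framing; it does not use Netty.
final class LengthPrefixFramingDemo {

    // Same layout as LengthFieldPrepender(2): a 2-byte big-endian length, then the payload.
    static byte[] frame(byte[] payload) {
        if (payload.length > 65535) {
            throw new IllegalArgumentException("payload does not fit a 2-byte length field");
        }
        return ByteBuffer.allocate(2 + payload.length)
                .putShort((short) payload.length)
                .put(payload)
                .array();
    }

    // Same idea as LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2): read the length
    // at offset 0, strip the 2 length bytes, and emit only the payload.
    // An incomplete trailing frame is left unread in the buffer.
    static List<byte[]> unframe(ByteBuffer in) {
        List<byte[]> frames = new ArrayList<>();
        while (in.remaining() >= 2) {
            in.mark();
            int length = in.getShort() & 0xFFFF; // treat the length as unsigned
            if (in.remaining() < length) {
                in.reset(); // not enough bytes yet; rewind to the start of this frame
                break;
            }
            byte[] payload = new byte[length];
            in.get(payload);
            frames.add(payload);
        }
        return frames;
    }

    public static void main(String[] args) {
        byte[] a = frame("hello".getBytes(StandardCharsets.UTF_8));
        byte[] b = frame("netty".getBytes(StandardCharsets.UTF_8));
        // Simulate TCP delivering both frames in a single read
        ByteBuffer stream = ByteBuffer.allocate(a.length + b.length).put(a).put(b);
        stream.flip();
        for (byte[] payload : unframe(stream)) {
            System.out.println(new String(payload, StandardCharsets.UTF_8));
        }
    }
}
```

Because every message carries its own length, the receiver can split the stream correctly even when several messages arrive in one read or one message arrives in pieces, which is why the frame decoder sits in front of JdkDecoder.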

---

Client:

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
    import io.netty.handler.codec.LengthFieldPrepender;

    import java.util.ArrayList;
    import java.util.Date;
    import java.util.List;

    public class EchoClient {

        public static void main(String[] args) {
            new EchoClient().connect("127.0.0.1", 8080);
        }

        public void connect(String host, int port) {
            // Configure the client's NIO thread group
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(group).channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
                                //ch.pipeline().addLast(new MsgpackDecoder());
                                ch.pipeline().addLast(new JdkDecoder());
                                ch.pipeline().addLast(new LengthFieldPrepender(2));
                                //ch.pipeline().addLast(new MsgpackEncoder());
                                ch.pipeline().addLast(new JdkEncoder());
                                ch.pipeline().addLast(new EchoClientHandler());
                            }
                        });
                // Start the asynchronous connect and wait synchronously until it succeeds
                ChannelFuture future = bootstrap.connect(host, port).sync();
                System.out.println("client started");
                // Wait until the client connection is closed
                future.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                System.out.println("client shutting down");
                // Release the NIO thread group
                group.shutdownGracefully();
            }
        }
    }

    public class EchoClientHandler extends ChannelInboundHandlerAdapter {

        private int count = 0;

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            List<String> l = new ArrayList<>();
            l.add("abc");
            l.add("123");
            Person p = new Person();
            p.setName("luangeng");
            p.setMan(true);
            p.setBirth(new Date());
            p.setList(l);
            for (int i = 0; i < 10; i++) {
                p.setAge(i);
                ctx.write(p);
            }
            ctx.flush();
        }

        // Called after the server returns a response
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            if (msg instanceof Person) {
                Person p = (Person) msg;
                System.out.println(p.toString());
            } else {
                System.out.println(count++ + " client get: " + msg);
            }
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }

---

Output:

client started
Person{age=0, name='luangeng', man=true, list=[abc, 123], birth=Wed Nov 22 21:18:48 CST 2017, son=null}
Person{age=1, name='luangeng', man=true, list=[abc, 123], birth=Wed Nov 22 21:18:48 CST 2017, son=null}
Person{age=2, name='luangeng', man=true, list=[abc, 123], birth=Wed Nov 22 21:18:48 CST 2017, son=null}
Person{age=3, name='luangeng', man=true, list=[abc, 123], birth=Wed Nov 22 21:18:48 CST 2017, son=null}
Person{age=4, name='luangeng', man=true, list=[abc, 123], birth=Wed Nov 22 21:18:48 CST 2017, son=null}
Person{age=5, name='luangeng', man=true, list=[abc, 123], birth=Wed Nov 22 21:18:48 CST 2017, son=null}
Person{age=6, name='luangeng', man=true, list=[abc, 123], birth=Wed Nov 22 21:18:48 CST 2017, son=null}
Person{age=7, name='luangeng', man=true, list=[abc, 123], birth=Wed Nov 22 21:18:48 CST 2017, son=null}
Person{age=8, name='luangeng', man=true, list=[abc, 123], birth=Wed Nov 22 21:18:48 CST 2017, son=null}
Person{age=9, name='luangeng', man=true, list=[abc, 123], birth=Wed Nov 22 21:18:48 CST 2017, son=null}

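MessagePack:

MessagePack is a binary serialization format similar to JSON, but faster and smaller. It is cross-language and is used to exchange data between multiple languages. A codec implemented with MessagePack looks like this:

    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToByteEncoder;
    import io.netty.handler.codec.MessageToMessageDecoder;
    import org.msgpack.MessagePack;

    import java.util.List;

    // Encodes an object into MessagePack bytes.
    public class MsgpackEncoder extends MessageToByteEncoder<Object> {
        @Override
        protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
            MessagePack mp = new MessagePack();
            byte[] raw = mp.write(o);
            byteBuf.writeBytes(raw);
        }
    }

    // Decodes one complete frame of MessagePack bytes.
    public class MsgpackDecoder extends MessageToMessageDecoder<ByteBuf> {
        @Override
        protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
            // Copy the readable bytes of the frame into a byte array
            final int length = byteBuf.readableBytes();
            final byte[] b = new byte[length];
            byteBuf.getBytes(byteBuf.readerIndex(), b, 0, length);
            MessagePack mp = new MessagePack();
            list.add(mp.read(b));
        }
    }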

---

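With this codec, neither the server nor the client can cast the received object to a Person. The decoder calls mp.read(b) without a target class, so what it adds to the list is a generic MessagePack value rather than a Person, and the instanceof Person check in the handlers fails.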
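As a rough illustration of the "smaller than JSON" point about MessagePack, the sketch below hand-encodes a one-entry map using three format families from the MessagePack specification (fixmap, fixstr, positive fixint) and compares the size with the equivalent JSON text. The class MsgpackSizeDemo and its method are hypothetical and written only for this comparison; real code should use a MessagePack library as the codec above does.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical hand-rolled encoder for illustration only; not a MessagePack library API.
final class MsgpackSizeDemo {

    // Encodes {key: value} as fixmap + fixstr + positive fixint.
    // Only valid for keys of at most 31 UTF-8 bytes and values in 0..127.
    static byte[] encodeSmallMap(String key, int value) {
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        if (k.length > 31 || value < 0 || value > 127) {
            throw new IllegalArgumentException("outside the range this sketch supports");
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(0x80 | 1);        // fixmap header: map with 1 entry
        out.write(0xa0 | k.length); // fixstr header: string of k.length bytes
        out.write(k, 0, k.length);  // key bytes
        out.write(value);           // positive fixint: the value itself
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] packed = encodeSmallMap("age", 9);
        byte[] json = "{\"age\":9}".getBytes(StandardCharsets.UTF_8);
        // prints "MessagePack: 6 bytes, JSON: 9 bytes"
        System.out.println("MessagePack: " + packed.length + " bytes, JSON: " + json.length + " bytes");
    }
}
```

The saving comes from replacing punctuation and quotes with single type-and-length bytes; how much is saved on a real Person object depends on its field values.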


Original article: http://www.cnblogs.com/luangeng/p/7881428.html
