分享web开发知识

注册/登录|最近发布|今日推荐

主页 IT知识网页技术软件开发前端开发代码编程运营维护技术分享教程案例
当前位置:首页 > 网页技术

开发基于protostuff编解码技术的Netty程序:传输pojo对象

发布时间:2023-09-06 01:42责任编辑:白小东关键词:暂无标签
[toc]


开发基于protostuff编解码技术的Netty程序:传输pojo对象

前言

这次开发的Netty程序主要是在网络中传输Java对象,传输的对象不仅限于字符串,也可以是自定义的其它类型对象。

前面使用protostuff都是比较单纯地使用,进行简单的一些测试,下面要完成的这个例子功能虽然不复杂,但相对使用起来会比较综合一些。通过序列化工具类的开发、编解码器的开发,然后将其应用到我们的Netty程序当中。

开发这个Netty程序来传输pojo对象是为了后面进行远程过程调用框架的开发做一定的准备,因为在远程调用时,返回的结果就是一个对象,返回的对象类型取决于调用的方法,所以通过Netty程序来传输pojo对象只是开发自定义RPC框架中的一小部分,但却也是十分重要的一部分。

另外需要注意的是,Netty框架本身的使用很重要,而怎么去开发序列化工具类(即如何通过protostuff来开发具有能用性的序列化工具)、如何基于序列化工具类开发Netty的编码器与解码器、如何在Netty中使用自定义开发的编码器与解码器等这些知识都是十重要,而且也是一定要掌握的。

代码中都已经写好了注释,程序是可以直接跑起来的,依赖的相关包,因为使用的是maven工程,所以在后面也会给出pom.xml文件的内容。

protostuff序列化工具类开发

工具类的开发过程可以参考前面的文章基于protostuff的序列化工具类开发,下面直接给出具有缓存功能的序列化工具类的代码:

SerializationUtil2.java

package cn.xpleaf.protostuff.netty.utils;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;import com.dyuproject.protostuff.LinkedBuffer;import com.dyuproject.protostuff.ProtostuffIOUtil;import com.dyuproject.protostuff.runtime.RuntimeSchema;/** * 具备缓存功能的序列化工具类,基于Protostuff实现(其基于Google Protobuf实现) * ?* @author yeyonghao * */public class SerializationUtil2 { ???// 缓存schema对象的map ???private static Map<Class<?>, RuntimeSchema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, RuntimeSchema<?>>(); ???/** ????* 根据获取相应类型的schema方法 ????* ?????* @param clazz ????* @return ????*/ ???@SuppressWarnings({ "unchecked", "unused" }) ???private <T> RuntimeSchema<T> getSchema(Class<T> clazz) { ???????// 先尝试从缓存schema map中获取相应类型的schema ???????RuntimeSchema<T> schema = (RuntimeSchema<T>) cachedSchema.get(clazz); ???????// 如果没有获取到对应的schema,则创建一个该类型的schema ???????// 同时将其添加到schema map中 ???????if (schema == null) { ???????????schema = RuntimeSchema.createFrom(clazz); ???????????if (schema != null) { ???????????????cachedSchema.put(clazz, schema); ???????????} ???????} ???????// 返回schema对象 ???????return schema; ???} ???/** ????* 序列化方法,将对象序列化为字节数组(对象 ---> 字节数组) ????* ?????* @param obj ????* @return ????*/ ???@SuppressWarnings("unchecked") ???public static <T> byte[] serialize(T obj) { ???????// 获取泛型对象的类型 ???????Class<T> clazz = (Class<T>) obj.getClass(); ???????// 创建泛型对象的schema对象 ???????RuntimeSchema<T> schema = RuntimeSchema.createFrom(clazz); ???????// 创建LinkedBuffer对象 ???????LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE); ???????// 序列化 ???????byte[] array = ProtostuffIOUtil.toByteArray(obj, schema, buffer); ???????// 返回序列化对象 ???????return array; ???} ???/** ????* 反序列化方法,将字节数组反序列化为对象(字节数组 ---> 对象) ????* ?????* @param data ????* @param clazz ????* @return ????*/ ???public static <T> T deserialize(byte[] data, Class<T> clazz) { ???????// 创建泛型对象的schema对象 ???????RuntimeSchema<T> schema = RuntimeSchema.createFrom(clazz); ???????// 根据schema实例化对象 ???????T message = schema.newMessage(); ???????// 将字节数组中的数据反序列化到message对象 ???????ProtostuffIOUtil.mergeFrom(data, message, schema); ???????// 返回反序列化对象 ???????return message; ???}}

编码器与解码器开发

EchoEncoder.java

package cn.xpleaf.protostuff.netty.utils;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.MessageToByteEncoder;/** * PojoEncoder继承自Netty中的MessageToByteEncoder类, * 并重写抽象方法encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) * 它负责将Object类型的POJO对象编码为byte数组,然后写入到ByteBuf中 * ?* @author yeyonghao * */public class EchoEncoder extends MessageToByteEncoder<Object> { ???@Override ???protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { ???????// 直接生成序列化对象 ???????// 需要注意的是,使用protostuff序列化时,不需要知道pojo对象的具体类型也可以进行序列化时 ???????// 在反序列化时,只要提供序列化后的字节数组和原来pojo对象的类型即可完成反序列化 ???????byte[] array = SerializationUtil2.serialize(msg); ???????out.writeBytes(array); ???}}

EchoDecoder.java

package cn.xpleaf.protostuff.netty.utils;import java.util.List;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.MessageToMessageDecoder;/** * PojoDecoder继承自Netty中的MessageToMessageDecoder类, * 并重写抽象方法decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) * 首先从数据报msg(数据类型取决于继承MessageToMessageDecoder时填写的泛型类型)中获取需要解码的byte数组 * 然后调用使用序列化工具类将其反序列化(解码)为Object对象 将解码后的对象加入到解码列表out中,这样就完成了解码操作 * ?* @author yeyonghao * */public class EchoDecoder extends MessageToMessageDecoder<ByteBuf> { ???// 需要反序列对象所属的类型 ???private Class<?> genericClass; ???// 构造方法,传入需要反序列化对象的类型 ???public EchoDecoder(Class<?> genericClass) { ???????this.genericClass = genericClass; ???} ???@Override ???protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception { ???????// ByteBuf的长度 ???????int length = msg.readableBytes(); ???????// 构建length长度的字节数组 ???????byte[] array = new byte[length]; ???????// 将ByteBuf数据复制到字节数组中 ???????msg.readBytes(array); ???????// 反序列化对象 ???????Object obj = SerializationUtil2.deserialize(array, this.genericClass); ???????// 添加到反序列化对象结果列表 ???????out.add(obj); ???}}

Netty服务端程序开发

EchoServer.java

package cn.xpleaf.protostuff.netty.echoservice;import cn.xpleaf.protostuff.netty.pojo.EchoRequest;import cn.xpleaf.protostuff.netty.utils.EchoDecoder;import cn.xpleaf.protostuff.netty.utils.EchoEncoder;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;public class EchoServer { ???public void bind(int port) throws Exception { ???????// 配置服务端NIO线程组 ???????EventLoopGroup bossGroup = new NioEventLoopGroup(); ???????EventLoopGroup workerGroup = new NioEventLoopGroup(); ???????try { ???????????ServerBootstrap b = new ServerBootstrap(); ???????????b.group(bossGroup, workerGroup) ???????????????.channel(NioServerSocketChannel.class) ???????????????.option(ChannelOption.SO_BACKLOG, 1024) ???????????????.childHandler(new ChannelInitializer<SocketChannel>() { ???????????????????@Override ???????????????????protected void initChannel(SocketChannel ch) throws Exception { ???????????????????????// 添加编码器 ???????????????????????ch.pipeline().addLast(new EchoDecoder(EchoRequest.class)); ???????????????????????// 添加解码器 ???????????????????????ch.pipeline().addLast(new EchoEncoder()); ???????????????????????// 添加业务处理handler ???????????????????????ch.pipeline().addLast(new EchoServerHandler()); ???????????????????} ???????????????}); ???????????// 绑定端口,同步等待成功,该方法是同步阻塞的,绑定成功后返回一个ChannelFuture ???????????ChannelFuture f = b.bind(port).sync(); ???????????// 等待服务端监听端口关闭,阻塞,等待服务端链路关闭之后main函数才退出 ???????????f.channel().closeFuture().sync(); ???????} finally { ???????????// 优雅退出,释放线程池资源 ???????????bossGroup.shutdownGracefully(); ???????????workerGroup.shutdownGracefully(); ???????} ???} ???public static void main(String[] args) throws Exception { ???????int port = 8080; ???????if(args != null && args.length > 0) { ???????????try { ???????????????port = Integer.valueOf(port); ???????????} catch (NumberFormatException e) { ???????????????// TODO: handle exception ???????????} ???????} ???????new EchoServer().bind(port); ???}}

EchoServerHandler.java

package cn.xpleaf.protostuff.netty.echoservice;import java.util.UUID;import cn.xpleaf.protostuff.netty.pojo.EchoRequest;import cn.xpleaf.protostuff.netty.pojo.EchoResponse;import cn.xpleaf.protostuff.netty.pojo.User;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;public class EchoServerHandler extends ChannelInboundHandlerAdapter { ???@Override ???public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ???????// 接收到的对象的类型为EchoRequest ???????EchoRequest req = (EchoRequest) msg; ???????System.out.println(req.getRequestId() + " : " + req.getRequestObj()); ???????// 创建需要传输的user对象 ???????User user = new User(); ???????user.setName("server"); ???????user.setAge(10); ???????// 创建传输的user对象载体EchoRequest对象 ???????EchoResponse resp = new EchoResponse(); ???????// 设置responseId ???????resp.setResponseId(UUID.randomUUID().toString()); ???????// 设置需要传输的对象 ???????resp.setResponseObj(user); ???????// 设置需要传输的对象的类型 ???????resp.setResponseObjClass(resp.getResponseObj().getClass()); ???????// 调用writeAndFlush将数据发送到socketChannel ???????ctx.writeAndFlush(resp); ???} ???@Override ???public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ???????ctx.flush(); ???} ???@Override ???public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { ???????ctx.close(); ???}}

Netty客户端程序开发

EchoClient.java

package cn.xpleaf.protostuff.netty.echoservice;import cn.xpleaf.protostuff.netty.pojo.EchoResponse;import cn.xpleaf.protostuff.netty.utils.EchoDecoder;import cn.xpleaf.protostuff.netty.utils.EchoEncoder;import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;public class EchoClient { ???public void connect(int port, String host) throws Exception { ???????// 配置客户端NIO线程组 ???????EventLoopGroup group = new NioEventLoopGroup(); ???????try { ???????????Bootstrap b = new Bootstrap(); ???????????b.group(group).channel(NioSocketChannel.class) ???????????????.option(ChannelOption.TCP_NODELAY, true) ???????????????// 设置TCP连接超时时间 ???????????????.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000) ???????????????.handler(new ChannelInitializer<SocketChannel>() { ???????????????????@Override ???????????????????protected void initChannel(SocketChannel ch) throws Exception { ???????????????????????// 添加解码器 ???????????????????????ch.pipeline().addLast(new EchoDecoder(EchoResponse.class)); ???????????????????????// 添加编码器 ???????????????????????ch.pipeline().addLast(new EchoEncoder()); ???????????????????????// 添加业务处理handler ???????????????????????ch.pipeline().addLast(new EchoClientHandler()); ???????????????????} ???????????????}); ???????????// 发起异步连接操作(注意服务端是bind,客户端则需要connect) ???????????ChannelFuture f = b.connect(host, port).sync(); ???????????// 等待客户端链路关闭 ???????????f.channel().closeFuture().sync(); ???????} finally { ???????????// 优雅退出,释放NIO线程组 ???????????group.shutdownGracefully(); ???????} ???} ???public static void main(String[] args) throws Exception { ???????int port = 8080; ???????if(args != null && args.length > 0) { ???????????try { ???????????????port = Integer.valueOf(port); ???????????} catch (NumberFormatException e) { ???????????????// 采用默认值 ???????????} ???????} ???????new EchoClient().connect(port, "localhost"); ???}}

EchoClientHandler.java

package cn.xpleaf.protostuff.netty.echoservice;import java.util.UUID;import cn.xpleaf.protostuff.netty.pojo.EchoRequest;import cn.xpleaf.protostuff.netty.pojo.EchoResponse;import cn.xpleaf.protostuff.netty.pojo.User;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;public class EchoClientHandler extends ChannelInboundHandlerAdapter { ???@Override ???public void channelActive(ChannelHandlerContext ctx) { ???????// 创建需要传输的user对象 ???????User user = new User(); ???????user.setName("client"); ???????user.setAge(10); ???????// 创建传输的user对象载体EchoRequest对象 ???????EchoRequest req = new EchoRequest(); ???????// 设置requestId ???????req.setRequestId(UUID.randomUUID().toString()); ???????// 设置需要传输的对象 ???????req.setRequestObj(user); ???????// 设置需要传输的对象的类型 ???????req.setRequestObjClass(req.getRequestObj().getClass()); ???????// 调用writeAndFlush将数据发送到socketChannel ???????ctx.writeAndFlush(req); ???} ???@Override ???public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ???????// 接收到的对象的类型为EchoResponse ???????EchoResponse resp = (EchoResponse) msg; ???????System.out.println(resp.getResponseId() + " : " + resp.getResponseObj()); ???} ???@Override ???public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ???????ctx.flush(); ???} ???@Override ???public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ???????ctx.close(); ???}}

POJO

EchoRequest.java

package cn.xpleaf.protostuff.netty.pojo;/** * EchoRequest是client向server端发送数据的传输载体,将需要进行传输的pojo对象统一封装到EchoRequest对象中, * 这样会为编解码工作带来很大的方便性和统一性,同时也可以携带其它信息, 对于后面对程序进行扩展会有非常大的帮助 * ?* @author yeyonghao * */public class EchoRequest { ???private String requestId; ???private Object requestObj; ???private Class<?> requestObjClass; ???public String getRequestId() { ???????return requestId; ???} ???public void setRequestId(String requestId) { ???????this.requestId = requestId; ???} ???public Object getRequestObj() { ???????return requestObj; ???} ???public void setRequestObj(Object requestObj) { ???????this.requestObj = requestObj; ???} ???public Class<?> getRequestObjClass() { ???????return requestObjClass; ???} ???public void setRequestObjClass(Class<?> requestObjClass) { ???????this.requestObjClass = requestObjClass; ???}}

EchoResponse.java

package cn.xpleaf.protostuff.netty.pojo;/** * EchoResponse是server向client端发送数据的传输载体,将需要进行传输的pojo对象统一封装到EchoResponse对象中, * 这样会为编解码工作带来很大的方便性和统一性,同时也可以携带其它信息, 对于后面对程序进行扩展会有非常大的帮助 * ?* @author yeyonghao * */public class EchoResponse { ???private String responseId; ???private Object responseObj; ???private Class<?> responseObjClass; ???public String getResponseId() { ???????return responseId; ???} ???public void setResponseId(String responseId) { ???????this.responseId = responseId; ???} ???public Object getResponseObj() { ???????return responseObj; ???} ???public void setResponseObj(Object responseObj) { ???????this.responseObj = responseObj; ???} ???public Class<?> getResponseObjClass() { ???????return responseObjClass; ???} ???public void setResponseObjClass(Class<?> responseObjClass) { ???????this.responseObjClass = responseObjClass; ???}}

User.java

package cn.xpleaf.protostuff.netty.pojo;public class User { ???private String name; ???private int age; ???public User() { ???} ???public User(String name, int age) { ???????this.name = name; ???????this.age = age; ???} ???public String getName() { ???????return name; ???} ???public void setName(String name) { ???????this.name = name; ???} ???public int getAge() { ???????return age; ???} ???public void setAge(int age) { ???????this.age = age; ???} ???@Override ???public String toString() { ???????return "User [name=" + name + ", age=" + age + "]"; ???}}

测试

分别执行EchoServer.javaEchoClient.java,服务端和客户端输出如下:

服务端:

4b76d70d-7a31-4738-8daa-ca4f40483e7e : User [name=client, age=10]

客户端:

e40b6e34-33a3-485e-bb8f-7157ee324e97 : User [name=server, age=10]

附录:pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" ???xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> ???<modelVersion>4.0.0</modelVersion> ???<groupId>cn.xpleaf</groupId> ???<artifactId>Chapter08</artifactId> ???<version>0.0.1-SNAPSHOT</version> ???<dependencies> ???????<!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java --> ???????<dependency> ???????????<groupId>com.google.protobuf</groupId> ???????????<artifactId>protobuf-java</artifactId> ???????????<version>3.5.1</version> ???????</dependency> ???????<!-- https://mvnrepository.com/artifact/io.netty/netty-all --> ???????<dependency> ???????????<groupId>io.netty</groupId> ???????????<artifactId>netty-all</artifactId> ???????????<version>4.1.21.Final</version> ???????</dependency> ???????<!-- https://mvnrepository.com/artifact/com.dyuproject.protostuff/protostuff-core --> ???????<dependency> ???????????<groupId>com.dyuproject.protostuff</groupId> ???????????<artifactId>protostuff-core</artifactId> ???????????<version>1.1.3</version> ???????</dependency> ???????<!-- https://mvnrepository.com/artifact/com.dyuproject.protostuff/protostuff-runtime --> ???????<dependency> ???????????<groupId>com.dyuproject.protostuff</groupId> ???????????<artifactId>protostuff-runtime</artifactId> ???????????<version>1.1.3</version> ???????</dependency> ???</dependencies> ???<build> ???????<plugins> ???????????<!-- java编译插件 --> ???????????<plugin> ???????????????<groupId>org.apache.maven.plugins</groupId> ???????????????<artifactId>maven-compiler-plugin</artifactId> ???????????????<version>3.2</version> ???????????????<configuration> ???????????????????<source>1.8</source> ???????????????????<target>1.8</target> ???????????????????<encoding>UTF-8</encoding> ???????????????</configuration> ???????????</plugin> ???????</plugins> ???</build></project>

存在的问题及解决方案

上面其实传输的对象是EchoRequestEchoResponse,虽然其中封装了返回的对象,但由于其定义的类型为Object类型,在上面的例子当中,其实际上是一个User对象,在这个简单的例子当中是可以通过类型转换来进行向下转型的,但实际使用时,封装的Object对象不一定是User对象,但又需要做向下转型,该如何解决这个问题呢?通过动态代理技术可以解决这个问题,后面会进一步改进这个例子使其更具有通用性。

开发基于protostuff编解码技术的Netty程序:传输pojo对象

原文地址:http://blog.51cto.com/xpleaf/2071808

知识推荐

我的编程学习网——分享web前端后端开发技术知识。 垃圾信息处理邮箱 tousu563@163.com 网站地图
icp备案号 闽ICP备2023006418号-8 不良信息举报平台 互联网安全管理备案 Copyright 2023 www.wodecom.cn All Rights Reserved