分享web开发知识

注册/登录|最近发布|今日推荐

主页 IT知识网页技术软件开发前端开发代码编程运营维护技术分享教程案例
当前位置:首页 > 网页技术

基于Netty的时间服务器程序代码

发布时间:2023-09-06 01:42责任编辑:彭小芳关键词:暂无标签
[toc]


基于Netty的时间服务器程序代码

程序代码来自于《Netty权威指南》第三章,不过我都加了注释,所以看起来会非常好理解。需要注意的是,《Netty权威指南》中TimeServerHandler类继承的是ChannelHandlerAdapter,因为其使用的是Netty 5.x的版本,该版本现在已经被官方废弃,而我使用的是Netty 4.x的,所以如果继承ChannelHandlerAdapter,启动程序时不会报错,但是无法正常工作,即handler不生效这点尤其需要注意。

服务端代码

TimeServer.java:

package cn.xpleaf.netty;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;public class TimeServer { ???/** ????* 绑定端口号,启动Netty服务端 ????* @param port ????* @throws Exception ????*/ ???public void bind(int port) throws Exception { ???????// NioEventLoopGroup是个线程组,它包含了一组NIO线程,专门用于网络事件的处理 ???????// 实际上它们就是Reactor线程组,关于Reactor,这是一种设计模型, ???????// Reactor模型就是将消息放到了一个队列中,通过异步线程池对其进行消费 ???????// 在Netty中就是,NioEventLoopGroup就是Reactor,ChannelHandler就是Reactor模型中的handler ???????// 可以参考文章:http://www.ivaneye.com/2016/07/23/iomodel.html ???????// 这里创建了两个线程组,一个用于服务端接受客户端的连接,一个用于进行SocketChannel的网络读写 ???????EventLoopGroup bossGroup = new NioEventLoopGroup(); ???????EventLoopGroup workerGroup = new NioEventLoopGroup(); ???????try { ???????????// 创建NIO服务端的辅助启动类,目的是降低服务端的开发复杂度 ???????????ServerBootstrap b = new ServerBootstrap(); ???????????// 将两个NIO线程组作为参数传递到ServerBootstrap中 ???????????b.group(bossGroup, workerGroup) ???????????????// NioServerSocketChannel对应JDK NIO类库中的ServerSocketChannel ???????????????.channel(NioServerSocketChannel.class) ???????????????// 设置NioServerSocketChannel的TCP参数 ???????????????.option(ChannelOption.SO_BACKLOG, 1024) ???????????????// 绑定I/O事件的处理类ChildChannelHandler,它的作用类似于Reactor模式中的Handler类 ???????????????// 主要用于处理网络I/O事件,例如记录日志、对消息进行编解码等 ???????????????.childHandler(new ChildChannelHandler()); ???????????// 绑定端口,同步等待成功,该方法是同步阻塞的,绑定成功后返回一个ChannelFuture ???????????ChannelFuture f = b.bind(port).sync(); ???????????// 等待服务端监听端口关闭,阻塞,等待服务端链路关闭之后main函数才退出 ???????????f.channel().closeFuture().sync(); ???????} finally { ???????????// 优雅退出,释放线程池资源 ???????????bossGroup.shutdownGracefully(); ???????????workerGroup.shutdownGracefully(); ???????} ???} ???/** ????* 实现initChannel方法,其作用是当创建NioServerSocketChannel成功之后, ????* 在进行初始化是,将它的ChannelHandler设置到ChannelPipeline中,用于处理网络I/O事件 ????* @author yeyonghao ????* ????*/ ???private class ChildChannelHandler extends ChannelInitializer<SocketChannel> { ???????@Override ???????protected void initChannel(SocketChannel ch) throws Exception { ???????????ch.pipeline().addLast(new TimeServerHandler()); ???????} ???} ???public static void main(String[] args) throws Exception { ???????int port = 8080; ???????if(args != null && args.length > 0) { ???????????try { ???????????????port = Integer.valueOf(port); ???????????} catch (NumberFormatException e) { ???????????????// TODO: handle exception ???????????} ???????} ???????new TimeServer().bind(port); ???}}

TimeServerHandler.java:

package cn.xpleaf.netty;import java.sql.Date;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerAdapter;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;/** * 这里需要注意,《Netty权威指南》中TimeServerHandler类继承的是ChannelHandlerAdapter, * 因为其使用的是Netty 5.x的版本,该版本现在已经被官方废弃,而我使用的是Netty 4.x的, * 所以如果继承ChannelHandlerAdapter,启动程序时不会报错,但是无法正常工作,即handler不生效 * 这点尤其需要注意。 * @author yeyonghao * */public class TimeServerHandler extends ChannelInboundHandlerAdapter { ???/** ????* 对网络事件进行写操作 ????*/ ???@Override ???public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ???????// Netty中的ByteBuf类似于nio类库中的ByteBuffer,不过功能更强大 ???????ByteBuf buf = (ByteBuf) msg; ???????// 创建与buf一样大小的字节数组 ???????byte[] req = new byte[buf.readableBytes()]; ???????// 将数据从buf中复制到req中 ???????buf.readBytes(req); ???????String body = new String(req, "utf-8"); ???????System.out.println("The time server receive order : " + body); ???????String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? ????????????????new Date(System.currentTimeMillis()).toString() : "BAD ORDER"; ???????// 创建ByteBuf对象 ???????ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); ???????// 通过ChannelHandlerContext的write方法异步发送给客户端 ???????ctx.write(resp); ???} ???/** ????* 写操作完成之后进行的操作 ????*/ ???@Override ???public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ???????// ctx.write()只是将消息放到发送缓冲数组中,调用flush就将其发送缓冲区中的消息全部写到SocketChannel中 ???????ctx.flush(); ???} ???/** ????* 释放资源 ????*/ ???@Override ???public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { ???????ctx.close(); ???}}

客户端代码

TimeClient.java:

package cn.xpleaf.netty;import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;public class TimeClient { ???public void connect(int port, String host) throws Exception { ???????// 配置客户端NIO线程组 ???????EventLoopGroup group = new NioEventLoopGroup(); ???????try { ???????????// 客户端是Bootstrap,注意客户端的bootstrap只能传入一个NIO线程组 ???????????Bootstrap b = new Bootstrap(); ???????????// 客户端是NioSocketChannel ???????????b.group(group).channel(NioSocketChannel.class) ???????????????.option(ChannelOption.TCP_NODELAY, true) ???????????????// 与服务端一样的,只不过这里使用了匿名内部类 ???????????????.handler(new ChannelInitializer<SocketChannel>() { ???????????????????@Override ???????????????????protected void initChannel(SocketChannel ch) throws Exception { ???????????????????????ch.pipeline().addLast(new TimeClientHandler()); ???????????????????} ???????????????}); ???????????// 发起异步连接操作(注意服务端是bind,客户端则需要connect) ???????????ChannelFuture f = b.connect(host, port).sync(); ???????????// 等待客户端链路关闭 ???????????f.channel().closeFuture().sync(); ???????} finally { ???????????// 优雅退出,释放NIO线程组 ???????????group.shutdownGracefully(); ???????} ???} ???public static void main(String[] args) throws Exception { ???????int port = 8080; ???????if(args != null && args.length > 0) { ???????????try { ???????????????port = Integer.valueOf(port); ???????????} catch (NumberFormatException e) { ???????????????// 采用默认值 ???????????} ???????} ???????new TimeClient().connect(port, "localhost"); ???}}

TimeClientHandler.java:

package cn.xpleaf.netty;import java.util.logging.Logger;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerAdapter;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;/** * 该类的方法与功能与服务端的handler类似 * @author yeyonghao * */public class TimeClientHandler extends ChannelInboundHandlerAdapter { ???private static final Logger logger = Logger.getLogger(TimeServerHandler.class.getName()); ???private ByteBuf firstMessage; ???/** ????* 初始化需要发送的数据 ????*/ ???public TimeClientHandler() { ???????byte[] req = "QUERY TIME ORDER".getBytes(); ???????firstMessage = Unpooled.buffer(req.length); ???????firstMessage.writeBytes(req); ???} ???/** ????* 当客户端和服务端TCP链路建立成功之后,Netty的NIO线程会调用channelActive方法 ????*/ ???@Override ???public void channelActive(ChannelHandlerContext ctx) { ???????// 将请求消息发送给服务端 ???????ctx.writeAndFlush(firstMessage); ???} ???@Override ???public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ???????ByteBuf buf = (ByteBuf)msg; ???????byte[] req = new byte[buf.readableBytes()]; ???????buf.readBytes(req); ???????String body = new String(req, "utf-8"); ???????System.out.println("Now is : " + body); ???} ???@Override ???public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ???????logger.warning("Unexpected exception from downstream : "); ???????ctx.close(); ???}}

基于Netty的时间服务器程序代码

原文地址:http://blog.51cto.com/xpleaf/2070997

知识推荐

我的编程学习网——分享web前端后端开发技术知识。 垃圾信息处理邮箱 tousu563@163.com 网站地图
icp备案号 闽ICP备2023006418号-8 不良信息举报平台 互联网安全管理备案 Copyright 2023 www.wodecom.cn All Rights Reserved