基本功能:与客户端建立连接后立刻发送当前时间
先建立一个时间的类
package timeExample;import java.sql.Date;public class UnixTime { ???private final long value; ???????public UnixTime() { ???????this(System.currentTimeMillis()/1000L); ???} ???????public UnixTime(long value) { ???????this.value = value; ???} ???????????public long value() { ???????return value; ???} ???????????@Override ???public String toString() { ???????return new Date((value())*1000L ).toString(); ???????????} ???public static void main(String[] arg) { ???????System.out.println(new UnixTime()); ???}}
服务端代码:
package timeExample;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.MessageToByteEncoder;public class TimeEncoder extends MessageToByteEncoder<UnixTime> { ???@Override ???protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) throws Exception { ???????// TODO Auto-generated method stub ???????out.writeInt((int)msg.value()); ???} ???????}/** * ?* 这个类同样可以这样实现: * public class TimeEncoder extends ChannelOutboundHandlerAdapter { ???@Override ???public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { ???????UnixTime m = (UnixTime) msg; ???????ByteBuf encoded = ctx.alloc().buffer(4); ???????encoded.writeInt((int)m.value()); ???????ctx.write(encoded, promise); // (1) ???}} * 可以看到MessageToByteEncoder这个类封装了一些必要且固定的代码 * ?* ?* ?* ?* ?* ?* ?* ?* ?* ?* ?* **/
package timeExample;import java.beans.EventHandler;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelFutureListener;import io.netty.channel.ChannelHandlerAdapter;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.channel.ChannelOutboundHandlerAdapter;public class TImeServerHandler extends ChannelInboundHandlerAdapter{ ???//之所以用ChannelActive是因为这个timeServer的作用是在连接建立后立刻给客户端发送时间,而不接收客户端发送的消息 ???//channelActive() method will be invoked when a connection is established and ready to generate traffic ???@Override ???public void channelActive(ChannelHandlerContext ctx) throws Exception { ???????ChannelFuture future=ctx.writeAndFlush(new UnixTime()); ???????//使用的是JavaNio,因此它是非阻塞的,writeAndFlush可能还未完成就返回结果,ChannelFuture是标志这个操作的状态 ???????//因此可以添加监听器监听这个操作状态 ???????future.addListener(ChannelFutureListener.CLOSE);//一旦监听听监听到操作完成就关闭连接。这个是下面那段代码的简写 ?????/* ?future.addListener(new ChannelFutureListener() { ???????????@Override ???????????public void operationComplete(ChannelFuture future) { ???????????????assert f == future; ???????????????ctx.close(); ???????????} ???????}); */ ???????????} ???}
package timeExample;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.util.concurrent.Future;//without receiving any requests and closes the connection once the message is sent//close the connection on completion.public class TimeServer { ???public int port; ???public TimeServer(int port) { ???????this.port=port; ???????????} ???public void run() throws InterruptedException { ???????ServerBootstrap b=new ServerBootstrap(); ???????EventLoopGroup boss=new NioEventLoopGroup(); ???????EventLoopGroup worker= new NioEventLoopGroup(); ???????try { ???????b.group(boss,worker).channel(NioServerSocketChannel.class) ???????.childHandler(new ChannelInitializer<Channel>() { ???????????@Override ???????????protected void initChannel(Channel ch) throws Exception { ???????????????ch.pipeline().addLast(new TimeEncoder(),new TImeServerHandler());// TODO Auto-generated method stub ???????????????????????????} ???????}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true); ???????ChannelFuture future=b.bind(port).sync(); ???????future.channel().closeFuture().sync(); ???} ???finally { ???????worker.shutdownGracefully(); ???????boss.shutdownGracefully(); ???????????}} ???public static void main(String arg[]) throws InterruptedException { ???????new TimeServer(10111).run(); ???}}
客户端代码:
package timeExample;import java.util.List;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.ByteToMessageDecoder;/** * netty将读到的消息缓存到一个buf里,有时候一个完整的消息就会被碎片化到不同的buf里了 * 使用ByteToMessageDecoder来解决这个问题 * ?* */public class TimeDecoder extends ByteToMessageDecoder { ???@Override ???protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { ???????// TODO Auto-generated method stub ???????????????if(in.readableBytes()<4) ???????????return;//每当有新的data到来时decode()方法就会被调用 ???????UnixTime t=new UnixTime(in.readUnsignedInt()); ???????out.add(t); ???????????}}
package timeExample;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.channel.ChannelPromise;public class TimeClientHandler extends ChannelInboundHandlerAdapter{@Override ???public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {UnixTime t=(UnixTime)msg; ???????System.out.println(t); ???????ctx.close(); ???}}
package timeExample;import io.netty.bootstrap.Bootstrap;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;public class TimeClient { ???private String address; ???private int port; ???TimeClient(String address,int port){ ???????this.address=address; ???????this.port=port; ???????????} ???public void run() throws InterruptedException { ???????????????EventLoopGroup worker=new NioEventLoopGroup(); ???????Bootstrap b=new Bootstrap(); ???????try { ???????b.group(worker).channel(NioSocketChannel.class).handler(new ChannelInitializer<Channel>() { ???????????@Override ???????????protected void initChannel(Channel ch) throws Exception { ???????????????// TODO Auto-generated method stub ???????????????ch.pipeline().addLast(new TimeDecoder(),new TimeClientHandler()); ???????????????????????????} ???????}).option(ChannelOption.SO_KEEPALIVE, true); ???????ChannelFuture future =b.connect(address, port).sync(); ???????future.channel().closeFuture().sync(); ???????????????????}finally { ???????worker.shutdownGracefully();//当EventLoopGroup shutdown以后所有的Channel才会shutdown。 ???} ???????????} ???public static void main(String[] arg) throws InterruptedException { ???????new TimeClient("localhost",10111).run(); ???}}
参考:netty官方文档:Netty.docs: User guide for 4.x
基于netty的一个简单的时间服务器的实现(netty学习)
原文地址:https://www.cnblogs.com/cai-cai777/p/10274389.html