Netty用户指南
一、前言
1.问题
当今世界我们需要使用通用的软件或库与其他组件进行通信,例如使用HTTP客户端从服务器中获取信息,或通过网络服务调用一个远程的方法。然而通用的协议及其实现通常不具备较好的伸缩性。所以问题看起来是我们怎么不使用通用的HTTP服务器去传输大文件、e-mail、实事数据、多媒体数据等。我们需要的是针对特定问题而进行优化的协议实现。例如我们可能需要重新实现一个HTTP服务器来与AJAX的客户端进行通信。另外一种情况是需要处理历史遗留的协议保证与旧的系统兼容。这些例子的关键在于怎样快速的实现协议而不损失目标系统的稳定性和性能。
2.解决方案
Netty是一个异步事件驱动的网络应用框架,可以用来快速开发可维护的、高性能、可扩展的协议服务器和客户端。
换句话说,Netty是一个基于NIO的客户端和服务器框架,可以简单快速的开发网络应用程序,如协议的客户端和服务器。它极大的简化了TCP、UDP服务器之类的网络编程。
二、开始
1.编写DiscardServer
最简单的协议并不是“hello world”,而是丢弃。丢弃协议会丢弃任何接受到的数据不做任何的响应。
要实现丢弃协议,需要做的就是丢弃任何接收到的数据。首先从handler的实现开始,handler会处理由Netty产生的I/O事件。
package io.netty.example.discard;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;/** * Handles a server-side channel. */public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1) ???@Override ???public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2) ???????// Discard the received data silently. ???????((ByteBuf) msg).release(); // (3) ???} ???@Override ???public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4) ???????// Close the connection when an exception is raised. ???????cause.printStackTrace(); ???????ctx.close(); ???}}
DiscardServerHandler
继承了ChannelInboundHandlerAdapter
,而他又实现了ChannelInboundHandler
,ChannelInboundHandler
提供了不同的事件处理方法,你可以根据需要去覆写相应的方法。ChannelInboundHandlerAdapter
提供了一些默认的实现,所以在这个例子中只需要去继承它就可以了。- 覆写了
channelRead
方法,Netty从客户端收到数据时就会调用该方法。消息的类型是ByteBuf
。 ByteBuf
是一个引用计数对象,需要进行手动的释放。需要注意的是,handler需要释放任何传递给他的引用计数对象。通常情况下channelRead()
方法通常的实现方式如下:
@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) { ???try { ???????// Do something with msg ???} finally { ???????ReferenceCountUtil.release(msg); ???}}
- 由于IO错误Netty抛出异常或handle处理事件抛出异常,都会使
exceptionCaught()
方法被调用。在大多数情况下,都需要对异常记日志,并且关闭相关连的channel
。
到目前为止实现了DISCARD服务的一般,接下来需要实现main()
方法来启动服务。
package io.netty.example.discard; ???import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel; ???/** * Discards any incoming data. */public class DiscardServer { ???????private int port; ???????public DiscardServer(int port) { ???????this.port = port; ???} ???????public void run() throws Exception { ???????EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) ???????EventLoopGroup workerGroup = new NioEventLoopGroup(); ???????try { ???????????ServerBootstrap b = new ServerBootstrap(); // (2) ???????????b.group(bossGroup, workerGroup) ????????????.channel(NioServerSocketChannel.class) // (3) ????????????.childHandler(new ChannelInitializer<SocketChannel>() { // (4) ????????????????@Override ????????????????public void initChannel(SocketChannel ch) throws Exception { ????????????????????ch.pipeline().addLast(new DiscardServerHandler()); ????????????????} ????????????}) ????????????.option(ChannelOption.SO_BACKLOG, 128) ?????????// (5) ????????????.childOption(ChannelOption.SO_KEEPALIVE, true); // (6) ???????????????// Bind and start to accept incoming connections. ???????????ChannelFuture f = b.bind(port).sync(); // (7) ???????????????// Wait until the server socket is closed. ???????????// In this example, this does not happen, but you can do that to gracefully ???????????// shut down your server. ???????????f.channel().closeFuture().sync(); ???????} finally { ???????????workerGroup.shutdownGracefully(); ???????????bossGroup.shutdownGracefully(); ???????} ???} ???????public static void main(String[] args) throws Exception { ???????int port; ???????if (args.length > 0) { ???????????port = Integer.parseInt(args[0]); ???????} else { ???????????port = 8080; ???????} ???????new DiscardServer(port).run(); ???}}
NioEventLoopGroup
是一个多线程的事件循环,用来处理I/O操作。Netty为不同的通信方式提供了多种EventLoopGroup
实现。在本例中,我们只需要实现服务器端的应用,所以需要两个NioEventLoopGroup
。第一个通常称为boss
,用来接收客户端的链接请求。第二个称为worker
,用来处理boss
已接收连接的I/O请求和把接收的连接注册到worker
。ServerBootstrap
是用来创建服务器的辅助类。- 使用
NioServerSocketChannel
类来实例化channel
,用来接收连接请求。 - 在这里设置的handler会被每一个新
channel
调用,ChannelInitializer
是一个特殊的handler用来配置一个新的channel。在本例中,我们将DiscardServerHandler
添加到新channel 的管道中。随着应用程序的复杂度增加,可能会向管道中加入更多的handler。 - 可以通过
option()
方法给channel设置一些参数。 option()
方法是用来设置NioServerSocketChannel
参数的,而childOption()
是给接收的连接设置参数的。- 剩下的就是绑定端口然后启动服务了。
2. 测试DiscardServer是否成功
最简单的方法是使用telnet命令。例如输入telnet localhost 8080
。DiscarServer丢弃了任何接受的数据,我们可以把DiscardServer的接收的数据打印出来。
@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) { ???ByteBuf in = (ByteBuf) msg; ???try { ???????while (in.isReadable()) { // (1) ???????????System.out.print((char) in.readByte()); ???????????System.out.flush(); ???????} ???} finally { ???????ReferenceCountUtil.release(msg); // (2) ???}}
- 循环可以等价于
System.out.println(in.toString(io.netty.util.CharsetUtil.US_ASCII))
。 - 等价于
in.release()
3.写一个Echo Server
一个服务器通常需要对请求作出响应,而一个Echo服务仅仅需要做的是把请求的内容返回给客户端。
@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) { ???ctx.write(msg); // (1) ???ctx.flush(); // (2)}
ChannelHandlerContext
对象提供了各种出发IO时间的操作。通过调用write(Object)
方法把数据发给客户端。在这里没有手动的释放msg,这是因为当把msg写入时Netty会自动的释放它。ctx.write(Object)
并不会把数据写到外部,而是在内部的缓冲区中,通过调用ctx.flush()
把数据刷出到外部。可以简洁的调用ctx.wirteAndFlush(msg)
达到同样的效果。
4. 写一个Timer Server
TIME协议与前面的例子不同之处在于,它发送一个32位的整数,不接收任何请求,并且只要消息发送了就立刻关闭连接。
因为我们不需要接收任何数据,而且在连接建立时就发送数据,所以不能使用channelRead()
方法。需要覆写channelActive()
方法
package io.netty.example.time;public class TimeServerHandler extends ChannelInboundHandlerAdapter { ???@Override ???public void channelActive(final ChannelHandlerContext ctx) { // (1) ???????final ByteBuf time = ctx.alloc().buffer(4); // (2) ???????time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L)); ???????????????final ChannelFuture f = ctx.writeAndFlush(time); // (3) ???????f.addListener(new ChannelFutureListener() { ???????????@Override ???????????public void operationComplete(ChannelFuture future) { ???????????????assert f == future; ???????????????ctx.close(); ???????????} ???????}); // (4) ???} ???????@Override ???public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { ???????cause.printStackTrace(); ???????ctx.close(); ???}}
当一个连接建立时,
activeChannel()
方法会被调用,然后写一个32位的整数。为了发送一个新的信息,需要分配一个缓冲区。通过调用
ctx.alloc()
获取ByteBufAllocator
来分配缓冲区。在Netty中的Buffer不需要像Java NIO一样调用
flip()
,这是因为Netty中的Buffer具有两个指针,分别用于读写操作。当进行写操作时写指针在移动而读指针不移动,读写指针分别代表数据的开始和结束。另外需要指出的是,
ctx.write()
返回一个ChannelFuture
对象,该对象代表着一个还未发生的IO操作。这意味着,任何一个请求操作可能都未发生,这是因为在Netty中,所有操作都是异步的。例如下面的代码可能在发送信息前关闭连接:Channel ch = ...;ch.writeAndFlush(message);ch.close();
所以要在
ChannelFuture
完成前调用close()
,当操作完成时,ChannelFuture
会通知他的监听器。close()
可能也不会立即关闭连接。本例中添加一个匿名内部类作为监听器,来关闭连接。也可以使用预定义的监听器:
f.addListener(ChannelFutureListener.CLOSE);
5.Time Client
不同于DISCARD和ECHO,TIME协议需要一个客户端将32位的整数转为一个日期。Netty中的客户端和服务器最大的不同在于使用了不同的BootStrap
和Channel
现实。
package io.netty.example.time;public class TimeClient { ???public static void main(String[] args) throws Exception { ???????String host = args[0]; ???????int port = Integer.parseInt(args[1]); ???????EventLoopGroup workerGroup = new NioEventLoopGroup(); ???????????????try { ???????????Bootstrap b = new Bootstrap(); // (1) ???????????b.group(workerGroup); // (2) ???????????b.channel(NioSocketChannel.class); // (3) ???????????b.option(ChannelOption.SO_KEEPALIVE, true); // (4) ???????????b.handler(new ChannelInitializer<SocketChannel>() { ???????????????@Override ???????????????public void initChannel(SocketChannel ch) throws Exception { ???????????????????ch.pipeline().addLast(new TimeClientHandler()); ???????????????} ???????????}); ???????????// Start the client. ???????????ChannelFuture f = b.connect(host, port).sync(); // (5) ???????????// Wait until the connection is closed. ???????????f.channel().closeFuture().sync(); ???????} finally { ???????????workerGroup.shutdownGracefully(); ???????} ???}}
BootStap
和ServerBootStrap
很相似,但它是用于客户端的。- 只需指定一个
EventLoopGroup
,在客户端中不需要boss。 - 使用
NioSocketChannel
而不是NioServerSocketChannel
。 - 不需要
childOption()
。 - 使用
connect()
方法而不是bind()
在TimeClientHandler
中,将整数翻译成日期格式的类型。
package io.netty.example.time;import java.util.Date;public class TimeClientHandler extends ChannelInboundHandlerAdapter { ???@Override ???public void channelRead(ChannelHandlerContext ctx, Object msg) { ???????ByteBuf m = (ByteBuf) msg; // (1) ???????try { ???????????long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L; ???????????System.out.println(new Date(currentTimeMillis)); ???????????ctx.close(); ???????} finally { ???????????m.release(); ???????} ???} ???@Override ???public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { ???????cause.printStackTrace(); ???????ctx.close(); ???}}
6.处理基于流的传输问题。
TCP/IP协议接收数据并储存到Socket缓冲区中,但是缓冲区不是数据包的队列,而是字节的队列,这意味着你发送了两条消息,但操作系统会并不认为是两条消息而是一组字节。所以在读数据时并不能确定读到了对方发过来的数据。
在TIME协议中,在调用m.readUnsignedInt()
时缓冲区中需要有四个字节,如果缓冲区中还未接收到四个字节时就会抛出异常。
解决方法是,再加一个ChannelHandle
到ChannelPipeline
。该handler专门处理编码问题。
package io.netty.example.time;public class TimeDecoder extends ByteToMessageDecoder { // (1) ???@Override ???protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2) ???????if (in.readableBytes() < 4) { ???????????return; // (3) ???????} ???????out.add(in.readBytes(4)); // (4) ???}}
ByteToMessageDecoder
是ChannelInboundHandler
的一个实现,专门用于编码问题。- 当新的数据到达时,Netty会调用decode方法,并且其内部维护着一个累加Buffer。
- 当累加Buffer中没有足够的数据时,可以不在out中添加任何数据。当新数据到达后Netty又会调用decode方法。
- 如果
decode()
添加一个对象到out中,意味着编码信息成功了。Netty会丢弃Buffer中已读取的部分数据。
把TimeDecoder
添加到ChannelPipeline
中:
b.handler(new ChannelInitializer<SocketChannel>() { ???@Override ???public void initChannel(SocketChannel ch) throws Exception { ???????ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler()); ???}});
另外一种更简单的方式是使用ReplayingDecoder
public class TimeDecoder extends ReplayingDecoder<Void> { ???@Override ???protected void decode( ???????????ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { ???????out.add(in.readBytes(4)); ???}}
当调用in.readBytes(4)
抛出异常时,ReplayingDecoder
会捕捉异常并重复执行decode()
7.使用POJO代替ByteBuf
在之前的TIME服务中,都是直接使用ByteBuf作为协议的数据结构。在Handler中使用POJO对象,可以把从ByteBuf抽取POJO的代码分离开。
首先定义UnixTime类:
package io.netty.example.time;import java.util.Date;public class UnixTime { ???private final long value; ???????public UnixTime() { ???????this(System.currentTimeMillis() / 1000L + 2208988800L); ???} ???????public UnixTime(long value) { ???????this.value = value; ???} ???????????public long value() { ???????return value; ???} ???????????@Override ???public String toString() { ???????return new Date((value() - 2208988800L) * 1000L).toString(); ???}}
在TimeDecoder
中解码产生UnixTime
对象
@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { ???if (in.readableBytes() < 4) { ???????return; ???} ???out.add(new UnixTime(in.readUnsignedInt()));}
在TimeClientHandler
中不再需要使用ByteBuf
了。
在服务器端,首先更改TimeServerHandler
@Overridepublic void channelActive(ChannelHandlerContext ctx) { ???ChannelFuture f = ctx.writeAndFlush(new UnixTime()); ???f.addListener(ChannelFutureListener.CLOSE);}
还需要创建一个编码器,将UnixTime
转为ByteBuf
以便网络传输
public class TimeEncoder extends MessageToByteEncoder<UnixTime> { ???@Override ???protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) { ???????out.writeInt((int)msg.value()); ???}}
netty使用指南
原文地址:https://www.cnblogs.com/mler/p/9445202.html