首先是使用java原生nio类库编写的例子,开发一套nio框架不简单,所以选择了netty,该例完成后,是netty举例。
package com.smkj.netty;public class TimeServer { ???public static void main(String[] args) { ???????int port = 8080; ???????if(args!=null&&args.length!=0) { ???????????try { ???????????????port = Integer.valueOf(args[0]); ???????????} catch (NumberFormatException e) { ???????????????//采用默认值 ???????????} ???????} ???????????????MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port); ???????new Thread(timeServer,"NIO-MultiplexerTimeServer-001").start(); ???}}
package com.smkj.netty;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Set;public class MultiplexerTimeServer implements Runnable { ???private Selector selector; ???private ServerSocketChannel servChannel; ???private volatile boolean stop; ???/** ????* 初始化多路复用器,绑定监听端口 ????* ?????* @param port ????*/ ???public MultiplexerTimeServer(int port) { ???????try { ???????????selector = Selector.open(); ???????????servChannel = ServerSocketChannel.open(); ???????????servChannel.configureBlocking(false); ???????????servChannel.socket().bind(new InetSocketAddress(port), 1024); ???????????servChannel.register(selector, SelectionKey.OP_ACCEPT); ???????????System.out.println("The time server is start in port:" + port); ???????} catch (IOException e) { ???????????e.printStackTrace(); ???????????System.exit(1); ???????} ???} ???public void stop() { ???????this.stop = true; ???} ???@Override ???public void run() { ???????while (!stop) { ???????????try { ???????????????selector.select(1000); ???????????????Set<SelectionKey> selectedKeys = selector.selectedKeys(); ???????????????Iterator<SelectionKey> it = selectedKeys.iterator(); ???????????????SelectionKey key = null; ???????????????while (it.hasNext()) { ???????????????????key = it.next(); ???????????????????it.remove(); ???????????????????try { ???????????????????????handleInput(key); // 处理key ???????????????????} catch (Exception e) { ???????????????????????// TODO: handle exception ???????????????????????if (key != null) { ???????????????????????????key.cancel(); ???????????????????????????if (key.channel() != null) { ???????????????????????????????key.channel().close(); ???????????????????????????} ???????????????????????} ???????????????????} ???????????????} ???????????} catch (Exception e) { ???????????????// TODO: handle exception ???????????????e.printStackTrace(); ???????????} ???????} ???????// 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册,所以不需要重复释放资源 ???????if (selector != null) { ???????????try { ???????????????selector.close(); ???????????} catch (Exception e) { ???????????????// TODO: handle exception ???????????????e.printStackTrace(); ???????????} ???????} ???} ???private void handleInput(SelectionKey key) throws IOException { ???????if (key.isValid()) { ???????????// 处理新接入的请求消息 ???????????if (key.isAcceptable()) { ???????????????// Accept the new connection ???????????????ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); ???????????????SocketChannel sc = ssc.accept(); ???????????????sc.configureBlocking(false); ???????????????// Add the new connection to the selector ???????????????sc.register(selector, SelectionKey.OP_READ); ???????????} ???????????if (key.isReadable()) { ???????????????// Read the Data ???????????????SocketChannel sc = (SocketChannel) key.channel(); ???????????????ByteBuffer readBuffer = ByteBuffer.allocate(1024); ???????????????int readBytes = sc.read(readBuffer); ???????????????if (readBytes > 0) { ???????????????????readBuffer.flip(); ???????????????????byte[] bytes = new byte[readBuffer.remaining()]; ???????????????????readBuffer.get(bytes); ???????????????????String body = new String(bytes, "UTF-8"); ???????????????????System.out.println("the time server receive order:" + body); ???????????????????String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ???????????????????????????? new java.util.Date(System.currentTimeMillis()).toString() ???????????????????????????: "Bad Order"; ???????????????????????????doWrite(sc,currentTime); ???????????????}else if(readBytes<0) { ???????????????????//对端链路关闭 ???????????????????key.cancel(); ???????????????????sc.close(); ???????????????}else { ???????????????????//读到0字节 忽略 ???????????????????; ???????????????} ???????????} ???????} ???} ???????private void doWrite(SocketChannel channel,String response) throws IOException { ???????????????if(response!=null&&response.trim().length()>0) { ???????????byte[] bytes = response.getBytes(); ???????????ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); ???????????writeBuffer.put(bytes); ???????????writeBuffer.flip(); ???????????channel.write(writeBuffer); ??????????????if (!writeBuffer.hasRemaining()) ???????????????????System.out.println("Send response to client succeed."); ???????????????} ???????} ???????}
package com.smkj.netty;public class TimeClient {public static void main(String[] args) { ???int port = 8080; ???if(args!=null&&args.length!=0) { ???????try { ???????????port = Integer.valueOf(args[0]); ???????} catch (NumberFormatException e) { ???????????//采用默认值 ???????} ???} ???????new Thread(new TimeClientHandle("127.0.0.1",port),"TimeClient-001").start();}}
package com.smkj.netty;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Set;public class TimeClientHandle implements Runnable { ???private String host; ???private int port; ???private Selector selector; ???private SocketChannel socketChannel; ???private volatile boolean stop; ????public TimeClientHandle(String host, int port) { ???this.host = host == null ? "127.0.0.1" : host; ???this.port = port; ???try { ???????selector = Selector.open(); ???????socketChannel = SocketChannel.open(); ???????socketChannel.configureBlocking(false); ???} catch (IOException e) { ???????e.printStackTrace(); ???????System.exit(1); ???} ???} ???/*022 ????* (non-Javadoc)023 ????*024 ????* @see java.lang.Runnable#run()025 ????*/ ???@Override ???public void run() { ???try { ???????doConnect(); ???} catch (IOException e) { ???????e.printStackTrace(); ???????System.exit(1); ???} ???while (!stop) { ???????try { ???????selector.select(1000); ???????Set<SelectionKey> selectedKeys = selector.selectedKeys(); ???????Iterator<SelectionKey> it = selectedKeys.iterator(); ???????SelectionKey key = null; ???????while (it.hasNext()) { ???????????key = it.next(); ???????????it.remove(); ???????????try { ???????????handleInput(key); ???????????} catch (Exception e) { ???????????if (key != null) { ???????????????key.cancel(); ???????????????if (key.channel() != null) ???????????????key.channel().close(); ???????????} ???????????} ???????} ???????} catch (Exception e) { ???????e.printStackTrace(); ???????System.exit(1); ???????} ???} ????// 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源 ???if (selector != null) ???????try { ???????selector.close(); ???????} catch (IOException e) { ???????e.printStackTrace(); ???????} ???} ????private void handleInput(SelectionKey key) throws IOException { ????if (key.isValid()) { ???????// 判断是否连接成功 ???????SocketChannel sc = (SocketChannel) key.channel(); ???????if (key.isConnectable()) { ???????if (sc.finishConnect()) { ???????????sc.register(selector, SelectionKey.OP_READ); ???????????doWrite(sc); ???????} else ???????????System.exit(1);// 连接失败,进程退出 ???????} ???????if (key.isReadable()) { ???????ByteBuffer readBuffer = ByteBuffer.allocate(1024); ???????int readBytes = sc.read(readBuffer); ???????if (readBytes > 0) { ???????????readBuffer.flip(); ???????????byte[] bytes = new byte[readBuffer.remaining()]; ???????????readBuffer.get(bytes); ???????????String body = new String(bytes, "UTF-8"); ???????????System.out.println("Now is : " + body); ???????????this.stop = true; ???????} else if (readBytes < 0) { ???????????// 对端链路关闭 ???????????key.cancel(); ???????????sc.close(); ???} else ???????????; // 读到0字节,忽略 ???????} ???} ????} ???private void doConnect() throws IOException { ???// 如果直接连接成功,则注册到多路复用器上,发送请求消息,读应答 ???if (socketChannel.connect(new InetSocketAddress(host, port))) { ???????socketChannel.register(selector, SelectionKey.OP_READ); ???????doWrite(socketChannel); ???} else ???????socketChannel.register(selector, SelectionKey.OP_CONNECT); ???} ???private void doWrite(SocketChannel sc) throws IOException { ???byte[] req = "QUERY TIME ORDER".getBytes(); ???ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); ???writeBuffer.put(req); ???writeBuffer.flip(); ???sc.write(writeBuffer); ???if (!writeBuffer.hasRemaining()) ???????System.out.println("Send order 2 server succeed."); ???}}
可以发现服务端的最后进行了remove()操作,将SelectionKey从迭代器中删除了,博主一开始总觉得很纳闷,SelectionKey中可是记录了相关的channel信息,如果将SelectionKey删除了,那不就代表着将通道信息也抹除了吗,那后续还怎么继续获取通道,说来惭愧,这问题问的确实缺乏水准。
后来博主理了理selector的思路,要知道,一码事归一码事,channel是注册在selector中的,在后面的轮询中,是先将已准备好的channel挑选出来,即selector.select(),再通过selectedKeys()生成的一个SelectionKey迭代器进行轮询的,一次轮询会将这个迭代器中的每个SelectionKey都遍历一遍,每次访问后都remove()相应的SelectionKey,但是移除了selectedKeys中的SelectionKey不代表移除了selector中的channel信息(这点很重要),注册过的channel信息会以SelectionKey的形式存储在selector.keys()中,也就是说每次select()后的selectedKeys迭代器中是不能还有成员的,但keys()中的成员是不会被删除的(以此来记录channel信息)。
那么为什么要删除呢,要知道,迭代器如果只需要访问的话,直接访问就好了,完全没必要remove()其中的元素啊,查询了相关资料,一致的回答是为了防止重复处理(大雾),后来又有信息说明:每次循环调用remove()是因为selector不会自己从已选择集合中移除selectionKey实例,必须在处理完通道时自己移除,这样,在下次select时,会将这个就绪通道添加到已选择通道集合中,其实到这里就已经可以理解了,selector不会自己删除selectedKeys()集合中的selectionKey,那么如果不人工remove(),将导致下次select()的时候selectedKeys()中仍有上次轮询留下来的信息,这样必然会出现错误,假设这次轮询时该通道并没有准备好,却又由于上次轮询未被remove()的原因被认为已经准备好了,这样能不出错吗?
即selector.select()会将准备好的channel以SelectionKey的形式放置于selector的selectedKeys()中供使用者迭代,使用的过程中需将selectedKeys清空,这样下次selector.select()时就不会出现错误了。
netty简单NIO模型
原文地址:https://www.cnblogs.com/fengwenzhee/p/10407587.html