Netty 事件循环的主逻辑位于 NioEventLoop.run() 中,其中就绪的 IO 事件由 processSelectedKeys() 函数处理。
1 protected void run() {
//主循环不断读取IO事件和task,因为 EventLoop 也是 juc 的 ScheduledExecutorService 实现 2 ????????for (;;) { 3 ????????????try { 4 ????????????????switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { 5 ????????????????????case SelectStrategy.CONTINUE: 6 ????????????????????????continue; 7 ????????????????????case SelectStrategy.SELECT: 8 ????????????????????????select(wakenUp.getAndSet(false)); 9 10 ????????????????????????37 38 ????????????????????????if (wakenUp.get()) {39 ????????????????????????????selector.wakeup();40 ????????????????????????}41 ????????????????????????// fall through42 ????????????????????default:43 ????????????????}44 45 ????????????????cancelledKeys = 0;46 ????????????????needsToSelectAgain = false;
??// IO事件占总执行时间的百分比 */47 ????????????????final int ioRatio = this.ioRatio;48 ????????????????if (ioRatio == 100) {49 ????????????????????try {50 ????????????????????????processSelectedKeys();51 ????????????????????} finally {52 ????????????????????????// Ensure we always run tasks.53 ????????????????????????runAllTasks();54 ????????????????????}55 ????????????????} else {56 ????????????????????final long ioStartTime = System.nanoTime();57 ????????????????????try {58 ????????????????????????processSelectedKeys();59 ????????????????????} finally {60 ????????????????????????// Ensure we always run tasks.61 ????????????????????????final long ioTime = System.nanoTime() - ioStartTime;62 ????????????????????????runAllTasks(ioTime * (100 - ioRatio) / ioRatio);63 ????????????????????}64 ????????????????}65 ????????????} catch (Throwable t) {66 ????????????????handleLoopException(t);67 ????????????}68 ????????????// Always handle shutdown even if the loop processing threw an exception.69 ????????????try {70 ????????????????if (isShuttingDown()) {71 ????????????????????closeAll();72 ????????????????????if (confirmShutdown()) {73 ????????????????????????return;74 ????????????????????}75 ????????????????}76 ????????????} catch (Throwable t) {77 ????????????????handleLoopException(t);78 ????????????}79 ????????}80 ????}
processSelectedKeys 函数执行时会判断是否走优化版本,即判断 SelectedSelectionKeySet(selectedKeys 字段)是否为 null:不为 null 时执行优化版本 processSelectedKeysOptimized。
是否开启优化取决于系统属性(System Property,而非环境变量)io.netty.noKeySetOptimization,可通过 JVM 启动参数 -Dio.netty.noKeySetOptimization=true 设置;默认值为 false,代表开启优化。
private static final boolean DISABLE_KEYSET_OPTIMIZATION = ???????????SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
原理是通过反射把 eventLoop 所绑定 selector 中的 selectedKeys 和 publicSelectedKeys 两个字段设置为 SelectedSelectionKeySet。好处是处理就绪事件时可以直接按下标遍历其内部数组,不用再迭代 selector.selectedKeys()。
注入时机为初始化 EventLoop 的时候
1 private SelectorTuple openSelector() { 2 ????????12 //注入逻辑40 41 ????????Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {42 ????????????@Override43 ????????????public Object run() {44 ????????????????try {45 ????????????????????Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");46 ????????????????????Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");47 48 ????????????????????Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField);49 ????????????????????if (cause != null) {50 ????????????????????????return cause;51 ????????????????????}52 ????????????????????cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField);53 ????????????????????if (cause != null) {54 ????????????????????????return cause;55 ????????????????????}56 57 ????????????????????selectedKeysField.set(unwrappedSelector, selectedKeySet);58 ????????????????????publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);59 ????????????????????return null;60 ????????????????} catch (NoSuchFieldException e) {61 ????????????????????return e;62 ????????????????} catch (IllegalAccessException e) {63 ????????????????????return e;64 ????????????????}65 ????????????}66 ????????});67 68 ????????........78 ????}
IO 事件的处理主要在 processSelectedKey 中,依次对连接(OP_CONNECT)、写(OP_WRITE)、读(OP_READ / OP_ACCEPT)事件进行了处理。
private void processSelectedKeysOptimized() { ???????for (int i = 0; i < selectedKeys.size; ++i) { ???????????final SelectionKey k = selectedKeys.keys[i]; ???????????// null out entry in the array to allow to have it GC‘ed once the Channel close ???????????// See https://github.com/netty/netty/issues/2363 ???????????selectedKeys.keys[i] = null; ???????????final Object a = k.attachment(); ???????????if (a instanceof AbstractNioChannel) {
???????????????//分别处理每个channel的事件 ???????????????processSelectedKey(k, (AbstractNioChannel) a); ???????????} else { ???????????????@SuppressWarnings("unchecked") ???????????????NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; ???????????????processSelectedKey(k, task); ???????????} ???????????if (needsToSelectAgain) { ???????????????// null out entries in the array to allow to have it GC‘ed once the Channel close ???????????????// See https://github.com/netty/netty/issues/2363 ???????????????selectedKeys.reset(i + 1); ???????????????selectAgain(); ???????????????i = -1; ???????????} ???????} ???} ???private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { ???????final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); ???????if (!k.isValid()) { ???????????final EventLoop eventLoop; ???????????try { ???????????????eventLoop = ch.eventLoop(); ???????????} catch (Throwable ignored) { ???????????????// If the channel implementation throws an exception because there is no event loop, we ignore this ???????????????// because we are only trying to determine if ch is registered to this event loop and thus has authority ???????????????// to close ch. ???????????????return; ???????????} ???????????// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop ???????????// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is ???????????// still healthy and should not be closed. ???????????// See https://github.com/netty/netty/issues/5125 ???????????if (eventLoop != this || eventLoop == null) { ???????????????return; ???????????} ???????????// close the channel if the key is not valid anymore ???????????unsafe.close(unsafe.voidPromise()); ???????????return; ???????} ???????try { ???????????int readyOps = k.readyOps(); ???????????// We first need to call finishConnect() before try to trigger a read(...) or write(...) 
as otherwise ???????????// the NIO JDK channel implementation may throw a NotYetConnectedException. ???????????if ((readyOps & SelectionKey.OP_CONNECT) != 0) { ???????????????// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking ???????????????// See https://github.com/netty/netty/issues/924 ???????????????int ops = k.interestOps(); ???????????????ops &= ~SelectionKey.OP_CONNECT; ???????????????k.interestOps(ops); //处理了连接事件 ???????????????unsafe.finishConnect(); ???????????} ???????????// Process OP_WRITE first as we may be able to write some queued buffers and so free memory. ???????????if ((readyOps & SelectionKey.OP_WRITE) != 0) { ???????????????// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
//将要写入的buffer flush掉
ch.unsafe().forceFlush();
???????????} ???????????// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead ???????????// to a spin loop ???????????if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
??//回调 pipeline 上所有的 ChannelInboundHandler 的 fireChannelRead ?和 channelReadComplete 函数 ???????????????unsafe.read(); ???????????} ???????} catch (CancelledKeyException ignored) { ???????????unsafe.close(unsafe.voidPromise()); ???????} ???}
Netty-NioEventLoop
原文地址:https://www.cnblogs.com/ironroot/p/8575134.html