0. NioEventLoopGroup简介
NioEventLoopGroup可以理解为一个线程池,内部维护了一组线程,每个线程负责处理多个Channel上的事件,而一个Channel只对应于一个线程,这样可以回避多线程下的数据同步问题。
1. NioEventLoopGroup类图
2. 构造方法
new NioEventLoopGroup()方法会调用到MultithreadEventLoopGroup的构造方法:
private static final int DEFAULT_EVENT_LOOP_THREADS; ???static { ???????DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( ???????????????"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));//默认值为系统core数的两倍 ???????if (logger.isDebugEnabled()) { ???????????logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS); ???????} ???} ???/** ????* @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...) ????*/ ???protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { ???????super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);//如果采用无参的构造函数,传入的nThreads变量为0,此时线程数会被设置为系统core数*2 ???}
然后会调用MultithreadEventExecutorGroup的构造方法:
???/** ????* Create a new instance. ????* ????* @param nThreads ?????????the number of threads that will be used by this instance. ????* @param executor ?????????the Executor to use, or {@code null} if the default should be used. ????* @param chooserFactory ???the {@link EventExecutorChooserFactory} to use. ????* @param args ?????????????arguments which will passed to each {@link #newChild(Executor, Object...)} call ????*/ ???protected MultithreadEventExecutorGroup(int nThreads, Executor executor, ???????????????????????????????????????????EventExecutorChooserFactory chooserFactory, Object... args) { ???????if (nThreads <= 0) { ???????????throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); ???????} ???????if (executor == null) { ???????????executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); ???????} ???????children = new EventExecutor[nThreads];//设定线程池大小 ???????for (int i = 0; i < nThreads; i ++) { ???????????boolean success = false; ???????????try { ???????????????children[i] = newChild(executor, args);//新建nThreads个子线程 ???????????????success = true; ???????????} catch (Exception e) { ???????????????// TODO: Think about if this is a good exception type ???????????????throw new IllegalStateException("failed to create a child event loop", e); ???????????} finally { ???????????????if (!success) {//如果新建子线程的过程中出错,则关闭所有子线程 ???????????????????for (int j = 0; j < i; j ++) { ???????????????????????children[j].shutdownGracefully(); ???????????????????} ???????????????????for (int j = 0; j < i; j ++) { ???????????????????????EventExecutor e = children[j]; ???????????????????????try { ???????????????????????????while (!e.isTerminated()) { ???????????????????????????????e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); ???????????????????????????} ???????????????????????} catch (InterruptedException interrupted) { ???????????????????????????// Let the caller handle the interruption. ???????????????????????????Thread.currentThread().interrupt(); ???????????????????????????break; ???????????????????????} ???????????????????} ???????????????} ???????????} ???????} ???????chooser = chooserFactory.newChooser(children);//设定新任务的分配策略 ???????final FutureListener<Object> terminationListener = new FutureListener<Object>() {//注册一些回调函数用于清理工作 ???????????@Override ???????????public void operationComplete(Future<Object> future) throws Exception { ???????????????if (terminatedChildren.incrementAndGet() == children.length) { ???????????????????terminationFuture.setSuccess(null); ???????????????} ???????????} ???????}; ???????for (EventExecutor e: children) { ???????????e.terminationFuture().addListener(terminationListener); ???????} ???????Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); ???????Collections.addAll(childrenSet, children); ???????readonlyChildren = Collections.unmodifiableSet(childrenSet); ???}
其中比较重要的地方有两处:调用子类实现的newChild方法设置子线程,以及设置新任务的分配策略
先看一下NioEventLoopGroup中实现的newChild方法:
???@Override ???protected EventLoop newChild(Executor executor, Object... args) throws Exception { ???????return new NioEventLoop(this, executor, (SelectorProvider) args[0], ???????????((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); ???}
很简单的新建一个NioEventLoop对象并返回,我们下一节会介绍NioEventLoop
而chooserFactory.newChooser最终会跳转到DefaultEventExecutorChooserFactory里:
???@SuppressWarnings("unchecked") ???@Override ???public EventExecutorChooser newChooser(EventExecutor[] executors) { ???????if (isPowerOfTwo(executors.length)) { ???????????return new PowerOfTwoEventExecutorChooser(executors); ???????} else { ???????????return new GenericEventExecutorChooser(executors); ???????} ???} ???private static boolean isPowerOfTwo(int val) { ???????return (val & -val) == val; ???}
虽然分配策略都是round-robin,但是在子线程的数量为2的幂时,可以用位运算来加速,效率很高。
Netty为了追求效率确实不择手段。
Netty源码学习(二)NioEventLoopGroup
原文地址:http://www.cnblogs.com/stevenczp/p/7581940.html