分享web开发知识

注册/登录|最近发布|今日推荐

主页 IT知识网页技术软件开发前端开发代码编程运营维护技术分享教程案例
当前位置:首页 > 代码编程

Netty源码学习(二)NioEventLoopGroup

发布时间:2023-09-06 01:13责任编辑:林大明关键词:暂无标签

0. NioEventLoopGroup简介

NioEventLoopGroup可以理解为一个线程池,内部维护了一组线程,每个线程负责处理多个Channel上的事件,而一个Channel只对应于一个线程,这样可以回避多线程下的数据同步问题。

1. NioEventLoopGroup类图

2. 构造方法

new NioEventLoopGroup()方法会调用到MultithreadEventLoopGroup的构造方法:

   private static final int DEFAULT_EVENT_LOOP_THREADS; ???static { ???????DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( ???????????????"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));//默认值为系统core数的两倍 ???????if (logger.isDebugEnabled()) { ???????????logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS); ???????} ???} ???/** ????* @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...) ????*/ ???protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { ???????super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);//如果采用无参的构造函数,传入的nThreads变量为0,此时线程数会被设置为系统core数*2 ???}

然后会调用MultithreadEventExecutorGroup的构造方法:

 ???/** ????* Create a new instance. ????* ????* @param nThreads ?????????the number of threads that will be used by this instance. ????* @param executor ?????????the Executor to use, or {@code null} if the default should be used. ????* @param chooserFactory ???the {@link EventExecutorChooserFactory} to use. ????* @param args ?????????????arguments which will passed to each {@link #newChild(Executor, Object...)} call ????*/ ???protected MultithreadEventExecutorGroup(int nThreads, Executor executor, ???????????????????????????????????????????EventExecutorChooserFactory chooserFactory, Object... args) { ???????if (nThreads <= 0) { ???????????throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); ???????} ???????if (executor == null) { ???????????executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); ???????} ???????children = new EventExecutor[nThreads];//设定线程池大小 ???????for (int i = 0; i < nThreads; i ++) { ???????????boolean success = false; ???????????try { ???????????????children[i] = newChild(executor, args);//新建nThreads个子线程 ???????????????success = true; ???????????} catch (Exception e) { ???????????????// TODO: Think about if this is a good exception type ???????????????throw new IllegalStateException("failed to create a child event loop", e); ???????????} finally { ???????????????if (!success) {//如果新建子线程的过程中出错,则关闭所有子线程 ???????????????????for (int j = 0; j < i; j ++) { ???????????????????????children[j].shutdownGracefully(); ???????????????????} ???????????????????for (int j = 0; j < i; j ++) { ???????????????????????EventExecutor e = children[j]; ???????????????????????try { ???????????????????????????while (!e.isTerminated()) { ???????????????????????????????e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); ???????????????????????????} ???????????????????????} catch (InterruptedException interrupted) { ???????????????????????????// Let the caller handle the interruption. ???????????????????????????Thread.currentThread().interrupt(); ???????????????????????????break; ???????????????????????} ???????????????????} ???????????????} ???????????} ???????} ???????chooser = chooserFactory.newChooser(children);//设定新任务的分配策略 ???????final FutureListener<Object> terminationListener = new FutureListener<Object>() {//注册一些回调函数用于清理工作 ???????????@Override ???????????public void operationComplete(Future<Object> future) throws Exception { ???????????????if (terminatedChildren.incrementAndGet() == children.length) { ???????????????????terminationFuture.setSuccess(null); ???????????????} ???????????} ???????}; ???????for (EventExecutor e: children) { ???????????e.terminationFuture().addListener(terminationListener); ???????} ???????Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); ???????Collections.addAll(childrenSet, children); ???????readonlyChildren = Collections.unmodifiableSet(childrenSet); ???}

其中比较重要的地方有两处:调用子类实现的newChild方法设置子线程,以及设置新任务的分配策略

先看一下NioEventLoopGroup中实现的newChild方法:

 ???@Override ???protected EventLoop newChild(Executor executor, Object... args) throws Exception { ???????return new NioEventLoop(this, executor, (SelectorProvider) args[0], ???????????((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); ???}

很简单的新建一个NioEventLoop对象并返回,我们下一节会介绍NioEventLoop

而chooserFactory.newChooser最终会跳转到DefaultEventExecutorChooserFactory里:

 ???@SuppressWarnings("unchecked") ???@Override ???public EventExecutorChooser newChooser(EventExecutor[] executors) { ???????if (isPowerOfTwo(executors.length)) { ???????????return new PowerOfTwoEventExecutorChooser(executors); ???????} else { ???????????return new GenericEventExecutorChooser(executors); ???????} ???} ???private static boolean isPowerOfTwo(int val) { ???????return (val & -val) == val; ???}

虽然分配策略都是round-robin,但是在子线程的数量为2的幂时,可以用位运算来加速,效率很高。

Netty为了追求效率确实不择手段。

Netty源码学习(二)NioEventLoopGroup

原文地址:http://www.cnblogs.com/stevenczp/p/7581940.html

知识推荐

我的编程学习网——分享web前端后端开发技术知识。 垃圾信息处理邮箱 tousu563@163.com 网站地图
icp备案号 闽ICP备2023006418号-8 不良信息举报平台 互联网安全管理备案 Copyright 2023 www.wodecom.cn All Rights Reserved