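In Netty, creating a NioServerSocketChannel that listens for client connections on a given port involves three main steps:
- Create the NioServerSocketChannel
- Initialize and register the NioServerSocketChannel
- Bind the specified port
First, here is the startup code for a simple server: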
    // port is assumed to be a field of the enclosing class
    public void start() throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap sbs = new ServerBootstrap()
                    // Set the boss and worker groups
                    .group(bossGroup, workerGroup)
                    // Specify the server Channel type
                    .channel(NioServerSocketChannel.class)
                    // Handler for the server Channel
                    .handler(new HelloWorldServerHandler())
                    // Handler for the client Channels
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            // String codec Handlers for client Channels accepted later
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new StringEncoder());
                        }
                    });
            // Listen on the specified port
            ChannelFuture future = sbs.bind(port).sync();
            System.out.println("Server start listen at " + port);
            future.channel().closeFuture().sync();
        } finally {
            // Release both groups whether startup failed or the server Channel was closed
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
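The analysis of how the server's NioServerSocketChannel is created starts from ServerBootstrap's bind(int port) method.
1. Create the NioServerSocketChannel
Following the bind call chain leads to AbstractBootstrap's doBind() method, which invokes the initialization, registration, and binding steps: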
    // AbstractBootstrap
    private ChannelFuture doBind(final SocketAddress localAddress) {
        // Initialize and register
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            // Bind the local port
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            //....
        }
    }
2. Initialize and register the NioServerSocketChannel
First, take a look at the initAndRegister() method:
    // AbstractBootstrap
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            // Create the Channel
            channel = channelFactory.newChannel();
            // Initialize the Channel
            init(channel);
        } catch (Throwable t) {
            //...
        }

        // Register
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        //...
    }
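The Channel is created through a factory. By default this is a ReflectiveChannelFactory, which was created in the startup code when the server Channel type was set with channel(NioServerSocketChannel.class). As the name suggests, it creates the Channel object via reflection.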
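To make the reflection idea concrete, here is a minimal JDK-only sketch. It is not Netty's ReflectiveChannelFactory; the names ReflectiveFactorySketch, ReflectiveFactory, and DemoChannel are made up for illustration, and the sketch assumes the target class has a public no-argument constructor.

```java
import java.lang.reflect.Constructor;

/** JDK-only sketch of a reflection-based factory; every name here is illustrative. */
final class ReflectiveFactorySketch {

    /** Stand-in for a channel type with a public no-arg constructor. */
    public static class DemoChannel {
        @Override
        public String toString() {
            return "DemoChannel";
        }
    }

    /** Creates a fresh instance of the configured class on every newInstance() call. */
    static final class ReflectiveFactory<T> {
        private final Constructor<? extends T> constructor;

        ReflectiveFactory(Class<? extends T> type) {
            try {
                // Look up the no-arg constructor once, when the factory is built.
                this.constructor = type.getConstructor();
            } catch (NoSuchMethodException e) {
                throw new IllegalArgumentException(type + " has no public no-arg constructor", e);
            }
        }

        T newInstance() {
            try {
                return constructor.newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException(
                        "Unable to create an instance of " + constructor.getDeclaringClass(), e);
            }
        }
    }

    public static void main(String[] args) {
        ReflectiveFactory<DemoChannel> factory = new ReflectiveFactory<>(DemoChannel.class);
        System.out.println(factory.newInstance());
    }
}
```

Passing only a Class object at configuration time and instantiating later is what lets the bootstrap defer Channel creation until bind() is called.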
init() has two implementations; the one analyzed here is ServerBootstrap's:
    // ServerBootstrap
    @Override
    void init(Channel channel) throws Exception {
        //... option setup omitted
        // Get the pipeline (a DefaultChannelPipeline by default)
        ChannelPipeline p = channel.pipeline();

        //... saving of the client Channel configuration

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                // This adds the server Handler from the startup code
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                // We add this handler via the EventLoop as the user may have used a ChannelInitializer as handler.
                // In this case the initChannel(...) method will only be called after this method returns. Because
                // of this we need to ensure we add our handler in a delayed fashion so all the users handler are
                // placed in front of the ServerBootstrapAcceptor.
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        // This adds an acceptor, which handles newly accepted connections
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }
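Initializing the Channel mainly does four things:
- Obtains the Channel's pipeline (created together with the Channel, a DefaultChannelPipeline by default)
- Adds the user-supplied Handler to the Channel
- Adds the acceptor (ServerBootstrapAcceptor)
- Sets other properties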
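The source comment inside init() explains why the acceptor is added through eventLoop().execute(...) instead of inline: the user's handler may itself be a ChannelInitializer that only expands after the outer initChannel(...) returns. The toy model below illustrates that ordering effect with a plain list and a task queue. It is a simplified sketch, not Netty code; DeferredAcceptorSketch and the handler names are invented for the example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy model of handler ordering; the pipeline is just a list of names. */
final class DeferredAcceptorSketch {

    /** Builds the toy pipeline; when defer is true the acceptor is queued instead of added inline. */
    static List<String> buildPipeline(boolean defer) {
        List<String> pipeline = new ArrayList<>();
        Queue<Runnable> taskQueue = new ArrayDeque<>(); // stands in for the event loop's task queue

        // Step 1: the bootstrap's initializer runs. It adds the user's handler, which in this
        // model is itself an initializer that has not been expanded yet.
        pipeline.add("userInitializer");
        if (defer) {
            taskQueue.add(() -> pipeline.add("acceptor"));
        } else {
            pipeline.add("acceptor");
        }

        // Step 2: after step 1 returns, the user's initializer expands. It appends its
        // handlers at the tail and removes itself.
        pipeline.add("userHandlerA");
        pipeline.add("userHandlerB");
        pipeline.remove("userInitializer");

        // Step 3: queued tasks run only after the current work has finished.
        Runnable task;
        while ((task = taskQueue.poll()) != null) {
            task.run();
        }
        return pipeline;
    }

    public static void main(String[] args) {
        System.out.println("deferred: " + buildPipeline(true));
        System.out.println("inline:   " + buildPipeline(false));
    }
}
```

In this model the deferred variant leaves the acceptor last, after both user handlers, while the inline variant leaves it in front of them, which is the situation the source comment says must be avoided.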
Next comes the registration of the Channel. The line to focus on is:
    ChannelFuture regFuture = config().group().register(channel);
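config() returns the config object created at startup, and its group() method returns the bossGroup passed in at startup. Two groups were passed in the startup code, so why is bossGroup the one returned? Look at the group(EventLoopGroup parentGroup, EventLoopGroup childGroup) method used in the startup code: its first line calls super.group(parentGroup), handing the first group to the parent class AbstractBootstrap. The group() method called on config here returns exactly the group held by that parent class.
Because this group is a NioEventLoopGroup, the register(channel) method that runs is the one in MultithreadEventLoopGroup: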
    // MultithreadEventLoopGroup
    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }
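next() ends up calling next() on the chooser that was created along with the group. It returns a NioEventLoop object (this is where the EventLoop is assigned to the Channel), whose register() method is implemented in its parent class SingleThreadEventLoop. That call eventually reaches the register method in AbstractChannel (in its inner AbstractUnsafe class).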
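The chooser's job can be sketched in a few lines. The version below assumes simple round-robin selection, which is how the default choosers behave as far as I know; it is not Netty's implementation, and RoundRobinChooserSketch with its string "loops" is purely illustrative.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative round-robin chooser over a fixed set of event loops (named by strings here). */
final class RoundRobinChooserSketch {
    private final String[] loops;
    private final AtomicInteger idx = new AtomicInteger();

    RoundRobinChooserSketch(String... loops) {
        if (loops.length == 0) {
            throw new IllegalArgumentException("at least one loop is required");
        }
        this.loops = loops.clone();
    }

    /** Returns the next loop in rotation; safe to call from several threads. */
    String next() {
        // floorMod keeps the index non-negative even after the counter overflows.
        return loops[Math.floorMod(idx.getAndIncrement(), loops.length)];
    }

    public static void main(String[] args) {
        RoundRobinChooserSketch chooser = new RoundRobinChooserSketch("loop-0", "loop-1");
        for (int i = 0; i < 4; i++) {
            System.out.println(chooser.next());
        }
    }
}
```

With new NioEventLoopGroup(1), as used for bossGroup in the startup code, there is only one loop to choose from, so every call returns the same EventLoop.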
    // AbstractChannel.AbstractUnsafe
    @Override
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        //...
        // Save the eventLoop returned earlier
        AbstractChannel.this.eventLoop = eventLoop;
        // Check whether the eventLoop's thread is the current thread.
        // On first startup, the eventLoop's thread is still null.
        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            try {
                // Pass in the registration task
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        // Register
                        register0(promise);
                    }
                });
            } catch (Throwable t) {
                //...
            }
        }
    }
The registration is wrapped in a task and handed to the eventLoop object for processing:
    // SingleThreadEventExecutor
    @Override
    public void execute(Runnable task) {
        //...
        // Usually false here (during startup the caller is the main thread)
        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
            // Start the thread
            startThread();
            // Add the registration task passed in earlier to the queue
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }

    private void startThread() {
        // Check whether the thread needs to be started
        if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                // Start the thread
                doStartThread();
            }
        }
    }
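The startThread() method above uses STATE_UPDATER, which updates this object's state field in a thread-safe way. state defaults to ST_NOT_STARTED, so the first time the method is entered the condition is true; a CAS then sets state to ST_STARTED and doStartThread() is called. Once an EventLoop's thread has been started, the next time the chooser picks that EventLoop the first if in startThread() evaluates to false, so no new thread is created.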
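The start-once guard described above can be reproduced with the JDK's AtomicIntegerFieldUpdater. The sketch below is a simplified stand-in, not the Netty class: LazyStartSketch and its counter are hypothetical, and the counter increment takes the place of doStartThread().

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

/** Sketch of a CAS-guarded "start exactly once" check; names mirror the article but the class is made up. */
final class LazyStartSketch {
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;
    private static final AtomicIntegerFieldUpdater<LazyStartSketch> STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(LazyStartSketch.class, "state");

    // The updater requires a volatile int field.
    private volatile int state = ST_NOT_STARTED;
    private final AtomicInteger starts = new AtomicInteger();

    /** Returns true only for the single caller that wins the CAS. */
    boolean startThread() {
        if (STATE_UPDATER.get(this) == ST_NOT_STARTED
                && STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            starts.incrementAndGet(); // stands in for doStartThread()
            return true;
        }
        return false;
    }

    int startCount() {
        return starts.get();
    }

    public static void main(String[] args) throws InterruptedException {
        LazyStartSketch loop = new LazyStartSketch();
        Thread[] callers = new Thread[8];
        for (int i = 0; i < callers.length; i++) {
            callers[i] = new Thread(loop::startThread);
            callers[i].start();
        }
        for (Thread caller : callers) {
            caller.join();
        }
        System.out.println("starts = " + loop.startCount()); // prints "starts = 1"
    }
}
```

The plain read before the CAS is only a cheap fast path; the compareAndSet is what guarantees that, even when several threads race, exactly one of them performs the start.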
    // SingleThreadEventExecutor
    private void doStartThread() {
        assert thread == null;
        // This executor is the one created when the group was built
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    // The object created earlier is a NioEventLoop, so this runs NioEventLoop.run()
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    for (;;) {
                        // Update state
                        int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
                        if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                            break;
                        }
                    }
                    //...
                }
            }
        });
    }
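The previous article, on creating the EventLoopGroup, noted that each EventLoop keeps a reference to an executor object; this task is ultimately handed to that executor. The executor's execute(Runnable task) method creates a new thread and runs the given task on it. Next, look at the run() method of NioEventLoop: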
    // NioEventLoop
    protected void run() {
        for (;;) {
            try {
                // Compute the select strategy. If tasks are pending, a selectNow() (NIO) is performed and
                // the number of ready keys is returned; that value matches no case, so the switch is skipped.
                // If there are no tasks, SelectStrategy.SELECT is returned directly.
                // SelectStrategy.CONTINUE does not appear to be reachable here.
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        // No tasks to process, so perform a select.
                        // wakenUp.getAndSet(false) returns the old value; the default is false,
                        // so the first call returns false.
                        select(wakenUp.getAndSet(false));

                        // 'wakenUp.compareAndSet(false, true)' is always evaluated
                        // before calling 'selector.wakeup()' to reduce the wake-up
                        // overhead. (Selector.wakeup() is an expensive operation.)
                        //
                        // However, there is a race condition in this approach.
                        // The race condition is triggered when 'wakenUp' is set to
                        // true too early.
                        //
                        // 'wakenUp' is set to true too early if:
                        // 1) Selector is waken up between 'wakenUp.set(false)' and
                        //    'selector.select(...)'. (BAD)
                        // 2) Selector is waken up between 'selector.select(...)' and
                        //    'if (wakenUp.get()) { ... }'. (OK)
                        //
                        // In the first case, 'wakenUp' is set to true and the
                        // following 'selector.select(...)' will wake up immediately.
                        // Until 'wakenUp' is set to false again in the next round,
                        // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                        // any attempt to wake up the Selector will fail, too, causing
                        // the following 'selector.select(...)' call to block
                        // unnecessarily.
                        //
                        // To fix this problem, we wake up the selector again if wakenUp
                        // is true immediately after selector.select(...).
                        // It is inefficient in that it wakes up the selector for both
                        // the first case (BAD - wake-up required) and the second case
                        // (OK - no wake-up required).

                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                        // fallthrough
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                // Split time between I/O events and tasks according to ioRatio
                if (ioRatio == 100) {
                    try {
                        // Process the ready keys
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        // Run the tasks
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        // Measure the time spent on I/O events, then derive the task time budget from the ratio
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
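The task time budget in the else branch of run() is plain arithmetic, so it is easy to check with a few numbers. The helper below copies the expression ioTime * (100 - ioRatio) / ioRatio from the listing; the class IoRatioSketch and the sample values are only for illustration.

```java
/** Worked example of the task time budget computed in run(); the class name is illustrative. */
final class IoRatioSketch {

    /** Time allowed for tasks, given the time just spent on I/O and an ioRatio strictly between 0 and 100. */
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            // run() handles ioRatio == 100 in a separate branch without a time budget.
            throw new IllegalArgumentException("ioRatio must be in (0, 100): " + ioRatio);
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = 8_000_000L; // 8 ms spent processing selected keys
        System.out.println(taskBudgetNanos(ioTime, 50)); // 8000000: tasks get as long as I/O took
        System.out.println(taskBudgetNanos(ioTime, 80)); // 2000000: tasks get a quarter of the I/O time
        System.out.println(taskBudgetNanos(ioTime, 20)); // 32000000: tasks get four times the I/O time
    }
}
```

A higher ioRatio therefore means a smaller share of each loop iteration is spent running queued tasks such as the registration task.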
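run() mainly performs select operations and processes I/O events and the tasks in the task queue; the next article covers this in detail. Once the executor's execute() method is called, the thread managed by Netty starts running. At this point, however, the NioServerSocketChannel has not yet been registered with that thread's Selector, as a debugging session shows (the original post includes a screenshot of it here).
The startThread() call seen in the debugger only submits a task to the executor. Immediately afterwards the main thread calls addTask(), which adds the task to the EventLoop's task queue; that task is the registration itself. Once the registration task is queued, the Netty thread runs the queued tasks after its select completes, registering the NioServerSocketChannel with that thread's Selector. Now look at AbstractChannel's register0() method: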
    // AbstractChannel.AbstractUnsafe
    private void register0(ChannelPromise promise) {
        try {
            // check if the channel is still open as it could be closed in the mean time when the register
            // call was outside of the eventLoop
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }
            boolean firstRegistration = neverRegistered;
            // Register the channel
            doRegister();
            neverRegistered = false;
            registered = true;

            // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
            // user may already fire events through the pipeline in the ChannelFutureListener.
            // Add the server Channel's Handler
            pipeline.invokeHandlerAddedIfNeeded();

            safeSetSuccess(promise);
            // Propagate the channel-registered event through the pipeline
            pipeline.fireChannelRegistered();
            // Only fire a channelActive if the channel has never been registered. This prevents firing
            // multiple channel actives if the channel is deregistered and re-registered.
            if (isActive()) { // false the first time through, because bind has not happened yet
                if (firstRegistration) {
                    pipeline.fireChannelActive();
                } else if (config().isAutoRead()) {
                    // This channel was registered before and autoRead() is set. This means we need to begin read
                    // again so that we process inbound data.
                    //
                    // See https://github.com/netty/netty/issues/4805
                    beginRead();
                }
            }
        } catch (Throwable t) {
            // Close the channel directly to avoid FD leak.
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
doRegister() is essentially the Java NIO operation of registering a channel with a Selector:
    selectionKey = javaChannel().register(eventLoop().selector, 0, this); // the interest set passed here is 0
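The same JDK call can be tried outside Netty. The sketch below registers an unbound ServerSocketChannel with an empty interest set and an attachment, mirroring the three arguments in the line above. RegisterWithZeroOpsSketch is an invented name, and the plain Object attachment stands in for the Channel instance that Netty passes as this.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

/** JDK-only sketch: register a server channel with interest set 0 and an attachment. */
final class RegisterWithZeroOpsSketch {

    /** Performs the registration and reports what the resulting key looks like. */
    static String registerDemo() {
        Object attachment = new Object(); // Netty attaches the Channel itself here
        try (Selector selector = Selector.open();
             ServerSocketChannel channel = ServerSocketChannel.open()) {
            channel.configureBlocking(false); // a channel must be non-blocking before register()
            SelectionKey key = channel.register(selector, 0, attachment);
            return "interestOps=" + key.interestOps()
                    + ", attachmentKept=" + (key.attachment() == attachment)
                    + ", valid=" + key.isValid();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(registerDemo()); // interestOps=0, attachmentKept=true, valid=true
    }
}
```

With an interest set of 0 the key is valid but the Selector reports no events for it, which is why a later step has to add OP_ACCEPT before connections can be accepted.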
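pipeline.invokeHandlerAddedIfNeeded() runs the pending handler-added callbacks, which is when the server Channel's Handlers are actually added to the pipeline, and pipeline.fireChannelRegistered() then propagates the channel-registered event through the pipeline.
As for isActive(): because the channel here is a NioServerSocketChannel, it actually checks whether the channel has been successfully bound to an address. Clearly, so far the channel has only been created and registered with the Selector; it has not been bound yet.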
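The "not bound yet" state is easy to observe with the JDK alone. The sketch below assumes the check boils down to something like socket().isBound() on the underlying ServerSocketChannel; that is my reading rather than something shown in the listings above. BoundCheckSketch, the loopback address, port 0 (any free port), and the backlog of 128 are all arbitrary choices for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

/** JDK-only sketch: a server channel reports "bound" only after bind() has been called. */
final class BoundCheckSketch {

    /** Returns {boundBeforeBind, boundAfterBind} for a freshly opened server channel. */
    static boolean[] boundBeforeAndAfter() {
        try (ServerSocketChannel channel = ServerSocketChannel.open()) {
            boolean before = channel.socket().isBound();
            // Port 0 asks the OS for any free port; 128 is an arbitrary backlog for the demo.
            channel.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 128);
            boolean after = channel.socket().isBound();
            return new boolean[] {before, after};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = boundBeforeAndAfter();
        System.out.println("before bind: " + result[0] + ", after bind: " + result[1]);
    }
}
```

This matches the flow in register0(): isActive() is false during registration, so the channel-active event cannot fire there and has to wait until the bind step.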
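3. Bind the specified port
After initAndRegister() returns, the main thread calls doBind0(), which wraps the bind operation in a task and hands it to the Netty thread. The call eventually reaches the bind() method of HeadContext in DefaultChannelPipeline, which completes the bind through unsafe.bind(localAddress, promise):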
    // AbstractChannel.AbstractUnsafe
    @Override
    public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
        //...
        // Clearly this returns false here
        boolean wasActive = isActive();
        try {
            // The actual bind
            doBind(localAddress);
        } catch (Throwable t) {
            safeSetFailure(promise, t);
            closeIfClosed();
            return;
        }

        if (!wasActive && isActive()) {
            invokeLater(new Runnable() {
                @Override
                public void run() {
                    // This is where the server Channel's active event is actually fired
                    pipeline.fireChannelActive();
                }
            });
        }

        safeSetSuccess(promise);
    }
It is worth stepping through this part in a debugger to see exactly how the call reaches HeadContext. Next, look at the doBind() method:
    // NioServerSocketChannel
    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }
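In the end, the JDK bind method to call is chosen according to the platform's Java version. Once binding completes, the channel-active event is fired; when it passes through HeadContext, this line is executed: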
    readIfIsAutoRead();
Debugging shows that this method eventually calls HeadContext's read() method, which calls unsafe.beginRead(), which in turn leads to AbstractNioChannel's doBeginRead() method:
    // AbstractNioChannel
    @Override
    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;

        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) { // OP_ACCEPT is not yet in the interest set
            selectionKey.interestOps(interestOps | readInterestOp); // add it with a bitwise OR
        }
    }
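When the NioServerSocketChannel object was created via reflection earlier, its constructor called up to the AbstractNioChannel constructor, setting readInterestOp to 16, which in NIO is OP_ACCEPT. From this point on, the NioServerSocketChannel can accept client connections.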
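The bit manipulation in doBeginRead() can be checked in isolation. The helper below reproduces the test-then-OR logic on plain ints and uses the real SelectionKey.OP_ACCEPT constant; AcceptInterestSketch and withReadInterest are names made up for this example.

```java
import java.nio.channels.SelectionKey;

/** Sketch of the interest-set update performed in doBeginRead(), on plain ints. */
final class AcceptInterestSketch {

    /** Adds readInterestOp to interestOps only if it is not already present. */
    static int withReadInterest(int interestOps, int readInterestOp) {
        if ((interestOps & readInterestOp) == 0) {
            return interestOps | readInterestOp;
        }
        return interestOps;
    }

    public static void main(String[] args) {
        System.out.println(SelectionKey.OP_ACCEPT);                                 // 16
        System.out.println(withReadInterest(0, SelectionKey.OP_ACCEPT));             // 16
        System.out.println(withReadInterest(SelectionKey.OP_ACCEPT, SelectionKey.OP_ACCEPT)); // 16
    }
}
```

Starting from the interest set of 0 used at registration, a single call is enough to arrive at 16, and repeating the call changes nothing, so calling beginRead() more than once is harmless.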
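4. Summary
During Netty server startup, the main thread only creates the EventLoopGroups and the bootstrap object and then initiates the bind. Operations such as registration and binding are wrapped into tasks by the main thread and handed to the Netty thread for execution.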
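This hand-off pattern can be imitated with a single-threaded executor from the JDK. The sketch below is only an analogy: HandOffSketch, the thread name loop-thread, and the two string "steps" are invented, and a plain ExecutorService stands in for the EventLoop.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Analogy for the startup hand-off: the caller submits tasks, one loop thread runs them in order. */
final class HandOffSketch {

    /** Submits "register" and "bind" from the calling thread and returns where they actually ran. */
    static List<String> runStartupSteps() {
        List<String> log = new CopyOnWriteArrayList<>();
        ExecutorService loop = Executors.newSingleThreadExecutor(r -> new Thread(r, "loop-thread"));
        try {
            loop.execute(() -> log.add("register on " + Thread.currentThread().getName()));
            loop.execute(() -> log.add("bind on " + Thread.currentThread().getName()));
        } finally {
            loop.shutdown(); // tasks that were already submitted still run
        }
        try {
            if (!loop.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("loop thread did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runStartupSteps()); // [register on loop-thread, bind on loop-thread]
    }
}
```

Because a single thread drains the queue in submission order, the register step always completes before the bind step runs, without any locking in the caller.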
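Netty's code contains many abstract classes and interfaces, so a method called at a given point may have several implementations. When you are unfamiliar with the code, use a debugger to determine which one actually runs.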
Netty server startup process
Original article: https://www.cnblogs.com/magexi/p/10228780.html