分享web开发知识

注册/登录|最近发布|今日推荐

主页 IT知识网页技术软件开发前端开发代码编程运营维护技术分享教程案例
当前位置:首页 > 代码编程

Netty源码分析第4章(pipeline)---->第7节: 前章节内容回顾

发布时间:2023-09-06 02:28责任编辑:顾先生关键词:暂无标签

 

Netty源码分析第四章: pipeline

 

第七节: 前章节内容回顾

我们在第一章和第三章中, 遗留了很多有关事件传输的相关逻辑, 这里带大家一一回顾

首先看两个问题:

1.在客户端接入的时候, NioMessageUnsafe的read方法中pipeline.fireChannelRead(readBuf.get(i))为什么会调用到ServerBootstrap的内部类ServerBootstrapAcceptor中的channelRead()方法

2.客户端handler是什么时候被添加的?

首先看第一个问题:

1.在客户端接入的时候, NioMessageUnsafe的read方法中pipeline.fireChannelRead(readBuf.get(i))为什么会调用到ServerBootstrap的内部类ServerBootstrapAcceptor中的channelRead()方法?

我们首先看这段代码:

public void read() { ???//必须是NioEventLoop方法调用的, 不能通过外部线程调用 ???assert eventLoop().inEventLoop(); ???//服务端channel的config ???final ChannelConfig config = config(); ???//服务端channel的pipeline ???final ChannelPipeline pipeline = pipeline(); ???//处理服务端接入的速率 ???final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); ???//设置配置 ???allocHandle.reset(config); ???boolean closed = false; ???Throwable exception = null; ???try { ???????try { ???????????do { ???????????????//创建jdk底层的channel ???????????????//readBuf用于临时承载读到链接 ???????????????int localRead = doReadMessages(readBuf); ???????????????if (localRead == 0) { ???????????????????break; ???????????????} ???????????????if (localRead < 0) { ???????????????????closed = true; ???????????????????break; ???????????????} ???????????????//分配器将读到的链接进行计数 ???????????????allocHandle.incMessagesRead(localRead); ???????????????//连接数是否超过最大值 ???????????} while (allocHandle.continueReading()); ???????} catch (Throwable t) { ???????????exception = t; ???????} ???????int size = readBuf.size(); ???????//遍历每一条客户端连接 ???????for (int i = 0; i < size; i ++) { ???????????readPending = false; ???????????//传递事件, 将创建NioSokectChannel进行传递 ???????????//最终会调用ServerBootstrap的内部类ServerBootstrapAcceptor的channelRead()方法 ???????????pipeline.fireChannelRead(readBuf.get(i)); ???????} ???????readBuf.clear(); ???????allocHandle.readComplete(); ???????pipeline.fireChannelReadComplete(); ???????//代码省略 ???} finally { ???????//代码省略 ???}}

重点看pipeline.fireChannelRead(readBuf.get(i))

首先, 这里pipeline是服务端channel的pipeline, 也就是NioServerSocketChannel的pipeline

我们学习过pipeline之后, 对这种写法并不陌生, 就是传递channelRead事件, 这里通过传递channelRead事件走到了ServerBootstrapAcceptor的channelRead()方法, 说明在这步之前, ServerBootstrapAcceptor作为一个handler添加到了服务端channel的pipeline中, 那么这个handler什么时候添加的呢?

我们回顾下第一章, 初始化NioServerSocketChannel的时候, 调用了ServerBootstrap的init方法:

void init(Channel channel) throws Exception { ???//获取用户定义的选项(1) ???final Map<ChannelOption<?>, Object> options = options0(); ???synchronized (options) { ???????channel.config().setOptions(options); ???} ???//获取用户定义的属性(2) ???final Map<AttributeKey<?>, Object> attrs = attrs0(); ???synchronized (attrs) { ???????for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { ???????????@SuppressWarnings("unchecked") ???????????AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); ???????????channel.attr(key).set(e.getValue()); ???????} ???} ???//获取channel的pipline(3) ???ChannelPipeline p = channel.pipeline(); ???//work线程组(4) ???final EventLoopGroup currentChildGroup = childGroup; ???//用户设置的Handler(5) ???final ChannelHandler currentChildHandler = childHandler; ???final Entry<ChannelOption<?>, Object>[] currentChildOptions; ???final Entry<AttributeKey<?>, Object>[] currentChildAttrs; ???//选项转化为Entry对象(6) ???synchronized (childOptions) { ????????currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); ???} ???//属性转化为Entry对象(7) ???synchronized (childAttrs) { ????????currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); ???} ???//添加服务端handler(8) ???p.addLast(new ChannelInitializer<Channel>() { ???????//初始化channel ???????@Override ???????public void initChannel(Channel ch) throws Exception { ???????????final ChannelPipeline pipeline = ch.pipeline(); ???????????ChannelHandler handler = config.handler(); ???????????if (handler != null) { ????????????????pipeline.addLast(handler); ???????????} ????????????ch.eventLoop().execute(new Runnable() { ???????????????@Override ???????????????public void run() { ????????????????????pipeline.addLast(new ServerBootstrapAcceptor( ???????????????????????????currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); ???????????????} ???????????}); ???????} ???});}

这个方法比较长, 我们重点关注第8步, 添加服务端channel, 这里的pipeline, 是服务服务端channel的pipeline, 也就是NioServerSocketChannel绑定的pipeline, 这里添加了一个ChannelInitializer类型的handler

我们看一下ChannelInitializer这个类的继承关系:

public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter { ???//省略类体}

我们看到其继承了ChannelInboundHandlerAdapter, 说明是一个inbound类型的handler

这里我们可能会想到, 添加完handler会执行handlerAdded, 然后再handlerAdded方法中做了添加ServerBootstrapAcceptor这个handler

但是, 实际上并不是这样的, 当程序执行到这里, 并没有马上执行handlerAdded, 我们紧跟addLast方法

最后会跟到DefualtChannelPipeline的一个addLast方法中去:

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { ???final AbstractChannelHandlerContext newCtx; ???synchronized (this) { ???????//判断handler是否被重复添加(1) ???????checkMultiplicity(handler); ???????//创建一个HandlerContext并添加到列表(2) ???????newCtx = newContext(group, filterName(name, handler), handler); ???????//添加HandlerContext(3) ???????addLast0(newCtx); ???????//是否已注册 ???????if (!registered) { ???????????newCtx.setAddPending(); ???????????callHandlerCallbackLater(newCtx, true); ???????????return this; ???????} ???????EventExecutor executor = newCtx.executor(); ???????if (!executor.inEventLoop()) { ???????????newCtx.setAddPending(); ???????????//回调用户事件 ???????????executor.execute(new Runnable() { ???????????????@Override ???????????????public void run() { ???????????????????callHandlerAdded0(newCtx); ???????????????} ???????????}); ???????????return this; ???????} ???} ???//回调添加事件(4) ???callHandlerAdded0(newCtx); ???return this;}

首先完成了handler的添加, 但是并没有马上执行回调

这里我们重点关注if (!registered)这个条件判断, 其实在注册完成, registered会变成true, 但是走到这一步的时候NioServerSockeChannel并没有完成注册(可以回顾第一章看注册在哪一步), 所以会进到if里并返回自身

我们重点关注callHandlerCallbackLater这个方法, 我们跟进去:

private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) { ???assert !registered; ???//判断是否已添加, 未添加, 进行添加, 已添加进行删除 ???PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx); ???//获取第一个Callback任务 ???PendingHandlerCallback pending = pendingHandlerCallbackHead; ???//如果第一个Callback任务为空 ???if (pending == null) { ???????//将第一个任务设置为刚创建的任务 ???????pendingHandlerCallbackHead = task; ???} else { ???????while (pending.next != null) { ???????????pending = pending.next; ???????} ???????pending.next = task; ???}}

因我们调用这个方法的时候added传的true, 所以PendingHandlerCallback task赋值为new PendingHandlerAddedTask(ctx)

PendingHandlerAddedTask这个类, 我们从名字可以看出, 这是一个handler添加的延迟任务, 用于执行handler延迟添加的操作, 同样也对应一个名字为PendingHandlerRemovedTask的类, 用于执行延迟删除handler的操作, 这两个类都继承抽象类PendingHandlerCallback

 

我们看PendingHandlerAddedTask类构造方法:

PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) { ???super(ctx);}

这里调用了父类的构造方法, 再跟进去:

PendingHandlerCallback(AbstractChannelHandlerContext ctx) { ???this.ctx = ctx;}

在父类中, 保存了要添加的context, 也就是ChannelInitializer类型的包装类

回到callHandlerCallbackLater方法中:

PendingHandlerCallback pending = pendingHandlerCallbackHead;

这表示获取第一个PendingHandlerCallback的任务, 其实PendingHandlerCallback是一个单向链表, 自身维护一个PendingHandlerCallback类型的next, 指向下一个任务, 在DefaultChannelPipeline这个类中, 定义了个PendingHandlerCallback类型的引用pendingHandlerCallbackHead, 用来指向延迟回调任务的中的第一个任务

 

之后判断这个任务是为空, 如果是第一次添加handler, 那么这里就是空, 所以将第一个任务赋值为我们刚创建的添加任务

如果不是第一次添加handler, 则将我们新创建的任务添加到链表的尾部, 因为这里我们是第一次添加, 所以第一个回调任务就指向了我们创建的添加handler的任务

完成这一系列操作之后, addLast方法返归, 此时并没有完成添加操作

而什么时候完成添加操作的呢?

在服务端channel注册时候的会走到AbstractChannel的register0方法:

private void register0(ChannelPromise promise) { ???try { ???????//做实际的注册(1) ???????doRegister(); ???????neverRegistered = false; ???????registered = true; ???????//触发事件(2) ???????pipeline.invokeHandlerAddedIfNeeded(); ???????safeSetSuccess(promise); ???????//触发注册成功事件(3) ???????pipeline.fireChannelRegistered(); ???????if (isActive()) { ???????????if (firstRegistration) { ???????????????//传播active事件(4) ???????????????pipeline.fireChannelActive(); ???????????} else if (config().isAutoRead()) { ???????????????beginRead(); ???????????} ???????} ???} catch (Throwable t) { ???????//省略代码 ???}}

重点关注第二步pipeline.invokeHandlerAddedIfNeeded(), 这里已经通过doRegister()方法完成了实际的注册, 我们跟到该方法中:

final void invokeHandlerAddedIfNeeded() { ???assert channel.eventLoop().inEventLoop(); ???if (firstRegistration) { ???????firstRegistration = false; ???????callHandlerAddedForAllHandlers(); ???}}

这里会判断是否第一次注册, 这里反回true, 然后会执行callHandlerAddedForAllHandlers()方法, 我们跟进去:

private void callHandlerAddedForAllHandlers() { ???final PendingHandlerCallback pendingHandlerCallbackHead; ???synchronized (this) { ???????assert !registered; ???????registered = true; ???????pendingHandlerCallbackHead = this.pendingHandlerCallbackHead; ???????this.pendingHandlerCallbackHead = null; ???} ???//获取task ???PendingHandlerCallback task = pendingHandlerCallbackHead; ???while (task != null) { ???????//执行添加handler方法 ???????task.execute(); ???????task = task.next; ???}}

这里拿到第一个延迟执行handler添加的task其实就是我们之前剖析过的, 延迟执行handler添加的task, 就是PendingHandlerAddedTask对象

在while循环中, 通过执行execute()方法将handler添加

我们跟到PendingHandlerAddedTask的execute()方法中:

void execute() { ???//获取当前eventLoop线程 ???EventExecutor executor = ctx.executor(); ???//是当前执行的线程 ???if (executor.inEventLoop()) { ???????callHandlerAdded0(ctx); ???} else { ???????try { ???????????//添加到队列 ???????????executor.execute(this); ???????} catch (RejectedExecutionException e) { ???????????//代码省略 ???????} ???}}

终于在这里, 我们看到了执行回调的方法

再回到init方法中:

void init(Channel channel) throws Exception { ???//获取用户定义的选项(1) ???final Map<ChannelOption<?>, Object> options = options0(); ???synchronized (options) { ???????channel.config().setOptions(options); ???} ???//获取用户定义的属性(2) ???final Map<AttributeKey<?>, Object> attrs = attrs0(); ???synchronized (attrs) { ???????for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { ???????????@SuppressWarnings("unchecked") ???????????AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); ???????????channel.attr(key).set(e.getValue()); ???????} ???} ???//获取channel的pipline(3) ???ChannelPipeline p = channel.pipeline(); ???//work线程组(4) ???final EventLoopGroup currentChildGroup = childGroup; ???//用户设置的Handler(5) ???final ChannelHandler currentChildHandler = childHandler; ???final Entry<ChannelOption<?>, Object>[] currentChildOptions; ???final Entry<AttributeKey<?>, Object>[] currentChildAttrs; ???//选项转化为Entry对象(6) ???synchronized (childOptions) { ????????currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); ???} ???//属性转化为Entry对象(7) ???synchronized (childAttrs) { ????????currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); ???} ???//添加服务端handler(8) ???p.addLast(new ChannelInitializer<Channel>() { ???????//初始化channel ???????@Override ???????public void initChannel(Channel ch) throws Exception { ???????????final ChannelPipeline pipeline = ch.pipeline(); ???????????ChannelHandler handler = config.handler(); ???????????if (handler != null) { ????????????????pipeline.addLast(handler); ???????????} ????????????ch.eventLoop().execute(new Runnable() { ???????????????@Override ???????????????public void run() { ????????????????????pipeline.addLast(new ServerBootstrapAcceptor( ???????????????????????????currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); ???????????????} ???????????}); ???????} ???});}

我们继续看第8步添加服务端handler

因为这里的handler是ChannelInitializer, 所以完成添加之后会调用ChannelInitializer的handlerAdded方法

跟到handlerAdded方法:

public void handlerAdded(ChannelHandlerContext ctx) throws Exception { ???//默认情况下, 会返回true ???if (ctx.channel().isRegistered()) { ???????initChannel(ctx); ???}}

因为执行到这步服务端channel已经完成注册, 所以会执行到initChannel方法

跟到initChannel方法:

private boolean initChannel(ChannelHandlerContext ctx) throws Exception { ???//这段代码是否被执行过 ???if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { ???????try { ???????????initChannel((C) ctx.channel()); ???????} catch (Throwable cause) { ???????????exceptionCaught(ctx, cause); ???????} finally { ???????????//调用之后会删除当前节点 ???????????remove(ctx); ???????} ???????return true; ???} ???return false;}

我们关注initChannel这个方法, 这个方法是在ChannelInitializer的匿名内部来实现的, 这里我们注意, 在initChannel方法执行完毕之后会调用remove(ctx)删除当前节点

我们继续跟进initChannel方法:

@Overridepublic void initChannel(Channel ch) throws Exception { ???final ChannelPipeline pipeline = ch.pipeline(); ???ChannelHandler handler = config.handler(); ???if (handler != null) { ????????pipeline.addLast(handler); ???} ????ch.eventLoop().execute(new Runnable() { ???????@Override ???????public void run() { ????????????pipeline.addLast(new ServerBootstrapAcceptor( ???????????????????currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); ???????} ???});}

这里首先添加用户自定义的handler, 这里如果用户没有定义, 则添加不成功, 然后, 会调用addLast将ServerBootstrapAcceptor这个handler添加了进去, 同样这个handler也继承了ChannelInboundHandlerAdapter, 在这个handler中, 重写了channelRead方法, 所以, 这就是第一个问题的答案

紧接着我们看第二个问题:

2.客户端handler是什么时候被添加的?

我们这里看ServerBootstrapAcceptor的channelRead方法:

public void channelRead(ChannelHandlerContext ctx, Object msg) { ???final Channel child = (Channel) msg; ???//添加channelHadler, 这个channelHandler, 就是用户代码添加的ChannelInitializer ???child.pipeline().addLast(childHandler); ???//代码省略 ???try { ???????//work线程注册channel ???????childGroup.register(child).addListener(new ChannelFutureListener() { ???????????//代码省略 ???????}); ???} catch (Throwable t) { ???????forceClose(child, t); ???}}

这里真相可以大白了, 服务端再创建完客户端channel之后, 将新创建的NioSocketChannel作为参数触发channelRead事件(可以回顾NioMessageUnsafe的read方法, 代码这里就不贴了), 所以这里的参数msg就是NioSocketChannel

拿到channel时候再将客户端的handler添加进去, 我们回顾客户端handler的添加过程:

.childHandler(new ChannelInitializer<SocketChannel>() { ???@Override ???public void initChannel(SocketChannel ch) { ???????ch.pipeline().addLast(new StringDecoder()); ???????ch.pipeline().addLast(new StringEncoder()); ???????ch.pipeline().addLast(new ServerHandler()); ???}});

和服务端channel的逻辑一样, 首先会添加ChannelInitializer这个handler但是没有注册所以没有执行添加handler的回调, 将任务保存到一个延迟回调的task中

等客户端channel注册完毕, 会将执行添加handler的回调, 也就是handlerAdded方法, 在回调中执行initChannel方法将客户端handler添加进去, 然后删除ChannelInitializer这个handler

因为在服务端channel中这块逻辑已经进行了详细的剖析, 所以这边就不在赘述, 同学们可以自己跟进去走一遍流程

这里注意, 因为每创建一个NioSoeketChannel都会调用服务端ServerBootstrapAcceptor的channelRead方法, 所以这里会将每一个NioSocketChannel的handler进行添加

 

第四章总结

        本章剖析了事件传输的相关逻辑, 包括handler的添加, 删除, inbound和outbound以及异常事件的传输, 最后结合第一章和第三章, 剖析了服务端channel和客户端channel的添加过程, 同学们可以课后跟进源码, 将这些功能自己再走一遍以加深印象.其他的有关事件传输的逻辑, 可以结合这一章的知识点进行自行剖析

 

 

 

Netty源码分析第4章(pipeline)---->第7节: 前章节内容回顾

原文地址:https://www.cnblogs.com/xiangnan6122/p/10204523.html

知识推荐

我的编程学习网——分享web前端后端开发技术知识。 垃圾信息处理邮箱 tousu563@163.com 网站地图
icp备案号 闽ICP备2023006418号-8 不良信息举报平台 互联网安全管理备案 Copyright 2023 www.wodecom.cn All Rights Reserved