分享web开发知识

注册/登录|最近发布|今日推荐

主页 IT知识网页技术软件开发前端开发代码编程运营维护技术分享教程案例
当前位置:首页 > 运营维护

Netty源码学习(六)ChannelPipeline

发布时间:2023-09-06 01:15责任编辑:白小东关键词:暂无标签

0. ChannelPipeline简介

ChannelPipeline = Channel + Pipeline,也就是说首先它与Channel绑定,然后它是起到类似于管道的作用:字节流在ChannelPipeline上流动,流动的过程中被ChannelHandler修饰,最终输出。

1. ChannelPipeline类图

ChannelPipeline只有两个子类,直接一起放上来好了,其中EmbeddedChannelPipeline主要用于测试,本文只介绍DefaultChannelPipeline

2. ChannelPipeline的初始化

跟踪一下DefaultChannelPipeline的构造方法就能发现

ChannelPipeline是在AbstractChannel的构造方法中被初始化的,而AbstractChannel的构造方法有两个,我只选取其中一个做分析了:

AbstractChannel() ???protected AbstractChannel(Channel parent) { ???????this.parent = parent; ???????id = newId(); ???????unsafe = newUnsafe(); ???????pipeline = newChannelPipeline(); ???}AbstractChannel.newChannelPipeline() ???protected DefaultChannelPipeline newChannelPipeline() { ???????return new DefaultChannelPipeline(this); ???} ???protected DefaultChannelPipeline(Channel channel) { ???????this.channel = ObjectUtil.checkNotNull(channel, "channel"); ???????succeededFuture = new SucceededChannelFuture(channel, null); ???????voidPromise = ?new VoidChannelPromise(channel, true); ???????tail = new TailContext(this); ???????head = new HeadContext(this); ???????head.next = tail; ???????tail.prev = head; ???}

可以看到,DefaultChannelPipeline中维护了对关联的Channel的引用

而且,Pipeline内部维护了一个双向链表,head是链表的头,tail是链表的尾,链表指针是AbstractChannelHandlerContext类型。

在DefaultChannelPipeline初始化完成的时候,其内部结构是下面这个样子的:

HeadContext  <---->  TailContext

HeadContext与TailContext都继承于AbstractChannelHandlerContext,基本上是起到占位符的效果,没有什么功能性的作用。

3. 向pipeline添加节点

举一个很简单的例子:

bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { ????@Override ????public void initChannel(SocketChannel ch) throws Exception { ????????ChannelPipeline p = ch.pipeline(); ????????p.addLast(new Decoder());//解码 ????????p.addLast(new BusinessHandler())//业务逻辑 ????????p.addLast(new Encoder());//编码 ????}});

在pipeline中,从网卡收到的数据流先被Decoder解码,然后被BusinessHandler处理,然后再被Encoder编码,最后写回到网卡中。

此时pipeline的内部结构为:

HeadContext  <---->  Decoder  <---->   BusinessHandler  <---->   Encoder  <---->    TailContext

pipeline的修改,是在Channel初始化时,由ChannelInitializer进行的。

ChannelInitializer调用用户自定义的initChannel方法,然后调用Pipeline.addLast()方法修改pipeline的结构,关键代码位于DefaultChannelPipeline.addLast()中:

 ???public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { ???????final AbstractChannelHandlerContext newCtx; ???????synchronized (this) {//很重要,用当前的DefaultChannelPipeline作为同步对象,使pipeline的addLast方法串行化 ???????????checkMultiplicity(handler);//禁止非Sharable的handler被重复add到不同的pipeline中 ???????????newCtx = newContext(group, filterName(name, handler), handler);//将Handler包装成DefaultChannelHandlerContext并插入pipeline中 ???????????addLast0(newCtx);//将ChannelHandlerContext插入pipeline ???????????// If the registered is false it means that the channel was not registered on an eventloop yet. ???????????// In this case we add the context to the pipeline and add a task that will call ???????????// ChannelHandler.handlerAdded(...) once the channel is registered. ???????????if (!registered) {//如果channel没有与eventloop绑定,则创建一个任务,这个任务会在channel被register的时候调用 ???????????????newCtx.setAddPending(); ???????????????callHandlerCallbackLater(newCtx, true); ???????????????return this; ???????????} ???????????EventExecutor executor = newCtx.executor(); ???????????if (!executor.inEventLoop()) { ???????????????newCtx.setAddPending(); ???????????????executor.execute(new Runnable() { ???????????????????@Override ???????????????????public void run() { ???????????????????????callHandlerAdded0(newCtx); ???????????????????} ???????????????}); ???????????????return this; ???????????} ???????} ???????callHandlerAdded0(newCtx); ???????return this; ???} ???private void addLast0(AbstractChannelHandlerContext newCtx) {//向双链表插入节点 ???????AbstractChannelHandlerContext prev = tail.prev; ???????newCtx.prev = prev; ???????newCtx.next = tail; ???????prev.next = newCtx; ???????tail.prev = newCtx; ???} ???private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) { ???????try { ???????????ctx.handler().handlerAdded(ctx);//触发handler的handlerAdded回调函数 ???????????ctx.setAddComplete(); ???????} catch (Throwable t) {//异常处理 ???????????boolean removed = false; ???????????try { ???????????????remove0(ctx); ???????????????try { ???????????????????ctx.handler().handlerRemoved(ctx); ???????????????} finally { ???????????????????ctx.setRemoved(); ???????????????} ???????????????removed = true; ???????????} catch (Throwable t2) { ???????????????if (logger.isWarnEnabled()) { ???????????????????logger.warn("Failed to remove a handler: " + ctx.name(), t2); ???????????????} ???????????} ???????????if (removed) { ???????????????fireExceptionCaught(new ChannelPipelineException( ???????????????????????ctx.handler().getClass().getName() + ???????????????????????".handlerAdded() has thrown an exception; removed.", t)); ???????????} else { ???????????????fireExceptionCaught(new ChannelPipelineException( ???????????????????????ctx.handler().getClass().getName() + ???????????????????????".handlerAdded() has thrown an exception; also failed to remove.", t)); ???????????} ???????} ???}

小结:

a. addLast方法的作用就是将传入的handler添加到当前Pipeline的双向链表中

b. 在后续处理的时候,可以通过遍历链表,找到channel关联的pipeline上注册的所有handler

c. 贴出的代码中没有涉及,但又很重要的一点是:在添加handler的过程中,会根据handler继承于ChannelInboundHandler或者ChannelOutboundHandler来判定这个handler是用于处理in事件还是out事件的,然后会以此为依据来设置AbstractChannelHandlerContext的inbound和outbound位。

4. 一个读事件在pipeline中的流转过程

在第三节的示例中,如果一个已经register完毕的Channel收到一个数据包,会发生什么事情呢?

首先,这个Channel必然是与某个NioEventLoop绑定的,这个Channel上的可读事件会触发NioEventLoop.processSelectedKey方法:

 ???private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { ???????final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); ???????........... ???????try{ ???????????// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead ???????????// to a spin loop ???????????if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { ???????????????unsafe.read();//触发 ???????????} ???????} catch (CancelledKeyException ignored) { ???????????unsafe.close(unsafe.voidPromise()); ???????} ???}

可以看到这会触发AbstractNioChannel.NioUnsafe的read方法,其实现位于AbstractNioByteChannel.NioByteUnsafe中:

 ???????@Override ???????public final void read() { ???????????........... ???????????????do { ???????????????????byteBuf = allocHandle.allocate(allocator);//创建一个ByteBuf作为缓冲区 ???????????????????allocHandle.lastBytesRead(doReadBytes(byteBuf));//读取数据到ByteBuf ???????????????????if (allocHandle.lastBytesRead() <= 0) { ???????????????????????// nothing was read. release the buffer. ???????????????????????byteBuf.release(); ???????????????????????byteBuf = null; ???????????????????????close = allocHandle.lastBytesRead() < 0; ???????????????????????break; ???????????????????} ???????????????????allocHandle.incMessagesRead(1); ???????????????????readPending = false; ???????????????????pipeline.fireChannelRead(byteBuf);//触发pipeline的读事件 ???????????????????byteBuf = null; ???????????????} while (allocHandle.continueReading()); ???????????????allocHandle.readComplete(); ???????????????pipeline.fireChannelReadComplete(); ??????????............. ???????} ???}

fireChannelRead的实现位于DefaultChannelPipeline中:

DefaultChannelPipeline.fireChannelRead() ???@Overrides0. public final ChannelPipeline fireChannelRead(Object msg) { ???????AbstractChannelHandlerContext.invokeChannelRead(head, msg);//这里传入的是pipeline的head节点 ???????return this; ???}AbstractChannelHandlerContext.invokeChannelRead()s1. static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { ???????final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);//这行代码可能是为了检查内存泄漏 ???????EventExecutor executor = next.executor(); ???????if (executor.inEventLoop()) {//同步或者异步的调用传入的AbstractChannelHandlerContext的invokeChannelRead方法, ???????????next.invokeChannelRead(m); ???????} else { ???????????executor.execute(new Runnable() { ???????????????@Override ???????????????public void run() { ???????????????????next.invokeChannelRead(m); ???????????????} ???????????}); ???????} ???}s2. private void invokeChannelRead(Object msg) { ???????if (invokeHandler()) { ???????????try { ???????????????((ChannelInboundHandler) handler()).channelRead(this, msg);//第一次调用时,handler为HeadContext,后续调用时为pipeline中自定义的类型为inbound的handler ???????????} catch (Throwable t) { ???????????????notifyHandlerException(t); ???????????} ???????} else { ???????????fireChannelRead(msg); ???????} ???}HeadContext.channelRead() ???????????@Overrides3. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ???????????ctx.fireChannelRead(msg); ???????}AbstractChannelHandlerContext.fireChannelRead() ???@Overrides4. public ChannelHandlerContext fireChannelRead(final Object msg) { ???????invokeChannelRead(findContextInbound(), msg);//先找到pipeline上当前AbstractChannelHandlerContext节点之后的第一个inbound类型的AbstractChannelHandlerContext,然后调用其invokeChannelRead()方法,这样就又调转回到s1了 ???????return this; ???} ???//从pipeline的当前AbstractChannelHandlerContext向后遍历,找到第一个类型为inbound的AbstractChannelHandlerContext节点s5. private AbstractChannelHandlerContext findContextInbound() { ???????AbstractChannelHandlerContext ctx = this; ???????do { ???????????ctx = ctx.next; ???????} while (!ctx.inbound); ???????return ctx; ???}

可以看出,流入的msg会先被送到HeadContext中,然后HeadContext会将其转发到pipeline中的下一个类型为inbound的AbstractChannelHandlerContext,然后调用关联的Handler来处理数据包

如果Handler的channelRead方法中又调用了ctx.fireChannelRead(msg),那么这个msg会继续被转发到pipeline中下一个类型为inbound的AbstractChannelHandlerContext中进行处理

那么问题来了,如果pipeline中的最后一个自定义的类型为inbound的AbstractChannelHandlerContext中接着调用ctx.fireChannelRead(msg),会发生什么呢?

只需要查看pipeline链表的真正尾结点TailContext的源码就行了:

 ???@Override ???public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ???????onUnhandledInboundMessage(msg); ???} ???/** ????* Called once a message hit the end of the {@link ChannelPipeline} without been handled by the user ????* in {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)}. This method is responsible ????* to call {@link ReferenceCountUtil#release(Object)} on the given msg at some point. ????*/ ???protected void onUnhandledInboundMessage(Object msg) { ???????try { ???????????logger.debug( ???????????????????"Discarded inbound message {} that reached at the tail of the pipeline. " + ???????????????????????????"Please check your pipeline configuration.", msg); ???????} finally { ???????????ReferenceCountUtil.release(msg);//释放内存 ???????} ???}

原来只是使用debug级别输出一行日志罢了。

小结:

a. 读事件会触发Channel所关联的EventLoop的processSelectedKey方法

b. 触发AbstractNioByteChannel.NioByteUnsafe的read方法,其中会调用JDK底层提供的nio方法,将从网卡上读取到的数据包装成ByteBuf类型的消息msg

c. 触发Channel关联的DefaultChannelPipeline的fireChannelRead方法

d. 触发DefaultChannelPipeline中维护的双向链表的头结点HeadContext的invokeChannelRead方法

e. 触发DefaultChannelPipeline中维护的双向链表的后续类型为inBound的AbstractChannelHandlerContext的invokeChannelRead方法

f. 如果用户自定义的Handler的channelRead方法中又调用了ctx.fireChannelRead(msg),那么这个msg会继续沿着pipeline向后传播

g. 如果TailContext的channelRead收到了msg,则以debug级别输出日志

5. 一个写事件在pipeline中的流转过程

Netty源码学习(六)ChannelPipeline

原文地址:http://www.cnblogs.com/stevenczp/p/7615903.html

知识推荐

我的编程学习网——分享web前端后端开发技术知识。 垃圾信息处理邮箱 tousu563@163.com 网站地图
icp备案号 闽ICP备2023006418号-8 不良信息举报平台 互联网安全管理备案 Copyright 2023 www.wodecom.cn All Rights Reserved