不论是NioServerSocketChannel,还是NioSocketChannel,最终都会调用父类AbstractChannel的构造函数,pipeline也在channel被创建的时候被创建。
protected AbstractChannel(Channel parent) { ???this.parent = parent; ???this.unsafe = this.newUnsafe(); ???this.pipeline = new DefaultChannelPipeline(this);}
而这里是创建了一个DefaultChannelPipeline。构造函数会保存传入的channel,并且默认创建两个节点head和tail并将它们构成双向链表的结构。
public DefaultChannelPipeline(AbstractChannel channel) { ???if(channel == null) { ???????throw new NullPointerException("channel"); ???} else { ???????this.channel = channel; ???????this.tail = new DefaultChannelPipeline.TailContext(this); ???????this.head = new DefaultChannelPipeline.HeadContext(this); ???????this.head.next = this.tail; ???????this.tail.prev = this.head; ???}}
而这里的head和tail都是实现了ChannelHeadContext接口,在pipeline中每一个节点都是ChannelHeadContext,ChannelHeadContext实现了AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker,其中AttributeMap代表ChannelHeadContext可以存储自定义的属性,而ChannelInboundInvoker, ChannelOutboundInvoker则是定义一些事件的触发方法。
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker { ???// 返回对应的Channel对象 ???Channel channel(); ???// 返回对应的EventLoop对象 ???EventExecutor executor(); ???// 返回对应的ChannelHandler对象 ???ChannelHandler handler(); ???// 该Context对应的Handler是否从pipeline中移除 ???boolean isRemoved(); ???/*******************以下的fire方法都是触发对应事件在pipeline中传播*********************/ ???ChannelHandlerContext fireChannelRegistered(); ???ChannelHandlerContext fireChannelUnregistered(); ???ChannelHandlerContext fireChannelActive(); ???ChannelHandlerContext fireChannelInactive(); ???ChannelHandlerContext fireExceptionCaught(Throwable cause); ???ChannelHandlerContext fireUserEventTriggered(Object evt); ???ChannelHandlerContext fireChannelRead(Object msg); ???ChannelHandlerContext fireChannelReadComplete(); ???ChannelHandlerContext fireChannelWritabilityChanged(); ???ChannelHandlerContext read(); ???ChannelHandlerContext flush(); ???// 返回对应的pipeline ???ChannelPipeline pipeline();}
pipeline中的head和tail分别新建是HeadContext和TailContext,仔细看这两个类的源码,发现它们不仅仅是实现ChannelHandlerContext,而且本身还是一个channelHandler,而让人比较意外的是,HeadContext实现的是ChannelOutboundHandler(处理输出字节),TailContext则是ChannelInboundHandler(处理输入字节)。
从HeadContext的源码可以看出,它的很多操作还是依赖于unsafe的。
static final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler { ???private static final String HEAD_NAME = DefaultChannelPipeline.generateName0(DefaultChannelPipeline.HeadContext.class); ???protected final Channel.Unsafe unsafe; ???HeadContext(DefaultChannelPipeline pipeline) { ???????super(pipeline, (EventExecutorGroup)null, HEAD_NAME, false, true); ???????this.unsafe = pipeline.channel().unsafe(); ???} ???public ChannelHandler handler() { ???????return this; ???} ???public void handlerAdded(ChannelHandlerContext ctx) throws Exception { ???} ???public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { ???} ???public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { ???????this.unsafe.bind(localAddress, promise); ???} ???public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { ???????this.unsafe.connect(remoteAddress, localAddress, promise); ???} ???public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { ???????this.unsafe.disconnect(promise); ???} ???public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { ???????this.unsafe.close(promise); ???} ???public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { ???????this.unsafe.deregister(promise); ???} ???public void read(ChannelHandlerContext ctx) { ???????this.unsafe.beginRead(); ???} ???public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ???????this.unsafe.write(msg, promise); ???} ???public void flush(ChannelHandlerContext ctx) throws Exception { ???????this.unsafe.flush(); ???} ???public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ???????ctx.fireExceptionCaught(cause); ???}}
TailContext主要就是对输入数据的一个收尾工作,它的很多方法都为空。
static final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler { ???private static final String TAIL_NAME = DefaultChannelPipeline.generateName0(DefaultChannelPipeline.TailContext.class); ???TailContext(DefaultChannelPipeline pipeline) { ???????super(pipeline, (EventExecutorGroup)null, TAIL_NAME, true, false); ???} ???public ChannelHandler handler() { ???????return this; ???} ???public void channelRegistered(ChannelHandlerContext ctx) throws Exception { ???} ???public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { ???} ???public void channelActive(ChannelHandlerContext ctx) throws Exception { ???} ???public void channelInactive(ChannelHandlerContext ctx) throws Exception { ???} ???public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { ???} ???public void handlerAdded(ChannelHandlerContext ctx) throws Exception { ???} ???public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { ???} ???public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { ???} ???public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ???????DefaultChannelPipeline.logger.warn("An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.", cause); ???} ???public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ???????try { ???????????DefaultChannelPipeline.logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. Please check your pipeline configuration.", msg); ???????} finally { ???????????ReferenceCountUtil.release(msg); ???????} ???} ???public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ???}}
Netty源码分析之Pipeline创建
原文地址:https://www.cnblogs.com/xiaobaituyun/p/10803521.html