Netty Nio启动全流程
1. 各组件之间的关系
说明:EventLoopGroup类似线程池,EventLoop为单线程,每个EventLoop关联一个Nio Selector,用于注册Channel,形成一个EventLoop被多个channel公用。在EventLoop会执行通道Io选择操作,以及非Io任务。在Channel初始化后会创建pipeline,是handler的链表结构。
2. 服务端vs客户端启动
// 服务端启动private ChannelFuture doBind(final SocketAddress localAddress) { ???final ChannelFuture regFuture = initAndRegister(); ???final Channel channel = regFuture.channel(); ???????if (regFuture.cause() != null) { ???????return regFuture; ???} ???if (regFuture.isDone()) { ???????// At this point we know that the registration was complete and successful. ???????ChannelPromise promise = channel.newPromise(); ???????doBind0(regFuture, channel, localAddress, promise); ???????return promise; ???} else { ???????// Registration future is almost always fulfilled already, but just in case it‘s not. ???????final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); ???????regFuture.addListener(new ChannelFutureListener() { ???????????@Override ???????????public void operationComplete(ChannelFuture future) throws Exception { ???????????????Throwable cause = future.cause(); ???????????????if (cause != null) { ???????????????????// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an ???????????????????// IllegalStateException once we try to access the EventLoop of the Channel. ???????????????????promise.setFailure(cause); ???????????????} else { ???????????????????// Registration was successful, so set the correct executor to use. ???????????????????// See https://github.com/netty/netty/issues/2586 ???????????????????promise.registered(); ???????????????????doBind0(regFuture, channel, localAddress, promise); ???????????????} ???????????} ???????}); ???????return promise; ???}}//客户端启动private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) { ???final ChannelFuture regFuture = initAndRegister(); ???final Channel channel = regFuture.channel(); ???if (regFuture.isDone()) { ???????if (!regFuture.isSuccess()) { ???????????return regFuture; ???????} ???????return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise()); ???} else { ???????// Registration future is almost always fulfilled already, but just in case it‘s not. ???????final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); ???????regFuture.addListener(new ChannelFutureListener() { ???????????@Override ???????????public void operationComplete(ChannelFuture future) throws Exception { ???????????????// Directly obtain the cause and do a null check so we only need one volatile read in case of a ???????????????// failure. ???????????????Throwable cause = future.cause(); ???????????????if (cause != null) { ???????????????????// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an ???????????????????// IllegalStateException once we try to access the EventLoop of the Channel. ???????????????????promise.setFailure(cause); ???????????????} else { ???????????????????// Registration was successful, so set the correct executor to use. ???????????????????// See https://github.com/netty/netty/issues/2586 ???????????????????promise.registered(); ???????????????????doResolveAndConnect0(channel, remoteAddress, localAddress, promise); ???????????????} ???????????} ???????}); ???????return promise; ???}}一言以蔽之,首先做初始化channel和channel注册操作,然后服务器启动做绑定操作,客户端启动做连接操作。而初始化channel和channel注册都是通过initAndRegister()实现。最大化重用代码。
3. 初始化创建通道以及通道注册
3.1 模板方法的创建通道->初始化通道->通道注册
final ChannelFuture initAndRegister() { ???Channel channel = null; ???try { ?????// 创建通道 ???????channel = channelFactory.newChannel(); ????// 初始化通道 ???????init(channel); ???} catch (Throwable t) { ???????if (channel != null) { ???????????// channel can be null if newChannel crashed (eg SocketException("too many open files")) ???????????channel.unsafe().closeForcibly(); ???????????// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor ???????????return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); ???????} ???????// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor ???????return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); ???} ???// 通道注册 ???ChannelFuture regFuture = config().group().register(channel); ???if (regFuture.cause() != null) { ???????if (channel.isRegistered()) { ???????????channel.close(); ???????} else { ???????????channel.unsafe().closeForcibly(); ???????} ???} ???return regFuture;}3.2 创建通道
- 构造channel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { ???super(parent); ???this.ch = ch; ???this.readInterestOp = readInterestOp; ???try { ???????ch.configureBlocking(false); ???} catch (IOException e) { ???????try { ???????????ch.close(); ???????} catch (IOException e2) { ???????????if (logger.isWarnEnabled()) { ???????????????logger.warn( ???????????????????????"Failed to close a partially initialized socket.", e2); ???????????} ???????} ???????throw new ChannelException("Failed to enter non-blocking mode.", e); ???}}protected AbstractChannel(Channel parent) { ???this.parent = parent; ???id = newId(); ???unsafe = newUnsafe(); ???pipeline = newChannelPipeline();}NioChannel将java SelectableChannel包装了一把,并添加了pipeline和unsafe操作,默认的pipeline是一个双向链表结构,只包含head和tail两个节点。
- 初始化channel
对于客户端而言,直接向pipeline中添加builder方法的handler,以及一些nio操作的通用属性,对于服务端创建而言,除了一些基本nio属性外,只添加了一个初始化的handler
// 客户端创建ChannelPipeline p = channel.pipeline();p.addLast(config.handler());//服务端创建p.addLast(new ChannelInitializer<Channel>() { ???@Override ???public void initChannel(final Channel ch) throws Exception { ???????final ChannelPipeline pipeline = ch.pipeline(); ???????ChannelHandler handler = config.handler(); ???????if (handler != null) { ???????????pipeline.addLast(handler); ???????} ???????ch.eventLoop().execute(new Runnable() { ???????????@Override ???????????public void run() { ???????????????pipeline.addLast(new ServerBootstrapAcceptor( ???????????????????????ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); ???????????} ???????}); ???}});注:ChannelInitializer的initChannnel会在注册成功之后调用,以此实现动态扩展。
客户端创建时候pipeline中没有ChannelInitializer,需要自己添加。
- 通道注册
主要将channel绑定到EventLoop上面,然后在eventLoop单线程中执行注册操作
@Overridepublic final void register(EventLoop eventLoop, final ChannelPromise promise) { ???if (eventLoop == null) { ???????throw new NullPointerException("eventLoop"); ???} ???if (isRegistered()) { ???????promise.setFailure(new IllegalStateException("registered to an event loop already")); ???????return; ???} ???if (!isCompatible(eventLoop)) { ???????promise.setFailure( ???????????????new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); ???????return; ???} ???AbstractChannel.this.eventLoop = eventLoop; ???// 此时在主线程中,不知eventLoop线程池中 ???if (eventLoop.inEventLoop()) { ???????register0(promise); ???} else { ???????try { ???????????eventLoop.execute(new Runnable() { ???????????????@Override ???????????????public void run() { ???????????????????register0(promise); ???????????????} ???????????}); ???????} catch (Throwable t) { ???????????logger.warn( ???????????????????"Force-closing a channel whose registration task was not accepted by an event loop: {}", ???????????????????AbstractChannel.this, t); ???????????closeForcibly(); ???????????closeFuture.setClosed(); ???????????safeSetFailure(promise, t); ???????} ???}}register0主要干三件事,注册->调用ChannelInitializer的initChannnel完成添加handler->注册channel关心的操作
3.1 java channel注册,0表示只注册,不执行任何操作
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);3.2 pipeline.fireChannelRegistered()
此时,pipeline中包含三个handler,其中一个是ChannelInitializer。
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception { ???if (initChannel(ctx)) { ???????ctx.pipeline().fireChannelRegistered(); ???} else { ???????ctx.fireChannelRegistered(); ???}}3.2 beginRead();
@Overrideprotected void doBeginRead() throws Exception { ???// Channel.read() or ChannelHandlerContext.read() was called ???final SelectionKey selectionKey = this.selectionKey; ???if (!selectionKey.isValid()) { ???????return; ???} ???readPending = true; ???final int interestOps = selectionKey.interestOps(); ???if ((interestOps & readInterestOp) == 0) { ???????selectionKey.interestOps(interestOps | readInterestOp); ???}}注意,此时才会真实注册关系的事件,对服务端而言为Accept,对客户端创建,就是connect
public NioServerSocketChannel(ServerSocketChannel channel) { ????super(null, channel, SelectionKey.OP_ACCEPT); ????config = new NioServerSocketChannelConfig(this, javaChannel().socket()); }protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) { ????super(parent, ch, SelectionKey.OP_READ); }至此,客户端与服务端完成了初始化channel以及注册channel操作。
4. 服务端绑定到指定端口
private static void doBind0( ???????final ChannelFuture regFuture, final Channel channel, ???????final SocketAddress localAddress, final ChannelPromise promise) { ???channel.eventLoop().execute(new Runnable() { ???????@Override ???????public void run() { ???????????if (regFuture.isSuccess()) { ???????????????channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); ???????????} else { ???????????????promise.setFailure(regFuture.cause()); ???????????} ???????} ???});}在eventLoop中执行绑定端口操作
@Overridepublic void bind( ???????ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) ???????throws Exception { ???unsafe.bind(localAddress, promise);}最后都是会调用unsafe的bind方法完成端口绑定操作。
5. 客户端连接远程服务端
private static void doConnect( ???????final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) { ???final Channel channel = connectPromise.channel(); ???channel.eventLoop().execute(new Runnable() { ???????@Override ???????public void run() { ???????????if (localAddress == null) { ???????????????channel.connect(remoteAddress, connectPromise); ???????????} else { ???????????????channel.connect(remoteAddress, localAddress, connectPromise); ???????????} ???????????connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); ???????} ???});}连接服务端最终也是在eventLoop中执行,最终调用unsafe的connect方法。
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { ???if (localAddress != null) { ???????doBind0(localAddress); ???} ???boolean success = false; ???try { ???????boolean connected = SocketUtils.connect(javaChannel(), remoteAddress); ???????if (!connected) { ???????????selectionKey().interestOps(SelectionKey.OP_CONNECT); ???????} ???????success = true; ???????return connected; ???} finally { ???????if (!success) { ???????????doClose(); ???????} ???}}connect有三种结果,成功,直接返回true,失败则暂时不知道结果,检测OP_CONNECT,异常直接关闭链路。
值得说明的是jdk默认不支持连接超时,netty添加了超时机制:在EventLoop中添加超时任务,触发超时时间后会关闭连接,连接成功会删除该超时任务。
// Schedule connect timeout.int connectTimeoutMillis = config().getConnectTimeoutMillis();if (connectTimeoutMillis > 0) { ???connectTimeoutFuture = eventLoop().schedule(new Runnable() { ???????@Override ???????public void run() { ???????????ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise; ???????????ConnectTimeoutException cause = ???????????????????new ConnectTimeoutException("connection timed out: " + remoteAddress); ???????????if (connectPromise != null && connectPromise.tryFailure(cause)) { ???????????????close(voidPromise()); ???????????} ???????} ???}, connectTimeoutMillis, TimeUnit.MILLISECONDS);}promise.addListener(new ChannelFutureListener() { ???@Override ???public void operationComplete(ChannelFuture future) throws Exception { ???????if (future.isCancelled()) { ???????????if (connectTimeoutFuture != null) { ???????????????connectTimeoutFuture.cancel(false); ???????????} ???????????connectPromise = null; ???????????close(voidPromise()); ???????} ???}});6.EventLooop 处理IO事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) { ???// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking ???// See https://github.com/netty/netty/issues/924 ???int ops = k.interestOps(); ???ops &= ~SelectionKey.OP_CONNECT; ???k.interestOps(ops); ???unsafe.finishConnect();}// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.if ((readyOps & SelectionKey.OP_WRITE) != 0) { ???// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ???ch.unsafe().forceFlush();}// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead// to a spin loopif ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { ???unsafe.read();}Netty Nio启动全流程
原文地址:https://www.cnblogs.com/dragonfei/p/9343654.html