The Full Netty NIO Startup Flow

Published: 2023-09-06 02:05 | Editor: 沈小雨 | Keywords: none

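1. Relationships between the components

Note: an EventLoopGroup is similar to a thread pool, and each EventLoop is a single thread. Every EventLoop is associated with one NIO Selector on which Channels are registered, so one EventLoop is shared by multiple Channels. The EventLoop performs the I/O selection for its channels as well as non-I/O tasks. When a Channel is created it also creates its pipeline, which is a linked list of handlers.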
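The relationship between one EventLoop, its Selector, and several channels can be sketched with the JDK alone. This is only an illustration under stated assumptions: MiniEventLoop and its methods are hypothetical names, not Netty classes, and the sketch leaves out the select loop itself.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration; MiniEventLoop is not a Netty class.
final class MiniEventLoop implements AutoCloseable {
    private final Selector selector;
    // One thread per loop, mirroring "an EventLoop is a single thread".
    private final ExecutorService thread = Executors.newSingleThreadExecutor();

    MiniEventLoop() {
        try {
            selector = Selector.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Registers the channel with this loop's selector, on the loop's own thread.
    SelectionKey register(SelectableChannel ch, Object attachment) {
        try {
            return thread.submit(() -> {
                ch.configureBlocking(false);
                // Interest ops of 0: attach the channel without selecting for anything yet.
                return ch.register(selector, 0, attachment);
            }).get();
        } catch (Exception e) {
            throw new IllegalStateException("registration failed", e);
        }
    }

    int registeredChannels() {
        return selector.keys().size();
    }

    @Override
    public void close() {
        thread.shutdownNow();
        try {
            selector.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Registers two channels on one loop and reports how many keys the selector holds.
    static int demo() {
        try (MiniEventLoop loop = new MiniEventLoop();
             ServerSocketChannel a = ServerSocketChannel.open();
             ServerSocketChannel b = ServerSocketChannel.open()) {
            loop.register(a, "channel-a");
            loop.register(b, "channel-b");
            return loop.registeredChannels();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling MiniEventLoop.demo() registers two channels on the same selector, which is the "one EventLoop shared by multiple channels" arrangement described above.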

2. Server startup vs. client startup

// Server-side startup
private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }
    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();
                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

// Client-side startup
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.isDone()) {
        if (!regFuture.isSuccess()) {
            return regFuture;
        }
        return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                // Directly obtain the cause and do a null check so we only need one volatile read in case of a
                // failure.
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();
                    doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

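In short, startup first initializes the channel and registers it; after that the server performs a bind, while the client performs a connect. Channel initialization and registration both go through initAndRegister(), which maximizes code reuse.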
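The shared-prefix structure described above is a template method. A minimal sketch of the idea follows, assuming nothing about Netty's real classes: MiniBootstrap, MiniServerBootstrap, and MiniClientBootstrap are hypothetical names, and each step is only recorded as a string rather than performed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the shared startup template; these are not Netty classes.
abstract class MiniBootstrap {
    protected final List<String> steps = new ArrayList<>();

    // Shared by both sides: create the channel, initialize it, then register it.
    final void initAndRegister() {
        steps.add("newChannel");
        init();
        steps.add("register");
    }

    // Side-specific initialization hook, filled in by each subclass.
    abstract void init();
}

final class MiniServerBootstrap extends MiniBootstrap {
    @Override
    void init() {
        steps.add("init(server)");
    }

    // The server finishes startup with a bind.
    List<String> bind() {
        initAndRegister();
        steps.add("bind");
        return steps;
    }
}

final class MiniClientBootstrap extends MiniBootstrap {
    @Override
    void init() {
        steps.add("init(client)");
    }

    // The client finishes startup with a connect.
    List<String> connect() {
        initAndRegister();
        steps.add("connect");
        return steps;
    }
}
```

In this sketch new MiniServerBootstrap().bind() yields [newChannel, init(server), register, bind] and new MiniClientBootstrap().connect() yields [newChannel, init(client), register, connect]; only the hook and the final step differ.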

3. Creating, initializing, and registering the channel

3.1 Template method: create the channel -> initialize the channel -> register the channel

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // Create the channel
        channel = channelFactory.newChannel();
        // Initialize the channel
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            channel.unsafe().closeForcibly();
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }
    // Register the channel
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}

3.2 Creating the channel

  1. Constructing the channel

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to close a partially initialized socket.", e2);
            }
        }
        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

The NIO channel wraps a Java SelectableChannel and adds a pipeline and an unsafe object for the low-level operations. The default pipeline is a doubly linked list that initially contains only two nodes, head and tail.
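The head/tail structure of the default pipeline can be pictured with a small doubly linked list. This is a simplified sketch, assuming handlers are represented only by their names; MiniPipeline is a hypothetical class, not Netty's DefaultChannelPipeline.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a pipeline as a doubly linked list with fixed head and tail nodes.
final class MiniPipeline {
    private static final class Node {
        final String name;
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    private final Node head = new Node("head");
    private final Node tail = new Node("tail");

    MiniPipeline() {
        // A new pipeline contains only the two sentinel nodes.
        head.next = tail;
        tail.prev = head;
    }

    // Inserts the new node just before tail, so tail always stays last.
    MiniPipeline addLast(String name) {
        Node node = new Node(name);
        Node last = tail.prev;
        node.prev = last;
        node.next = tail;
        last.next = node;
        tail.prev = node;
        return this;
    }

    // Walks the list from head to tail and collects the node names.
    List<String> names() {
        List<String> out = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) {
            out.add(n.name);
        }
        return out;
    }
}
```

A fresh MiniPipeline lists [head, tail]; after addLast("decoder").addLast("handler") it lists [head, decoder, handler, tail], which is why "addLast" really means "last before tail".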

  2. Initializing the channel
    On the client side, the handler configured through the builder is added directly to the pipeline, together with the common NIO options and attributes. On the server side, apart from the basic NIO options and attributes, only a single initializing handler is added.

// Client side
ChannelPipeline p = channel.pipeline();
p.addLast(config.handler());

// Server side
p.addLast(new ChannelInitializer<Channel>() {
    @Override
    public void initChannel(final Channel ch) throws Exception {
        final ChannelPipeline pipeline = ch.pipeline();
        ChannelHandler handler = config.handler();
        if (handler != null) {
            pipeline.addLast(handler);
        }
        ch.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                pipeline.addLast(new ServerBootstrapAcceptor(
                        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
            }
        });
    }
});

Note: ChannelInitializer's initChannel is called after registration succeeds, which is what allows handlers to be added dynamically.
On the client side the pipeline does not contain a ChannelInitializer by default; you have to add one yourself.

  3. Registering the channel
    This mainly binds the channel to an EventLoop and then performs the registration on that EventLoop's single thread.

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    if (eventLoop == null) {
        throw new NullPointerException("eventLoop");
    }
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }
    AbstractChannel.this.eventLoop = eventLoop;
    // During startup we are still on the main thread here, not on the eventLoop thread
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}
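The "run directly if already on the loop thread, otherwise queue the task" check in register can be sketched with a single-thread executor. This is an illustration under assumptions: SingleThreadLoop, its thread name, and the use of CompletableFuture in place of ChannelPromise are all hypothetical choices, not Netty's implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of the inEventLoop() dispatch pattern; not a Netty class.
final class SingleThreadLoop {
    // Remembers which thread belongs to this loop.
    private volatile Thread loopThread;

    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "mini-event-loop");
        t.setDaemon(true);
        loopThread = t;
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Completes the returned future with the name of the thread that ran register0.
    CompletableFuture<String> register() {
        CompletableFuture<String> promise = new CompletableFuture<>();
        if (inEventLoop()) {
            // Already on the loop thread: run inline.
            register0(promise);
        } else {
            // Called from another thread (for example main): hand the work to the loop.
            executor.execute(() -> register0(promise));
        }
        return promise;
    }

    private void register0(CompletableFuture<String> promise) {
        promise.complete(Thread.currentThread().getName());
    }
}
```

Calling new SingleThreadLoop().register().join() from the main thread returns "mini-event-loop", showing that the registration work ran on the loop thread rather than on the caller.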

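register0 does three main things: it registers the channel -> invokes ChannelInitializer's initChannel to finish adding the handlers -> registers the operations the channel is interested in.
3.1 Registering the Java channel; the 0 means the channel is only registered, with no interest operations set yet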

selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);

3.2 pipeline.fireChannelRegistered()
At this point the pipeline contains three handlers, one of which is the ChannelInitializer.

public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    if (initChannel(ctx)) {
        ctx.pipeline().fireChannelRegistered();
    } else {
        ctx.fireChannelRegistered();
    }
}

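3.3 beginRead()

@Override
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }
    readPending = true;
    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

Note that only at this point is the event of interest actually registered. It is the readInterestOp passed to the channel's constructor: OP_ACCEPT for the server channel and OP_READ for a client byte channel, as the two constructors below show. OP_CONNECT is registered separately, in doConnect, when a connect does not complete immediately.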
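The two-step registration (register with interest ops 0, then switch on the read interest later) can be reproduced with plain JDK NIO. A minimal sketch, assuming an unbound ServerSocketChannel is enough for illustration; TwoStepRegistration is a hypothetical name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical sketch of "register with 0 first, set the real interest op later".
final class TwoStepRegistration {
    // Returns the interest ops seen right after registration and after the beginRead step.
    static int[] demo() {
        try (Selector selector = Selector.open();
             ServerSocketChannel ch = ServerSocketChannel.open()) {
            ch.configureBlocking(false);
            int readInterestOp = SelectionKey.OP_ACCEPT;

            // Step 1: register only; the selector will not report anything for this key yet.
            SelectionKey key = ch.register(selector, 0);
            int afterRegister = key.interestOps();

            // Step 2: same check as doBeginRead, add the read interest if it is missing.
            int interestOps = key.interestOps();
            if ((interestOps & readInterestOp) == 0) {
                key.interestOps(interestOps | readInterestOp);
            }
            return new int[] {afterRegister, key.interestOps()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The first element of the result is 0 and the second is SelectionKey.OP_ACCEPT, so handlers can be installed between the two steps without any event being delivered early.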

public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
    super(parent, ch, SelectionKey.OP_READ);
}

At this point both the client and the server have finished initializing and registering the channel.

4. Binding the server to the specified port

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

The bind operation is executed on the eventLoop.

@Override
public void bind(
        ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
        throws Exception {
    unsafe.bind(localAddress, promise);
}

In the end the call always reaches unsafe's bind method, which performs the actual port binding.

5. Connecting the client to the remote server

private static void doConnect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
    final Channel channel = connectPromise.channel();
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (localAddress == null) {
                channel.connect(remoteAddress, connectPromise);
            } else {
                channel.connect(remoteAddress, localAddress, connectPromise);
            }
            connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        }
    });
}

Connecting to the server is likewise executed on the eventLoop and ultimately calls unsafe's connect method.

protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
    if (localAddress != null) {
        doBind0(localAddress);
    }
    boolean success = false;
    try {
        boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
        if (!connected) {
            selectionKey().interestOps(SelectionKey.OP_CONNECT);
        }
        success = true;
        return connected;
    } finally {
        if (!success) {
            doClose();
        }
    }
}

connect has three possible outcomes: if the connection is established immediately, it returns true; if it returns false, the result is not yet known and OP_CONNECT is registered so the selector reports the outcome later; if an exception is thrown, the channel is closed right away.
It is worth noting that a non-blocking JDK NIO connect has no built-in timeout, so Netty adds its own: a timeout task is scheduled on the EventLoop; when the timeout fires the connection is closed, and when the connection succeeds the timeout task is removed.

// Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
    connectTimeoutFuture = eventLoop().schedule(new Runnable() {
        @Override
        public void run() {
            ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
            ConnectTimeoutException cause =
                    new ConnectTimeoutException("connection timed out: " + remoteAddress);
            if (connectPromise != null && connectPromise.tryFailure(cause)) {
                close(voidPromise());
            }
        }
    }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
promise.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        if (future.isCancelled()) {
            if (connectTimeoutFuture != null) {
                connectTimeoutFuture.cancel(false);
            }
            connectPromise = null;
            close(voidPromise());
        }
    }
});
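The timeout mechanism above boils down to "schedule a task that fails the promise, and cancel that task once the promise completes". A minimal JDK-only sketch of that idea follows. It is an assumption-laden simplification: ConnectTimeoutSketch is a hypothetical class, CompletableFuture stands in for ChannelPromise, and the timeout task is cancelled on any completion rather than following Netty's exact code paths.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of a connect timeout built from a scheduled task; not Netty code.
final class ConnectTimeoutSketch {
    // Arms a timeout for connectPromise and returns the scheduled timeout task.
    static ScheduledFuture<?> withTimeout(
            ScheduledExecutorService loop, CompletableFuture<String> connectPromise, long timeoutMillis) {
        ScheduledFuture<?> timeoutTask = loop.schedule(() -> {
            // Fails the promise only if it has not completed yet.
            connectPromise.completeExceptionally(new TimeoutException("connection timed out"));
        }, timeoutMillis, TimeUnit.MILLISECONDS);

        // Once the promise completes for any reason, the timeout task is no longer needed.
        connectPromise.whenComplete((value, error) -> timeoutTask.cancel(false));
        return timeoutTask;
    }
}
```

If the promise is completed before the delay elapses, the returned task reports isCancelled() as true; if nothing completes it, the promise fails with a TimeoutException once the delay has passed.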

6. How the EventLoop handles I/O events

if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
    // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
    // See https://github.com/netty/netty/issues/924
    int ops = k.interestOps();
    ops &= ~SelectionKey.OP_CONNECT;
    k.interestOps(ops);
    unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
    // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
    ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
}
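The dispatch above is a series of bit-mask tests on readyOps. The sketch below mirrors those tests but only returns the names of the actions that would run, so the ordering and the readyOps == 0 case can be checked in isolation. ReadyOpsDispatch and the action strings are hypothetical; only the SelectionKey constants come from the JDK.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch that mirrors the readyOps checks and reports which actions would fire.
final class ReadyOpsDispatch {
    static List<String> actionsFor(int readyOps) {
        List<String> actions = new ArrayList<>();
        // Connect completion is handled first.
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect");
        }
        // Writes come before reads so queued buffers can be flushed and freed.
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush");
        }
        // OP_READ and OP_ACCEPT share one path; readyOps == 0 is also treated as a read.
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            actions.add("read");
        }
        return actions;
    }
}
```

For example, actionsFor(SelectionKey.OP_CONNECT | SelectionKey.OP_WRITE) gives [finishConnect, forceFlush], while both actionsFor(SelectionKey.OP_ACCEPT) and actionsFor(0) give [read].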

Original article: https://www.cnblogs.com/dragonfei/p/9343654.html
