Netty服务端处理新连接的流程:
1.检测新连接
2.基于NioServerSocketChannel创建客户端的NioSocketChannel
3.分配客户端channel的线程,注册线程所对应的selector
4.向selector注册读事件
新连接检测
服务端在创建完服务端的NioServerSocketChannel之后,绑定完端口号之后,会注册accept事件。当有新连接进入的时候,会触发accpet事件。之前博客有分析过EventLoop的thread的run方法会循环select检测是否有新的IO事件,如果检测到有IO事件,就通过processSelectedKey来处理对应的IO事件,这里的IO事件是accept,就会调用channel内部聚合的UnSafe类的read()方法。
这里循环调用doReadMessage()方法的条件是是否自动读,读取的连接数是否小于最大连接数,服务端channel默认一次最多读取16个新连接。
当没有超过最大连接数,并且是自动读的状态时候,就会循环调用doReadMessage,直到没有读到新连接,跳出while循环,
public void read() { ???assert AbstractNioMessageChannel.this.eventLoop().inEventLoop(); ???ChannelConfig config = AbstractNioMessageChannel.this.config(); ???if(!config.isAutoRead() && !AbstractNioMessageChannel.this.isReadPending()) { ???????this.removeReadOp(); ???} else { ???????int maxMessagesPerRead = config.getMaxMessagesPerRead(); ???????ChannelPipeline pipeline = AbstractNioMessageChannel.this.pipeline(); ???????boolean closed = false; ???????Throwable exception = null; ???????try { ???????????int size; ???????????try { ???????????????do { ???????????????????size = AbstractNioMessageChannel.this.doReadMessages(this.readBuf); ???????????????????if(size == 0) { ???????????????????????break; ???????????????????} ???????????????????if(size < 0) { ???????????????????????closed = true; ???????????????????????break; ???????????????????} ???????????????} while(config.isAutoRead() && this.readBuf.size() < maxMessagesPerRead); ???????????} catch (Throwable var11) { ???????????????exception = var11; ???????????} ???????????AbstractNioMessageChannel.this.setReadPending(false); ???????????size = this.readBuf.size(); ???????????int i = 0; ???????????while(true) { ???????????????if(i >= size) { ???????????????????this.readBuf.clear(); ???????????????????pipeline.fireChannelReadComplete(); ???????????????????if(exception != null) { ???????????????????????if(exception instanceof IOException && !(exception instanceof PortUnreachableException)) { ???????????????????????????closed = !(AbstractNioMessageChannel.this instanceof ServerChannel); ???????????????????????} ???????????????????????pipeline.fireExceptionCaught(exception); ???????????????????} ???????????????????if(closed && AbstractNioMessageChannel.this.isOpen()) { ???????????????????????this.close(this.voidPromise()); ???????????????????} ???????????????????break; ???????????????} ???????????????pipeline.fireChannelRead(this.readBuf.get(i)); ???????????????++i; ???????????} ???????} finally { ???????????if(!config.isAutoRead() && !AbstractNioMessageChannel.this.isReadPending()) { ???????????????this.removeReadOp(); ???????????} ???????} ???}}
创建NioSocketChannel
这里read()方法是通过循环调用NioServerSocket的doReadMessage(byteBuf)方法进行实现channel的读取新连接。而doReadMessage是通过java nio的channel的accept获取当前新连接的channel,这里获取的channel也是java nio中的channel,然后将这个channel封装成NioSocketChannel,将NioServerSocketChannel和javaChannel都作为参数构造NioSocketChannel,放到buf中去,返回1,表示已经读取一条连接。
protected int doReadMessages(List<Object> buf) throws Exception { ???SocketChannel ch = this.javaChannel().accept(); ???try { ???????if(ch != null) { ???????????buf.add(new NioSocketChannel(this, ch)); ???????????return 1; ???????} ???} catch (Throwable var6) { ???????logger.warn("Failed to create a new channel from an accepted socket.", var6); ???????try { ???????????ch.close(); ???????} catch (Throwable var5) { ???????????logger.warn("Failed to close a socket.", var5); ???????} ???} ???return 0;}
NioSocketChannel的构造函数。
//配置Config类
public NioSocketChannel(Channel parent, java.nio.channels.SocketChannel socket) { ???super(parent, socket); ???this.config = new NioSocketChannel.NioSocketChannelConfig(this, socket.socket(), null);}
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
???super(parent, ch, 1);
}
//保存channel感兴趣的读事件,并将channel设置为非阻塞的
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
???super(parent);
???this.ch = ch;
???this.readInterestOp = readInterestOp;
???try {
???????ch.configureBlocking(false);
???} catch (IOException var7) {
???????try {
???????????ch.close();
???????} catch (IOException var6) {
???????????if(logger.isWarnEnabled()) {
???????????????logger.warn("Failed to close a partially initialized socket.", var6);
???????????}
???????}
???????throw new ChannelException("Failed to enter non-blocking mode.", var7);
???}
}
这里配置channel的Config类使用了setTcpNoDelay(true),这里禁止了Nagle算法,Nagle算法的目的是让小的数据包尽量集合成大的数据包发送出去,Netty为了使数据能够及时发出去,禁止了Nagle算法。
public DefaultSocketChannelConfig(io.netty.channel.socket.SocketChannel channel, Socket javaSocket) { ???super(channel); ???if(javaSocket == null) { ???????throw new NullPointerException("javaSocket"); ???} else { ???????this.javaSocket = javaSocket; ???????if(PlatformDependent.canEnableTcpNoDelayByDefault()) { ???????????try { ???????????????this.setTcpNoDelay(true); ???????????} catch (Exception var4) { ???????????????; ???????????} ???????} ???}}
public SocketChannelConfig setTcpNoDelay(boolean tcpNoDelay) {
???try {
???????this.javaSocket.setTcpNoDelay(tcpNoDelay);
???????return this;
???} catch (SocketException var3) {
???????throw new ChannelException(var3);
???}
}
新连接NioEventLoop的分配和selector的注册
在读取完新连接之后,会调用fireChannelRead方法,而服务端的NioServerSocketChannel在初始化阶段,在上面的pipeline添加了连接处理器ServerBootstrap.ServerBootstrapAcceptor,read事件会从head传送到serverBootstrapAcceptor,serverBootstrapAcceptor也是一个ChannelHandler,它会对新连接进行处理。
处理流程:
1.设置客户端channel的childHandler
添加channelHandler,这里的channelHandler一般是一个ChannelInitializer,他可以获取channel的pipeline,并且在上面添加一系列的Handler,最后再将ChannelInitializer这个Handler删除。
2.设置options和attrs
options是底层tcp读写的相关参数,attrs可以在客户端channel上面绑定一些属性。这里的options和attrs都是用户通过代码设置的。比如
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true)
设置的这些都会保存到ServerBootstrap这个类,然后在initChannel的时候会将这些参数都传入,构造一个ServerBootstrapAcceptor,这样当连接器接受到新的连接之后,新建子channel,就会带有这些属性。
3.选择NioEventLoop,并且注册selector
public void channelRead(ChannelHandlerContext ctx, Object msg) { ???final Channel child = (Channel)msg; ???//添加ChannelHandler
child.pipeline().addLast(new ChannelHandler[]{this.childHandler}); ???Map.Entry[] t = this.childOptions; ???int len$ = t.length; ???int i$; ???Map.Entry e; ???for(i$ = 0; i$ < len$; ++i$) { ???????e = t[i$]; ???????try { ???????????if(!child.config().setOption((ChannelOption)e.getKey(), e.getValue())) { ???????????????ServerBootstrap.logger.warn("Unknown channel option: " + e); ???????????} ???????} catch (Throwable var10) { ???????????ServerBootstrap.logger.warn("Failed to set a channel option: " + child, var10); ???????} ???} ???t = this.childAttrs; ???len$ = t.length; ???for(i$ = 0; i$ < len$; ++i$) { ???????e = t[i$]; ???????child.attr((AttributeKey)e.getKey()).set(e.getValue()); ???} ???try { ???????this.childGroup.register(child).addListener(new ChannelFutureListener() { ???????????public void operationComplete(ChannelFuture future) throws Exception { ???????????????if(!future.isSuccess()) { ???????????????????ServerBootstrap.ServerBootstrapAcceptor.forceClose(child, future.cause()); ???????????????} ???????????} ???????}); ???} catch (Throwable var9) { ???????forceClose(child, var9); ???}}
这里注册是使用用户传进来的workerGroup线程池,使用register方法完成注册。
public ChannelFuture register(Channel channel) { ???return this.next().register(channel);}
这里的next()函数返回一个NioEventLoop,相当于从线程池里面挑选一个线程与这个channel进行绑定。最后通过层层调用,还是调用了java nio中channel的register方法,这时注册的时候,不关心任何事件。
public ChannelFuture register(Channel channel, ChannelPromise promise) { ???if(channel == null) { ???????throw new NullPointerException("channel"); ???} else if(promise == null) { ???????throw new NullPointerException("promise"); ???} else { ???????channel.unsafe().register(this, promise); ???????return promise; ???}}
//AbstractChannel
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
???if(eventLoop == null) {
???????throw new NullPointerException("eventLoop");
???} else if(AbstractChannel.this.isRegistered()) {
???????promise.setFailure(new IllegalStateException("registered to an event loop already"));
???} else if(!AbstractChannel.this.isCompatible(eventLoop)) {
???????promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
???} else {
???????AbstractChannel.this.eventLoop = eventLoop;
???????if(eventLoop.inEventLoop()) {
???????????this.register0(promise);
???????} else {
???????????try {
???????????????eventLoop.execute(new OneTimeTask() {
???????????????????public void run() {
???????????????????????AbstractUnsafe.this.register0(promise);
???????????????????}
???????????????});
???????????} catch (Throwable var4) {
???????????????AbstractChannel.logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, var4);
???????????????this.closeForcibly();
???????????????AbstractChannel.this.closeFuture.setClosed();
???????????????this.safeSetFailure(promise, var4);
???????????}
???????}
???}
}
private void register0(ChannelPromise promise) {
???try {
???????if(!promise.setUncancellable() || !this.ensureOpen(promise)) {
???????????return;
???????}
???????AbstractChannel.this.doRegister();
???????AbstractChannel.this.registered = true;
???????this.safeSetSuccess(promise);
???????AbstractChannel.this.pipeline.fireChannelRegistered();
???????if(AbstractChannel.this.isActive()) {
???????????AbstractChannel.this.pipeline.fireChannelActive();
???????}
???} catch (Throwable var3) {
???????this.closeForcibly();
???????AbstractChannel.this.closeFuture.setClosed();
???????this.safeSetFailure(promise, var3);
???}
}
protected void doRegister() throws Exception {
???boolean selected = false;
???while(true) {
???????try {
???????????this.selectionKey = this.javaChannel().register(this.eventLoop().selector, 0, this);
???????????return;
???????} catch (CancelledKeyException var3) {
???????????if(selected) {
???????????????throw var3;
???????????}
???????????this.eventLoop().selectNow();
???????????selected = true;
???????}
???}
}
NioSocketChannel读事件的注册
通过传播channelActive方法,最终会调用channel的read()方法,channel在创建的时候都是默认自动读的。
public ChannelPipeline fireChannelActive() { ???this.head.fireChannelActive(); ???if(this.channel.config().isAutoRead()) { ???????this.channel.read(); ???} ???return this;}
会将channel的Active状态在pipeline上面传播,调用read方法,最后会调用doBeginRead,去注册感兴趣的事件,NioSocketChannel感兴趣的事件是读事件,而NioServerSocketChannel感兴趣的事件则是Accept事件。
public ChannelHandlerContext read() { ???final AbstractChannelHandlerContext next = this.findContextOutbound(); ???EventExecutor executor = next.executor(); ???if(executor.inEventLoop()) { ???????next.invokeRead(); ???} else { ???????Runnable task = next.invokeReadTask; ???????if(task == null) { ???????????next.invokeReadTask = task = new Runnable() { ???????????????public void run() { ???????????????????next.invokeRead(); ???????????????} ???????????}; ???????} ???????executor.execute(task); ???} ???return this;}
protected void doBeginRead() throws Exception {
???if(!this.inputShutdown) {
???????SelectionKey selectionKey = this.selectionKey;
???????if(selectionKey.isValid()) {
???????????this.readPending = true;
???????????int interestOps = selectionKey.interestOps();
???????????if((interestOps & this.readInterestOp) == 0) {
???????????????selectionKey.interestOps(interestOps | this.readInterestOp);
???????????}
???????}
???}
}
Netty源码分析之处理新连接
原文地址:https://www.cnblogs.com/xiaobaituyun/p/10801495.html