分享web开发知识

注册/登录|最近发布|今日推荐

主页 IT知识网页技术软件开发前端开发代码编程运营维护技术分享教程案例
当前位置:首页 > 教程案例

Netty5源码解析

发布时间:2023-09-06 01:32责任编辑:郭大石关键词:暂无标签

Netty5源码解析

今天让我来总结下netty5的服务端代码。

  1. 服务端(ServerBootstrap)
    示例代码如下:

    import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;/** * Created by yaojiafeng on 16/1/17. */public class SimpleServer { ???public void bind(int port) throws Exception { ???????// 配置服务端的NIO线程组 ???????EventLoopGroup bossGroup = new NioEventLoopGroup(1); ???????EventLoopGroup workerGroup = new NioEventLoopGroup(1); ???????try { ???????????ServerBootstrap b = new ServerBootstrap(); ???????????b.group(bossGroup, workerGroup) ???????????????????.channel(NioServerSocketChannel.class) ???????????????????.option(ChannelOption.SO_BACKLOG, 1024) ???????????????????.childHandler(new ChildChannelHandler()); ???????????// 绑定端口,同步等待成功 ???????????ChannelFuture f = b.bind(port).sync(); ???????????// 等待服务端监听端口关闭 ???????????f.channel().closeFuture().sync(); ???????} finally { ???????????// 优雅退出,释放线程池资源 ???????????bossGroup.shutdownGracefully(); ???????????workerGroup.shutdownGracefully(); ???????} ???} ???private class ChildChannelHandler extends ChannelInitializer<SocketChannel> { ???????@Override ???????protected void initChannel(SocketChannel arg0) throws Exception { ???????????arg0.pipeline().addLast(new SimpleServerHandler()); ???????} ???} ???/** ????* @param args ????* @throws Exception ????*/ ???public static void main(String[] args) throws Exception { ???????int port = 8081; ???????if (args != null && args.length > 0) { ???????????try { ???????????????port = Integer.valueOf(args[0]); ???????????} catch (NumberFormatException e) { ???????????????// 采用默认值 ???????????} ???????} ???????new SimpleServer().bind(port); ???}}

    1.1. 设置EventLoopGroup
    首先创建2个EventLoopGroup,一个parentGroup(用于接受新连接),childGroup(用于执行读写事件),NioEventLoopGroup内部根据设置的nEventLoops参数创建对应大小的NioEventLoop数组,并且每个NioEventLoop默认使用ForkJoinPool的一个线程,所以NioEventLoop称为单线程事件循环。

    1.2. 构造ServerBootstrap
    构造ServerBootstrap对象,并设置EventLoopGroup,channel(NioServerSocketChannel服务端套接字),一些option例如ChannelOption.SO_BACKLOG,childHandler(客户端连接后在管道链设置的ChannelHandler)

    1.3. 同步绑定端口

    b.bind(port).sync()

    1.3.1 validate方法
    validate方法验证parentGroup和channelFactory不能为null

    1.3.2 initAndRegister方法
    刚方法内部使用channelFactory通过反射构造NioServerSocketChannel的实例对象,NioServerSocketChannel实例对象构造内部主要包含ServerSocketChannel,DefaultChannelId(标识唯一性),Unsafe(所有IO操作都在这个类里),DefaultChannelPipeline(通道处理器管道链,自定义的ChannelHandler都在这里),NioServerSocketChannelConfig(一些配置信息)。
    构造完调用init初始化NioServerSocketChannel,包括设置自定义的ChannelHandler,ServerBootstrapAcceptor(专门用于接受客户端新连接时,初始化NioSocketChannel并注册进childGroup进行读写监听)。

    ChannelFuture regFuture = group().register(channel)

    异步注册NioServerSocketChannel到parentGroup里的NioEventLoop。
    因为注册过程是在NioEventLoop异步执行的,这里直接先分析register方法

    1.3.3 异步register

    channel.unsafe().register(this, promise)

    注册的时候会调用以上方法,委派给Unsafe的register方法,内部会给NioServerSocketChannel的字段eventLoop初始化(NioServerSocketChannel关联唯一的一个NioEventLoop),然后会调用

    eventLoop.execute(new OneTimeTask() { ???????????????????????@Override ???????????????????????public void run() { ???????????????????????????register0(promise); ???????????????????????} ???????????????????});

    这个会开启NioEventLoop的事件循环线程,并放task到taskQueue里,作为异步执行register0方法。

    register0方法会调用外部类(NioServerSocketChannel)的doRegister方法,

    protected void doRegister() throws Exception { ???????boolean selected = false; ???????for (;;) { ???????????try { ???????????????selectionKey = javaChannel().register(((NioEventLoop) eventLoop().unwrap()).selector, 0, this); ???????????????return; ???????????} catch (CancelledKeyException e) { ???????????????if (!selected) { ???????????????????// Force the Selector to select now as the "canceled" SelectionKey may still be ???????????????????// cached and not removed because no Select.select(..) operation was called yet. ???????????????????((NioEventLoop) eventLoop().unwrap()).selectNow(); ???????????????????selected = true; ???????????????} else { ???????????????????// We forced a select operation on the selector before but the SelectionKey is still cached ???????????????????// for whatever reason. JDK bug ? ???????????????????throw e; ???????????????} ???????????} ???????} ???}

    这里使用了NIO的API,把NioServerSocketChannel里的ServerSocketChannel注册到NioServerSocketChannel关联的NioEventLoop里的selector。
    接下来的safeSetSuccess会把Main线程设置的监听器,设置bind任务。

    1.3.4 执行bind操作

    private static void doBind0( ???????????final ChannelFuture regFuture, final Channel channel, ???????????final SocketAddress localAddress, final ChannelPromise promise) { ???????// This method is invoked before channelRegistered() is triggered. ?Give user handlers a chance to set up ???????// the pipeline in its channelRegistered() implementation. ???????channel.eventLoop().execute(new Runnable() { ???????????@Override ???????????public void run() { ???????????????if (regFuture.isSuccess()) { ???????????????????channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); ???????????????} else { ???????????????????promise.setFailure(regFuture.cause()); ???????????????} ???????????} ???????}); ???}

    注册成功的情况下,执行bind操作(NioServerSocketChannel的bind方法),一路追踪到

    unsafe.bind(localAddress, promise);

    unsafe的bind方法,内部调用NioServerSocketChannel的doBind方法

    protected void doBind(SocketAddress localAddress) throws Exception { ???????javaChannel().socket().bind(localAddress, config.getBacklog()); ???}

    并且设置pipeline.fireChannelActive()任务,fireChannelActive任务会调用channel.read()方法,内部会调用到unsafe.beginRead()方法,最终调用的是NioServerSocketChannel的doBeginRead方法,重新设置SelectionKey的感兴趣的事件readInterestOp(NioServerSocketChannel构造的时候确定的为SelectionKey.OP_ACCEPT),开始接收新连接。

    protected void doBeginRead() throws Exception { ???????// Channel.read() or ChannelHandlerContext.read() was called ???????if (inputShutdown) { ???????????return; ???????} ???????final SelectionKey selectionKey = this.selectionKey; ???????if (!selectionKey.isValid()) { ???????????return; ???????} ???????readPending = true; ???????final int interestOps = selectionKey.interestOps(); ???????if ((interestOps & readInterestOp) == 0) { ???????????selectionKey.interestOps(interestOps | readInterestOp); ???????} ???}

    1.4. NioEventLoop事件循环接受新连接
    NioEventLoop不停的通过ForkJoinPool执行它的asRunnable任务(通过每次执行任务将要完成时,重新把asRunnable设置到ForkJoinPool里)。
    从asRunnable的run方法开始,内部先执行selector的select操作,然后先调用processSelectedKeys()方法,获取到激活的selectedKeys数组,这里如果有新连接进来,那么就有一个SelectionKey,获取它的attachment(NioServerSocketChannel),然后调用processSelectedKey方法。

    private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { ???????final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); ???????if (!k.isValid()) { ???????????// close the channel if the key is not valid anymore ???????????unsafe.close(unsafe.voidPromise()); ???????????return; ???????} ???????try { ???????????int readyOps = k.readyOps(); ???????????// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead ???????????// to a spin loop ???????????if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { ???????????????unsafe.read(); ???????????????if (!ch.isOpen()) { ???????????????????// Connection already closed - no need to handle write. ???????????????????return; ???????????????} ???????????} ???????????if ((readyOps & SelectionKey.OP_WRITE) != 0) { ???????????????// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ???????????????ch.unsafe().forceFlush(); ???????????} ???????????if ((readyOps & SelectionKey.OP_CONNECT) != 0) { ???????????????// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking ???????????????// See https://github.com/netty/netty/issues/924 ???????????????int ops = k.interestOps(); ???????????????ops &= ~SelectionKey.OP_CONNECT; ???????????????k.interestOps(ops); ???????????????unsafe.finishConnect(); ???????????} ???????} catch (CancelledKeyException ignored) { ???????????unsafe.close(unsafe.voidPromise()); ???????} ???}

    1.4.1 执行获取新连接方法

    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { ???????????????unsafe.read(); ???????????????if (!ch.isOpen()) { ???????????????????// Connection already closed - no need to handle write. ???????????????????return; ???????????????} ???????????}

    当readyOps等于SelectionKey.OP_ACCEPT调用unsafe.read(),这里调用到了AbstractNioMessageChannel的内部类NioMessageUnsafe的read方法。
    read方法会循环接受新连接,一次默认能接受16个连接,具体调用doReadMessages方法。

    protected int doReadMessages(List<Object> buf) throws Exception { ???????SocketChannel ch = javaChannel().accept(); ???????try { ???????????if (ch != null) { ???????????????buf.add(new NioSocketChannel(this, ch)); ???????????????return 1; ???????????} ???????} catch (Throwable t) { ???????????logger.warn("Failed to create a new channel from an accepted socket.", t); ???????????try { ???????????????ch.close(); ???????????} catch (Throwable t2) { ???????????????logger.warn("Failed to close a socket.", t2); ???????????} ???????} ???????return 0; ???}

    这里调用NIO的API,accept方法获取SocketChannel,并封装成NioSocketChannel(NioSocketChannel的构造字段和NioServerSocketChannel类似,只是NioSocketChannel默认的感兴趣事件为SelectionKey.OP_READ)。
    接受连接完成,循环调用pipeline.fireChannelRead()方法。

    1.4.2 ServerBootstrapAcceptor的channelRead方法
    上面的管道调用fireChannelRead方法,通过责任链方式依次调用ChannelHandler的channelRead方法,最重要的就是ServerBootstrapAcceptor的channelRead方法。
    它这个方法设置了childHandler到NioSocketChannel(新连接)的管道链里,然后又是异步注册NioSocketChannel到childGroup里的NioEventLoop里,注册过程和前面1.3章节的大体一致,也是启动了childGroup里的NioEventLoop的事件循环异步注册。只是因为是NioSocketChannel一些实现的方法不一样,执行的代码有点差别,最终注册完成也会调用pipeline的fireChannelActive()方法。

    1.4.3 fireChannelActive方法

    public ChannelPipeline fireChannelActive() { ???????head.fireChannelActive(); ???????if (channel.config().isAutoRead()) { ???????????channel.read(); ???????} ???????return this; ???}

    channel的read方法最终委派调用到unsafe.beginRead()方法,然后又是NioSocketChannel的doBeginRead方法,重新设置SelectionKey的感兴趣事件为SelectionKey.OP_READ(NioSocketChannel的默认值)。到这里连接已经建立,并且开启了客户端连接读事件的监听。

PS:上面的SimpleServerHandler代码如下:

import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerAdapter;import io.netty.channel.ChannelHandlerContext;/** * Created by yaojiafeng on 16/1/17. */public class SimpleServerHandler extends ChannelHandlerAdapter { ???@Override ???public void channelRead(ChannelHandlerContext ctx, Object msg) ???????????throws Exception { ???????ByteBuf body = (ByteBuf) msg; ???????byte[] bytes = new byte[body.readableBytes()]; ???????body.readBytes(bytes); ???????System.out.println(new String(bytes)); ???????ByteBuf resp = Unpooled.copiedBuffer(bytes); ???????ctx.writeAndFlush(resp); ???} ???@Override ???public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { ???????ctx.close(); ???}}

Netty5源码解析

原文地址:https://www.cnblogs.com/yaojf/p/8127198.html

知识推荐

我的编程学习网——分享web前端后端开发技术知识。 垃圾信息处理邮箱 tousu563@163.com 网站地图
icp备案号 闽ICP备2023006418号-8 不良信息举报平台 互联网安全管理备案 Copyright 2023 www.wodecom.cn All Rights Reserved