分享web开发知识

注册/登录|最近发布|今日推荐

主页 IT知识网页技术软件开发前端开发代码编程运营维护技术分享教程案例
当前位置:首页 > 技术分享

netty源码分析

发布时间:2023-09-06 01:29责任编辑:熊小新关键词:暂无标签

  1、Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。也就是说,Netty 是一个基于NIO的客户、服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户,服务端应用。Netty相当简化和流线化了网络应用的编程开发过程,例如,TCP和UDP的socket服务开发。

  2、目前netty有3个版本netty3、netty4、netty5。3个版本的内容有所不同。neety3是核心的代码介绍。相对于netty4、和netty5的复杂性来说。netty3的源码是值得学习的。我这里解析了netty3的一些源码,仅供大家理解,也是为了方便大家理解做了很多简化。不代表作者的开发思路。

  3、我们先来看一张图(这张图是我在学习源码的时候扣的,哈哈)

  一、传统NIO流

  

  1)一个线程里面,存在一个selector,当然这个selector也承担起看大门和服务客人的工作。

  2)这里不管多少客户端进来,都是这个selector来处理。这样就就加大了这个服务员的工作量

  3)为了加入线程池,让多个selector同时工作,当时目的性都是一样的。

  4)虽然看大门的和服务客人的都是服务员,但是还是存在差别的。为了更好的处理多个线程的问题。所以这里netty就诞生了。

 二、netty框架

  

  理解:

  1)netty3的框架也是基于nio流做出来的。所以这里会详细介绍netty3框架的思路

  2)将看门的服务员和服务客人的服务员分开。形成两块(也就是2个线程池,也就是后面的boss和worker)

  3)当一个客人来的时候,首先boss,进行接待。然后boss分配工作给worker,这个,在两个线程池的工作下,有条不乱。

  4)原理:就是将看大门的selector和服务客人的selector分开。然后通过boss线程池,下发任务给对应的worker

  4、netty3源码分析

  1)加入对应的jar包。我这里为了了解源码用的是netty3的包。

    <dependency> ???????????<groupId>io.netty</groupId> ???????????<artifactId>netty</artifactId> ???????????<version>3.10.6.Final</version> ???????</dependency>

  2)目录结构

  

  说明:

  a、NettyBoss、NettyWork是针对于selector做区分。虽然他们很多共性,我这里为了好理解,并没有做抽象类(忽略开发思路)。

  b、ThreadHandle是用来初始化线程池和对应的接口。

  c、Start为启动类

  3)NettyBoss(看大门的服务员,第一种线程selector)

package com.troy.application.netty;import java.io.IOException;import java.nio.channels.*;import java.util.Iterator;import java.util.Queue;import java.util.Set;import java.util.concurrent.ConcurrentLinkedQueue;import java.util.concurrent.Executor;import java.util.concurrent.atomic.AtomicBoolean;public class NettyBoss { ???//线程池 ???public final Executor executor; ???//boss选择器 ???protected Selector selector; ???//原子变量,主要是用来保护线程安全。当本线程执行的时候,排除其他线程的执行 ???protected final AtomicBoolean wakenUp = new AtomicBoolean(); ???//队列,线程安全队列。 ???public final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>(); ???//线程处理,这里主要是拿到work的线程池 ???protected ThreadHandle threadHandle; ???//初始化 ???public NettyBoss(Executor executor,ThreadHandle threadHandle) { ???????//赋值 ???????this.executor = executor; ???????this.threadHandle = threadHandle; ???????try { ???????????//每一个线程选择器 ???????????this.selector = Selector.open(); ???????} catch (IOException e) { ???????????e.printStackTrace(); ???????} ???????//从线程中获取一个线程执行以下内容 ???????executor.execute(() -> { ???????????while (true) { ???????????????try { ???????????????????//这里的目前就是排除其他线程同事执行,false因为这里处于阻塞状态,不用开启 ???????????????????wakenUp.set(false); ???????????????????//选择器阻塞 ???????????????????selector.select(); ???????????????????//运行队列中的任务 ???????????????????while (true) { ???????????????????????final Runnable task = taskQueue.poll(); ???????????????????????if (task == null) { ???????????????????????????break; ???????????????????????} ???????????????????????//如果任务存在开始运行 ???????????????????????task.run(); ???????????????????} ???????????????????//对进来的进行处理 ???????????????????this.process(selector); ???????????????} catch (Exception e) { ???????????????????e.printStackTrace(); ???????????????} ???????????} ???????}); ???} ???public void process(Selector selector) throws IOException { ???????Set<SelectionKey> selectedKeys = selector.selectedKeys(); ???????if (selectedKeys.isEmpty()) { ???????????return; ???????} ???????for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) { ???????????SelectionKey key = i.next(); ???????????i.remove(); ???????????ServerSocketChannel server = (ServerSocketChannel) key.channel(); ???????????// 新客户端 ???????????SocketChannel channel = server.accept(); ???????????// 设置为非阻塞 ???????????channel.configureBlocking(false); ???????????// 获取一个worker ???????????NettyWork nextworker = threadHandle.workeres[Math.abs(threadHandle.workerIndex.getAndIncrement() % threadHandle.workeres.length)]; ???????????// 注册新客户端接入任务 ???????????Runnable runnable = () -> { ???????????????try { ???????????????????//将客户端注册到selector中 ???????????????????channel.register(nextworker.selector, SelectionKey.OP_READ); ???????????????} catch (ClosedChannelException e) { ???????????????????e.printStackTrace(); ???????????????} ???????????}; ???????????//添加到work的队列中 ???????????nextworker.taskQueue.add(runnable); ???????????if (nextworker.selector != null) { ???????????????//这里的目前就是开启执行过程 ???????????????if (nextworker.wakenUp.compareAndSet(false, true)) { ???????????????????//放开本次阻塞,进行下一步执行 ???????????????????nextworker.selector.wakeup(); ???????????????} ???????????} else { ???????????????//任务完成移除线程 ???????????????taskQueue.remove(runnable); ???????????} ???????????System.out.println("新客户端链接"); ???????} ???}}

  解释:

  a、初始化的时候,赋值线程池,和线程处理类(线程处理类目的是获取worker的工作线程)

  b、executor为线程池的执行过程。

  c、selector.select()为形成阻塞,wakenUp为了线程安全考核。在接入客户端的时候用selector.wakeup()来放开本次阻塞(很重要)。

  d、然后在worker安全队列中执行对应工作。(taskQueue的目前在boss和worker中的作用都是为了考虑线程安全,这里采用线程安全队列的目的是为了不直接操作其他线程)

  e、wakenUp.compareAndSet(false, true),这里是考虑并发问题。在本线程运行的时候,其他线程处于等待状态。这里也是为了线程安全考虑。

  4)NettyWork(服务客人的服务员,第二种selector)

package com.troy.application.netty;import java.io.IOException;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Queue;import java.util.Set;import java.util.concurrent.ConcurrentLinkedQueue;import java.util.concurrent.Executor;import java.util.concurrent.atomic.AtomicBoolean;public class NettyWork { ???//线程池 ???public final Executor executor; ???//boss选择器 ???protected Selector selector; ???//原子变量,主要是用来保护线程安全。当本线程执行的时候,排除其他线程的执行 ???protected final AtomicBoolean wakenUp = new AtomicBoolean(); ???//队列,线程安全队列。 ???public final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>(); ???//初始化 ???public NettyWork(Executor executor) { ???????this.executor = executor; ???????try { ???????????//每一个work也需要一个选择器用来管理通道 ???????????this.selector = Selector.open(); ???????} catch (IOException e) { ???????????e.printStackTrace(); ???????} ???????//从线程池中获取一个线程开始执行 ???????executor.execute(() -> { ???????????while (true) { ???????????????try { ???????????????????//阻塞状态排除问题 ???????????????????wakenUp.set(false); ???????????????????//阻塞 ???????????????????selector.select(); ???????????????????//处理work任务 ???????????????????while (true) { ???????????????????????final Runnable task = taskQueue.poll(); ???????????????????????if (task == null) { ???????????????????????????break; ???????????????????????} ???????????????????????//存在work任务开始执行 ???????????????????????task.run(); ???????????????????} ???????????????????//处理任务 ???????????????????this.process(selector); ???????????????} catch (Exception e) { ???????????????????e.printStackTrace(); ???????????????} ???????????} ???????}); ???} ???public void process(Selector selector) throws IOException { ???????Set<SelectionKey> selectedKeys = selector.selectedKeys(); ???????if (selectedKeys.isEmpty()) { ???????????return; ???????} ???????Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator(); ???????while (ite.hasNext()) { ???????????SelectionKey key = (SelectionKey) ite.next(); ???????????// 移除,防止重复处理 ???????????ite.remove(); ???????????// 得到事件发生的Socket通道 ???????????SocketChannel channel = (SocketChannel) key.channel(); ???????????// 数据总长度 ???????????int ret = 0; ???????????boolean failure = true; ???????????ByteBuffer buffer = ByteBuffer.allocate(1024); ???????????//读取数据 ???????????try { ???????????????ret = channel.read(buffer); ???????????????failure = false; ???????????} catch (Exception e) { ???????????????// ignore ???????????} ???????????//判断是否连接已断开 ???????????if (ret <= 0 || failure) { ???????????????key.cancel(); ???????????????System.out.println("客户端断开连接"); ???????????}else{ ???????????????System.out.println("收到数据:" + new String(buffer.array())); ???????????????//回写数据 ???????????????ByteBuffer outBuffer = ByteBuffer.wrap("收到\n".getBytes()); ???????????????channel.write(outBuffer);// 将消息回送给客户端 ???????????} ???????} ???}}

  解释:

  a、worker的执行方式基本上面和boss的方式是一样的,只不够是处理方式不一样

  b、这里需要注意的是,都是考虑线程队列执行。

  3)ThreadHandle(线程处理,这里主要是启动需要的东西)

package com.troy.application.netty;import java.net.InetSocketAddress;import java.nio.channels.ClosedChannelException;import java.nio.channels.SelectionKey;import java.nio.channels.ServerSocketChannel;import java.util.concurrent.ExecutorService;import java.util.concurrent.atomic.AtomicInteger;public class ThreadHandle { ???public final AtomicInteger bossIndex = new AtomicInteger(); ???public static NettyBoss[] bosses; ???public final AtomicInteger workerIndex = new AtomicInteger(); ???public static NettyWork[] workeres; ???public ThreadHandle(ExecutorService boss,ExecutorService work) { ???????this.bosses = new NettyBoss[1]; ???????//初始化boss线程池 ???????for (int i = 0; i < bosses.length; i++) { ???????????bosses[i] = new NettyBoss(boss,this); ???????} ???????this.workeres = new NettyWork[Runtime.getRuntime().availableProcessors() * 2]; ???????//初始化work线程池 ???????for (int i = 0; i < workeres.length; i++) { ???????????workeres[i] = new NettyWork(work); ???????} ???} ???public void bind(InetSocketAddress inetSocketAddress) { ???????try { ???????????// 获得一个ServerSocket通道 ???????????ServerSocketChannel serverChannel = ServerSocketChannel.open(); ???????????// 设置通道为非阻塞 ???????????serverChannel.configureBlocking(false); ???????????// 将该通道对应的ServerSocket绑定到port端口 ???????????serverChannel.socket().bind(inetSocketAddress); ???????????//获取一个boss线程 ???????????NettyBoss nextBoss = bosses[Math.abs(bossIndex.getAndIncrement() % workeres.length)]; ???????????//向boss注册一个ServerSocket通道 ???????????Runnable runnable = () -> { ???????????????try { ???????????????????//注册serverChannel到selector ???????????????????serverChannel.register(nextBoss.selector, SelectionKey.OP_ACCEPT); ???????????????} catch (ClosedChannelException e) { ???????????????????e.printStackTrace(); ???????????????} ???????????}; ???????????//加入任务队列 ???????????nextBoss.taskQueue.add(runnable); ???????????if (nextBoss.selector != null) { ???????????????//排除其他任务处理 ???????????????if (nextBoss.wakenUp.compareAndSet(false, true)) { ???????????????????//放开阻塞 ???????????????????nextBoss.selector.wakeup(); ???????????????} ???????????} else { ???????????????//移除任务 ???????????????nextBoss.taskQueue.remove(runnable); ???????????} ???????} catch (Exception e) { ???????????e.printStackTrace(); ???????} ???}}

  解释:

  a、这里采用数组的形式,主要目的是考虑多个看门的,和多个服务客人的线程。为了好控制,好选择,哪一个来执行。

  b、端口的注册,在NettyBoss里面进行初始化的的原理都是一样的。

  4)start

package com.troy.application.netty;import java.net.InetSocketAddress;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class Start { ???public static void main(String[] args) { ???????//声明线程池 ???????ExecutorService boss = Executors.newCachedThreadPool(); ???????ExecutorService work = Executors.newCachedThreadPool(); ???????//初始化线程池 ???????ThreadHandle threadHandle = new ThreadHandle(boss,work); ???????//声明端口 ???????threadHandle.bind(new InetSocketAddress(9000)); ???????System.out.println("start"); ???}}

  说明一下流程

  a、初始化boss和work。让boss线程池加入设定第一种boss的selector,并且处于阻塞状态。work的初始化也基本上是一样的,只不过换成了第二种selector线程池,处于阻塞状态。

  b、当线程处理类初始化监听端口的时候。就是选择boss中其中一个selector。声明一个线程先监听,加入boss的线程安全队列中。然后放开boss阻塞,向下执行。线程执行会监听对应端口并阻塞。

  c、当一个客户端接入的时候,boss中的selector会监听到对应端口。然后选择work线程中的一个selector给work分派任务。

  d、最后work中的selector来处理事务。

  4、源码下载:https://pan.baidu.com/s/1pKIxuMf

  5、本代码只是用于理解netty的实现过程,不代表开发思路。其中我为了简化代码,做了很多调整。目的就是压缩代码,方便理解。

netty源码分析

原文地址:http://www.cnblogs.com/ll409546297/p/8004409.html

知识推荐

我的编程学习网——分享web前端后端开发技术知识。 垃圾信息处理邮箱 tousu563@163.com 网站地图
icp备案号 闽ICP备2023006418号-8 不良信息举报平台 互联网安全管理备案 Copyright 2023 www.wodecom.cn All Rights Reserved