package com.cn;import java.io.IOException;import java.nio.channels.Selector;import java.util.Queue;import java.util.concurrent.ConcurrentLinkedQueue;import java.util.concurrent.Executor;import java.util.concurrent.atomic.AtomicBoolean;import com.cn.pool.NioSelectorRunnablePool;/** * 抽象selector线程基类 */public abstract class AbstractNioSelector implements Runnable { ???/** ????* 线程池 ????*/ ???private final Executor executor; ???/** ????* 选择器 ????*/ ???protected Selector selector; ???/** ????* 选择器wakenUp状态标记 ????*/ ???protected final AtomicBoolean wakenUp = new AtomicBoolean(); ???/** ????* 线程安全任务队列 ????*/ ???private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>(); ???/** ????* 线程名称 ????*/ ???private String threadName; ???????/** ????* 线程池管理对象 ????*/ ???protected NioSelectorRunnablePool selectorRunnablePool; ???AbstractNioSelector(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) { ???????this.executor = executor; ???????this.threadName = threadName; ???????this.selectorRunnablePool = selectorRunnablePool; ???????openSelector(); ???} ???/** ????* 获取selector并启动线程,一个线程拥有了select才能为多个客服服务。 ????*/ ???private void openSelector() { ???????try { ???????????this.selector = Selector.open(); ???????} catch (IOException e) { ???????????throw new RuntimeException("Failed to create a selector."); ???????} ???????executor.execute(this);//像线程池中加入一个任务,并执行任务的run方法。运行当前任务,执行run方法。从线程池拿出一个线程执行这个任务。 ???} ???@Override ???public void run() { ???????????????Thread.currentThread().setName(this.threadName);//给当前线程付一个名字 ???????while (true) { ???????????try { ???????????????wakenUp.set(false); ???????????????select(selector);//接口,执行NioServerBoss或者NioServerWorker的select方法 ???????????????processTaskQueue();//执行完任务队列里面的任务 ???????????????process(selector);//接口,执行NioServerBoss或者NioServerWorker的process方法 ???????????} catch (Exception e) { ???????????????// ignore ???????????} ???????} ???} ???/** ????* 注册一个任务并激活selector ????* ?????* @param task ????*/ ???protected final void registerTask(Runnable task) { ???????taskQueue.add(task); ???????Selector selector = this.selector; ???????if (selector != null) { ???????????if (wakenUp.compareAndSet(false, true)) {//wakenUp是不是false,是false就置为true, ???????????????selector.wakeup(); ???????????} ???????} else { ???????????taskQueue.remove(task); ???????} ???} ???/** ????* 执行队列里的任务 ????*/ ???private void processTaskQueue() { ???????for (;;) { ???????????final Runnable task = taskQueue.poll(); ???????????if (task == null) { ???????????????break; ???????????} ???????????task.run();//task是runnable元素 ???????} ???} ???????/** ????* 获取线程管理对象 ????* @return ????*/ ???public NioSelectorRunnablePool getSelectorRunnablePool() { ???????return selectorRunnablePool; ???} ???/** ????* select抽象方法 ????????子类有重写 ????*/ ???protected abstract int select(Selector selector) throws IOException; ???/** ????* selector的业务处理 ??????子类有重写 ????*/ ???protected abstract void process(Selector selector) throws IOException;}
package com.cn;import java.io.IOException;import java.nio.channels.ClosedChannelException;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Set;import java.util.concurrent.Executor;import com.cn.pool.Boss;import com.cn.pool.NioSelectorRunnablePool;import com.cn.pool.Worker;/** * boss实现类,每一个NioServerBoss再一个线程里面 */public class NioServerBoss extends AbstractNioSelector implements Boss{ ???public NioServerBoss(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) { ???????super(executor, threadName, selectorRunnablePool); ???} ???@Override ???protected void process(Selector selector) throws IOException { ???????Set<SelectionKey> selectedKeys = selector.selectedKeys(); ???????if (selectedKeys.isEmpty()) { ???????????return; ???????} ???????????????for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) { ???????????SelectionKey key = i.next(); ???????????i.remove(); ???????????ServerSocketChannel server = (ServerSocketChannel) key.channel(); ???????????// 新客户端 ???????????SocketChannel channel = server.accept(); ???????????// 设置为非阻塞 ???????????channel.configureBlocking(false); ???????????// 获取一个worker ???????????Worker nextworker = getSelectorRunnablePool().nextWorker();//通过线程管理对象获取一个worker(runnable任务对象), ???????????// 注册新客户端接入任务,将新的连接请求交给worker。 ???????????nextworker.registerNewChannelTask(channel);//往别的任务队列里面加任务 ???????????//安卓里面,子线程不能改变UI,要改变就要向主线程的任务队列里面加任务。 ???????????????????????System.out.println("新客户端链接"); ???????} ???} ???????????public void registerAcceptChannelTask(final ServerSocketChannel serverChannel){ ????????final Selector selector = this.selector; ????????registerTask(new Runnable() { ???????????@Override ???????????public void run() { ???????????????try { ???????????????????//注册serverChannel到selector ???????????????????serverChannel.register(selector, SelectionKey.OP_ACCEPT); ???????????????} catch (ClosedChannelException e) { ???????????????????e.printStackTrace(); ???????????????} ???????????} ???????}); ???} ???????@Override ???protected int select(Selector selector) throws IOException { ???????return selector.select(); ???}}
package com.cn;import java.io.IOException;import java.nio.ByteBuffer;import java.nio.channels.ClosedChannelException;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Set;import java.util.concurrent.Executor;import com.cn.pool.NioSelectorRunnablePool;import com.cn.pool.Worker;/** * worker实现类,每一个NioServerWorker再一个线程里面 */public class NioServerWorker extends AbstractNioSelector implements Worker{ ???public NioServerWorker(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) { ???????super(executor, threadName, selectorRunnablePool); ???} ???@Override ???protected void process(Selector selector) throws IOException { ???????Set<SelectionKey> selectedKeys = selector.selectedKeys(); ???????if (selectedKeys.isEmpty()) { ???????????return; ???????} ???????Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator(); ???????while (ite.hasNext()) { ???????????SelectionKey key = (SelectionKey) ite.next(); ???????????// 移除,防止重复处理 ???????????ite.remove(); ???????????????????????// 得到事件发生的Socket通道 ???????????SocketChannel channel = (SocketChannel) key.channel(); ???????????????????????// 数据总长度 ???????????int ret = 0; ???????????boolean failure = true; ???????????ByteBuffer buffer = ByteBuffer.allocate(1024); ???????????//读取数据 ???????????try { ???????????????ret = channel.read(buffer); ???????????????failure = false; ???????????} catch (Exception e) { ???????????????// ignore ???????????} ???????????//判断是否连接已断开 ???????????if (ret <= 0 || failure) { ???????????????key.cancel(); ???????????????System.out.println("客户端断开连接"); ???????????}else{ ????????????????System.out.println("收到数据:" + new String(buffer.array())); ?????????????????????????????????//回写数据 ????????????????ByteBuffer outBuffer = ByteBuffer.wrap("收到\n".getBytes()); ????????????????channel.write(outBuffer);// 将消息回送给客户端 ???????????} ???????} ???} ???/** ????* 加入一个新的socket客户端 ????*/ ???public void registerNewChannelTask(final SocketChannel channel){ ????????final Selector selector = this.selector; ????????registerTask(new Runnable() { ???????????@Override ???????????public void run() { ???????????????try { ???????????????????//将客户端注册到selector中 ???????????????????channel.register(selector, SelectionKey.OP_READ); ???????????????} catch (ClosedChannelException e) { ???????????????????e.printStackTrace(); ???????????????} ???????????} ???????}); ???} ???@Override ???protected int select(Selector selector) throws IOException { ???????return selector.select(60000); ???} ???}
package com.cn;import java.net.SocketAddress;import java.nio.channels.ServerSocketChannel;import com.cn.pool.Boss;import com.cn.pool.NioSelectorRunnablePool;/** * 服务类 */public class ServerBootstrap {private NioSelectorRunnablePool selectorRunnablePool; ???????public ServerBootstrap(NioSelectorRunnablePool selectorRunnablePool) { ???????this.selectorRunnablePool = selectorRunnablePool; ???} ???????/** ????* 监听端口 ????* @param localAddress ????*/ ???public void bind(final SocketAddress localAddress){ ???????try { ???????????// 获得一个ServerSocket通道 ???????????ServerSocketChannel serverChannel = ServerSocketChannel.open(); ???????????// 设置通道为非阻塞 ???????????serverChannel.configureBlocking(false); ???????????// 将该通道对应的ServerSocket绑定到port端口 ???????????serverChannel.socket().bind(localAddress); ???????????????????????//获取一个boss线程 ???????????Boss nextBoss = selectorRunnablePool.nextBoss(); ???????????//向boss注册一个ServerSocket通道 ???????????nextBoss.registerAcceptChannelTask(serverChannel); ???????} catch (Exception e) { ???????????e.printStackTrace(); ???????} ???}}
package com.cn;import java.net.InetSocketAddress;import java.util.concurrent.Executors;import com.cn.pool.NioSelectorRunnablePool;/** * 启动函数 */public class Start { ???public static void main(String[] args) { ???????????????//管理线程池的,初始化2个线程池,一个boss一个work, ???????NioSelectorRunnablePool nioSelectorRunnablePool = new NioSelectorRunnablePool(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); ???????????????//获取服务类 ???????ServerBootstrap bootstrap = new ServerBootstrap(nioSelectorRunnablePool); ???????????????//绑定端口 ???????bootstrap.bind(new InetSocketAddress(10101)); ???????????????System.out.println("start"); ???}}
package com.cn.pool;import java.nio.channels.ServerSocketChannel;/** * boss接口 */public interface Boss { ???????/** ????* 加入一个新的ServerSocket,监听连接 ????*/ ???public void registerAcceptChannelTask(ServerSocketChannel serverChannel);}
package com.cn.pool;import java.util.concurrent.Executor;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.atomic.AtomicInteger;import com.cn.NioServerBoss;import com.cn.NioServerWorker;/** * selector线程管理者 * ?* ??线程池是有多个线程,每个线程里面有一个任务队列,线程run的时候会从任务队列取一个任务出来,执行任务的run方法, ?????????队列里面没有任务就阻塞等待新的任务进来。 */public class NioSelectorRunnablePool { ???/** ????* boss任务数组,boss用来监听端口的, ????*/ ???private final AtomicInteger bossIndex = new AtomicInteger(); ???private Boss[] bosses; ???/** ????* worker任务数组,用来处理事件的, ????*/ ???private final AtomicInteger workerIndex = new AtomicInteger(); ???private Worker[] workeres; ???????//boss和worker是一个线程池 ???public NioSelectorRunnablePool(Executor boss, Executor worker) { ???????initBoss(boss, 1);//boss是一个线程池。 ???????initWorker(worker, Runtime.getRuntime().availableProcessors() * 2); ???} ???/** ????* 初始化boss线程池的runable任务数组 ????* @param boss ????* @param count ????*/ ???private void initBoss(Executor boss, int count) { ???????this.bosses = new NioServerBoss[count]; ???????//this.bosses是一个数组,里面是一个个的NioServerBoss, ???????//NioServerBoss是runnable任务对象。runnable对象里面有线程池、选择器、线程名、线程管理者。 ???????//executor.execute(this);通过NioServerBoss里面的线程池把任务对象NioServerBoss自己运行起来。 ???????//所有的NioServerBoss任务对象都是通过boss线程池来调度的。 ???????for (int i = 0; i < bosses.length; i++) { ???????????bosses[i] = new NioServerBoss(boss, "boss thread " + (i+1), this);//this是NioSelectorRunnablePool线程池管理者。 ???????????//boss thread 是任务runable的名字 ???????} ???} ???/** ????* 初始化worker线程池的runable任务数组 ????* @param worker ????* @param count ????*/ ???private void initWorker(Executor worker, int count) { ???????this.workeres = new NioServerWorker[2/*count*/]; ???????for (int i = 0; i < workeres.length; i++) { ???????????//所有的NioServerWorker任务对象都是通过worker线程池来调度的。 ???????????workeres[i] = new NioServerWorker(worker, "worker thread " + (i+1), this); ???????} ???????//boss线程池里面有8个NioServerBoss.runable对象(8个大任务,开了8个线程), ???????//每一个NioServerWorker再一个线程里面。8个NioServerBoss.runable对象一开始就去run, ???????//每个NioServerBoss.runable对象里面有一个任务队列taskQueue,队列里面是一个个的Runnable对象。 ???????????????/* ???????public static void main(String[] args) { ???????//创建一个线程池,可回收的,没任务就回收了。newCachedThreadPool可以很大。60秒没任务就回收。 ???????ExecutorService pool = Executors.newCachedThreadPool();//线程池 ???????for(int i = 1; i < 5; i++){//4个任务,一个任务就是一个Runnable ???????????pool.execute(new Runnable() {//没有返回值 ???????????????@Override ???????????????public void run() { ???????????????????try { ???????????????????????Thread.sleep(5); ???????????????????} catch (InterruptedException e) { ???????????????????????e.printStackTrace(); ???????????????????} ???????????????????System.out.println("thread name: " + Thread.currentThread().getName()); ???????????????????????????????????} ???????????}); ???????????try { ???????????????Thread.sleep(5); ???????????} catch (InterruptedException e) { ???????????????e.printStackTrace(); ???????????} ???????} ???????pool.shutdown();//任务执行完就关了。 ???????/*thread name: pool-1-thread-1 ???????thread name: pool-1-thread-2 ???????thread name: pool-1-thread-1 ???????thread name: pool-1-thread-2 ??线程执行完了会回收,不一定开4个线程*/ ???} ???/** ????* 获取一个worker的runable任务,给每个work平均分配 ????*/ ???public Worker nextWorker() { ????????return workeres[Math.abs(workerIndex.getAndIncrement() % workeres.length)]; ???} ???/** ????* 获取一个boss的runable任务 ????*/ ???public Boss nextBoss() { ????????return bosses[Math.abs(bossIndex.getAndIncrement() % bosses.length)]; ???}}
package com.cn.pool;import java.nio.channels.SocketChannel;/** * worker接口 */public interface Worker { ???????/** ????* 加入一个新的客户端会话,监听客户端的处理 ????*/ ???public void registerNewChannelTask(SocketChannel channel);}
netty12---简单源码
原文地址:https://www.cnblogs.com/yaowen/p/9083763.html