分享web开发知识

注册/登录|最近发布|今日推荐

主页 IT知识网页技术软件开发前端开发代码编程运营维护技术分享教程案例
当前位置:首页 > IT知识

netty-read

发布时间:2023-09-06 01:06 责任编辑:顾先生 关键词:暂无标签

NioEventLoop是ServerSocketChannel和SocketChannel通用的EventLoop,下面从NioEventLoop的执行逻辑开始分析。

 1 protected void run() { 2 ??for (;;) { 3 ????switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { 4 ??????case SelectStrategy.CONTINUE: 5 ????????continue; 6 ??????case SelectStrategy.SELECT: 7 ????????select(wakenUp.getAndSet(false)); 8 ????????if (wakenUp.get()) { 9 ??????????selector.wakeup();10 ????????}11 ??????default:12 ????????// fallthrough13 ????}14 15 ????cancelledKeys = 0;16 ????needsToSelectAgain = false;17 ????final int ioRatio = this.ioRatio;18 ????if (ioRatio == 100) {19 ??????try {20 ????????//处理IO事件21 ????????processSelectedKeys();22 ??????} finally {23 ????????// Ensure we always run tasks.24 ????????//处理队列中任务25 ????????runAllTasks();26 ??????}27 ????} else {28 ??????final long ioStartTime = System.nanoTime();29 ??????try {30 ????????processSelectedKeys();31 ??????} finally {32 ????????// Ensure we always run tasks.33 ????????final long ioTime = System.nanoTime() - ioStartTime;34 ????????runAllTasks(ioTime * (100 - ioRatio) / ioRatio);35 ??????}36 ????}37 ??}38 }
 /**
  * Iterates over the given selected-key set, removes each key from the set and
  * dispatches it to the matching {@code processSelectedKey} overload according
  * to the type of the key's attachment.
  *
  * @param selectedKeys the keys reported ready by the selector; returns
  *                     immediately when the set is empty
  */
 // The event loop holds the selector, so the selected keys can be obtained from it.
 private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {

     if (selectedKeys.isEmpty()) {
         return;
     }

     Iterator<SelectionKey> i = selectedKeys.iterator();
     for (;;) {
         final SelectionKey k = i.next();
         // The attachment of a java.nio.channels.ServerSocketChannel key is an
         // io.netty.channel.socket.ServerSocketChannel, and the attachment of a
         // java.nio.channels.SocketChannel key is an io.netty.channel.socket.SocketChannel.
         final Object a = k.attachment();
         // Remove the key from the selected set before it is processed.
         i.remove();

         if (a instanceof AbstractNioChannel) {
             processSelectedKey(k, (AbstractNioChannel) a);
         } else {
             // Any attachment that is not an AbstractNioChannel is handled as a NioTask.
             @SuppressWarnings("unchecked")
             NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
             processSelectedKey(k, task);
         }

         if (!i.hasNext()) {
             break;
         }

         // The flag was raised while processing; select again and continue with
         // the selector's current selected-key set.
         if (needsToSelectAgain) {
             selectAgain();
             selectedKeys = selector.selectedKeys();

             // Create the iterator again to avoid ConcurrentModificationException
             if (selectedKeys.isEmpty()) {
                 break;
             } else {
                 i = selectedKeys.iterator();
             }
         }
     }
 }
 1 //涵盖了SelectionKey.OP_CONNECT、SelectionKey.OP_WRITE、SelectionKey.OP_READ、SelectionKey.OP_ACCEPT四种事件 2 private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { 3 ??final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); 4 ?5 ??int readyOps = k.readyOps(); 6 ?7 ??if ((readyOps & SelectionKey.OP_CONNECT) != 0) { 8 ????int ops = k.interestOps(); 9 ????ops &= ~SelectionKey.OP_CONNECT;10 ????k.interestOps(ops);11 12 ????unsafe.finishConnect();13 ??}14 15 ??if ((readyOps & SelectionKey.OP_WRITE) != 0) {16 ????ch.unsafe().forceFlush();17 ??}18 ??if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {19 ??????//重点关注 ???20 ????unsafe.read();21 ??}22 }

下面分别对NioMessageUnsafe以及NioByteUnsafe的read操作进行分析

NioMessageUnsafe用于ServerSocketChannel,读取的是SocketChannel对象

 /**
  * Unsafe implementation whose {@code read()} collects messages into
  * {@code readBuf} via {@code doReadMessages} and then fires them through the
  * pipeline one by one.
  */
 private final class NioMessageUnsafe extends AbstractNioUnsafe {
     // ...... (other members elided in this excerpt)

     /**
      * Reads messages in a loop until {@code doReadMessages} reports nothing more
      * to read, reports a negative count, throws, or the allocator handle says to
      * stop; then delivers everything that was read to the pipeline and closes the
      * channel if required.
      */
     public void read() {
         assert eventLoop().inEventLoop();
         final ChannelConfig config = config();
         final ChannelPipeline pipeline = pipeline();
         final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
         allocHandle.reset(config);

         boolean closed = false;
         Throwable exception = null;

         try {
             do {
                 // The actual read operation.
                 int localRead = doReadMessages(readBuf);
                 // Nothing more to read.
                 if (localRead == 0) {
                     break;
                 }
                 // A negative localRead means the peer has closed.
                 if (localRead < 0) {
                     closed = true;
                     break;
                 }

                 allocHandle.incMessagesRead(localRead);
             } while (allocHandle.continueReading());
         } catch (Throwable t) {
             // Remember the failure; messages already read are still delivered below.
             exception = t;
         }

         int size = readBuf.size();
         for (int i = 0; i < size; i ++) {
             readPending = false;
             // Hand the message to the handlers, i.e. invoke each handler's channelRead.
             // For a server socket channel, readBuf holds SocketChannel objects, so
             // fireChannelRead passes them to ServerBootstrapAcceptor, which registers
             // the SocketChannel.
             pipeline.fireChannelRead(readBuf.get(i));
         }
         readBuf.clear();
         allocHandle.readComplete();
         // Invoke the handlers' channelReadComplete callback.
         pipeline.fireChannelReadComplete();

         if (exception != null) {
             closed = closeOnReadError(exception);
             // Invoke the handlers' exceptionCaught callback.
             pipeline.fireExceptionCaught(exception);
         }

         if (closed) {
             inputShutdown = true;
             if (isOpen()) {
                 close(voidPromise());
             }
         }
     }

     // ...... (other members elided in this excerpt)
 }
 1 protected int doReadMessages(List<Object> buf) throws Exception { 2 ????//调用java.nio.channels.ServerSocketChannel.accept() 3 ??SocketChannel ch = SocketUtils.accept(javaChannel()); 4 ?5 ??try { 6 ????if (ch != null) { 7 ??????buf.add(new NioSocketChannel(this, ch)); 8 ??????return 1; 9 ????}10 ??} catch (Throwable t) {11 ????try {12 ??????ch.close();13 ????} catch (Throwable t2) {14 ??????logger.warn("Failed to close a socket.", t2);15 ????}16 ??}17 ??return 0;18 }

NioByteUnsafe用于SocketChannel,读取的是字节序列

 /**
  * Unsafe implementation whose {@code read()} reads bytes into freshly allocated
  * buffers via {@code doReadBytes} and fires each filled buffer through the
  * pipeline.
  */
 protected class NioByteUnsafe extends AbstractNioUnsafe {
     // ...... (other members elided in this excerpt)

     /**
      * Reads in a loop until a read returns no bytes, returns a negative count,
      * throws, or the allocator handle says to stop. Each non-empty buffer is
      * passed to the pipeline; afterwards read-complete is fired and the channel
      * is closed on read if a negative count was seen.
      */
     public final void read() {
         final ChannelConfig config = config();
         final ChannelPipeline pipeline = pipeline();
         final ByteBufAllocator allocator = config.getAllocator();
         final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
         allocHandle.reset(config);

         ByteBuf byteBuf = null;
         boolean close = false;
         try {
             do {
                 // A new buffer is allocated for every read.
                 byteBuf = allocHandle.allocate(allocator);
                 // The actual read operation.
                 allocHandle.lastBytesRead(doReadBytes(byteBuf));
                 // 0 means all data has been read; -1 means the peer has closed.
                 if (allocHandle.lastBytesRead() <= 0) {
                     // nothing was read. release the buffer.
                     byteBuf.release();
                     byteBuf = null;
                     // A result of -1 means the peer has closed.
                     close = allocHandle.lastBytesRead() < 0;
                     break;
                 }

                 allocHandle.incMessagesRead(1);
                 readPending = false;
                 // Pass the data that was read to the handlers, i.e. invoke each
                 // handler's channelRead.
                 pipeline.fireChannelRead(byteBuf);
                 // Clear the local reference once the buffer has been handed off,
                 // so the catch block below is not given this buffer again.
                 byteBuf = null;
             } while (allocHandle.continueReading());

             allocHandle.readComplete();
             // Invoke the handlers' channelReadComplete callback.
             pipeline.fireChannelReadComplete();

             if (close) {
                 closeOnRead(pipeline);
             }
         } catch (Throwable t) {
             // Will call pipeline.fireChannelReadComplete and pipeline.fireExceptionCaught.
             handleReadException(pipeline, byteBuf, t, close, allocHandle);
         }
     }
     // ...... (other members elided in this excerpt)
 }
1 protected int doReadBytes(ByteBuf byteBuf) throws Exception {2 ??final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();3 ??allocHandle.attemptedBytesRead(byteBuf.writableBytes());4 ??//从java.nio.channels.SocketChannel中读取allocHandle.attemptedBytesRead()个字节置byteBuf中5 ??return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());6 }

netty-read

原文地址:http://www.cnblogs.com/holoyong/p/7436612.html

知识推荐

我的编程学习网——分享web前端后端开发技术知识。 垃圾信息处理邮箱 tousu563@163.com 网站地图
icp备案号 闽ICP备2023006418号-8 不良信息举报平台 互联网安全管理备案 Copyright 2023 www.wodecom.cn All Rights Reserved