分享web开发知识

注册/登录|最近发布|今日推荐

主页 IT知识网页技术软件开发前端开发代码编程运营维护技术分享教程案例
当前位置:首页 > 网页技术

Netty源码分析第2章(NioEventLoop)---->第8节: 执行任务队列

发布时间:2023-09-06 02:28责任编辑:苏小强关键词:暂无标签

 

Netty源码分析第二章: NioEventLoop

 

第八节: 执行任务队列

继续回到NioEventLoop的run()方法:

protected void run() { ???for (;;) { ???????try { ???????????switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { ???????????????case SelectStrategy.CONTINUE: ???????????????????continue; ???????????????case SelectStrategy.SELECT: ???????????????????//轮询io事件(1) ???????????????????select(wakenUp.getAndSet(false)); ???????????????????if (wakenUp.get()) { ???????????????????????selector.wakeup(); ???????????????????} ???????????????default: ???????????} ???????????cancelledKeys = 0; ???????????needsToSelectAgain = false; ???????????//默认是50 ???????????final int ioRatio = this.ioRatio; ????????????if (ioRatio == 100) { ???????????????try { ???????????????????processSelectedKeys(); ???????????????} finally { ???????????????????runAllTasks(); ???????????????} ???????????} else { ???????????????//记录下开始时间 ???????????????final long ioStartTime = System.nanoTime(); ???????????????try { ???????????????????//处理轮询到的key(2) ???????????????????processSelectedKeys(); ???????????????} finally { ???????????????????//计算耗时 ???????????????????final long ioTime = System.nanoTime() - ioStartTime; ???????????????????//执行task(3) ???????????????????runAllTasks(ioTime * (100 - ioRatio) / ioRatio); ???????????????} ???????????} ???????} catch (Throwable t) { ???????????handleLoopException(t); ???????} ???????//代码省略 ???}}

我们看到处理完轮询到的key之后, 首先记录下耗时, 然后通过runAllTasks(ioTime * (100 - ioRatio) / ioRatio)执行taskQueue中的任务

我们知道ioRatio默认是50, 所以执行完ioTime * (100 - ioRatio) / ioRatio后, 方法传入的值为ioTime, 也就是processSelectedKeys()的执行时间:

跟进方法:

protected boolean runAllTasks(long timeoutNanos) { ???//定时任务队列中聚合任务 ???fetchFromScheduledTaskQueue(); ???//从普通taskQ里面拿一个任务 ???Runnable task = pollTask(); ???//task为空, 则直接返回 ???if (task == null) { ???????//跑完所有的任务执行收尾的操作 ???????afterRunningAllTasks(); ???????return false; ???} ???//如果队列不为空 ???//首先算一个截止时间(+50毫秒, 因为执行任务, 不要超过这个时间) ???final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; ???long runTasks = 0; ???long lastExecutionTime; ???//执行每一个任务 ???for (;;) { ???????safeExecute(task); ???????//标记当前跑完的任务 ???????runTasks ++; ???????//当跑完64个任务的时候, 会计算一下当前时间 ???????if ((runTasks & 0x3F) == 0) { ???????????//定时任务初始化到当前的时间 ???????????lastExecutionTime = ScheduledFutureTask.nanoTime(); ???????????//如果超过截止时间则不执行(nanoTime()是耗时的) ???????????if (lastExecutionTime >= deadline) { ???????????????break; ???????????} ???????} ???????//如果没有超过这个时间, 则继续从普通任务队列拿任务 ???????task = pollTask(); ???????//直到没有任务执行 ???????if (task == null) { ???????????//记录下最后执行时间 ???????????lastExecutionTime = ScheduledFutureTask.nanoTime(); ???????????break; ???????} ???} ???//收尾工作 ???afterRunningAllTasks(); ???this.lastExecutionTime = lastExecutionTime; ???return true;}

首先会执行fetchFromScheduledTaskQueue()这个方法, 这个方法的意思是从定时任务队列中聚合任务, 也就是将定时任务中找到可以执行的任务添加到taskQueue中, 我们跟进去:

private boolean fetchFromScheduledTaskQueue() { ???long nanoTime = AbstractScheduledEventExecutor.nanoTime(); ???//从定时任务队列中抓取第一个定时任务 ???//寻找截止时间为nanoTime的任务 ???Runnable scheduledTask ?= pollScheduledTask(nanoTime); ???//如果该定时任务队列不为空, 则塞到普通任务队列里面 ???while (scheduledTask != null) { ???????//如果添加到普通任务队列过程中失败 ???????if (!taskQueue.offer(scheduledTask)) { ???????????//则重新添加到定时任务队列中 ???????????scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask); ???????????return false; ???????} ???????//继续从定时任务队列中拉取任务 ???????//方法执行完成之后, 所有符合运行条件的定时任务队列, 都添加到了普通任务队列中 ???????scheduledTask = pollScheduledTask(nanoTime); ???} ???return true;}

long nanoTime = AbstractScheduledEventExecutor.nanoTime()代表从定时任务初始化到现在过去了多长时间

Runnable scheduledTask= pollScheduledTask(nanoTime)代表从定时任务队列中拿到小于nanoTime时间的任务, 因为小于初始化到现在的时间, 说明该任务需要执行了

跟到其父类AbstractScheduledEventExecutor的pollScheduledTask(nanoTime)方法中:

protected final Runnable pollScheduledTask(long nanoTime) { ???assert inEventLoop(); ???//拿到定时任务队列 ???Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue; ???//peek()方法拿到第一个任务 ???ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek(); ???if (scheduledTask == null) { ???????return null; ???} ???if (scheduledTask.deadlineNanos() <= nanoTime) { ???????//从队列中删除 ???????scheduledTaskQueue.remove(); ???????//返回该 ???????return scheduledTask; ???} ???return null;}

我们看到首先获得当前类绑定的定时任务队列的成员变量

如果不为空, 则通过scheduledTaskQueue.peek()弹出第一个任务

如果当前任务小于传来的时间, 说明该任务需要执行, 则从定时任务队列中删除

我们继续回到fetchFromScheduledTaskQueue()方法中:

private boolean fetchFromScheduledTaskQueue() { ???long nanoTime = AbstractScheduledEventExecutor.nanoTime(); ???//从定时任务队列中抓取第一个定时任务 ???//寻找截止时间为nanoTime的任务 ???Runnable scheduledTask ?= pollScheduledTask(nanoTime); ???//如果该定时任务队列不为空, 则塞到普通任务队列里面 ???while (scheduledTask != null) { ???????//如果添加到普通任务队列过程中失败 ???????if (!taskQueue.offer(scheduledTask)) { ???????????//则重新添加到定时任务队列中 ???????????scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask); ???????????return false; ???????} ???????//继续从定时任务队列中拉取任务 ???????//方法执行完成之后, 所有符合运行条件的定时任务队列, 都添加到了普通任务队列中 ???????scheduledTask = pollScheduledTask(nanoTime); ???} ???return true;}

弹出需要执行的定时任务之后, 我们通过taskQueue.offer(scheduledTask)添加到taskQueue中, 如果添加失败, 则通过scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask)重新添加到定时任务队列中

如果添加成功, 则通过pollScheduledTask(nanoTime)方法继续添加, 直到没有需要执行的任务

这样就将定时任务队列需要执行的任务添加到了taskQueue中

回到runAllTasks(long timeoutNanos)方法中:

首先通过Runnable task = pollTask()从taskQueue中拿一个任务

任务不为空, 则通过final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos计算一个截止时间, 任务的执行时间不能超过这个时间

然后在for循环中通过safeExecute(task)执行task, 我们跟到safeExecute(task)中:

protected static void safeExecute(Runnable task) { ???try { ???????//直接调用run()方法执行 ???????task.run(); ???} catch (Throwable t) { ???????//发生异常不终止 ???????logger.warn("A task raised an exception. Task: {}", task, t); ???}}

这里直接调用task的run()方法进行执行, 其中发生异常, 只打印一条日志, 代表发生异常不终止, 继续往下执行

回到runAllTasks(long timeoutNanos)方法:

每次执行完task, runTasks自增

这里if ((runTasks & 0x3F) == 0)代表是否执行了64个任务, 如果执行了64个任务, 则会通过lastExecutionTime = ScheduledFutureTask.nanoTime()记录定时任务初始化到现在的时间, 如果这个时间超过了截止时间, 则退出循环

如果没有超过截止时间, 则通过task = pollTask()继续弹出任务执行

这里执行64个任务统计一次时间, 而不是每次执行任务都统计, 主要原因是因为获取系统时间是个比较耗时的操作, 这里是netty的一种优化方式

如果没有task需要执行, 则通过afterRunningAllTasks()做收尾工作, 最后记录下最后的执行时间

以上就是有关执行任务队列的相关逻辑

本章总结

    本章学习了有关NioEventLoopGroup的创建, NioEventLoop的创建和启动, 以及多路复用器的轮询处理和task执行的相关逻辑, 通过本章学习, 我们应该掌握如下内容:

    1.NioEventLoopGroup如何选择分配NioEventLoop

    2.NioEventLoop如何开启

    3.NioEventLoop如何进行select操作

    4.NioEventLoop如何执行task

 

Netty源码分析第2章(NioEventLoop)---->第8节: 执行任务队列

原文地址:https://www.cnblogs.com/xiangnan6122/p/10203169.html

知识推荐

我的编程学习网——分享web前端后端开发技术知识。 垃圾信息处理邮箱 tousu563@163.com 网站地图
icp备案号 闽ICP备2023006418号-8 不良信息举报平台 互联网安全管理备案 Copyright 2023 www.wodecom.cn All Rights Reserved