分享web开发知识

注册/登录|最近发布|今日推荐

主页 IT知识网页技术软件开发前端开发代码编程运营维护技术分享教程案例
当前位置:首页 > 教程案例

thinkphp5 消息队列thinkphp-queue扩展

发布时间:2023-09-06 02:06责任编辑:熊小新关键词:thinkphp消息队列

1.简介

thinkphp-queue是thinkphp的一个第三方扩展, 内置了 Redis,Database,Topthink ,Sync这四种驱动,推荐使用redis

2. 下载 和安装

composer require topthink/think-queue

配置目录在: application/extra/queue.php

return [ ???‘connector‘ ?=> ‘Redis‘, ???????????// Redis 驱动 ???‘expire‘ ????=> 60, ???????????????// 任务的过期时间,默认为60秒; 若要禁用,则设置为 null ???‘default‘ ???=> ‘default‘, ???????// 默认的队列名称 ???‘host‘ ??????=> ‘127.0.0.1‘, ???????// redis 主机ip ???‘port‘ ??????=> 6379, ???????????// redis 端口 ???‘password‘ ??=> ‘‘, ???????????????// redis 密码 ???‘select‘ ????=> 0, ???????????????// 使用哪一个 db,默认为 db0 ???‘timeout‘ ???=> 0, ???????????????// redis连接的超时时间 ???‘persistent‘ => false, ???????????// 是否是长连接 ???// ???‘connector‘ => ‘Database‘, ??// 数据库驱动 ???// ???‘expire‘ ???=> 60, ??????????// 任务的过期时间,默认为60秒; 若要禁用,则设置为 null ???// ???‘default‘ ??=> ‘default‘, ???// 默认的队列名称 ???// ???‘table‘ ????=> ‘jobs‘, ??????// 存储消息的表名,不带前缀 ???// ???‘dsn‘ ??????=> [], ???// ???‘connector‘ ??=> ‘Topthink‘, ???// ThinkPHP内部的队列通知服务平台 ,本文不作介绍 ???// ???‘token‘ ??????=> ‘‘, ???// ???‘project_id‘ ?=> ‘‘, ???// ???‘protocol‘ ???=> ‘https‘, ???// ???‘host‘ ???????=> ‘qns.topthink.com‘, ???// ???‘port‘ ???????=> 443, ???// ???‘api_version‘ => 1, ???// ???‘max_retries‘ => 3, ???// ???‘default‘ ????=> ‘default‘,// ???????‘connector‘ ??=> ‘Sync‘, ???????// Sync 驱动,该驱动的实际作用是取消消息队列,还原为同步执行];

3.入队,创建队列的代码

/*** 文件路径: \application\index\controller\JobTest.php* 该控制器的业务代码中借助了thinkphp-queue 库,将一个消息推送到消息队列*/namespace application\index\controller; ?use think\Exception; ?use think\Queue; ?class JobTest { ?/** ??* 一个使用了队列的 action ??*/ ?public function actionWithHelloJob(){ ???????????// 1.当前任务将由哪个类来负责处理。 ??????// ??当轮到该任务时,系统将生成一个该类的实例,并调用其 fire 方法 ?????$jobHandlerClassName ?= ‘application\index\job\Hello‘; ??????// 2.当前任务归属的队列名称,如果为新队列,会自动创建 ?????$jobQueueName ???????= "helloJobQueue"; ??????// 3.当前任务所需的业务数据 . 不能为 resource 类型,其他类型最终将转化为json形式的字符串 ?????// ??( jobData 为对象时,需要在先在此处手动序列化,否则只存储其public属性的键值对) ?????$jobData ????????????= [ ‘ts‘ => time(), ‘bizId‘ => uniqid() , ‘a‘ => 1 ] ; ?????// 4.将该任务推送到消息队列,等待对应的消费者去执行 ?????$isPushed = Queue::push( $jobHandlerClassName , $jobData , $jobQueueName ); ?????????// database 驱动时,返回值为 1|false ?; ??redis 驱动时,返回值为 随机字符串|false ?????if( $isPushed !== false ){ ???????????echo date(‘Y-m-d H:i:s‘) . " a new Hello Job is Pushed to the MQ"."<br>"; ?????}else{ ?????????echo ‘Oops, something went wrong.‘; ?????} ?} }

如果是多个任务:如果一个任务类里有多个小任务的话,如上面的例子二,需要用@+方法名app\lib\job\Job2@task1app\lib\job\Job2@task2

4.消费队列的代码:

<?phpnamespace app\test\job;use think\queue\Job;class Hello { ???/** ????* fire方法是消息队列默认调用的方法 ????* @param Job ???????????$job ?????当前的任务对象 ????* @param array|mixed ???$data ????发布任务时自定义的数据 ????*/ ???public function fire(Job $job,$data){ ???????// 如有必要,可以根据业务需求和数据库中的最新数据,判断该任务是否仍有必要执行. ???????$isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data); ???????if(!isJobStillNeedToBeDone){ ???????????$job->delete(); ???????????return; ???????} ???????$isJobDone = $this->doHelloJob($data); ???????if ($isJobDone) { ???????????//如果任务执行成功, 记得删除任务 ???????????$job->delete(); ???????????print("<info>Hello Job has been done and deleted"."</info>\n"); ???????}else{ ???????????if ($job->attempts() > 3) { ???????????????//通过这个方法可以检查这个任务已经重试了几次了 ???????????????print("<warn>Hello Job has been retried more than 3 times!"."</warn>\n"); ???????????????$job->delete(); ???????????????// 也可以重新发布这个任务 ???????????????//print("<info>Hello Job will be availabe again after 2s."."</info>\n"); ???????????????//$job->release(2); //$delay为延迟时间,表示该任务延迟2秒后再执行 ???????????} ???????} ???} ???/** ????* 有些消息在到达消费者时,可能已经不再需要执行了 ????* @param array|mixed ???$data ????发布任务时自定义的数据 ????* @return boolean ????????????????任务执行的结果 ????*/ ???private function checkDatabaseToSeeIfJobNeedToBeDone($data){ ???????return true; ???} ???/** ????* 根据消息中的数据进行实际的业务处理 ????* @param array|mixed ???$data ????发布任务时自定义的数据 ????* @return boolean ????????????????任务执行的结果 ????*/ ???private function doHelloJob($data) { ???????// 根据消息中的数据进行实际的业务处理... ???????var_dump($data);// ???????print("<info>Hello Job Started. job Data is: ".var_export($data,true)."</info> \n");// ???????print("<info>Hello Job is Fired at " . date(‘Y-m-d H:i:s‘) ."</info> \n");// ???????print("<info>Hello Job is Done!"."</info> \n"); ???????return true; ???}}

执行代码:

php think queue:work --queue helloJobQueue

5.总结:

5.1 命名:

  • queue:subscribe 命令 [截至2017-02-15,作者暂未实现该模式,略过]

  • queue:work 命令

    work 命令: 该命令将启动一个 work 进程来处理消息队列。

    php think queue:work --queue helloJobQueue
  • queue:listen 命令

    listen 命令: 该命令将会创建一个 listen 父进程 ,然后由父进程通过 proc_open(‘php think queue:work’) 的方式来创建一个work 子 进程来处理消息队列,且限制该work进程的执行时间。

    php think queue:listen --queue helloJobQueue

    queue:restart 重新开启

    参考资料:https://blog.csdn.net/will5451/article/details/80434174

    https://www.kancloud.cn/yangweijie/learn_thinkphp5_with_yang/367645

    https://github.com/top-think/think-queue

thinkphp5 消息队列thinkphp-queue扩展

原文地址:https://www.cnblogs.com/myvic/p/9373710.html

知识推荐

我的编程学习网——分享web前端后端开发技术知识。 垃圾信息处理邮箱 tousu563@163.com 网站地图
icp备案号 闽ICP备2023006418号-8 不良信息举报平台 互联网安全管理备案 Copyright 2023 www.wodecom.cn All Rights Reserved