设计思路
阅读resque源码,我们知道,resque把失败的队列的数据都放在了resque:failed列表,数据如下。
我的项目是在yii2下统一处理resque的,
我直接resque源码在新增了retry方法,在每一个failJob的payload增加了attempts尝试次数,
一开始创建的时候attempts为0,重试之后attempts每次加1然后进行retry,直到到达尝试数次才停止重试。
核心代码
1.调用重试的代码
???/** ????* Retry Job ????*/ ???public function actionRetry() ???{ ???????$redis = new Redis(‘redis‘); ???????while (true) { ???????????$json = $redis->rPop(‘resque:failed‘); ???????????$array = json_decode($json, true); ???????????if ($array) { ???????????????$jobArray = $array[‘payload‘]; ???????????????if ($jobArray[‘attempts‘] < $this->retryTimes) { ???????????????????//retry ???????????????????\Resque::retry($jobArray, $array[‘queue‘]); ???????????????????echo "Queued job " . $jobArray[‘id‘] . ‘ has retry!‘ . "\n"; ???????????????} else { ???????????????????//stop retry ???????????????????$redis->lPush(‘resque:failed‘, [$json]); ???????????????} ???????????} ???????????//take a sleep ???????????echo "*** Sleeping for ".$this->sleep. "\n"; ???????????sleep($this->sleep); ???????} ???????return true; ???}
这里可以弄成守护进程一直处理失败的队列任务。2./php-resque/lib/Resque.php
/** ????* retry job and save it to the specified queue. ????* ????* @param array $jobArray The attempts of the the job . ????* array( ????* ?‘id‘=> The id of the job ????* ?‘class‘ => The name of the class that contains the code to execute the job. ????* ?‘args‘ => Any optional arguments that should be passed when the job is executed.‘‘ ????* ?‘attempts‘=> The retry attempts of the job ????* ) ????* @param string $queue The name of the queue to place the job in. ????* ????* @return boolean ????*/ ???public static function retry($jobArray,$queue) ???{ ???????require_once dirname(__FILE__) . ‘/Resque/Job.php‘; ???????$result = Resque_Job::retry($jobArray,$queue); ???????if ($result) { ???????????Resque_Event::trigger(‘afterEnqueue‘, array( ???????????????‘class‘ => $jobArray[‘class‘], ???????????????‘args‘ ?=> $jobArray[‘args‘], ???????????????‘queue‘ => $queue, ???????????)); ???????} ???????return true; ???}
3./php-resque/lib/Resque/Job.php
/** ????* retry ?job and save it to the specified queue. ????* * ????* @param array $jobArray The data of the job. ????* array( ????* ?‘id‘=> The id of the job ????* ?‘class‘ => The name of the class that contains the code to execute the job. ????* ?‘args‘ => Any optional arguments that should be passed when the job is executed.‘‘ ????* ?‘attempts‘=> The retry attempts of the job ????* ) ????* @param string $queue The name of the queue to place the job in. ????* ????* @return string ????*/ ???public static function retry($jobArray,$queue) ???{ ???????$args = $jobArray[‘args‘]; ???????if($args !== null && !is_array($args)) { ???????????throw new InvalidArgumentException( ???????????????‘Supplied $args must be an array.‘ ???????????); ???????} ???????$jobArray[‘attempts‘]++; ???????Resque::push($queue, array( ???????????‘class‘ => $jobArray[‘class‘], ???????????‘args‘ ?=> array($args), ???????????‘id‘ ???=> $jobArray[‘id‘], ???????????‘attempts‘=>$jobArray[‘attempts‘] ???????)); ???????return true; ???}
结果:
我这里我设置了重试次数为3次,每隔5秒处理一个队列。
PHP-RESQUE重试机制
原文地址:http://blog.51cto.com/onebig/2162057