分享web开发知识

注册/登录|最近发布|今日推荐

主页 IT知识网页技术软件开发前端开发代码编程运营维护技术分享教程案例
当前位置:首页 > 代码编程

PHP操作RabbitMQ的类 exchange、queue、route kye、bind

发布时间:2023-09-06 02:33责任编辑:胡小海关键词:PHP

RabbitMQ是常见的消息中间件。也许是还是不够了解的缘故,感觉功能还好吧。

讲到队列,大家脑子里第一印象是下边这样的。

P生产者推送消息-->队列-->C消费者取出消息

结构很简单,但是RabbitMQ应该是为了丰富的功能吧,把“队列”拆分了。

分成了:exchange(交换机)和queue(队列)两个部分

同时说明:

生产者推送消息只推到exchange,不知道会进入哪个queue。

exchange通过一个route key与queue绑定,这时才会知道消息具体落到了哪个queue里。

而消费则获取消息,是直接从队列里取的。

大概就是以上这个意思,问题是还有两个特殊的说明:

1.消费者是无法订阅或者获取不存在的MessageQueue中信息。

2.消息被Exchange接受以后,如果没有匹配的Queue,则会被丢弃。

简单的理解就是,如果exchange和queue不绑定,生产者推送的消息到exchange会直接丢弃(丢失),同时consume也无法完成订阅。

所以,这里就有一个问题,无论是推送消息进队列,还是订阅消息消费,必须先定义好exchange和queue并通过route key绑定一起。

那么到底是推送消息的时候定义并绑定呢?还是订阅的时候定义并绑定呢?

根据那两个特殊的说明理解,无论是谁定义绑定,都有可能会出现问题。

所以,最终就是推送和消费之前,都尝试定义exchange和queue,并完成绑定。

推送消息,相对简单,就一个publish指定exchange和route key就完成了。

消费消息,相对的复杂一点,有两种方式:

1、推送(push)订阅方式,使用consume方法订阅队列,只要队列有消息就消费

2、拉取(poll)主动拉取,使用get方法,主动去从队列一次拉取一条消息

这两种情况,都有各自的应用场景,可以根据需要自行选择。

额外提醒一点:尽量不要使用循环的方式调用get方法消费队列,尤其是处理的消息很多的情况。

如果大量的消费队列,建议直接使用consume方法。

还有一个情况,当消费者取出消息时,可以不对消息队列做任何操作,也可以将取出的消息删除。

毕竟,队列里的消息,消费后是需要删除的,取到消息,发给队列一个然亏,队列就删除该消息。

这个取出消息后的删除,也分两种情况:

1、一种是取出就删除,consume和get两个方法都有一个参数AMQP_AUTOACK自动反馈

2、另一种是取出后,并不会自动返回删除,而是将取出来的消息处理之后确认没有问题了,手动反馈给消息队列

至于选择哪种反馈方式,根据需求自行选择。

关于RabbitMQ还有很多要说的,比如一个exchange可以绑定多个queue,多个exchange可以绑定一个queue(多对多的关系)

还可以根据exchange不同的模式,搭配不同的route key做不同的匹配。各种组合吧。

灵活应用起来功能还是很强大的,只是具体使用时需要仔细,因为一个不小心,不是丢失消息就是多出很多消息。

所知有限,在此不做特复杂的说明,下边的例子也是一个简单的完成一个简单的队列的操作。演示学习用。

更多的情况,就自行研究扩展。

RabbitMQ操作类(rabbitmq.class.php)

 ?1 <?php ?2 // rabbitmq 操作类 ?3 class RabbitMQ ?4 { ?5 ????// 配置变量 ?6 ????public $configs = array( ?7 ????????‘host‘ => ‘localhost‘, ??8 ????????‘port‘ => ‘5672‘, ??9 ????????‘login‘ => ‘guest‘, ?10 ????????‘password‘ => ‘guest‘, 11 ????????‘vhost‘ => ‘/‘ 12 ????); 13 ????public $exchange_name = ‘ex_q_def‘;// 交换机名称 14 ????public $queue_name = ‘ex_q_def‘;// 队列名称 15 ????public $route_key = ‘‘;// 路由key的名称 16 ????public $durable = true;// 持久化,默认true 17 ????public $autodelete = false;// 自动删除 18 ?19 ????// 内部通用变量 20 ????private $_conn = null; 21 ????private $_exchange = null; 22 ????private $_channel = null; 23 ????private $_queue = null; 24 ?25 ????// 构造函数 26 ????public function __construct() 27 ????{ 28 ????????// 初始化队列 29 ????????$this->init(); 30 ????} 31 ?32 ????// 配置rabbitmq 33 ????public function set_configs($configs) 34 ????{ 35 ????????// 初始化配置 36 ????????if (!is_array($configs)) { 37 ????????????echo ‘configs is not array.‘; 38 ????????} 39 ????????if (!($configs[‘host‘] && $configs[‘port‘] && $configs[‘login‘] && $configs[‘password‘])) { 40 ????????????echo ‘configs is empty.‘; 41 ????????} 42 ????????if (!isset($configs[‘vhost‘])) {// 没有vhost元素,给出默认值 43 ????????????$configs[‘vhost‘] = ‘/‘; 44 ????????} else { 45 ????????????if (empty($configs[‘vhost‘])) {// 有vhost元素,但是值为空,给出默认值 46 ????????????????$configs[‘vhost‘] = ‘/‘; 47 ????????????} 48 ????????} 49 ????????$this->configs = $configs; 50 ????} 51 ?52 ????// 初始化rabbitmq 53 ????private function init() 54 ????{ 55 ????????if (!$this->_conn) { 56 ????????????$this->_conn = new AMQPConnection($this->configs);// 创建连接对象 57 ????????????if (!$this->_conn->connect()) { 58 ????????????????echo "Cannot connect to the broker \n "; 59 ????????????????exit(0); 60 ????????????} 61 ????????} 62 ?63 ????????// 创建channel 64 ????????$this->_channel = new AMQPChannel($this->_conn); 65 ????} 66 ?67 ????// 创建队列(为了保证正常订阅,避免消息丢失,生产者和消费则都要尝试创建队列:交换机和队列通过路由绑定一起) 68 ????public function create_queue($exchange_name=‘‘, $route_key=‘‘, $queue_name=‘‘) 69 ????{ 70 ????????if ($exchange_name != ‘‘) { 71 ????????????// 队列名参数可以省略,默认与交换机同名 72 ????????????$this->exchange_name = $exchange_name;// 更新交换机名称 73 ????????????$this->queue_name = $exchange_name;// 更新队列名称 74 ????????} 75 ????????if ($route_key != ‘‘) $this->route_key = $route_key;// 更新路由 76 ????????if ($queue_name != ‘‘) $this->queue_name = $queue_name;// 独立更新队列名称 77 ?78 ????????// 创建exchange交换机 79 ????????$this->_exchange = new AMQPExchange($this->_channel);// 创建交换机 80 ????????$this->_exchange->setType(AMQP_EX_TYPE_DIRECT);// 设置交换机模式为direct 81 ????????if ($this->durable) { 82 ????????????$this->_exchange->setFlags(AMQP_DURABLE);// 设置是否持久化 83 ????????} 84 ????????if ($this->autodelete) { 85 ????????????$this->_exchange->setFlags(AMQP_AUTODELETE);// 设置是否自动删除 86 ????????} 87 ????????$this->_exchange->setName($this->exchange_name);// 设置交换机名称 88 ????????$this->_exchange->declare(); 89 ?90 ????????// 创建queue队列 91 ????????$this->_queue = new AMQPQueue($this->_channel); 92 ????????if ($this->durable) { 93 ????????????$this->_queue->setFlags(AMQP_DURABLE);// 设置是否持久化 94 ????????} 95 ????????if ($this->autodelete) { 96 ????????????$this->_queue->setFlags(AMQP_AUTODELETE);// 设置是否自动删除 97 ????????} 98 ????????$this->_queue->setName($this->queue_name);// 设置队列名称 99 ????????$this->_queue->declare();// 完成队列的定义100 101 ????????// 将queue和exchange通过route_key绑定在一起102 ????????$this->_queue->bind($this->exchange_name, $this->route_key);103 ????}104 105 ????// 生产者,向队列交换机发送消息106 ????public function send($msg, $exchange_name=‘‘, $route_key=‘‘, $queue_name=‘‘)107 ????{108 ????????$this->create_queue($exchange_name, $route_key, $queue_name);109 ????????// 消息处理110 ????????if (is_array($msg)) {111 ????????????$msg = json_encode($msg);// 将数组类型转换成JSON格式112 ????????} else {113 ????????????$msg = trim(strval($msg));// 简单处理一下要发送的消息内容114 ????????}115 116 ????????// 生产者推送消息进队列时,只能将消息推送到交换机exchange中117 ????????$this->_exchange->publish($msg, $this->route_key);118 ????}119 120 ????// 消费者,从队列中获取数,消费队列(订阅)121 ????public function run($fun_name, $exchange_name=‘‘, $route_key=‘‘, $queue_name=‘‘, $autoack=false)122 ????{123 ????????if (!$fun_name) {// 没有返回函数,或者队列不存在124 ????????????return false;125 ????????}126 ????????$this->create_queue($exchange_name, $route_key, $queue_name);127 ????????// 订阅消息128 ????????while (true) {129 ????????????if ($autoack) {130 ????????????????$this->_queue->consume($fun_name, AMQP_AUTOACK);// 自动应答131 ????????????} else {132 ????????????????$this->_queue->consume($fun_name);// 需要手动应答133 ????????????}134 ????????}135 ????}136 137 ????// 消费者,从队列中获取数,消费队列(主动获取)138 ????public function get($exchange_name=‘‘, $route_key=‘‘, $queue_name=‘‘, $autoack=false)139 ????{140 ????????$this->create_queue($exchange_name, $route_key, $queue_name);141 ????????// 主动获取消息142 ????????if ($autoack) {143 ????????????$msg = $this->_queue->get(AMQP_AUTOACK);// 自动应答144 ????????} else {145 ????????????$msg = $this->_queue->get();// 需要手动应答146 ????????}147 ????????return [‘msg‘=>$msg, ‘queue‘=>$this->_queue];148 ????}149 }150 ?>

生产者推送(test_send.php)

 1 <?php 2 require_once(‘rabbitmq.class.php‘); 3 ?4 $rmq = new RabbitMQ; 5 for ($i = 0; $i < 10; $i++) { 6 ????echo ‘test_consume_‘ . $i .‘<br />‘; 7 ????$rmq->send(‘test_consume_‘ . $i, ‘test_consume‘); 8 } 9 10 for ($i = 0; $i < 10; $i++) {11 ????echo ‘get_msg_‘.$i.‘<br />‘;12 ????$rmq->send(‘get_msg_‘ . $i, ‘test_get‘);13 }14 15 echo ‘send ok ! ‘ . date(‘Y-m-d H:i:s‘);16 ?>

消费者consume(test_run.php)

 1 <?php 2 require_once(‘rabbitmq.class.php‘); 3 ?4 $rmq = new RabbitMQ; 5 ?6 $s = $rmq->run(‘processMessage‘, ‘test_consume‘); 7 ?8 function processMessage($envelope, $queue) { 9 ????$msg = $envelope->getBody();10 ????sleep(1); ?//sleep1秒模拟任务处理11 ????echo $msg."\n"; //处理消息12 ????$queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答13 }14 ?>

消费者get(test_get.php)

 1 <?php 2 require_once(‘rabbitmq.class.php‘); 3 ?4 $rmq = new RabbitMQ; 5 ?6 $r = $rmq->get(‘test_get‘); 7 ?8 echo $r[‘msg‘]->getBody();// 取到的消息 9 $r[‘queue‘]->ack($r[‘msg‘]->getDeliveryTag());// 手动反馈,删除消费的消息10 ?>

订阅consume可以起多个程序,队列会轮询平均的分到每一个订阅里。当然,前提是处理速度是一样,并且都有反馈。

如果处理速度不同,哪个快 ,哪就会分配更多的消息。如果没有反馈,默认只会推送3条消息,如果一直不给反馈,就不会再有推送了。

此时如果中断这个没有反馈的订阅,因为队列中没有删除,会再次分配到其他订阅者哪里继续推送消费。

同样的,如果队列中有消息,随时开启新的订阅,随时就会分配到消费的消息。

PHP操作RabbitMQ的类 exchange、queue、route kye、bind

原文地址:https://www.cnblogs.com/leafinwind/p/10424974.html

知识推荐

我的编程学习网——分享web前端后端开发技术知识。 垃圾信息处理邮箱 tousu563@163.com 网站地图
icp备案号 闽ICP备2023006418号-8 不良信息举报平台 互联网安全管理备案 Copyright 2023 www.wodecom.cn All Rights Reserved