Windows 安装 amqp 扩展
RabbitMQ 是基于 amqp(高级消息队列协议) 协议的。使用 RabbitMQ 前必须为 PHP 安装相应的 amqp 扩展。
- 下载相应版本的 amqp 扩展:http://pecl.php.net/package/amqp,解压缩文件。
- 将 php_amqp.dll 复制到 php 的扩展目录 ext 下,修改配置文件 php.ini:
[amqp]extension=php_amqp.dll
- 将 rabbitmq.*.dll 文件复制到 php 的安装目录下,然后修改 Apache 配置文件 httpd.conf:
#[rabbitmq]LoadFile "F:\wamp64\bin\php\php7.0.10\rabbitmq.*.dll"
- 重启服务器,查看 phpinfo,确认扩展信息。
Direct Exchange 模式
Direct Exchange 模式的交换机适合用于消息的单播发送. 交换机根据推送消息时的 routing key 和 队列的 routing key 判断消息应该推送到哪个队列. 可以实现同一交换机上的消息, 根据 routing key 推送到不同的队列中.
默认 Direct Exchange
此种模式下,使用 RabbitMQ 的默认 Exchange 即可,默认的 Exchange 是 Direct 模式。使用默认 Exchange 时,不需要对 Exchange 进行属性设置和声明,也不需要对 Queue 进行显示绑定和设置 routing key。Queue 默认会绑定到默认 Exchange,以及默认 routing key 与 Queue 的名称相同。
producer.php:
- 创建连接并发起连接
- 在连接上创建通道
- 在通道上获取默认交换机
- 向交换机发送消息
1 header(‘Content-Type: text/html; charset=utf-8‘); 2 // 连接设置 3 $conConfig = [ 4 ‘host‘ => ‘127.0.0.1‘, 5 ‘port‘ => 5672, 6 ‘login‘ => ‘root‘, 7 ‘password‘ => ‘root‘, 8 ‘vhost‘ => ‘/‘ 9 ];10 try11 {12 // RabbitMQ 连接实例13 $con = new AMQPConnection($conConfig);14 // 发起连接15 $con->connect();16 // 判断连接是否仍然有效17 if(!$con->isConnected())18 {19 echo ‘连接失败‘;die;20 }21 // 新建通道22 $channel = new AMQPChannel($con);23 // 使用RabbitMQ的默认Exchange24 $exchange = new AMQPExchange($channel);25 for($i = 1; $i < 6; $i++)26 {27 $message = [28 ‘name‘ => ‘默认交换机,消息-‘ . $i,29 ‘info‘ => ‘Hello World!‘30 ];31 // 发送消息,为消息指定routing key,成功返回true,失败false32 $state = $exchange->publish(json_encode($message, JSON_UNESCAPED_UNICODE), ‘test.queue1‘);33 if($state)34 {35 echo ‘Success‘ . PHP_EOL;36 }else37 {38 echo ‘Fail‘ . PHP_EOL;39 }40 }41 // 关闭连接42 $con->disconnect();43 }catch(Exception $e)44 {45 echo $e->getMessage();46 }
consumer.php:
- 创建连接并发起连接
- 在连接上创建通道
- 在通道上创建队列并声明队列
- 从队列获取消息
1 header(‘Content-Type: text/html; charset=utf-8‘); 2 $conConfig = [ 3 ‘host‘ => ‘127.0.0.1‘, 4 ‘port‘ => 5672, 5 ‘login‘ => ‘root‘, 6 ‘password‘ => ‘root‘, 7 ‘vhost‘ => ‘/‘ 8 ]; 9 ?10 11 try12 {13 $con = new AMQPConnection($conConfig);14 $con->connect();15 if(!$con->isConnected())16 {17 echo ‘连接失败‘;die;18 }19 ?20 21 $channel = new AMQPChannel($con);22 ?23 24 $queue = new AMQPQueue($channel);25 $queue->setName(‘test.queue1‘);26 // 声明队列,不需要对Queue进行显示绑定到交换机和指定Queue的routing key27 $queue->declareQueue();28 $queue->consume(function($envelope, $queue)29 {30 echo $envelope->getBody() . PHP_EOL;31 }, AMQP_AUTOACK);32 ?33 34 $con->disconnect();35 }catch(Exception $e)36 {37 echo $e->getMessage();38 }
自定义 Direct Exchange
producer:
header(‘Content-Type: text/html; charset=utf-8‘);// 连接设置$conConfig = [ ‘host‘ => ‘127.0.0.1‘, ‘port‘ => 5672, ‘login‘ => ‘root‘, ‘password‘ => ‘root‘, ‘vhost‘ => ‘/‘];try{ // RabbitMQ 连接实例 $con = new AMQPConnection($conConfig); // 发起连接 $con->connect(); // 判断连接结果,true成功,false失败 if(!$con->isConnected()) { echo ‘连接失败‘;die; } // 新建通道 $channel = new AMQPChannel($con); // 新建交换机 $exchange = new AMQPExchange($channel); // 交换机名称 $exchange->setName(‘test.direct‘); // 交换机类型 $exchange->setType(‘direct‘); // 声明交换机 $exchange->declareExchange(); for($i = 1; $i < 6; $i++) { $message = [ ‘name‘ => ‘direct交换机,消息-‘ . $i, ‘info‘ => ‘Hello World!‘ ]; // 发送消息,同时为消息指定routing key,成功返回true,失败false $state = $exchange->publish(json_encode($message, JSON_UNESCAPED_UNICODE), ‘test.queue1‘); if($state) { echo ‘Success‘ . PHP_EOL; }else { echo ‘Fail‘ . PHP_EOL; } } // 关闭连接 $con->disconnect();}catch(Exception $e){ echo $e->getMessage();}
consumer:
1 header(‘Content-Type: text/html; charset=utf-8‘); 2 $conConfig = [ 3 ‘host‘ => ‘127.0.0.1‘, 4 ‘port‘ => 5672, 5 ‘login‘ => ‘root‘, 6 ‘password‘ => ‘root‘, 7 ‘vhost‘ => ‘/‘ 8 ]; 9 ?10 11 try12 {13 $con = new AMQPConnection($conConfig);14 $con->connect();15 if(!$con->isConnected())16 {17 echo ‘连接失败‘;die;18 }19 ?20 $channel = new AMQPChannel($con);21 ?22 $exchange =new AMQPExchange($channel);23 $exchange->setName("test.direct");24 $exchange->setType(‘direct‘);25 $exchange->setFlags(AMQP_DURABLE);26 $exchange->declareExchange();27 ?28 29 $queue = new AMQPQueue($channel);30 $queue->setName(‘test.queue1‘);31 // 声明队列,不需要对Queue进行显示绑定到交换机和指定Queue的routing key32 $queue->declareQueue();33 // 绑定队列到指定交换机,并指定routing key,即分发规则,消息的routing key与队列的绑定routing key匹配时才。routing key可以使用正则表达式34 $queue->bind(‘test.direct‘, ‘/^q.*/‘);35 $queue->consume(function($envelope, $queue)36 {37 echo $envelope->getBody() . PHP_EOL;38 }, AMQP_AUTOACK);39 ?40 41 $con->disconnect();42 }catch(Exception $e)43 {44 echo $e->getMessage();45 }
RabbitMQ 在 PHP 下的简单使用 (一) -- 安装 AMQP 扩展和 Direct Exchange 模式
原文地址:https://www.cnblogs.com/fxyy/p/10540414.html