分享web开发知识

注册/登录|最近发布|今日推荐

主页 IT知识网页技术软件开发前端开发代码编程运营维护技术分享教程案例
当前位置:首页 > 教程案例

封装php的RabbitMq

发布时间:2023-09-06 02:29责任编辑:傅花花关键词:暂无标签

这段时间一直业务比较忙,因公司用的 databases 队列,用起来 感觉不是很爽,故简单封装了一个rabbitmq类(业务代码随便写的)

首先是账号密码配置

config.php

<?php ???return $arr = [ ???????‘RabbitMq‘ => [ ???????????// Rabbitmq 服务地址 ???????????‘host‘ => ‘127.0.0.1‘, ???????????// Rabbitmq 服务端口 ???????????‘port‘ => ‘5672‘, ???????????// Rabbitmq 帐号 ???????????‘login‘ => ‘guest‘, ???????????// Rabbitmq 密码 ???????????‘password‘ => ‘guest‘, ???????????‘vhost‘=>‘/‘ ???????] ???];

基类 base.php 

<?phpinclude dirname(__FILE__).‘/object.php‘;include dirname(__FILE__).‘/config.php‘;class RabbitMq implements object{ ???//保存类实例的静态成员变量 ???static private $_instance; ???static private $_conn; ???static private $amp ; ???static private $route = ‘key_1‘; ???static private $q ; ???static private $ex ; ???static private $queue; ???public static function getInstance(){ ???????global $arr; ???????if (!(self::$_instance instanceof self)) { ???????????self::$_instance = new self($arr[‘RabbitMq‘]); ???????????return self::$_instance; ???????} ???????return self::$_instance; ???} ???private function __construct($conn) ???{ ???????//创建连接和channel ???????$conn = new AMQPConnection($conn); ???????if(!$conn->connect()) { ???????????die("Cannot connect to the broker!\n"); ???????} ???????self::$_conn = new AMQPChannel($conn); ???????self::$amp = $conn; ???} ???/* * ????* ????* parm 交换机名 ????* parm 队列名 ????* ????* */ ???public function listen($exchangeName,$queuename){ ???????self::$queue = $queuename; ???????return $this->setExchange($exchangeName,$queuename); ???} ???//连接交换机 ???public function setExchange($exchangeName,$queueName){ ???????//创建交换机 ???????$ex = new AMQPExchange(self::$_conn); ???????self::$ex = $ex; ???????$ex->setName($exchangeName); ???????$ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型 ???????$ex->setFlags(AMQP_DURABLE); //持久化 ???????$ex->declare(); ???????return self::setQueue($queueName,$exchangeName); ???} ???//创建队列 ???private static function setQueue($queueName,$exchangeName){ ???// ?创建队列 ???????$q = new AMQPQueue(self::$_conn); ???????$q->setName($queueName); ???????$q->setFlags(AMQP_DURABLE); ???????$q->declareQueue(); ???// 用于绑定队列和交换机 ???????$routingKey = self::$route; ???????$q->bind($exchangeName, ?$routingKey); ???????self::$q = $q; ???????return(self::$_instance); ???} ???/* * 消费者 * $fun_name = array($classobj,$function) or function name string * $autoack 是否自动应答 * * function processMessage($envelope, $queue) { ???????$msg = $envelope->getBody(); ???????echo $msg."\n"; //处理消息 ???????$queue->ack($envelope->getDeliveryTag());//手动应答 ???} */ ???public function run($func, $autoack = True){ ???????if (!$func || !self::$q) return False; ???????while(True){ ???????????if ($autoack) { ???????????????if(!self::$q->consume($func, AMQP_AUTOACK)){// ???????????????????self::$q->ack($envelope->getDeliveryTag()); ???????????????????echo 123; ???????????????} ???????????} ????????????self::$q->consume($func); ???????} ???} ???private static function closeConn(){ ???????self::$amp->disconnect(); ???} ???public function pushlish($msg){ ???????while (1) { ???????????sleep(1); ???????????if (self::$ex->publish(date(‘H:i:s‘) . "用户" . "注册", self::$route)) { ???????????????//写入文件等操作 ???????????????echo $msg; ???????????} ???????} ???} ???//__clone方法防止对象被复制克隆 ???public function __clone() ???{ ???????trigger_error(‘Clone is not allow!‘, E_USER_ERROR); ???}}
consume 监听类(一个操作对应一个class)
<?phpinclude dirname(__FILE__).‘/base.php‘;class Add{ ???public static function run(){ ???????$dbms=‘mysql‘; ????//数据库类型 ???????$host=‘127.0.01‘; //数据库主机名 ???????$dbName=‘test‘; ???//使用的数据库 ???????$user=‘root‘; ?????//数据库连接用户名 ???????$pass=‘admin‘; ?????????//对应的密码 ???????$dsn="$dbms:host=$host;dbname=$dbName"; ???????sleep(1); ???????try { ???????????$dbh = new PDO($dsn, $user, $pass); //初始化一个PDO对象 ???????????/*你还可以进行一次搜索操作 ???????????foreach ($dbh->query(‘SELECT * from FOO‘) as $row) { ???????????????print_r($row); //你可以用 echo($GLOBAL); 来看到这些值 ???????????} ???????????*/ ???????????$dbh = null; ???????} catch (PDOException $e) { ???????????die ("Error!: " . $e->getMessage() . "<br/>"); ???????}//默认这个不是长连接,如果需要数据库长连接,需要最后加一个参数:array(PDO::ATTR_PERSISTENT => true) 变成这样: ???????$db = new PDO($dsn, $user, $pass, array(PDO::ATTR_PERSISTENT => true)); ???????$sql = ‘INSERT INTO `test`.`t_reg`(`names`) VALUES (9)‘; ???????$row = $db->query($sql); ???????if(!$row){ ??????????return false; ???????} ???????????echo ‘OK‘; ???}}$consume = new Add();//tudo//$s = RabbitMq::getInstance()->listen(‘jiaohuanji‘,‘queue1‘)->run(array($consume,‘run‘)); 将run函数带入到consume里面作为回调 在consume里面增加$funname ,增加代码粘性$s = RabbitMq::getInstance()->listen(‘jiaohuanji‘,‘queue1‘)->run(array($consume,‘run‘));

push 类(发送者)

<?phpinclude "base.php";RabbitMq::getInstance()->listen(‘jiaohuanji‘,‘queue1‘)->pushlish(‘请求已发送‘);

接口interface 

<?php ???interface object ???{ ???????public static function getInstance(); ???}

监听 add.php

执行 send.php 即可完成简单的rabit操作




封装php的RabbitMq

原文地址:https://www.cnblogs.com/huangguowen/p/10254444.html

知识推荐

我的编程学习网——分享web前端后端开发技术知识。 垃圾信息处理邮箱 tousu563@163.com 网站地图
icp备案号 闽ICP备2023006418号-8 不良信息举报平台 互联网安全管理备案 Copyright 2023 www.wodecom.cn All Rights Reserved