PHP基于rabbitmq操作类的生产者和消费者功能示例

2018-09-07 11:52

阅读:552

  本文实例讲述了PHP基于rabbitmq操作类的生产者和消费者功能。分享给大家供大家参考,具体如下:

  注意事项:

  1、accept.php消费者代码需要在命令行执行

  2、username=>asdf,password=>123456 改成自己的帐号和密码

  RabbitMQCommand.php操作类代码

   <?php /* * amqp协议操作类,可以访问rabbitMQ * 需先安装php_amqp扩展 */ class RabbitMQCommand{ public $configs = array(); //交换机名称 public $exchange_name = ; //队列名称 public $queue_name = ; //路由名称 public $route_key = ; /* * 持久化,默认True */ public $durable = True; /* * 自动删除 * exchange is deleted when all queues have finished using it * queue is deleted when last consumer unsubscribes * */ public $autodelete = False; /* * 镜像 * 镜像队列,打开后消息会在节点之间复制,有master和slave的概念 */ public $mirror = False; private $_conn = Null; private $_exchange = Null; private $_channel = Null; private $_queue = Null; /* * @configs array(host=>$host,port=>5672,username=>$username,password=>$password,vhost=>/) */ public function __construct($configs = array(), $exchange_name = , $queue_name = , $route_key = ) { $this->setConfigs($configs); $this->exchange_name = $exchange_name; $this->queue_name = $queue_name; $this->route_key = $route_key; } private function setConfigs($configs) { if (!is_array($configs)) { throw new Exception(configs is not array); } if (!($configs[host] && $configs[port] && $configs[username] && $configs[password])) { throw new Exception(configs is empty); } if (empty($configs[vhost])) { $configs[vhost] = /; } $configs[login] = $configs[username]; unset($configs[username]); $this->configs = $configs; } /* * 设置是否持久化,默认为True */ public function setDurable($durable) { $this->durable = $durable; } /* * 设置是否自动删除 */ public function setAutoDelete($autodelete) { $this->autodelete = $autodelete; } /* * 设置是否镜像 */ public function setMirror($mirror) { $this->mirror = $mirror; } /* * 打开amqp连接 */ private function open() { if (!$this->_conn) { try { $this->_conn = new AMQPConnection($this->configs); $this->_conn->connect(); $this->initConnection(); } catch (AMQPConnectionException $ex) { throw new Exception(cannot connection rabbitmq,500); } } } /* * rabbitmq连接不变 * 重置交换机,队列,路由等配置 */ public function reset($exchange_name, $queue_name, $route_key) { $this->exchange_name = $exchange_name; $this->queue_name = $queue_name; $this->route_key = $route_key; $this->initConnection(); } /* * 初始化rabbit连接的相关配置 */ private function initConnection() { if (empty($this->exchange_name) empty($this->queue_name) empty($this->route_key)) { throw new Exception(rabbitmq exchange_name or queue_name or route_key is empty,500); } $this->_channel = new AMQPChannel($this->_conn); $this->_exchange = new AMQPExchange($this->_channel); $this->_exchange->setName($this->exchange_name); $this->_exchange->setType(AMQP_EX_TYPE_DIRECT); if ($this->durable) $this->_exchange->setFlags(AMQP_DURABLE); if ($this->autodelete) $this->_exchange->setFlags(AMQP_AUTODELETE); $this->_exchange->declare(); $this->_queue = new AMQPQueue($this->_channel); $this->_queue->setName($this->queue_name); if ($this->durable) $this->_queue->setFlags(AMQP_DURABLE); if ($this->autodelete) $this->_queue->setFlags(AMQP_AUTODELETE); if ($this->mirror) $this->_queue->setArgument(x-ha-policy, all); $this->_queue->declare(); $this->_queue->bind($this->exchange_name, $this->route_key); } public function close() { if ($this->_conn) { $this->_conn->disconnect(); } } public function __sleep() { $this->close(); return array_keys(get_object_vars($this)); } public function __destruct() { $this->close(); } /* * 生产者发送消息 */ public function send($msg) { $this->open(); if(is_array($msg)){ $msg = json_encode($msg); }else{ $msg = trim(strval($msg)); } return $this->_exchange->publish($msg, $this->route_key); } /* * 消费者 * $fun_name = array($classobj,$function) or function name string * $autoack 是否自动应答 * * function processMessage($envelope, $queue) { $msg = $envelope->getBody(); echo $msg.\n; //处理消息 $queue->ack($envelope->getDeliveryTag());//手动应答 } */ public function run($fun_name, $autoack = True){ $this->open(); if (!$fun_name !$this->_queue) return False; while(True){ if ($autoack) $this->_queue->consume($fun_name, AMQP_AUTOACK); else $this->_queue->consume($fun_name); } } }

  send.php生产者代码

   <?php set_time_limit(0); include_once(RabbitMQCommand.php); $configs = array(host=>127.0.0.1,port=>5672,username=>asdf,password=>123456,vhost=>/); $exchange_name = class-e-1; $queue_name = class-q-1; $route_key = class-r-1; $ra = new RabbitMQCommand($configs,$exchange_name,$queue_name,$route_key); for($i=0;$i<=100;$i++){ $ra->send(date(Y-m-d H:i:s,time())); } exit();

  accept.php消费者代码

   <?php error_reporting(0); include_once(RabbitMQCommand.php); $configs = array(host=>127.0.0.1,port=>5672,username=>asdf,password=>123456,vhost=>/); $exchange_name = class-e-1; $queue_name = class-q-1; $route_key = class-r-1; $ra = new RabbitMQCommand($configs,$exchange_name,$queue_name,$route_key); class A{ function processMessage($envelope, $queue) { $msg = $envelope->getBody(); $envelopeID = $envelope->getDeliveryTag(); $pid = posix_getpid(); file_put_contents(log{$pid}.log, $msg..$envelopeID..\r\n,FILE_APPEND); $queue->ack($envelopeID); } } $a = new A(); $s = $ra->run(array($a,processMessage),false);

  更多关于PHP相关内容感兴趣的读者可查看本站专题:《PHP数据结构与算法教程》、《php程序设计算法总结》、《php字符串(string)用法总结》、《PHP数组(Array)操作技巧大全》、《PHP常用遍历算法与技巧总结》及《PHP数学运算技巧总结》

  希望本文所述对大家PHP程序设计有所帮助。


评论


亲,登录后才可以留言!