thinkphp使用AMQP库(支持RabbitMq)

2021-02-16 13:17

阅读:656

标签:send   cep   composer   catch   change   tag   消费者   consumer   try   

1,安装依赖库 composer require php-amqplib/php-amqplib 地址:https://github.com/php-amqplib/php-amqplib

2,mq生产者.php

include(__DIR__ . ‘../../public/config.php‘);
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
/**
* Created by PhpStorm.
* User: pandeng
* Date: 2017-07-26
* Time: 21:51
*/
class MessageQueue
{
  const exchange = ‘router‘;
  const queue = ‘msgs‘;
  public static  function pushMessage($data)
  {
    $connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST);
    $channel = $connection->channel();

    $channel->queue_declare(self::queue, false, true, false, false);
    $channel->exchange_declare(self::exchange, ‘direct‘, false, true, false);
    $channel->queue_bind(self::queue, self::exchange);
    $messageBody = $data;
    $message = new AMQPMessage($messageBody, array(‘content_type‘ => ‘text/plain‘, ‘delivery_mode‘ => AMQPMessage::DELIVERY_MODE_PERSISTENT));
    $channel->basic_publish($message, self::exchange);
    $channel->close();
    $connection->close();
    return "ok";
}
}

3.消费者.php

namespace app\index\controller;
include(__DIR__ . ‘../../../../public/config.php‘);
use PhpAmqpLib\Connection\AMQPStreamConnection;

use think\Controller;
use think\Log;
use think\Request;
use think\Db;

class MessageConsume extends Controller
{
const exchange = ‘router‘;
const queue = ‘msgs‘;
const consumerTag = ‘consumer‘;

function shutdown($channel, $connection)
{
    $channel->close();
    $connection->close();
    write_log("closed",3);
}

function process_message($message)
{
    if ($message->body !== ‘quit‘) {
        $obj = json_decode($message->body);
        if (!isset($obj->id)) {
            echo ‘error data\n‘;
            write_log("error data:" . $message->body, 2);
        } else {
            try {
                write_log("data:" . json_encode($message));
            } catch (\Think\Exception  $e) {
                write_log($e->getMessage(), 2);
                write_log(json_encode($message), 2);
            } catch (\PDOException $pe) {
                write_log($pe->getMessage(), 2);
                write_log(json_encode($message), 2);
            }
        }
    }
    $message->delivery_info[‘channel‘]->basic_ack($message->delivery_info[‘delivery_tag‘]);
    // Send a message with the string "quit" to cancel the consumer.
    if ($message->body === ‘quit‘) {
        $message->delivery_info[‘channel‘]->basic_cancel($message->delivery_info[‘consumer_tag‘]);
    }
 }

/**
 * 启动
 *
 * @return \think\Response
 */
public function start()
{
    $connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST);
    $channel = $connection->channel();
    $channel->queue_declare(self::queue, false, true, false, false);
    $channel->exchange_declare(self::exchange, ‘direct‘, false, true, false);
    $channel->queue_bind(self::queue, self::exchange);
    $channel->basic_consume(self::queue, self::consumerTag, false, false, false, false, array($this, ‘process_message‘));

    register_shutdown_function(array($this, ‘shutdown‘), $channel, $connection);
    while (count($channel->callbacks)) {
        $channel->wait();
    }
    write_log("starting",3);
}

}
  1. 启动消费者(守护进程) nohup php index.php index/Message_Consume/start &

原文链接:https://www.jianshu.com/p/89dc541c6362

thinkphp使用AMQP库(支持RabbitMq)

标签:send   cep   composer   catch   change   tag   消费者   consumer   try   

原文地址:https://www.cnblogs.com/stronger-xsw/p/12972950.html


评论


亲,登录后才可以留言!