PHP 框架 Hyperf 实现处理超时未支付订单和延时队列

2021-03-06 15:28

阅读:443

标签:use   消息   持续更新   ESS   getconf   ase   result   ISE   publish   

延时队列

  • DelayProducer.php
  • AmqpBuilder.php

AmqpBuilder.php

arguments = array_merge($this->arguments, $arguments);

        return $this;

    }

    /**
     * Configure this builder as a delayed queue.
     *
     * Merges the message TTL, dead-letter exchange and dead-letter routing key
     * into the queue arguments via setArguments(), then sets the queue name
     * via setQueue().
     *
     * @param string $queueName             Queue name handed to setQueue().
     * @param int    $xMessageTtl           Message time-to-live in seconds; it is multiplied by 1000
     *                                      because x-message-ttl is expressed in milliseconds.
     * @param string $xDeadLetterExchange   Value stored under x-dead-letter-exchange.
     * @param string $xDeadLetterRoutingKey Value stored under x-dead-letter-routing-key.
     *
     * @return $this
     */
    public function setDelayedQueue(string $queueName, int $xMessageTtl, string $xDeadLetterExchange, string $xDeadLetterRoutingKey) : self
    {
        // NOTE(review): 'I' / 'S' look like php-amqplib AMQPTable field-type tags
        // (integer / string) — confirm against the AMQP client in use.
        $this->setArguments([
            'x-message-ttl'             => ['I', $xMessageTtl * 1000], // milliseconds
            'x-dead-letter-exchange'    => ['S', $xDeadLetterExchange],
            'x-dead-letter-routing-key' => ['S', $xDeadLetterRoutingKey],
        ]);
        $this->setQueue($queueName);

        return $this;
    }

}

  

DelayProducer.php

produceMessage($producerMessage, $queueBuilder, $confirm, $timeout);
 });
 }
 /**
  * Declare the queue and exchange, bind them, and publish a single message.
  *
  * @param ProducerMessageInterface $producerMessage Supplies the payload, properties, pool name,
  *                                                  exchange builder, exchange and routing key.
  * @param AmqpBuilder              $queueBuilder    Queue definition passed to queue_declare().
  * @param bool                     $confirm         When true a confirm channel is used and the
  *                                                  return value reflects whether the ack handler ran.
  * @param int                      $timeout         Passed to wait_for_pending_acks_returns().
  *
  * @return bool Always true when $confirm is false; otherwise true only if the ack handler fired.
  * @throws \Throwable Any failure is rethrown after the connection has been reconnected.
  */
 private function produceMessage(ProducerMessageInterface $producerMessage, AmqpBuilder $queueBuilder, bool $confirm = false, int $timeout = 5) : bool
 {
     $result = false;
     $this->injectMessageProperty($producerMessage);
     $message = new AMQPMessage($producerMessage->payload(), $producerMessage->getProperties());
     $pool = $this->getConnectionPool($producerMessage->getPoolName());
     /** @var \Hyperf\Amqp\Connection $connection */
     $connection = $pool->get();
     try {
         // Acquire the channel inside the try block: if it throws, the
         // connection must still be reconnected and released back to the pool.
         $channel = $confirm ? $connection->getConfirmChannel() : $connection->getChannel();
         $channel->set_ack_handler(static function () use (&$result) : void {
             $result = true;
         });
         // Delay-queue handling.
         $exchangeBuilder = $producerMessage->getExchangeBuilder();
         // Declare the queue.
         $channel->queue_declare(
             $queueBuilder->getQueue(),
             $queueBuilder->isPassive(),
             $queueBuilder->isDurable(),
             $queueBuilder->isExclusive(),
             $queueBuilder->isAutoDelete(),
             $queueBuilder->isNowait(),
             $queueBuilder->getArguments(),
             $queueBuilder->getTicket()
         );
         // Declare the exchange.
         $channel->exchange_declare(
             $exchangeBuilder->getExchange(),
             $exchangeBuilder->getType(),
             $exchangeBuilder->isPassive(),
             $exchangeBuilder->isDurable(),
             $exchangeBuilder->isAutoDelete(),
             $exchangeBuilder->isInternal(),
             $exchangeBuilder->isNowait(),
             $exchangeBuilder->getArguments(),
             $exchangeBuilder->getTicket()
         );
         // Bind the queue to the message's exchange and routing key.
         $channel->queue_bind($queueBuilder->getQueue(), $producerMessage->getExchange(), $producerMessage->getRoutingKey());
         // Publish the message.
         $channel->basic_publish($message, $producerMessage->getExchange(), $producerMessage->getRoutingKey());
         $channel->wait_for_pending_acks_returns($timeout);
     } catch (Throwable $exception) {
         // Reconnect the connection before release.
         $connection->reconnect();
         throw $exception;
     } finally {
         $connection->release();
     }

     return $confirm ? $result : true;
 }
 /**
  * Copy routing key and exchange from the message class's Producer annotation, if any.
  *
  * Does nothing when AnnotationCollector is not available or the message
  * class carries no Producer annotation. Only truthy annotation values are applied.
  *
  * @param ProducerMessageInterface $producerMessage Message whose routing key / exchange may be set.
  */
 private function injectMessageProperty(ProducerMessageInterface $producerMessage) : void
 {
     if (! class_exists(AnnotationCollector::class)) {
         return;
     }

     /** @var \Hyperf\Amqp\Annotation\Producer $annotation */
     $annotation = AnnotationCollector::getClassAnnotation(get_class($producerMessage), Producer::class);
     if (! $annotation) {
         return;
     }

     if ($annotation->routingKey) {
         $producerMessage->setRoutingKey($annotation->routingKey);
     }
     if ($annotation->exchange) {
         $producerMessage->setExchange($annotation->exchange);
     }
 }
}

  

处理超时订单

  • OrderQueueConsumer.php
  • OrderQueueProducer.php

OrderQueueProducer.php

payload = $data;
 }
 /**
  * Return the exchange builder for this message.
  *
  * Currently hands back the parent's builder unchanged.
  *
  * @return ExchangeBuilder
  */
 public function getExchangeBuilder() : ExchangeBuilder
 {
     // TODO: replace this generated pass-through if the exchange needs customising.
     $exchangeBuilder = parent::getExchangeBuilder();

     return $exchangeBuilder;
 }
}

  

OrderQueueConsumer.php

  

Demo

// Build a delay queue definition: queue name, TTL (setDelayedQueue multiplies
// it by 1000 for x-message-ttl), dead-letter exchange, dead-letter routing key.
$builder = new AmqpBuilder();
$builder->setDelayedQueue('order_exchange', 1, 'delay_exchange', 'delay_route');

// Resolve the delay producer from the container and publish one order message
// carrying a random order number; dump the result of produce().
$que = ApplicationContext::getContainer()->get(DelayProducer::class);
var_dump($que->produce(new OrderQueueProducer(['order_sn' => (string)mt_rand(10000, 90000)]), $builder));

  

更多学习内容请访问:

腾讯T3-T4标准精品PHP架构师教程目录大全,只要你看完保证薪资上升一个台阶(持续更新)

 

PHP 框架 Hyperf 实现处理超时未支付订单和延时队列

标签:use   消息   持续更新   ESS   getconf   ase   result   ISE   publish   

原文地址:https://www.cnblogs.com/a609251438/p/12894970.html


评论


亲,登录后才可以留言!