PHP+RabbitMQ实现消息队列的完整代码

  [复制链接]
查看2184 | 回复2 | 2019-8-2 10:15:22 | 显示全部楼层 |阅读模式
前言
为什么使用RabbitMq而不是ActiveMq或者RocketMq?
首先,从业务上来讲,我并不要求消息的100%接受率,并且,我需要结合php开发,RabbitMq相较RocketMq,延迟较低(微妙级)。至于ActiveMq,貌似问题较多。RabbitMq对各种语言的支持较好,所以选择RabbitMq。

先安装PHP对应的RabbitMQ,这里用的是 php_amqp 不同的扩展实现方式会有细微的差异.

php扩展地址: http://pecl.php.net/package/amqp

具体以官网为准  http://www.rabbitmq.com/getstarted.html

介绍
  • config.php 配置信息
  • BaseMQ.php MQ基类
  • ProductMQ.php 生产者类
  • ConsumerMQ.php 消费者类
  • Consumer2MQ.php 消费者2(可有多个)
config.php
  1. <?php
  2. return [
  3. //配置
  4. 'host' => [
  5.   'host' => '127.0.0.1',
  6.   'port' => '5672',
  7.   'login' => 'guest',
  8.   'password' => 'guest',
  9.   'vhost'=>'/',
  10. ],
  11. //交换机
  12. 'exchange'=>'word',
  13. //路由
  14. 'routes' => [],
  15. ];
复制代码
BaseMQ.php
  1. <?php
  2. /**
  3. * Created by PhpStorm.
  4. * User: pc
  5. * Date: 2018/12/13
  6. * Time: 14:11
  7. */

  8. namespace MyObjSummary\rabbitMQ;

  9. /** Member
  10. *  AMQPChannel
  11. *  AMQPConnection
  12. *  AMQPEnvelope
  13. *  AMQPExchange
  14. *  AMQPQueue
  15. * Class BaseMQ
  16. * @package MyObjSummary\rabbitMQ
  17. */
  18. class BaseMQ
  19. {
  20. /** MQ Channel
  21.   * @var \AMQPChannel
  22.   */
  23. public $AMQPChannel ;

  24. /** MQ Link
  25.   * @var \AMQPConnection
  26.   */
  27. public $AMQPConnection ;

  28. /** MQ Envelope
  29.   * @var \AMQPEnvelope
  30.   */
  31. public $AMQPEnvelope ;

  32. /** MQ Exchange
  33.   * @var \AMQPExchange
  34.   */
  35. public $AMQPExchange ;

  36. /** MQ Queue
  37.   * @var \AMQPQueue
  38.   */
  39. public $AMQPQueue ;

  40. /** conf
  41.   * @var
  42.   */
  43. public $conf ;

  44. /** exchange
  45.   * @var
  46.   */
  47. public $exchange ;

  48. /** link
  49.   * BaseMQ constructor.
  50.   * @throws \AMQPConnectionException
  51.   */
  52. public function __construct()
  53. {
  54.   $conf = require 'config.php' ;
  55.   if(!$conf)
  56.    throw new \AMQPConnectionException('config error!');
  57.   $this->conf  = $conf['host'] ;
  58.   $this->exchange = $conf['exchange'] ;
  59.   $this->AMQPConnection = new \AMQPConnection($this->conf);
  60.   if (!$this->AMQPConnection->connect())
  61.    throw new \AMQPConnectionException("Cannot connect to the broker!\n");
  62. }

  63. /**
  64.   * close link
  65.   */
  66. public function close()
  67. {
  68.   $this->AMQPConnection->disconnect();
  69. }

  70. /** Channel
  71.   * @return \AMQPChannel
  72.   * @throws \AMQPConnectionException
  73.   */
  74. public function channel()
  75. {
  76.   if(!$this->AMQPChannel) {
  77.    $this->AMQPChannel = new \AMQPChannel($this->AMQPConnection);
  78.   }
  79.   return $this->AMQPChannel;
  80. }

  81. /** Exchange
  82.   * @return \AMQPExchange
  83.   * @throws \AMQPConnectionException
  84.   * @throws \AMQPExchangeException
  85.   */
  86. public function exchange()
  87. {
  88.   if(!$this->AMQPExchange) {
  89.    $this->AMQPExchange = new \AMQPExchange($this->channel());
  90.    $this->AMQPExchange->setName($this->exchange);
  91.   }
  92.   return $this->AMQPExchange ;
  93. }

  94. /** queue
  95.   * @return \AMQPQueue
  96.   * @throws \AMQPConnectionException
  97.   * @throws \AMQPQueueException
  98.   */
  99. public function queue()
  100. {
  101.   if(!$this->AMQPQueue) {
  102.    $this->AMQPQueue = new \AMQPQueue($this->channel());
  103.   }
  104.   return $this->AMQPQueue ;
  105. }

  106. /** Envelope
  107.   * @return \AMQPEnvelope
  108.   */
  109. public function envelope()
  110. {
  111.   if(!$this->AMQPEnvelope) {
  112.    $this->AMQPEnvelope = new \AMQPEnvelope();
  113.   }
  114.   return $this->AMQPEnvelope;
  115. }
  116. }
复制代码
ProductMQ.php
  1. <?php
  2. //生产者 P
  3. namespace MyObjSummary\rabbitMQ;
  4. require 'BaseMQ.php';
  5. class ProductMQ extends BaseMQ
  6. {
  7. private $routes = ['hello','word']; //路由key

  8. /**
  9.   * ProductMQ constructor.
  10.   * @throws \AMQPConnectionException
  11.   */
  12. public function __construct()
  13. {
  14.   parent::__construct();
  15. }

  16. /** 只控制发送成功 不接受消费者是否收到
  17.   * @throws \AMQPChannelException
  18.   * @throws \AMQPConnectionException
  19.   * @throws \AMQPExchangeException
  20.   */
  21. public function run()
  22. {
  23.   //频道
  24.   $channel = $this->channel();
  25.   //创建交换机对象
  26.   $ex = $this->exchange();
  27.   //消息内容
  28.   $message = 'product message '.rand(1,99999);
  29.   //开始事务
  30.   $channel->startTransaction();
  31.   $sendEd = true ;
  32.   foreach ($this->routes as $route) {
  33.    $sendEd = $ex->publish($message, $route) ;
  34.    echo "Send Message:".$sendEd."\n";
  35.   }
  36.   if(!$sendEd) {
  37.    $channel->rollbackTransaction();
  38.   }
  39.   $channel->commitTransaction(); //提交事务
  40.   $this->close();
  41.   die ;
  42. }
  43. }
  44. try{
  45. (new ProductMQ())->run();
  46. }catch (\Exception $exception){
  47. var_dump($exception->getMessage()) ;
  48. }
复制代码
ConsumerMQ.php
  1. <?php
  2. //消费者 C
  3. namespace MyObjSummary\rabbitMQ;
  4. require 'BaseMQ.php';
  5. class ConsumerMQ extends BaseMQ
  6. {
  7. private $q_name = 'hello'; //队列名
  8. private $route = 'hello'; //路由key

  9. /**
  10.   * ConsumerMQ constructor.
  11.   * @throws \AMQPConnectionException
  12.   */
  13. public function __construct()
  14. {
  15.   parent::__construct();
  16. }

  17. /** 接受消息 如果终止 重连时会有消息
  18.   * @throws \AMQPChannelException
  19.   * @throws \AMQPConnectionException
  20.   * @throws \AMQPExchangeException
  21.   * @throws \AMQPQueueException
  22.   */
  23. public function run()
  24. {

  25.   //创建交换机
  26.   $ex = $this->exchange();
  27.   $ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型
  28.   $ex->setFlags(AMQP_DURABLE); //持久化
  29.   //echo "Exchange Status:".$ex->declare()."\n";

  30.   //创建队列
  31.   $q = $this->queue();
  32.   //var_dump($q->declare());exit();
  33.   $q->setName($this->q_name);
  34.   $q->setFlags(AMQP_DURABLE); //持久化
  35.   //echo "Message Total:".$q->declareQueue()."\n";

  36.   //绑定交换机与队列,并指定路由键
  37.   echo 'Queue Bind: '.$q->bind($this->exchange, $this->route)."\n";

  38.   //阻塞模式接收消息
  39.   echo "Message:\n";
  40.   while(True){
  41.    $q->consume(function ($envelope,$queue){
  42.     $msg = $envelope->getBody();
  43.     echo $msg."\n"; //处理消息
  44.     $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答
  45.    });
  46.    //$q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答
  47.   }
  48.   $this->close();
  49. }
  50. }
  51. try{
  52. (new ConsumerMQ)->run();
  53. }catch (\Exception $exception){
  54. var_dump($exception->getMessage()) ;
  55. }
复制代码


回复

使用道具 举报

kangly | 2019-10-23 09:22:57 | 显示全部楼层
强烈支持楼主ing……
回复

使用道具 举报

beauty | 2021-4-21 10:03:47 | 显示全部楼层

强烈支持楼主ing……
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则