PHP消息队列实现及应用

2021-05-07 23:27

阅读:691

标签:port   art   pop   man   key   更新   sid   elf   静态   

概要
  1. 消息队列的概念,原理和场景
  2. 解耦案例: 队列处理订单系统和配送系统
  3. 流量削峰案例: Redis的List类型实现秒杀
  4. RabbitMQ: 更专业的消息系统实现方案
 
一.消息队列
  1. 消息队列概念
    1. 队列结构中间件
    2. 消息放入后,不需要立即处理
    3. 由订阅者/消费者按顺序处理
  2. 核心结构
    1. 业务系统--入队-->消息队列--出队-->队列处理系统
  3. 应用场景
    1. 冗余: 数据需要冗余时,如订单系统后续需要严格的转换和记录,消息队列将这些数据持久化的存储在队列中,消息处理程序获取消息,并将该条记录删除
    2. 解耦: 解决了两套系统深度耦合问题,入队系统和出队系统无关
    3. 流量削峰: 秒杀和抢购,出现非常明显的流量剧增,大量的需求集中在几秒钟之内,对服务器的瞬间压力非常大
    4. 异步通信: 消息本身就可以使入队的请求直接返回,所以事先了程序的异步操作
    5. 扩展性: 如订单,订单入队后会有财务系统进行处理,后期如果添加配货系统,配货系统直接订阅该消息队列即可
    6. 排序保证: 有些场景下数据的处理顺序非常重要,队列可做成单线程单进单出系统,保证数据按照顺序处理
  4. 常见队列实现优缺点
    1. Mysql: 可靠性高,易实现,速度慢
    2. Redis: 速度快,单条大消息包时效率低
    3. 消息系统: 专业性强,可靠,学习成本高
  5. 消息处理的触发机制
    1. 死循环方式读取: 易实现,故障时无法及时恢复(时效性强,适合秒杀)
    2. 定时任务: 压力均分,有处理量上限(定时任务的间隔和数量需要把控,适合订单系统,物流配货系统)
    3. 守护进程: 类似于PHP-FPM和PHP-CG,需要shell基础(监听进程检测消息队列中是否有内容,有内容启用出队系统进行处理)
 
二.解耦案例:队列处理订单系统和配送系统
  1. 架构设计
    1. 订单系统(接收用户订单)--->
    2. 订单队列表--->
    3. 配送系统中定时执行的程序读取队列表,将已处理的记录进行标记
  2. 程序流程
    1. 接收用户订单(Order.php)--->
    2. 订单系统处理--->
    3. 队列表(包含字符:order_id,status,mobile,address,created_at,updated_at)--->
    4. (定时脚本goods.sh每分钟启动)配送处理系统(Goods.php)--->
    5. 配送系统处理
  3. 代码实现
 1 -- 表结构
 2 create table `order_queue`(
 3 `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT id号,
 4 `order_id` int(11) NOT NULL,
 5 `mobile` varchar(20) NOT NULL COMMENT 用户手机号,
 6 `address` varchar(100) NOT NULL COMMENT 用户地址,
 7 `created_at` datetime NOT NULL DEFAULT 0000-00-00 00:00:00 COMMENT 订单创建的时间,
 8 `updated_at` datetime NOT NULL DEFAULT 0000-00-00 00:00:00 COMMENT 处理完成的时间,
 9 `status` tinyint(2) NOT NULL COMMENT 当前状态,0未处理,1已处理,2处理中,
10 PRIMARY key (`id`)
11 )ENGINE=InnoDB DEFAULT CHARSET=utf8;
  1 //db.php
  2 php
  3 // 数据库连接类
  4 class DB{
  5   //私有的属性
  6   private static $dbcon=false;
  7   private $host;
  8   private $port;
  9   private $user;
 10   private $pass;
 11   private $db;
 12   private $charset;
 13   private $link;
 14   //私有的构造方法
 15   private function __construct(){
 16     $this->host =  ‘localhost‘;
 17     $this->port =  ‘3306‘;
 18     $this->user =  ‘root‘;
 19     $this->pass =  ‘root‘;
 20     $this->db =  ‘imooc‘;
 21     $this->charset= ‘utf8‘;
 22     //连接数据库
 23     $this->db_connect();
 24     //选择数据库
 25     $this->db_usedb();
 26     //设置字符集
 27     $this->db_charset();
 28    }
 29    //连接数据库
 30    private function db_connect(){
 31     $this->link=mysqli_connect($this->host.‘:‘.$this->port,$this->user,$this->pass);
 32     if(!$this->link){
 33       echo "数据库连接失败
"; 34 echo "错误编码".mysqli_errno($this->link)."
"; 35 echo "错误信息".mysqli_error($this->link)."
"; 36 exit; 37 } 38 } 39 //设置字符集 40 private function db_charset(){ 41 mysqli_query($this->link,"set names {$this->charset}"); 42 } 43 //选择数据库 44 private function db_usedb(){ 45 mysqli_query($this->link,"use {$this->db}"); 46 } 47 //私有的克隆(实现单例) 48 private function __clone(){ 49 die(‘clone is not allowed‘); 50 } 51 //公用的静态方法(将该方法作为入口) 52 public static function getIntance(){ 53 if(self::$dbcon==false){ 54 self::$dbcon=new self; 55 } 56 return self::$dbcon; 57 } 58 //执行sql语句的方法 59 public function query($sql){ 60 $res=mysqli_query($this->link,$sql); 61 if(!$res){ 62 echo "sql语句执行失败
"; 63 echo "错误编码是".mysqli_errno($this->link)."
"; 64 echo "错误信息是".mysqli_error($this->link)."
"; 65 } 66 return $res; 67 } 68 //获得最后一条记录id 69 public function getInsertid(){ 70 return mysqli_insert_id($this->link); 71 } 72 /** 73 * 查询某个字段 74 * @param 75 * @return string or int 76 */ 77 public function getOne($sql){ 78 $query=$this->query($sql); 79 return mysqli_free_result($query); 80 } 81 //获取一行记录,return array 一维数组 82 public function getRow($sql,$type="assoc"){ 83 $query=$this->query($sql); 84 if(!in_array($type,array("assoc",‘array‘,"row"))){ 85 die("mysqli_query error"); 86 } 87 $funcname="mysqli_fetch_".$type; 88 return $funcname($query); 89 } 90 //获取一条记录,前置条件通过资源获取一条记录 91 public function getFormSource($query,$type="assoc"){ 92 if(!in_array($type,array("assoc","array","row"))) 93 { 94 die("mysqli_query error"); 95 } 96 $funcname="mysqli_fetch_".$type; 97 return $funcname($query); 98 } 99 //获取多条数据,二维数组 100 public function getAll($sql){ 101 $query=$this->query($sql); 102 $list=array(); 103 while ($r=$this->getFormSource($query)) { 104 $list[]=$r; 105 } 106 return $list; 107 } 108 109 public function selectAll($table,$where,$fields=‘*‘,$order=‘‘,$skip=0,$limit=1000) 110 { 111 if(is_array($where)){ 112 foreach ($where as $key => $val) { 113 if (is_numeric($val)) { 114 $condition = $key.‘=‘.$val; 115 }else{ 116 $condition = $key.‘=\"‘.$val.‘\"‘; 117 } 118 } 119 } else { 120 $condition = $where; 121 } 122 if (!empty($order)) { 123 $order = " order by ".$order; 124 } 125 $sql = "select $fields from $table where $condition $order limit $skip,$limit"; 126 $query = $this->query($sql); 127 $list = array(); 128 while ($r= $this->getFormSource($query)) { 129 $list[] = $r; 130 } 131 return $list; 132 } 133 /** 134 * 定义添加数据的方法 135 * @param string $table 表名 136 * @param string orarray $data [数据] 137 * @return int 最新添加的id 138 */ 139 public function insert($table,$data){ 140 //遍历数组,得到每一个字段和字段的值 141 $key_str=‘‘; 142 $v_str=‘‘; 143 foreach($data as $key=>$v){ 144 // if(empty($v)){ 145 // die("error"); 146 // } 147 //$key的值是每一个字段s一个字段所对应的值 148 $key_str.=$key.‘,‘; 149 $v_str.="‘$v‘,"; 150 } 151 $key_str=trim($key_str,‘,‘); 152 $v_str=trim($v_str,‘,‘); 153 //判断数据是否为空 154 $sql="insert into $table ($key_str) values ($v_str)"; 155 $this->query($sql); 156 //返回上一次增加操做产生ID值 157 return $this->getInsertid(); 158 } 159 /* 160 * 删除一条数据方法 161 * @param1 $table, $where=array(‘id‘=>‘1‘) 表名 条件 162 * @return 受影响的行数 163 */ 164 public function deleteOne($table, $where){ 165 if(is_array($where)){ 166 foreach ($where as $key => $val) { 167 $condition = $key.‘=‘.$val; 168 } 169 } else { 170 $condition = $where; 171 } 172 $sql = "delete from $table where $condition"; 173 $this->query($sql); 174 //返回受影响的行数 175 return mysqli_affected_rows($this->link); 176 } 177 /* 178 * 删除多条数据方法 179 * @param1 $table, $where 表名 条件 180 * @return 受影响的行数 181 */ 182 public function deleteAll($table, $where){ 183 if(is_array($where)){ 184 foreach ($where as $key => $val) { 185 if(is_array($val)){ 186 $condition = $key.‘ in (‘.implode(‘,‘, $val) .‘)‘; 187 } else { 188 $condition = $key. ‘=‘ .$val; 189 } 190 } 191 } else { 192 $condition = $where; 193 } 194 $sql = "delete from $table where $condition"; 195 $this->query($sql); 196 //返回受影响的行数 197 return mysqli_affected_rows($this->link); 198 } 199 /** 200 * [修改操作description] 201 * @param [type] $table [表名] 202 * @param [type] $data [数据] 203 * @param [type] $where [条件] 204 * @return [type] 205 */ 206 public function update($table,$data,$where,$limit=0){ 207 //遍历数组,得到每一个字段和字段的值 208 $str=‘‘; 209 foreach($data as $key=>$v){ 210 $str.="$key=‘$v‘,"; 211 } 212 $str=rtrim($str,‘,‘); 213 if(is_array($where)){ 214 foreach ($where as $key => $val) { 215 if(is_array($val)){ 216 $condition = $key.‘ in (‘.implode(‘,‘, $val) .‘)‘; 217 } else { 218 $condition = $key. ‘=‘ .$val; 219 } 220 } 221 } else { 222 $condition = $where; 223 } 224 225 if (!empty($limit)) { 226 $limit = " limit ".$limit; 227 }else{ 228 $limit=‘‘; 229 } 230 //修改SQL语句 231 $sql="update $table set $str where $condition $limit"; 232 $this->query($sql); 233 //返回受影响的行数 234 return mysqli_affected_rows($this->link); 235 } 236 }
 1 //Order.php
 2 php
 3 //这个文件是用来接受用户的订单信息并写入队列的一个文件
 4 include ‘../db.php‘;
 5 
 6 if(!empty($_GET[‘mobile‘])){
 7     //首先是订单中心的处理流程
 8     //...且生成订单号
 9     $order_id = rand(10000,99999);
10     //把用户传过来的数据进行过滤(可以防止sql注入)
11     //订单信息
12     $insert_data = array(
13         ‘order_id‘=>$order_id,
14         ‘mobile‘=>$_GET[‘mobile‘],
15         ‘created_at‘=>date(‘Y-m-d H:i:s‘, time()),
16         ‘status‘=>0
17     )
18     //把数据存入队列表中
19     $db = DB::getIntance();
20     $res = $db->insert(‘order_queue‘,$insert_data);
21     if($res){
22         echo $insert_data[‘order_id‘].‘保存成功‘;
23     }else{
24         echo $insert_data[‘order_id‘].‘保存失败‘;
25     }
26 }
 1 //Goods.php
 2 php
 3 //配送系统处理队列中的订单并进行标记的文件
 4 include ‘../include/db.php‘;
 5 $db = DB::getIntance();
 6 //1.先把要处理的记录更新为等待处理
 7 //配送系统不是及时完成的,存在处理时间,若在处理中时其他程序操作该记录会存在冲突,所以需要先将订单进行锁定
 8 $waiting = array(‘status‘=>0);
 9 $lock = array(‘status‘=>2);
10 $res_lock = update(‘order_queue‘,$lock,$waiting,2);
11 //2.选择出刚刚更新的数据,然后进行配送系统的处理
12 if($res_lock){
13     //2.1选择出要处理的订单内容
14     $res = $db->selectAll(‘order_queue‘,$lock);
15     //2.2由配货系统进行退货处理
16     //...
17     
18     //3.将处理过的程序更新为已完成
19     $success = array(
20         ‘status‘=>1,
21         ‘updated_at‘=>date(‘Y-m-d H:i:s‘,time())
22     );
23     $res_last = $db->update(‘order_queue‘,$success,$lock);
24     if($res_last){
25         echo ‘Success‘.$res_last;
26     }else{
27         echo ‘Fail‘.$res_last;
28     }
29 }else{
30     echo ‘All Finished‘;
31 }
1 #good.sh
2 date "+%G-%m-%d %H:%M:S"
3 cd /home/项目位置
4 php goods.php
服务器部署定时任务使用 crontab -e
#m分 h时 dom日 mon月 dow周 command命令
*/1 * * * * shell脚本目录 >> 日志文件目录 2>&1(错误输出转化为标准输出)
tail -f log.log //监控日志文件
 
三.流量削峰案例:Redis的List类型实现秒杀
  1. 常用命令
    1. LPUSH/LPUSHX :将值插入到(/存在的)列表头部
    2. RPUSH/RPUSHX :将值插入到(/存在的)列表尾部
    3. LPOP:移出并获取列表的第一个元素
    4. RPOP :移出并获取列表的最后一个元素``
    5. LTRIM :保留指定区间内的元素
    6. LLEN :获取列表长度
    7. LSET:通过索引设置列表元素的值
    8. LINDEX :通过索引获取列表中的元素
    9. LRANGE:获取列表指定范围内的元素
  2. 架构设计: 秒杀业务程序--->redis--->通过入库程序将redis中数据入库永久保存
  3. 代码级设计
    1. 秒杀程序把请求写入Redis, (Uid, time_stamp微秒级时间戳)
    2. 检查Redis已存放数据的长度,超出上限直接丢弃。
    3. 死循环处理存入Redis的数据并入库
  4. 代码实现
1 -- redis_queue数据库表的SQL语句:
2 create table `redis_queue`(
3 `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
4 `uid` int(11) NOT NULL DEFAULT 0,
5 `time_stamp` varchar(24) NOT NULL,
6 PRIMARY KEY(`id`)
7 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
 1 //user.php
 2 php
 3 //接收用户请求存入redis中
 4 //首先加载redis组件
 5 $redis = new Redis();
 6 $redis->connect(‘127.0.0.1‘,6379);
 7 $redis_name = ‘miaosha‘;
 8 /////////////////////////////////////////////////////////
 9 //接收用户id
10 $uid = $_GET[‘uid‘];
11 //获取一下redis里已有的数量
12 $num = 10;
13 //如果当天人数少于十的时候,则加入这个队列
14 if ($redis->lLen($redis_name) ){
15     $redis->rPush($redis_name,$uid.‘%‘.microtime());
16     echo $uid.‘秒杀成功‘;
17 }else{
18     //如果当天人数已达到十人
19     echo ‘秒杀已结束‘;    
20 }
21 $redis->close();
22 ///仅模拟高压下///////////////////////////////////////////////
23 for($i=0;$i$i++){
24     $uid = rand(100000,999999);
25     //获取一下redis里已有的数量
26     $num = 10;
27     //如果当天人数少于十的时候,则加入这个队列
28     if ($redis->lLen($redis_name) ){
29         $redis->rPush($redis_name,$uid.‘%‘.microtime());
30         echo $uid.‘秒杀成功‘;
31     }else{
32         //如果当天人数已达到十人
33         echo ‘秒杀已结束‘;    
34     }
35 }
36 $redis->close();
37 /////////////////////////////////////////////////////////
 1 //savetodb.php
 2 php
 3 include ‘../include/db.php‘;
 4 //首先加载redis组件
 5 $redis = new Redis();
 6 $redis->connect(‘127.0.0.1‘,6379);
 7 $redis_name = ‘miaosha‘;
 8 
 9 $db = DB::getIntance();
10 //死循环
11 while(1){
12     //从队列最左侧取出一个值
13     $user = $redis->lPop($redis_name);
14     //然后判断这个值是否存在
15     if(!$user||$user == ‘nil‘){
16         sleep(2);
17         continue;
18     }
19     //切割出时间,uid
20     $user_arr = explode(‘%‘,$user);
21     $insert_data = array(
22         ‘uid‘=>$user_arr[0],
23         ‘time_stamp‘=>$user_arr[1]
24     )
25     //保存到数据库中
26     $res = $db->insert("redis_queue",$insert_data);
27     //数据库插入失败时回滚机制
28     if(!$res){
29         $redis->rPush($redis_name,$user);
30     }
31     sleep(2);
32 }
33 //释放redis
34 $redis->close();
 
第4章 RabbitMQ:更专业的消息系统实现方案
  1. RabbitMo的架构和原理
技术图片
    1. 特点:完整的实现了AMOP、集群简化、持久化、跨平台
  • RabbitMQ使用
    1. RabbitMQ安装(rabbitmq-server、php-amaplib)
    2. 生产者向消息通道发送消息
    3. 消费者处理消息
     
    第5章 总结
    1. 为什么使用队列
    2. 如何使用队列
    3. 队列实现机制
    4. 存取的使用情况
    5. 如何监控
     
     
     
     
     
     
     
     
     
     
     
     


    PHP消息队列实现及应用

    标签:port   art   pop   man   key   更新   sid   elf   静态   

    原文地址:https://www.cnblogs.com/qiusanqi/p/12083063.html

    上一篇:HTML - 属性

    下一篇:压力测试 Apache ab


    评论


    亲,登录后才可以留言!