PHP消息队列实现及应用
2021-05-07 23:27
标签:port art pop man key 更新 sid elf 静态 PHP消息队列实现及应用 标签:port art pop man key 更新 sid elf 静态 原文地址:https://www.cnblogs.com/qiusanqi/p/12083063.html
1 -- 表结构
2 create table `order_queue`(
3 `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT ‘id号‘,
4 `order_id` int(11) NOT NULL,
5 `mobile` varchar(20) NOT NULL COMMENT ‘用户手机号‘,
6 `address` varchar(100) NOT NULL COMMENT ‘用户地址‘,
7 `created_at` datetime NOT NULL DEFAULT ‘0000-00-00 00:00:00‘ COMMENT ‘订单创建的时间‘,
8 `updated_at` datetime NOT NULL DEFAULT ‘0000-00-00 00:00:00‘ COMMENT ‘处理完成的时间‘,
9 `status` tinyint(2) NOT NULL COMMENT ‘当前状态,0未处理,1已处理,2处理中‘,
10 PRIMARY key (`id`)
11 )ENGINE=InnoDB DEFAULT CHARSET=utf8;
1 //db.php
2 php
3 // 数据库连接类
4 class DB{
5 //私有的属性
6 private static $dbcon=false;
7 private $host;
8 private $port;
9 private $user;
10 private $pass;
11 private $db;
12 private $charset;
13 private $link;
14 //私有的构造方法
15 private function __construct(){
16 $this->host = ‘localhost‘;
17 $this->port = ‘3306‘;
18 $this->user = ‘root‘;
19 $this->pass = ‘root‘;
20 $this->db = ‘imooc‘;
21 $this->charset= ‘utf8‘;
22 //连接数据库
23 $this->db_connect();
24 //选择数据库
25 $this->db_usedb();
26 //设置字符集
27 $this->db_charset();
28 }
29 //连接数据库
30 private function db_connect(){
31 $this->link=mysqli_connect($this->host.‘:‘.$this->port,$this->user,$this->pass);
32 if(!$this->link){
33 echo "数据库连接失败
";
34 echo "错误编码".mysqli_errno($this->link)."
";
35 echo "错误信息".mysqli_error($this->link)."
";
36 exit;
37 }
38 }
39 //设置字符集
40 private function db_charset(){
41 mysqli_query($this->link,"set names {$this->charset}");
42 }
43 //选择数据库
44 private function db_usedb(){
45 mysqli_query($this->link,"use {$this->db}");
46 }
47 //私有的克隆(实现单例)
48 private function __clone(){
49 die(‘clone is not allowed‘);
50 }
51 //公用的静态方法(将该方法作为入口)
52 public static function getIntance(){
53 if(self::$dbcon==false){
54 self::$dbcon=new self;
55 }
56 return self::$dbcon;
57 }
58 //执行sql语句的方法
59 public function query($sql){
60 $res=mysqli_query($this->link,$sql);
61 if(!$res){
62 echo "sql语句执行失败
";
63 echo "错误编码是".mysqli_errno($this->link)."
";
64 echo "错误信息是".mysqli_error($this->link)."
";
65 }
66 return $res;
67 }
68 //获得最后一条记录id
69 public function getInsertid(){
70 return mysqli_insert_id($this->link);
71 }
72 /**
73 * 查询某个字段
74 * @param
75 * @return string or int
76 */
77 public function getOne($sql){
78 $query=$this->query($sql);
79 return mysqli_free_result($query);
80 }
81 //获取一行记录,return array 一维数组
82 public function getRow($sql,$type="assoc"){
83 $query=$this->query($sql);
84 if(!in_array($type,array("assoc",‘array‘,"row"))){
85 die("mysqli_query error");
86 }
87 $funcname="mysqli_fetch_".$type;
88 return $funcname($query);
89 }
90 //获取一条记录,前置条件通过资源获取一条记录
91 public function getFormSource($query,$type="assoc"){
92 if(!in_array($type,array("assoc","array","row")))
93 {
94 die("mysqli_query error");
95 }
96 $funcname="mysqli_fetch_".$type;
97 return $funcname($query);
98 }
99 //获取多条数据,二维数组
100 public function getAll($sql){
101 $query=$this->query($sql);
102 $list=array();
103 while ($r=$this->getFormSource($query)) {
104 $list[]=$r;
105 }
106 return $list;
107 }
108
109 public function selectAll($table,$where,$fields=‘*‘,$order=‘‘,$skip=0,$limit=1000)
110 {
111 if(is_array($where)){
112 foreach ($where as $key => $val) {
113 if (is_numeric($val)) {
114 $condition = $key.‘=‘.$val;
115 }else{
116 $condition = $key.‘=\"‘.$val.‘\"‘;
117 }
118 }
119 } else {
120 $condition = $where;
121 }
122 if (!empty($order)) {
123 $order = " order by ".$order;
124 }
125 $sql = "select $fields from $table where $condition $order limit $skip,$limit";
126 $query = $this->query($sql);
127 $list = array();
128 while ($r= $this->getFormSource($query)) {
129 $list[] = $r;
130 }
131 return $list;
132 }
133 /**
134 * 定义添加数据的方法
135 * @param string $table 表名
136 * @param string orarray $data [数据]
137 * @return int 最新添加的id
138 */
139 public function insert($table,$data){
140 //遍历数组,得到每一个字段和字段的值
141 $key_str=‘‘;
142 $v_str=‘‘;
143 foreach($data as $key=>$v){
144 // if(empty($v)){
145 // die("error");
146 // }
147 //$key的值是每一个字段s一个字段所对应的值
148 $key_str.=$key.‘,‘;
149 $v_str.="‘$v‘,";
150 }
151 $key_str=trim($key_str,‘,‘);
152 $v_str=trim($v_str,‘,‘);
153 //判断数据是否为空
154 $sql="insert into $table ($key_str) values ($v_str)";
155 $this->query($sql);
156 //返回上一次增加操做产生ID值
157 return $this->getInsertid();
158 }
159 /*
160 * 删除一条数据方法
161 * @param1 $table, $where=array(‘id‘=>‘1‘) 表名 条件
162 * @return 受影响的行数
163 */
164 public function deleteOne($table, $where){
165 if(is_array($where)){
166 foreach ($where as $key => $val) {
167 $condition = $key.‘=‘.$val;
168 }
169 } else {
170 $condition = $where;
171 }
172 $sql = "delete from $table where $condition";
173 $this->query($sql);
174 //返回受影响的行数
175 return mysqli_affected_rows($this->link);
176 }
177 /*
178 * 删除多条数据方法
179 * @param1 $table, $where 表名 条件
180 * @return 受影响的行数
181 */
182 public function deleteAll($table, $where){
183 if(is_array($where)){
184 foreach ($where as $key => $val) {
185 if(is_array($val)){
186 $condition = $key.‘ in (‘.implode(‘,‘, $val) .‘)‘;
187 } else {
188 $condition = $key. ‘=‘ .$val;
189 }
190 }
191 } else {
192 $condition = $where;
193 }
194 $sql = "delete from $table where $condition";
195 $this->query($sql);
196 //返回受影响的行数
197 return mysqli_affected_rows($this->link);
198 }
199 /**
200 * [修改操作description]
201 * @param [type] $table [表名]
202 * @param [type] $data [数据]
203 * @param [type] $where [条件]
204 * @return [type]
205 */
206 public function update($table,$data,$where,$limit=0){
207 //遍历数组,得到每一个字段和字段的值
208 $str=‘‘;
209 foreach($data as $key=>$v){
210 $str.="$key=‘$v‘,";
211 }
212 $str=rtrim($str,‘,‘);
213 if(is_array($where)){
214 foreach ($where as $key => $val) {
215 if(is_array($val)){
216 $condition = $key.‘ in (‘.implode(‘,‘, $val) .‘)‘;
217 } else {
218 $condition = $key. ‘=‘ .$val;
219 }
220 }
221 } else {
222 $condition = $where;
223 }
224
225 if (!empty($limit)) {
226 $limit = " limit ".$limit;
227 }else{
228 $limit=‘‘;
229 }
230 //修改SQL语句
231 $sql="update $table set $str where $condition $limit";
232 $this->query($sql);
233 //返回受影响的行数
234 return mysqli_affected_rows($this->link);
235 }
236 } 1 //Order.php
2 php
3 //这个文件是用来接受用户的订单信息并写入队列的一个文件
4 include ‘../db.php‘;
5
6 if(!empty($_GET[‘mobile‘])){
7 //首先是订单中心的处理流程
8 //...且生成订单号
9 $order_id = rand(10000,99999);
10 //把用户传过来的数据进行过滤(可以防止sql注入)
11 //订单信息
12 $insert_data = array(
13 ‘order_id‘=>$order_id,
14 ‘mobile‘=>$_GET[‘mobile‘],
15 ‘created_at‘=>date(‘Y-m-d H:i:s‘, time()),
16 ‘status‘=>0
17 )
18 //把数据存入队列表中
19 $db = DB::getIntance();
20 $res = $db->insert(‘order_queue‘,$insert_data);
21 if($res){
22 echo $insert_data[‘order_id‘].‘保存成功‘;
23 }else{
24 echo $insert_data[‘order_id‘].‘保存失败‘;
25 }
26 }
1 //Goods.php
2 php
3 //配送系统处理队列中的订单并进行标记的文件
4 include ‘../include/db.php‘;
5 $db = DB::getIntance();
6 //1.先把要处理的记录更新为等待处理
7 //配送系统不是及时完成的,存在处理时间,若在处理中时其他程序操作该记录会存在冲突,所以需要先将订单进行锁定
8 $waiting = array(‘status‘=>0);
9 $lock = array(‘status‘=>2);
10 $res_lock = update(‘order_queue‘,$lock,$waiting,2);
11 //2.选择出刚刚更新的数据,然后进行配送系统的处理
12 if($res_lock){
13 //2.1选择出要处理的订单内容
14 $res = $db->selectAll(‘order_queue‘,$lock);
15 //2.2由配货系统进行退货处理
16 //...
17
18 //3.将处理过的程序更新为已完成
19 $success = array(
20 ‘status‘=>1,
21 ‘updated_at‘=>date(‘Y-m-d H:i:s‘,time())
22 );
23 $res_last = $db->update(‘order_queue‘,$success,$lock);
24 if($res_last){
25 echo ‘Success‘.$res_last;
26 }else{
27 echo ‘Fail‘.$res_last;
28 }
29 }else{
30 echo ‘All Finished‘;
31 }
1 #good.sh
2 date "+%G-%m-%d %H:%M:S"
3 cd /home/项目位置
4 php goods.php
1 -- redis_queue数据库表的SQL语句:
2 create table `redis_queue`(
3 `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
4 `uid` int(11) NOT NULL DEFAULT ‘0‘,
5 `time_stamp` varchar(24) NOT NULL,
6 PRIMARY KEY(`id`)
7 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
1 //user.php
2 php
3 //接收用户请求存入redis中
4 //首先加载redis组件
5 $redis = new Redis();
6 $redis->connect(‘127.0.0.1‘,6379);
7 $redis_name = ‘miaosha‘;
8 /////////////////////////////////////////////////////////
9 //接收用户id
10 $uid = $_GET[‘uid‘];
11 //获取一下redis里已有的数量
12 $num = 10;
13 //如果当天人数少于十的时候,则加入这个队列
14 if ($redis->lLen($redis_name) ){
15 $redis->rPush($redis_name,$uid.‘%‘.microtime());
16 echo $uid.‘秒杀成功‘;
17 }else{
18 //如果当天人数已达到十人
19 echo ‘秒杀已结束‘;
20 }
21 $redis->close();
22 ///仅模拟高压下///////////////////////////////////////////////
23 for($i=0;$i$i++){
24 $uid = rand(100000,999999);
25 //获取一下redis里已有的数量
26 $num = 10;
27 //如果当天人数少于十的时候,则加入这个队列
28 if ($redis->lLen($redis_name) ){
29 $redis->rPush($redis_name,$uid.‘%‘.microtime());
30 echo $uid.‘秒杀成功‘;
31 }else{
32 //如果当天人数已达到十人
33 echo ‘秒杀已结束‘;
34 }
35 }
36 $redis->close();
37 /////////////////////////////////////////////////////////
1 //savetodb.php
2 php
3 include ‘../include/db.php‘;
4 //首先加载redis组件
5 $redis = new Redis();
6 $redis->connect(‘127.0.0.1‘,6379);
7 $redis_name = ‘miaosha‘;
8
9 $db = DB::getIntance();
10 //死循环
11 while(1){
12 //从队列最左侧取出一个值
13 $user = $redis->lPop($redis_name);
14 //然后判断这个值是否存在
15 if(!$user||$user == ‘nil‘){
16 sleep(2);
17 continue;
18 }
19 //切割出时间,uid
20 $user_arr = explode(‘%‘,$user);
21 $insert_data = array(
22 ‘uid‘=>$user_arr[0],
23 ‘time_stamp‘=>$user_arr[1]
24 )
25 //保存到数据库中
26 $res = $db->insert("redis_queue",$insert_data);
27 //数据库插入失败时回滚机制
28 if(!$res){
29 $redis->rPush($redis_name,$user);
30 }
31 sleep(2);
32 }
33 //释放redis
34 $redis->close();
上一篇:HTML - 属性
下一篇:压力测试 Apache ab