关于 Laravel Redis 多个进程同时取队列问题详解

2018-09-07 13:00

阅读:630

  前言

  最近在工作中遇到了一个问题,开启多个进程处理队列会重复读取 Redis 中队列吗?是否因此导致重复执行任务?下面就来通过示例代码详细介绍下。

  使用 Supervisor 监听 Laravel 队列任务,其中 Supervisor 的配置如下:

   [program:laravel-worker] process_name=%(program_name)s_%(process_num)02d command=php /var/

  注意: numprocs = 8,代表开启 8 个进程来执行 command 中的命令。

  如下:

  Laravel 多进程读取队列内容是否会重复

  在 Laravel 的某个控制器方法,一次放入多个任务队列:

   public function index(Request $request) { $this->dispatch((new SendFile3())->onQueue(sendfile)); $this->dispatch((new SendFile3())->onQueue(sendfile)); $this->dispatch((new SendFile3())->onQueue(sendfile)); $this->dispatch((new SendFile3())->onQueue(sendfile)); $this->dispatch((new SendFile3())->onQueue(sendfile)); $this->dispatch((new SendFile3())->onQueue(sendfile)); $this->dispatch((new SendFile3())->onQueue(sendfile)); $this->dispatch((new SendFile3())->onQueue(sendfile)); $this->dispatch((new SendFile3())->onQueue(sendfile)); $this->dispatch((new SendFile3())->onQueue(sendfile)); $this->dispatch((new SendFile3())->onQueue(sendfile)); $this->dispatch((new SendFile3())->onQueue(sendfile)); $this->dispatch((new SendFile3())->onQueue(sendfile)); $this->dispatch((new SendFile3())->onQueue(sendfile)); $this->dispatch((new SendFile3())->onQueue(sendfile)); $this->dispatch((new SendFile3())->onQueue(sendfile)); $this->dispatch((new SendFile3())->onQueue(sendfile)); $this->dispatch((new SendFile3())->onQueue(sendfile)); }

  在队列处理的方法打印日志,打印处理的队列的 ID:

  app/Jobs/SendFile3.php

   public function handle() { info(invoke SendFile3); dump(invoke handle); $rawbody = $this->job->getRawBody(); $info = json_decode($rawbody, true); info(queue id: . $info[id]); }

  Laravel 使用 Redis 的 list 作为队列的数据结构,并会为每个队列分配一个 ID,数据结构如下:

   { job: Illuminate\\Queue\\CallQueuedHandler@call, data: { commandName: App\\Jobs\\SendFile3, command: O:18:\App\\Jobs\\SendFile3\:4:{s:6:\\u0000*\u0000job\;N;s:10:\connection\;N;s:5:\queue\;s:8:\sendfile\;s:5:\delay\;N;} }, id: hadBcy3IpNsnOofQQdHohsa451OkQs88, attempts: 1 }

  请求这个控制器路由(或者命令行方式),就可以看到 Redis 中多了很多队列任务了,如图:

  

  这个时候开启 Supervisor 处理队列任务,并查看日志:

  这 8 个进程并发处理队列,但从打印的日志看,没有出现同样的 ID. 我们再看一下 Laravel 如何使用 Redis 处理队列的。

  分析一下 Laravel 队列的处理

  Laravel 中入队列方法

   public function pushRaw($payload, $queue = null, array $options = []) { $this->getConnection()->rpush($this->getQueue($queue), $payload); return Arr::get(json_decode($payload, true), id); }

  用的是 Redis 的 rpush 命令。

  Laravel 中取队列方法

   public function pop($queue = null) { $original = $queue ?: $this->default; $queue = $this->getQueue($queue); $this->migrateExpiredJobs($queue.:delayed, $queue); if (! is_null($this->expire)) { $this->migrateExpiredJobs($queue.:reserved, $queue); } list($job, $reserved) = $this->getConnection()->eval( LuaScripts::pop(), 2, $queue, $queue.:reserved, $this->getTime() + $this->expire ); if ($reserved) { return new RedisJob($this->container, $this, $job, $reserved, $original); } }

  这里用的是 lua 脚本取队列,如下:

   public static function pop() { return <<<LUA local job = redis.call(lpop, KEYS[1]) local reserved = false if(job ~= false) then reserved = cjson.decode(job) reserved[attempts] = reserved[attempts] + 1 reserved = cjson.encode(reserved) redis.call(zadd, KEYS[2], ARGV[1], reserved) end return {job, reserved} LUA; }

  那么结论是:从 Laravel 的处理方式和打印的日志结果看,即使多个进程读取同一个队列,也不会读取到一样的数据。

  总结

  以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对脚本之家的支持。


评论


亲,登录后才可以留言!