Queue 介紹
原理
EasySwoole
封裝實現了一個輕量級的隊列,默認使用 Redis
作為隊列驅動器。
用戶可以自己實現一個隊列驅動器來實現隊列,用 kafka
作為隊列驅動器或者 其他驅動器方式
作為隊列驅動器,來進行存儲。
從上可知,Queue
并不是一個單獨使用的組件,它更像一個對不同驅動的隊列進行統一封裝的門面組件。
Queue 組件當前最新穩定版本為 3.x。
舊版本 (2.1.x) 的 Queue
組件的使用,請看 Queue 2.1.x
組件要求
- ext-swoole: >=4.4.0
- easyswoole/component: ^2.0
- easyswoole/redis-pool: ~2.2.0
安裝方法
composer require easyswoole/queue 3.x
倉庫地址
基本使用
默認自帶的隊列驅動為 Redis
隊列。這里簡單列舉 2 種用戶可使用的方式:
- 在框架的任意位置進行生產和消費隊列任務。
- 在框架的任意位置進行生產隊列任務, 然后在自定義進程中進行消費任務。
在框架中進行生產和消費任務
創建隊列
use EasySwoole\Queue\Driver\RedisQueue;
use EasySwoole\Queue\Job;
use EasySwoole\Queue\Queue;
use EasySwoole\Redis\Config\RedisConfig;
// 配置 Redis 隊列驅動器
$redisConfig = new RedisConfig([
'host' => '127.0.0.1', // 服務端地址 默認為 '127.0.0.1'
'port' => 6379, // 端口 默認為 6379
'auth' => '', // 密碼 默認為 不設置
'db' => 0, // 默認為 0 號庫
]);
// 創建隊列
$queue = new Queue(new RedisQueue($redisConfig));
普通生產任務
$queue
為上述創建隊列中得到的隊列對象。
// 創建任務
$job = new Job();
// 設置任務數據
$job->setJobData("this is my job data time time ".date('Ymd h:i:s'));
// 生產普通任務
$queue->producer()->push($job);
普通消費任務
$queue
為上述創建隊列中得到的隊列對象。
// 消費任務
$job = $queue->consumer()->pop();
// 或者是自定義進程中消費任務(具體使用請看下文自定義進程消費任務完整使用示例)
$queue->consumer()->listen(function (Job $job){
var_dump($job);
});
生產延遲任務
$queue
為上述創建隊列中得到的隊列對象。
// 創建任務
$job = new Job();
// 設置任務數據
$job->setJobData("this is my job data time time ".date('Ymd h:i:s'));
// 設置任務延后執行時間
$job->setDelayTime(5);
// 生產延遲任務
$queue->producer()->push($job);
生產可信任務
// 創建任務
$job = new Job();
// 設置任務數據
$job->setJobData("this is my job data time time ".date('Ymd h:i:s'));
// 設置任務重試次數為 3 次。任務如果沒有確認,則會執行三次
$job->setRetryTimes(3);
// 如果5秒內沒確認任務,會重新回到隊列。默認為3秒
$job->setWaitConfirmTime(5);
// 投遞任務
$queue->producer()->push($job);
// 確認一個任務
$queue->consumer()->confirm($job);
在框架中生產任務和自定義進程中消費任務
- 注冊隊列驅動器
- 設置消費進程
- 生產者投遞任務
定義一個隊列
<?php
namespace App\Utility;
use EasySwoole\Component\Singleton;
use EasySwoole\Queue\Queue;
class MyQueue extends Queue
{
use Singleton;
}
定義消費進程
<?php
namespace App\Utility;
use EasySwoole\Component\Process\AbstractProcess;
use EasySwoole\Queue\Job;
class QueueProcess extends AbstractProcess
{
protected function run($arg)
{
go(function (){
MyQueue::getInstance()->consumer()->listen(function (Job $job){
var_dump($job->getJobData());
});
});
}
}
支持多進程、多協程消費
注冊隊列驅動器、消費進程及設置生產者投遞任務
<?php
namespace EasySwoole\EasySwoole;
use App\Utility\MyQueue;
use App\Utility\QueueProcess;
use EasySwoole\Component\Timer;
use EasySwoole\EasySwoole\AbstractInterface\Event;
use EasySwoole\EasySwoole\Swoole\EventRegister;
use EasySwoole\Queue\Job;
class EasySwooleEvent implements Event
{
public static function initialize()
{
date_default_timezone_set('Asia/Shanghai');
}
public static function mainServerCreate(EventRegister $register)
{
// redis pool 使用請看 redis 章節文檔
$redisConfig = new \EasySwoole\Redis\Config\RedisConfig(
[
'host' => '127.0.0.1', // 服務端地址 默認為 '127.0.0.1'
'port' => 6379, // 端口 默認為 6379
'auth' => '', // 密碼 默認為 不設置
'db' => 0, // 默認為 0 號庫
]
);
// 配置 隊列驅動器
$driver = new \EasySwoole\Queue\Driver\RedisQueue($redisConfig, 'easyswoole_queue');
MyQueue::getInstance($driver);
// 注冊一個消費進程
$processConfig = new \EasySwoole\Component\Process\Config([
'processName' => 'QueueProcess', // 設置 自定義進程名稱
'processGroup' => 'Queue', // 設置 自定義進程組名稱
'enableCoroutine' => true, // 設置 自定義進程自動開啟協程
]);
\EasySwoole\Component\Process\Manager::getInstance()->addProcess(new QueueProcess($processConfig));
// 模擬生產者,可以在任意位置投遞
$register->add($register::onWorkerStart, function ($server, $id) {
if ($id == 0) {
Timer::getInstance()->loop(3000, function () {
$job = new Job();
$job->setJobData(['time' => \time()]);
MyQueue::getInstance()->producer()->push($job);
});
}
});
}
}
進程安全退出問題請看 自定義進程 章節。
控制器使用
以在 http
服務中為例,使用示例代碼如下:
<?php
namespace App\HttpController;
use App\Utility\MyQueue;
use EasySwoole\Http\AbstractInterface\Controller;
use EasySwoole\Http\Message\Status;
use EasySwoole\Queue\Driver\RedisQueue;
use EasySwoole\Queue\Job;
use EasySwoole\Queue\Queue;
use EasySwoole\Redis\Config\RedisConfig;
class Index extends Controller
{
// 生產普通任務
public function producer1()
{
// 獲取隊列
$queue = MyQueue::getInstance();
// 創建任務
$job = new Job();
// 設置任務數據
$job->setJobData("this is my job data time time " . date('Ymd h:i:s'));
var_dump('producer1 => ');
var_dump($job->getJobData());
// 生產普通任務
$produceRes = $queue->producer()->push($job);
if (!$produceRes) {
$this->writeJson(Status::CODE_OK, [], '隊列生產普通任務失敗!');
} else {
$this->writeJson(Status::CODE_OK, [], '隊列生產普通任務成功!');
}
}
// 生產延遲任務
public function producer2()
{
// 獲取隊列
$queue = MyQueue::getInstance();
// 創建任務
$job = new Job();
// 設置任務數據
$job->setJobData("this is my job data time time " . date('Ymd h:i:s'));
// 設置任務延后執行時間
$job->setDelayTime(5);
var_dump('producer2 => ');
var_dump($job->getJobData());
// 生產延遲任務
$produceRes = $queue->producer()->push($job);
if (!$produceRes) {
$this->writeJson(Status::CODE_OK, [], '隊列生產延遲任務失敗!');
} else {
$this->writeJson(Status::CODE_OK, [], '隊列生產延遲任務成功!');
}
}
// 生產可信任務
public function producer3()
{
// 獲取隊列
$queue = MyQueue::getInstance();
// 創建任務
$job = new Job();
// 設置任務數據
$job->setJobData("this is my job data time time " . date('Ymd h:i:s'));
var_dump('producer3 => ');
var_dump($job->getJobData());
// 設置任務重試次數為 3 次。任務如果沒有確認,則會執行三次
$job->setRetryTimes(3);
// 如果5秒內沒確認任務,會重新回到隊列。默認為3秒
$job->setWaitConfirmTime(5);
// 投遞任務
$queue->producer()->push($job);
// 確認一個任務
$queue->consumer()->confirm($job);
}
// 消費任務
public function consumer()
{
// 獲取隊列
$queue = MyQueue::getInstance();
### 消費任務
// 獲取到需要消費的任務
$job = $queue->consumer()->pop();
if (!$job) {
$this->writeJson(Status::CODE_OK, [], '沒有隊列任務需要消費了!');
return false;
}
// 獲取需要消費的任務的數據
$jobData = $job->getJobData();
var_dump($jobData);
}
}
進階使用
我們可以自定義驅動,實現 RabbitMQ
、Kafka
等消費隊列軟件的封裝。
用戶需要定義類,并實現 \EasySwoole\Queue\QueueDriverInterface
接口的幾個方法即可。該接口的詳細實現請看下文。
QueueDriverInterface 接口類實現
<?php
namespace EasySwoole\Queue;
interface QueueDriverInterface
{
public function push(Job $job,float $timeout = 3.0): bool;
public function pop(float $timeout = 3.0, array $params = []): ?Job;
public function info(): ?array;
public function confirm(Job $job,float $timeout = 3.0): bool;
}