FastCacheQueue
EasySwoole FastCache組件在>= 1.2.1
的時(shí)候新增類似· beanstalkd消息隊(duì)列 ·特性。
- 可以創(chuàng)建多個(gè)queue
- 支持延遲投遞
- 任務(wù)超時(shí)恢復(fù)執(zhí)行
- 任務(wù)重發(fā)執(zhí)行
- 任務(wù)最大重發(fā)次數(shù)
- 支持putJob、delayJob、releaseJob、reserveJob、buryJob、kickJob等命令
基本使用
FastCacheQueue依托于FastCache,具體安裝請(qǐng)查看FastCache
服務(wù)注冊(cè)
更新后,EasySwoole\FastCache\CacheProcessConfig類多出以下方法
/** 設(shè)置進(jìn)程最大內(nèi)存 默認(rèn)512M */
public function setMaxMem(string $maxMem): void
/** 設(shè)置消息隊(duì)列保留時(shí)間 默認(rèn)60s (取出任務(wù)后沒(méi)有及時(shí)確認(rèn)會(huì)重新放回隊(duì)列) */
public function setQueueReserveTime(int $queueReserveTime): void
/** 設(shè)置消息隊(duì)列最大重發(fā)次數(shù) 默認(rèn)10 達(dá)到次數(shù)后重發(fā)將會(huì)被丟棄 */
public function setQueueMaxReleaseTimes(int $queueMaxReleaseTimes): void
開(kāi)始使用
下文示例代碼的Job和Cache都使用以下命名空間
use EasySwoole\FastCache\Cache;
use EasySwoole\FastCache\Job;
投遞任務(wù)
投遞成功之后 將會(huì)返回該任務(wù)的jobId。
沒(méi)有失敗情況,除非fastCache注冊(cè)注冊(cè)失敗。
$job = new Job();
$job->setData("siam"); // 任意類型數(shù)據(jù)
$job->setQueue("siam_queue");
$jobId = Cache::getInstance()->putJob($job);
var_dump($jobId);
取出任務(wù)
可以開(kāi)啟自定義進(jìn)程當(dāng)消費(fèi)者,循環(huán)監(jiān)聽(tīng)隊(duì)列,執(zhí)行任務(wù)處理。
注意:任務(wù)執(zhí)行完成一定要有一個(gè)結(jié)果。要么刪除該任務(wù),要么重發(fā)。否則當(dāng)任務(wù)取出一定時(shí)間后(默認(rèn)60s)會(huì)自動(dòng)放回隊(duì)列中。
$job = Cache::getInstance()->getJob('siam_queue');// Job對(duì)象或者null
if ($job === null){
echo "沒(méi)有任務(wù)\n";
}else{
// 執(zhí)行業(yè)務(wù)邏輯
var_dump($job);
// 執(zhí)行完了要?jiǎng)h除或者重發(fā),否則超時(shí)會(huì)自動(dòng)重發(fā)
Cache::getInstance()->deleteJob($job);
}
清空ready任務(wù)隊(duì)列
var_dump(Cache::getInstance()->flushReadyJobQueue('siam_queue'));
var_dump(Cache::getInstance()->jobQueueSize('siam_queue'));
延遲執(zhí)行任務(wù)
$job = new Job();
$job->setData("siam");
$job->setQueue("siam_queue_delay");
$job->setDelay(5);// 延時(shí)5s
$jobId = Cache::getInstance()->putJob($job);
var_dump($jobId);
// 馬上取會(huì)失敗 隔5s取才成功
$job = Cache::getInstance()->getJob('siam_queue_delay');
var_dump($job);
刪除任務(wù)
可以是由getJob取出的對(duì)象,也可以自己聲明Job對(duì)象,傳入JobId來(lái)刪除。
$job = new Job();
$job->setJobId(1);
$job->setQueue('siam_queue_delay');
Cache::getInstance()->deleteJob($job);
任務(wù)重發(fā)
任務(wù)執(zhí)行失敗,或者某些場(chǎng)景需要重新執(zhí)行,則可以重發(fā)。
重發(fā)時(shí),可以指定是否延遲執(zhí)行。
// get出來(lái)的任務(wù)執(zhí)行失敗可以重發(fā)
$job = new Job();
$job->setData("siam");
$job->setQueue("siam_queue");
$jobId = Cache::getInstance()->putJob($job);
$job = Cache::getInstance()->getJob('siam_queue');
if ($job === null){
echo "沒(méi)有任務(wù)\n";
}else{
// 執(zhí)行業(yè)務(wù)邏輯
$doRes = false;
if (!$doRes){
// 業(yè)務(wù)邏輯失敗,需要重發(fā)
// 如果延遲隊(duì)列需要馬上重發(fā),在這里需要清空delay屬性
// $job->setDelay(0);
// 如果普通隊(duì)列需要延遲重發(fā),則設(shè)置delay屬性
// $job->setDelay(5);
$res = Cache::getInstance()->releaseJob($job);
var_dump($res);
}else{
// 執(zhí)行完了要?jiǎng)h除或者重發(fā),否則超時(shí)會(huì)自動(dòng)重發(fā)
Cache::getInstance()->deleteJob($job);
}
}
返回現(xiàn)在有什么隊(duì)列
$queues = Cache::getInstance()->jobQueues();
var_dump($queues);
返回某個(gè)隊(duì)列的長(zhǎng)度
$queueSize = Cache::getInstance()->jobQueueSize("siam_queue");
$queueSize2 = Cache::getInstance()->jobQueueSize("siam_queue_delay");
var_dump($queueSize);
var_dump($queueSize2);
清空隊(duì)列 可指定名稱
// 清空全部
$res = Cache::getInstance()->flushJobQueue();
var_dump($res);
// 清空siam_queue隊(duì)列
$res = Cache::getInstance()->flushJobQueue('siam_queue');
var_dump($res);
將任務(wù)改為延遲狀態(tài)
//添加任務(wù)
$job = new Job();
$job->setData("LuffyQAQ");
$job->setQueue("LuffyQAQ_queue_delay");
$jobId = Cache::getInstance()->putJob($job);
//方法一 直接傳入jobId
$job->setJobId($jobId);
$job->setDelay(30);
var_dump(Cache::getInstance()->delayJob($job));
//方法二 取出任務(wù)
$job = Cache::getInstance()->getJob('LuffyQAQ_queue_delay');
$job->setDelay(30);
var_dump(Cache::getInstance()->delayJob($job));
//使用jobQueueSize查看隊(duì)列長(zhǎng)度
$queueSize = Cache::getInstance()->jobQueueSize("LuffyQAQ_queue_delay");
var_dump($queueSize);
從延遲執(zhí)行隊(duì)列中拿取
//傳入隊(duì)列名
var_dump(Cache::getInstance()->getDelayJob('LuffyQAQ_queue_delay'));
清空delay任務(wù)隊(duì)列
var_dump(Cache::getInstance()->flushDelayJobQueue('LuffyQAQ_queue_delay'));
var_dump(Cache::getInstance()->jobQueueSize('LuffyQAQ_queue_delay'));
將任務(wù)改為保留狀態(tài)
//添加任務(wù)
$job = new Job();
$job->setData("LuffyQAQ");
$job->setQueue("LuffyQAQ_queue_reserve");
$jobId = Cache::getInstance()->putJob($job);
//方法一 直接傳入jobId
$job->setJobId($jobId);
var_dump(Cache::getInstance()->reserveJob($job));
//方法二 取出任務(wù)
$job = Cache::getInstance()->getJob('LuffyQAQ_queue_reserve');
var_dump(Cache::getInstance()->reserveJob($job));
//使用jobQueueSize查看隊(duì)列長(zhǎng)度
$queueSize = Cache::getInstance()->jobQueueSize("LuffyQAQ_queue_reserve");
var_dump($queueSize);
從保留隊(duì)列中拿取
//傳入隊(duì)列名
var_dump(Cache::getInstance()->getReserveJob('LuffyQAQ_queue_reserve'));
清空reserve任務(wù)隊(duì)列
var_dump(Cache::getInstance()->flushReserveJobQueue('LuffyQAQ_queue_reserve'));
var_dump(Cache::getInstance()->jobQueueSize('LuffyQAQ_queue_reserve'));
將任務(wù)改為埋藏狀態(tài)
$job = new Job();
$job->setQueue('LuffyQAQ_queue_bury');
$job->setData('LuffyQAQ');
$jobId = Cache::getInstance()->putJob($job);
$job->setJobId($jobId);
var_dump(Cache::getInstance()->buryJob($job));
//使用jobQueueSize查看隊(duì)列長(zhǎng)度
$queueSize = Cache::getInstance()->jobQueueSize("LuffyQAQ_queue_bury");
var_dump($queueSize);
從埋藏隊(duì)列中拿取
//傳入隊(duì)列名
var_dump(Cache::getInstance()->getBuryJob('LuffyQAQ_queue_bury'));
將埋藏隊(duì)列任務(wù)恢復(fù)到ready中
var_dump(Cache::getInstance()->kickJob($job));
清空bury任務(wù)隊(duì)列
var_dump(Cache::getInstance()->flushBuryJobQueue('LuffyQAQ_queue_bury'));
var_dump(Cache::getInstance()->jobQueueSize('LuffyQAQ_queue_bury'));