NSQ客戶端
NSQ 是實時的分布式消息處理平臺,其設計的目的是用來大規模地處理每天數以十億計級別的消息。 它具有分布式和去中心化拓撲結構,該結構具有無單點故障、故障容錯、高可用性以及能夠保證消息的可靠傳遞的特征。
組件要求
- php: >=5.3.0
- ext-json: *
- easyswoole/easyswoole: 3.x
- easyswoole/http-client: ^1.2.5
- easyswoole/pool: ^1.0
- easyswoole/spl: ^1.1
- monolog/monolog: ~1.0
- react/react: >=0.2.1
安裝方法
composer require easyswoole/nsq
倉庫地址
基本使用
注冊Nsq服務
namespace EasySwoole\EasySwoole;
use App\Producer\Process as ProducerProcess;
use App\Consumer\Process as ConsumerProcess;
use EasySwoole\EasySwoole\Swoole\EventRegister;
use EasySwoole\EasySwoole\AbstractInterface\Event;
use EasySwoole\Http\Request;
use EasySwoole\Http\Response;
class EasySwooleEvent implements Event
{
public static function initialize()
{
// TODO: Implement initialize() method.
date_default_timezone_set('Asia/Shanghai');
}
public static function mainServerCreate(EventRegister $register)
{
// TODO: Implement mainServerCreate() method.
// 生產者
\EasySwoole\Component\Process\Manager::getInstance()->addProcess(new ProducerProcess());
// 消費者
\EasySwoole\Component\Process\Manager::getInstance()->addProcess(new ConsumerProcess());
}
......
}
生產者
namespace App\Producer;
use EasySwoole\Component\Process\AbstractProcess;
class Process extends AbstractProcess
{
protected function run($arg)
{
go(function () {
$config = new \EasySwoole\Nsq\Config();
$topic = "topic.test";
$nsqlookup = new \EasySwoole\Nsq\Lookup\Nsqlookupd($config->getNsqdUrl());
$hosts = $nsqlookup->lookupHosts($topic);
foreach ($hosts as $host) {
$nsq = new \EasySwoole\Nsq\Nsq();
for ($i = 0; $i < 10; $i++) {
$msg = new \EasySwoole\Nsq\Message\Message();
$msg->setPayload("test$i");
$nsq->push(
new \EasySwoole\Nsq\Connection\Producer($host, $config),
$topic,
$msg
);
}
}
});
}
}
消費者
namespace App\Consumer;
use EasySwoole\Component\Process\AbstractProcess;
class Process extends AbstractProcess
{
protected function run($arg)
{
go(function () {
$topic = "topic.test";
$config = new \EasySwoole\Nsq\Config();
$nsqlookup = new \EasySwoole\Nsq\Lookup\Nsqlookupd($config->getNsqdUrl());
$hosts = $nsqlookup->lookupHosts($topic);
foreach ($hosts as $host) {
$nsq = new \EasySwoole\Nsq\Nsq();
$nsq->subscribe(
new \EasySwoole\Nsq\Connection\Consumer($host, $config, $topic, 'test.consuming'),
function ($item) {
var_dump($item['message']);
}
);
}
});
}
}
附贈
- Nsq 集群部署 docker-compose.yml 一份,使用方式如下
- 保證4150,4151,4160,4161,4171端口未被占用(占用后可以修改compose文件中的端口號)
- 根目錄下,docker-compose up -d
- 訪問localhost:4171,可以查看Web版 nsqadmin 狀態。
https://github.com/easy-swoole/nsq/blob/master/docker-compose.yml