kafka
Kafka是一種高吞吐量的分布式發布訂閱消息系統,有如下特性: 通過O(1)的磁盤數據結構提供消息的持久化,這種結構對于即使數以TB的消息存儲也能夠保持長時間的穩定性能。 高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒數百萬的消息。 支持通過Kafka服務器和消費機集群來分區消息。 支持Hadoop并行數據加載。
本項目代碼參考自 https://github.com/weiboad/kafka-php
組件要求
- php: >=7.1.0
- ext-swoole: ^4.4.5
- easyswoole/component: ^2.0
- easyswoole/spl: ^1.1
安裝方法
composer require easyswoole/kafka
倉庫地址
基本使用
注冊kafka服務
namespace EasySwoole\EasySwoole;
use App\Producer\Process as ProducerProcess;
use App\Consumer\Process as ConsumerProcess;
use EasySwoole\EasySwoole\Swoole\EventRegister;
use EasySwoole\EasySwoole\AbstractInterface\Event;
use EasySwoole\Http\Request;
use EasySwoole\Http\Response;
class EasySwooleEvent implements Event
{
public static function initialize()
{
// TODO: Implement initialize() method.
date_default_timezone_set('Asia/Shanghai');
}
public static function mainServerCreate(EventRegister $register)
{
// TODO: Implement mainServerCreate() method.
// 生產者
\EasySwoole\Component\Process\Manager::getInstance()->addProcess(new ProducerProcess());
// 消費者
\EasySwoole\Component\Process\Manager::getInstance()->addProcess(new ConsumerProcess());
}
}
生產者
namespace App\Producer;
use EasySwoole\Component\Process\AbstractProcess;
use EasySwoole\Kafka\Config\ProducerConfig;
use EasySwoole\Kafka\Kafka;
class Process extends AbstractProcess
{
protected function run($arg)
{
go(function () {
$config = new ProducerConfig();
$config->setMetadataBrokerList('127.0.0.1:9092,127.0.0.1:9093');
$config->setBrokerVersion('0.9.0');
$config->setRequiredAck(1);
$kafka = new Kafka($config);
$result = $kafka->producer()->send([
[
'topic' => 'test',
'value' => 'message--',
'key' => 'key--',
],
]);
var_dump($result);
var_dump('ok');
});
}
}
消費者
namespace App\Consumer;
use EasySwoole\Component\Process\AbstractProcess;
use EasySwoole\Kafka\Config\ConsumerConfig;
use EasySwoole\Kafka\Kafka;
class Process extends AbstractProcess
{
protected function run($arg)
{
go(function () {
$config = new ConsumerConfig();
$config->setRefreshIntervalMs(1000);
$config->setMetadataBrokerList('127.0.0.1:9092,127.0.0.1:9093');
$config->setBrokerVersion('0.9.0');
$config->setGroupId('test');
$config->setTopics(['test']);
$config->setOffsetReset('earliest');
$kafka = new Kafka($config);
// 設置消費回調
$func = function ($topic, $partition, $message) {
var_dump($topic);
var_dump($partition);
var_dump($message);
};
$kafka->consumer()->subscribe($func);
});
}
}
附贈
- Kafka 集群部署 docker-compose.yml 一份,使用方式如下
- 保證2181,9092,9093,9000端口未被占用(占用后可以修改compose文件中的端口號)
- 根目錄下,docker-compose up -d
- 訪問localhost:9000,可以查看kafka集群狀態。
https://github.com/easy-swoole/kafka/blob/master/docker-compose.yml