Actor
提供Actor
模式支持,助力游戲行業開發。EasySwoole
的Actor
采用自定義Process
作為存儲載體,以協程作為最小調度單位,利用協程Channel
做mail box
,而客戶端與Process
之間的通訊,采用UnixSocket
實現,并且借助TCP
實現分布式的ActorClient
,超高并發下也能輕松應對。
工作流程
一般來說有兩種策略用來在并發線程中進行通信:共享數據和消息傳遞。使用共享數據方式的并發編程面臨的最大的一個問題就是數據條件競爭,當兩個實例需要訪問同一個數據時,為了保證數據的一致性,通常需要為數據加鎖,而Actor模型采用消息傳遞機制來避免數據競爭,無需復雜的加鎖操作,各個實例只需要關注自身的狀態以及處理收到的消息。
Actor
是完全面向對象、無鎖、異步、實例隔離、分布式的并發開發模式。Actor
實例之間互相隔離,Actor
實例擁有自己獨立的狀態,各個Actor
之間不能直接訪問對方的狀態,需要通過消息投遞機制來通知對方改變狀態。由于每個實例的狀態是獨立的,沒有數據被共享,所以不會發生數據競爭,從而避免了并發下的加鎖問題。
舉一個游戲場景的例子,在一個游戲房間中,有5個玩家,每個玩家都是一個PlayerActor
,擁有自己的屬性,比如角色ID,昵稱,當前血量,攻擊力等。游戲房間本身也是一個RoomActor
,房間也擁有屬性,比如當前在線的玩家,當前場景的怪物數量,怪物血量等。此時玩家A攻擊某個怪物,則PlayerActor-A
向RoomActor
發送一個攻擊怪物的指令,RoomActor
經過計算,得出玩家A對怪物的傷害值,并給房間內的所有PlayerActor
發送一個消息(玩家A攻擊怪物A,造成175點傷害,怪物A剩余血量1200點),類似此過程,每個PlayerActor
都可以得知房間內發生了什么事情,但又不會造成同時訪問怪物A的屬性,導致的共享加鎖問題。
安裝
Actor
并沒有作為內置組件,需要先引入包并進行基礎配置才能夠使用。
composer require easyswoole/actor
使用
建立一個Actor
每一種對象(玩家、房間、甚至是日志服務也可以作為一種Actor
對象)都建立一個Actor
來進行管理,一個對象可以擁有多個實例(Client)
并且可以互相通過信箱發送消息來處理業務。
<?php
namespace App\Player;
use EasySwoole\Actor\AbstractActor;
use EasySwoole\Actor\ActorConfig;
/**
* 玩家Actor
* Class PlayerActor
* @package App\Player
*/
class PlayerActor extends AbstractActor
{
/**
* 配置當前的Actor
* @param ActorConfig $actorConfig
*/
public static function configure(ActorConfig $actorConfig)
{
$actorConfig->setActorName('PlayerActor');
$actorConfig->setWorkerNum(3);
}
/**
* Actor首次啟動時
*/
protected function onStart()
{
$actorId = $this->actorId();
echo "Player Actor {$actorId} onStart\n";
}
/**
* Actor收到消息時
* @param $msg
*/
protected function onMessage($msg)
{
$actorId = $this->actorId();
echo "Player Actor {$actorId} onMessage\n";
}
/**
* Actor即將退出前
* @param $arg
*/
protected function onExit($arg)
{
$actorId = $this->actorId();
echo "Player Actor {$actorId} onExit\n";
}
/**
* Actor發生異常時
* @param \Throwable $throwable
*/
protected function onException(\Throwable $throwable)
{
$actorId = $this->actorId();
echo "Player Actor {$actorId} onException\n";
}
}
注冊Actor服務
可以使用setListenAddress
和setListenPort
指定本機對外監聽的端口,其他機器可以通過該端口向本機的Actor
發送消息。
public static function mainServerCreate(EventRegister $register) {
// 注冊Actor管理器
$server = \EasySwoole\EasySwoole\ServerManager::getInstance()->getSwooleServer();
\EasySwoole\Actor\Actor::getInstance()->register(PlayerActor::class);
\EasySwoole\Actor\Actor::getInstance()->setTempDir(EASYSWOOLE_TEMP_DIR)
->setListenAddress('0.0.0.0')->setListenPort('9900')->attachServer($server);
}
Actor實例管理
服務啟動后就可以進行Actor
的操作,管理本機的Client
實例,則不需要給client
傳入$node
參數,默認的node
為本機,管理其他機器時需要傳入。
// 管理本機的Actor則不需要聲明節點
$node = new \EasySwoole\Actor\ActorNode();
$node->setIp('127.0.0.1');
$node->setListenPort(9900);
// 啟動一個Actor并得到ActorId 后續操作需要依賴ActorId
$actorId = PlayerActor::client($node)->create(['time' => time()]); // 00101000000000000000001
// 給某個Actor發消息
PlayerActor::client($node)->send($actorId, ['data' => 'data']);
// 給該類型的全部Actor發消息
PlayerActor::client($node)->sendAll(['data' => 'data']);
// 退出某個Actor
PlayerActor::client($node)->exit($actorId, ['arg' => 'arg']);
// 退出全部Actor
PlayerActor::client($node)->exitAll(['arg' => 'arg']);
架構解讀
Actor
應該叫ActorManager
更確切點,它用來注冊Actor
啟動Proxy
和ActorWorker
進程。
當你在業務邏輯里定義了幾種Actor
,比如RoomActor
、PlayerActor
,需要在SwooleServer
啟動時注冊它們。
具體就是在EasySwooleEvent.mainServerCreate
方法中添加如下代碼。
$actor = Actor::getInstance();
$actor->register(RoomActor::class);
$actor->register(PlayerActor::class);
$actorConf = Config::getInstance()->getConf('ACTOR_SERVER');
$actor->setMachineId($actorConf['MACHINE_ID'])
->setListenAddress($actorConf['LISTEN_ADDRESS'])
->setListenPort($actorConf['PORT'])
->attachServer($server);
其中ListenAddress
、ListenPort
為Proxy
進程的監聽地址端口,MachineId
為ActorWorker
進程的機器碼。
MachineId
和IP:PORT
對應。
attachServer
將開啟相應數量的Proxy
進程,以及前邊register
的ActorWorker
進程。
工作原理
Proxy
進程做消息中轉,Worker
進程做消息分發推送。來看個具體的例子:
游戲中玩家P請求進入房間R,抽象成Actor
模型就是PlayerActor
需要往RoomActor
發送請求加入的命令。
那么這時候需要這樣寫:
\EasySwoole\Actor\Test\RoomActor::client($node)->send($roomActorId, [
'user_actor_id' => $userActorId,
'data' => '其他進入房間的參數'
])
其中$roomActorId
和$userActorId
是事先xxActor::client()->create()
出來的。
上面那段代碼的意思就是往$roomActorId
的RoomActor
實例推送了一條$userActorId
玩家的UserActor
實例要加入房間的消息。
參數$node
用來尋址Proxy
,它由目標Actor
實例的Worker.MachineId
決定,在本例中就是$roomActorId
被創建在了哪個MachineId
的WorkerProcess
。
通過$roomActorId
中的機器碼找到IP:PORT
,生成$node
。
send
時會創建一個協程TcpClient
,將消息發送給Proxy
,然后Proxy
將消息轉發(UnixClient)
至本機WorkerProcess
,WorkerProcess
收到消息,推送到具體的Actor
實例。
這樣就完成了從PlayerActor
到RoomActor
的請求通訊,RoomActor
收到請求消息并處理完成后,向PlayerActor
回發處理結果,用的是同樣的通訊流程。
如果是單機部署,可以忽略$node
參數,因為所有通訊都是在本機進行。
多機的話,需要自己根據業務來實現Actor
如何分布和定位。
主要屬性
machineId 機器碼
proxyNum 啟動幾個ProxyProcess
listenPort 監聽port
listenAddress 監聽ip
AbstractActor
Actor
實例的基類,所有業務中用到的Actor
都將繼承于`AbstractActor。例如游戲場景中的房間,你可以:
class RoomActor extends AbstractActor
工作原理
每個Actor
實例都維護一份獨立的數據和狀態,當一個Actor
實例通過client()->create()
后,會開啟協程循環,接收mailbox pop
的消息,進而處理業務邏輯,更新自己的數據及狀態。具體實現就是__run()
這個方法。
靜態方法 configure
用來配置ActorConfig
,只需要在具體的Actor
(如RoomActor
)去重寫這個方法就行。
關于ActorConfig
具體屬性可以看下邊ActorConfig
部分。
幾個虛擬方法
以下幾個虛擬方法需要在Actor
子類中實現,這幾個方法被用在__run()
中來完成Actor
的運行周期。
onStart() 在協程開啟前執行,你可以在此進行Actor
初始化的一些操作,比如獲取房間的基礎屬性等。
onMessage() 當接收到消息時執行,一個Actor
實例的生命周期基本上就是在收消息-處理-發消息,你需要在這里對消息進行解析處理。
onExit() 當接收到退出命令時執行。比如你希望在一個Actor
實例退出的時候,同時通知某些關聯的其他Actor
,可以在此處理。
其它
exit() 用于實例自己退出操作,會向自己發一條退出的命令。
tick()、after() 兩個定時器,用于Actor
實例的定時任務,比如游戲房間的定時刷怪(tick)
;掉線后多長時間自動踢出(after)
。
static client() 用于創建一個ActorClient
來進行對應Actor
(實例)的通訊。
ActorClient
Actor
通訊客戶端,調用xxActor::client()
來創建一個ActorClient
進行Actor
通訊。
上邊已經大概講過了Actor
的通訊流程,本質就是TcpClient->ProxyProcess->UnixClient->ActorWorkerProcess->xxActor
。
看下它實現了哪些方法:
create() 創建一個xxActor
實例,返回actorId
,在之后你可以使用這個actorId
與此實例進行通訊。
send() 指定actorId
,向其發送消息。
exit() 通知xxActor
退出指定actorId
的實例。
sendAll() 向所有的xxActor
實例發送消息。
exitAll() 退出所有xxActor
實例。
exist() 當前是否存在指定actorId
的xxActor
實例。
status() 當前ActorWorker
下xxActor
的分布狀態。
ActorConfig
具體Actor
的配置項,比如RoomActor
、PlayerActor
都有自己的配置。
actorName 一般用類名就可以,注意在同一個服務中這個是不能重復的。
actorClass 在Actor->register()
會將對應的類名寫入。
workerNum 為Actor
開啟幾個進程,Actor->attachServer()
時會根據這個參數為相應Actor
啟動WorkerNum
個Worker
進程。
ActorNode
上邊提到過,xxActor::client($node)
,這個$node
就是ActorNode
對象,屬性為Ip
和Port
,用于尋址Proxy
。
WorkerConfig
WorkerProcess
的配置項,WorkerProcess
啟動時用到。
workerId worker
進程Id
,create Actor
的時候用于生成actorId
machineId worker
進程機器碼,create Actor
的時候用于生成actorId
trigger 異常觸發處理接口
WorkerProcess
Actor
的重點在這里,每個注冊的Actor
(類)會啟動相應數量的WorkerProcess
。
比如你注冊了RoomActor
、PlayerActor
,workerNum
都配置的是3,那么系統將啟動3個RoomActor
的Worker
進程和3個PlayerActor
的Worker
進程。
每個WorkerProcess
維護一個ActorList
,你通過client()->create()
的Actor
將分布在不同Worker
進程里,由它的ActorList
進行管理。
WorkerProcess
通過協程接收client
(這個client
就是Proxy
做轉發時的UnixClient
)消息,區分消息類型,然后分發給對應的Actor
實例。
請仔細閱讀下WorkerProcess
的源碼,它繼承于AbstractUnixProcess
。
UnixClient
UnixStream Socket
,自行了解。Proxy
轉發消息給本機Actor
所使用的Client
。
Protocol
數據封包協議。
ProxyCommand
消息命令對象,Actor2
將不同類型的消息封裝成格式化的命令,最終傳給WorkerProcess
。
你可以在ActorClient
中了解一下方法和命令的對應關系,但這個不需要在業務層去更改。
ProxyConfig
消息代理的配置項。
actorList 注冊的actor
列表。
machineId 機器碼
tempDir 臨時目錄
trigger 錯誤觸發處理接口
ProxyProcess
Actor->attachServer()
會啟動proxyNum
個ProxyProcess
。
用于在Actor
實例和WorkerProcess
做消息中轉。