Skynet Cluster 简介
Cluster 模块负责 Skynet 节点之间的通信。
Cluster 概述
| --- --- ---> |
node1 | | node2
| <-- --- --- |
两个节点之间通信最多会创建 2 条 TCP 链路。如上图所示。
- 节点 node1 主动向 node2 发起通信和 node2 的回应是一条 TCP 链接。
- 节点 node2 主动向 node1 发起通信和 node1 的回应是另一条 TCP 链接。
Cluster 模块包含三个服务:clusterd clusteragent 和 clustersender ,通信使用 gate 服务。
- clusterd 服务提供管理功能,初始化 gate 服务监听网络连接。每个节点只存在一个 clusterd 服务。
- gate 服务接收到新的 socket 连接时,通知 clusterd 服务,此时创建 clusteragent 服务接收数据。每个 clusteragent 服务对应一个 socket 连接。
- 主动与其它节点通信时,clusterd 服务创建 clustersender 服务建立连接并发送数据。每个 clustersender 服务对应一个 socket 连接。
举个例子。假设存在如下节点配置:
center = "192.168.1.1:10000"
login1 = "192.168.1.2:10011"
login2 = "192.168.1.3:10012"
- 若 login1 主动与 center 节点通信,则建立一条 TCP 连接,login1 创建一个 clustersender 服务与 center 创建的一个 clusteragent 服务对应。
- 若 login2 主动与 center 节点通信,则建立一条 TCP 连接,login2 创建一个 clustersender 服务与 center 创建的另一个 clusteragent 服务对应。
- 若 center 主动与 login1 节点通信,则建立一条 TCP 连接,center 创建一个 clustersender 服务与 login1 创建的一个 clusteragent 服务对应。
- 若 center 主动与 login2 节点通信,则建立一条 TCP 连接,center 创建另一个 clustersender 服务与 login2 创建的一个 clusteragent 服务对应。
上述一共会创建 4 条 TCP 连接,4 个 clusteragent 服务和 4 个 clustersender 服务。
Cluster 接口
在使用 Skynet Cluster 之前,需要先设置配置信息,指定节点名称和对应的 IP 地址及端口号。配置格式如下:db
是节点名称而 127.0.0.1:2528
是该节点对应的 IP 地址及端口号。
db = "127.0.0.1:2528"
db2 = "127.0.0.1:2529"
配置信息可通过配置文件给出,在 Skynet Config 中把文件名赋值给 cluster
即可。
此外也可以直接通过字符串给出配置信息。
调用 cluster.reload(config)
重载配置信息,config
可以是文件名或者字符串。
使用过程中通常涉及如下接口。接口位于库文件 lualib/skynet/cluster.lua
中。
调用 require "skynet.cluster"
创建 clusterd 服务。require
时调用 skynet.uniqueservice("clusterd")
创建唯一的 clusterd 服务。
调用 cluster.open(port)
监听网络连接。
调用 cluster.call(node, address, ...)
向 node
节点发起请求。
调用 cluster.send(node, address, ...)
向 node
节点推送数据。
调用 cluster.register(name, addr)
注册字符串 name
对应的地址。此时,请求方只需使用字符串 name
就可访问到对应的服务。 调用 cluster.query(node, name)
查询字符串 name
对应的服务地址。
通常,服务器调用 cluster.open
监听网络连接。客户端调用 cluster.call
或者 cluster.send
指定节点名称和服务地址发起访问。
Cluster 加载配置流程
clusterd 服务在启动时会调用 loadconfig
函数加载配置,此外也可以通过 cluster.reload(config)
重载配置。
clusterd.lua:72 loadconfig
函数用于加载配置。配置格式就是前面提到的节点名称和对应的 IP 地址及端口号。加载完配置后,便可获取到节点对应的地址信息。具体流程如下。
加载选项到
config
中。目前支持在配置中设置__nowaiting = true
,若向节点发起连接请求时,未找到连接对应的地址,则直接报错而不是挂起协程等待该节点的地址信息被设置。加载配置数据到
node_address
中。若
ct.namequery
和not config.nowaiting
为true
,则唤醒等待节点地址信息的协程。local ct = connecting[name] if ct and ct.namequery and not config.nowaiting then skynet.error(string.format("Cluster node [%s] resloved : %s", name, address)) skynet.wakeup(ct.namequery) end
若
config.nowaiting
为true
则表示不等待节点的配置信息,此时唤醒所有的协程。if config.nowaiting then -- wakeup all connecting request for name, ct in pairs(connecting) do if ct.namequery then skynet.wakeup(ct.namequery) end end end
若节点的地址发生变更,则向新地址重新发起连接请求。
for _, name in ipairs(reload) do -- open_channel would block skynet.fork(open_channel, node_channel, name) end
Cluster 监听网络连接
cluster.open
函数用于监听网络连接。clusterd.lua124 command.listen
处理具体的监听逻辑,网络模块采用 gate
服务。处理来自 gate
服务的消息来处理网络请求。
function command.listen(source, addr, port)
local gate = skynet.newservice("gate")
if port == nil then
local address = assert(node_address[addr], addr .. " is down")
addr, port = string.match(address, "([^:]+):(.*)$")
end
skynet.call(gate, "lua", "open", { address = addr, port = port })
skynet.ret(skynet.pack(nil))
end
Cluster 发起网络连接请求
调用 cluster.send
或 cluster.call
向节点发送数据,若还未建立连接,则会先尝试建立连接。概要流程如下:
cluster.lua:8 request_sender
函数中调用pcall(skynet.call, clusterd, "lua", "sender", node)
通过clusterd
服务创建clustersender
服务,创建完服务后,则连接建立完成。- 向
clustersender
服务发送数据,即可向目标节点发送数据。
获取 clustersender
服务
对于 cluster.call
函数,通过 cluster.lua41 get_sender
函数获取 clustersender
服务,若处于创建过程中,则调用 skynet.wait(task)
等待。
对于 cluster.send
函数,由于不能阻塞调用,所以通过 get_queue
函数获取 clustersender
服务。若处于创建 clustersender
服务的过程中,则把请求插入到队列中然后返回:table.insert(task_queue[node], table.pack(address, ...))
。通过对 task_queue
设置元表 setmetatable(task_queue, { __index = get_queue } )
,来触发首次调用 request_sender
函数。
函数 request_sender
的注意点在于需要按顺序逐个唤醒等待的协程。由于在 ipairs(q)
的过程中,可能存在对 q
的修改是 table.insert
操作,因此是允许的。最后设置 sender[node] = c
表示服务创建完毕,后续通过 sender
获取即可。
for _, task in ipairs(q) do
if type(task) == "table" then
if c then
skynet.send(c, "lua", "push", task[1], skynet.pack(table.unpack(task,2,task.n)))
end
else
skynet.wakeup(task)
skynet.wait(confirm)
end
end
task_queue[node] = nil
sender[node] = c
创建 clustersender
服务,建立网络连接
函数 clusterd.lua:14 open_channel
完成连接建立过程。流程如下:
若当前正处于连接建立的过程中,则挂起当前协程,等待。
local ct = connecting[key] if ct then local co = coroutine.running() table.insert(ct, co) skynet.wait(co) return assert(ct.channel) end
获取节点的 IP 地址数据:
local address = node_address[key]
。若not config.nowaiting
为true
则需要等待节点的地址数据,此时也会被挂起,等待在loadconfig
中被唤醒。创建
clustersender
服务,调用pcall(skynet.call, c, "lua", "changenode", host, port)
向clustersender
发送changenode
消息,由clustersender
服务 connect 目标节点。完成连接建立,唤起
connecting
中之前被挂起的连接请求。connecting[key] = nil for _, co in ipairs(ct) do skynet.wakeup(co) end
检查
node_address[key]
节点地址是否发生变更,若变更则再次调用open_channel
函数。if node_address[key] ~= address then return open_channel(t,key) end
于是,连接建立完成,可与目标节点通信。
Cluster 接收网络连接数据
clusterd
服务创建 gate
服务,接收来自 gate
服务的消息处理网络请求。当有新的网络连接时 clusterd.lua195 command.socket
函数处理新的连接,此时 subcmd == "open"
创建 clusteragent
服务接收网络数据。
若 subcmd == "closer" or subcmd == "error"
表示网络连接已经断开,此时处理连接断开逻辑。
function command.socket(source, subcmd, fd, msg)
if subcmd == "open" then
skynet.error(string.format("socket accept from %s", msg))
-- new cluster agent
cluster_agent[fd] = false
local agent = skynet.newservice("clusteragent", skynet.self(), source, fd)
local closed = cluster_agent[fd]
cluster_agent[fd] = agent
if closed then
skynet.send(agent, "lua", "exit")
cluster_agent[fd] = nil
end
else
if subcmd == "close" or subcmd == "error" then
-- close cluster agent
local agent = cluster_agent[fd]
if type(agent) == "boolean" then
cluster_agent[fd] = true
elseif agent then
skynet.send(agent, "lua", "exit")
cluster_agent[fd] = nil
end
else
skynet.error(string.format("socket %s %d %s", subcmd, fd, msg or ""))
end
end
end
Cluster 具体的发送和接收数据逻辑
发送数据的细节见 clustersender 服务,接收数据的细节见 clusteragent 服务。