分享web开发知识

注册/登录|最近发布|今日推荐

主页 IT知识网页技术软件开发前端开发代码编程运营维护技术分享教程案例
当前位置:首页 > 前端开发

skynet源码分析之cluster集群模式

发布时间:2023-09-06 01:50责任编辑:顾先生关键词:暂无标签

比起slave/harbor集群模式,skynet提供了用的更为广泛的cluster集群模式,参考官方wiki https://github.com/cloudwu/skynet/wiki/Cluster。cluster模式利用socketchannel库(http://www.cnblogs.com/RainRill/p/8892648.html) 与其他skynet进程进行交互,每个请求包带一个唯一的session值,对端回应包附带session值。cluster集群模式tcp通道是单向的,即skynet进程1(集群中的节点)通过tcp通道向进程2发送请求包,进程2回应包也走这一通道。但是,进程2向进程1发送请求包及进程1的回应包则是另一条tcp通道。

每个集群节点都有一份完整的cluster配置,会启动一个clusterd的服务,调用loadconfig加载配置。

 第11-19行,加载配置文件(也可以手动传入配置table tmp)

第20-24行,保存节点名-地址映射关系

 1 -- service/clusterd.lua 2 ?skynet.start(function() 3 ?????loadconfig() 4 ?????skynet.dispatch("lua", function(session , source, cmd, ...) 5 ?????????local f = assert(command[cmd]) 6 ?????????f(source, ...) 7 ?????end) 8 ?end) 9 ?10 ?local function loadconfig(tmp)11 ?????if tmp == nil then12 ?????????tmp = {}13 ?????????if config_name then14 ?????????????local f = assert(io.open(config_name))15 ?????????????local source = f:read "*a"16 ?????????????f:close()17 ?????????????assert(load(source, "@"..config_name, "t", tmp))()18 ?????????end19 ?????end20 ?????for name,address in pairs(tmp) do21 ?????????...22 ?????????node_address[name] = address23 ?????????...24 ?????end25 ?end

以skynet进程1的A服务向skynet进程2的B服务发送请求包及回应为例,说明cluster的工作流程:

 对于进程2,配置了 db = "127.0.0.1:2528",启动后调用cluster.open "db"。

第4行,给clusterd服务发送消息。

第12-15行,启动一个gate服务,然后通知gate服务监听配置的地址。gate调用socket.listen监听外部socket连接。

第20行,watchdog就是clusterd服务的地址。

 1 -- lualib/skynet/cluster.lua 2 function cluster.open(port) 3 ????if type(port) == "string" then 4 ????????skynet.call(clusterd, "lua", "listen", port) 5 ????else 6 ????????skynet.call(clusterd, "lua", "listen", "0.0.0.0", port) 7 ????end 8 end 9 10 -- service/clusterd.lua11 function command.listen(source, addr, port)12 ????local gate = skynet.newservice("gate")13 ????...14 ????skynet.call(gate, "lua", "open", { address = addr, port = port })15 ????skynet.ret(skynet.pack(nil))16 end17 18 -- servcice/gate.lua19 function handler.open(source, conf)20 ????watchdog = conf.watchdog or source21 end

对于进程1,调用cluster.call(db, "A", ...),给节点名为db(进程2)的A服务发送请求,最终调用到send_request

第9行,请求包带上唯一的sesssion值

第11行,按cluster定义的模式打包数据

第15行,获取socketchannel对象,如果第一次请求,会先创建socketchannel对象,并建立tcp连接

第16行,调用socketchannel的request接口发送请求包

 1 -- lualib/skynet/cluster.lua 2 function cluster.call(node, address, ...) 3 ????-- skynet.pack(...) will free by cluster.core.packrequest 4 ????return skynet.call(clusterd, "lua", "req", node, address, skynet.pack(...)) 5 end 6 ?7 -- service/clusterd.lua 8 local function send_request(source, node, addr, msg, sz) 9 ????local session = node_session[node] or 110 ????-- msg is a local pointer, cluster.packrequest will free it11 ????local request, new_session, padding = cluster.packrequest(addr, session, msg, sz)12 ????node_session[node] = new_session13 14 ????-- node_channel[node] may yield or throw error15 ????local c = node_channel[node]16 17 ????return c:request(request, session, padding)18 end19 20 function command.req(...)21 ????local ok, msg = pcall(send_request, ...)22 ????if ok then23 ????????...24 ????????skynet.ret(msg)25 ????end26 end

创建socket对象时提供了response参数(第6行),所以是采用带session值的请求-回应模式。

第11行,协程阻塞在socket.read上,此时暂停co,等待回应包

 1 -- service/clusterd 2 ????local host, port = string.match(address, "([^:]+):(.*)$") 3 ????c = sc.channel { 4 ????????host = host, 5 ????????port = tonumber(port), 6 ????????response = read_response, 7 ????????nodelay = true, 8 ????} 9 10 local function read_response(sock)11 ????local sz = socket.header(sock:read(2))12 ????local msg = sock:read(sz)13 ????return cluster.unpackresponse(msg) ?????-- session, ok, data, padding14 end

对于进程2,gate服务收到进程1的tcp连接请求后,

第8行,给clusterd服务发送消息

第17-18行,clusterd收到后,新建一个clusteragent服务。注:clusteragent是skynet最近新加的。参考https://blog.codingnow.com/2018/04/skynet_cluster.html#more

第24-28行,clusteragent服务专门处理进程1的cluster模式的请求。每个cluster节点连接都新建一个cluseteragent服务去处理请求包。

 1 -- service/gate.lua 2 function handler.connect(fd, addr) 3 ????local c = { 4 ????????fd = fd, 5 ????????ip = addr, 6 ????} 7 ????connection[fd] = c 8 ????skynet.send(watchdog, "lua", "socket", "open", fd, addr) 9 end10 11 -- service/clusterd.lua12 function command.socket(source, subcmd, fd, msg)13 ????if subcmd == "open" then14 ????????skynet.error(string.format("socket accept from %s", msg))15 ????????-- new cluster agent16 ????????cluster_agent[fd] = false17 ????????local agent = skynet.newservice("clusteragent", skynet.self(), source, fd)18 ????????cluster_agent[fd] = agent19 ????????... ????20 end21 22 -- service/clusterdagent.lua23 skynet.start(function()24 ????skynet.register_protocol {25 ????????name = "client",26 ????????id = skynet.PTYPE_CLIENT,27 ????????unpack = cluster.unpackrequest,28 ????????dispatch = dispatch_request,29 ????}30 ????...31 end

当gate服务收到请求包后,转发给对应的clusteragent服务(第7行),

 1 -- service/gate.lua 2 function handler.message(fd, msg, sz) 3 ????-- recv a package, forward it 4 ????local c = connection[fd] 5 ????local agent = c.agent 6 ????if agent then 7 ????????skynet.redirect(agent, c.client, "client", fd, msg, sz) 8 ????else 9 ????????skynet.send(watchdog, "lua", "socket", "data", fd, netpack.tostring(msg, sz))10 ????end11 end

clusteragent服务消息分发函数dispatch_request,

第7-9行,如果是push请求,不需要回应,send给目的服务(B服务)后直接返回即可

第11行,如果是call请求,需要回应,给目的服务(B服务)发送消息,然后等待B服务处理完返回。

第14-21行,将消息打包成回应包,通过tcp返回给请求端(skynet进程1)。

进程1收到回应后,重启协程,返回结果给请求服务(A服务)。这就是cluster模式的调用流程。

 1 -- service/clusteragent.lua 2 local function dispatch_request(_,_,addr, session, msg, sz, padding, is_push) 3 ????if cluster.isname(addr) then 4 ????????addr = register_name[addr] 5 ????end 6 ????if addr then 7 ????????if is_push then 8 ????????????skynet.rawsend(addr, "lua", msg, sz) 9 ????????????return ?-- no response10 ????????else11 ????????????ok , msg, sz = pcall(skynet.rawcall, addr, "lua", msg, sz)12 ????????end13 ????if ok then14 ????????response = cluster.packresponse(session, true, msg, sz)15 ????????if type(response) == "table" then16 ????????????for _, v in ipairs(response) do17 ????????????????socket.lwrite(fd, v)18 ????????????end19 ????????else20 ????????????socket.write(fd, response)21 ????????end22 ?????...23 end

skynet源码分析之cluster集群模式

原文地址:https://www.cnblogs.com/RainRill/p/8900334.html

知识推荐

我的编程学习网——分享web前端后端开发技术知识。 垃圾信息处理邮箱 tousu563@163.com 网站地图
icp备案号 闽ICP备2023006418号-8 不良信息举报平台 互联网安全管理备案 Copyright 2023 www.wodecom.cn All Rights Reserved