分享web开发知识

注册/登录|最近发布|今日推荐

主页 IT知识网页技术软件开发前端开发代码编程运营维护技术分享教程案例
当前位置:首页 > 网页技术

skynet源码分析之socketchannel

发布时间:2023-09-06 01:50责任编辑:白小东关键词:暂无标签

请求回应模式是与外部交互最常用的模式之一。通常协议设计方式有两种:1.每个请求包对应一个回应包,有tcp保证时序,先请求的先回应,但不必收到回应才发送下一个请求,redis的协议就是这种类型;2.每个请求带一个唯一的session标识,回应包也带这个标识。这样每个请求不一定都需要回应,且不用遵循先请求先回应的时序。mongodb的协议就是这种类型。skynet提供socketchannel库封装内部细节,支持上面两种模式。详情参考官方wiki https://github.com/cloudwu/skynet/wiki/SocketChannel

调用socketchannel.channel创建一个channel对象,必须提供ip地址(可以是域名)和端口。采用第一种还是第二种模式依据是否提供response参数,redis没有提供说明用的第一种模式,mongo提供了(第13行)说明用第二种模式。

 1 -- lualib/skynet/db/redis.lua 2 ?local channel = socketchannel.channel { 3 ?????host = db_conf.host, 4 ?????port = db_conf.port or 6379, 5 ?????auth = redis_login(db_conf.auth, db_conf.db), 6 ?????nodelay = true, 7 ?} 8 ??9 ?-- lualib/skynet/db/mongo.lua10 ?obj.__sock = socketchannel.channel {11 ?????host = obj.host,12 ?????port = obj.port,13 ?????response = dispatch_reply,14 ?????auth = mongo_auth(obj),15 ?????backup = backup,16 ?????nodelay = true,17 ?}18 ?19 ?-- lualib/skynet/socketchannel.lua20 ?function socket_channel.channel(desc)21 ?????local c = {22 ?????????__host = assert(desc.host),23 ?????????__port = assert(desc.port),24 ?????????__backup = desc.backup,25 ?????????__auth = desc.auth,26 ?????????__response = desc.response, ????-- It‘s for session mode27 ?????????__request = {}, -- request seq { response func or session } ????-- It‘s for order mode28 ?????????__thread = {}, -- coroutine seq or session->coroutine map29 ?????????__result = {}, -- response result { coroutine -> result }30 ?????????__result_data = {},31 ?????????__connecting = {},32 ?????????__sock = false,33 ?????????__closed = false,34 ?????????__authcoroutine = false,35 ?????????__nodelay = desc.nodelay,36 ?????}37 ?38 ?????return setmetatable(c, channel_meta)39 ?end

创建完对象后,可以手动调用connect连接对端,如果不connect,在第一次发送请求的时候会尝试去连接。最终调用到connect_once,

第7行,调用socket库api连接对端

第11行,fork一个协程专门处理收到回应包

15-21行,如果是模式1,收到回应包后的处理函数是dispatch_by_order,模式2则是dispatch_by_session

 1 -- lualib/skynet/socketchannel.lua 2 local function connect_once(self) 3 ????if self.__closed then 4 ????????return false 5 ????end 6 ????assert(not self.__sock and not self.__authcoroutine) 7 ????local fd,err = socket.open(self.__host, self.__port) 8 ????... 9 10 ????self.__sock = setmetatable( {fd} , channel_socket_meta )11 ????self.__dispatch_thread = skynet.fork(dispatch_function(self), self)12 ????...13 end14 15 local function dispatch_function(self)16 ????if self.__response then17 ????????return dispatch_by_session18 ????else19 ????????return dispatch_by_order20 ????end21 end

接下来先介绍发送请求包的流程,之后再介绍如何处理回应包。调用者通过channel:request发送请求包,该接口有三个参数:参数request请求包数据;参数response在模式1下是一个function用来接收回应包,模式2下是一个唯一的session值;参数padding可选,表示将巨大消息拆分成多个小包发送出去。

第2行,检测是否已连接,如果未连接,会尝试去连接

第8行,调用socket库把发送请求包。

第13-16行,不需要回应直接返回。

第18,23,35-48行,保存当前co。如果是模式2,保留session-co映射关系在self.__thread里(38行);如果是模式1,保留response函数在self.__request里,co在self.__threaad里(41,42行)。

43-46行,如果有暂停的co在等待回应包,重启它。

第24行,暂停当前co,等待对方回应包。当收到回应包时,回应处理函数会重启它。

25-32行,返回结果给调用者。

 1 function channel:request(request, response, padding) 2 ????assert(block_connect(self, true)) ??????-- connect once 3 ????local fd = self.__sock[1] 4 ?5 ????if padding then 6 ????????... 7 ????else 8 ????????if not socket_write(fd , request) then 9 ????????????sock_err(self)10 ????????end11 ????end12 13 ????if response == nil then14 ????????-- no response15 ????????return16 ????end17 18 ????return wait_for_response(self, response)19 end20 21 local function wait_for_response(self, response)22 ????local co = coroutine.running()23 ????push_response(self, response, co)24 ????skynet.wait(co)25 26 ????local result = self.__result[co]27 ????self.__result[co] = nil28 ????local result_data = self.__result_data[co]29 ????self.__result_data[co] = nil30 ????...31 32 ????return result_data33 end34 35 local function push_response(self, response, co)36 ????if self.__response then37 ????????-- response is session38 ????????self.__thread[response] = co39 ????else40 ????????-- response is a function, push it to __request41 ????????table.insert(self.__request, response)42 ????????table.insert(self.__thread, co)43 ????????if self.__wait_response then44 ????????????skynet.wakeup(self.__wait_response)45 ????????????self.__wait_response = nil46 ????????end47 ????end48 end

对于模式1的回应处理函数dispatch_by_order,

第4行,调用pop_response获取第一个未回应的请求包的response和co

第6行,调用response函数,response函数调用socket库的readline/read(24行)来等待socket上的返回,是一个阻塞操作。等socket返回后,response函数返回

第11-16行,返回结果保存在self.__result_data

第17行,重启调用者发送请求包的co,把结果返回给调用者(上面代码的26-32行),至此完成一次与对端请求回应交互

 1 -- lualib/skynet/socketchannel.lua 2 local function dispatch_by_order(self) 3 ????while self.__sock do 4 ????????local func, co = pop_response(self) 5 ????????... 6 ????????local ok, result_ok, result_data, padding = pcall(func, self.__sock) 7 ????????if ok then 8 ????????????if padding and result_ok then 9 ????????????????...10 ????????????else11 ????????????????self.__result[co] = result_ok12 ????????????????if result_ok and self.__result_data[co] then13 ????????????????????table.insert(self.__result_data[co], result_data)14 ????????????????else15 ????????????????????self.__result_data[co] = result_data16 ????????????????end17 ????????????????skynet.wakeup(co)18 ????????????end19 ????????end20 end21 22 -- lualib/skynet/db/redis.lua23 local function read_response(fd)24 ????local result = fd:readline "\r\n"25 ????local firstchar = string.byte(result)26 ????local data = string.sub(result,2)27 ????return redcmd[firstchar](fd,data)28 end

对于模式2的回应处理函数dispatch_by_session,

第6行,调用response函数,response函数会调用socket库的readline/read(30行)来等待socket上的返回,是一个阻塞操作。等socket返回后,response函数返回回应包(回应包包含唯一的session)

第8行,通过session获取对应的co

第13-21行,接下来处理跟上面一样,保存回应包内容,重启co。

 1 ?-- lualib/skynet/socketchannel.lua 2 ?local function dispatch_by_session(self) 3 ?????local response = self.__response 4 ?????-- response() return session 5 ?????while self.__sock do 6 ?????????local ok , session, result_ok, result_data, padding = pcall(response, self.__sock) 7 ?????????if ok and session then 8 ?????????????local co = self.__thread[session] 9 ?????????????if co then10 ?????????????????if padding and result_ok then11 ?????????????????????...12 ?????????????????else13 ?????????????????????self.__thread[session] = nil14 ?????????????????????self.__result[co] = result_ok15 ?????????????????????if result_ok and self.__result_data[co] then16 ?????????????????????????table.insert(self.__result_data[co], result_data)17 ?????????????????????else18 ?????????????????????????self.__result_data[co] = result_data19 ?????????????????????end20 ?????????????????????skynet.wakeup(co)21 ?????????????????end22 ?????????????else23 ?????????????????self.__thread[session] = nil24 ?????????????????skynet.error("socket: unknown session :", session)25 ?????????????end26 ?end27 ?28 ?-- lualib/skynet/db/mongo.lua29 ?local function dispatch_reply(so)30 ?????local len_reply = so:read(4)31 ?????local reply ????= so:read(driver.length(len_reply))32 ?????...33 ?????return reply_id, succ, result34 ?end

skynet源码分析之socketchannel

原文地址:https://www.cnblogs.com/RainRill/p/8892648.html

知识推荐

我的编程学习网——分享web前端后端开发技术知识。 垃圾信息处理邮箱 tousu563@163.com 网站地图
icp备案号 闽ICP备2023006418号-8 不良信息举报平台 互联网安全管理备案 Copyright 2023 www.wodecom.cn All Rights Reserved