Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add timeout for service #2530

Merged
merged 8 commits into from
Jul 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions xmake/core/base/socket.lua
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,8 @@ function _instance:send(data, opt)
local events, waiterrs = _instance.wait(self, socket.EV_SEND, opt.timeout or -1)
if events == socket.EV_SEND then
wait = true
elseif events == 0 then
os.raise("%s: send timeout!", self)
else
errors = waiterrs
break
Expand Down Expand Up @@ -385,6 +387,8 @@ function _instance:sendfile(file, opt)
local events, waiterrs = _instance.wait(self, socket.EV_SEND, opt.timeout or -1)
if events == socket.EV_SEND then
wait = true
elseif events == 0 then
os.raise("%s: sendfile timeout!", self)
else
errors = waiterrs
break
Expand Down Expand Up @@ -451,6 +455,8 @@ function _instance:recv(buff, size, opt)
local events, waiterrs = _instance.wait(self, socket.EV_RECV, opt.timeout or -1)
if events == socket.EV_RECV then
wait = true
elseif events == 0 then
os.raise("%s: recv timeout!", self)
else
data_or_errors = waiterrs
break
Expand Down
21 changes: 21 additions & 0 deletions xmake/modules/private/service/client.lua
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,33 @@
import("core.base.object")
import("core.base.socket")
import("core.base.scheduler")
import("private.service.client_config", {alias = "config"})

-- define module
local client = client or object()

-- init client
function client:init()

-- init timeout
self._SEND_TIMEOUT = config.get("send_timeout") or -1
self._RECV_TIMEOUT = config.get("recv_timeout") or -1
self._CONNECT_TIMEOUT = config.get("connect_timeout") or -1
end

-- get send timeout
function client:send_timeout()
return self._SEND_TIMEOUT
end

-- get recv timeout
function client:recv_timeout()
return self._RECV_TIMEOUT
end

-- get connect timeout
function client:connect_timeout()
return self._CONNECT_TIMEOUT
end

-- parse host address
Expand Down
4 changes: 3 additions & 1 deletion xmake/modules/private/service/client_config.lua
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ function _generate_configfile()
local token = _get_local_server_token()
print("generating the config file to %s ..", filepath)
local configs = {
send_timeout = -1,
recv_timeout = -1,
connect_timeout = -1,
remote_build = {
-- without authorization: "127.0.0.1:9691"
-- with user authorization: "[email protected]:9691"
Expand All @@ -58,7 +61,6 @@ function _generate_configfile()
{connect = "127.0.0.1:9693", token = token}
}
}

}
save(configs)
end
Expand Down
20 changes: 13 additions & 7 deletions xmake/modules/private/service/distcc_build/client.lua
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ function distcc_build_client:init()
else
raise("we need enter a project directory with xmake.lua first!")
end

-- init timeout
self._SEND_TIMEOUT = config.get("distcc_build.send_timeout") or config.get("send_timeout") or -1
self._RECV_TIMEOUT = config.get("distcc_build.recv_timeout") or config.get("recv_timeout") or -1
self._CONNECT_TIMEOUT = config.get("distcc_build.connect_timeout") or config.get("connect_timeout") or -1
end

-- get class
Expand Down Expand Up @@ -464,7 +469,8 @@ function distcc_build_client:_host_status_session_open(host_status)
for i = 1, njob do
local session = host_status.sessions[i]
if not session then
session = client_session(self, host_status.session_id, host_status.token, host_status.addr, host_status.port)
session = client_session(self, host_status.session_id, host_status.token, host_status.addr, host_status.port,
{send_timeout = self:send_timeout(), recv_timeout = self:recv_timeout(), connect_timeout = self:connect_timeout()})
host_status.sessions[i] = session
session:open()
return session
Expand Down Expand Up @@ -531,14 +537,14 @@ function distcc_build_client:_connect_host(host)
end

-- do connect
local sock = assert(socket.connect(addr, port), "%s: server unreachable!", self)
local sock = assert(socket.connect(addr, port, {timeout = self:connect_timeout()}), "%s: server unreachable!", self)
local session_id = self:_session_id(addr, port)
local ok = false
local errors
local ncpu, njob
print("%s: connect %s:%d ..", self, addr, port)
if sock then
local stream = socket_stream(sock)
local stream = socket_stream(sock, {send_timeout = self:send_timeout(), recv_timeout = self:recv_timeout()})
if stream:send_msg(message.new_connect(session_id, {token = token})) and stream:flush() then
local msg = stream:recv_msg()
if msg then
Expand Down Expand Up @@ -581,13 +587,13 @@ function distcc_build_client:_disconnect_host(host)

-- do disconnect
local token = host.token
local sock = socket.connect(addr, port)
local sock = socket.connect(addr, port, {timeout = self:connect_timeout()})
local session_id = self:_session_id(addr, port)
local errors
local ok = false
print("%s: disconnect %s:%d ..", self, addr, port)
if sock then
local stream = socket_stream(sock)
local stream = socket_stream(sock, {send_timeout = self:send_timeout(), recv_timeout = self:recv_timeout()})
if stream:send_msg(message.new_disconnect(session_id, {token = token})) and stream:flush() then
local msg = stream:recv_msg()
if msg then
Expand Down Expand Up @@ -632,13 +638,13 @@ function distcc_build_client:_clean_host(host)

-- do clean
local token = host.token
local sock = socket.connect(addr, port)
local sock = socket.connect(addr, port, {timeout = self:connect_timeout()})
local session_id = self:_session_id(addr, port)
local errors
local ok = false
print("%s: clean files in %s:%d ..", self, addr, port)
if sock then
local stream = socket_stream(sock)
local stream = socket_stream(sock, {send_timeout = self:send_timeout(), recv_timeout = self:recv_timeout()})
if stream:send_msg(message.new_clean(session_id, {token = token})) and stream:flush() then
local msg = stream:recv_msg()
if msg then
Expand Down
31 changes: 25 additions & 6 deletions xmake/modules/private/service/distcc_build/client_session.lua
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,16 @@ import("private.service.stream", {alias = "socket_stream"})
local client_session = client_session or object()

-- init client session
function client_session:init(client, session_id, token, addr, port)
function client_session:init(client, session_id, token, addr, port, opt)
opt = opt or {}
self._ID = session_id
self._ADDR = addr
self._PORT = port
self._TOKEN = token
self._CLIENT = client
self._SEND_TIMEOUT = opt.send_timeout and opt.send_timeout or -1
self._RECV_TIMEOUT = opt.recv_timeout and opt.recv_timeout or -1
self._CONNECT_TIMEOUT = opt.connect_timeout and opt.connect_timeout or -1
end

-- get client session id
Expand All @@ -58,6 +62,21 @@ function client_session:client()
return self._CLIENT
end

-- get send timeout
function client_session:send_timeout()
return self._SEND_TIMEOUT
end

-- get recv timeout
function client_session:recv_timeout()
return self._RECV_TIMEOUT
end

-- get connect timeout
function client_session:connect_timeout()
return self._CONNECT_TIMEOUT
end

-- server unreachable?
function client_session:is_unreachable()
return self._UNREACHABLE
Expand All @@ -69,12 +88,12 @@ function client_session:stream()
if stream == nil then
local addr = self._ADDR
local port = self._PORT
local sock = socket.connect(addr, port)
local sock = socket.connect(addr, port, {timeout = self:connect_timeout()})
if not sock then
self._UNREACHABLE = true
raise("%s: server unreachable!", self)
end
stream = socket_stream(sock)
stream = socket_stream(sock, {send_timeout = self:send_timeout(), recv_timeout = self:recv_timeout()})
self._STREAM = stream
end
return stream
Expand Down Expand Up @@ -113,7 +132,7 @@ function client_session:compile(sourcefile, objectfile, cppfile, cppflags, opt)
if stream:send_msg(message.new_compile(self:id(), toolname, toolkind, plat, arch, toolchain,
cppflags, path.filename(sourcefile), {token = self:token(), cachekey = cachekey})) and
stream:send_file(cppfile, {compress = os.filesize(cppfile) > 4096}) and stream:flush() then
local recv = stream:recv_file(objectfile)
local recv = stream:recv_file(objectfile, {timeout = -1})
if recv ~= nil then
local msg = stream:recv_msg()
if msg then
Expand Down Expand Up @@ -144,8 +163,8 @@ function client_session:__tostring()
return string.format("<session %s>", self:id())
end

function main(client, session_id, token, addr, port)
function main(client, session_id, token, addr, port, opt)
local instance = client_session()
instance:init(client, session_id, token, addr, port)
instance:init(client, session_id, token, addr, port, opt)
return instance
end
5 changes: 4 additions & 1 deletion xmake/modules/private/service/distcc_build/server.lua
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import("core.base.global")
import("private.service.server_config", {alias = "config"})
import("private.service.message")
import("private.service.server")
import("private.service.stream", {alias = "socket_stream"})
import("private.service.distcc_build.server_session")
import("lib.detect.find_tool")

Expand All @@ -44,6 +43,10 @@ function distcc_build_server:init(daemon)

-- init sessions
self._SESSIONS = {}

-- init timeout
self._SEND_TIMEOUT = config.get("distcc_build.send_timeout") or config.get("send_timeout") or -1
self._RECV_TIMEOUT = config.get("distcc_build.recv_timeout") or config.get("recv_timeout") or -1
end

-- get class
Expand Down
33 changes: 19 additions & 14 deletions xmake/modules/private/service/remote_build/client.lua
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ function remote_build_client:init()
filesync:ignorefiles_add(".git/**")
filesync:ignorefiles_add(".xmake/**")
self._FILESYNC = filesync

-- init timeout
self._SEND_TIMEOUT = config.get("remote_build.send_timeout") or config.get("send_timeout") or -1
self._RECV_TIMEOUT = config.get("remote_build.recv_timeout") or config.get("recv_timeout") or -1
self._CONNECT_TIMEOUT = config.get("remote_build.connect_timeout") or config.get("connect_timeout") or -1
end

-- get class
Expand Down Expand Up @@ -92,13 +97,13 @@ function remote_build_client:connect()
-- do connect
local addr = self:addr()
local port = self:port()
local sock = assert(socket.connect(addr, port), "%s: server unreachable!", self)
local sock = assert(socket.connect(addr, port, {timeout = self:connect_timeout()}), "%s: server unreachable!", self)
local session_id = self:session_id()
local ok = false
local errors
print("%s: connect %s:%d ..", self, addr, port)
if sock then
local stream = socket_stream(sock)
local stream = socket_stream(sock, {send_timeout = self:send_timeout(), recv_timeout = self:recv_timeout()})
if stream:send_msg(message.new_connect(session_id, {token = token})) and stream:flush() then
local msg = stream:recv_msg()
if msg then
Expand Down Expand Up @@ -140,13 +145,13 @@ function remote_build_client:disconnect()
end
local addr = self:addr()
local port = self:port()
local sock = socket.connect(addr, port)
local sock = socket.connect(addr, port, {timeout = self:connect_timeout()})
local session_id = self:session_id()
local errors
local ok = false
print("%s: disconnect %s:%d ..", self, addr, port)
if sock then
local stream = socket_stream(sock)
local stream = socket_stream(sock, {send_timeout = self:send_timeout(), recv_timeout = self:recv_timeout()})
if stream:send_msg(message.new_disconnect(session_id, {token = self:token()})) and stream:flush() then
local msg = stream:recv_msg()
if msg then
Expand Down Expand Up @@ -181,7 +186,7 @@ function remote_build_client:sync()
assert(self:is_connected(), "%s: has been not connected!", self)
local addr = self:addr()
local port = self:port()
local sock = assert(socket.connect(addr, port), "%s: server unreachable!", self)
local sock = assert(socket.connect(addr, port, {timeout = self:connect_timeout()}), "%s: server unreachable!", self)
local session_id = self:session_id()
local errors
local ok = false
Expand All @@ -190,7 +195,7 @@ function remote_build_client:sync()
while sock do

-- diff files
local stream = socket_stream(sock)
local stream = socket_stream(sock, {send_timeout = self:send_timeout(), recv_timeout = self:recv_timeout()})
diff_files, errors = self:_diff_files(stream)
if not diff_files then
break
Expand All @@ -214,7 +219,7 @@ function remote_build_client:sync()
end

-- sync ok
local msg = stream:recv_msg()
local msg = stream:recv_msg({timeout = -1})
if msg and msg:success() then
vprint(msg:body())
ok = true
Expand All @@ -235,14 +240,14 @@ function remote_build_client:clean()
assert(self:is_connected(), "%s: has been not connected!", self)
local addr = self:addr()
local port = self:port()
local sock = assert(socket.connect(addr, port), "%s: server unreachable!", self)
local sock = assert(socket.connect(addr, port, {timeout = self:connect_timeout()}), "%s: server unreachable!", self)
local session_id = self:session_id()
local errors
local ok = false
print("%s: clean files in %s:%d ..", self, addr, port)
local stream = socket_stream(sock)
local stream = socket_stream(sock, {send_timeout = self:send_timeout(), recv_timeout = self:recv_timeout()})
if stream:send_msg(message.new_clean(session_id, {token = self:token()})) and stream:flush() then
local msg = stream:recv_msg()
local msg = stream:recv_msg({timeout = -1})
if msg then
vprint(msg:body())
if msg:success() then
Expand All @@ -264,23 +269,23 @@ function remote_build_client:runcmd(program, argv)
assert(self:is_connected(), "%s: has been not connected!", self)
local addr = self:addr()
local port = self:port()
local sock = assert(socket.connect(addr, port), "%s: server unreachable!", self)
local sock = assert(socket.connect(addr, port, {timeout = self:connect_timeout()}), "%s: server unreachable!", self)
local session_id = self:session_id()
local errors
local ok = false
local buff = bytes(8192)
local command = os.args(table.join(program, argv))
local leftstr = ""
cprint("%s: run ${bright}%s${clear} in %s:%d ..", self, command, addr, port)
local stream = socket_stream(sock)
local stream = socket_stream(sock, {send_timeout = self:send_timeout(), recv_timeout = self:recv_timeout()})
if stream:send_msg(message.new_runcmd(session_id, program, argv, {token = self:token()})) and stream:flush() then
local stdin_opt = {stop = false}
local group_name = "remote_build/runcmd"
scheduler.co_group_begin(group_name, function (co_group)
scheduler.co_start(self._read_stdin, self, stream, stdin_opt)
end)
while true do
local msg = stream:recv_msg()
local msg = stream:recv_msg({timeout = -1})
if msg then
if msg:is_data() then
local data = stream:recv(buff, msg:body().size)
Expand Down Expand Up @@ -412,7 +417,7 @@ function remote_build_client:_diff_files(stream)
local result, errors
cprint("Comparing ${bright}%d${clear} files ..", filecount)
if stream:send_msg(message.new_diff(session_id, manifest, {token = self:token()}), {compress = true}) and stream:flush() then
local msg = stream:recv_msg()
local msg = stream:recv_msg({timeout = -1})
if msg and msg:success() then
result = msg:body().manifest
if result then
Expand Down
5 changes: 4 additions & 1 deletion xmake/modules/private/service/remote_build/server.lua
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import("core.base.global")
import("private.service.server_config", {alias = "config"})
import("private.service.message")
import("private.service.server")
import("private.service.stream", {alias = "socket_stream"})
import("private.service.remote_build.server_session")
import("lib.detect.find_tool")

Expand All @@ -44,6 +43,10 @@ function remote_build_server:init(daemon)

-- init sessions
self._SESSIONS = {}

-- init timeout
self._SEND_TIMEOUT = config.get("remote_build.send_timeout") or config.get("send_timeout") or -1
self._RECV_TIMEOUT = config.get("remote_build.recv_timeout") or config.get("recv_timeout") or -1
end

-- get class
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ end
-- recv data from stream
function server_session:_recv_data(buff)
local stream = self:stream()
local msg = stream:recv_msg()
local msg = stream:recv_msg({timeout = -1})
if msg and msg:is_data() then
return stream:recv(buff, msg:body().size)
end
Expand Down
Loading