diff --git a/Project.toml b/Project.toml index 88aca27b..995839b0 100644 --- a/Project.toml +++ b/Project.toml @@ -26,5 +26,5 @@ Conda = "1" JSON = "0.18,0.19,0.20,0.21,1" MbedTLS = "0.5,0.6,0.7,1" SoftGlobalScope = "1" -ZMQ = "1" +ZMQ = "1.3" julia = "1.6" diff --git a/src/handlers.jl b/src/handlers.jl index 4e84f709..52630953 100644 --- a/src/handlers.jl +++ b/src/handlers.jl @@ -189,6 +189,9 @@ function connect_request(socket, msg) end function shutdown_request(socket, msg) + # stop heartbeat thread by closing the context + close(heartbeat_context[]) + send_ipython(requests[], msg_reply(msg, "shutdown_reply", msg.content)) sleep(0.1) # short delay (like in ipykernel), to hopefully ensure shutdown_reply is sent diff --git a/src/heartbeat.jl b/src/heartbeat.jl index c0e58e4b..7ae55415 100644 --- a/src/heartbeat.jl +++ b/src/heartbeat.jl @@ -7,10 +7,9 @@ import Libdl const threadid = zeros(Int, 128) # sizeof(uv_thread_t) <= 8 on Linux, OSX, Win -const zmq_proxy = Ref(C_NULL) # entry point for new thread -function heartbeat_thread(sock::Ptr{Cvoid}) +function heartbeat_thread(heartbeat::Ptr{Cvoid}) @static if VERSION ≥ v"1.9.0-DEV.1588" # julia#46609 # julia automatically "adopts" this thread because # we entered a Julia cfunction. We then have to enable @@ -19,14 +18,16 @@ function heartbeat_thread(sock::Ptr{Cvoid}) # (see julia#47196) ccall(:jl_gc_safe_enter, Int8, ()) end - ccall(zmq_proxy[], Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}), - sock, sock, C_NULL) - nothing + ret = ZMQ.lib.zmq_proxy(heartbeat, heartbeat, C_NULL) + @static if VERSION ≥ v"1.9.0-DEV.1588" # julia#46609 + # leave safe region if zmq_proxy returns (when context is closed) + ccall(:jl_gc_safe_leave, Int8, ()) + end + return ret end -function start_heartbeat(sock) - zmq_proxy[] = Libdl.dlsym(Libdl.dlopen(ZMQ.libzmq), :zmq_proxy) - heartbeat_c = @cfunction(heartbeat_thread, Cvoid, (Ptr{Cvoid},)) +function start_heartbeat(heartbeat) + heartbeat_c = @cfunction(heartbeat_thread, Cint, (Ptr{Cvoid},)) ccall(:uv_thread_create, Cint, (Ptr{Int}, Ptr{Cvoid}, Ptr{Cvoid}), - threadid, heartbeat_c, sock) + threadid, heartbeat_c, heartbeat) end diff --git a/src/init.jl b/src/init.jl index 231a33d7..66521b9f 100644 --- a/src/init.jl +++ b/src/init.jl @@ -25,6 +25,7 @@ const raw_input = Ref{Socket}() const requests = Ref{Socket}() const control = Ref{Socket}() const heartbeat = Ref{Socket}() +const heartbeat_context = Ref{Context}() const profile = Dict{String,Any}() const read_stdout = Ref{Base.PipeEndpoint}() const read_stderr = Ref{Base.PipeEndpoint}() @@ -87,7 +88,8 @@ function init(args) raw_input[] = Socket(ROUTER) requests[] = Socket(ROUTER) control[] = Socket(ROUTER) - heartbeat[] = Socket(ROUTER) + heartbeat_context[] = Context() + heartbeat = Socket(heartbeat_context[], ROUTER) sep = profile["transport"]=="ipc" ? "-" : ":" bind(publish[], "$(profile["transport"])://$(profile["ip"])$(sep)$(profile["iopub_port"])") bind(requests[], "$(profile["transport"])://$(profile["ip"])$(sep)$(profile["shell_port"])") @@ -97,7 +99,7 @@ function init(args) # associate a lock with each socket so that multi-part messages # on a given socket don't get inter-mingled between tasks. - for s in (publish[], raw_input[], requests[], control[], heartbeat[]) + for s in (publish[], raw_input[], requests[], control[]) socket_locks[s] = ReentrantLock() end