Skip to content

Commit

Permalink
Merge pull request #21818 from JuliaLang/amitm/reuseportfix
Browse files Browse the repository at this point in the history
Fix reuse of client port on Linux. Implement for OSX.
  • Loading branch information
amitmurthy authored May 22, 2017
2 parents f760dad + fa8c4d2 commit e5fb87d
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 53 deletions.
4 changes: 4 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,10 @@ This section lists changes that do not have deprecation warnings.
* `homedir` now determines the user's home directory via `libuv`'s `uv_os_homedir`,
rather than from environment variables ([#19636]).
* Workers now listen on an ephemeral port assigned by the OS. Previously workers would
listen on the first free port available from 9009 ([#21818]).
Library improvements
--------------------
Expand Down
8 changes: 4 additions & 4 deletions base/distributed/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@ function start_worker(out::IO, cookie::AbstractString)
init_worker(cookie)
interface = IPv4(LPROC.bind_addr)
if LPROC.bind_port == 0
(actual_port,sock) = listenany(interface, UInt16(9009))
LPROC.bind_port = actual_port
(port, sock) = listenany(interface, UInt16(0))
LPROC.bind_port = port
else
sock = listen(interface, LPROC.bind_port)
end
Expand Down Expand Up @@ -256,9 +256,9 @@ end
function parse_connection_info(str)
m = match(r"^julia_worker:(\d+)#(.*)", str)
if m !== nothing
(m.captures[2], parse(Int16, m.captures[1]))
(m.captures[2], parse(UInt16, m.captures[1]))
else
("", Int16(-1))
("", UInt16(0))
end
end

Expand Down
43 changes: 23 additions & 20 deletions base/distributed/managers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -455,31 +455,34 @@ end
const client_port = Ref{Cushort}(0)

function socket_reuse_port()
s = TCPSocket()
client_host = Ref{Cuint}(0)
ccall(:jl_tcp_bind, Int32,
(Ptr{Void}, UInt16, UInt32, Cuint),
s.handle, hton(client_port.x), hton(UInt32(0)), 0) < 0 && throw(SystemError("bind() : "))

# TODO: Support OSX and change the above code to call setsockopt before bind once libuv provides
# early access to a socket fd, i.e., before a bind call.

@static if is_linux()
try
rc = ccall(:jl_tcp_reuseport, Int32, (Ptr{Void},), s.handle)
if rc > 0 # SO_REUSEPORT is unsupported, just return the ephemerally bound socket
return s
elseif rc < 0
throw(SystemError("setsockopt() SO_REUSEPORT : "))
end
getsockname(s)
catch e
@static if is_linux() || is_apple()
s = TCPSocket(delay = false)

# Linux requires the port to be bound before setting REUSEPORT, OSX after.
is_linux() && bind_client_port(s)
rc = ccall(:jl_tcp_reuseport, Int32, (Ptr{Void},), s.handle)
if rc > 0 # SO_REUSEPORT is unsupported, just return the ephemerally bound socket
return s
elseif rc < 0
# This is an issue only on systems with lots of client connections, hence delay the warning
nworkers() > 128 && warn_once("Error trying to reuse client port number, falling back to plain socket : ", e)
nworkers() > 128 && warn_once("Error trying to reuse client port number, falling back to regular socket.")

# provide a clean new socket
return TCPSocket()
end
is_apple() && bind_client_port(s)
else
return TCPSocket()
end
end

function bind_client_port(s)
err = ccall(:jl_tcp_bind, Int32, (Ptr{Void}, UInt16, UInt32, Cuint),
s.handle, hton(client_port[]), hton(UInt32(0)), 0)
Base.uv_error("bind() failed", err)

_addr, port = Base._sockname(s, true)
client_port[] = port
return s
end

Expand Down
24 changes: 17 additions & 7 deletions base/socket.jl
Original file line number Diff line number Diff line change
Expand Up @@ -282,10 +282,14 @@ mutable struct TCPSocket <: LibuvStream
return tcp
end
end
function TCPSocket()

# kw arg "delay": if true, libuv delays creation of the socket fd till the first bind call
function TCPSocket(; delay=true)
tcp = TCPSocket(Libc.malloc(_sizeof_uv_tcp), StatusUninit)
err = ccall(:uv_tcp_init, Cint, (Ptr{Void}, Ptr{Void}),
eventloop(), tcp.handle)
af_spec = delay ? 0 : 2 # AF_UNSPEC is 0, AF_INET is 2

err = ccall(:uv_tcp_init_ex, Cint, (Ptr{Void}, Ptr{Void}, Cuint),
eventloop(), tcp.handle, af_spec)
uv_error("failed to create tcp socket", err)
tcp.status = StatusInit
return tcp
Expand Down Expand Up @@ -822,6 +826,10 @@ function listenany(host::IPAddr, default_port)
while true
sock = TCPServer()
if bind(sock, addr) && trylisten(sock) == 0
if default_port == 0
_addr, port = _sockname(sock, true)
return (port, sock)
end
return (addr.port, sock)
end
close(sock)
Expand All @@ -840,16 +848,18 @@ listenany(default_port) = listenany(IPv4(UInt32(0)), default_port)
Get the IP address and the port that the given `TCPSocket` is connected to
(or bound to, in the case of `TCPServer`).
"""
function getsockname(sock::Union{TCPServer,TCPSocket})
getsockname(sock::Union{TCPServer, TCPSocket}) = _sockname(sock, isa(sock, TCPServer))

function _sockname(sock, self)
rport = Ref{Cushort}(0)
raddress = zeros(UInt8, 16)
rfamily = Ref{Cuint}(0)
r = if isa(sock, TCPServer)
ccall(:jl_tcp_getsockname, Int32,
if self
r = ccall(:jl_tcp_getsockname, Int32,
(Ptr{Void}, Ref{Cushort}, Ptr{Void}, Ref{Cuint}),
sock.handle, rport, raddress, rfamily)
else
ccall(:jl_tcp_getpeername, Int32,
r = ccall(:jl_tcp_getpeername, Int32,
(Ptr{Void}, Ref{Cushort}, Ptr{Void}, Ref{Cuint}),
sock.handle, rport, raddress, rfamily)
end
Expand Down
9 changes: 5 additions & 4 deletions doc/src/manual/parallel-computing.md
Original file line number Diff line number Diff line change
Expand Up @@ -1231,8 +1231,8 @@ as local laptops, departmental clusters, or even the cloud. This section covers
requirements for the inbuilt `LocalManager` and `SSHManager`:
* The master process does not listen on any port. It only connects out to the workers.
* Each worker binds to only one of the local interfaces and listens on the first free port starting
from `9009`.
* Each worker binds to only one of the local interfaces and listens on an ephemeral port number
assigned by the OS.
* `LocalManager`, used by `addprocs(N)`, by default binds only to the loopback interface. This means
that workers started later on remote hosts (or by anyone with malicious intentions) are unable
to connect to the cluster. An `addprocs(4)` followed by an `addprocs(["remote_host"])` will fail.
Expand All @@ -1250,8 +1250,9 @@ requirements for the inbuilt `LocalManager` and `SSHManager`:
authenticated via public key infrastructure (PKI). Authentication credentials can be supplied
via `sshflags`, for example ```sshflags=`-e <keyfile>` ```.
Note that worker-worker connections are still plain TCP and the local security policy on the remote
cluster must allow for free connections between worker nodes, at least for ports 9009 and above.
In an all-to-all topology (the default), all workers connect to each other via plain TCP sockets.
The security policy on the cluster nodes must thus ensure free connectivity between workers for
the ephemeral port range (varies by OS).
Securing and encrypting all worker-worker traffic (via SSH) or encrypting individual messages
can be done via a custom ClusterManager.
Expand Down
37 changes: 36 additions & 1 deletion test/distributed_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,41 @@ include("testenv.jl")

addprocs_with_testenv(4)

# Test that the client port is reused. SO_REUSEPORT may not be supported on
# all UNIX platforms, Linux kernels prior to 3.9 and older versions of OSX
if is_unix()
# Run the test on all processes.
results = asyncmap(procs()) do p
remotecall_fetch(p) do
ports_lower = [] # ports of pids lower than myid()
ports_higher = [] # ports of pids higher than myid()
for w in Base.Distributed.PGRP.workers
w.id == myid() && continue
port = Base._sockname(w.r_stream, true)[2]
if (w.id == 1)
# master connects to workers
push!(ports_higher, port)
elseif w.id < myid()
push!(ports_lower, port)
elseif w.id > myid()
push!(ports_higher, port)
end
end
@assert (length(ports_lower) + length(ports_higher)) == nworkers()
for portset in [ports_lower, ports_higher]
if (length(portset) > 0) && (length(unique(portset)) != 1)
warn("SO_REUSEPORT TESTS FAILED. UNSUPPORTED/OLDER UNIX VERSION?")
return 0
end
end
return myid()
end
end

# Ensure that the code has indeed been successfully executed everywhere
@test all(p -> p in results, procs())
end

id_me = myid()
id_other = filter(x -> x != id_me, procs())[rand(1:(nprocs()-1))]

Expand Down Expand Up @@ -923,7 +958,7 @@ if is_unix() # aka have ssh
end
end

remotecall_fetch(plst->rmprocs(plst; waitfor=5.0), 1, new_pids)
remotecall_fetch(rmprocs, 1, new_pids)
end

print("\n\nTesting SSHManager. A minimum of 4GB of RAM is recommended.\n")
Expand Down
36 changes: 19 additions & 17 deletions test/socket.jl
Original file line number Diff line number Diff line change
Expand Up @@ -69,27 +69,29 @@ end
# test show() function for UDPSocket()
@test repr(UDPSocket()) == "UDPSocket(init)"

port = Channel(1)
defaultport = rand(2000:4000)
tsk = @async begin
p, s = listenany(defaultport)
put!(port, p)
sock = accept(s)
# test write call
write(sock,"Hello World\n")

# test "locked" println to a socket
@sync begin
for i in 1:100
@async println(sock, "a", 1)
for testport in [0, defaultport]
port = Channel(1)
tsk = @async begin
p, s = listenany(testport)
put!(port, p)
sock = accept(s)
# test write call
write(sock,"Hello World\n")

# test "locked" println to a socket
@sync begin
for i in 1:100
@async println(sock, "a", 1)
end
end
close(s)
close(sock)
end
close(s)
close(sock)
wait(port)
@test readstring(connect(fetch(port))) == "Hello World\n" * ("a1\n"^100)
wait(tsk)
end
wait(port)
@test readstring(connect(fetch(port))) == "Hello World\n" * ("a1\n"^100)
wait(tsk)

mktempdir() do tmpdir
socketname = is_windows() ? ("\\\\.\\pipe\\uv-test-" * randstring(6)) : joinpath(tmpdir, "socket")
Expand Down

0 comments on commit e5fb87d

Please sign in to comment.