Support managers external to K8s clusters #108
```diff
@@ -33,14 +33,15 @@ available.
 - `manager_pod_name`: the name of the manager pod. Defaults to `gethostname()` which is
   the name of the pod when executed inside of a Kubernetes pod.
 - `worker_prefix`: the prefix given to spawned workers. Defaults to
-  `"\$(manager_pod_name)-worker"`.
-- `image`: Docker image to use for the workers. Defaults to using the image of the Julia
-  caller if running within a pod using a single container otherwise is a required argument.
+  `"\$(manager_pod_name)-worker"` when the manager is running inside of K8s, otherwise
+  defaults to `"$(gethostname())-worker"`.
+- `image`: Docker image to use for the workers. Defaults to the image used by the manager
+  when running inside of a K8s pod, otherwise defaults to `"julia:\$VERSION"`.
 - `cpu`: [CPU resources requested](https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#meaning-of-cpu)
   for each worker. Defaults to `$(repr(DEFAULT_WORKER_CPU))`.
 - `memory`: [Memory resource requested](https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#meaning-of-memory)
   for each worker in bytes. Requests may provide a unit suffix (e.g. "G" for Gigabytes and
-  "GiB" for Gibibytes). Defaults to `$(repr(DEFAULT_WORKER_MEMORY))`.
+  "Gi" for Gibibytes). Defaults to `$(repr(DEFAULT_WORKER_MEMORY))`.
 - `pending_timeout`: The maximum number of seconds to wait for a "Pending" worker pod to
   enter the "Running" phase. Once the timeout has been reached the manager will continue
   with the number of workers available (`<= np`). Defaults to `180` (3 minutes).
```
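The docstring change corrects the binary suffix from "GiB" to the Kubernetes-style "Gi". To make the decimal "G" vs binary "Gi" distinction concrete, here is a small hypothetical helper (not part of this package; the name `quantity_to_bytes` is invented for illustration) converting such quantity strings to bytes:

```julia
# Hypothetical helper illustrating Kubernetes memory quantity suffixes:
# decimal "G" (10^9 bytes) vs binary "Gi" (2^30 bytes).
function quantity_to_bytes(str::AbstractString)
    m = match(r"^(\d+)(Ki|Mi|Gi|K|M|G)?$", str)
    m === nothing && error("Unable to parse quantity: $str")
    value = parse(Int, m.captures[1])
    multipliers = Dict(
        nothing => 1,
        "K" => 10^3, "M" => 10^6, "G" => 10^9,
        "Ki" => 2^10, "Mi" => 2^20, "Gi" => 2^30,
    )
    return value * multipliers[m.captures[2]]
end

quantity_to_bytes("4G")   # → 4000000000
quantity_to_bytes("4Gi")  # → 4294967296
```

Real Kubernetes quantities also allow larger suffixes (`Ti`, `Pi`, …) and fractional values; this sketch only covers the suffixes mentioned in the docstring.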
```diff
@@ -49,25 +50,27 @@ available.
 """
 function K8sClusterManager(np::Integer;
                            namespace::AbstractString=current_namespace(),
-                           manager_pod_name=gethostname(),
-                           worker_prefix::AbstractString="$(manager_pod_name)-worker",
+                           manager_pod_name=isk8s() ? gethostname() : nothing,
+                           worker_prefix::AbstractString="$(@something(manager_pod_name, gethostname()))-worker",
                            image=nothing,
                            cpu=DEFAULT_WORKER_CPU,
                            memory=DEFAULT_WORKER_MEMORY,
                            pending_timeout::Real=180,
                            configure=identity)
 
     # Default to using the image of the pod if possible
     if image === nothing
-        pod = get_pod(manager_pod_name)
-        images = map(c -> c["image"], pod["spec"]["containers"])
-
-        if length(images) == 1
-            image = first(images)
-        elseif length(images) > 0
-            error("Unable to determine image from pod \"$manager_pod_name\" which uses multiple containers")
-        else
-            error("Unable to find any images for pod \"$manager_pod_name\"")
+        if manager_pod_name !== nothing
+            pod = get_pod(manager_pod_name)
+            images = map(c -> c["image"], pod["spec"]["containers"])
+
+            if length(images) == 1
+                image = first(images)
+            elseif length(images) > 0
+                error("Unable to determine image from pod \"$manager_pod_name\" which uses multiple containers")
+            else
+                error("Unable to find any images for pod \"$manager_pod_name\"")
+            end
+        else
+            image = "julia:$VERSION"
         end
     end
```
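The new `worker_prefix` default relies on `@something` (available in Base since Julia 1.7) to fall back to `gethostname()` whenever `manager_pod_name` is `nothing`, i.e. when `isk8s()` is false. A sketch of how that default resolves (the pod name value is a placeholder):

```julia
using Sockets: gethostname

# Outside of K8s: `manager_pod_name` defaults to `nothing`, so the prefix
# falls back to the local hostname.
manager_pod_name = nothing
prefix = "$(@something(manager_pod_name, gethostname()))-worker"
# e.g. "my-laptop-worker"

# Inside of K8s: `manager_pod_name` defaults to `gethostname()`, which is the
# pod name; `@something` short-circuits on the first non-`nothing` value.
manager_pod_name = "manager-pod-1"  # placeholder pod name
prefix = "$(@something(manager_pod_name, gethostname()))-worker"
# → "manager-pod-1-worker"
```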
```diff
@@ -94,17 +97,33 @@ function Distributed.launch(manager::K8sClusterManager, params::Dict, launched::
     exename = params[:exename]
     exeflags = params[:exeflags]
 
-    cmd = `$exename $exeflags --worker=$(cluster_cookie())`
+    # When using a standard Julia Docker image we can safely set the Julia executable name.
+    # Alternatively, we could extend `Distributed.default_addprocs_params`.
+    if startswith(manager.image, "julia:")
+        exename = "julia"
+    end
+
+    # Using `--bind-to=0.0.0.0` to force the worker to listen on all interfaces instead
+    # of only a single external interface. This is required for `kubectl port-forward`.
+    # TODO: Should file an issue against the Julia repo about this.
+    cmd = `$exename $exeflags --worker=$(cluster_cookie()) --bind-to=0.0.0.0`
 
     worker_manifest = worker_pod_spec(manager; cmd, cluster_cookie=cluster_cookie())
 
+    # Note: The user-defined `configure` function may or may not be mutating.
     worker_manifest = manager.configure(worker_manifest)
 
+    # Trigger any TOTP requests before the async loop
+    # TODO: Verify this is working correctly
+    success(`$(kubectl()) get pods -o 'jsonpath={.items[*].metadata.null}'`)
+
     @sync for i in 1:manager.np
         @async begin
             pod_name = create_pod(worker_manifest)
 
+            # TODO: Add a notice about having to pull an image. On a slow internet
+            # connection this can make it appear that the cluster start is hung.
+
             pod = try
                 wait_for_running_pod(pod_name; timeout=manager.pending_timeout)
             catch e
```
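The worker command is built via Julia's `Cmd` interpolation, where interpolating another `Cmd` (like `exeflags`) splices its arguments in place. A standalone sketch with placeholder cookie and flags, showing how `--bind-to=0.0.0.0` lands in the final command:

```julia
exename = "julia"
exeflags = `--project`   # placeholder flags
cookie = "mycookie"      # placeholder; the real code uses cluster_cookie()

# Backtick interpolation quotes each argument safely; no shell is involved.
cmd = `$exename $exeflags --worker=$cookie --bind-to=0.0.0.0`

cmd.exec  # → ["julia", "--project", "--worker=mycookie", "--bind-to=0.0.0.0"]
```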
```diff
@@ -125,7 +144,7 @@ function Distributed.launch(manager::K8sClusterManager, params::Dict, launched::
 
             config = WorkerConfig()
             config.io = p.out
-            config.userdata = (; pod_name=pod_name)
+            config.userdata = (; pod_name, port_forward=Ref{Base.Process}())
 
             push!(launched, config)
             notify(c)
```
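The `port_forward=Ref{Base.Process}()` slot starts unassigned and is only filled in later if a `kubectl port-forward` process gets started, which is why the deregister path below guards with `isassigned`. A sketch of that pattern, substituting a harmless `sleep` process for the real port-forward (assumes a Unix-like system):

```julia
userdata = (; pod_name="demo-worker-1", port_forward=Ref{Base.Process}())
isassigned(userdata.port_forward)  # → false; nothing stored yet

# Later, a long-running process is started and retained...
p = run(`sleep 30`; wait=false)    # stand-in for `kubectl port-forward`
userdata.port_forward[] = p

# ...and on deregister it is terminated only if one actually exists.
if isassigned(userdata.port_forward)
    kill(userdata.port_forward[])
end
```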
```diff
@@ -156,6 +175,13 @@ function Distributed.manage(manager::K8sClusterManager, id::Integer, config::WorkerConfig, op::Symbol)
         end
 
+    elseif op === :deregister
+        # Terminate the port-forward process. Without this these processes may
+        # persist until terminated by the cluster (e.g. `minikube stop`).
+        pf = config.userdata.port_forward
+        if isassigned(pf)
+            kill(pf[])
+        end
+
         # As the deregister `manage` call occurs before remote workers are told to
         # deregister we should avoid unnecessarily blocking.
         @async begin
```
```diff
@@ -179,4 +205,70 @@ function Distributed.manage(manager::K8sClusterManager, id::Integer, config::WorkerConfig, op::Symbol)
             notify(DEREGISTER_ALERT; all=true)
         end
     end
 
+    return nothing
+end
+
+# Stripped down and modified version of:
+# https://github.com/JuliaLang/julia/blob/844c20dd63870aa5b369b85038f0523d7d79308a/stdlib/Distributed/src/managers.jl#L567-L632
+function Distributed.connect(manager::K8sClusterManager, pid::Int, config::WorkerConfig)
+    # Note: This method currently doesn't implement support for worker-to-worker
+    # connections and instead relies on `Distributed.connect(::DefaultClusterManager, ...)`
+    # for this. If we did need to perform special logic for worker-to-worker connections
+    # we would need to modify how `init_worker` is called via `start_worker`:
+    # https://github.com/JuliaLang/julia/blob/f7554b5c9f0f580a9fcf5c7b8b9a83b678e2f48a/stdlib/Distributed/src/cluster.jl#L375-L378
+
+    # Master connecting to workers
+    if config.io !== nothing
+        # Not truly needed as we already know this information, but since we are using
+        # `--worker` we may as well follow the standard protocol.
+        bind_addr, port = Distributed.read_worker_host_port(config.io)
+    else
+        error("I/O not setup")
+    end
+
+    pod_name = config.userdata.pod_name
+
+    # As we've forced the worker to listen on all interfaces, the reported `bind_addr`
+    # will be a non-routable address. We'll need to determine the in-cluster IP address
+    # another way.
+    intra_addr = get_pod(pod_name)["status"]["podIP"]
+    intra_port = port
+
+    bind_addr, port = if !isk8s()
+        # When the manager is running outside of the K8s cluster we need to establish
+        # port-forward connections from the manager to the workers.
+        pf = open(`$(kubectl()) port-forward --address localhost pod/$pod_name :$intra_port`, "r")
+        fwd_addr, fwd_port = parse_forward_info(readline(pf.out))
+
+        # Retain a reference to the port forward
+        config.userdata.port_forward[] = pf
+
+        fwd_addr, fwd_port
+    else
+        intra_addr, intra_port
+    end
+
+    s, bind_addr = Distributed.connect_to_worker(bind_addr, port)
+    config.bind_addr = bind_addr
+
+    # Write out a subset of the `connect_at` required for further worker-to-worker
+    # connection setups.
+    config.connect_at = (intra_addr, Int(intra_port))
+
+    if config.io !== nothing
+        let pid = pid
+            Distributed.redirect_worker_output(pid, Base.notnothing(config.io))
+        end
+    end
+
+    return (s, s)
+end
+
+function parse_forward_info(str)
+    m = match(r"^Forwarding from (.*):(\d+) ->", str)
+    if m !== nothing
+        return (m.captures[1], parse(UInt16, m.captures[2]))
+    else
+        error("Unable to parse port-forward response")
+    end
+end
```

Review thread on the `kubectl port-forward` call:

> **Reviewer:** The kubectl docs say that this doesn't return. Do we need to somehow keep track of the status of this process and e.g. restart it if it fails?
>
> **Author:** If this process fails then the connection between the manager is broken and that worker will be dropped. As the port we assign on the localhost is random, we couldn't just restart the same process and have things work. I will say though as this is the …

Review thread on lines +244 to +245 (`config.userdata.port_forward[] = pf`):

> **Reviewer:** Ah, I see now, that's what this is doing…
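`parse_forward_info` consumes the banner that `kubectl port-forward` prints on startup, e.g. `Forwarding from 127.0.0.1:54321 -> 9000`. The function from the diff can be exercised standalone on such a sample line:

```julia
function parse_forward_info(str)
    m = match(r"^Forwarding from (.*):(\d+) ->", str)
    if m !== nothing
        return (m.captures[1], parse(UInt16, m.captures[2]))
    else
        error("Unable to parse port-forward response")
    end
end

parse_forward_info("Forwarding from 127.0.0.1:54321 -> 9000")
# → ("127.0.0.1", 0xd431)  (0xd431 == 54321)
```

The greedy `(.*)` also tolerates IPv6-style addresses such as `[::1]`, since it captures everything before the last colon preceding the port digits.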
Review thread on `Distributed.connect`:

> **Reviewer:** Just to check my own understanding, we need to provide a specialized method for this because of the need to set up port forwarding in the case of a local-to-cluster connection?
>
> **Author:** That is correct. Specifically, we could use `config.connect_at` to specify that the manager connect to the workers using the local port forwarding. However, doing that results in the workers also trying to use those ephemeral addresses, which fails any worker-to-worker connections.
>
> **Author:** Correct. This is actually supported but it's not well documented.