Support managers external to K8s clusters #108

Merged
merged 12 commits on Apr 13, 2023
6 changes: 4 additions & 2 deletions Project.toml
@@ -1,22 +1,24 @@
name = "K8sClusterManagers"
uuid = "5aeab163-63d2-4171-9fbf-e22244d80acb"
version = "0.1.4"
version = "0.1.5"

[deps]
Compat = "34da2185-b29b-5c13-b0c7-acf172513d20"
DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8"
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
JSON = "682c06a0-de6a-54ab-a142-c8b1cf79cde6"
Mocking = "78c3b35d-d492-501b-9361-3d52fe80e533"
kubectl_jll = "ed23c2a5-89c4-5d52-b0ca-9d53aadf8c45"

[compat]
Compat = "3.29, 4"
DataStructures = "0.18"
JSON = "0.21"
Mocking = "0.7"
Mustache = "1"
YAML = "0.4.6"
julia = "1.6"
kubectl_jll = "1.20.0"
kubectl_jll = "1.20"

[extras]
LibGit2 = "76f85450-5226-5b5a-8eaa-529ad045b433"
37 changes: 28 additions & 9 deletions README.md
@@ -7,22 +7,41 @@

A Julia cluster manager for provisioning workers in a Kubernetes (K8s) cluster.

Pairs well with [`julia_pod`](https://github.com/beacon-biosignals/julia_pod) for interactive Julia development within a K8s pod.

## K8sClusterManager

The `K8sClusterManager` is intended to be used from a Pod running inside a Kubernetes
cluster.
The `K8sClusterManager` can be used both inside and outside of a Kubernetes cluster.
To get started you'll need access to a K8s cluster and a machine configured with
access to the cluster. If you're new to K8s we recommend you use [minikube](https://minikube.sigs.k8s.io)
to quickly set up a local Kubernetes cluster.
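
To quickly verify that your machine can reach the cluster you can use the `kubectl` binary
provided by kubectl_jll, one of this package's dependencies (a minimal sketch; any correctly
configured kubeconfig context will work):

```julia
julia> using kubectl_jll: kubectl

julia> run(`$(kubectl()) cluster-info`);  # Prints the control plane address when access is configured
```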

### Running outside K8s

A distributed Julia cluster where the manager runs outside of K8s while the workers run in
the cluster can quickly be created via:

```julia
julia> using K8sClusterManagers, Distributed

julia> addprocs(K8sClusterManager(2))
```

When using the manager outside of the Kubernetes cluster, the manager will connect to workers
within the cluster using port forwarding. Performance between the manager and workers will
be limited by the network connection between the manager and the cluster.
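
When running externally the worker image defaults to `julia:$VERSION` (see the
`K8sClusterManager` docstring), so workers with project-specific dependencies require a
custom image. A minimal sketch, where the image name is a hypothetical placeholder for an
image your cluster can pull:

```julia
using Distributed, K8sClusterManagers

# "registry.example.com/my-project:latest" is a placeholder; substitute an
# image which contains your project's dependencies.
addprocs(K8sClusterManager(2; image="registry.example.com/my-project:latest"))
```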

### Running inside K8s

A Julia process running within a K8s cluster can also be used as a Julia distributed
manager.

Assuming you have `kubectl` installed locally and configured to connect to a cluster, you
can easily create an interactive Julia REPL session running from within the cluster by
executing:
To see this in action we'll create an interactive Julia REPL session running within the
cluster by executing:

```sh
kubectl run -it example-manager-pod --image julia:1
```

Or equivalently, using a K8s manifest named `example-manager-pod.yaml` containing:
or equivalently, using a K8s manifest named `example-manager-pod.yaml` containing:

```yaml
apiVersion: v1
@@ -52,7 +71,7 @@ Now in this Julia REPL session, you can add two workers via:
```julia
julia> using Pkg; Pkg.add("K8sClusterManagers")

julia> using K8sClusterManagers
julia> using K8sClusterManagers, Distributed

julia> addprocs(K8sClusterManager(2))
```
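
As a quick sanity check that the workers have joined the cluster (a minimal sketch; each
worker reports its own worker ID, mirroring the example in the docs):

```julia
julia> pmap(_ -> myid(), 1:nworkers())
2-element Vector{Int64}:
 2
 3
```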
26 changes: 13 additions & 13 deletions docs/src/examples.md
@@ -1,16 +1,15 @@
Examples
========

The [`K8sClusterManager`](@ref) is intended to be used inside a [Pod](https://kubernetes.io/docs/concepts/workloads/pods/)
running on a Kubernetes cluster.
The `K8sClusterManager` can be used both inside and outside of a Kubernetes cluster.

## Launching an interactive session

The following manifest will create a Kubernetes [Job](https://kubernetes.io/docs/concepts/workloads/controllers/job/)
named "interactive-session". This Job will spawn a Pod (see `spec.template.spec`) which will
run an interactive Julia session with the latest release of K8sClusterManagers.jl installed.
Be sure to create the required [ServiceAccount and associated permissions](../patterns/#required-permissions)
before proceeding.
named "interactive-session". This Job will spawn a [Pod](https://kubernetes.io/docs/concepts/workloads/pods/)
(see `spec.template.spec`) which will run an interactive Julia session with the latest
release of K8sClusterManagers.jl installed. Be sure to create the required [ServiceAccount
and associated permissions](../patterns/#required-permissions) before proceeding.

````@eval
using Markdown
@@ -36,11 +35,12 @@ echo $manager_pod
kubectl attach -it pod/${manager_pod?}
```

### Launching workers
## Launching workers

Once you've attached to the interactive session you can use [`K8sClusterManager`](@ref) to
spawn K8s workers. For our example we'll be using a small amount of CPU/Memory to ensure
workers can be spawned even on clusters with limited resources:
You can use [`K8sClusterManager`](@ref) to spawn workers within the K8s cluster. The cluster
manager can be used both inside and outside of the K8s cluster. In the following example we'll
request a small amount of CPU/memory to ensure workers can be spawned even on clusters with
limited resources:

```julia
julia> using Distributed, K8sClusterManagers
@@ -61,7 +61,7 @@ julia> pmap(x -> myid(), 1:nworkers()) # Each worker reports its worker ID
2
```

### Pending workers
## Pending workers

A worker created via `addprocs` may not necessarily be available right away, as K8s must
schedule the worker's Pod to a Node before the corresponding Julia process can start.
@@ -74,7 +74,7 @@ the manager will continue with the subset of workers which have reported in and
terminate workers that are stuck in the "Pending" phase.

```julia
julia> addprocs(K8sClusterManager(1, memory="1Ei", pending_timeout=10)) # Request 1 exbibyte of memory
julia> addprocs(K8sClusterManager(1, memory="1Pi", pending_timeout=10)) # Request 1 pebibyte of memory
┌ Warning: TimeoutException: timed out after waiting for worker interactive-session-d7jfb-worker-ffvnm to start for 10 seconds, with status:
│ {
│ "conditions": [
@@ -94,7 +94,7 @@ julia> addprocs(K8sClusterManager(1, memory="1Ei", pending_timeout=10)) # Reque
Int64[]
```

### Termination Reason
## Termination Reason

When Julia workers [exceed the specified memory limit](https://kubernetes.io/docs/tasks/configure-pod-container/assign-memory-resource/#exceed-a-container-s-memory-limit)
the worker Pod will be automatically killed by Kubernetes (OOMKilled). In such a
scenario the manager will report the reason the worker Pod was terminated.
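
The recorded reason can also be queried directly from K8s. A minimal sketch, reusing the
worker Pod name from the pending-workers example above (substitute the name of your own
terminated worker):

```julia
using kubectl_jll: kubectl

# Name of the terminated worker Pod (from the example above).
pod_name = "interactive-session-d7jfb-worker-ffvnm"

# The reason K8s recorded when the container terminated, e.g. "OOMKilled".
jsonpath = "{.status.containerStatuses[0].state.terminated.reason}"
reason = readchomp(`$(kubectl()) get pod/$pod_name -o jsonpath=$jsonpath`)
```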
1 change: 1 addition & 0 deletions src/K8sClusterManagers.jl
@@ -1,5 +1,6 @@
module K8sClusterManagers

using Compat: @something
using DataStructures: DefaultOrderedDict, OrderedDict
using Distributed: Distributed, ClusterManager, WorkerConfig, cluster_cookie
using JSON: JSON
128 changes: 110 additions & 18 deletions src/native_driver.jl
@@ -33,14 +33,15 @@ available.
- `manager_pod_name`: the name of the manager pod. Defaults to `gethostname()` which is
the name of the pod when executed inside of a Kubernetes pod.
- `worker_prefix`: the prefix given to spawned workers. Defaults to
`"\$(manager_pod_name)-worker"`.
- `image`: Docker image to use for the workers. Defaults to using the image of the Julia
caller if running within a pod using a single container otherwise is a required argument.
`"\$(manager_pod_name)-worker"` when the manager is running inside of K8s otherwise
defaults to `"$(gethostname())-worker`.
- `image`: Docker image to use for the workers. Defaults to the image used by the manager
when running inside of a K8s pod otherwise defaults to "julia:\$VERSION".
- `cpu`: [CPU resources requested](https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#meaning-of-cpu)
for each worker. Defaults to `$(repr(DEFAULT_WORKER_CPU))`.
- `memory`: [Memory resource requested](https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#meaning-of-memory)
for each worker in bytes. Requests may provide a unit suffix (e.g. "G" for Gigabytes and
"GiB" for Gibibytes). Defaults to `$(repr(DEFAULT_WORKER_MEMORY))`.
"Gi" for Gibibytes). Defaults to `$(repr(DEFAULT_WORKER_MEMORY))`.
- `pending_timeout`: The maximum number of seconds to wait for a "Pending" worker pod to
enter the "Running" phase. Once the timeout has been reached the manager will continue
with the number of workers available (`<= np`). Defaults to `180` (3 minutes).
@@ -49,25 +50,27 @@
"""
function K8sClusterManager(np::Integer;
namespace::AbstractString=current_namespace(),
manager_pod_name=gethostname(),
worker_prefix::AbstractString="$(manager_pod_name)-worker",
manager_pod_name=isk8s() ? gethostname() : nothing,
worker_prefix::AbstractString="$(@something(manager_pod_name, gethostname()))-worker",
image=nothing,
cpu=DEFAULT_WORKER_CPU,
memory=DEFAULT_WORKER_MEMORY,
pending_timeout::Real=180,
configure=identity)

# Default to using the image of the pod if possible
if image === nothing
pod = get_pod(manager_pod_name)
images = map(c -> c["image"], pod["spec"]["containers"])

if length(images) == 1
image = first(images)
elseif length(images) > 0
error("Unable to determine image from pod \"$manager_pod_name\" which uses multiple containers")
if manager_pod_name !== nothing
pod = get_pod(manager_pod_name)
images = map(c -> c["image"], pod["spec"]["containers"])

if length(images) == 1
image = first(images)
elseif length(images) > 0
error("Unable to determine image from pod \"$manager_pod_name\" which uses multiple containers")
else
error("Unable to find any images for pod \"$manager_pod_name\"")
end
else
error("Unable to find any images for pod \"$manager_pod_name\"")
image = "julia:$VERSION"
end
end

@@ -94,17 +97,33 @@ function Distributed.launch(manager::K8sClusterManager, params::Dict, launched::
exename = params[:exename]
exeflags = params[:exeflags]

cmd = `$exename $exeflags --worker=$(cluster_cookie())`
# When using a standard Julia Docker image we can safely set the Julia executable name
# Alternatively, we could extend `Distributed.default_addprocs_params`.
if startswith(manager.image, "julia:")
exename = "julia"
end

# Using `--bind-to=0.0.0.0` to force the worker to listen to all interfaces instead
# of only a single external interface. This is required for `kubectl port-forward`.
# TODO: Should file an issue against the Julia repo about this.
cmd = `$exename $exeflags --worker=$(cluster_cookie()) --bind-to=0.0.0.0`

worker_manifest = worker_pod_spec(manager; cmd, cluster_cookie=cluster_cookie())

# Note: User-defined `configure` function may or may not be mutating
worker_manifest = manager.configure(worker_manifest)

# Trigger any TOTP requests before the async loop
# TODO: Verify this is working correctly
success(`$(kubectl()) get pods -o 'jsonpath={.items[*].metadata.null}'`)

@sync for i in 1:manager.np
@async begin
pod_name = create_pod(worker_manifest)

# TODO: Add notice about having to pull an image. On a slow internet connection
# this can make it appear that the cluster start is hung

pod = try
wait_for_running_pod(pod_name; timeout=manager.pending_timeout)
catch e
@@ -125,7 +144,7 @@ function Distributed.manage(manager::K8sClusterManager, id::Integer, config::Wor

config = WorkerConfig()
config.io = p.out
config.userdata = (; pod_name=pod_name)
config.userdata = (; pod_name, port_forward=Ref{Base.Process}())

push!(launched, config)
notify(c)
@@ -156,6 +175,13 @@ function Distributed.manage(manager::K8sClusterManager, id::Integer, config::Wor
end

elseif op === :deregister
# Terminate the port-forward process. Without this, these processes may
# persist until terminated by the cluster (e.g. `minikube stop`).
pf = config.userdata.port_forward
if isassigned(pf)
kill(pf[])
end

# As the deregister `manage` call occurs before remote workers are told to
# deregister we should avoid unnecessarily blocking.
@async begin
@@ -179,4 +205,70 @@ function Distributed.manage(manager::K8sClusterManager, id::Integer, config::Wor
notify(DEREGISTER_ALERT; all=true)
end
end

return nothing
end

# Stripped down and modified version of:
# https://github.com/JuliaLang/julia/blob/844c20dd63870aa5b369b85038f0523d7d79308a/stdlib/Distributed/src/managers.jl#L567-L632
function Distributed.connect(manager::K8sClusterManager, pid::Int, config::WorkerConfig)
Comment on lines +212 to +214

**Member:** just to check my own understanding, we need to provide a specialized method for this because of the need to set up port forwarding in the case of a local-to-cluster connection?

**Member Author:** That is correct. Specifically, we could use `config.connect_at` to specify that the manager connect to the workers using the local port forwarding. However, doing that results in the workers also trying to use those ephemeral addresses, which fails any worker-to-worker connections.

**Member Author:** Correct. This is actually supported but it's not well documented.

# Note: This method currently doesn't implement support for worker-to-worker
# connections and instead relies on the `Distributed.connect(::DefaultClusterManager, ...)`
# for this. If we did need to perform special logic for worker-to-worker connections
# we would need to modify how `init_worker` is called via `start_worker`:
# https://github.com/JuliaLang/julia/blob/f7554b5c9f0f580a9fcf5c7b8b9a83b678e2f48a/stdlib/Distributed/src/cluster.jl#L375-L378

# master connecting to workers
if config.io !== nothing
# Not truly needed as we already know this information but since we are using `--worker`
# we may as well follow the standard protocol
bind_addr, port = Distributed.read_worker_host_port(config.io)
else
error("I/O not setup")
end

pod_name = config.userdata.pod_name

# As we've forced the worker to listen to all interfaces the reported `bind_addr` will
# be a non-routable address. We'll need to determine the in-cluster IP address another
# way.
intra_addr = get_pod(pod_name)["status"]["podIP"]
intra_port = port

bind_addr, port = if !isk8s()
# When the manager is running outside of the K8s cluster we need to establish
# port-forward connections from the manager to the workers.
pf = open(`$(kubectl()) port-forward --address localhost pod/$pod_name :$intra_port`, "r")
**Member:** the kubectl docs say that this doesn't return. Do we need to somehow keep track of the status of this process, e.g. restart it if it fails?

**Member Author:** If this process fails then the connection between the manager and that worker is broken and the worker will be dropped. As the port we assign on localhost is random, we couldn't just restart the same process and have things work.

I will say though, as this is the `connect` function, if Distributed.jl did try to reconnect to the worker then the port-forward process would automatically be recreated. Unfortunately, that's not the world we live in: once this function runs, `config.connect_at` is set and all further connections are deemed worker-to-worker connections (another thing I don't like about the Distributed.jl interface).

fwd_addr, fwd_port = parse_forward_info(readline(pf.out))

# Retain a reference to the port forward
config.userdata.port_forward[] = pf
Comment on lines +244 to +245

**Member:** AH I see now, that's what this is doing...


fwd_addr, fwd_port
else
intra_addr, intra_port
end

s, bind_addr = Distributed.connect_to_worker(bind_addr, port)
config.bind_addr = bind_addr

# Write out the subset of `connect_at` required for further worker-to-worker connection setups
config.connect_at = (intra_addr, Int(intra_port))

if config.io !== nothing
let pid = pid
Distributed.redirect_worker_output(pid, Base.notnothing(config.io))
end
end

return (s, s)
end

function parse_forward_info(str)
m = match(r"^Forwarding from (.*):(\d+) ->", str)
if m !== nothing
return (m.captures[1], parse(UInt16, m.captures[2]))
else
error("Unable to parse port-forward response")
end
end
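
For reference, a sketch of the `kubectl port-forward` output line this parses (the local
port here is hypothetical):

```julia
julia> parse_forward_info("Forwarding from 127.0.0.1:55123 -> 9000")
("127.0.0.1", 0xd753)
```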