Support managers external to K8s clusters (#108)
* Fix documentation typo

* Import Distributed in README example

* Trigger TOTP requests outside of async block

* Add TODO about slow image pulling

* Support managers outside of K8s clusters

* Drop patch from kubectl_jll compat

* Set project version to 0.1.5

* Refactor external manager support

Attempted to remove `--bind-to` entirely and discovered that the port
number does not need to be fixed. It turns out that the issue I
originally ran into while experimenting with this is that Julia was
only listening on the external interface while `kubectl port-forward`
forwards from the pod's localhost interface (see the sketch after this
list).

* Documentation update

* Drop unused worker-to-worker logic

* Avoid use of `nworkers` when testing for a single worker
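
The bind-interface issue mentioned in the `--bind-to` note above can be illustrated with a short sketch (pod name and ports are hypothetical):

```sh
# `kubectl port-forward` tunnels to the pod's loopback interface:
kubectl port-forward pod/example-worker-abc12 :9000
# Forwarding from 127.0.0.1:55123 -> 9000

# A worker that only listens on its external interface is unreachable
# through this tunnel; starting it with `--bind-to=0.0.0.0` makes it
# listen on all interfaces, including loopback, so the tunnel works.
```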
omus authored Apr 13, 2023
1 parent 3fc33a8 commit 1139430
Showing 7 changed files with 207 additions and 43 deletions.
6 changes: 4 additions & 2 deletions Project.toml
@@ -1,22 +1,24 @@
name = "K8sClusterManagers"
uuid = "5aeab163-63d2-4171-9fbf-e22244d80acb"
version = "0.1.4"
version = "0.1.5"

[deps]
Compat = "34da2185-b29b-5c13-b0c7-acf172513d20"
DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8"
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
JSON = "682c06a0-de6a-54ab-a142-c8b1cf79cde6"
Mocking = "78c3b35d-d492-501b-9361-3d52fe80e533"
kubectl_jll = "ed23c2a5-89c4-5d52-b0ca-9d53aadf8c45"

[compat]
Compat = "3.29, 4"
DataStructures = "0.18"
JSON = "0.21"
Mocking = "0.7"
Mustache = "1"
YAML = "0.4.6"
julia = "1.6"
kubectl_jll = "1.20.0"
kubectl_jll = "1.20"

[extras]
LibGit2 = "76f85450-5226-5b5a-8eaa-529ad045b433"
37 changes: 28 additions & 9 deletions README.md
@@ -7,22 +7,41 @@

A Julia cluster manager for provisioning workers in a Kubernetes (K8s) cluster.

Pairs well with [`julia_pod`](https://github.com/beacon-biosignals/julia_pod) for interactive Julia development within a K8s pod.

## K8sClusterManager

The `K8sClusterManager` is intended to be used from a Pod running inside a Kubernetes
cluster.
The `K8sClusterManager` can be used both inside and outside of a Kubernetes cluster.
To get started you'll need access to a K8s cluster and a machine configured with
access to that cluster. If you're new to K8s we recommend you use [minikube](https://minikube.sigs.k8s.io)
to quickly set up a local Kubernetes cluster.
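
If you go the minikube route, a local cluster can be created and verified with:

```sh
minikube start      # create a local single-node K8s cluster
kubectl get nodes   # confirm the cluster is reachable
```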

### Running outside K8s

A distributed Julia cluster where the manager runs outside of K8s while the workers run in
the cluster can quickly be created via:

```julia
julia> using K8sClusterManagers, Distributed

julia> addprocs(K8sClusterManager(2))
```

When the manager runs outside of the Kubernetes cluster it will connect to workers
within the cluster using port-forwarding. Performance between the manager and workers
will therefore be limited by the network connection between the manager and the cluster.
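
As a quick sanity check that the workers really are running inside the cluster, each worker can report its pod name (the names below are illustrative):

```julia
julia> map(w -> remotecall_fetch(gethostname, w), workers())
2-element Vector{String}:
 "hostname-worker-9z6kp"
 "hostname-worker-ffvnm"
```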

### Running inside K8s

A Julia process running within a K8s cluster can also be used as a Julia distributed
manager.

Assuming you have `kubectl` installed locally and configured to connect to a cluster, you
can easily create an interactive Julia REPL session running from within the cluster by
executing:
To see this in action we'll create an interactive Julia REPL session running within the
cluster by executing:

```sh
kubectl run -it example-manager-pod --image julia:1
```

Or equivalently, using a K8s manifest named `example-manager-pod.yaml` containing:
or equivalently, using a K8s manifest named `example-manager-pod.yaml` containing:

```yaml
apiVersion: v1
@@ -52,7 +71,7 @@ Now in this Julia REPL session, you can add two workers via:
```julia
julia> using Pkg; Pkg.add("K8sClusterManagers")

julia> using K8sClusterManagers
julia> using K8sClusterManagers, Distributed

julia> addprocs(K8sClusterManager(2))
```
26 changes: 13 additions & 13 deletions docs/src/examples.md
@@ -1,16 +1,15 @@
Examples
========

The [`K8sClusterManager`](@ref) is intended to be used inside a [Pod](https://kubernetes.io/docs/concepts/workloads/pods/)
running on a Kubernetes cluster.
The `K8sClusterManager` can be used both inside and outside of a Kubernetes cluster.

## Launching an interactive session

The following manifest will create a Kubernetes [Job](https://kubernetes.io/docs/concepts/workloads/controllers/job/)
named "interactive-session". This Job will spawn a Pod (see `spec.template.spec`) which will
run an interactive Julia session with the latest release of K8sClusterManagers.jl installed.
Be sure to create the required [ServiceAccount and associated permissions](../patterns/#required-permissions)
before proceeding.
named "interactive-session". This Job will spawn a [Pod](https://kubernetes.io/docs/concepts/workloads/pods/)
(see `spec.template.spec`) which will run an interactive Julia session with the latest
release of K8sClusterManagers.jl installed. Be sure to create the required [ServiceAccount
and associated permissions](../patterns/#required-permissions) before proceeding.

````@eval
using Markdown
@@ -36,11 +35,12 @@ echo $manager_pod
kubectl attach -it pod/${manager_pod?}
```

### Launching workers
## Launching workers

Once you've attached to the interactive session you can use [`K8sClusterManager`](@ref) to
spawn K8s workers. For our example we'll be using a small amount of CPU/Memory to ensure
workers can be spawned even on clusters with limited resources:
You can use [`K8sClusterManager`](@ref) to spawn workers within the K8s cluster. The cluster
manager can be used both inside/outside of the K8s cluster. In the following example we'll
be using a small amount of CPU/Memory to ensure workers can be spawned even on clusters with
limited resources:

```julia
julia> using Distributed, K8sClusterManagers
@@ -61,7 +61,7 @@ julia> pmap(x -> myid(), 1:nworkers()) # Each worker reports its worker ID
2
```
### Pending workers
## Pending workers
A worker created via `addprocs` may not necessarily be available right away, as K8s must
schedule the worker's Pod to a Node before the corresponding Julia process can start.
@@ -74,7 +74,7 @@ the manager will continue with the subset of workers which have reported in and
workers that are stuck in the "Pending" phase.
```julia
julia> addprocs(K8sClusterManager(1, memory="1Ei", pending_timeout=10)) # Request 1 exbibyte of memory
julia> addprocs(K8sClusterManager(1, memory="1Pi", pending_timeout=10)) # Request 1 pebibyte of memory
┌ Warning: TimeoutException: timed out after waiting for worker interactive-session-d7jfb-worker-ffvnm to start for 10 seconds, with status:
│ {
"conditions": [
@@ -94,7 +94,7 @@ julia> addprocs(K8sClusterManager(1, memory="1Ei", pending_timeout=10)) # Request 1 exbibyte of memory
Int64[]
```
### Termination Reason
## Termination Reason
When Julia workers [exceed the specified memory limit](https://kubernetes.io/docs/tasks/configure-pod-container/assign-memory-resource/#exceed-a-container-s-memory-limit)
the worker Pod will be automatically killed by Kubernetes (OOMKilled). In such a
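
When a worker does get OOMKilled, the termination reason is recorded on the pod and can be inspected directly; a sketch assuming `kubectl` access (pod name illustrative):

```sh
# Prints the container's last termination reason, e.g. "OOMKilled"
# when the worker exceeded its memory limit.
kubectl get pod interactive-session-worker-ffvnm \
    -o jsonpath='{.status.containerStatuses[0].lastState.terminated.reason}'
```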
1 change: 1 addition & 0 deletions src/K8sClusterManagers.jl
@@ -1,5 +1,6 @@
module K8sClusterManagers

using Compat: @something
using DataStructures: DefaultOrderedDict, OrderedDict
using Distributed: Distributed, ClusterManager, WorkerConfig, cluster_cookie
using JSON: JSON
128 changes: 110 additions & 18 deletions src/native_driver.jl
@@ -33,14 +33,15 @@ available.
- `manager_pod_name`: the name of the manager pod. Defaults to `gethostname()` which is
the name of the pod when executed inside of a Kubernetes pod.
- `worker_prefix`: the prefix given to spawned workers. Defaults to
`"\$(manager_pod_name)-worker"`.
- `image`: Docker image to use for the workers. Defaults to using the image of the Julia
caller if running within a pod using a single container otherwise is a required argument.
`"\$(manager_pod_name)-worker"` when the manager is running inside of K8s, otherwise
defaults to `"\$(gethostname())-worker"`.
- `image`: Docker image to use for the workers. Defaults to the image used by the manager
when running inside of a K8s pod, otherwise defaults to "julia:\$VERSION".
- `cpu`: [CPU resources requested](https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#meaning-of-cpu)
for each worker. Defaults to `$(repr(DEFAULT_WORKER_CPU))`.
- `memory`: [Memory resource requested](https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#meaning-of-memory)
for each worker in bytes. Requests may provide a unit suffix (e.g. "G" for Gigabytes and
"GiB" for Gibibytes). Defaults to `$(repr(DEFAULT_WORKER_MEMORY))`.
"Gi" for Gibibytes). Defaults to `$(repr(DEFAULT_WORKER_MEMORY))`.
- `pending_timeout`: The maximum number of seconds to wait for a "Pending" worker pod to
enter the "Running" phase. Once the timeout has been reached the manager will continue
with the number of workers available (`<= np`). Defaults to `180` (3 minutes).
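
As a sketch of how these keyword arguments combine (the values below are illustrative; the keyword names are the documented ones):

```julia
using Distributed, K8sClusterManagers

# Request 4 workers with modest resources and a 1 minute pending timeout.
addprocs(K8sClusterManager(4;
                           namespace="default",
                           image="julia:1.9",
                           cpu="500m",
                           memory="1Gi",
                           pending_timeout=60))
```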
@@ -49,25 +50,27 @@
"""
function K8sClusterManager(np::Integer;
namespace::AbstractString=current_namespace(),
manager_pod_name=gethostname(),
worker_prefix::AbstractString="$(manager_pod_name)-worker",
manager_pod_name=isk8s() ? gethostname() : nothing,
worker_prefix::AbstractString="$(@something(manager_pod_name, gethostname()))-worker",
image=nothing,
cpu=DEFAULT_WORKER_CPU,
memory=DEFAULT_WORKER_MEMORY,
pending_timeout::Real=180,
configure=identity)

# Default to using the image of the pod if possible
if image === nothing
pod = get_pod(manager_pod_name)
images = map(c -> c["image"], pod["spec"]["containers"])

if length(images) == 1
image = first(images)
elseif length(images) > 0
error("Unable to determine image from pod \"$manager_pod_name\" which uses multiple containers")
if manager_pod_name !== nothing
pod = get_pod(manager_pod_name)
images = map(c -> c["image"], pod["spec"]["containers"])

if length(images) == 1
image = first(images)
elseif length(images) > 0
error("Unable to determine image from pod \"$manager_pod_name\" which uses multiple containers")
else
error("Unable to find any images for pod \"$manager_pod_name\"")
end
else
error("Unable to find any images for pod \"$manager_pod_name\"")
image = "julia:$VERSION"
end
end

Expand All @@ -94,17 +97,33 @@ function Distributed.launch(manager::K8sClusterManager, params::Dict, launched::
exename = params[:exename]
exeflags = params[:exeflags]

cmd = `$exename $exeflags --worker=$(cluster_cookie())`
# When using a standard Julia Docker image we can safely set the Julia executable name
# Alternatively, we could extend `Distributed.default_addprocs_params`.
if startswith(manager.image, "julia:")
exename = "julia"
end

# Using `--bind-to=0.0.0.0` to force the worker to listen to all interfaces instead
# of only a single external interface. This is required for `kubectl port-forward`.
# TODO: Should file an issue against the Julia repo about this.
cmd = `$exename $exeflags --worker=$(cluster_cookie()) --bind-to=0.0.0.0`

worker_manifest = worker_pod_spec(manager; cmd, cluster_cookie=cluster_cookie())

# Note: The user-defined `configure` function may or may not be mutating
worker_manifest = manager.configure(worker_manifest)

# Trigger any TOTP requests before the async loop
# TODO: Verify this is working correctly
success(`$(kubectl()) get pods -o 'jsonpath={.items[*].metadata.null}'`)

@sync for i in 1:manager.np
@async begin
pod_name = create_pod(worker_manifest)

# TODO: Add notice about having to pull an image. On a slow internet connection
# this can make it appear that the cluster start is hung

pod = try
wait_for_running_pod(pod_name; timeout=manager.pending_timeout)
catch e
@@ -125,7 +144,7 @@

config = WorkerConfig()
config.io = p.out
config.userdata = (; pod_name=pod_name)
config.userdata = (; pod_name, port_forward=Ref{Base.Process}())

push!(launched, config)
notify(c)
@@ -156,6 +175,13 @@ function Distributed.manage(manager::K8sClusterManager, id::Integer, config::Wor
end

elseif op === :deregister
# Terminate the port-forward process. Without this, these processes may
# persist until terminated by the cluster (e.g. `minikube stop`).
pf = config.userdata.port_forward
if isassigned(pf)
kill(pf[])
end

# As the deregister `manage` call occurs before remote workers are told to
# deregister we should avoid unnecessarily blocking.
@async begin
@@ -179,4 +205,70 @@ function Distributed.manage(manager::K8sClusterManager, id::Integer, config::Wor
notify(DEREGISTER_ALERT; all=true)
end
end

return nothing
end

# Stripped down and modified version of:
# https://github.com/JuliaLang/julia/blob/844c20dd63870aa5b369b85038f0523d7d79308a/stdlib/Distributed/src/managers.jl#L567-L632
function Distributed.connect(manager::K8sClusterManager, pid::Int, config::WorkerConfig)
# Note: This method currently doesn't implement support for worker-to-worker
# connections and instead relies on the `Distributed.connect(::DefaultClusterManager, ...)`
# for this. If we did need to perform special logic for worker-to-worker connections
# we would need to modify how `init_worker` is called via `start_worker`:
# https://github.com/JuliaLang/julia/blob/f7554b5c9f0f580a9fcf5c7b8b9a83b678e2f48a/stdlib/Distributed/src/cluster.jl#L375-L378

# master connecting to workers
if config.io !== nothing
# Not truly needed as we already know this information but since we are using `--worker`
# we may as well follow the standard protocol
bind_addr, port = Distributed.read_worker_host_port(config.io)
else
error("I/O not setup")
end

pod_name = config.userdata.pod_name

# As we've forced the worker to listen on all interfaces, the reported `bind_addr`
# will be a non-routable address. We'll need to determine the in-cluster IP address
# another way.
intra_addr = get_pod(pod_name)["status"]["podIP"]
intra_port = port

bind_addr, port = if !isk8s()
# When the manager is running outside of the K8s cluster we need to establish
# port-forward connections from the manager to the workers.
pf = open(`$(kubectl()) port-forward --address localhost pod/$pod_name :$intra_port`, "r")
fwd_addr, fwd_port = parse_forward_info(readline(pf.out))

# Retain a reference to the port forward
config.userdata.port_forward[] = pf

fwd_addr, fwd_port
else
intra_addr, intra_port
end

s, bind_addr = Distributed.connect_to_worker(bind_addr, port)
config.bind_addr = bind_addr

# Write out a subset of the connect_at info required for further worker-to-worker connection setups
config.connect_at = (intra_addr, Int(intra_port))

if config.io !== nothing
let pid = pid
Distributed.redirect_worker_output(pid, Base.notnothing(config.io))
end
end

return (s, s)
end

function parse_forward_info(str)
m = match(r"^Forwarding from (.*):(\d+) ->", str)
if m !== nothing
return (m.captures[1], parse(UInt16, m.captures[2]))
else
error("Unable to parse port-forward response")
end
end
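
A quick sketch of the expected behavior, using the line format `kubectl port-forward` prints (the local port is illustrative):

```julia
julia> parse_forward_info("Forwarding from 127.0.0.1:55123 -> 9000")
("127.0.0.1", 0xd753)
```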

2 comments on commit 1139430

@omus (Member Author) commented on 1139430 on Apr 13, 2023


@JuliaRegistrator

Registration pull request created: JuliaRegistries/General/81570

After the above pull request is merged, it is recommended that a tag is created on this repository for the registered package version.

This will be done automatically if the Julia TagBot GitHub Action is installed, or can be done manually through the GitHub interface, or via:

```sh
git tag -a v0.1.5 -m "<description of version>" 11394302e85e8eea2510a4ed8bd79768feb8b776
git push origin v0.1.5
```
