diff --git a/Project.toml b/Project.toml
index f5710b9..fa51620 100644
--- a/Project.toml
+++ b/Project.toml
@@ -1,8 +1,9 @@
 name = "K8sClusterManagers"
 uuid = "5aeab163-63d2-4171-9fbf-e22244d80acb"
-version = "0.1.4"
+version = "0.1.5"
 
 [deps]
+Compat = "34da2185-b29b-5c13-b0c7-acf172513d20"
 DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8"
 Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
 JSON = "682c06a0-de6a-54ab-a142-c8b1cf79cde6"
@@ -10,13 +11,14 @@ Mocking = "78c3b35d-d492-501b-9361-3d52fe80e533"
 kubectl_jll = "ed23c2a5-89c4-5d52-b0ca-9d53aadf8c45"
 
 [compat]
+Compat = "3.29, 4"
 DataStructures = "0.18"
 JSON = "0.21"
 Mocking = "0.7"
 Mustache = "1"
 YAML = "0.4.6"
 julia = "1.6"
-kubectl_jll = "1.20.0"
+kubectl_jll = "1.20"
 
 [extras]
 LibGit2 = "76f85450-5226-5b5a-8eaa-529ad045b433"
diff --git a/README.md b/README.md
index 2ee722a..7745531 100644
--- a/README.md
+++ b/README.md
@@ -7,22 +7,41 @@
 A Julia cluster manager for provisioning workers in a Kubernetes (K8s) cluster.
 
-Pairs well with [`julia_pod`](https://github.com/beacon-biosignals/julia_pod) for interactive Julia development within a K8s pod.
-
 ## K8sClusterManager
 
-The `K8sClusterManager` is intended to be used from a Pod running inside a Kubernetes
-cluster.
+The `K8sClusterManager` can be used both inside and outside of a Kubernetes cluster.
+To get started you'll need access to a K8s cluster and to have your machine configured
+to connect to that cluster. If you're new to K8s we recommend using [minikube](https://minikube.sigs.k8s.io)
+to quickly set up a local Kubernetes cluster.
+
+### Running outside K8s
+
+A distributed Julia cluster where the manager runs outside of K8s while the workers run in
+the cluster can quickly be created via:
+
+```julia
+julia> using K8sClusterManagers, Distributed
+
+julia> addprocs(K8sClusterManager(2))
+```
+
+When using the manager outside of a Kubernetes cluster, the manager will connect to workers
+within the cluster using port-forwarding. Performance between the manager and workers will
+be impacted by the network connection between the manager and the cluster.
+
+### Running inside K8s
+
+A Julia process running within a K8s cluster can also be used as the manager of a
+distributed Julia cluster.
 
-Assuming you have `kubectl` installed locally and configured to connect to a cluster, you
-can easily create an interactive Julia REPL session running from within the cluster by
-executing:
+To see this in action, we'll create an interactive Julia REPL session running within the
+cluster by executing:
 
 ```sh
 kubectl run -it example-manager-pod --image julia:1
 ```
 
-Or equivalently, using a K8s manifest named `example-manager-pod.yaml` containing:
+or equivalently, using a K8s manifest named `example-manager-pod.yaml` containing:
 
 ```yaml
 apiVersion: v1
@@ -52,7 +71,7 @@ Now in this Julia REPL session, you can add two workers via:
 
 ```julia
 julia> using Pkg; Pkg.add("K8sClusterManagers")
 
-julia> using K8sClusterManagers
+julia> using K8sClusterManagers, Distributed
 
 julia> addprocs(K8sClusterManager(2))
 ```
diff --git a/docs/src/examples.md b/docs/src/examples.md
index 4bdfa1f..a19d3f8 100644
--- a/docs/src/examples.md
+++ b/docs/src/examples.md
@@ -1,16 +1,15 @@
 Examples
 ========
 
-The [`K8sClusterManager`](@ref) is intended to be used inside a [Pod](https://kubernetes.io/docs/concepts/workloads/pods/)
-running on a Kubernetes cluster.
+The `K8sClusterManager` can be used both inside and outside of a Kubernetes cluster.
 
 ## Launching an interactive session
 
 The following manifest will create a Kubernetes [Job](https://kubernetes.io/docs/concepts/workloads/controllers/job/)
-named "interactive-session". This Job will spawn a Pod (see `spec.template.spec`) which will
-run an interactive Julia session with the latest release of K8sClusterManagers.jl installed.
-Be sure to create the required [ServiceAccount and associated permissions](../patterns/#required-permissions)
-before proceeding.
+named "interactive-session". This Job will spawn a [Pod](https://kubernetes.io/docs/concepts/workloads/pods/)
+(see `spec.template.spec`) which will run an interactive Julia session with the latest
+release of K8sClusterManagers.jl installed. Be sure to create the required [ServiceAccount
+and associated permissions](../patterns/#required-permissions) before proceeding.
 
 ````@eval
 using Markdown
@@ -36,11 +35,12 @@ echo $manager_pod
 kubectl attach -it pod/${manager_pod?}
 ```
 
-### Launching workers
+## Launching workers
 
-Once you've attached to the interactive session you can use [`K8sClusterManager`](@ref) to
-spawn K8s workers. For our example we'll be using a small amount of CPU/Memory to ensure
-workers can be spawned even on clusters with limited resources:
+You can use [`K8sClusterManager`](@ref) to spawn workers within the K8s cluster. The cluster
+manager can be used both inside and outside of the K8s cluster. In the following example
+we'll use a small amount of CPU/memory to ensure workers can be spawned even on clusters
+with limited resources:
 
 ```julia
 julia> using Distributed, K8sClusterManagers
@@ -61,7 +61,7 @@ julia> pmap(x -> myid(), 1:nworkers())  # Each worker reports its worker ID
 2
 ```
 
-### Pending workers
+## Pending workers
 
 A worker created via `addprocs` may not necessarily be available right away, as K8s must
 schedule the worker's Pod to a Node before the corresponding Julia process can start.
@@ -74,7 +74,7 @@ the manager will continue with the subset of workers which have reported in and
 workers that are stuck in the "Pending" phase.
 
 ```julia
-julia> addprocs(K8sClusterManager(1, memory="1Ei", pending_timeout=10))  # Request 1 exbibyte of memory
+julia> addprocs(K8sClusterManager(1, memory="1Pi", pending_timeout=10))  # Request 1 pebibyte of memory
 ┌ Warning: TimeoutException: timed out after waiting for worker interactive-session-d7jfb-worker-ffvnm to start for 10 seconds, with status:
 │  {
 │      "conditions": [
 │ 
 Int64[]
 ```
 
-### Termination Reason
+## Termination Reason
 
 When Julia workers [exceed the specified memory limit](https://kubernetes.io/docs/tasks/configure-pod-container/assign-memory-resource/#exceed-a-container-s-memory-limit)
 the worker Pod will be automatically killed by Kubernetes (OOMKilled). In such a
diff --git a/src/K8sClusterManagers.jl b/src/K8sClusterManagers.jl
index 090c8d0..69534f1 100644
--- a/src/K8sClusterManagers.jl
+++ b/src/K8sClusterManagers.jl
@@ -1,5 +1,6 @@
 module K8sClusterManagers
 
+using Compat: @something
 using DataStructures: DefaultOrderedDict, OrderedDict
 using Distributed: Distributed, ClusterManager, WorkerConfig, cluster_cookie
 using JSON: JSON
diff --git a/src/native_driver.jl b/src/native_driver.jl
index 1ccd85c..4d96b6d 100644
--- a/src/native_driver.jl
+++ b/src/native_driver.jl
@@ -33,14 +33,15 @@ available.
 - `manager_pod_name`: the name of the manager pod. Defaults to `gethostname()` which is
   the name of the pod when executed inside of a Kubernetes pod.
 - `worker_prefix`: the prefix given to spawned workers. Defaults to
-  `"\$(manager_pod_name)-worker"`.
-- `image`: Docker image to use for the workers. Defaults to using the image of the Julia
-  caller if running within a pod using a single container otherwise is a required argument.
+  `"\$(manager_pod_name)-worker"` when the manager is running inside of K8s, otherwise
+  defaults to `"$(gethostname())-worker"`.
+- `image`: Docker image to use for the workers. Defaults to the image used by the manager
+  when running inside of a K8s pod, otherwise defaults to `"julia:\$VERSION"`.
 - `cpu`: [CPU resources requested](https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#meaning-of-cpu)
   for each worker. Defaults to `$(repr(DEFAULT_WORKER_CPU))`.
 - `memory`: [Memory resource requested](https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#meaning-of-memory)
   for each worker in bytes. Requests may provide a unit suffix (e.g. "G" for Gigabytes and
-  "GiB" for Gibibytes). Defaults to `$(repr(DEFAULT_WORKER_MEMORY))`.
+  "Gi" for Gibibytes). Defaults to `$(repr(DEFAULT_WORKER_MEMORY))`.
 - `pending_timeout`: The maximum number of seconds to wait for a "Pending" worker pod to
   enter the "Running" phase. Once the timeout has been reached the manager will continue
   with the number of workers available (`<= np`). Defaults to `180` (3 minutes).
@@ -49,25 +50,27 @@
 """
 function K8sClusterManager(np::Integer;
                            namespace::AbstractString=current_namespace(),
-                           manager_pod_name=gethostname(),
-                           worker_prefix::AbstractString="$(manager_pod_name)-worker",
+                           manager_pod_name=isk8s() ? gethostname() : nothing,
+                           worker_prefix::AbstractString="$(@something(manager_pod_name, gethostname()))-worker",
                            image=nothing,
                            cpu=DEFAULT_WORKER_CPU,
                            memory=DEFAULT_WORKER_MEMORY,
                            pending_timeout::Real=180,
                            configure=identity)
-
-    # Default to using the image of the pod if possible
     if image === nothing
-        pod = get_pod(manager_pod_name)
-        images = map(c -> c["image"], pod["spec"]["containers"])
-
-        if length(images) == 1
-            image = first(images)
-        elseif length(images) > 0
-            error("Unable to determine image from pod \"$manager_pod_name\" which uses multiple containers")
+        if manager_pod_name !== nothing
+            pod = get_pod(manager_pod_name)
+            images = map(c -> c["image"], pod["spec"]["containers"])
+
+            if length(images) == 1
+                image = first(images)
+            elseif length(images) > 0
+                error("Unable to determine image from pod \"$manager_pod_name\" which uses multiple containers")
+            else
+                error("Unable to find any images for pod \"$manager_pod_name\"")
+            end
         else
-            error("Unable to find any images for pod \"$manager_pod_name\"")
+            image = "julia:$VERSION"
         end
     end
@@ -94,17 +97,33 @@ function Distributed.launch(manager::K8sClusterManager, params::Dict, launched::
     exename = params[:exename]
     exeflags = params[:exeflags]
 
-    cmd = `$exename $exeflags --worker=$(cluster_cookie())`
+    # When using a standard Julia Docker image we can safely set the Julia executable name.
+    # Alternatively, we could extend `Distributed.default_addprocs_params`.
+    if startswith(manager.image, "julia:")
+        exename = "julia"
+    end
+
+    # Using `--bind-to=0.0.0.0` to force the worker to listen to all interfaces instead
+    # of only a single external interface. This is required for `kubectl port-forward`.
+    # TODO: Should file an issue against the Julia repo about this.
+    cmd = `$exename $exeflags --worker=$(cluster_cookie()) --bind-to=0.0.0.0`
 
     worker_manifest = worker_pod_spec(manager; cmd, cluster_cookie=cluster_cookie())
 
     # Note: User-defined `configure` function may or may not be mutating
     worker_manifest = manager.configure(worker_manifest)
 
+    # Trigger any TOTP requests before the async loop
+    # TODO: Verify this is working correctly
+    success(`$(kubectl()) get pods -o 'jsonpath={.items[*].metadata.null}'`)
+
     @sync for i in 1:manager.np
         @async begin
             pod_name = create_pod(worker_manifest)
 
+            # TODO: Add notice about having to pull an image. On a slow internet connection
+            # this can make it appear that the cluster start is hung.
+
             pod = try
                 wait_for_running_pod(pod_name; timeout=manager.pending_timeout)
             catch e
@@ -125,7 +144,7 @@ function Distributed.launch(manager::K8sClusterManager, params::Dict, launched::
             config = WorkerConfig()
             config.io = p.out
-            config.userdata = (; pod_name=pod_name)
+            config.userdata = (; pod_name, port_forward=Ref{Base.Process}())
 
             push!(launched, config)
             notify(c)
@@ -156,6 +175,13 @@ function Distributed.manage(manager::K8sClusterManager, id::Integer, config::Wor
         end
 
     elseif op === :deregister
+        # Terminate the port-forward process. Without this, these processes may
+        # persist until terminated by the cluster (e.g. `minikube stop`).
+        pf = config.userdata.port_forward
+        if isassigned(pf)
+            kill(pf[])
+        end
+
         # As the deregister `manage` call occurs before remote workers are told to
         # deregister we should avoid unnecessarily blocking.
         @async begin
@@ -179,4 +205,70 @@ function Distributed.manage(manager::K8sClusterManager, id::Integer, config::Wor
             notify(DEREGISTER_ALERT; all=true)
         end
     end
+
+    return nothing
+end
+
+# Stripped down and modified version of:
+# https://github.com/JuliaLang/julia/blob/844c20dd63870aa5b369b85038f0523d7d79308a/stdlib/Distributed/src/managers.jl#L567-L632
+function Distributed.connect(manager::K8sClusterManager, pid::Int, config::WorkerConfig)
+    # Note: This method currently doesn't implement support for worker-to-worker
+    # connections and instead relies on `Distributed.connect(::DefaultClusterManager, ...)`
+    # for this. If we did need to perform special logic for worker-to-worker connections
+    # we would need to modify how `init_worker` is called via `start_worker`:
+    # https://github.com/JuliaLang/julia/blob/f7554b5c9f0f580a9fcf5c7b8b9a83b678e2f48a/stdlib/Distributed/src/cluster.jl#L375-L378
+
+    # master connecting to workers
+    if config.io !== nothing
+        # Not truly needed as we already know this information, but since we are using
+        # `--worker` we may as well follow the standard protocol
+        bind_addr, port = Distributed.read_worker_host_port(config.io)
+    else
+        error("I/O not setup")
+    end
+
+    pod_name = config.userdata.pod_name
+
+    # As we've forced the worker to listen to all interfaces the reported `bind_addr` will
+    # be a non-routable address. We'll need to determine the in-cluster IP address another
+    # way.
+    intra_addr = get_pod(pod_name)["status"]["podIP"]
+    intra_port = port
+
+    bind_addr, port = if !isk8s()
+        # When the manager is running outside of the K8s cluster we need to establish
+        # port-forward connections from the manager to the workers.
+        pf = open(`$(kubectl()) port-forward --address localhost pod/$pod_name :$intra_port`, "r")
+        fwd_addr, fwd_port = parse_forward_info(readline(pf.out))
+
+        # Retain a reference to the port-forward process
+        config.userdata.port_forward[] = pf
+
+        fwd_addr, fwd_port
+    else
+        intra_addr, intra_port
+    end
+
+    s, bind_addr = Distributed.connect_to_worker(bind_addr, port)
+    config.bind_addr = bind_addr
+
+    # Write out the subset of the `connect_at` information required for further
+    # worker-worker connection setups
+    config.connect_at = (intra_addr, Int(intra_port))
+
+    if config.io !== nothing
+        let pid = pid
+            Distributed.redirect_worker_output(pid, Base.notnothing(config.io))
+        end
+    end
+
+    return (s, s)
+end
+
+function parse_forward_info(str)
+    m = match(r"^Forwarding from (.*):(\d+) ->", str)
+    if m !== nothing
+        return (m.captures[1], parse(UInt16, m.captures[2]))
+    else
+        error("Unable to parse port-forward response")
+    end
+end
diff --git a/test/cluster.jl b/test/cluster.jl
index 1257061..1fe396e 100644
--- a/test/cluster.jl
+++ b/test/cluster.jl
@@ -218,7 +218,47 @@ let test_name = "test-isk8s"
     end
 end
 
-let test_name = "test-success"
+let test_name = "test-k8s-external-manager"
+    @testset "$test_name" begin
+        worker_prefix = "$(test_name)-$(TEST_RUN)-worker"
+        function configure(pod)
+            push!(pod["metadata"]["labels"], "test" => test_name, COMMON_LABELS...)
+            return pod
+        end
+
+        # Manager must be running outside of a K8s cluster for these tests
+        @test !isk8s()
+
+        worker_ids = addprocs(K8sClusterManager(1; configure, worker_prefix, pending_timeout=60, cpu="0.5", memory="300Mi"))
+        @test nprocs() == 2
+
+        worker_id = only(worker_ids)
+        worker_pod = remotecall_fetch(gethostname, worker_id)
+
+        @test pod_exists(worker_pod)
+        @test pod_phase(worker_pod) == "Running"
+
+        # Worker image should default to the standard Julia Docker image when the manager
+        # isn't running as a K8s pod
+        @test only(pod_images(worker_pod)) == "julia:$VERSION"
+
+        # Execute code on remote worker
+        @test remotecall_fetch(myid, worker_id) == worker_id
+
+        rmprocs(worker_id)
+        timedwait(20; pollint=1) do
+            pod_phase(worker_pod) != "Running"
+        end
+
+        # Removed workers should have a return code of zero
+        @test pod_phase(worker_pod) == "Succeeded"
+
+        @info "Deleting pod for $test_name"
+        delete_pod(worker_pod; wait=false)
+    end
+end
+
+let test_name = "test-k8s-internal-manager"
     @testset "$test_name" begin
         job_name = "$(test_name)-$(TEST_RUN)"
         worker_prefix = "$(job_name)-worker"
@@ -285,6 +325,9 @@
         @test output_matches[1][:worker_id] == "2"
         @test output_matches[1][:output] == worker_pod
 
+        # Worker image should default to the manager's image
+        @test only(pod_images(worker_pod)) == only(pod_images(manager_pod))
+
         # Ensure there are no unexpected error messages in the log
         @test !occursin(r"\bError\b"i, manager_log)
@@ -379,6 +422,7 @@ let test_name = "test-multi-addprocs"
     end
 end
 
+# Keep this test using an in-cluster manager as the interrupt reports the error in the manager's log
 let test_name = "test-interrupt"
     @testset "$test_name" begin
         job_name = "$(test_name)-$(TEST_RUN)"
diff --git a/test/utils.jl b/test/utils.jl
index d6cfeca..b14c1cf 100644
--- a/test/utils.jl
+++ b/test/utils.jl
@@ -56,6 +56,12 @@ function pod_names(labels::Pair...; sort_by=nothing)
     return !isempty(output) ? split(output, '\n') : String[]
 end
 
+function pod_images(pod_name)
+    jsonpath = """{range .spec.containers[*]}{.image}{"\\n"}{end}"""
+    output = readchomp(`$(kubectl()) get pod/$pod_name -o jsonpath=$jsonpath`)
+    return split(output, '\n'; keepempty=false)
+end
+
 # Use the double-quoted flow scalar style to allow us to have a YAML string which includes
 # newlines without being aware of YAML indentation (block styles)
 #
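
A note on the port-forward handshake added in `src/native_driver.jl`: because the local
port is left unspecified (the `:$intra_port` form lets `kubectl` pick an ephemeral local
port), the manager must parse the `Forwarding from <addr>:<port> -> <target port>` line
that `kubectl port-forward` prints once the tunnel is up. A minimal self-contained sketch
of that parsing, where the sample line is illustrative rather than captured from a real
run:

```julia
# Same regex as the `parse_forward_info` added in this diff. The input line below is
# illustrative sample output, not captured from an actual `kubectl port-forward` run.
function parse_forward_info(str)
    m = match(r"^Forwarding from (.*):(\d+) ->", str)
    m === nothing && error("Unable to parse port-forward response")
    return (m.captures[1], parse(UInt16, m.captures[2]))
end

addr, port = parse_forward_info("Forwarding from 127.0.0.1:54321 -> 9001")
@assert addr == "127.0.0.1"
@assert port == 54321
```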
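On the new `port_forward=Ref{Base.Process}()` field in `config.userdata`: the `Ref` starts
out unassigned and is only filled in by `Distributed.connect` when the manager runs outside
the cluster, which is why the `:deregister` branch guards with `isassigned` before killing.
A small sketch of that pattern, using `sleep` as a stand-in for the real
`kubectl port-forward` process:

```julia
# An initially-unassigned Ref, as used for `port_forward` in this diff.
pf = Ref{Base.Process}()
@assert !isassigned(pf)  # nothing to kill yet, mirroring an in-cluster manager

# Stand-in for `open(`kubectl port-forward ...`, "r")`.
pf[] = open(`sleep 60`, "r")

# What the `:deregister` branch does: only kill the process if one was started.
if isassigned(pf)
    kill(pf[])
end
```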
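Lastly, on the `configure` keyword exercised by the new tests: it receives the worker pod
manifest as nested dictionaries and must return the manifest (it may or may not mutate in
place). A hedged usage sketch; the `"team"` label is hypothetical, while
`pod["metadata"]["labels"]` is the same path the tests in this diff push to:

```julia
using Distributed, K8sClusterManagers

# Add a custom label to each worker pod manifest before it is created.
# The "team" => "research" pair is a hypothetical example.
function configure(pod)
    push!(pod["metadata"]["labels"], "team" => "research")
    return pod
end

addprocs(K8sClusterManager(2; configure, cpu="0.5", memory="300Mi"))
```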