Skip to content

Commit

Permalink
[Test] Add e2e test for sample RayJob yaml on kind (#935)
Browse files Browse the repository at this point in the history
Add e2e test for sample RayJob yaml on kind
  • Loading branch information
architkulkarni authored Apr 17, 2023
1 parent deb29bd commit ef290e0
Show file tree
Hide file tree
Showing 5 changed files with 209 additions and 3 deletions.
20 changes: 20 additions & 0 deletions .github/workflows/actions/configuration/action.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,23 @@ runs:
run: |
python tests/test_sample_raycluster_yamls.py
shell: bash

- name: Run tests for sample RayJob YAML files with the nightly operator.
# Depends on the KubeRay operator image built in previous steps
env:
GITHUB_ACTIONS: true
RAY_IMAGE: rayproject/ray:${{ inputs.ray_version }}
OPERATOR_IMAGE: kuberay/operator:${{ steps.vars.outputs.sha_short }}
run: |
python tests/test_sample_rayjob_yamls.py
shell: bash

- name: Run tests for sample RayJob YAML files with the latest KubeRay release.
# Depends on latest KubeRay release.
env:
GITHUB_ACTIONS: true
RAY_IMAGE: rayproject/ray:${{ inputs.ray_version }}
OPERATOR_IMAGE: kuberay/operator:v0.5.0 # The operator image in the latest KubeRay release.
run: |
python tests/test_sample_rayjob_yamls.py
shell: bash
12 changes: 11 additions & 1 deletion ray-operator/config/samples/ray_v1alpha1_rayjob.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ spec:
# the following params are used to complete the ray start: ray start --head --block --redis-port=6379 ...
rayStartParams:
dashboard-host: '0.0.0.0'
num-cpus: '2' # can be auto-completed from the limits
num-cpus: '1' # can be auto-completed from the limits
#pod template
template:
spec:
Expand All @@ -37,6 +37,11 @@ spec:
name: client
- containerPort: 8000
name: serve
resources:
limits:
cpu: "1"
requests:
cpu: "200m"
volumeMounts:
- mountPath: /home/ray/samples
name: code-sample
Expand Down Expand Up @@ -68,6 +73,11 @@ spec:
preStop:
exec:
command: [ "/bin/sh","-c","ray stop" ]
resources:
limits:
cpu: "1"
requests:
cpu: "200m"
######################Ray code sample#################################
# this sample is from https://docs.ray.io/en/latest/cluster/job-submission.html#quick-start-example
# it is mounted into the container and executed to show the Ray job at work
Expand Down
98 changes: 98 additions & 0 deletions tests/framework/prototype.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from framework.utils import (
create_custom_object,
delete_custom_object,
get_custom_object,
get_head_pod,
logger,
pod_exec_command,
Expand Down Expand Up @@ -368,6 +369,103 @@ def clean_up(self):
show_cluster_info(self.namespace)
raise Exception("RayServiceAddCREvent clean_up() timeout")

class RayJobAddCREvent(CREvent):
"""CREvent for RayJob addition"""
def wait(self):
"""Wait for RayJob to converge"""
start_time = time.time()
expected_head_pods = get_expected_head_pods(self.custom_resource_object)
expected_worker_pods = get_expected_worker_pods(self.custom_resource_object)
# Wait until:
# (1) The number of head pods and worker pods are as expected.
# (2) All head pods and worker pods are "Running".
# (3) RayJob named "rayjob-sample" has status "SUCCEEDED".
converge = False
k8s_v1_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_V1_CLIENT_KEY]
for i in range(self.timeout):
headpods = k8s_v1_api.list_namespaced_pod(
namespace = self.namespace, label_selector='ray.io/node-type=head')
workerpods = k8s_v1_api.list_namespaced_pod(
namespace = self.namespace, label_selector='ray.io/node-type=worker')
rayjob = get_custom_object(CONST.RAY_JOB_CRD, self.namespace,
self.custom_resource_object['metadata']['name'])

if (len(headpods.items) == expected_head_pods
and len(workerpods.items) == expected_worker_pods
and check_pod_running(headpods.items) and check_pod_running(workerpods.items)
and rayjob.get('status') is not None
and rayjob.get('status').get('jobStatus') == "SUCCEEDED"):
converge = True
logger.info("--- RayJobAddCREvent converged in %s seconds ---",
time.time() - start_time)
break
else:
# Print debug logs every 10 seconds.
if i % 10 == 0 and i != 0:
logger.info("RayJobAddCREvent wait() hasn't converged yet.")
# Print out the delta between expected and actual for the parts that are not
# converged yet.
if len(headpods.items) != expected_head_pods:
logger.info("expected_head_pods: %d, actual_head_pods: %d",
expected_head_pods, len(headpods.items))
if len(workerpods.items) != expected_worker_pods:
logger.info("expected_worker_pods: %d, actual_worker_pods: %d",
expected_worker_pods, len(workerpods.items))
if not check_pod_running(headpods.items):
logger.info("head pods are not running yet.")
if not check_pod_running(workerpods.items):
logger.info("worker pods are not running yet.")
if rayjob.get('status') is None:
logger.info("rayjob status is None.")
elif rayjob.get('status').get('jobStatus') != "SUCCEEDED":
logger.info("rayjob status is not SUCCEEDED yet.")
logger.info("rayjob status: %s", rayjob.get('status').get('jobStatus'))

if (rayjob.get("status") is not None and
rayjob.get("status").get("jobStatus") in ["STOPPED", "FAILED"]):
logger.info("Job Status: %s", rayjob.get("status").get("jobStatus"))
logger.info("Job Message: %s", rayjob.get("status").get("message"))
break
time.sleep(1)

if not converge:
logger.info("RayJobAddCREvent wait() failed to converge in %d seconds.",
self.timeout)
logger.info("expected_head_pods: %d, expected_worker_pods: %d",
expected_head_pods, expected_worker_pods)
logger.info("rayjob: %s", rayjob)
show_cluster_info(self.namespace)
raise Exception("RayJobAddCREvent wait() timeout")

def clean_up(self):
"""Delete added RayJob"""
if not self.filepath:
delete_custom_object(CONST.RAY_JOB_CRD,
self.namespace, self.custom_resource_object['metadata']['name'])
else:
shell_subprocess_run(f"kubectl delete -n {self.namespace} -f {self.filepath}")
# Wait for pods to be deleted
converge = False
k8s_v1_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_V1_CLIENT_KEY]
start_time = time.time()
for _ in range(self.timeout):
headpods = k8s_v1_api.list_namespaced_pod(
namespace = self.namespace, label_selector = 'ray.io/node-type=head')
workerpods = k8s_v1_api.list_namespaced_pod(
namespace = self.namespace, label_selector = 'ray.io/node-type=worker')
if (len(headpods.items) == 0 and len(workerpods.items) == 0):
converge = True
logger.info("--- Cleanup RayJob %s seconds ---", time.time() - start_time)
break
time.sleep(1)

if not converge:
logger.info("RayJobAddCREvent clean_up() failed to converge in %d seconds.",
self.timeout)
logger.info("expected_head_pods: 0, expected_worker_pods: 0")
show_cluster_info(self.namespace)
raise Exception("RayJobAddCREvent clean_up() timeout")

class GeneralTestCase(unittest.TestCase):
"""TestSuite"""
def __init__(self, methodName, docker_image_dict, cr_event):
Expand Down
24 changes: 22 additions & 2 deletions tests/framework/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,9 @@ def create_custom_object(namespace, cr_object):
group = 'ray.io', version = 'v1alpha1', namespace = namespace,
plural = 'rayservices', body = cr_object)
elif crd == CONST.RAY_JOB_CRD:
raise NotImplementedError
k8s_cr_api.create_namespaced_custom_object(
group = 'ray.io', version = 'v1alpha1', namespace = namespace,
plural = 'rayjobs', body = cr_object)

def delete_custom_object(crd, namespace, cr_name):
"""Delete the given `cr_name` custom resource in the given `namespace`."""
Expand All @@ -224,4 +226,22 @@ def delete_custom_object(crd, namespace, cr_name):
group = 'ray.io', version = 'v1alpha1', namespace = namespace,
plural = 'rayservices', name = cr_name)
elif crd == CONST.RAY_JOB_CRD:
raise NotImplementedError
k8s_cr_api.delete_namespaced_custom_object(
group = 'ray.io', version = 'v1alpha1', namespace = namespace,
plural = 'rayjobs', name = cr_name)

def get_custom_object(crd, namespace, cr_name):
"""Get the given `cr_name` custom resource in the given `namespace`."""
k8s_cr_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_CR_CLIENT_KEY]
if crd == CONST.RAY_CLUSTER_CRD:
return k8s_cr_api.get_namespaced_custom_object(
group = 'ray.io', version = 'v1alpha1', namespace = namespace,
plural = 'rayclusters', name = cr_name)
elif crd == CONST.RAY_SERVICE_CRD:
return k8s_cr_api.get_namespaced_custom_object(
group = 'ray.io', version = 'v1alpha1', namespace = namespace,
plural = 'rayservices', name = cr_name)
elif crd == CONST.RAY_JOB_CRD:
return k8s_cr_api.get_namespaced_custom_object(
group = 'ray.io', version = 'v1alpha1', namespace = namespace,
plural = 'rayjobs', name = cr_name)
58 changes: 58 additions & 0 deletions tests/test_sample_rayjob_yamls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
''' Test sample RayJob YAML files to catch invalid and outdated ones. '''
import unittest
import os
import logging
import yaml

from framework.prototype import (
RuleSet,
GeneralTestCase,
RayJobAddCREvent,
EasyJobRule,
)

from framework.utils import (
CONST
)

logger = logging.getLogger(__name__)

if __name__ == '__main__':
NAMESPACE = 'default'
SAMPLE_PATH = CONST.REPO_ROOT.joinpath("ray-operator/config/samples/")
YAMLs = ['ray_v1alpha1_rayjob.yaml']

sample_yaml_files = []
for filename in YAMLs:
filepath = SAMPLE_PATH.joinpath(filename)
with open(filepath, encoding="utf-8") as cr_yaml:
for k8s_object in yaml.safe_load_all(cr_yaml):
if k8s_object['kind'] == 'RayJob':
sample_yaml_files.append(
{'path': filepath, 'name': filename, 'cr': k8s_object}
)
break
# TODO(architkulkarni): Add RayJobSuccessRule. Currently fails with the following error:
# Failed to start Job Supervisor actor: The name _ray_internal_job_actor_rayjob-sample-8tzrb
# (namespace=SUPERVISOR_ACTOR_RAY_NAMESPACE) is already taken.
rs = RuleSet([EasyJobRule()])
image_dict = {
CONST.RAY_IMAGE_KEY: os.getenv('RAY_IMAGE', default='rayproject/ray:2.3.0'),
CONST.OPERATOR_IMAGE_KEY: os.getenv('OPERATOR_IMAGE', default='kuberay/operator:nightly'),
}
logger.info(image_dict)

# Build a test plan
logger.info("Building a test plan ...")
test_cases = unittest.TestSuite()
for index, new_cr in enumerate(sample_yaml_files):
logger.info('[TEST %d]: %s', index, new_cr['name'])
addEvent = RayJobAddCREvent(new_cr['cr'], [rs], 300, NAMESPACE, new_cr['path'])
test_cases.addTest(GeneralTestCase('runtest', image_dict, addEvent))

# Execute all testsCRs
runner = unittest.TextTestRunner()
test_result = runner.run(test_cases)

# Without this line, the exit code will always be 0.
assert test_result.wasSuccessful()

0 comments on commit ef290e0

Please sign in to comment.