diff --git a/.github/workflows/actions/configuration/action.yaml b/.github/workflows/actions/configuration/action.yaml
index 7fdbd788ea..9baee70cf4 100644
--- a/.github/workflows/actions/configuration/action.yaml
+++ b/.github/workflows/actions/configuration/action.yaml
@@ -67,3 +67,23 @@ runs:
       run: |
         python tests/test_sample_raycluster_yamls.py
       shell: bash
+
+    - name: Run tests for sample RayJob YAML files with the nightly operator.
+      # Depends on the KubeRay operator image built in previous steps.
+      env:
+        GITHUB_ACTIONS: true
+        RAY_IMAGE: rayproject/ray:${{ inputs.ray_version }}
+        OPERATOR_IMAGE: kuberay/operator:${{ steps.vars.outputs.sha_short }}
+      run: |
+        python tests/test_sample_rayjob_yamls.py
+      shell: bash
+
+    - name: Run tests for sample RayJob YAML files with the latest KubeRay release.
+      # Depends on the latest KubeRay release.
+      env:
+        GITHUB_ACTIONS: true
+        RAY_IMAGE: rayproject/ray:${{ inputs.ray_version }}
+        OPERATOR_IMAGE: kuberay/operator:v0.5.0 # The operator image in the latest KubeRay release.
+      run: |
+        python tests/test_sample_rayjob_yamls.py
+      shell: bash
diff --git a/ray-operator/config/samples/ray_v1alpha1_rayjob.yaml b/ray-operator/config/samples/ray_v1alpha1_rayjob.yaml
index ce98f2d0f5..04b9ee9af5 100644
--- a/ray-operator/config/samples/ray_v1alpha1_rayjob.yaml
+++ b/ray-operator/config/samples/ray_v1alpha1_rayjob.yaml
@@ -21,7 +21,7 @@ spec:
       # the following params are used to complete the ray start: ray start --head --block --redis-port=6379 ...
       rayStartParams:
         dashboard-host: '0.0.0.0'
-        num-cpus: '2' # can be auto-completed from the limits
+        num-cpus: '1' # can be auto-completed from the limits
       #pod template
       template:
         spec:
@@ -37,6 +37,11 @@ spec:
              name: client
            - containerPort: 8000
              name: serve
+          resources:
+            limits:
+              cpu: "1"
+            requests:
+              cpu: "200m"
           volumeMounts:
             - mountPath: /home/ray/samples
               name: code-sample
@@ -68,6 +73,11 @@ spec:
            preStop:
              exec:
                command: [ "/bin/sh","-c","ray stop" ]
+          resources:
+            limits:
+              cpu: "1"
+            requests:
+              cpu: "200m"
 ######################Ray code sample#################################
 # this sample is from https://docs.ray.io/en/latest/cluster/job-submission.html#quick-start-example
 # it is mounted into the container and executed to show the Ray job at work
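
For context on the `num-cpus` change above: the inline comment says the value can be auto-completed from the limits, so it is kept in sync with the new head-container CPU limit of "1". Below is a minimal standalone sketch of that consistency check; it is not part of the patch, and the nested field layout (spec.rayClusterSpec.headGroupSpec) is an assumption based on the sample file.

# consistency_check.py - hypothetical helper, not part of this patch.
# Verifies that a sample RayJob's head-group `num-cpus` matches the head
# container's CPU limit. Field layout assumed from ray_v1alpha1_rayjob.yaml.
import yaml

def check_num_cpus_matches_limit(path):
    with open(path, encoding="utf-8") as f:
        for doc in yaml.safe_load_all(f):
            if not isinstance(doc, dict) or doc.get("kind") != "RayJob":
                continue
            head = doc["spec"]["rayClusterSpec"]["headGroupSpec"]
            num_cpus = head["rayStartParams"]["num-cpus"]
            head_container = head["template"]["spec"]["containers"][0]
            cpu_limit = head_container["resources"]["limits"]["cpu"]
            assert str(num_cpus) == str(cpu_limit), (
                f"num-cpus={num_cpus} != cpu limit={cpu_limit}")

check_num_cpus_matches_limit(
    "ray-operator/config/samples/ray_v1alpha1_rayjob.yaml")
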
diff --git a/tests/framework/prototype.py b/tests/framework/prototype.py
index 533410167f..9f2246b174 100644
--- a/tests/framework/prototype.py
+++ b/tests/framework/prototype.py
@@ -7,6 +7,7 @@
 from framework.utils import (
     create_custom_object,
     delete_custom_object,
+    get_custom_object,
     get_head_pod,
     logger,
     pod_exec_command,
@@ -368,6 +369,103 @@ def clean_up(self):
         show_cluster_info(self.namespace)
         raise Exception("RayServiceAddCREvent clean_up() timeout")
 
+class RayJobAddCREvent(CREvent):
+    """CREvent for RayJob addition"""
+    def wait(self):
+        """Wait for the RayJob to converge."""
+        start_time = time.time()
+        expected_head_pods = get_expected_head_pods(self.custom_resource_object)
+        expected_worker_pods = get_expected_worker_pods(self.custom_resource_object)
+        # Wait until:
+        #   (1) The numbers of head pods and worker pods match the expected counts.
+        #   (2) All head pods and worker pods are "Running".
+        #   (3) The RayJob named in the CR reaches jobStatus "SUCCEEDED".
+        converge = False
+        k8s_v1_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_V1_CLIENT_KEY]
+        for i in range(self.timeout):
+            headpods = k8s_v1_api.list_namespaced_pod(
+                namespace = self.namespace, label_selector = 'ray.io/node-type=head')
+            workerpods = k8s_v1_api.list_namespaced_pod(
+                namespace = self.namespace, label_selector = 'ray.io/node-type=worker')
+            rayjob = get_custom_object(CONST.RAY_JOB_CRD, self.namespace,
+                self.custom_resource_object['metadata']['name'])
+
+            if (len(headpods.items) == expected_head_pods
+                    and len(workerpods.items) == expected_worker_pods
+                    and check_pod_running(headpods.items)
+                    and check_pod_running(workerpods.items)
+                    and rayjob.get('status') is not None
+                    and rayjob.get('status').get('jobStatus') == "SUCCEEDED"):
+                converge = True
+                logger.info("--- RayJobAddCREvent converged in %s seconds ---",
+                            time.time() - start_time)
+                break
+            else:
+                # Print debug logs every 10 seconds, showing the delta between
+                # the expected and actual state for whatever has not converged yet.
+                if i % 10 == 0 and i != 0:
+                    logger.info("RayJobAddCREvent wait() hasn't converged yet.")
+                    if len(headpods.items) != expected_head_pods:
+                        logger.info("expected_head_pods: %d, actual_head_pods: %d",
+                                    expected_head_pods, len(headpods.items))
+                    if len(workerpods.items) != expected_worker_pods:
+                        logger.info("expected_worker_pods: %d, actual_worker_pods: %d",
+                                    expected_worker_pods, len(workerpods.items))
+                    if not check_pod_running(headpods.items):
+                        logger.info("Head pods are not running yet.")
+                    if not check_pod_running(workerpods.items):
+                        logger.info("Worker pods are not running yet.")
+                    if rayjob.get('status') is None:
+                        logger.info("RayJob status is None.")
+                    elif rayjob.get('status').get('jobStatus') != "SUCCEEDED":
+                        logger.info("RayJob jobStatus is not SUCCEEDED yet: %s",
+                                    rayjob.get('status').get('jobStatus'))
+                # A terminal failure state will never converge, so stop waiting.
+                if (rayjob.get("status") is not None and
+                        rayjob.get("status").get("jobStatus") in ["STOPPED", "FAILED"]):
+                    logger.info("Job Status: %s", rayjob.get("status").get("jobStatus"))
+                    logger.info("Job Message: %s", rayjob.get("status").get("message"))
+                    break
+            time.sleep(1)
+
+        if not converge:
+            logger.info("RayJobAddCREvent wait() failed to converge in %d seconds.",
+                        self.timeout)
+            logger.info("expected_head_pods: %d, expected_worker_pods: %d",
+                        expected_head_pods, expected_worker_pods)
+            logger.info("rayjob: %s", rayjob)
+            show_cluster_info(self.namespace)
+            raise Exception("RayJobAddCREvent wait() timeout")
+
+    def clean_up(self):
+        """Delete the added RayJob."""
+        if not self.filepath:
+            delete_custom_object(CONST.RAY_JOB_CRD,
+                self.namespace, self.custom_resource_object['metadata']['name'])
+        else:
+            shell_subprocess_run(f"kubectl delete -n {self.namespace} -f {self.filepath}")
+        # Wait for the pods to be deleted.
+        converge = False
+        k8s_v1_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_V1_CLIENT_KEY]
+        start_time = time.time()
+        for _ in range(self.timeout):
+            headpods = k8s_v1_api.list_namespaced_pod(
+                namespace = self.namespace, label_selector = 'ray.io/node-type=head')
+            workerpods = k8s_v1_api.list_namespaced_pod(
+                namespace = self.namespace, label_selector = 'ray.io/node-type=worker')
+            if (len(headpods.items) == 0 and len(workerpods.items) == 0):
+                converge = True
+                logger.info("--- Cleanup RayJob %s seconds ---", time.time() - start_time)
+                break
+            time.sleep(1)
+
+        if not converge:
+            logger.info("RayJobAddCREvent clean_up() failed to converge in %d seconds.",
+                        self.timeout)
+            logger.info("expected_head_pods: 0, expected_worker_pods: 0")
+            show_cluster_info(self.namespace)
+            raise Exception("RayJobAddCREvent clean_up() timeout")
+
 class GeneralTestCase(unittest.TestCase):
     """TestSuite"""
     def __init__(self, methodName, docker_image_dict, cr_event):
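
Both `wait()` and `clean_up()` above follow the same poll-until-converged pattern: probe the cluster state once per second, log progress periodically, and raise on timeout. A condensed, self-contained sketch of that pattern follows; all names in it are hypothetical, not framework APIs.

# polling_sketch.py - a condensed illustration of the pattern used by
# RayJobAddCREvent.wait() and clean_up(). All names here are hypothetical.
import time

def wait_until(condition, timeout, name="event", log_every=10):
    """Poll `condition` once per second until it is True or `timeout` elapses."""
    start_time = time.time()
    for i in range(timeout):
        if condition():
            print(f"--- {name} converged in {time.time() - start_time:.1f} seconds ---")
            return
        if i % log_every == 0 and i != 0:
            print(f"{name} hasn't converged yet.")
        time.sleep(1)
    raise TimeoutError(f"{name} failed to converge in {timeout} seconds")

# Example: wait for a condition that becomes true after roughly 2 seconds.
deadline = time.time() + 2
wait_until(lambda: time.time() >= deadline, timeout=5, name="demo")
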
diff --git a/tests/framework/utils.py b/tests/framework/utils.py
index 9d5e59557b..7709195ae4 100644
--- a/tests/framework/utils.py
+++ b/tests/framework/utils.py
@@ -210,7 +210,9 @@ def create_custom_object(namespace, cr_object):
             group = 'ray.io', version = 'v1alpha1', namespace = namespace,
             plural = 'rayservices', body = cr_object)
     elif crd == CONST.RAY_JOB_CRD:
-        raise NotImplementedError
+        k8s_cr_api.create_namespaced_custom_object(
+            group = 'ray.io', version = 'v1alpha1', namespace = namespace,
+            plural = 'rayjobs', body = cr_object)
 
 def delete_custom_object(crd, namespace, cr_name):
     """Delete the given `cr_name` custom resource in the given `namespace`."""
@@ -224,4 +226,22 @@ def delete_custom_object(crd, namespace, cr_name):
             group = 'ray.io', version = 'v1alpha1', namespace = namespace,
             plural = 'rayservices', name = cr_name)
     elif crd == CONST.RAY_JOB_CRD:
-        raise NotImplementedError
+        k8s_cr_api.delete_namespaced_custom_object(
+            group = 'ray.io', version = 'v1alpha1', namespace = namespace,
+            plural = 'rayjobs', name = cr_name)
+
+def get_custom_object(crd, namespace, cr_name):
+    """Get the given `cr_name` custom resource in the given `namespace`."""
+    k8s_cr_api = K8S_CLUSTER_MANAGER.k8s_client_dict[CONST.K8S_CR_CLIENT_KEY]
+    if crd == CONST.RAY_CLUSTER_CRD:
+        return k8s_cr_api.get_namespaced_custom_object(
+            group = 'ray.io', version = 'v1alpha1', namespace = namespace,
+            plural = 'rayclusters', name = cr_name)
+    elif crd == CONST.RAY_SERVICE_CRD:
+        return k8s_cr_api.get_namespaced_custom_object(
+            group = 'ray.io', version = 'v1alpha1', namespace = namespace,
+            plural = 'rayservices', name = cr_name)
+    elif crd == CONST.RAY_JOB_CRD:
+        return k8s_cr_api.get_namespaced_custom_object(
+            group = 'ray.io', version = 'v1alpha1', namespace = namespace,
+            plural = 'rayjobs', name = cr_name)
\ No newline at end of file
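
With the RayJob branch of `get_custom_object` implemented, callers can read the jobStatus that the KubeRay controller writes to the RayJob's status. A hypothetical usage sketch (assumes a live test cluster and a RayJob named "rayjob-sample" in the "default" namespace, run from the tests/ directory so `framework` is importable):

# Hypothetical usage of the new helper; not part of this patch.
from framework.utils import CONST, get_custom_object

rayjob = get_custom_object(CONST.RAY_JOB_CRD, "default", "rayjob-sample")
# `status` is absent until the controller first reconciles the CR, so guard
# the lookup the same way RayJobAddCREvent.wait() does.
job_status = (rayjob.get("status") or {}).get("jobStatus")
print(f"jobStatus: {job_status}")  # e.g. "SUCCEEDED", "FAILED", or "STOPPED"
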
diff --git a/tests/test_sample_rayjob_yamls.py b/tests/test_sample_rayjob_yamls.py
new file mode 100644
index 0000000000..9375d1d562
--- /dev/null
+++ b/tests/test_sample_rayjob_yamls.py
@@ -0,0 +1,58 @@
+'''Test sample RayJob YAML files to catch invalid and outdated ones.'''
+import unittest
+import os
+import logging
+import yaml
+
+from framework.prototype import (
+    RuleSet,
+    GeneralTestCase,
+    RayJobAddCREvent,
+    EasyJobRule,
+)
+
+from framework.utils import (
+    CONST
+)
+
+logger = logging.getLogger(__name__)
+
+if __name__ == '__main__':
+    NAMESPACE = 'default'
+    SAMPLE_PATH = CONST.REPO_ROOT.joinpath("ray-operator/config/samples/")
+    YAMLs = ['ray_v1alpha1_rayjob.yaml']
+
+    sample_yaml_files = []
+    for filename in YAMLs:
+        filepath = SAMPLE_PATH.joinpath(filename)
+        with open(filepath, encoding="utf-8") as cr_yaml:
+            for k8s_object in yaml.safe_load_all(cr_yaml):
+                if k8s_object['kind'] == 'RayJob':
+                    sample_yaml_files.append(
+                        {'path': filepath, 'name': filename, 'cr': k8s_object}
+                    )
+                    break
+    # TODO(architkulkarni): Add RayJobSuccessRule. Currently fails with the following error:
+    # Failed to start Job Supervisor actor: The name _ray_internal_job_actor_rayjob-sample-8tzrb
+    # (namespace=SUPERVISOR_ACTOR_RAY_NAMESPACE) is already taken.
+    rs = RuleSet([EasyJobRule()])
+    image_dict = {
+        CONST.RAY_IMAGE_KEY: os.getenv('RAY_IMAGE', default='rayproject/ray:2.3.0'),
+        CONST.OPERATOR_IMAGE_KEY: os.getenv('OPERATOR_IMAGE', default='kuberay/operator:nightly'),
+    }
+    logger.info(image_dict)
+
+    # Build a test plan.
+    logger.info("Building a test plan ...")
+    test_cases = unittest.TestSuite()
+    for index, new_cr in enumerate(sample_yaml_files):
+        logger.info('[TEST %d]: %s', index, new_cr['name'])
+        add_event = RayJobAddCREvent(new_cr['cr'], [rs], 300, NAMESPACE, new_cr['path'])
+        test_cases.addTest(GeneralTestCase('runtest', image_dict, add_event))
+
+    # Execute all tests.
+    runner = unittest.TextTestRunner()
+    test_result = runner.run(test_cases)
+
+    # Without this line, the exit code will always be 0.
+    assert test_result.wasSuccessful()
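
The CI steps at the top of this patch drive the script through the `RAY_IMAGE` and `OPERATOR_IMAGE` environment variables it reads via `os.getenv`. A sketch of the equivalent local invocation (run from the repository root with a test cluster available; the image tags mirror the script's defaults):

# run_rayjob_tests_locally.py - hypothetical local equivalent of the CI step.
import os
import subprocess

env = dict(os.environ)
env["RAY_IMAGE"] = "rayproject/ray:2.3.0"           # the script's default
env["OPERATOR_IMAGE"] = "kuberay/operator:nightly"  # or a locally built tag
subprocess.run(
    ["python", "tests/test_sample_rayjob_yamls.py"], env=env, check=True)
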