From b43f603553f053a186ceb0f224cf53fd704ad685 Mon Sep 17 00:00:00 2001 From: Electronic-Waste <2690692950@qq.com> Date: Tue, 30 Jul 2024 18:04:27 +0000 Subject: [PATCH] refactor(test/sdk): add run-e2e-tune-api.py. Signed-off-by: Electronic-Waste <2690692950@qq.com> --- .../v1beta1/scripts/gh-actions/build-load.sh | 5 + .../scripts/gh-actions/run-e2e-experiment.py | 139 +---------------- .../scripts/gh-actions/run-e2e-experiment.sh | 3 + .../scripts/gh-actions/run-e2e-tune-api.py | 90 +++++++++++ test/e2e/v1beta1/scripts/gh-actions/verify.py | 143 ++++++++++++++++++ 5 files changed, 242 insertions(+), 138 deletions(-) create mode 100644 test/e2e/v1beta1/scripts/gh-actions/run-e2e-tune-api.py create mode 100644 test/e2e/v1beta1/scripts/gh-actions/verify.py diff --git a/test/e2e/v1beta1/scripts/gh-actions/build-load.sh b/test/e2e/v1beta1/scripts/gh-actions/build-load.sh index 2ce492da79a..52328a07ac0 100755 --- a/test/e2e/v1beta1/scripts/gh-actions/build-load.sh +++ b/test/e2e/v1beta1/scripts/gh-actions/build-load.sh @@ -34,6 +34,7 @@ TAG="e2e-test" VERSION="v1beta1" CMD_PREFIX="cmd" SPECIFIED_DEVICE_TYPE_IMAGES=("enas-cnn-cifar10-cpu" "darts-cnn-cifar10-cpu" "pytorch-mnist-cpu") +DEFAULT_IMAGE_FOR_TUNE="docker.io/tensorflow/tensorflow:2.13.0" IFS="," read -r -a TRIAL_IMAGE_ARRAY <<< "$TRIAL_IMAGES" IFS="," read -r -a EXPERIMENT_ARRAY <<< "$EXPERIMENTS" @@ -162,6 +163,10 @@ for name in "${TRIAL_IMAGE_ARRAY[@]}"; do run "$name" "examples/$VERSION/trial-images/$name/Dockerfile" done +# Testing image for tune function +echo -e "\nPulling testing image for tune function..." 
+docker pull $DEFAULT_IMAGE_FOR_TUNE + echo -e "\nCleanup Build Cache...\n" docker buildx prune -f diff --git a/test/e2e/v1beta1/scripts/gh-actions/run-e2e-experiment.py b/test/e2e/v1beta1/scripts/gh-actions/run-e2e-experiment.py index b6e1f031415..430f72f3e56 100644 --- a/test/e2e/v1beta1/scripts/gh-actions/run-e2e-experiment.py +++ b/test/e2e/v1beta1/scripts/gh-actions/run-e2e-experiment.py @@ -1,6 +1,5 @@ import argparse import logging -import time from kubeflow.katib import ApiClient from kubeflow.katib import KatibClient @@ -9,6 +8,7 @@ from kubeflow.katib.constants import constants from kubeflow.katib.utils.utils import FakeResponse from kubernetes import client +from verify import verify_experiment_results import yaml # Experiment timeout is 40 min. @@ -18,143 +18,6 @@ logging.basicConfig(level=logging.INFO) -def verify_experiment_results( - katib_client: KatibClient, - experiment: models.V1beta1Experiment, - exp_name: str, - exp_namespace: str, -): - - # Get the best objective metric. - best_objective_metric = None - for metric in experiment.status.current_optimal_trial.observation.metrics: - if metric.name == experiment.spec.objective.objective_metric_name: - best_objective_metric = metric - break - - if best_objective_metric is None: - raise Exception( - "Unable to get the best metrics for objective: {}. Current Optimal Trial: {}".format( - experiment.spec.objective.objective_metric_name, - experiment.status.current_optimal_trial, - ) - ) - - # Get Experiment Succeeded reason. - for c in experiment.status.conditions: - if ( - c.type == constants.EXPERIMENT_CONDITION_SUCCEEDED - and c.status == constants.CONDITION_STATUS_TRUE - ): - succeeded_reason = c.reason - break - - trials_completed = experiment.status.trials_succeeded or 0 - trials_completed += experiment.status.trials_early_stopped or 0 - max_trial_count = experiment.spec.max_trial_count - - # If Experiment is Succeeded because of Max Trial Reached, all Trials must be completed. 
- if ( - succeeded_reason == "ExperimentMaxTrialsReached" - and trials_completed != max_trial_count - ): - raise Exception( - "All Trials must be Completed. Max Trial count: {}, Experiment status: {}".format( - max_trial_count, experiment.status - ) - ) - - # If Experiment is Succeeded because of Goal reached, the metrics must be correct. - if succeeded_reason == "ExperimentGoalReached" and ( - ( - experiment.spec.objective.type == "minimize" - and float(best_objective_metric.min) > float(experiment.spec.objective.goal) - ) - or ( - experiment.spec.objective.type == "maximize" - and float(best_objective_metric.max) < float(experiment.spec.objective.goal) - ) - ): - raise Exception( - "Experiment goal is reached, but metrics are incorrect. " - f"Experiment objective: {experiment.spec.objective}. " - f"Experiment best objective metric: {best_objective_metric}" - ) - - # Verify Suggestion's resources. Suggestion name = Experiment name. - suggestion = katib_client.get_suggestion(exp_name, exp_namespace) - - # For the Never or FromVolume resume policies Suggestion must be Succeeded. - # For the LongRunning resume policy Suggestion must be always Running. - for c in suggestion.status.conditions: - if ( - c.type == constants.EXPERIMENT_CONDITION_SUCCEEDED - and c.status == constants.CONDITION_STATUS_TRUE - and experiment.spec.resume_policy == "LongRunning" - ): - raise Exception( - f"Suggestion is Succeeded while Resume Policy is {experiment.spec.resume_policy}." - f"Suggestion conditions: {suggestion.status.conditions}" - ) - elif ( - c.type == constants.EXPERIMENT_CONDITION_RUNNING - and c.status == constants.CONDITION_STATUS_TRUE - and experiment.spec.resume_policy != "LongRunning" - ): - raise Exception( - f"Suggestion is Running while Resume Policy is {experiment.spec.resume_policy}." - f"Suggestion conditions: {suggestion.status.conditions}" - ) - - # For Never and FromVolume resume policies verify Suggestion's resources. 
- if ( - experiment.spec.resume_policy == "Never" - or experiment.spec.resume_policy == "FromVolume" - ): - resource_name = exp_name + "-" + experiment.spec.algorithm.algorithm_name - - # Suggestion's Service and Deployment should be deleted. - for i in range(10): - try: - client.AppsV1Api().read_namespaced_deployment( - resource_name, exp_namespace - ) - except client.ApiException as e: - if e.status == 404: - break - else: - raise e - # Deployment deletion might take some time. - time.sleep(1) - if i == 10: - raise Exception( - "Suggestion Deployment is still alive for Resume Policy: {}".format( - experiment.spec.resume_policy - ) - ) - - try: - client.CoreV1Api().read_namespaced_service(resource_name, exp_namespace) - except client.ApiException as e: - if e.status != 404: - raise e - else: - raise Exception( - "Suggestion Service is still alive for Resume Policy: {}".format( - experiment.spec.resume_policy - ) - ) - - # For FromVolume resume policy PVC should not be deleted. - if experiment.spec.resume_policy == "FromVolume": - try: - client.CoreV1Api().read_namespaced_persistent_volume_claim( - resource_name, exp_namespace - ) - except client.ApiException: - raise Exception("PVC is deleted for FromVolume Resume Policy") - - def run_e2e_experiment( katib_client: KatibClient, experiment: models.V1beta1Experiment, diff --git a/test/e2e/v1beta1/scripts/gh-actions/run-e2e-experiment.sh b/test/e2e/v1beta1/scripts/gh-actions/run-e2e-experiment.sh index 5a20faa6934..93f26012eb4 100755 --- a/test/e2e/v1beta1/scripts/gh-actions/run-e2e-experiment.sh +++ b/test/e2e/v1beta1/scripts/gh-actions/run-e2e-experiment.sh @@ -48,4 +48,7 @@ for exp_name in "${EXPERIMENT_FILE_ARRAY[@]}"; do --verbose || (kubectl get pods -n kubeflow && exit 1) done +python run-e2e-tune-api.py --namespace default \ +--verbose || (kubectl get pods -n kubeflow && exit 1) + exit 0 diff --git a/test/e2e/v1beta1/scripts/gh-actions/run-e2e-tune-api.py 
b/test/e2e/v1beta1/scripts/gh-actions/run-e2e-tune-api.py
new file mode 100644
index 00000000000..4866c855988
--- /dev/null
+++ b/test/e2e/v1beta1/scripts/gh-actions/run-e2e-tune-api.py
@@ -0,0 +1,94 @@
+import argparse
+import logging
+
+from kubeflow.katib import KatibClient
+from kubeflow.katib import search
+from verify import verify_experiment_results
+
+# Experiment timeout is 40 min.
+EXPERIMENT_TIMEOUT = 60 * 40
+
+# The default logging config.
+logging.basicConfig(level=logging.INFO)
+
+
+def run_e2e_experiment_create_by_tune(
+    katib_client: KatibClient,
+    exp_name: str,
+    exp_namespace: str,
+):
+    # Create Katib Experiment and wait until it is finished.
+    logging.debug("Creating Experiment: {}/{}".format(exp_namespace, exp_name))
+
+    # Use the test case from get-started tutorial.
+    # https://www.kubeflow.org/docs/components/katib/getting-started/#getting-started-with-katib-python-sdk
+    # [1] Create an objective function.
+    def objective(parameters):
+        result = 4 * int(parameters["a"]) - float(parameters["b"]) ** 2
+        print(f"result={result}")
+
+    # [2] Create hyperparameter search space.
+    parameters = {
+        "a": search.int(min=10, max=20),
+        "b": search.double(min=0.1, max=0.2)
+    }
+
+    # [3] Create Katib Experiment with 4 Trials and 2 CPUs per Trial.
+    # And Wait until Experiment reaches Succeeded condition.
+    katib_client.tune(
+        name=exp_name,
+        namespace=exp_namespace,
+        objective=objective,
+        parameters=parameters,
+        objective_metric_name="result",
+        max_trial_count=4,
+        resources_per_trial={"cpu": "2"},
+    )
+    experiment = katib_client.wait_for_experiment_condition(
+        exp_name, exp_namespace, timeout=EXPERIMENT_TIMEOUT
+    )
+
+    # Verify the Experiment results.
+    verify_experiment_results(katib_client, experiment, exp_name, exp_namespace)
+
+    # Print the Experiment and Suggestion.
+    logging.debug(katib_client.get_experiment(exp_name, exp_namespace))
+    logging.debug(katib_client.get_suggestion(exp_name, exp_namespace))
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        "--namespace", type=str, required=True, help="Namespace for the Katib E2E test",
+    )
+    # NOTE(review): --trial-pod-annotations is currently unused; kept for CLI parity.
+    parser.add_argument(
+        "--trial-pod-annotations", type=str, help="Annotation for the pod created by trial",
+    )
+    parser.add_argument(
+        "--verbose", action="store_true", help="Verbose output for the Katib E2E test",
+    )
+    args = parser.parse_args()
+
+    # Respect --verbose so the debug-level Experiment/Suggestion dumps are shown.
+    if args.verbose:
+        logging.getLogger().setLevel(logging.DEBUG)
+
+    katib_client = KatibClient()
+
+    # Test with run_e2e_experiment_create_by_tune
+    exp_name = "tune-example"
+    exp_namespace = args.namespace
+    try:
+        run_e2e_experiment_create_by_tune(katib_client, exp_name, exp_namespace)
+        logging.info("---------------------------------------------------------------")
+        logging.info(f"E2E is succeeded for Experiment created by tune: {exp_namespace}/{exp_name}")
+    except Exception as e:
+        logging.info("---------------------------------------------------------------")
+        logging.info(f"E2E is failed for Experiment created by tune: {exp_namespace}/{exp_name}")
+        raise e
+    finally:
+        # Delete the Experiment.
+        logging.info("---------------------------------------------------------------")
+        logging.info("---------------------------------------------------------------")
+        katib_client.delete_experiment(exp_name, exp_namespace)
diff --git a/test/e2e/v1beta1/scripts/gh-actions/verify.py b/test/e2e/v1beta1/scripts/gh-actions/verify.py
new file mode 100644
index 00000000000..b49388cd237
--- /dev/null
+++ b/test/e2e/v1beta1/scripts/gh-actions/verify.py
@@ -0,0 +1,144 @@
+import time
+
+from kubeflow.katib import KatibClient
+from kubeflow.katib import models
+from kubeflow.katib.constants import constants
+from kubernetes import client
+
+
+def verify_experiment_results(
+    katib_client: KatibClient,
+    experiment: models.V1beta1Experiment,
+    exp_name: str,
+    exp_namespace: str,
+):
+
+    # Get the best objective metric.
+    best_objective_metric = None
+    for metric in experiment.status.current_optimal_trial.observation.metrics:
+        if metric.name == experiment.spec.objective.objective_metric_name:
+            best_objective_metric = metric
+            break
+
+    if best_objective_metric is None:
+        raise Exception(
+            "Unable to get the best metrics for objective: {}. Current Optimal Trial: {}".format(
+                experiment.spec.objective.objective_metric_name,
+                experiment.status.current_optimal_trial,
+            )
+        )
+
+    # Get Experiment Succeeded reason.
+    for c in experiment.status.conditions:
+        if (
+            c.type == constants.EXPERIMENT_CONDITION_SUCCEEDED
+            and c.status == constants.CONDITION_STATUS_TRUE
+        ):
+            succeeded_reason = c.reason
+            break
+
+    trials_completed = experiment.status.trials_succeeded or 0
+    trials_completed += experiment.status.trials_early_stopped or 0
+    max_trial_count = experiment.spec.max_trial_count
+
+    # If Experiment is Succeeded because of Max Trial Reached, all Trials must be completed.
+    if (
+        succeeded_reason == "ExperimentMaxTrialsReached"
+        and trials_completed != max_trial_count
+    ):
+        raise Exception(
+            "All Trials must be Completed. Max Trial count: {}, Experiment status: {}".format(
+                max_trial_count, experiment.status
+            )
+        )
+
+    # If Experiment is Succeeded because of Goal reached, the metrics must be correct.
+    if succeeded_reason == "ExperimentGoalReached" and (
+        (
+            experiment.spec.objective.type == "minimize"
+            and float(best_objective_metric.min) > float(experiment.spec.objective.goal)
+        )
+        or (
+            experiment.spec.objective.type == "maximize"
+            and float(best_objective_metric.max) < float(experiment.spec.objective.goal)
+        )
+    ):
+        raise Exception(
+            "Experiment goal is reached, but metrics are incorrect. "
+            f"Experiment objective: {experiment.spec.objective}. "
+            f"Experiment best objective metric: {best_objective_metric}"
+        )
+
+    # Verify Suggestion's resources. Suggestion name = Experiment name.
+    suggestion = katib_client.get_suggestion(exp_name, exp_namespace)
+
+    # For the Never or FromVolume resume policies Suggestion must be Succeeded.
+    # For the LongRunning resume policy Suggestion must be always Running.
+    for c in suggestion.status.conditions:
+        if (
+            c.type == constants.EXPERIMENT_CONDITION_SUCCEEDED
+            and c.status == constants.CONDITION_STATUS_TRUE
+            and experiment.spec.resume_policy == "LongRunning"
+        ):
+            raise Exception(
+                f"Suggestion is Succeeded while Resume Policy is {experiment.spec.resume_policy}."
+                f"Suggestion conditions: {suggestion.status.conditions}"
+            )
+        elif (
+            c.type == constants.EXPERIMENT_CONDITION_RUNNING
+            and c.status == constants.CONDITION_STATUS_TRUE
+            and experiment.spec.resume_policy != "LongRunning"
+        ):
+            raise Exception(
+                f"Suggestion is Running while Resume Policy is {experiment.spec.resume_policy}."
+                f"Suggestion conditions: {suggestion.status.conditions}"
+            )
+
+    # For Never and FromVolume resume policies verify Suggestion's resources.
+    if (
+        experiment.spec.resume_policy == "Never"
+        or experiment.spec.resume_policy == "FromVolume"
+    ):
+        resource_name = exp_name + "-" + experiment.spec.algorithm.algorithm_name
+
+        # Suggestion's Service and Deployment should be deleted.
+        for i in range(10):
+            try:
+                client.AppsV1Api().read_namespaced_deployment(
+                    resource_name, exp_namespace
+                )
+            except client.ApiException as e:
+                if e.status == 404:
+                    break
+                else:
+                    raise e
+            # Deployment deletion might take some time.
+            time.sleep(1)
+        else:
+            # Loop exhausted without a 404, i.e. the Deployment still exists.
+            raise Exception(
+                "Suggestion Deployment is still alive for Resume Policy: {}".format(
+                    experiment.spec.resume_policy
+                )
+            )
+
+        try:
+            client.CoreV1Api().read_namespaced_service(resource_name, exp_namespace)
+        except client.ApiException as e:
+            if e.status != 404:
+                raise e
+        else:
+            raise Exception(
+                "Suggestion Service is still alive for Resume Policy: {}".format(
+                    experiment.spec.resume_policy
+                )
+            )
+
+        # For FromVolume resume policy PVC should not be deleted.
+        if experiment.spec.resume_policy == "FromVolume":
+            try:
+                client.CoreV1Api().read_namespaced_persistent_volume_claim(
+                    resource_name, exp_namespace
+                )
+            except client.ApiException:
+                raise Exception("PVC is deleted for FromVolume Resume Policy")