From ab31ed07d563d8a6790e5a754c0a0d5ea7d1183f Mon Sep 17 00:00:00 2001
From: Michael Oviedo
Date: Tue, 24 Sep 2024 18:25:47 +0000
Subject: [PATCH 1/4] add automatic testing and aggregation to OSB

Signed-off-by: Michael Oviedo
---
 osbenchmark/benchmark.py | 41 ++++++++++++++++++++++++++++++++++++++--
 1 file changed, 39 insertions(+), 2 deletions(-)

diff --git a/osbenchmark/benchmark.py b/osbenchmark/benchmark.py
index 32f28e90..1d57c13f 100644
--- a/osbenchmark/benchmark.py
+++ b/osbenchmark/benchmark.py
@@ -28,6 +28,7 @@
 import os
 import platform
 import sys
+import threading
 import time
 import uuid
 
@@ -634,8 +635,18 @@ def add_workload_source(subparser):
         action="store_true",
         default=False)
 
-    for p in [list_parser, test_execution_parser, compare_parser, aggregate_parser, download_parser, install_parser,
-              start_parser, stop_parser, info_parser, create_workload_parser]:
+    auto_aggregate_parser = subparsers.add_parser("auto-aggregate",
+                                                  add_help=False,
+                                                  help="Run multiple test executions with the same configuration and aggregate the results",
+                                                  parents=[test_execution_parser])
+    auto_aggregate_parser.add_argument(
+        "--test-iterations",
+        type=int,
+        required=True,
+        help="Number of test executions to run and aggregate")
+
+    for p in [list_parser, test_execution_parser, compare_parser, aggregate_parser, auto_aggregate_parser,
+              download_parser, install_parser, start_parser, stop_parser, info_parser, create_workload_parser]:
         # This option is needed to support a separate configuration for the integration tests on the same machine
         p.add_argument(
             "--configuration-name",
@@ -863,6 +874,30 @@ def prepare_test_executions_dict(args, cfg):
         test_executions_dict[execution] = None
     return test_executions_dict
 
+def run_and_aggregate(arg_parser, args, cfg):
+    semaphore = threading.Semaphore(1)
+    test_exes = []
+    args.subcommand = "execute-test"
+    aggregate_id = args.test_execution_id
+
+    for _ in range(args.test_iterations):
+        console.info(f"Running test {_ + 1}...")
+        args.test_execution_id = str(uuid.uuid4())  # we reuse the same args for each test so need to refresh the id
+        test_exes.append(args.test_execution_id)
+        with semaphore:
+            dispatch_sub_command(arg_parser, args, cfg)
+
+    console.info(f"Test executions: {', '.join(test_exes)}")
+    console.info("Aggregating results...")
+    aggregate_args = arg_parser.parse_args([
+        "aggregate",
+        f"--test-executions={','.join(test_exes)}",
+        f"--test-execution-id={aggregate_id}",
+        f"--results-file={args.results_file}",
+        f"--workload-repository={args.workload_repository}"
+    ])
+    dispatch_sub_command(arg_parser, aggregate_args, cfg)
+
 def print_test_execution_id(args):
     console.info(f"[Test Execution ID]: {args.test_execution_id}")
 
@@ -881,6 +916,8 @@ def dispatch_sub_command(arg_parser, args, cfg):
             test_executions_dict = prepare_test_executions_dict(args, cfg)
             aggregator_instance = aggregator.Aggregator(cfg, test_executions_dict, args)
             aggregator_instance.aggregate()
+        elif sub_command == "auto-aggregate":
+            run_and_aggregate(arg_parser, args, cfg)
         elif sub_command == "list":
             cfg.add(config.Scope.applicationOverride, "system", "list.config.option", args.configuration)
             cfg.add(config.Scope.applicationOverride, "system", "list.test_executions.max_results", args.limit)

From fab416674d5638080e0bd5c3aee9fbdf71a89296 Mon Sep 17 00:00:00 2001
From: Michael Oviedo
Date: Fri, 27 Sep 2024 17:13:41 +0000
Subject: [PATCH 2/4] move auto-aggregation logic to execute-test subcommand

Signed-off-by: Michael Oviedo
---
 osbenchmark/aggregator.py |   1 +
 osbenchmark/benchmark.py  | 168 ++++++++++++++++++++------------------
 2 files changed, 89 insertions(+), 80 deletions(-)

diff --git a/osbenchmark/aggregator.py b/osbenchmark/aggregator.py
index 49827fda..d57720ba 100644
--- a/osbenchmark/aggregator.py
+++ b/osbenchmark/aggregator.py
@@ -169,6 +169,7 @@ def build_aggregated_results(self):
         test_procedure = loaded_workload.find_test_procedure_or_default(test_exe.test_procedure)
         test_execution = metrics.create_test_execution(self.config, loaded_workload, test_procedure,
                                                        test_exe.workload_revision)
+        test_execution.user_tags = list(self.test_executions.keys())
         test_execution.add_results(AggregatedResults(aggregated_results))
         test_execution.distribution_version = test_exe.distribution_version
         test_execution.revision = test_exe.revision
diff --git a/osbenchmark/benchmark.py b/osbenchmark/benchmark.py
index 1d57c13f..967971b8 100644
--- a/osbenchmark/benchmark.py
+++ b/osbenchmark/benchmark.py
@@ -28,7 +28,6 @@
 import os
 import platform
 import sys
-import threading
 import time
 import uuid
 
@@ -615,6 +614,25 @@ def add_workload_source(subparser):
         f"high values favor the most common queries. "
         f"Ignored if randomization is off (default: {workload.loader.QueryRandomizerWorkloadProcessor.DEFAULT_ALPHA}).",
         default=workload.loader.QueryRandomizerWorkloadProcessor.DEFAULT_ALPHA)
+    test_execution_parser.add_argument(
+        "--test-iterations",
+        help="The number of times to run the workload (default: 1).",
+        default=1)
+    test_execution_parser.add_argument(
+        "--aggregate",
+        type=lambda x: (str(x).lower() in ['true', '1', 'yes', 'y']),
+        help="Aggregate the results of multiple test executions (default: true).",
+        default=True)
+    test_execution_parser.add_argument(
+        "--sleep-timer",
+        help="Sleep for the specified number of seconds before starting the next test execution (default: 5).",
+        default=5)
+    test_execution_parser.add_argument(
+        "--cancel-on-error",
+        action="store_true",
+        help="Stop executing tests if an error occurs in one of the test iterations (default: false).",
+        default=False
+    )
 
 ###############################################################################
 #
@@ -635,17 +653,7 @@ def add_workload_source(subparser):
         action="store_true",
         default=False)
 
-    auto_aggregate_parser = subparsers.add_parser("auto-aggregate",
-                                                  add_help=False,
-                                                  help="Run multiple test executions with the same configuration and aggregate the results",
-                                                  parents=[test_execution_parser])
-    auto_aggregate_parser.add_argument(
-        "--test-iterations",
-        type=int,
-        required=True,
-        help="Number of test executions to run and aggregate")
-
-    for p in [list_parser, test_execution_parser, compare_parser, aggregate_parser, auto_aggregate_parser,
-              download_parser, install_parser, start_parser, stop_parser, info_parser, create_workload_parser]:
+    for p in [list_parser, test_execution_parser, compare_parser, aggregate_parser, download_parser, install_parser,
+              start_parser, stop_parser, info_parser, create_workload_parser]:
         # This option is needed to support a separate configuration for the integration tests on the same machine
         p.add_argument(
             "--configuration-name",
@@ -874,29 +882,48 @@ def prepare_test_executions_dict(args, cfg):
         test_executions_dict[execution] = None
     return test_executions_dict
 
-def run_and_aggregate(arg_parser, args, cfg):
-    semaphore = threading.Semaphore(1)
-    test_exes = []
-    args.subcommand = "execute-test"
-    aggregate_id = args.test_execution_id
-
-    for _ in range(args.test_iterations):
-        console.info(f"Running test {_ + 1}...")
-        args.test_execution_id = str(uuid.uuid4())  # we reuse the same args for each test so need to refresh the id
-        test_exes.append(args.test_execution_id)
-        with semaphore:
-            dispatch_sub_command(arg_parser, args, cfg)
-
-    console.info(f"Test executions: {', '.join(test_exes)}")
-    console.info("Aggregating results...")
-    aggregate_args = arg_parser.parse_args([
-        "aggregate",
-        f"--test-executions={','.join(test_exes)}",
-        f"--test-execution-id={aggregate_id}",
-        f"--results-file={args.results_file}",
-        f"--workload-repository={args.workload_repository}"
-    ])
-    dispatch_sub_command(arg_parser, aggregate_args, cfg)
+def configure_test(arg_parser, args, cfg):
+    # As the execute-test command is doing more work than necessary at the moment, we duplicate several parameters
+    # in this section that actually belong to dedicated subcommands (like install, start or stop). Over time
+    # these duplicated parameters will vanish as we move towards dedicated subcommands and use "execute-test" only
+    # to run the actual benchmark (i.e. generating load).
+    print_test_execution_id(args)
+    if args.effective_start_date:
+        cfg.add(config.Scope.applicationOverride, "system", "time.start", args.effective_start_date)
+    cfg.add(config.Scope.applicationOverride, "system", "test_execution.id", args.test_execution_id)
+    # use the test_execution id implicitly also as the install id.
+    cfg.add(config.Scope.applicationOverride, "system", "install.id", args.test_execution_id)
+    cfg.add(config.Scope.applicationOverride, "test_execution", "pipeline", args.pipeline)
+    cfg.add(config.Scope.applicationOverride, "test_execution", "user.tag", args.user_tag)
+    cfg.add(config.Scope.applicationOverride, "worker_coordinator", "profiling", args.enable_worker_coordinator_profiling)
+    cfg.add(config.Scope.applicationOverride, "worker_coordinator", "assertions", args.enable_assertions)
+    cfg.add(config.Scope.applicationOverride, "worker_coordinator", "on.error", args.on_error)
+    cfg.add(
+        config.Scope.applicationOverride,
+        "worker_coordinator",
+        "load_worker_coordinator_hosts",
+        opts.csv_to_list(args.load_worker_coordinator_hosts))
+    cfg.add(config.Scope.applicationOverride, "workload", "test.mode.enabled", args.test_mode)
+    cfg.add(config.Scope.applicationOverride, "workload", "latency.percentiles", args.latency_percentiles)
+    cfg.add(config.Scope.applicationOverride, "workload", "throughput.percentiles", args.throughput_percentiles)
+    cfg.add(config.Scope.applicationOverride, "workload", "randomization.enabled", args.randomization_enabled)
+    cfg.add(config.Scope.applicationOverride, "workload", "randomization.repeat_frequency", args.randomization_repeat_frequency)
+    cfg.add(config.Scope.applicationOverride, "workload", "randomization.n", args.randomization_n)
+    cfg.add(config.Scope.applicationOverride, "workload", "randomization.alpha", args.randomization_alpha)
+    configure_workload_params(arg_parser, args, cfg)
+    configure_connection_params(arg_parser, args, cfg)
+    configure_telemetry_params(args, cfg)
+    configure_builder_params(args, cfg)
+    cfg.add(config.Scope.applicationOverride, "builder", "runtime.jdk", args.runtime_jdk)
+    cfg.add(config.Scope.applicationOverride, "builder", "source.revision", args.revision)
+    cfg.add(config.Scope.applicationOverride, "builder",
+            "provision_config_instance.plugins", opts.csv_to_list(
+                args.opensearch_plugins))
+    cfg.add(config.Scope.applicationOverride, "builder", "plugin.params", opts.to_dict(args.plugin_params))
+    cfg.add(config.Scope.applicationOverride, "builder", "preserve.install", convert.to_bool(args.preserve_install))
+    cfg.add(config.Scope.applicationOverride, "builder", "skip.rest.api.check", convert.to_bool(args.skip_rest_api_check))
+
+    configure_results_publishing_params(args, cfg)
+
 def print_test_execution_id(args):
     console.info(f"[Test Execution ID]: {args.test_execution_id}")
@@ -916,8 +943,6 @@ def dispatch_sub_command(arg_parser, args, cfg):
             test_executions_dict = prepare_test_executions_dict(args, cfg)
             aggregator_instance = aggregator.Aggregator(cfg, test_executions_dict, args)
             aggregator_instance.aggregate()
-        elif sub_command == "auto-aggregate":
-            run_and_aggregate(arg_parser, args, cfg)
         elif sub_command == "list":
             cfg.add(config.Scope.applicationOverride, "system", "list.config.option", args.configuration)
             cfg.add(config.Scope.applicationOverride, "system", "list.test_executions.max_results", args.limit)
@@ -957,49 +982,32 @@ def dispatch_sub_command(arg_parser, args, cfg):
             cfg.add(config.Scope.applicationOverride, "system", "install.id", args.installation_id)
             builder.stop(cfg)
         elif sub_command == "execute-test":
-            # As the execute-test command is doing more work than necessary at the moment, we duplicate several parameters
-            # in this section that actually belong to dedicated subcommands (like install, start or stop). Over time
-            # these duplicated parameters will vanish as we move towards dedicated subcommands and use "execute-test" only
-            # to run the actual benchmark (i.e. generating load).
-            print_test_execution_id(args)
-            if args.effective_start_date:
-                cfg.add(config.Scope.applicationOverride, "system", "time.start", args.effective_start_date)
-            cfg.add(config.Scope.applicationOverride, "system", "test_execution.id", args.test_execution_id)
-            # use the test_execution id implicitly also as the install id.
-            cfg.add(config.Scope.applicationOverride, "system", "install.id", args.test_execution_id)
-            cfg.add(config.Scope.applicationOverride, "test_execution", "pipeline", args.pipeline)
-            cfg.add(config.Scope.applicationOverride, "test_execution", "user.tag", args.user_tag)
-            cfg.add(config.Scope.applicationOverride, "worker_coordinator", "profiling", args.enable_worker_coordinator_profiling)
-            cfg.add(config.Scope.applicationOverride, "worker_coordinator", "assertions", args.enable_assertions)
-            cfg.add(config.Scope.applicationOverride, "worker_coordinator", "on.error", args.on_error)
-            cfg.add(
-                config.Scope.applicationOverride,
-                "worker_coordinator",
-                "load_worker_coordinator_hosts",
-                opts.csv_to_list(args.load_worker_coordinator_hosts))
-            cfg.add(config.Scope.applicationOverride, "workload", "test.mode.enabled", args.test_mode)
-            cfg.add(config.Scope.applicationOverride, "workload", "latency.percentiles", args.latency_percentiles)
-            cfg.add(config.Scope.applicationOverride, "workload", "throughput.percentiles", args.throughput_percentiles)
-            cfg.add(config.Scope.applicationOverride, "workload", "randomization.enabled", args.randomization_enabled)
-            cfg.add(config.Scope.applicationOverride, "workload", "randomization.repeat_frequency", args.randomization_repeat_frequency)
-            cfg.add(config.Scope.applicationOverride, "workload", "randomization.n", args.randomization_n)
-            cfg.add(config.Scope.applicationOverride, "workload", "randomization.alpha", args.randomization_alpha)
-            configure_workload_params(arg_parser, args, cfg)
-            configure_connection_params(arg_parser, args, cfg)
-            configure_telemetry_params(args, cfg)
-            configure_builder_params(args, cfg)
-            cfg.add(config.Scope.applicationOverride, "builder", "runtime.jdk", args.runtime_jdk)
-            cfg.add(config.Scope.applicationOverride, "builder", "source.revision", args.revision)
-            cfg.add(config.Scope.applicationOverride, "builder",
-                    "provision_config_instance.plugins", opts.csv_to_list(
-                        args.opensearch_plugins))
-            cfg.add(config.Scope.applicationOverride, "builder", "plugin.params", opts.to_dict(args.plugin_params))
-            cfg.add(config.Scope.applicationOverride, "builder", "preserve.install", convert.to_bool(args.preserve_install))
-            cfg.add(config.Scope.applicationOverride, "builder", "skip.rest.api.check", convert.to_bool(args.skip_rest_api_check))
-
-            configure_results_publishing_params(args, cfg)
-
-            execute_test(cfg, args.kill_running_processes)
+            iterations = int(args.test_iterations)
+            if iterations > 1:
+                test_exes = []
+                for _ in range(iterations):
+                    try:
+                        test_exes.append(args.test_execution_id)
+                        configure_test(arg_parser, args, cfg)
+                        execute_test(cfg, args.kill_running_processes)
+                        time.sleep(int(args.sleep_timer))
+                        args.test_execution_id = str(uuid.uuid4())
+                    except Exception as e:
+                        console.error(f"Error occurred during test execution {_+1}: {str(e)}")
+                        if args.cancel_on_error:
+                            console.info("Cancelling remaining test executions.")
+                            break
+
+                if args.aggregate:
+                    args.test_executions = test_exes
+                    test_executions_dict = prepare_test_executions_dict(args, cfg)
+                    aggregator_instance = aggregator.Aggregator(cfg, test_executions_dict, args)
+                    aggregator_instance.aggregate()
+            elif args.test_iterations == 1:
+                configure_test(arg_parser, args, cfg)
+                execute_test(cfg, args.kill_running_processes)
+            else:
+                console.info("Please enter a valid number of test iterations")
         elif sub_command == "create-workload":
             cfg.add(config.Scope.applicationOverride, "generator", "indices", args.indices)
             cfg.add(config.Scope.applicationOverride, "generator", "number_of_docs", args.number_of_docs)

From 54febf3970e45ab232c4f2ac77be76264e571e23 Mon Sep 17 00:00:00 2001
From: Michael Oviedo
Date: Wed, 2 Oct 2024 20:15:52 +0000
Subject: [PATCH 3/4] address a few more comments

Signed-off-by: Michael Oviedo
---
 osbenchmark/aggregator.py | 4 +++-
 osbenchmark/benchmark.py  | 5 +----
 2 files changed, 4 insertions(+), 5 deletions(-)

diff --git a/osbenchmark/aggregator.py b/osbenchmark/aggregator.py
index d57720ba..23622995 100644
--- a/osbenchmark/aggregator.py
+++ b/osbenchmark/aggregator.py
@@ -169,7 +169,9 @@ def build_aggregated_results(self):
         test_procedure = loaded_workload.find_test_procedure_or_default(test_exe.test_procedure)
         test_execution = metrics.create_test_execution(self.config, loaded_workload, test_procedure,
                                                        test_exe.workload_revision)
-        test_execution.user_tags = list(self.test_executions.keys())
+        test_execution.user_tags = {
+            "aggregation-of-runs": list(self.test_executions.keys())
+        }
         test_execution.add_results(AggregatedResults(aggregated_results))
         test_execution.distribution_version = test_exe.distribution_version
         test_execution.revision = test_exe.revision
diff --git a/osbenchmark/benchmark.py b/osbenchmark/benchmark.py
index 967971b8..316207c2 100644
--- a/osbenchmark/benchmark.py
+++ b/osbenchmark/benchmark.py
@@ -631,7 +631,6 @@ def add_workload_source(subparser):
         "--cancel-on-error",
         action="store_true",
         help="Stop executing tests if an error occurs in one of the test iterations (default: false).",
-        default=False
     )
 
 ###############################################################################
@@ -987,10 +986,10 @@ def dispatch_sub_command(arg_parser, args, cfg):
                 test_exes = []
                 for _ in range(iterations):
                     try:
-                        test_exes.append(args.test_execution_id)
                         configure_test(arg_parser, args, cfg)
                        execute_test(cfg, args.kill_running_processes)
                         time.sleep(int(args.sleep_timer))
+                        test_exes.append(args.test_execution_id)
                         args.test_execution_id = str(uuid.uuid4())
                     except Exception as e:
                         console.error(f"Error occurred during test execution {_+1}: {str(e)}")
@@ -1006,8 +1005,6 @@ def dispatch_sub_command(arg_parser, args, cfg):
             elif args.test_iterations == 1:
                 configure_test(arg_parser, args, cfg)
                 execute_test(cfg, args.kill_running_processes)
-            else:
-                console.info("Please enter a valid number of test iterations")
         elif sub_command == "create-workload":
             cfg.add(config.Scope.applicationOverride, "generator", "indices", args.indices)
             cfg.add(config.Scope.applicationOverride, "generator", "number_of_docs", args.number_of_docs)

From b9d7137d31358435ffdcffaae16a82266edd5c15 Mon Sep 17 00:00:00 2001
From: Michael Oviedo
Date: Wed, 2 Oct 2024 22:48:38 +0000
Subject: [PATCH 4/4] add back check for invalid test-iterations input

Signed-off-by: Michael Oviedo
---
 osbenchmark/benchmark.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/osbenchmark/benchmark.py b/osbenchmark/benchmark.py
index 316207c2..f4c4020e 100644
--- a/osbenchmark/benchmark.py
+++ b/osbenchmark/benchmark.py
@@ -1005,6 +1005,8 @@ def dispatch_sub_command(arg_parser, args, cfg):
             elif args.test_iterations == 1:
                 configure_test(arg_parser, args, cfg)
                 execute_test(cfg, args.kill_running_processes)
+            else:
+                console.info("Please enter a valid number of test iterations")
         elif sub_command == "create-workload":
             cfg.add(config.Scope.applicationOverride, "generator", "indices", args.indices)
             cfg.add(config.Scope.applicationOverride, "generator", "number_of_docs", args.number_of_docs)
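
Usage sketch: with all four patches applied, repeated runs and automatic
aggregation are driven entirely through the execute-test subcommand. The
invocation below is hypothetical -- the --test-iterations, --aggregate,
--sleep-timer, and --cancel-on-error flags come from the parser arguments
added above, while the workload name and the remaining flags are illustrative:

    opensearch-benchmark execute-test \
        --workload=geonames \
        --test-mode \
        --test-iterations=3 \
        --aggregate=true \
        --sleep-timer=10 \
        --cancel-on-error

Each iteration runs under a fresh test_execution_id (regenerated via
uuid.uuid4() after every run); when --aggregate is true, the collected ids are
passed to aggregator.Aggregator, and the aggregated result carries an
"aggregation-of-runs" user tag listing the constituent runs.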