diff --git a/osbenchmark/resources/workload-schema.json b/osbenchmark/resources/workload-schema.json index 54f9e2b5..41ee68d3 100644 --- a/osbenchmark/resources/workload-schema.json +++ b/osbenchmark/resources/workload-schema.json @@ -28,6 +28,11 @@ "type": "integer", "minimum": 1 }, + "ramp-up-time-period": { + "type": "integer", + "minimum": 0, + "description": "Defines the time period in seconds to gradually increase the number of clients." + }, "warmup-time-period": { "type": "integer", "minimum": 0, @@ -75,6 +80,11 @@ "minimum": 1, "description": "Defines the number of times to run the operation." }, + "ramp-up-time-period": { + "type": "integer", + "minimum": 0, + "description": "Defines the time period in seconds to gradually increase the number of clients." + }, "warmup-time-period": { "type": "integer", "minimum": 0, @@ -146,6 +156,11 @@ "minimum": 1, "description": "Defines the number of times to run the operation." }, + "ramp-up-time-period": { + "type": "integer", + "minimum": 0, + "description": "Defines the time period in seconds to gradually increase the number of clients." + }, "warmup-time-period": { "type": "integer", "minimum": 0, diff --git a/osbenchmark/worker_coordinator/worker_coordinator.py b/osbenchmark/worker_coordinator/worker_coordinator.py index 1b370fee..360fc640 100644 --- a/osbenchmark/worker_coordinator/worker_coordinator.py +++ b/osbenchmark/worker_coordinator/worker_coordinator.py @@ -1522,7 +1522,7 @@ def os_clients(all_hosts, all_client_options): # # Now we need to ensure that we start partitioning parameters correctly in both cases. And that means we # need to start from (client) index 0 in both cases instead of 0 for indexA and 4 for indexB. - schedule = schedule_for(task, task_allocation.client_index_in_task, params_per_task[task]) + schedule = schedule_for(task_allocation, params_per_task[task]) async_executor = AsyncExecutor( client_id, task, schedule, opensearch, self.sampler, self.cancel, self.complete, task.error_behavior(self.abort_on_error), self.cfg) @@ -1607,6 +1607,15 @@ async def __call__(self, *args, **kwargs): # lazily initialize the schedule self.logger.debug("Initializing schedule for client id [%s].", self.client_id) schedule = self.schedule_handle() + self.schedule_handle.start() + rampup_wait_time = self.schedule_handle.ramp_up_wait_time + if rampup_wait_time: + self.logger.info("client id [%s] waiting [%.2f]s for ramp-up.", self.client_id, rampup_wait_time) + await asyncio.sleep(rampup_wait_time) + + if rampup_wait_time: + console.println(f" Client id {self.client_id} is running now.") + self.logger.debug("Entering main loop for client id [%s].", self.client_id) # noinspection PyBroadException try: @@ -1806,18 +1815,21 @@ def __repr__(self, *args, **kwargs): class TaskAllocation: - def __init__(self, task, client_index_in_task): + def __init__(self, task, client_index_in_task, global_client_index, total_clients): self.task = task self.client_index_in_task = client_index_in_task + self.global_client_index = global_client_index + self.total_clients = total_clients def __hash__(self): - return hash(self.task) ^ hash(self.client_index_in_task) + return hash(self.task) ^ hash(self.global_client_index) def __eq__(self, other): - return isinstance(other, type(self)) and self.task == other.task and self.client_index_in_task == other.client_index_in_task + return isinstance(other, type(self)) and self.task == other.task and self.global_client_index == other.global_client_index def __repr__(self, *args, **kwargs): - return "TaskAllocation [%d/%d] for %s" % (self.client_index_in_task, self.task.clients, self.task) + return f"TaskAllocation [{self.client_index_in_task}/{self.task.clients}] for {self.task} " \ + f"and [{self.global_client_index}/{self.total_clients}] in total" class Allocator: @@ -1863,7 +1875,11 @@ def allocations(self): physical_client_index = client_index % max_clients if sub_task.completes_parent: clients_executing_completing_task.append(physical_client_index) - allocations[physical_client_index].append(TaskAllocation(sub_task, client_index - start_client_index)) + ta = TaskAllocation(task = sub_task, + client_index_in_task = client_index - start_client_index, + global_client_index=client_index, + total_clients=task.clients) + allocations[physical_client_index].append(ta) start_client_index += sub_task.clients # uneven distribution between tasks and clients, e.g. there are 5 (parallel) tasks but only 2 clients. Then, one of them @@ -1941,7 +1957,7 @@ def clients(self): # Runs a concrete schedule on one worker client # Needs to determine the runners and concrete iterations per client. -def schedule_for(task, client_index, parameter_source): +def schedule_for(task_allocation, parameter_source): """ Calculates a client's schedule for a given task. @@ -1951,15 +1967,17 @@ def schedule_for(task, client_index, parameter_source): :return: A generator for the operations the given client needs to perform for this task. """ logger = logging.getLogger(__name__) + task = task_allocation.task op = task.operation - num_clients = task.clients sched = scheduler.scheduler_for(task) + + client_index = task_allocation.client_index_in_task # guard all logging statements with the client index and only emit them for the first client. This information is # repetitive and may cause issues in thespian with many clients (an excessive number of actor messages is sent). if client_index == 0: logger.info("Choosing [%s] for [%s].", sched, task) runner_for_op = runner.runner_for(op.type) - params_for_op = parameter_source.partition(client_index, num_clients) + params_for_op = parameter_source.partition(client_index, task.clients) if hasattr(sched, "parameter_source"): if client_index == 0: logger.debug("Setting parameter source [%s] for scheduler [%s]", params_for_op, sched) @@ -1992,7 +2010,7 @@ def schedule_for(task, client_index, parameter_source): else: logger.info("%s schedule will determine when the schedule for [%s] terminates.", str(loop_control), task.name) - return ScheduleHandle(task.name, sched, loop_control, runner_for_op, params_for_op) + return ScheduleHandle(task_allocation, sched, loop_control, runner_for_op, params_for_op) def requires_time_period_schedule(task, task_runner, params): @@ -2009,7 +2027,7 @@ def requires_time_period_schedule(task, task_runner, params): class ScheduleHandle: - def __init__(self, task_name, sched, task_progress_control, runner, params): + def __init__(self, task_allocation, sched, task_progress_control, runner, params): """ Creates a generator that will yield individual task invocations for the provided schedule. @@ -2020,16 +2038,29 @@ def __init__(self, task_name, sched, task_progress_control, runner, params): :param params: The parameter source for a given operation. :return: A generator for the corresponding parameters. """ - self.task_name = task_name + self.task_allocation = task_allocation self.sched = sched self.task_progress_control = task_progress_control self.runner = runner self.params = params # TODO: Can we offload the parameter source execution to a different thread / process? Is this too heavy-weight? - #from concurrent.futures import ThreadPoolExecutor - #import asyncio - #self.io_pool_exc = ThreadPoolExecutor(max_workers=1) - #self.loop = asyncio.get_event_loop() + # from concurrent.futures import ThreadPoolExecutor + # import asyncio + # self.io_pool_exc = ThreadPoolExecutor(max_workers=1) + # self.loop = asyncio.get_event_loop() + @property + def ramp_up_wait_time(self): + """ + :return: the number of seconds to wait until this client should start so load can gradually ramp-up. + """ + ramp_up_time_period = self.task_allocation.task.ramp_up_time_period + if ramp_up_time_period: + return ramp_up_time_period * (self.task_allocation.global_client_index / self.task_allocation.total_clients) + else: + return 0 + + def start(self): + self.task_progress_control.start() def before_request(self, now): self.sched.before_request(now) @@ -2041,20 +2072,18 @@ async def __call__(self): next_scheduled = 0 if self.task_progress_control.infinite: param_source_knows_progress = hasattr(self.params, "percent_completed") - self.task_progress_control.start() while True: try: next_scheduled = self.sched.next(next_scheduled) # does not contribute at all to completion. Hence, we cannot define completion. percent_completed = self.params.percent_completed if param_source_knows_progress else None - #current_params = await self.loop.run_in_executor(self.io_pool_exc, self.params.params) + # current_params = await self.loop.run_in_executor(self.io_pool_exc, self.params.params) yield (next_scheduled, self.task_progress_control.sample_type, percent_completed, self.runner, self.params.params()) self.task_progress_control.next() except StopIteration: return else: - self.task_progress_control.start() while not self.task_progress_control.completed: try: next_scheduled = self.sched.next(next_scheduled) @@ -2146,4 +2175,4 @@ def next(self): self._it += 1 def __str__(self): - return "iteration-count-based" + return "iteration-count-based" \ No newline at end of file diff --git a/osbenchmark/workload/loader.py b/osbenchmark/workload/loader.py index 1bced882..b1493939 100644 --- a/osbenchmark/workload/loader.py +++ b/osbenchmark/workload/loader.py @@ -1765,6 +1765,7 @@ def parse_parallel(self, ops_spec, ops, test_procedure_name): default_iterations = self._r(ops_spec, "iterations", error_ctx="parallel", mandatory=False) default_warmup_time_period = self._r(ops_spec, "warmup-time-period", error_ctx="parallel", mandatory=False) default_time_period = self._r(ops_spec, "time-period", error_ctx="parallel", mandatory=False) + default_ramp_up_time_period = self._r(ops_spec, "ramp-up-time-period", error_ctx="parallel", mandatory=False) clients = self._r(ops_spec, "clients", error_ctx="parallel", mandatory=False) completed_by = self._r(ops_spec, "completed-by", error_ctx="parallel", mandatory=False) @@ -1772,7 +1773,16 @@ def parse_parallel(self, ops_spec, ops, test_procedure_name): tasks = [] for task in self._r(ops_spec, "tasks", error_ctx="parallel"): tasks.append(self.parse_task(task, ops, test_procedure_name, default_warmup_iterations, default_iterations, - default_warmup_time_period, default_time_period, completed_by)) + default_warmup_time_period, default_time_period, default_ramp_up_time_period, completed_by)) + + for task in tasks: + if task.ramp_up_time_period != default_ramp_up_time_period: + if default_ramp_up_time_period is None: + self._error(f"task '{task.name}' in 'parallel' element of test-procedure '{test_procedure_name}' specifies " + f"a ramp-up-time-period but it is only allowed on the 'parallel' element.") + else: + self._error(f"task '{task.name}' specifies a different ramp-up-time-period than its enclosing " + f"'parallel' element in test-procedure '{test_procedure_name}'.") if completed_by: completion_task = None for task in tasks: @@ -1788,7 +1798,8 @@ def parse_parallel(self, ops_spec, ops, test_procedure_name): return workload.Parallel(tasks, clients) def parse_task(self, task_spec, ops, test_procedure_name, default_warmup_iterations=None, default_iterations=None, - default_warmup_time_period=None, default_time_period=None, completed_by_name=None): + default_warmup_time_period=None, default_time_period=None, default_ramp_up_time_period=None, + completed_by_name=None): op_spec = task_spec["operation"] if isinstance(op_spec, str) and op_spec in ops: @@ -1811,6 +1822,8 @@ def parse_task(self, task_spec, ops, test_procedure_name, default_warmup_iterati default_value=default_warmup_time_period), time_period=self._r(task_spec, "time-period", error_ctx=op.name, mandatory=False, default_value=default_time_period), + ramp_up_time_period=self._r(task_spec, "ramp-up-time-period", error_ctx=op.name, + mandatory=False, default_value=default_ramp_up_time_period), clients=self._r(task_spec, "clients", error_ctx=op.name, mandatory=False, default_value=1), completes_parent=(task_name == completed_by_name), schedule=schedule, @@ -1819,11 +1832,25 @@ def parse_task(self, task_spec, ops, test_procedure_name, default_warmup_iterati if task.warmup_iterations is not None and task.time_period is not None: self._error( "Operation '%s' in test_procedure '%s' defines '%d' warmup iterations and a time period of '%d' seconds. Please do not " - "mix time periods and iterations." % (op.name, test_procedure_name, task.warmup_iterations, task.time_period)) + "mix time periods and iterations." % (op.name, test_procedure_name, task.warmup_iterations, task.time_period)) elif task.warmup_time_period is not None and task.iterations is not None: self._error( "Operation '%s' in test_procedure '%s' defines a warmup time period of '%d' seconds and '%d' iterations. Please do not " - "mix time periods and iterations." % (op.name, test_procedure_name, task.warmup_time_period, task.iterations)) + "mix time periods and iterations." % (op.name, test_procedure_name, task.warmup_time_period, task.iterations)) + + if (task.warmup_iterations is not None or task.iterations is not None) and task.ramp_up_time_period is not None: + self._error(f"Operation '{op.name}' in test_procedure '{test_procedure_name}' defines a ramp-up time period of " + f"{task.ramp_up_time_period} seconds as well as {task.warmup_iterations} warmup iterations and " + f"{task.iterations} iterations but mixing time periods and iterations is not allowed.") + + if task.ramp_up_time_period is not None: + if task.warmup_time_period is None: + self._error(f"Operation '{op.name}' in test_procedure '{test_procedure_name}' defines a ramp-up time period of " + f"{task.ramp_up_time_period} seconds but no warmup-time-period.") + elif task.warmup_time_period < task.ramp_up_time_period: + self._error(f"The warmup-time-period of operation '{op.name}' in test_procedure '{test_procedure_name}' is " + f"{task.warmup_time_period} seconds but must be greater than or equal to the " + f"ramp-up-time-period of {task.ramp_up_time_period} seconds.") return task diff --git a/osbenchmark/workload/workload.py b/osbenchmark/workload/workload.py index d221e22e..ac2f8be9 100644 --- a/osbenchmark/workload/workload.py +++ b/osbenchmark/workload/workload.py @@ -894,7 +894,8 @@ class Task: IGNORE_RESPONSE_ERROR_LEVEL_WHITELIST = ["non-fatal"] def __init__(self, name, operation, tags=None, meta_data=None, warmup_iterations=None, iterations=None, - warmup_time_period=None, time_period=None, clients=1, completes_parent=False, schedule=None, params=None): + warmup_time_period=None, time_period=None, ramp_up_time_period=None, clients=1, completes_parent=False, + schedule=None, params=None): self.name = name self.operation = operation if isinstance(tags, str): @@ -908,6 +909,7 @@ def __init__(self, name, operation, tags=None, meta_data=None, warmup_iterations self.iterations = iterations self.warmup_time_period = warmup_time_period self.time_period = time_period + self.ramp_up_time_period = ramp_up_time_period self.clients = clients self.completes_parent = completes_parent self.schedule = schedule @@ -988,16 +990,18 @@ def error_behavior(self, default_error_behavior): def __hash__(self): # Note that we do not include `params` in __hash__ and __eq__ (the other attributes suffice to uniquely define a task) return hash(self.name) ^ hash(self.operation) ^ hash(self.warmup_iterations) ^ hash(self.iterations) ^ \ - hash(self.warmup_time_period) ^ hash(self.time_period) ^ hash(self.clients) ^ hash(self.schedule) ^ \ - hash(self.completes_parent) + hash(self.warmup_time_period) ^ hash(self.time_period) ^ hash(self.ramp_up_time_period) ^ \ + hash(self.clients) ^ hash(self.schedule) ^ hash(self.completes_parent) def __eq__(self, other): # Note that we do not include `params` in __hash__ and __eq__ (the other attributes suffice to uniquely define a task) return isinstance(other, type(self)) and (self.name, self.operation, self.warmup_iterations, self.iterations, - self.warmup_time_period, self.time_period, self.clients, self.schedule, - self.completes_parent) == (other.name, other.operation, other.warmup_iterations, + self.warmup_time_period, self.time_period, self.ramp_up_time_period, + self.clients, self.schedule,self.completes_parent) == (other.name, + other.operation, other.warmup_iterations, other.iterations, other.warmup_time_period, other.time_period, - other.clients, other.schedule, other.completes_parent) + self.ramp_up_time_period, other.clients, other.schedule, + other.completes_parent) def __iter__(self): return iter([self]) diff --git a/tests/worker_coordinator/runner_test.py b/tests/worker_coordinator/runner_test.py index b43bd53b..ca98cc1c 100644 --- a/tests/worker_coordinator/runner_test.py +++ b/tests/worker_coordinator/runner_test.py @@ -3435,9 +3435,15 @@ def params(self): runner.register_runner(operation_type=workload.OperationType.VectorSearch, runner=runner.Query(), async_runner=True) param_source = workload.operation_parameters(test_workload, task) + task_allocation = worker_coordinator.TaskAllocation( + task=task, + client_index_in_task=0, + global_client_index=0, + total_clients=task.clients + ) # pylint: disable=C0415 import threading - schedule = worker_coordinator.schedule_for(task, 0, param_source) + schedule = worker_coordinator.schedule_for(task_allocation, param_source) # pylint: disable=C0415 def create_config(): cfg = config.Config() @@ -3570,9 +3576,15 @@ def params(self): runner.register_runner(operation_type=workload.OperationType.VectorSearch, runner=runner.Query(), async_runner=True) param_source = workload.operation_parameters(test_workload, task) + task_allocation = worker_coordinator.TaskAllocation( + task=task, + client_index_in_task=0, + global_client_index=0, + total_clients=task.clients + ) # pylint: disable=C0415 import threading - schedule = worker_coordinator.schedule_for(task, 0, param_source) + schedule = worker_coordinator.schedule_for(task_allocation, param_source) def create_config(): cfg = config.Config() cfg.add(config.Scope.application, "system", "available.cores", 8) @@ -7313,4 +7325,4 @@ async def test_concurrent_segment_search_settings(self, opensearch, on_client_re "search.concurrent_segment_search.enabled": "true", "search.concurrent.max_slice_count": 2 } - }) + }) \ No newline at end of file diff --git a/tests/worker_coordinator/worker_coordinator_test.py b/tests/worker_coordinator/worker_coordinator_test.py index 47972ad4..1334c839 100644 --- a/tests/worker_coordinator/worker_coordinator_test.py +++ b/tests/worker_coordinator/worker_coordinator_test.py @@ -577,8 +577,10 @@ class AllocatorTests(TestCase): def setUp(self): params.register_param_source_for_name("worker-coordinator-test-param-source", WorkerCoordinatorTestParamSource) - def ta(self, task, client_index_in_task): - return worker_coordinator.TaskAllocation(task, client_index_in_task) + def ta(self, task, client_index_in_task, global_client_index=None, total_clients=None): + return worker_coordinator.TaskAllocation(task, client_index_in_task, + client_index_in_task if global_client_index is None else global_client_index, + task.clients if total_clients is None else total_clients) def test_allocates_one_task(self): task = workload.Task("index", op("index", workload.OperationType.Bulk)) @@ -672,11 +674,24 @@ def test_allocates_more_tasks_than_clients(self): # join_point, index_a, index_c, index_e, join_point self.assertEqual(5, len(allocations[0])) # we really have no chance to extract the join point so we just take what is there... - self.assertEqual([allocations[0][0], self.ta(index_a, 0), self.ta(index_c, 0), self.ta(index_e, 0), allocations[0][4]], + self.assertEqual([allocations[0][0], + self.ta(index_a, client_index_in_task=0, + global_client_index=0, total_clients=2), + self.ta(index_c, client_index_in_task=0, + global_client_index=2, total_clients=2), + self.ta(index_e, client_index_in_task=0, + global_client_index=4, total_clients=2), + allocations[0][4]], allocations[0]) # join_point, index_a, index_c, None, join_point self.assertEqual(5, len(allocator.allocations[1])) - self.assertEqual([allocations[1][0], self.ta(index_b, 0), self.ta(index_d, 0), None, allocations[1][4]], allocations[1]) + self.assertEqual([allocations[1][0], + self.ta(index_b, client_index_in_task=0, + global_client_index=1, total_clients=2), + self.ta(index_d, client_index_in_task=0, + global_client_index=3, total_clients=2), + None, allocations[1][4]], + allocations[1]) self.assertEqual([{index_a, index_b, index_c, index_d, index_e}], allocator.tasks_per_joinpoint) self.assertEqual(2, len(allocator.join_points)) @@ -703,16 +718,32 @@ def test_considers_number_of_clients_per_subtask(self): # join_point, index_a, index_c, join_point self.assertEqual(4, len(allocations[0])) # we really have no chance to extract the join point so we just take what is there... - self.assertEqual([allocations[0][0], self.ta(index_a, 0), self.ta(index_c, 1), allocations[0][3]], allocations[0]) + self.assertEqual([allocations[0][0], + self.ta(index_a, client_index_in_task=0, + global_client_index=0, total_clients=3), + self.ta(index_c, client_index_in_task=1, + global_client_index=3, total_clients=3), + allocations[0][3]], + allocations[0]) # task that client 1 will execute: # join_point, index_b, None, join_point self.assertEqual(4, len(allocator.allocations[1])) - self.assertEqual([allocations[1][0], self.ta(index_b, 0), None, allocations[1][3]], allocations[1]) + self.assertEqual([allocations[1][0], + self.ta(index_b, client_index_in_task=0, + global_client_index=1, total_clients=3), + None, + allocations[1][3]], + allocations[1]) # tasks that client 2 will execute: self.assertEqual(4, len(allocator.allocations[2])) - self.assertEqual([allocations[2][0], self.ta(index_c, 0), None, allocations[2][3]], allocations[2]) + self.assertEqual([allocations[2][0], + self.ta(index_c, client_index_in_task=0, + global_client_index=2, total_clients=3), + None, + allocations[2][3]], + allocations[2]) self.assertEqual([{index_a, index_b, index_c}], allocator.tasks_per_joinpoint) @@ -839,6 +870,7 @@ def next(self, current): async def assert_schedule(self, expected_schedule, schedule_handle, infinite_schedule=False): idx = 0 + schedule_handle.start() async for invocation_time, sample_type, progress_percent, runner, params in schedule_handle(): schedule_handle.before_request(now=idx) exp_invocation_time, exp_sample_type, exp_progress_percent, exp_params = expected_schedule[idx] @@ -882,7 +914,14 @@ def test_injects_parameter_source_into_scheduler(self): }) param_source = workload.operation_parameters(self.test_workload, task) - schedule = worker_coordinator.schedule_for(task, 0, param_source) + task_allocation = worker_coordinator.TaskAllocation( + task=task, + client_index_in_task=0, + global_client_index=0, + total_clients=task.clients + ) + schedule = worker_coordinator.schedule_for( + task_allocation, param_source) self.assertIsNotNone(schedule.sched.parameter_source, "Parameter source has not been injected into scheduler") self.assertEqual(param_source, schedule.sched.parameter_source) @@ -893,7 +932,14 @@ async def test_search_task_one_client(self): param_source="worker-coordinator-test-param-source"), warmup_iterations=3, iterations=5, clients=1, params={"target-throughput": 10, "clients": 1}) param_source = workload.operation_parameters(self.test_workload, task) - schedule = worker_coordinator.schedule_for(task, 0, param_source) + task_allocation = worker_coordinator.TaskAllocation( + task=task, + client_index_in_task=0, + global_client_index=0, + total_clients=task.clients + ) + schedule = worker_coordinator.schedule_for( + task_allocation, param_source) expected_schedule = [ (0, metrics.SampleType.Warmup, 1 / 8, {}), @@ -913,7 +959,14 @@ async def test_search_task_two_clients(self): param_source="worker-coordinator-test-param-source"), warmup_iterations=1, iterations=5, clients=2, params={"target-throughput": 10, "clients": 2}) param_source = workload.operation_parameters(self.test_workload, task) - schedule = worker_coordinator.schedule_for(task, 0, param_source) + task_allocation = worker_coordinator.TaskAllocation( + task=task, + client_index_in_task=0, + global_client_index=0, + total_clients=task.clients + ) + schedule = worker_coordinator.schedule_for( + task_allocation, param_source) expected_schedule = [ (0, metrics.SampleType.Warmup, 1 / 6, {}), @@ -934,7 +987,14 @@ async def test_schedule_param_source_determines_iterations_no_warmup(self): clients=4, params={"target-throughput": 4}) param_source = workload.operation_parameters(self.test_workload, task) - schedule = worker_coordinator.schedule_for(task, 0, param_source) + task_allocation = worker_coordinator.TaskAllocation( + task=task, + client_index_in_task=0, + global_client_index=0, + total_clients=task.clients + ) + schedule = worker_coordinator.schedule_for( + task_allocation, param_source) await self.assert_schedule([ (0.0, metrics.SampleType.Normal, 1 / 3, {"body": ["a"], "size": 3}), @@ -950,7 +1010,14 @@ async def test_schedule_param_source_determines_iterations_including_warmup(self warmup_iterations=2, clients=4, params={"target-throughput": 4}) param_source = workload.operation_parameters(self.test_workload, task) - schedule = worker_coordinator.schedule_for(task, 0, param_source) + task_allocation = worker_coordinator.TaskAllocation( + task=task, + client_index_in_task=0, + global_client_index=0, + total_clients=task.clients + ) + schedule = worker_coordinator.schedule_for( + task_allocation, param_source) await self.assert_schedule([ (0.0, metrics.SampleType.Warmup, 1 / 5, {"body": ["a"], "size": 5}), @@ -969,7 +1036,14 @@ async def test_schedule_defaults_to_iteration_based(self): clients=1, params={"target-throughput": 4, "clients": 4}) param_source = workload.operation_parameters(self.test_workload, task) - schedule = worker_coordinator.schedule_for(task, 0, param_source) + task_allocation = worker_coordinator.TaskAllocation( + task=task, + client_index_in_task=0, + global_client_index=0, + total_clients=task.clients + ) + schedule = worker_coordinator.schedule_for( + task_allocation, param_source) await self.assert_schedule([ (0.0, metrics.SampleType.Normal, 1 / 1, {"body": ["a"]}), @@ -983,7 +1057,14 @@ async def test_schedule_for_warmup_time_based(self): warmup_time_period=0, clients=4, params={"target-throughput": 4, "clients": 4}) param_source = workload.operation_parameters(self.test_workload, task) - schedule = worker_coordinator.schedule_for(task, 0, param_source) + task_allocation = worker_coordinator.TaskAllocation( + task=task, + client_index_in_task=0, + global_client_index=0, + total_clients=task.clients + ) + schedule = worker_coordinator.schedule_for( + task_allocation, param_source) await self.assert_schedule([ (0.0, metrics.SampleType.Normal, 1 / 11, {"body": ["a"], "size": 11}), @@ -1007,7 +1088,14 @@ async def test_infinite_schedule_without_progress_indication(self): warmup_time_period=0, clients=4, params={"target-throughput": 4, "clients": 4}) param_source = workload.operation_parameters(self.test_workload, task) - schedule = worker_coordinator.schedule_for(task, 0, param_source) + task_allocation = worker_coordinator.TaskAllocation( + task=task, + client_index_in_task=0, + global_client_index=0, + total_clients=task.clients + ) + schedule = worker_coordinator.schedule_for( + task_allocation, param_source) await self.assert_schedule([ (0.0, metrics.SampleType.Normal, None, {"body": ["a"]}), @@ -1020,19 +1108,32 @@ async def test_infinite_schedule_without_progress_indication(self): @run_async async def test_finite_schedule_with_progress_indication(self): task = workload.Task("time-based", workload.Operation("time-based", workload.OperationType.Bulk.to_hyphenated_string(), - params={"body": ["a"], "size": 5}, - param_source="worker-coordinator-test-param-source"), - warmup_time_period=0, clients=4, params={"target-throughput": 4, "clients": 4}) + params={ + "body": ["a"], "size": 5}, + param_source="worker-coordinator-test-param-source"), + warmup_time_period=0, clients=4, params={"target-throughput": 4, "clients": 4}) param_source = workload.operation_parameters(self.test_workload, task) - schedule = worker_coordinator.schedule_for(task, 0, param_source) + task_allocation = worker_coordinator.TaskAllocation( + task=task, + client_index_in_task=0, + global_client_index=0, + total_clients=task.clients + ) + schedule = worker_coordinator.schedule_for( + task_allocation, param_source) await self.assert_schedule([ - (0.0, metrics.SampleType.Normal, 1 / 5, {"body": ["a"], "size": 5}), - (1.0, metrics.SampleType.Normal, 2 / 5, {"body": ["a"], "size": 5}), - (2.0, metrics.SampleType.Normal, 3 / 5, {"body": ["a"], "size": 5}), - (3.0, metrics.SampleType.Normal, 4 / 5, {"body": ["a"], "size": 5}), - (4.0, metrics.SampleType.Normal, 5 / 5, {"body": ["a"], "size": 5}), + (0.0, metrics.SampleType.Normal, + 1 / 5, {"body": ["a"], "size": 5}), + (1.0, metrics.SampleType.Normal, + 2 / 5, {"body": ["a"], "size": 5}), + (2.0, metrics.SampleType.Normal, + 3 / 5, {"body": ["a"], "size": 5}), + (3.0, metrics.SampleType.Normal, + 4 / 5, {"body": ["a"], "size": 5}), + (4.0, metrics.SampleType.Normal, + 5 / 5, {"body": ["a"], "size": 5}), ], schedule, infinite_schedule=False) @run_async @@ -1044,7 +1145,14 @@ async def test_schedule_with_progress_determined_by_runner(self): params={"target-throughput": 1, "clients": 1}) param_source = workload.operation_parameters(self.test_workload, task) - schedule = worker_coordinator.schedule_for(task, 0, param_source) + task_allocation = worker_coordinator.TaskAllocation( + task=task, + client_index_in_task=0, + global_client_index=0, + total_clients=task.clients + ) + schedule = worker_coordinator.schedule_for( + task_allocation, param_source) await self.assert_schedule([ (0.0, metrics.SampleType.Normal, None, {"body": ["a"]}), @@ -1064,7 +1172,16 @@ async def test_schedule_for_time_based(self): clients=1) param_source = workload.operation_parameters(self.test_workload, task) - schedule_handle = worker_coordinator.schedule_for(task, 0, param_source) + task_allocation = worker_coordinator.TaskAllocation( + task=task, + client_index_in_task=0, + global_client_index=0, + total_clients=task.clients + ) + schedule_handle = worker_coordinator.schedule_for( + task_allocation, param_source) + schedule_handle.start() + self.assertEqual(0.0, schedule_handle.ramp_up_wait_time) schedule = schedule_handle() last_progress = -1 @@ -1198,8 +1315,12 @@ async def test_execute_schedule_in_throughput_mode(self, opensearch, on_client_r param_source="worker-coordinator-test-param-source"), warmup_time_period=0, clients=4) param_source = workload.operation_parameters(test_workload, task) - schedule = worker_coordinator.schedule_for(task, 0, param_source) - + task_allocation = worker_coordinator.TaskAllocation(task=task, + client_index_in_task=0, + global_client_index=0, + total_clients=task.clients) + schedule = worker_coordinator.schedule_for( + task_allocation, param_source) sampler = worker_coordinator.Sampler(start_timestamp=task_start) cancel = threading.Event() complete = threading.Event() @@ -1253,7 +1374,12 @@ async def test_execute_schedule_with_progress_determined_by_runner(self, opensea "size": None }, param_source="worker-coordinator-test-param-source"), warmup_time_period=0, clients=4) param_source = workload.operation_parameters(test_workload, task) - schedule = worker_coordinator.schedule_for(task, 0, param_source) + task_allocation = worker_coordinator.TaskAllocation(task=task, + client_index_in_task=0, + global_client_index=0, + total_clients=task.clients) + schedule = worker_coordinator.schedule_for( + task_allocation, param_source) sampler = worker_coordinator.Sampler(start_timestamp=task_start) cancel = threading.Event() @@ -1315,7 +1441,12 @@ async def test_execute_schedule_runner_overrides_times(self, opensearch): param_source="worker-coordinator-test-param-source"), warmup_iterations=0, iterations=1, clients=1) param_source = workload.operation_parameters(test_workload, task) - schedule = worker_coordinator.schedule_for(task, 0, param_source) + task_allocation = worker_coordinator.TaskAllocation(task=task, + client_index_in_task=0, + global_client_index=0, + total_clients=task.clients) + schedule = worker_coordinator.schedule_for( + task_allocation, param_source) sampler = worker_coordinator.Sampler(start_timestamp=task_start) cancel = threading.Event() @@ -1394,7 +1525,12 @@ def perform_request(*args, **kwargs): complete = threading.Event() param_source = workload.operation_parameters(test_workload, task) - schedule = worker_coordinator.schedule_for(task, 0, param_source) + task_allocation = worker_coordinator.TaskAllocation(task=task, + client_index_in_task=0, + global_client_index=0, + total_clients=task.clients) + schedule = worker_coordinator.schedule_for( + task_allocation, param_source) execute_schedule = worker_coordinator.AsyncExecutor(client_id=0, task=task, schedule=schedule, @@ -1446,7 +1582,12 @@ async def test_cancel_execute_schedule(self, opensearch): params={"target-throughput": target_throughput, "clients": 4}) param_source = workload.operation_parameters(test_workload, task) - schedule = worker_coordinator.schedule_for(task, 0, param_source) + task_allocation = worker_coordinator.TaskAllocation(task=task, + client_index_in_task=0, + global_client_index=0, + total_clients=task.clients) + schedule = worker_coordinator.schedule_for( + task_allocation, param_source) sampler = worker_coordinator.Sampler(start_timestamp=0) cancel = threading.Event() @@ -1482,12 +1623,19 @@ def run(*args, **kwargs): raise ExpectedUnitTestException() class ScheduleHandle: + + def __init__(self): + self.ramp_up_wait_time = 0 + def before_request(self, now): pass def after_request(self, now, weight, unit, meta_data): pass + def start(self): + pass + async def __call__(self): invocations = [(0, metrics.SampleType.Warmup, 0, AsyncExecutorTests.context_managed(run), None)] for invocation in invocations: diff --git a/tests/workload/loader_test.py b/tests/workload/loader_test.py index adfadf42..9386dcae 100644 --- a/tests/workload/loader_test.py +++ b/tests/workload/loader_test.py @@ -13,7 +13,7 @@ # not use this file except in compliance with the License. # You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an @@ -77,8 +77,10 @@ def test_workload_from_directory(self, is_dir, path_exists): repo = loader.SimpleWorkloadRepository("/path/to/workload/unit-test") self.assertEqual("unit-test", repo.workload_name) self.assertEqual(["unit-test"], repo.workload_names) - self.assertEqual("/path/to/workload/unit-test", repo.workload_dir("unit-test")) - self.assertEqual("/path/to/workload/unit-test/workload.json", repo.workload_file("unit-test")) + self.assertEqual("/path/to/workload/unit-test", + repo.workload_dir("unit-test")) + self.assertEqual("/path/to/workload/unit-test/workload.json", + repo.workload_file("unit-test")) @mock.patch("os.path.exists") @mock.patch("os.path.isdir") @@ -88,11 +90,14 @@ def test_workload_from_file(self, is_file, is_dir, path_exists): is_dir.return_value = False path_exists.return_value = True - repo = loader.SimpleWorkloadRepository("/path/to/workload/unit-test/my-workload.json") + repo = loader.SimpleWorkloadRepository( + "/path/to/workload/unit-test/my-workload.json") self.assertEqual("my-workload", repo.workload_name) self.assertEqual(["my-workload"], repo.workload_names) - self.assertEqual("/path/to/workload/unit-test", repo.workload_dir("my-workload")) - self.assertEqual("/path/to/workload/unit-test/my-workload.json", repo.workload_file("my-workload")) + self.assertEqual("/path/to/workload/unit-test", + repo.workload_dir("my-workload")) + self.assertEqual("/path/to/workload/unit-test/my-workload.json", + repo.workload_file("my-workload")) @mock.patch("os.path.exists") @mock.patch("os.path.isdir") @@ -103,15 +108,18 @@ def test_workload_from_named_pipe(self, is_file, is_dir, path_exists): path_exists.return_value = True with self.assertRaises(exceptions.SystemSetupError) as ctx: - loader.SimpleWorkloadRepository("a named pipe cannot point to a workload") - self.assertEqual("a named pipe cannot point to a workload is neither a file nor a directory", ctx.exception.args[0]) + loader.SimpleWorkloadRepository( + "a named pipe cannot point to a workload") + self.assertEqual( + "a named pipe cannot point to a workload is neither a file nor a directory", ctx.exception.args[0]) @mock.patch("os.path.exists") def test_workload_from_non_existing_path(self, path_exists): path_exists.return_value = False with self.assertRaises(exceptions.SystemSetupError) as ctx: loader.SimpleWorkloadRepository("/path/does/not/exist") - self.assertEqual("Workload path /path/does/not/exist does not exist", ctx.exception.args[0]) + self.assertEqual( + "Workload path /path/does/not/exist does not exist", ctx.exception.args[0]) @mock.patch("os.path.isdir") @mock.patch("os.path.exists") @@ -121,7 +129,8 @@ def test_workload_from_directory_without_workload(self, path_exists, is_dir): is_dir.return_value = True with self.assertRaises(exceptions.SystemSetupError) as ctx: loader.SimpleWorkloadRepository("/path/to/not/a/workload") - self.assertEqual("Could not find workload.json in /path/to/not/a/workload", ctx.exception.args[0]) + self.assertEqual( + "Could not find workload.json in /path/to/not/a/workload", ctx.exception.args[0]) @mock.patch("os.path.exists") @mock.patch("os.path.isdir") @@ -132,8 +141,10 @@ def test_workload_from_file_but_not_json(self, is_file, is_dir, path_exists): path_exists.return_value = True with self.assertRaises(exceptions.SystemSetupError) as ctx: - loader.SimpleWorkloadRepository("/path/to/workload/unit-test/my-workload.xml") - self.assertEqual("/path/to/workload/unit-test/my-workload.xml has to be a JSON file", ctx.exception.args[0]) + loader.SimpleWorkloadRepository( + "/path/to/workload/unit-test/my-workload.xml") + self.assertEqual( + "/path/to/workload/unit-test/my-workload.xml has to be a JSON file", ctx.exception.args[0]) class GitRepositoryTests(TestCase): @@ -144,21 +155,29 @@ def __init__(self, remote_url, root_dir, repo_name, resource_name, offline, fetc @mock.patch("os.path.exists") @mock.patch("os.walk") def test_workload_from_existing_repo(self, walk, exists): - walk.return_value = iter([(".", ["unittest", "unittest2", "unittest3"], [])]) + walk.return_value = iter( + [(".", ["unittest", "unittest2", "unittest3"], [])]) exists.return_value = True cfg = config.Config() - cfg.add(config.Scope.application, "workload", "workload.name", "unittest") - cfg.add(config.Scope.application, "workload", "repository.name", "default") + cfg.add(config.Scope.application, "workload", + "workload.name", "unittest") + cfg.add(config.Scope.application, "workload", + "repository.name", "default") cfg.add(config.Scope.application, "system", "offline.mode", False) cfg.add(config.Scope.application, "node", "root.dir", "/tmp") - cfg.add(config.Scope.application, "benchmarks", "workload.repository.dir", "workloads") + cfg.add(config.Scope.application, "benchmarks", + "workload.repository.dir", "workloads") - repo = loader.GitWorkloadRepository(cfg, fetch=False, update=False, repo_class=GitRepositoryTests.MockGitRepo) + repo = loader.GitWorkloadRepository( + cfg, fetch=False, update=False, repo_class=GitRepositoryTests.MockGitRepo) self.assertEqual("unittest", repo.workload_name) - self.assertEqual(["unittest", "unittest2", "unittest3"], list(repo.workload_names)) - self.assertEqual("/tmp/workloads/default/unittest", repo.workload_dir("unittest")) - self.assertEqual("/tmp/workloads/default/unittest/workload.json", repo.workload_file("unittest")) + self.assertEqual(["unittest", "unittest2", "unittest3"], + list(repo.workload_names)) + self.assertEqual("/tmp/workloads/default/unittest", + repo.workload_dir("unittest")) + self.assertEqual( + "/tmp/workloads/default/unittest/workload.json", repo.workload_file("unittest")) class WorkloadPreparationTests(TestCase): @@ -171,18 +190,20 @@ def test_does_nothing_if_document_file_available(self, is_file, get_size, prepar prepare_file_offset_table.return_value = 5 p = loader.DocumentSetPreparator(workload_name="unit-test", - downloader=loader.Downloader(offline=False, test_mode=False), + downloader=loader.Downloader( + offline=False, test_mode=False), decompressor=loader.Decompressor()) p.prepare_document_set(document_set=workload.Documents(source_format=workload.Documents.SOURCE_FORMAT_BULK, - document_file="docs.json", - document_archive="docs.json.bz2", - number_of_documents=5, - compressed_size_in_bytes=200, - uncompressed_size_in_bytes=2000), + document_file="docs.json", + document_archive="docs.json.bz2", + number_of_documents=5, + compressed_size_in_bytes=200, + uncompressed_size_in_bytes=2000), data_root="/tmp") - prepare_file_offset_table.assert_called_with("/tmp/docs.json", None, None, InstanceOf(loader.Downloader)) + prepare_file_offset_table.assert_called_with( + "/tmp/docs.json", None, None, InstanceOf(loader.Downloader)) @mock.patch("osbenchmark.utils.io.prepare_file_offset_table") @mock.patch("os.path.getsize") @@ -193,18 +214,20 @@ def test_decompresses_if_archive_available(self, is_file, get_size, prepare_file prepare_file_offset_table.return_value = 5 p = loader.DocumentSetPreparator(workload_name="unit-test", - downloader=loader.Downloader(offline=False, test_mode=False), + downloader=loader.Downloader( + offline=False, test_mode=False), decompressor=loader.Decompressor()) p.prepare_document_set(document_set=workload.Documents(source_format=workload.Documents.SOURCE_FORMAT_BULK, - document_file="docs.json", - document_archive="docs.json.bz2", - number_of_documents=5, - compressed_size_in_bytes=200, - uncompressed_size_in_bytes=2000), + document_file="docs.json", + document_archive="docs.json.bz2", + number_of_documents=5, + compressed_size_in_bytes=200, + uncompressed_size_in_bytes=2000), data_root="/tmp") - prepare_file_offset_table.assert_called_with("/tmp/docs.json", None, None, InstanceOf(loader.Downloader)) + prepare_file_offset_table.assert_called_with( + "/tmp/docs.json", None, None, InstanceOf(loader.Downloader)) @mock.patch("osbenchmark.utils.io.decompress") @mock.patch("os.path.getsize") @@ -219,18 +242,20 @@ def test_raise_error_on_wrong_uncompressed_file_size(self, is_file, get_size, de get_size.side_effect = [200, 1] p = loader.DocumentSetPreparator(workload_name="unit-test", - downloader=loader.Downloader(offline=False, test_mode=False), + downloader=loader.Downloader( + offline=False, test_mode=False), decompressor=loader.Decompressor()) with self.assertRaises(exceptions.DataError) as ctx: p.prepare_document_set(document_set=workload.Documents(source_format=workload.Documents.SOURCE_FORMAT_BULK, - document_file="docs.json", - document_archive="docs.json.bz2", - number_of_documents=5, - compressed_size_in_bytes=200, - uncompressed_size_in_bytes=2000), + document_file="docs.json", + document_archive="docs.json.bz2", + number_of_documents=5, + compressed_size_in_bytes=200, + uncompressed_size_in_bytes=2000), data_root="/tmp") - self.assertEqual("[/tmp/docs.json] is corrupt. Extracted [1] bytes but [2000] bytes are expected.", ctx.exception.args[0]) + self.assertEqual( + "[/tmp/docs.json] is corrupt. Extracted [1] bytes but [2000] bytes are expected.", ctx.exception.args[0]) decompress.assert_called_with("/tmp/docs.json.bz2", "/tmp") @@ -246,17 +271,18 @@ def test_raise_error_if_compressed_does_not_contain_expected_document_file(self, get_size.return_value = 200 p = loader.DocumentSetPreparator(workload_name="unit-test", - downloader=loader.Downloader(offline=False, test_mode=False), + downloader=loader.Downloader( + offline=False, test_mode=False), decompressor=loader.Decompressor()) with self.assertRaises(exceptions.DataError) as ctx: p.prepare_document_set(document_set=workload.Documents(source_format=workload.Documents.SOURCE_FORMAT_BULK, - base_url="http://benchmarks.opensearch.org/corpora/unit-test", - document_file="docs.json", - document_archive="docs.json.bz2", - number_of_documents=5, - compressed_size_in_bytes=200, - uncompressed_size_in_bytes=2000), + base_url="http://benchmarks.opensearch.org/corpora/unit-test", + document_file="docs.json", + document_archive="docs.json.bz2", + number_of_documents=5, + compressed_size_in_bytes=200, + uncompressed_size_in_bytes=2000), data_root="/tmp") self.assertEqual("Decompressing [/tmp/docs.json.bz2] did not create [/tmp/docs.json]. Please check with the workload author if the " "compressed archive has been created correctly.", ctx.exception.args[0]) @@ -277,7 +303,8 @@ def test_download_document_archive_if_no_file_available(self, is_file, get_size, # after download uncompressed file still does not exist (in main loop) # after download compressed file exists (in main loop) # after decompression, uncompressed file exists - is_file.side_effect = [False, False, True, False, True, True, True, True] + is_file.side_effect = [False, False, + True, False, True, True, True, True] # compressed file size is 200 after download # compressed file size is 200 after download (in main loop) # uncompressed file size is 2000 after decompression @@ -287,22 +314,23 @@ def test_download_document_archive_if_no_file_available(self, is_file, get_size, prepare_file_offset_table.return_value = 5 p = loader.DocumentSetPreparator(workload_name="unit-test", - downloader=loader.Downloader(offline=False, test_mode=False), + downloader=loader.Downloader( + offline=False, test_mode=False), decompressor=loader.Decompressor()) p.prepare_document_set(document_set=workload.Documents(source_format=workload.Documents.SOURCE_FORMAT_BULK, - base_url="http://benchmarks.opensearch.org/corpora/unit-test", - document_file="docs.json", - document_archive="docs.json.bz2", - number_of_documents=5, - compressed_size_in_bytes=200, - uncompressed_size_in_bytes=2000), + base_url="http://benchmarks.opensearch.org/corpora/unit-test", + document_file="docs.json", + document_archive="docs.json.bz2", + number_of_documents=5, + compressed_size_in_bytes=200, + uncompressed_size_in_bytes=2000), data_root="/tmp") ensure_dir.assert_called_with("/tmp") decompress.assert_called_with("/tmp/docs.json.bz2", "/tmp") - calls = [ mock.call("http://benchmarks.opensearch.org/corpora/unit-test/docs.json.bz2", - "/tmp/docs.json.bz2", 200, progress_indicator=mock.ANY) ] + calls = [mock.call("http://benchmarks.opensearch.org/corpora/unit-test/docs.json.bz2", + "/tmp/docs.json.bz2", 200, progress_indicator=mock.ANY)] download.assert_has_calls(calls) prepare_file_offset_table.assert_called_with("/tmp/docs.json", 'http://benchmarks.opensearch.org/corpora/unit-test', None, InstanceOf(loader.Downloader)) @@ -321,7 +349,8 @@ def test_download_document_archive_with_source_url_compressed(self, is_file, get # after download uncompressed file still does not exist (in main loop) # after download compressed file exists (in main loop) # after decompression, uncompressed file exists - is_file.side_effect = [False, False, True, False, True, True, True, True] + is_file.side_effect = [False, False, + True, False, True, True, True, True] # compressed file size is 200 after download # compressed file size is 200 after download (in main loop) # uncompressed file size is 2000 after decompression @@ -331,17 +360,18 @@ def test_download_document_archive_with_source_url_compressed(self, is_file, get prepare_file_offset_table.return_value = 5 p = loader.DocumentSetPreparator(workload_name="unit-test", - downloader=loader.Downloader(offline=False, test_mode=False), + downloader=loader.Downloader( + offline=False, test_mode=False), decompressor=loader.Decompressor()) p.prepare_document_set(document_set=workload.Documents(source_format=workload.Documents.SOURCE_FORMAT_BULK, - base_url="http://benchmarks.opensearch.org/corpora", - source_url="http://benchmarks.opensearch.org/corpora/unit-test/docs.json.bz2", - document_file="docs.json", - document_archive="docs.json.bz2", - number_of_documents=5, - compressed_size_in_bytes=200, - uncompressed_size_in_bytes=2000), + base_url="http://benchmarks.opensearch.org/corpora", + source_url="http://benchmarks.opensearch.org/corpora/unit-test/docs.json.bz2", + document_file="docs.json", + document_archive="docs.json.bz2", + number_of_documents=5, + compressed_size_in_bytes=200, + uncompressed_size_in_bytes=2000), data_root="/tmp") ensure_dir.assert_called_with("/tmp") @@ -371,18 +401,19 @@ def test_download_document_with_source_url_uncompressed(self, is_file, get_size, prepare_file_offset_table.return_value = 5 p = loader.DocumentSetPreparator(workload_name="unit-test", - downloader=loader.Downloader(offline=False, test_mode=False), + downloader=loader.Downloader( + offline=False, test_mode=False), decompressor=loader.Decompressor()) p.prepare_document_set(document_set=workload.Documents(source_format=workload.Documents.SOURCE_FORMAT_BULK, - source_url=f"{scheme}://benchmarks.opensearch.org/corpora/unit-test/docs.json", - base_url=f"{scheme}://benchmarks.opensearch.org/corpora/", - document_file="docs.json", - # --> We don't provide a document archive here <-- - document_archive=None, - number_of_documents=5, - compressed_size_in_bytes=200, - uncompressed_size_in_bytes=2000), + source_url=f"{scheme}://benchmarks.opensearch.org/corpora/unit-test/docs.json", + base_url=f"{scheme}://benchmarks.opensearch.org/corpora/", + document_file="docs.json", + # --> We don't provide a document archive here <-- + document_archive=None, + number_of_documents=5, + compressed_size_in_bytes=200, + uncompressed_size_in_bytes=2000), data_root="/tmp") ensure_dir.assert_called_with("/tmp") @@ -411,22 +442,23 @@ def test_download_document_with_trailing_baseurl_slash(self, is_file, get_size, prepare_file_offset_table.return_value = 5 p = loader.DocumentSetPreparator(workload_name="unit-test", - downloader=loader.Downloader(offline=False, test_mode=False), + downloader=loader.Downloader( + offline=False, test_mode=False), decompressor=loader.Decompressor()) p.prepare_document_set(document_set=workload.Documents(source_format=workload.Documents.SOURCE_FORMAT_BULK, - base_url=f"{scheme}://benchmarks.opensearch.org/corpora/unit-test/", - document_file="docs.json", - # --> We don't provide a document archive here <-- - document_archive=None, - number_of_documents=5, - compressed_size_in_bytes=200, - uncompressed_size_in_bytes=2000), + base_url=f"{scheme}://benchmarks.opensearch.org/corpora/unit-test/", + document_file="docs.json", + # --> We don't provide a document archive here <-- + document_archive=None, + number_of_documents=5, + compressed_size_in_bytes=200, + uncompressed_size_in_bytes=2000), data_root="/tmp") ensure_dir.assert_called_with("/tmp") - calls = [ mock.call(f"{scheme}://benchmarks.opensearch.org/corpora/unit-test/docs.json", \ - "/tmp/docs.json", 2000, progress_indicator=mock.ANY) ] + calls = [mock.call(f"{scheme}://benchmarks.opensearch.org/corpora/unit-test/docs.json", + "/tmp/docs.json", 2000, progress_indicator=mock.ANY)] download.assert_has_calls(calls) prepare_file_offset_table.assert_called_with("/tmp/docs.json", f"{scheme}://benchmarks.opensearch.org/corpora/unit-test/", None, InstanceOf(loader.Downloader)) @@ -447,22 +479,23 @@ def test_download_document_file_if_no_file_available(self, is_file, get_size, en prepare_file_offset_table.return_value = 5 p = loader.DocumentSetPreparator(workload_name="unit-test", - downloader=loader.Downloader(offline=False, test_mode=False), + downloader=loader.Downloader( + offline=False, test_mode=False), decompressor=loader.Decompressor()) p.prepare_document_set(document_set=workload.Documents(source_format=workload.Documents.SOURCE_FORMAT_BULK, - base_url="http://benchmarks.opensearch.org/corpora/unit-test", - document_file="docs.json", - # --> We don't provide a document archive here <-- - document_archive=None, - number_of_documents=5, - compressed_size_in_bytes=200, - uncompressed_size_in_bytes=2000), + base_url="http://benchmarks.opensearch.org/corpora/unit-test", + document_file="docs.json", + # --> We don't provide a document archive here <-- + document_archive=None, + number_of_documents=5, + compressed_size_in_bytes=200, + uncompressed_size_in_bytes=2000), data_root="/tmp") ensure_dir.assert_called_with("/tmp") - calls = [ mock.call("http://benchmarks.opensearch.org/corpora/unit-test/docs.json", \ - "/tmp/docs.json", 2000, progress_indicator=mock.ANY) ] + calls = [mock.call("http://benchmarks.opensearch.org/corpora/unit-test/docs.json", + "/tmp/docs.json", 2000, progress_indicator=mock.ANY)] download.assert_has_calls(calls) prepare_file_offset_table.assert_called_with("/tmp/docs.json", 'http://benchmarks.opensearch.org/corpora/unit-test', None, InstanceOf(loader.Downloader)) @@ -475,18 +508,20 @@ def test_raise_download_error_if_offline(self, is_file, ensure_dir, download): is_file.return_value = False p = loader.DocumentSetPreparator(workload_name="unit-test", - downloader=loader.Downloader(offline=True, test_mode=False), + downloader=loader.Downloader( + offline=True, test_mode=False), decompressor=loader.Decompressor()) with self.assertRaises(exceptions.SystemSetupError) as ctx: p.prepare_document_set(document_set=workload.Documents(source_format=workload.Documents.SOURCE_FORMAT_BULK, - base_url="http://benchmarks.opensearch.org/corpora/unit-test", - document_file="docs.json", - number_of_documents=5, - uncompressed_size_in_bytes=2000), + base_url="http://benchmarks.opensearch.org/corpora/unit-test", + document_file="docs.json", + number_of_documents=5, + uncompressed_size_in_bytes=2000), data_root="/tmp") - self.assertEqual("Cannot find [/tmp/docs.json]. Please disable offline mode and retry.", ctx.exception.args[0]) + self.assertEqual( + "Cannot find [/tmp/docs.json]. Please disable offline mode and retry.", ctx.exception.args[0]) self.assertEqual(0, ensure_dir.call_count) self.assertEqual(0, download.call_count) @@ -499,19 +534,21 @@ def test_raise_download_error_if_no_url_provided_and_file_missing(self, is_file, is_file.return_value = False p = loader.DocumentSetPreparator(workload_name="unit-test", - downloader=loader.Downloader(offline=False, test_mode=False), + downloader=loader.Downloader( + offline=False, test_mode=False), decompressor=loader.Decompressor()) with self.assertRaises(exceptions.DataError) as ctx: p.prepare_document_set(document_set=workload.Documents(source_format=workload.Documents.SOURCE_FORMAT_BULK, - base_url=None, - document_file="docs.json", - document_archive=None, - number_of_documents=5, - uncompressed_size_in_bytes=2000), + base_url=None, + document_file="docs.json", + document_archive=None, + number_of_documents=5, + uncompressed_size_in_bytes=2000), data_root="/tmp") - self.assertEqual("Cannot download data because no base URL is provided.", ctx.exception.args[0]) + self.assertEqual( + "Cannot download data because no base URL is provided.", ctx.exception.args[0]) self.assertEqual(0, ensure_dir.call_count) self.assertEqual(0, download.call_count) @@ -527,14 +564,15 @@ def test_raise_download_error_if_no_url_provided_and_wrong_file_size(self, is_fi get_size.return_value = 100 p = loader.DocumentSetPreparator(workload_name="unit-test", - downloader=loader.Downloader(offline=False, test_mode=False), + downloader=loader.Downloader( + offline=False, test_mode=False), decompressor=loader.Decompressor()) with self.assertRaises(exceptions.DataError) as ctx: p.prepare_document_set(document_set=workload.Documents(source_format=workload.Documents.SOURCE_FORMAT_BULK, - document_file="docs.json", - number_of_documents=5, - uncompressed_size_in_bytes=2000), + document_file="docs.json", + number_of_documents=5, + uncompressed_size_in_bytes=2000), data_root="/tmp") self.assertEqual("[/tmp/docs.json] is present but does not have the expected size of [2000] bytes and it " @@ -554,15 +592,16 @@ def test_raise_download_error_no_test_mode_file(self, is_file, ensure_dir, downl 404, "", None, None) p = loader.DocumentSetPreparator(workload_name="unit-test", - downloader=loader.Downloader(offline=False, test_mode=True), + downloader=loader.Downloader( + offline=False, test_mode=True), decompressor=loader.Decompressor()) with self.assertRaises(exceptions.DataError) as ctx: p.prepare_document_set(document_set=workload.Documents(source_format=workload.Documents.SOURCE_FORMAT_BULK, - base_url="http://benchmarks.opensearch.org/corpora/unit-test", - document_file="docs-1k.json", - number_of_documents=5, - uncompressed_size_in_bytes=None), + base_url="http://benchmarks.opensearch.org/corpora/unit-test", + document_file="docs-1k.json", + number_of_documents=5, + uncompressed_size_in_bytes=None), data_root="/tmp") self.assertEqual("This workload does not support test mode. Ask the workload author to add it or disable " @@ -583,15 +622,16 @@ def test_raise_download_error_on_connection_problems(self, is_file, ensure_dir, 500, "Internal Server Error", None, None) p = loader.DocumentSetPreparator(workload_name="unit-test", - downloader=loader.Downloader(offline=False, test_mode=False), + downloader=loader.Downloader( + offline=False, test_mode=False), decompressor=loader.Decompressor()) with self.assertRaises(exceptions.DataError) as ctx: p.prepare_document_set(document_set=workload.Documents(source_format=workload.Documents.SOURCE_FORMAT_BULK, - base_url="http://benchmarks.opensearch.org/corpora/unit-test", - document_file="docs.json", - number_of_documents=5, - uncompressed_size_in_bytes=2000), + base_url="http://benchmarks.opensearch.org/corpora/unit-test", + document_file="docs.json", + number_of_documents=5, + uncompressed_size_in_bytes=2000), data_root="/tmp") self.assertEqual("Could not download [http://benchmarks.opensearch.org/corpora/unit-test/docs.json] " @@ -612,18 +652,20 @@ def test_prepare_bundled_document_set_if_document_file_available(self, is_file, prepare_file_offset_table.return_value = 5 p = loader.DocumentSetPreparator(workload_name="unit-test", - downloader=loader.Downloader(offline=False, test_mode=False), + downloader=loader.Downloader( + offline=False, test_mode=False), decompressor=loader.Decompressor()) self.assertTrue(p.prepare_bundled_document_set(document_set=workload.Documents(source_format=workload.Documents.SOURCE_FORMAT_BULK, - document_file="docs.json", - document_archive="docs.json.bz2", - number_of_documents=5, - compressed_size_in_bytes=200, - uncompressed_size_in_bytes=2000), + document_file="docs.json", + document_archive="docs.json.bz2", + number_of_documents=5, + compressed_size_in_bytes=200, + uncompressed_size_in_bytes=2000), data_root=".")) - prepare_file_offset_table.assert_called_with("./docs.json", None, None, InstanceOf(loader.Downloader)) + prepare_file_offset_table.assert_called_with( + "./docs.json", None, None, InstanceOf(loader.Downloader)) @mock.patch("osbenchmark.utils.io.prepare_file_offset_table") @mock.patch("osbenchmark.utils.io.decompress") @@ -634,15 +676,16 @@ def test_prepare_bundled_document_set_does_nothing_if_no_document_files(self, is is_file.return_value = False p = loader.DocumentSetPreparator(workload_name="unit-test", - downloader=loader.Downloader(offline=False, test_mode=False), + downloader=loader.Downloader( + offline=False, test_mode=False), decompressor=loader.Decompressor()) self.assertFalse(p.prepare_bundled_document_set(document_set=workload.Documents(source_format=workload.Documents.SOURCE_FORMAT_BULK, - document_file="docs.json", - document_archive="docs.json.bz2", - number_of_documents=5, - compressed_size_in_bytes=200, - uncompressed_size_in_bytes=2000), + document_file="docs.json", + document_archive="docs.json.bz2", + number_of_documents=5, + compressed_size_in_bytes=200, + uncompressed_size_in_bytes=2000), data_root=".")) self.assertEqual(0, decompress.call_count) @@ -768,9 +811,11 @@ def test_used_corpora(self): } ] } - reader = loader.WorkloadSpecificationReader(selected_test_procedure="default-test_procedure") + reader = loader.WorkloadSpecificationReader( + selected_test_procedure="default-test_procedure") full_workload = reader("unittest", workload_specification, "/mappings") - used_corpora = sorted(loader.used_corpora(full_workload), key=lambda c: c.name) + used_corpora = sorted(loader.used_corpora( + full_workload), key=lambda c: c.name) self.assertEqual(2, len(used_corpora)) self.assertEqual("http_logs", used_corpora[0].name) # each bulk operation requires a different data file but they should have been merged properly. @@ -778,7 +823,8 @@ def test_used_corpora(self): {d.document_archive for d in used_corpora[0].documents}) self.assertEqual("http_logs_unparsed", used_corpora[1].name) - self.assertEqual({"documents-201998.unparsed.json.bz2"}, {d.document_archive for d in used_corpora[1].documents}) + self.assertEqual({"documents-201998.unparsed.json.bz2"}, + {d.document_archive for d in used_corpora[1].documents}) @mock.patch("osbenchmark.utils.io.prepare_file_offset_table") @mock.patch("osbenchmark.utils.io.decompress") @@ -797,18 +843,20 @@ def test_prepare_bundled_document_set_decompresses_compressed_docs(self, is_file prepare_file_offset_table.return_value = 5 p = loader.DocumentSetPreparator(workload_name="unit-test", - downloader=loader.Downloader(offline=False, test_mode=False), + downloader=loader.Downloader( + offline=False, test_mode=False), decompressor=loader.Decompressor()) self.assertTrue(p.prepare_bundled_document_set(document_set=workload.Documents(source_format=workload.Documents.SOURCE_FORMAT_BULK, - document_file="docs.json", - document_archive="docs.json.bz2", - number_of_documents=5, - compressed_size_in_bytes=200, - uncompressed_size_in_bytes=2000), + document_file="docs.json", + document_archive="docs.json.bz2", + number_of_documents=5, + compressed_size_in_bytes=200, + uncompressed_size_in_bytes=2000), data_root=".")) - prepare_file_offset_table.assert_called_with("./docs.json", None, None, InstanceOf(loader.Downloader)) + prepare_file_offset_table.assert_called_with( + "./docs.json", None, None, InstanceOf(loader.Downloader)) @mock.patch("os.path.getsize") @mock.patch("os.path.isfile") @@ -820,16 +868,17 @@ def test_prepare_bundled_document_set_error_compressed_docs_wrong_size(self, is_ get_size.side_effect = [150] p = loader.DocumentSetPreparator(workload_name="unit-test", - downloader=loader.Downloader(offline=False, test_mode=False), + downloader=loader.Downloader( + offline=False, test_mode=False), decompressor=loader.Decompressor()) with self.assertRaises(exceptions.DataError) as ctx: p.prepare_bundled_document_set(document_set=workload.Documents(source_format=workload.Documents.SOURCE_FORMAT_BULK, - document_file="docs.json", - document_archive="docs.json.bz2", - number_of_documents=5, - compressed_size_in_bytes=200, - uncompressed_size_in_bytes=2000), + document_file="docs.json", + document_archive="docs.json.bz2", + number_of_documents=5, + compressed_size_in_bytes=200, + uncompressed_size_in_bytes=2000), data_root=".") self.assertEqual("[./docs.json.bz2] is present but does not have the expected size of [200] bytes.", @@ -846,16 +895,17 @@ def test_prepare_bundled_document_set_uncompressed_docs_wrong_size(self, is_file get_size.side_effect = [1500] p = loader.DocumentSetPreparator(workload_name="unit-test", - downloader=loader.Downloader(offline=False, test_mode=False), + downloader=loader.Downloader( + offline=False, test_mode=False), decompressor=loader.Decompressor()) with self.assertRaises(exceptions.DataError) as ctx: p.prepare_bundled_document_set(document_set=workload.Documents(source_format=workload.Documents.SOURCE_FORMAT_BULK, - document_file="docs.json", - document_archive="docs.json.bz2", - number_of_documents=5, - compressed_size_in_bytes=200, - uncompressed_size_in_bytes=2000), + document_file="docs.json", + document_archive="docs.json.bz2", + number_of_documents=5, + compressed_size_in_bytes=200, + uncompressed_size_in_bytes=2000), data_root=".") self.assertEqual("[./docs.json] is present but does not have the expected size of [2000] bytes.", ctx.exception.args[0]) @@ -881,28 +931,31 @@ def test_download_document_file_from_part_files(self, rm_file, is_file, get_size prepare_file_offset_table.return_value = 5 p = loader.DocumentSetPreparator(workload_name="unit-test", - downloader=loader.Downloader(offline=False, test_mode=False), + downloader=loader.Downloader( + offline=False, test_mode=False), decompressor=loader.Decompressor()) mo = mock.mock_open() with mock.patch("builtins.open", mo): p.prepare_document_set(document_set=workload.Documents(source_format=workload.Documents.SOURCE_FORMAT_BULK, - base_url="http://benchmarks.opensearch.org/corpora/unit-test", - document_file="docs.json", - document_file_parts=[ {"name": "xaa", "size": 1000 }, - {"name": "xab", "size": 600 }, - {"name": "xac", "size": 400 } ], - # --> We don't provide a document archive here <-- - document_archive=None, - number_of_documents=5, - compressed_size_in_bytes=200, - uncompressed_size_in_bytes=2000), - data_root="/tmp") + base_url="http://benchmarks.opensearch.org/corpora/unit-test", + document_file="docs.json", + document_file_parts=[{"name": "xaa", "size": 1000}, + {"name": "xab", + "size": 600}, + {"name": "xac", "size": 400}], + # --> We don't provide a document archive here <-- + document_archive=None, + number_of_documents=5, + compressed_size_in_bytes=200, + uncompressed_size_in_bytes=2000), + data_root="/tmp") ensure_dir.assert_called_with("/tmp") - calls = [ mock.call('http://benchmarks.opensearch.org/corpora/unit-test/xaa', '/tmp/xaa', 1000, progress_indicator=mock.ANY), - mock.call('http://benchmarks.opensearch.org/corpora/unit-test/xab', '/tmp/xab', 600, progress_indicator=mock.ANY), - mock.call('http://benchmarks.opensearch.org/corpora/unit-test/xac', '/tmp/xac', 400, progress_indicator=mock.ANY) ] + calls = [mock.call('http://benchmarks.opensearch.org/corpora/unit-test/xaa', '/tmp/xaa', 1000, progress_indicator=mock.ANY), + mock.call('http://benchmarks.opensearch.org/corpora/unit-test/xab', + '/tmp/xab', 600, progress_indicator=mock.ANY), + mock.call('http://benchmarks.opensearch.org/corpora/unit-test/xac', '/tmp/xac', 400, progress_indicator=mock.ANY)] download.assert_has_calls(calls) prepare_file_offset_table.assert_called_with("/tmp/docs.json", 'http://benchmarks.opensearch.org/corpora/unit-test', @@ -1001,8 +1054,10 @@ def test_read_glob_files(self): base_path="/some/path/to/a/benchmark/workload", template_file_name="workload.json", fileglobber=lambda pat: [ - os.path.join(os.path.dirname(__file__), "resources", "workload_fragment_1.json"), - os.path.join(os.path.dirname(__file__), "resources", "workload_fragment_2.json") + os.path.join(os.path.dirname(__file__), + "resources", "workload_fragment_1.json"), + os.path.join(os.path.dirname(__file__), + "resources", "workload_fragment_2.json") ] ) response = tmpl_obj.read_glob_files("*workload_fragment_*.json") @@ -1012,7 +1067,8 @@ def test_read_glob_files(self): class TemplateRenderTests(TestCase): - unittest_template_internal_vars = loader.default_internal_template_vars(clock=StaticClock) + unittest_template_internal_vars = loader.default_internal_template_vars( + clock=StaticClock) def test_render_simple_template(self): template = """ @@ -1022,7 +1078,8 @@ def test_render_simple_template(self): } """ - rendered = loader.render_template(template, template_internal_vars=TemplateRenderTests.unittest_template_internal_vars) + rendered = loader.render_template( + template, template_internal_vars=TemplateRenderTests.unittest_template_internal_vars) expected = """ { @@ -1083,7 +1140,8 @@ def key_globber(e): ] }) - template_source = loader.TemplateSource("", "workload.json", source=source, fileglobber=key_globber) + template_source = loader.TemplateSource( + "", "workload.json", source=source, fileglobber=key_globber) template_source.load_template_from_string(template) rendered = loader.render_template( @@ -1140,7 +1198,8 @@ class CompleteWorkloadParamsTests(TestCase): def test_check_complete_workload_params_contains_all_workload_params(self): complete_workload_params = loader.CompleteWorkloadParams() - loader.register_all_params_in_workload(CompleteWorkloadParamsTests.assembled_source, complete_workload_params) + loader.register_all_params_in_workload( + CompleteWorkloadParamsTests.assembled_source, complete_workload_params) self.assertEqual( ["value2", "value3"], @@ -1163,7 +1222,8 @@ def test_unused_user_defined_workload_params(self): "number_of_shards": 5 } - complete_workload_params = loader.CompleteWorkloadParams(user_specified_workload_params=workload_params) + complete_workload_params = loader.CompleteWorkloadParams( + user_specified_workload_params=workload_params) complete_workload_params.populate_workload_defined_params(list_of_workload_params=[ "bulk_indexing_clients", "bulk_indexing_iterations", @@ -1426,12 +1486,15 @@ def test_post_processes_workload_spec(self): '"index.number_of_replicas": {{ number_of_replicas | default(0)}} }}' cfg = config.Config() - cfg.add(config.Scope.application, "workload", "test.mode.enabled", True) + cfg.add(config.Scope.application, "workload", + "test.mode.enabled", True) self.assertEqual( - self.as_workload(expected_post_processed, complete_workload_params=complete_workload_params, index_body=index_body), + self.as_workload( + expected_post_processed, complete_workload_params=complete_workload_params, index_body=index_body), loader.TestModeWorkloadProcessor(cfg).on_after_load_workload( - self.as_workload(workload_specification, complete_workload_params=complete_workload_params, index_body=index_body) + self.as_workload( + workload_specification, complete_workload_params=complete_workload_params, index_body=index_body) ) ) @@ -1457,45 +1520,54 @@ def test_sets_absolute_path(self, path_exists): path_exists.return_value = True cfg = config.Config() - cfg.add(config.Scope.application, "benchmarks", "local.dataset.cache", "/data") + cfg.add(config.Scope.application, "benchmarks", + "local.dataset.cache", "/data") default_test_procedure = workload.TestProcedure("default", default=True, schedule=[ - workload.Task(name="index", operation=workload.Operation("index", operation_type=workload.OperationType.Bulk), clients=4) + workload.Task(name="index", operation=workload.Operation( + "index", operation_type=workload.OperationType.Bulk), clients=4) ]) another_test_procedure = workload.TestProcedure("other", default=False) t = workload.Workload(name="u", test_procedures=[another_test_procedure, default_test_procedure], - corpora=[ - workload.DocumentCorpus("unittest", documents=[ - workload.Documents(source_format=workload.Documents.SOURCE_FORMAT_BULK, - document_file="docs/documents.json", - document_archive="docs/documents.json.bz2") - ]) - ], - indices=[workload.Index(name="test", types=["docs"])]) + corpora=[ + workload.DocumentCorpus("unittest", documents=[ + workload.Documents(source_format=workload.Documents.SOURCE_FORMAT_BULK, + document_file="docs/documents.json", + document_archive="docs/documents.json.bz2") + ]) + ], + indices=[workload.Index(name="test", types=["docs"])]) loader.set_absolute_data_path(cfg, t) - self.assertEqual("/data/unittest/docs/documents.json", t.corpora[0].documents[0].document_file) - self.assertEqual("/data/unittest/docs/documents.json.bz2", t.corpora[0].documents[0].document_archive) + self.assertEqual("/data/unittest/docs/documents.json", + t.corpora[0].documents[0].document_file) + self.assertEqual("/data/unittest/docs/documents.json.bz2", + t.corpora[0].documents[0].document_archive) class WorkloadFilterTests(TestCase): def filter(self, workload_specification, include_tasks=None, exclude_tasks=None): cfg = config.Config() - cfg.add(config.Scope.application, "workload", "include.tasks", include_tasks) - cfg.add(config.Scope.application, "workload", "exclude.tasks", exclude_tasks) + cfg.add(config.Scope.application, "workload", + "include.tasks", include_tasks) + cfg.add(config.Scope.application, "workload", + "exclude.tasks", exclude_tasks) processor = loader.TaskFilterWorkloadProcessor(cfg) return processor.on_after_load_workload(workload_specification) def test_rejects_invalid_syntax(self): with self.assertRaises(exceptions.SystemSetupError) as ctx: - self.filter(workload_specification=None, include_tasks=["valid", "a:b:c"]) - self.assertEqual("Invalid format for filtered tasks: [a:b:c]", ctx.exception.args[0]) + self.filter(workload_specification=None, + include_tasks=["valid", "a:b:c"]) + self.assertEqual( + "Invalid format for filtered tasks: [a:b:c]", ctx.exception.args[0]) def test_rejects_unknown_filter_type(self): with self.assertRaises(exceptions.SystemSetupError) as ctx: - self.filter(workload_specification=None, include_tasks=["valid", "op-type:index"]) + self.filter(workload_specification=None, + include_tasks=["valid", "op-type:index"]) self.assertEqual("Invalid format for filtered tasks: [op-type:index]. Expected [type] but got [op-type].", ctx.exception.args[0]) @@ -1603,17 +1675,19 @@ def test_filters_tasks(self): self.assertEqual(7, len(full_workload.test_procedures[0].schedule)) filtered = self.filter(full_workload, include_tasks=["index-3", - "type:search", - # Filtering should also work for non-core operation types. - "type:custom-operation-type", - "tag:include-me"]) + "type:search", + # Filtering should also work for non-core operation types. + "type:custom-operation-type", + "tag:include-me"]) schedule = filtered.test_procedures[0].schedule self.assertEqual(5, len(schedule)) - self.assertEqual(["index-3", "match-all-parallel"], [t.name for t in schedule[0].tasks]) + self.assertEqual(["index-3", "match-all-parallel"], + [t.name for t in schedule[0].tasks]) self.assertEqual("match-all-serial", schedule[1].name) self.assertEqual("cluster-stats", schedule[2].name) - self.assertEqual(["query-filtered", "index-4"], [t.name for t in schedule[3].tasks]) + self.assertEqual(["query-filtered", "index-4"], + [t.name for t in schedule[3].tasks]) self.assertEqual("final-cluster-stats", schedule[4].name) def test_filters_exclude_tasks(self): @@ -1694,11 +1768,13 @@ def test_filters_exclude_tasks(self): full_workload = reader("unittest", workload_specification, "/mappings") self.assertEqual(5, len(full_workload.test_procedures[0].schedule)) - filtered = self.filter(full_workload, exclude_tasks=["index-3", "type:search", "create-index"]) + filtered = self.filter(full_workload, exclude_tasks=[ + "index-3", "type:search", "create-index"]) schedule = filtered.test_procedures[0].schedule self.assertEqual(3, len(schedule)) - self.assertEqual(["index-1", "index-2"], [t.name for t in schedule[0].tasks]) + self.assertEqual(["index-1", "index-2"], + [t.name for t in schedule[0].tasks]) self.assertEqual("node-stats", schedule[1].name) self.assertEqual("cluster-stats", schedule[2].name) @@ -2165,7 +2241,6 @@ def test_on_after_load_workload(self): self.assertIsNotNone(leaf_task.operation.param_source) - # pylint: disable=too-many-public-methods class WorkloadSpecificationReaderTests(TestCase): def test_description_is_optional(self): @@ -2175,7 +2250,8 @@ def test_description_is_optional(self): } reader = loader.WorkloadSpecificationReader() - resulting_workload = reader("unittest", workload_specification, "/mappings") + resulting_workload = reader( + "unittest", workload_specification, "/mappings") self.assertEqual("unittest", resulting_workload.name) self.assertEqual("", resulting_workload.description) @@ -2189,9 +2265,11 @@ def test_can_read_workload_info(self): "test_procedures": [] } reader = loader.WorkloadSpecificationReader() - resulting_workload = reader("unittest", workload_specification, "/mappings") + resulting_workload = reader( + "unittest", workload_specification, "/mappings") self.assertEqual("unittest", resulting_workload.name) - self.assertEqual("description for unit test", resulting_workload.description) + self.assertEqual("description for unit test", + resulting_workload.description) def test_document_count_mandatory_if_file_present(self): workload_specification = { @@ -2209,7 +2287,8 @@ def test_document_count_mandatory_if_file_present(self): reader = loader.WorkloadSpecificationReader() with self.assertRaises(loader.WorkloadSyntaxError) as ctx: reader("unittest", workload_specification, "/mappings") - self.assertEqual("Workload 'unittest' is invalid. Mandatory element 'document-count' is missing.", ctx.exception.args[0]) + self.assertEqual( + "Workload 'unittest' is invalid. Mandatory element 'document-count' is missing.", ctx.exception.args[0]) @mock.patch("osbenchmark.workload.loader.register_all_params_in_workload") def test_parse_with_mixed_warmup_iterations_and_measurement(self, mocked_params_checker): @@ -2302,6 +2381,63 @@ def test_parse_missing_test_procedure_or_test_procedures(self, mocked_params_che "'schedule' but none is specified.", ctx.exception.args[0]) + @mock.patch("osbenchmark.workload.loader.register_all_params_in_workload") + def test_parse_iteration_and_ramp_up_period(self, mocked_params_checker): + workload_specification = { + "description": "description for unit test", + "indices": [ + { + "name": "test-index", + "body": "index.json", + "types": ["docs"] + } + ], + "corpora": [ + { + "name": "test", + "documents": [ + { + "source-file": "documents-main.json.bz2", + "document-count": 10, + "compressed-bytes": 100, + "uncompressed-bytes": 10000 + } + ] + } + ], + "operations": [ + { + "name": "index-append", + "operation-type": "bulk", + "bulk-size": 5000, + } + ], + "test_procedures": [ + { + "name": "default-challenge", + "schedule": [ + { + "clients": 8, + "operation": "index-append", + "ramp-up-time-period": 120, + "warmup-iterations": 3, + "iterations": 5 + } + ] + } + + ] + } + reader = loader.WorkloadSpecificationReader(source=io.DictStringFileSourceFactory({ + "/mappings/index.json": ['{"mappings": {"docs": "empty-for-test"}}'], + })) + + with self.assertRaises(loader.WorkloadSyntaxError) as ctx: + reader("unittest", workload_specification, "/mappings") + + self.assertEqual("Workload 'unittest' is invalid. Operation 'index-append' in test_procedure 'default-challenge' defines a ramp-up time period of " + "120 seconds as well as 3 warmup iterations and 5 iterations but mixing time periods and iterations is not allowed.", ctx.exception.args[0]) + @mock.patch("osbenchmark.workload.loader.register_all_params_in_workload") def test_parse_test_procedure_and_test_procedures_are_defined(self, mocked_params_checker): workload_specification = { @@ -2514,7 +2650,8 @@ def test_load_invalid_index_body(self, mocked_params_checker): })) with self.assertRaises(loader.WorkloadSyntaxError) as ctx: reader("unittest", workload_specification, "/mappings") - self.assertEqual("Could not load file template for 'definition for index index-historical in body.json'", ctx.exception.args[0]) + self.assertEqual( + "Could not load file template for 'definition for index index-historical in body.json'", ctx.exception.args[0]) def test_parse_unique_task_names(self): workload_specification = { @@ -2542,8 +2679,10 @@ def test_parse_unique_task_names(self): ] } } - reader = loader.WorkloadSpecificationReader(selected_test_procedure="default-test_procedure") - resulting_workload = reader("unittest", workload_specification, "/mappings") + reader = loader.WorkloadSpecificationReader( + selected_test_procedure="default-test_procedure") + resulting_workload = reader( + "unittest", workload_specification, "/mappings") self.assertEqual("unittest", resulting_workload.name) test_procedure = resulting_workload.test_procedures[0] self.assertTrue(test_procedure.selected) @@ -2571,7 +2710,7 @@ def test_parse_clients_list(self): "name": "search-one-client", "operation": "search", "clients": 1, - "clients_list": [1,2,3] + "clients_list": [1, 2, 3] }, { "name": "search-two-clients", @@ -2582,8 +2721,10 @@ def test_parse_clients_list(self): } } - reader = loader.WorkloadSpecificationReader(selected_test_procedure="default-test-procedure") - resulting_workload = reader("unittest", workload_specification, "/mappings") + reader = loader.WorkloadSpecificationReader( + selected_test_procedure="default-test-procedure") + resulting_workload = reader( + "unittest", workload_specification, "/mappings") self.assertEqual("unittest", resulting_workload.name) test_procedure = resulting_workload.test_procedures[0] self.assertTrue(test_procedure.selected) @@ -2600,8 +2741,10 @@ def test_parse_clients_list(self): self.assertEqual("search-two-clients", schedule[3].name) self.assertEqual("search", schedule[3].operation.name) # pylint: disable=W0212 + def test_naming_with_clients_list(self): - reader = loader.WorkloadSpecificationReader(selected_test_procedure="default-test_procedure") + reader = loader.WorkloadSpecificationReader( + selected_test_procedure="default-test_procedure") # Test case 1: name contains both "_" and "-" result = reader._rename_task_based_on_num_clients("test_name-task", 5) self.assertEqual(result, "test_name-task_5_clients") @@ -2719,17 +2862,20 @@ def test_parse_indices_valid_workload_specification(self): } """] })) - resulting_workload = reader("unittest", workload_specification, "/mappings") + resulting_workload = reader( + "unittest", workload_specification, "/mappings") # j2 variables defined in the workload -- used for checking mismatching user workload params self.assertEqual( ["number_of_shards"], complete_workload_params.sorted_workload_defined_params ) self.assertEqual("unittest", resulting_workload.name) - self.assertEqual("description for unit test", resulting_workload.description) + self.assertEqual("description for unit test", + resulting_workload.description) # indices self.assertEqual(1, len(resulting_workload.indices)) - self.assertEqual("index-historical", resulting_workload.indices[0].name) + self.assertEqual("index-historical", + resulting_workload.indices[0].name) self.assertDictEqual({ "settings": { "number_of_shards": 3 @@ -2738,7 +2884,7 @@ def test_parse_indices_valid_workload_specification(self): { "main": "empty-for-test", "secondary": "empty-for-test" - } + } }, resulting_workload.indices[0].body) self.assertEqual(2, len(resulting_workload.indices[0].types)) self.assertEqual("main", resulting_workload.indices[0].types[0]) @@ -2746,13 +2892,16 @@ def test_parse_indices_valid_workload_specification(self): # corpora self.assertEqual(1, len(resulting_workload.corpora)) self.assertEqual("test", resulting_workload.corpora[0].name) - self.assertDictEqual({"test-corpus": True}, resulting_workload.corpora[0].meta_data) + self.assertDictEqual({"test-corpus": True}, + resulting_workload.corpora[0].meta_data) self.assertEqual(2, len(resulting_workload.corpora[0].documents)) docs_primary = resulting_workload.corpora[0].documents[0] - self.assertEqual(workload.Documents.SOURCE_FORMAT_BULK, docs_primary.source_format) + self.assertEqual(workload.Documents.SOURCE_FORMAT_BULK, + docs_primary.source_format) self.assertEqual("documents-main.json", docs_primary.document_file) - self.assertEqual("documents-main.json.bz2", docs_primary.document_archive) + self.assertEqual("documents-main.json.bz2", + docs_primary.document_archive) self.assertEqual("https://localhost/data", docs_primary.base_url) self.assertFalse(docs_primary.includes_action_and_meta_data) self.assertEqual(10, docs_primary.number_of_documents) @@ -2766,9 +2915,12 @@ def test_parse_indices_valid_workload_specification(self): }, docs_primary.meta_data) docs_secondary = resulting_workload.corpora[0].documents[1] - self.assertEqual(workload.Documents.SOURCE_FORMAT_BULK, docs_secondary.source_format) - self.assertEqual("documents-secondary.json", docs_secondary.document_file) - self.assertEqual("documents-secondary.json.bz2", docs_secondary.document_archive) + self.assertEqual(workload.Documents.SOURCE_FORMAT_BULK, + docs_secondary.source_format) + self.assertEqual("documents-secondary.json", + docs_secondary.document_file) + self.assertEqual("documents-secondary.json.bz2", + docs_secondary.document_archive) self.assertEqual("https://localhost/data", docs_secondary.base_url) self.assertTrue(docs_secondary.includes_action_and_meta_data) self.assertEqual(20, docs_secondary.number_of_documents) @@ -2784,11 +2936,16 @@ def test_parse_indices_valid_workload_specification(self): # test_procedures self.assertEqual(1, len(resulting_workload.test_procedures)) - self.assertEqual("default-test_procedure", resulting_workload.test_procedures[0].name) - self.assertEqual("Default test_procedure", resulting_workload.test_procedures[0].description) - self.assertEqual({"mixed": True, "max-clients": 8}, resulting_workload.test_procedures[0].meta_data) - self.assertEqual({"append": True}, resulting_workload.test_procedures[0].schedule[0].operation.meta_data) - self.assertEqual({"operation-index": 0}, resulting_workload.test_procedures[0].schedule[0].meta_data) + self.assertEqual("default-test_procedure", + resulting_workload.test_procedures[0].name) + self.assertEqual("Default test_procedure", + resulting_workload.test_procedures[0].description) + self.assertEqual({"mixed": True, "max-clients": 8}, + resulting_workload.test_procedures[0].meta_data) + self.assertEqual( + {"append": True}, resulting_workload.test_procedures[0].schedule[0].operation.meta_data) + self.assertEqual({"operation-index": 0}, + resulting_workload.test_procedures[0].schedule[0].meta_data) def test_parse_data_streams_valid_workload_specification(self): workload_specification = { @@ -2869,35 +3026,44 @@ def test_parse_data_streams_valid_workload_specification(self): complete_workload_params = loader.CompleteWorkloadParams() reader = loader.WorkloadSpecificationReader( complete_workload_params=complete_workload_params) - resulting_workload = reader("unittest", workload_specification, "/mappings") + resulting_workload = reader( + "unittest", workload_specification, "/mappings") # j2 variables defined in the workload -- used for checking mismatching user workload params self.assertEqual("unittest", resulting_workload.name) - self.assertEqual("description for unit test", resulting_workload.description) + self.assertEqual("description for unit test", + resulting_workload.description) # data streams self.assertEqual(1, len(resulting_workload.data_streams)) - self.assertEqual("data-stream-historical", resulting_workload.data_streams[0].name) + self.assertEqual("data-stream-historical", + resulting_workload.data_streams[0].name) # corpora self.assertEqual(1, len(resulting_workload.corpora)) self.assertEqual("test", resulting_workload.corpora[0].name) self.assertEqual(3, len(resulting_workload.corpora[0].documents)) docs_primary = resulting_workload.corpora[0].documents[0] - self.assertEqual(workload.Documents.SOURCE_FORMAT_BULK, docs_primary.source_format) + self.assertEqual(workload.Documents.SOURCE_FORMAT_BULK, + docs_primary.source_format) self.assertEqual("documents-main.json", docs_primary.document_file) - self.assertEqual("documents-main.json.bz2", docs_primary.document_archive) + self.assertEqual("documents-main.json.bz2", + docs_primary.document_archive) self.assertEqual("https://localhost/data", docs_primary.base_url) self.assertFalse(docs_primary.includes_action_and_meta_data) self.assertEqual(10, docs_primary.number_of_documents) self.assertEqual(100, docs_primary.compressed_size_in_bytes) self.assertEqual(10000, docs_primary.uncompressed_size_in_bytes) - self.assertEqual("data-stream-historical", docs_primary.target_data_stream) + self.assertEqual("data-stream-historical", + docs_primary.target_data_stream) self.assertIsNone(docs_primary.target_index) self.assertIsNone(docs_primary.target_type) docs_secondary = resulting_workload.corpora[0].documents[1] - self.assertEqual(workload.Documents.SOURCE_FORMAT_BULK, docs_secondary.source_format) - self.assertEqual("documents-secondary.json", docs_secondary.document_file) - self.assertEqual("documents-secondary.json.bz2", docs_secondary.document_archive) + self.assertEqual(workload.Documents.SOURCE_FORMAT_BULK, + docs_secondary.source_format) + self.assertEqual("documents-secondary.json", + docs_secondary.document_file) + self.assertEqual("documents-secondary.json.bz2", + docs_secondary.document_archive) self.assertEqual("https://localhost/data", docs_secondary.base_url) self.assertTrue(docs_secondary.includes_action_and_meta_data) self.assertEqual(20, docs_secondary.number_of_documents) @@ -2909,24 +3075,32 @@ def test_parse_data_streams_valid_workload_specification(self): self.assertIsNone(docs_secondary.target_type) docs_tertiary = resulting_workload.corpora[0].documents[2] - self.assertEqual(workload.Documents.SOURCE_FORMAT_BULK, docs_tertiary.source_format) + self.assertEqual(workload.Documents.SOURCE_FORMAT_BULK, + docs_tertiary.source_format) self.assertEqual("documents-main.json", docs_tertiary.document_file) - self.assertEqual("documents-main.json.bz2", docs_tertiary.document_archive) + self.assertEqual("documents-main.json.bz2", + docs_tertiary.document_archive) self.assertEqual("https://localhost/data", docs_tertiary.base_url) self.assertFalse(docs_tertiary.includes_action_and_meta_data) self.assertEqual(10, docs_tertiary.number_of_documents) self.assertEqual(100, docs_tertiary.compressed_size_in_bytes) self.assertIsNone(docs_tertiary.target_index) self.assertIsNone(docs_tertiary.target_type) - self.assertEqual("data-stream-historical", docs_tertiary.target_data_stream) + self.assertEqual("data-stream-historical", + docs_tertiary.target_data_stream) # test_procedures self.assertEqual(1, len(resulting_workload.test_procedures)) - self.assertEqual("default-test_procedure", resulting_workload.test_procedures[0].name) - self.assertEqual("Default test_procedure", resulting_workload.test_procedures[0].description) - self.assertEqual({"mixed": True, "max-clients": 8}, resulting_workload.test_procedures[0].meta_data) - self.assertEqual({"append": True}, resulting_workload.test_procedures[0].schedule[0].operation.meta_data) - self.assertEqual({"operation-index": 0}, resulting_workload.test_procedures[0].schedule[0].meta_data) + self.assertEqual("default-test_procedure", + resulting_workload.test_procedures[0].name) + self.assertEqual("Default test_procedure", + resulting_workload.test_procedures[0].description) + self.assertEqual({"mixed": True, "max-clients": 8}, + resulting_workload.test_procedures[0].meta_data) + self.assertEqual( + {"append": True}, resulting_workload.test_procedures[0].schedule[0].operation.meta_data) + self.assertEqual({"operation-index": 0}, + resulting_workload.test_procedures[0].schedule[0].meta_data) @mock.patch("osbenchmark.workload.loader.register_all_params_in_workload") def test_parse_valid_without_types(self, mocked_param_checker): @@ -2975,12 +3149,15 @@ def test_parse_valid_without_types(self, mocked_param_checker): } """] })) - resulting_workload = reader("unittest", workload_specification, "/mappings") + resulting_workload = reader( + "unittest", workload_specification, "/mappings") self.assertEqual("unittest", resulting_workload.name) - self.assertEqual("description for unit test", resulting_workload.description) + self.assertEqual("description for unit test", + resulting_workload.description) # indices self.assertEqual(1, len(resulting_workload.indices)) - self.assertEqual("index-historical", resulting_workload.indices[0].name) + self.assertEqual("index-historical", + resulting_workload.indices[0].name) self.assertDictEqual({ "settings": { "number_of_shards": 3 @@ -2993,9 +3170,11 @@ def test_parse_valid_without_types(self, mocked_param_checker): self.assertEqual(1, len(resulting_workload.corpora[0].documents)) docs_primary = resulting_workload.corpora[0].documents[0] - self.assertEqual(workload.Documents.SOURCE_FORMAT_BULK, docs_primary.source_format) + self.assertEqual(workload.Documents.SOURCE_FORMAT_BULK, + docs_primary.source_format) self.assertEqual("documents-main.json", docs_primary.document_file) - self.assertEqual("documents-main.json.bz2", docs_primary.document_archive) + self.assertEqual("documents-main.json.bz2", + docs_primary.document_archive) self.assertEqual("https://localhost/data", docs_primary.base_url) self.assertFalse(docs_primary.includes_action_and_meta_data) self.assertEqual(10, docs_primary.number_of_documents) @@ -3224,29 +3403,35 @@ def test_parse_valid_without_indices(self, mocked_param_checker): } """] })) - resulting_workload = reader("unittest", workload_specification, "/mappings") + resulting_workload = reader( + "unittest", workload_specification, "/mappings") self.assertEqual("unittest", resulting_workload.name) - self.assertEqual("description for unit test", resulting_workload.description) + self.assertEqual("description for unit test", + resulting_workload.description) # indices self.assertEqual(0, len(resulting_workload.indices)) # data streams self.assertEqual(1, len(resulting_workload.data_streams)) - self.assertEqual("historical-data-stream", resulting_workload.data_streams[0].name) + self.assertEqual("historical-data-stream", + resulting_workload.data_streams[0].name) # corpora self.assertEqual(1, len(resulting_workload.corpora)) self.assertEqual("test", resulting_workload.corpora[0].name) self.assertEqual(1, len(resulting_workload.corpora[0].documents)) docs_primary = resulting_workload.corpora[0].documents[0] - self.assertEqual(workload.Documents.SOURCE_FORMAT_BULK, docs_primary.source_format) + self.assertEqual(workload.Documents.SOURCE_FORMAT_BULK, + docs_primary.source_format) self.assertEqual("documents-main.json", docs_primary.document_file) - self.assertEqual("documents-main.json.bz2", docs_primary.document_archive) + self.assertEqual("documents-main.json.bz2", + docs_primary.document_archive) self.assertEqual("https://localhost/data", docs_primary.base_url) self.assertFalse(docs_primary.includes_action_and_meta_data) self.assertEqual(10, docs_primary.number_of_documents) self.assertEqual(100, docs_primary.compressed_size_in_bytes) self.assertEqual(10000, docs_primary.uncompressed_size_in_bytes) - self.assertEqual("historical-data-stream", docs_primary.target_data_stream) + self.assertEqual("historical-data-stream", + docs_primary.target_data_stream) self.assertIsNone(docs_primary.target_type) self.assertIsNone(docs_primary.target_index) @@ -3279,17 +3464,20 @@ def test_parse_valid_workload_specification_with_index_template(self): } } """], - })) - resulting_workload = reader("unittest", workload_specification, "/mappings") + })) + resulting_workload = reader( + "unittest", workload_specification, "/mappings") self.assertEqual( ["index_pattern", "number_of_shards"], complete_workload_params.sorted_workload_defined_params ) self.assertEqual("unittest", resulting_workload.name) - self.assertEqual("description for unit test", resulting_workload.description) + self.assertEqual("description for unit test", + resulting_workload.description) self.assertEqual(0, len(resulting_workload.indices)) self.assertEqual(1, len(resulting_workload.templates)) - self.assertEqual("my-index-template", resulting_workload.templates[0].name) + self.assertEqual("my-index-template", + resulting_workload.templates[0].name) self.assertEqual("*", resulting_workload.templates[0].pattern) self.assertDictEqual( { @@ -3325,7 +3513,8 @@ def test_parse_valid_workload_specification_with_composable_template(self): } complete_workload_params = loader.CompleteWorkloadParams() reader = loader.WorkloadSpecificationReader( - workload_params={"index_pattern": "logs-*", "number_of_replicas": 1}, + workload_params={"index_pattern": "logs-*", + "number_of_replicas": 1}, complete_workload_params=complete_workload_params, source=io.DictStringFileSourceFactory({ "/mappings/default-template.json": [""" @@ -3365,20 +3554,26 @@ def test_parse_valid_workload_specification_with_composable_template(self): } """] })) - resulting_workload = reader("unittest", workload_specification, "/mappings") + resulting_workload = reader( + "unittest", workload_specification, "/mappings") self.assertEqual( ["index_pattern", "number_of_replicas", "number_of_shards"], complete_workload_params.sorted_workload_defined_params ) self.assertEqual("unittest", resulting_workload.name) - self.assertEqual("description for unit test", resulting_workload.description) + self.assertEqual("description for unit test", + resulting_workload.description) self.assertEqual(0, len(resulting_workload.indices)) self.assertEqual(1, len(resulting_workload.composable_templates)) self.assertEqual(2, len(resulting_workload.component_templates)) - self.assertEqual("my-index-template", resulting_workload.composable_templates[0].name) - self.assertEqual("*", resulting_workload.composable_templates[0].pattern) - self.assertEqual("my-component-template-1", resulting_workload.component_templates[0].name) - self.assertEqual("my-component-template-2", resulting_workload.component_templates[1].name) + self.assertEqual("my-index-template", + resulting_workload.composable_templates[0].name) + self.assertEqual( + "*", resulting_workload.composable_templates[0].pattern) + self.assertEqual("my-component-template-1", + resulting_workload.component_templates[0].name) + self.assertEqual("my-component-template-2", + resulting_workload.component_templates[1].name) self.assertDictEqual( { "index_patterns": ["logs-*"], @@ -3427,7 +3622,8 @@ def test_parse_invalid_workload_specification_with_composable_template(self): } complete_workload_params = loader.CompleteWorkloadParams() reader = loader.WorkloadSpecificationReader( - workload_params={"index_pattern": "logs-*", "number_of_replicas": 1}, + workload_params={"index_pattern": "logs-*", + "number_of_replicas": 1}, complete_workload_params=complete_workload_params) with self.assertRaises(loader.WorkloadSyntaxError) as ctx: reader("unittest", workload_specification, "/mappings") @@ -3470,7 +3666,8 @@ def test_unique_test_procedure_names(self): reader = loader.WorkloadSpecificationReader() with self.assertRaises(loader.WorkloadSyntaxError) as ctx: reader("unittest", workload_specification, "/mappings") - self.assertEqual("Workload 'unittest' is invalid. Duplicate test_procedure with name 'test-test_procedure'.", ctx.exception.args[0]) + self.assertEqual( + "Workload 'unittest' is invalid. Duplicate test_procedure with name 'test-test_procedure'.", ctx.exception.args[0]) def test_not_more_than_one_default_test_procedure_possible(self): workload_specification = { @@ -3581,10 +3778,13 @@ def test_exactly_one_default_test_procedure(self): ] } - reader = loader.WorkloadSpecificationReader(selected_test_procedure="another-test_procedure") - resulting_workload = reader("unittest", workload_specification, "/mappings") + reader = loader.WorkloadSpecificationReader( + selected_test_procedure="another-test_procedure") + resulting_workload = reader( + "unittest", workload_specification, "/mappings") self.assertEqual(2, len(resulting_workload.test_procedures)) - self.assertEqual("test_procedure", resulting_workload.test_procedures[0].name) + self.assertEqual("test_procedure", + resulting_workload.test_procedures[0].name) self.assertTrue(resulting_workload.test_procedures[0].default) self.assertFalse(resulting_workload.test_procedures[1].default) self.assertTrue(resulting_workload.test_procedures[1].selected) @@ -3609,9 +3809,11 @@ def test_selects_sole_test_procedure_implicitly_as_default(self): } } reader = loader.WorkloadSpecificationReader() - resulting_workload = reader("unittest", workload_specification, "/mappings") + resulting_workload = reader( + "unittest", workload_specification, "/mappings") self.assertEqual(1, len(resulting_workload.test_procedures)) - self.assertEqual("test_procedure", resulting_workload.test_procedures[0].name) + self.assertEqual("test_procedure", + resulting_workload.test_procedures[0].name) self.assertTrue(resulting_workload.test_procedures[0].default) self.assertTrue(resulting_workload.test_procedures[0].selected) @@ -3632,7 +3834,8 @@ def test_auto_generates_test_procedure_from_schedule(self): ] } reader = loader.WorkloadSpecificationReader() - resulting_workload = reader("unittest", workload_specification, "/mappings") + resulting_workload = reader( + "unittest", workload_specification, "/mappings") self.assertEqual(1, len(resulting_workload.test_procedures)) self.assertTrue(resulting_workload.test_procedures[0].auto_generated) self.assertTrue(resulting_workload.test_procedures[0].default) @@ -3660,12 +3863,15 @@ def test_inline_operations(self): } } reader = loader.WorkloadSpecificationReader() - resulting_workload = reader("unittest", workload_specification, "/mappings") + resulting_workload = reader( + "unittest", workload_specification, "/mappings") test_procedure = resulting_workload.test_procedures[0] self.assertEqual(2, len(test_procedure.schedule)) - self.assertEqual(workload.OperationType.Bulk.to_hyphenated_string(), test_procedure.schedule[0].operation.type) - self.assertEqual(workload.OperationType.ForceMerge.to_hyphenated_string(), test_procedure.schedule[1].operation.type) + self.assertEqual(workload.OperationType.Bulk.to_hyphenated_string( + ), test_procedure.schedule[0].operation.type) + self.assertEqual(workload.OperationType.ForceMerge.to_hyphenated_string( + ), test_procedure.schedule[1].operation.type) def test_supports_target_throughput(self): workload_specification = { @@ -3683,13 +3889,19 @@ def test_supports_target_throughput(self): { "operation": "index-append", "target-throughput": 10, + "warmup-time-period": 120, + "ramp-up-time-period": 60 } ] } } reader = loader.WorkloadSpecificationReader() - resulting_workload = reader("unittest", workload_specification, "/mappings") - self.assertEqual(10, resulting_workload.test_procedures[0].schedule[0].params["target-throughput"]) + resulting_workload = reader( + "unittest", workload_specification, "/mappings") + indexing_task = resulting_workload.test_procedures[0].schedule[0] + self.assertEqual(10, indexing_task.params["target-throughput"]) + self.assertEqual(120, indexing_task.warmup_time_period) + self.assertEqual(60, indexing_task.ramp_up_time_period) def test_supports_target_interval(self): workload_specification = { @@ -3714,8 +3926,10 @@ def test_supports_target_interval(self): ] } reader = loader.WorkloadSpecificationReader() - resulting_workload = reader("unittest", workload_specification, "/mappings") - self.assertEqual(5, resulting_workload.test_procedures[0].schedule[0].params["target-interval"]) + resulting_workload = reader( + "unittest", workload_specification, "/mappings") + self.assertEqual( + 5, resulting_workload.test_procedures[0].schedule[0].params["target-interval"]) def test_parallel_tasks_with_default_values(self): workload_specification = { @@ -3767,7 +3981,8 @@ def test_parallel_tasks_with_default_values(self): ] } reader = loader.WorkloadSpecificationReader() - resulting_workload = reader("unittest", workload_specification, "/mappings") + resulting_workload = reader( + "unittest", workload_specification, "/mappings") parallel_element = resulting_workload.test_procedures[0].schedule[0] parallel_tasks = parallel_element.tasks @@ -3836,7 +4051,8 @@ def test_parallel_tasks_with_default_clients_does_not_propagate(self): ] } reader = loader.WorkloadSpecificationReader() - resulting_workload = reader("unittest", workload_specification, "/mappings") + resulting_workload = reader( + "unittest", workload_specification, "/mappings") parallel_element = resulting_workload.test_procedures[0].schedule[0] parallel_tasks = parallel_element.tasks @@ -3884,7 +4100,8 @@ def test_parallel_tasks_with_completed_by_set(self): ] } reader = loader.WorkloadSpecificationReader() - resulting_workload = reader("unittest", workload_specification, "/mappings") + resulting_workload = reader( + "unittest", workload_specification, "/mappings") parallel_element = resulting_workload.test_procedures[0].schedule[0] parallel_tasks = parallel_element.tasks @@ -3938,7 +4155,8 @@ def test_parallel_tasks_with_named_task_completed_by_set(self): ] } reader = loader.WorkloadSpecificationReader() - resulting_workload = reader("unittest", workload_specification, "/mappings") + resulting_workload = reader( + "unittest", workload_specification, "/mappings") parallel_element = resulting_workload.test_procedures[0].schedule[0] parallel_tasks = parallel_element.tasks @@ -4074,10 +4292,13 @@ def test_propagate_parameters_to_test_procedure_level(self): ] } - reader = loader.WorkloadSpecificationReader(selected_test_procedure="another-test_procedure") - resulting_workload = reader("unittest", workload_specification, "/mappings") + reader = loader.WorkloadSpecificationReader( + selected_test_procedure="another-test_procedure") + resulting_workload = reader( + "unittest", workload_specification, "/mappings") self.assertEqual(2, len(resulting_workload.test_procedures)) - self.assertEqual("test_procedure", resulting_workload.test_procedures[0].name) + self.assertEqual("test_procedure", + resulting_workload.test_procedures[0].name) self.assertTrue(resulting_workload.test_procedures[0].default) self.assertDictEqual({ "level": "test_procedure", @@ -4116,7 +4337,7 @@ def test_override_default_preparator(self): cfg.add(config.Scope.application, "system", "offline.mode", False) tpr = loader.WorkloadProcessorRegistry(cfg) # call this once beforehand to make sure we don't "harden" the default in case calls are made out of order - tpr.processors # pylint: disable=pointless-statement + tpr.processors # pylint: disable=pointless-statement tpr.register_workload_processor(MyMockWorkloadProcessor()) expected_processors = [ loader.TaskFilterWorkloadProcessor, @@ -4133,7 +4354,7 @@ def test_allow_to_specify_default_preparator(self): tpr = loader.WorkloadProcessorRegistry(cfg) tpr.register_workload_processor(MyMockWorkloadProcessor()) # should be idempotent now that we have a custom config - tpr.processors # pylint: disable=pointless-statement + tpr.processors # pylint: disable=pointless-statement tpr.register_workload_processor(loader.DefaultWorkloadPreparator(cfg)) expected_processors = [ loader.TaskFilterWorkloadProcessor, @@ -4143,4 +4364,4 @@ def test_allow_to_specify_default_preparator(self): loader.DefaultWorkloadPreparator ] actual_processors = [proc.__class__ for proc in tpr.processors] - self.assertCountEqual(expected_processors, actual_processors) + self.assertCountEqual(expected_processors, actual_processors) \ No newline at end of file