Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changes to support ramp-up feature #725

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions osbenchmark/resources/workload-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@
"type": "integer",
"minimum": 1
},
"ramp-up-time-period": {
"type": "integer",
"minimum": 0,
"description": "Defines the time period in seconds to gradually increase the number of clients."
},
"warmup-time-period": {
"type": "integer",
"minimum": 0,
Expand Down Expand Up @@ -75,6 +80,11 @@
"minimum": 1,
"description": "Defines the number of times to run the operation."
},
"ramp-up-time-period": {
"type": "integer",
"minimum": 0,
"description": "Defines the time period in seconds to gradually increase the number of clients."
},
"warmup-time-period": {
"type": "integer",
"minimum": 0,
Expand Down Expand Up @@ -146,6 +156,11 @@
"minimum": 1,
"description": "Defines the number of times to run the operation."
},
"ramp-up-time-period": {
"type": "integer",
"minimum": 0,
"description": "Defines the time period in seconds to gradually increase the number of clients."
},
"warmup-time-period": {
"type": "integer",
"minimum": 0,
Expand Down
69 changes: 49 additions & 20 deletions osbenchmark/worker_coordinator/worker_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1522,7 +1522,7 @@ def os_clients(all_hosts, all_client_options):
#
# Now we need to ensure that we start partitioning parameters correctly in both cases. And that means we
# need to start from (client) index 0 in both cases instead of 0 for indexA and 4 for indexB.
schedule = schedule_for(task, task_allocation.client_index_in_task, params_per_task[task])
schedule = schedule_for(task_allocation, params_per_task[task])
async_executor = AsyncExecutor(
client_id, task, schedule, opensearch, self.sampler, self.cancel, self.complete,
task.error_behavior(self.abort_on_error), self.cfg)
Expand Down Expand Up @@ -1607,6 +1607,15 @@ async def __call__(self, *args, **kwargs):
# lazily initialize the schedule
self.logger.debug("Initializing schedule for client id [%s].", self.client_id)
schedule = self.schedule_handle()
self.schedule_handle.start()
rampup_wait_time = self.schedule_handle.ramp_up_wait_time
if rampup_wait_time:
self.logger.info("client id [%s] waiting [%.2f]s for ramp-up.", self.client_id, rampup_wait_time)
await asyncio.sleep(rampup_wait_time)

if rampup_wait_time:
console.println(f" Client id {self.client_id} is running now.")

self.logger.debug("Entering main loop for client id [%s].", self.client_id)
# noinspection PyBroadException
try:
Expand Down Expand Up @@ -1806,18 +1815,21 @@ def __repr__(self, *args, **kwargs):


class TaskAllocation:
def __init__(self, task, client_index_in_task):
def __init__(self, task, client_index_in_task, global_client_index, total_clients):
self.task = task
self.client_index_in_task = client_index_in_task
self.global_client_index = global_client_index
self.total_clients = total_clients

def __hash__(self):
return hash(self.task) ^ hash(self.client_index_in_task)
return hash(self.task) ^ hash(self.global_client_index)

def __eq__(self, other):
return isinstance(other, type(self)) and self.task == other.task and self.client_index_in_task == other.client_index_in_task
return isinstance(other, type(self)) and self.task == other.task and self.global_client_index == other.global_client_index

def __repr__(self, *args, **kwargs):
return "TaskAllocation [%d/%d] for %s" % (self.client_index_in_task, self.task.clients, self.task)
return f"TaskAllocation [{self.client_index_in_task}/{self.task.clients}] for {self.task} " \
f"and [{self.global_client_index}/{self.total_clients}] in total"


class Allocator:
Expand Down Expand Up @@ -1863,7 +1875,11 @@ def allocations(self):
physical_client_index = client_index % max_clients
if sub_task.completes_parent:
clients_executing_completing_task.append(physical_client_index)
allocations[physical_client_index].append(TaskAllocation(sub_task, client_index - start_client_index))
ta = TaskAllocation(task = sub_task,
client_index_in_task = client_index - start_client_index,
global_client_index=client_index,
total_clients=task.clients)
allocations[physical_client_index].append(ta)
start_client_index += sub_task.clients

# uneven distribution between tasks and clients, e.g. there are 5 (parallel) tasks but only 2 clients. Then, one of them
Expand Down Expand Up @@ -1941,7 +1957,7 @@ def clients(self):

# Runs a concrete schedule on one worker client
# Needs to determine the runners and concrete iterations per client.
def schedule_for(task, client_index, parameter_source):
def schedule_for(task_allocation, parameter_source):
"""
Calculates a client's schedule for a given task.

Expand All @@ -1951,15 +1967,17 @@ def schedule_for(task, client_index, parameter_source):
:return: A generator for the operations the given client needs to perform for this task.
"""
logger = logging.getLogger(__name__)
task = task_allocation.task
op = task.operation
num_clients = task.clients
sched = scheduler.scheduler_for(task)

client_index = task_allocation.client_index_in_task
# guard all logging statements with the client index and only emit them for the first client. This information is
# repetitive and may cause issues in thespian with many clients (an excessive number of actor messages is sent).
if client_index == 0:
logger.info("Choosing [%s] for [%s].", sched, task)
runner_for_op = runner.runner_for(op.type)
params_for_op = parameter_source.partition(client_index, num_clients)
params_for_op = parameter_source.partition(client_index, task.clients)
if hasattr(sched, "parameter_source"):
if client_index == 0:
logger.debug("Setting parameter source [%s] for scheduler [%s]", params_for_op, sched)
Expand Down Expand Up @@ -1992,7 +2010,7 @@ def schedule_for(task, client_index, parameter_source):
else:
logger.info("%s schedule will determine when the schedule for [%s] terminates.", str(loop_control), task.name)

return ScheduleHandle(task.name, sched, loop_control, runner_for_op, params_for_op)
return ScheduleHandle(task_allocation, sched, loop_control, runner_for_op, params_for_op)


def requires_time_period_schedule(task, task_runner, params):
Expand All @@ -2009,7 +2027,7 @@ def requires_time_period_schedule(task, task_runner, params):


class ScheduleHandle:
def __init__(self, task_name, sched, task_progress_control, runner, params):
def __init__(self, task_allocation, sched, task_progress_control, runner, params):
"""
Creates a generator that will yield individual task invocations for the provided schedule.

Expand All @@ -2020,16 +2038,29 @@ def __init__(self, task_name, sched, task_progress_control, runner, params):
:param params: The parameter source for a given operation.
:return: A generator for the corresponding parameters.
"""
self.task_name = task_name
self.task_allocation = task_allocation
self.sched = sched
self.task_progress_control = task_progress_control
self.runner = runner
self.params = params
# TODO: Can we offload the parameter source execution to a different thread / process? Is this too heavy-weight?
#from concurrent.futures import ThreadPoolExecutor
#import asyncio
#self.io_pool_exc = ThreadPoolExecutor(max_workers=1)
#self.loop = asyncio.get_event_loop()
# from concurrent.futures import ThreadPoolExecutor
# import asyncio
# self.io_pool_exc = ThreadPoolExecutor(max_workers=1)
# self.loop = asyncio.get_event_loop()
@property
def ramp_up_wait_time(self):
"""
:return: the number of seconds to wait until this client should start so load can gradually ramp-up.
"""
ramp_up_time_period = self.task_allocation.task.ramp_up_time_period
if ramp_up_time_period:
return ramp_up_time_period * (self.task_allocation.global_client_index / self.task_allocation.total_clients)
else:
return 0

def start(self):
self.task_progress_control.start()

def before_request(self, now):
self.sched.before_request(now)
Expand All @@ -2041,20 +2072,18 @@ async def __call__(self):
next_scheduled = 0
if self.task_progress_control.infinite:
param_source_knows_progress = hasattr(self.params, "percent_completed")
self.task_progress_control.start()
while True:
try:
next_scheduled = self.sched.next(next_scheduled)
# does not contribute at all to completion. Hence, we cannot define completion.
percent_completed = self.params.percent_completed if param_source_knows_progress else None
#current_params = await self.loop.run_in_executor(self.io_pool_exc, self.params.params)
# current_params = await self.loop.run_in_executor(self.io_pool_exc, self.params.params)
yield (next_scheduled, self.task_progress_control.sample_type, percent_completed, self.runner,
self.params.params())
self.task_progress_control.next()
except StopIteration:
return
else:
self.task_progress_control.start()
while not self.task_progress_control.completed:
try:
next_scheduled = self.sched.next(next_scheduled)
Expand Down Expand Up @@ -2146,4 +2175,4 @@ def next(self):
self._it += 1

def __str__(self):
return "iteration-count-based"
return "iteration-count-based"
35 changes: 31 additions & 4 deletions osbenchmark/workload/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -1765,14 +1765,24 @@ def parse_parallel(self, ops_spec, ops, test_procedure_name):
default_iterations = self._r(ops_spec, "iterations", error_ctx="parallel", mandatory=False)
default_warmup_time_period = self._r(ops_spec, "warmup-time-period", error_ctx="parallel", mandatory=False)
default_time_period = self._r(ops_spec, "time-period", error_ctx="parallel", mandatory=False)
default_ramp_up_time_period = self._r(ops_spec, "ramp-up-time-period", error_ctx="parallel", mandatory=False)
clients = self._r(ops_spec, "clients", error_ctx="parallel", mandatory=False)
completed_by = self._r(ops_spec, "completed-by", error_ctx="parallel", mandatory=False)

# now descent to each operation
tasks = []
for task in self._r(ops_spec, "tasks", error_ctx="parallel"):
tasks.append(self.parse_task(task, ops, test_procedure_name, default_warmup_iterations, default_iterations,
default_warmup_time_period, default_time_period, completed_by))
default_warmup_time_period, default_time_period, default_ramp_up_time_period, completed_by))

for task in tasks:
if task.ramp_up_time_period != default_ramp_up_time_period:
if default_ramp_up_time_period is None:
self._error(f"task '{task.name}' in 'parallel' element of test-procedure '{test_procedure_name}' specifies "
f"a ramp-up-time-period but it is only allowed on the 'parallel' element.")
else:
self._error(f"task '{task.name}' specifies a different ramp-up-time-period than its enclosing "
f"'parallel' element in test-procedure '{test_procedure_name}'.")
if completed_by:
completion_task = None
for task in tasks:
Expand All @@ -1788,7 +1798,8 @@ def parse_parallel(self, ops_spec, ops, test_procedure_name):
return workload.Parallel(tasks, clients)

def parse_task(self, task_spec, ops, test_procedure_name, default_warmup_iterations=None, default_iterations=None,
default_warmup_time_period=None, default_time_period=None, completed_by_name=None):
default_warmup_time_period=None, default_time_period=None, default_ramp_up_time_period=None,
completed_by_name=None):

op_spec = task_spec["operation"]
if isinstance(op_spec, str) and op_spec in ops:
Expand All @@ -1811,6 +1822,8 @@ def parse_task(self, task_spec, ops, test_procedure_name, default_warmup_iterati
default_value=default_warmup_time_period),
time_period=self._r(task_spec, "time-period", error_ctx=op.name, mandatory=False,
default_value=default_time_period),
ramp_up_time_period=self._r(task_spec, "ramp-up-time-period", error_ctx=op.name,
mandatory=False, default_value=default_ramp_up_time_period),
clients=self._r(task_spec, "clients", error_ctx=op.name, mandatory=False, default_value=1),
completes_parent=(task_name == completed_by_name),
schedule=schedule,
Expand All @@ -1819,11 +1832,25 @@ def parse_task(self, task_spec, ops, test_procedure_name, default_warmup_iterati
if task.warmup_iterations is not None and task.time_period is not None:
self._error(
"Operation '%s' in test_procedure '%s' defines '%d' warmup iterations and a time period of '%d' seconds. Please do not "
"mix time periods and iterations." % (op.name, test_procedure_name, task.warmup_iterations, task.time_period))
"mix time periods and iterations." % (op.name, test_procedure_name, task.warmup_iterations, task.time_period))
elif task.warmup_time_period is not None and task.iterations is not None:
self._error(
"Operation '%s' in test_procedure '%s' defines a warmup time period of '%d' seconds and '%d' iterations. Please do not "
"mix time periods and iterations." % (op.name, test_procedure_name, task.warmup_time_period, task.iterations))
"mix time periods and iterations." % (op.name, test_procedure_name, task.warmup_time_period, task.iterations))

if (task.warmup_iterations is not None or task.iterations is not None) and task.ramp_up_time_period is not None:
self._error(f"Operation '{op.name}' in test_procedure '{test_procedure_name}' defines a ramp-up time period of "
f"{task.ramp_up_time_period} seconds as well as {task.warmup_iterations} warmup iterations and "
f"{task.iterations} iterations but mixing time periods and iterations is not allowed.")

if task.ramp_up_time_period is not None:
if task.warmup_time_period is None:
self._error(f"Operation '{op.name}' in test_procedure '{test_procedure_name}' defines a ramp-up time period of "
f"{task.ramp_up_time_period} seconds but no warmup-time-period.")
elif task.warmup_time_period < task.ramp_up_time_period:
self._error(f"The warmup-time-period of operation '{op.name}' in test_procedure '{test_procedure_name}' is "
f"{task.warmup_time_period} seconds but must be greater than or equal to the "
f"ramp-up-time-period of {task.ramp_up_time_period} seconds.")

return task

Expand Down
16 changes: 10 additions & 6 deletions osbenchmark/workload/workload.py
Original file line number Diff line number Diff line change
Expand Up @@ -894,7 +894,8 @@ class Task:
IGNORE_RESPONSE_ERROR_LEVEL_WHITELIST = ["non-fatal"]

def __init__(self, name, operation, tags=None, meta_data=None, warmup_iterations=None, iterations=None,
warmup_time_period=None, time_period=None, clients=1, completes_parent=False, schedule=None, params=None):
warmup_time_period=None, time_period=None, ramp_up_time_period=None, clients=1, completes_parent=False,
schedule=None, params=None):
self.name = name
self.operation = operation
if isinstance(tags, str):
Expand All @@ -908,6 +909,7 @@ def __init__(self, name, operation, tags=None, meta_data=None, warmup_iterations
self.iterations = iterations
self.warmup_time_period = warmup_time_period
self.time_period = time_period
self.ramp_up_time_period = ramp_up_time_period
self.clients = clients
self.completes_parent = completes_parent
self.schedule = schedule
Expand Down Expand Up @@ -988,16 +990,18 @@ def error_behavior(self, default_error_behavior):
def __hash__(self):
# Note that we do not include `params` in __hash__ and __eq__ (the other attributes suffice to uniquely define a task)
return hash(self.name) ^ hash(self.operation) ^ hash(self.warmup_iterations) ^ hash(self.iterations) ^ \
hash(self.warmup_time_period) ^ hash(self.time_period) ^ hash(self.clients) ^ hash(self.schedule) ^ \
hash(self.completes_parent)
hash(self.warmup_time_period) ^ hash(self.time_period) ^ hash(self.ramp_up_time_period) ^ \
hash(self.clients) ^ hash(self.schedule) ^ hash(self.completes_parent)

def __eq__(self, other):
# Note that we do not include `params` in __hash__ and __eq__ (the other attributes suffice to uniquely define a task)
return isinstance(other, type(self)) and (self.name, self.operation, self.warmup_iterations, self.iterations,
self.warmup_time_period, self.time_period, self.clients, self.schedule,
self.completes_parent) == (other.name, other.operation, other.warmup_iterations,
self.warmup_time_period, self.time_period, self.ramp_up_time_period,
self.clients, self.schedule,self.completes_parent) == (other.name,
other.operation, other.warmup_iterations,
other.iterations, other.warmup_time_period, other.time_period,
other.clients, other.schedule, other.completes_parent)
self.ramp_up_time_period, other.clients, other.schedule,
other.completes_parent)

def __iter__(self):
return iter([self])
Expand Down
Loading
Loading