Skip to content

Commit

Permalink
[Fix][MetaSchedule] RPCRunner timeout when queueing up
Browse files Browse the repository at this point in the history
This PR fixes the timeout rule of MetaSchedule RPCRunner.

Prior to this PR, the RPCRunner set a timeout threshold for jobs
submitted to the popen pool. As a result, jobs were timed from the
moment they were sent to the remote side.

Consider the case where there is only a single device for measurement.
In this case, all jobs can only be executed serially, so jobs must
queue up. Under the previous timeout configuration, the time spent
queueing was therefore also counted. In the worst case, this caused
some jobs to time out before they had even started executing, which
had a negative impact on RPC MetaSchedule tuning in terms of both
efficiency and result performance.

To address the problem, this PR
* removes the RPCRunner timeout requirement for PopenPoolExecutor. From
now on, the RPCRunner timeout will only be raised from the RPC server
side.
* removes the timeout information in RPCRunnerFuture, as the timeout is
no longer issued by the popen side, as mentioned in the point above.
* changes the default maximum number of RPCRunner workers to 1, which
avoids the case where too many jobs are waiting in line.

Co-authored-by: Bohan Hou <[email protected]>
  • Loading branch information
MasterJH5574 and spectrometerHBH committed Feb 12, 2023
1 parent 49b6c3a commit a8d490d
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 32 deletions.
2 changes: 1 addition & 1 deletion 3rdparty/cutlass
Submodule cutlass updated 1626 files
20 changes: 3 additions & 17 deletions python/tvm/meta_schedule/runner/rpc_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
from ..logging import get_logger
from ..profiler import Profiler
from ..utils import (
cpu_count,
derived_object,
get_global_func_on_rpc_session,
get_global_func_with_default_on_worker,
Expand Down Expand Up @@ -92,38 +91,27 @@ class RPCRunnerFuture(PyRunnerFuture):
----------
future: concurrent.futures.Future
The concurrent function to check when the function is done and to return the result.
timeout_sec: float
The timeout in seconds.
"""

future: concurrent.futures.Future
timeout_sec: float

def __init__(self, future: concurrent.futures.Future, timeout_sec: float) -> None:
def __init__(self, future: concurrent.futures.Future) -> None:
"""Constructor
Parameters
----------
future: concurrent.futures.Future
The concurrent function to check when the function is done and to return the result.
timeout_sec: float
The timeout in seconds.
"""
super().__init__()
self.future = future
self.timeout_sec = timeout_sec

def done(self) -> bool:
return self.future.done()

def result(self) -> RunnerResult:
try:
run_secs: List[float] = self.future.result()
except TimeoutError:
return RunnerResult(
None,
error_msg=f"RPCRunner: Timeout, killed after {self.timeout_sec} seconds",
)
except Exception as exception: # pylint: disable=broad-except
return RunnerResult(
None,
Expand Down Expand Up @@ -270,7 +258,7 @@ def __init__(
f_cleanup: Union[T_CLEANUP, str, None]
The function name to cleanup the session or the function itself.
max_workers: Optional[int] = None
The maximum number of connections. Defaults to number of logical CPU cores.
The maximum number of connections. Defaults to 1.
initializer: Optional[Callable[[], None]]
The initializer function.
"""
Expand All @@ -285,11 +273,10 @@ def __init__(
self.f_run_evaluator = f_run_evaluator
self.f_cleanup = f_cleanup
if max_workers is None:
max_workers = cpu_count(logical=True)
max_workers = 1
logger.info("RPCRunner: max_workers = %d", max_workers)
self.pool = PopenPoolExecutor(
max_workers=max_workers,
timeout=rpc_config.session_timeout_sec,
initializer=initializer,
)
self._sanity_check()
Expand All @@ -312,7 +299,6 @@ def run(self, runner_inputs: List[RunnerInput]) -> List[RunnerFuture]:
str(runner_input.device_type),
tuple(arg_info.as_json() for arg_info in runner_input.args_info),
),
timeout_sec=self.rpc_config.session_timeout_sec,
)
results.append(future) # type: ignore
return results
Expand Down
41 changes: 27 additions & 14 deletions tests/python/unittest/test_meta_schedule_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,23 @@ def main(a: T.handle, b: T.handle, c: T.handle) -> None: # pylint: disable=no-s
C[vi] = A[vi] + B[vi]


# A huge matmul that must cause timeout in the timeout test below.
# It multiplies two 4096 x 4096 float32 buffers into a third one, so the
# single T.grid loop nest below covers 4096 * 4096 * 4096 iterations.
@tvm.script.ir_module
class MatmulHugeModule:
@T.prim_func
def main(a: T.handle, b: T.handle, c: T.handle) -> None: # pylint: disable=no-self-argument
T.func_attr({"global_symbol": "main", "tir.noalias": True})
# Bind the three handles to 4096 x 4096 float32 buffers: A and B are read, C is written.
A = T.match_buffer(a, (4096, 4096), "float32")
B = T.match_buffer(b, (4096, 4096), "float32")
C = T.match_buffer(c, (4096, 4096), "float32")
for i, j, k in T.grid(4096, 4096, 4096):
with T.block("matmul"):
# vi, vj, vk are the block axes remapped from the loop variables i, j, k.
vi, vj, vk = T.axis.remap("SSR", [i, j, k])
with T.init():
# C[vi, vj] is set to zero in the init block before being accumulated into.
C[vi, vj] = 0.0
C[vi, vj] = C[vi, vj] + A[vi, vk] * B[vk, vj]


# pylint: enable=invalid-name,no-member,line-too-long,too-many-nested-blocks,missing-docstring


Expand Down Expand Up @@ -372,22 +389,20 @@ def run(self, runner_inputs: List[RunnerInput]) -> List[RunnerFuture]:


def test_meta_schedule_rpc_runner_time_out():
"""Test meta schedule RPC Runner time out"""
"""Test meta schedule RPC Runner time out by using a super large workload"""

def initializer():
@register_func("meta_schedule.runner.test_time_out")
def timeout_session_creator( # pylint: disable=unused-variable
rpc_config: RPCConfig, # pylint: disable=unused-argument
) -> RPCSession:
time.sleep(2)
builder = LocalBuilder()
builder_inputs = [BuilderInput(MatmulHugeModule, Target("llvm"))]
builder_results = builder.build(builder_inputs)
builder_results[0].artifact_path

runner_input = RunnerInput(
"test",
builder_results[0].artifact_path,
"llvm",
[
TensorInfo("float32", (MATMUL_N, MATMUL_N)),
TensorInfo("float32", (MATMUL_N, MATMUL_N)),
TensorInfo("float32", (MATMUL_N, MATMUL_N)),
TensorInfo("float32", (4096, 4096)),
TensorInfo("float32", (4096, 4096)),
TensorInfo("float32", (4096, 4096)),
],
)

Expand All @@ -408,15 +423,13 @@ def timeout_session_creator( # pylint: disable=unused-variable
runner = RPCRunner(
rpc_config,
evaluator_config,
initializer=initializer,
f_create_session="meta_schedule.runner.test_time_out",
)
# Run the module
(runner_future,) = runner.run([runner_input])
runner_result = runner_future.result()

assert runner_result.error_msg is not None and runner_result.error_msg.startswith(
"RPCRunner: Timeout, killed after"
"RPCRunner: An exception occurred"
)
assert runner_result.run_secs is None

Expand Down

0 comments on commit a8d490d

Please sign in to comment.