Skip to content

Commit

Permalink
[core] [runtime_env] Use per-env async lock in agent (#17542)
Browse files Browse the repository at this point in the history
Co-authored-by: Ed Oakes <[email protected]>
  • Loading branch information
architkulkarni and edoakes authored Aug 6, 2021
1 parent 2b520ba commit ac9a1a2
Showing 1 changed file with 62 additions and 51 deletions.
113 changes: 62 additions & 51 deletions dashboard/modules/runtime_env/runtime_env_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ def __init__(self, dashboard_agent):
# Cache the results of creating envs to avoid repeatedly calling into
# conda and other slow calls.
self._env_cache: Dict[str, CreatedEnvResult] = dict()
# Maps a serialized runtime env to a lock that is used
# to prevent multiple concurrent installs of the same env.
self._env_locks: Dict[str, asyncio.Lock] = dict()

def get_or_create_logger(self, job_id: bytes):
job_id = job_id.decode()
Expand Down Expand Up @@ -75,63 +78,71 @@ def run_setup_with_logger():
return await loop.run_in_executor(None, run_setup_with_logger)

serialized_env = request.serialized_runtime_env
if serialized_env in self._env_cache:
serialized_context = self._env_cache[serialized_env]
result = self._env_cache[serialized_env]
if result.success:
context = result.result
logger.info("Runtime env already created successfully. "
f"Env: {serialized_env}, context: {context}")
return runtime_env_agent_pb2.CreateRuntimeEnvReply(
status=agent_manager_pb2.AGENT_RPC_STATUS_OK,
serialized_runtime_env_context=context)
else:
error_message = result.result
logger.info("Runtime env already failed. "
f"Env: {serialized_env}, err: {error_message}")

if serialized_env not in self._env_locks:
# async lock to prevent the same env being concurrently installed
self._env_locks[serialized_env] = asyncio.Lock()

async with self._env_locks[serialized_env]:
if serialized_env in self._env_cache:
serialized_context = self._env_cache[serialized_env]
result = self._env_cache[serialized_env]
if result.success:
context = result.result
logger.info("Runtime env already created successfully. "
f"Env: {serialized_env}, context: {context}")
return runtime_env_agent_pb2.CreateRuntimeEnvReply(
status=agent_manager_pb2.AGENT_RPC_STATUS_OK,
serialized_runtime_env_context=context)
else:
error_message = result.result
logger.info("Runtime env already failed. "
f"Env: {serialized_env}, err: {error_message}")
return runtime_env_agent_pb2.CreateRuntimeEnvReply(
status=agent_manager_pb2.AGENT_RPC_STATUS_FAILED,
error_message=error_message)

logger.info(f"Creating runtime env: {serialized_env}")
runtime_env_dict = json.loads(serialized_env or "{}")
uris = runtime_env_dict.get("uris")
runtime_env_context: RuntimeEnvContext = None
error_message = None
for _ in range(runtime_env_consts.RUNTIME_ENV_RETRY_TIMES):
try:
if uris:
# TODO(guyang.sgy): Try ensure_runtime_env_setup(uris)
# to download packages.
# But we don't initailize internal kv in agent now.
pass
runtime_env_context = await _setup_runtime_env(
serialized_env, self._session_dir)
break
except Exception as ex:
logger.exception("Runtime env creation failed.")
error_message = str(ex)
await asyncio.sleep(
runtime_env_consts.RUNTIME_ENV_RETRY_INTERVAL_MS / 1000
)
if error_message:
logger.error(
"Runtime env creation failed for %d times, "
"don't retry any more.",
runtime_env_consts.RUNTIME_ENV_RETRY_TIMES)
self._env_cache[serialized_env] = CreatedEnvResult(
False, error_message)
return runtime_env_agent_pb2.CreateRuntimeEnvReply(
status=agent_manager_pb2.AGENT_RPC_STATUS_FAILED,
error_message=error_message)

logger.info(f"Creating runtime env: {serialized_env}")
runtime_env_dict = json.loads(serialized_env or "{}")
uris = runtime_env_dict.get("uris")
runtime_env_context: RuntimeEnvContext = None
error_message = None
for _ in range(runtime_env_consts.RUNTIME_ENV_RETRY_TIMES):
try:
if uris:
# TODO(guyang.sgy): Try `ensure_runtime_env_setup(uris)`
# to download packages.
# But we don't initailize internal kv in agent now.
pass
runtime_env_context = await _setup_runtime_env(
serialized_env, self._session_dir)
break
except Exception as ex:
logger.exception("Runtime env creation failed.")
error_message = str(ex)
await asyncio.sleep(
runtime_env_consts.RUNTIME_ENV_RETRY_INTERVAL_MS / 1000)
if error_message:
logger.error(
"Runtime env creation failed for %d times, "
"don't retry any more.",
runtime_env_consts.RUNTIME_ENV_RETRY_TIMES)
serialized_context = runtime_env_context.serialize()
self._env_cache[serialized_env] = CreatedEnvResult(
False, error_message)
True, serialized_context)
logger.info(
"Successfully created runtime env: %s, the context: %s",
serialized_env, serialized_context)
return runtime_env_agent_pb2.CreateRuntimeEnvReply(
status=agent_manager_pb2.AGENT_RPC_STATUS_FAILED,
error_message=error_message)

serialized_context = runtime_env_context.serialize()
self._env_cache[serialized_env] = CreatedEnvResult(
True, serialized_context)
logger.info("Successfully created runtime env: %s, the context: %s",
serialized_env, serialized_context)
return runtime_env_agent_pb2.CreateRuntimeEnvReply(
status=agent_manager_pb2.AGENT_RPC_STATUS_OK,
serialized_runtime_env_context=serialized_context)
status=agent_manager_pb2.AGENT_RPC_STATUS_OK,
serialized_runtime_env_context=serialized_context)

async def DeleteRuntimeEnv(self, request, context):
# TODO(guyang.sgy): Delete runtime env local files.
Expand Down

0 comments on commit ac9a1a2

Please sign in to comment.