diff --git a/enterprise_gateway/services/kernels/remotemanager.py b/enterprise_gateway/services/kernels/remotemanager.py
index 29acf1caa..f29165f56 100644
--- a/enterprise_gateway/services/kernels/remotemanager.py
+++ b/enterprise_gateway/services/kernels/remotemanager.py
@@ -203,6 +203,7 @@ def __init__(self, **kwargs):
         self.user_overrides = {}
         self.restarting = False  # need to track whether we're in a restart situation or not
 
+    @gen.coroutine
     def start_kernel(self, **kwargs):
         """Starts a kernel in a separate process.
 
@@ -222,7 +223,7 @@ def start_kernel(self, **kwargs):
         process_proxy_class = import_item(process_proxy_class_name)
         self.process_proxy = process_proxy_class(kernel_manager=self, proxy_config=process_proxy.get('config'))
         self._capture_user_overrides(**kwargs)
-        super(RemoteKernelManager, self).start_kernel(**kwargs)
+        yield super(RemoteKernelManager, self).start_kernel(**kwargs)
 
     def _capture_user_overrides(self, **kwargs):
         """
@@ -286,6 +287,7 @@ def request_shutdown(self, restart=False):
         if isinstance(self.process_proxy, RemoteProcessProxy):
             self.process_proxy.shutdown_listener()
 
+    @gen.coroutine
     def restart_kernel(self, now=False, **kwargs):
         """Restarts a kernel with the arguments that were used to launch it.
 
@@ -319,7 +321,7 @@ def restart_kernel(self, now=False, **kwargs):
             # Use the parent mapping kernel manager so activity monitoring and culling is also shutdown
             self.parent.shutdown_kernel(kernel_id, now=now)
             return
-        super(RemoteKernelManager, self).restart_kernel(now, **kwargs)
+        yield gen.maybe_future(super(RemoteKernelManager, self).restart_kernel(now, **kwargs))
         if isinstance(self.process_proxy, RemoteProcessProxy):  # for remote kernels...
             # Re-establish activity watching...
             if self._activity_stream:
diff --git a/enterprise_gateway/services/processproxies/conductor.py b/enterprise_gateway/services/processproxies/conductor.py
index a97f1b7aa..a502d58a1 100644
--- a/enterprise_gateway/services/processproxies/conductor.py
+++ b/enterprise_gateway/services/processproxies/conductor.py
@@ -10,6 +10,8 @@
 import socket
 import re
 
+from tornado import gen
+
 from jupyter_client import launch_kernel, localinterfaces
 
 from .processproxy import RemoteProcessProxy
@@ -34,6 +36,7 @@ def __init__(self, kernel_manager, proxy_config):
         self.conductor_endpoint = proxy_config.get('conductor_endpoint',
                                                    kernel_manager.parent.parent.conductor_endpoint)
 
+    @gen.coroutine
     def launch_process(self, kernel_cmd, **kwargs):
         """Launches the specified process within a Conductor cluster environment."""
         super(ConductorClusterProcessProxy, self).launch_process(kernel_cmd, **kwargs)
@@ -55,9 +58,8 @@ def launch_process(self, kernel_cmd, **kwargs):
         self.env = kwargs.get('env')
         self.log.debug("Conductor cluster kernel launched using Conductor endpoint: {}, pid: {}, Kernel ID: {}, "
                        "cmd: '{}'".format(self.conductor_endpoint, self.local_proc.pid, self.kernel_id, kernel_cmd))
-        self.confirm_remote_startup()
-
-        return self
+        yield self.confirm_remote_startup()
+        raise gen.Return(self)
 
     def _update_launch_info(self, kernel_cmd, **kwargs):
         """ Dynamically assemble the spark-submit configuration passed from NB2KG."""
@@ -114,6 +116,7 @@ def send_signal(self, signum):
         else:
             return super(ConductorClusterProcessProxy, self).send_signal(signum)
 
+    @gen.coroutine
     def kill(self):
         """Kill a kernel.
        :return: None if the application existed and is not in RUNNING state, False otherwise.
@@ -127,7 +130,7 @@ def kill(self):
             i = 1
             state = self._query_app_state_by_driver_id(self.driver_id)
             while state not in ConductorClusterProcessProxy.final_states and i <= max_poll_attempts:
-                time.sleep(poll_interval)
+                yield gen.sleep(poll_interval)
                 state = self._query_app_state_by_driver_id(self.driver_id)
                 i = i + 1
 
@@ -138,7 +141,7 @@ def kill(self):
 
         self.log.debug("ConductorClusterProcessProxy.kill, application ID: {}, kernel ID: {}, state: {}"
                        .format(self.application_id, self.kernel_id, state))
-        return result
+        raise gen.Return(result)
 
     def cleanup(self):
         # we might have a defunct process (if using waitAppCompletion = false) - so poll, kill, wait when we have
@@ -173,6 +176,7 @@ def _parse_driver_submission_id(self, submission_response):
             self.driver_id = driver_id[0]
             self.log.debug("Driver ID: {}".format(driver_id[0]))
 
+    @gen.coroutine
     def confirm_remote_startup(self):
         """ Confirms the application is in a started state before returning. Should post-RUNNING states be
             unexpectedly encountered ('FINISHED', 'KILLED', 'RECLAIMED') then we must throw, otherwise the rest
@@ -187,7 +191,7 @@ def confirm_remote_startup(self):
             output = self.local_proc.stderr.read().decode("utf-8")
             self._parse_driver_submission_id(output)
             i += 1
-            self.handle_timeout()
+            yield self.handle_timeout()
 
             if self._get_application_id(True):
                 # Once we have an application ID, start monitoring state, obtain assigned host and get connection info
@@ -223,9 +227,10 @@ def _get_application_state(self):
                 self.assigned_ip = socket.gethostbyname(self.assigned_host)
         return app_state
 
+    @gen.coroutine
     def handle_timeout(self):
         """Checks to see if the kernel launch timeout has been exceeded while awaiting connection info."""
-        time.sleep(poll_interval)
+        yield gen.sleep(poll_interval)
         time_interval = RemoteProcessProxy.get_time_diff(self.start_time, RemoteProcessProxy.get_current_time())
 
         if time_interval > self.kernel_launch_timeout:
diff --git a/enterprise_gateway/services/processproxies/container.py b/enterprise_gateway/services/processproxies/container.py
index b01bd61b9..f4415b343 100644
--- a/enterprise_gateway/services/processproxies/container.py
+++ b/enterprise_gateway/services/processproxies/container.py
@@ -8,6 +8,8 @@
 
 import urllib3  # docker ends up using this and it causes lots of noise, so turn off warnings
 
+from tornado import gen
+
 from jupyter_client import launch_kernel, localinterfaces
 
 from .processproxy import RemoteProcessProxy
@@ -51,6 +53,7 @@ def _determine_kernel_images(self, proxy_config):
             self.kernel_executor_image = proxy_config.get('executor_image_name')
         self.kernel_executor_image = os.environ.get('KERNEL_EXECUTOR_IMAGE', self.kernel_executor_image)
 
+    @gen.coroutine
     def launch_process(self, kernel_cmd, **kwargs):
         """Launches the specified process within the container environment."""
         # Set env before superclass call so we see these in the debug output
@@ -73,9 +76,8 @@ def launch_process(self, kernel_cmd, **kwargs):
 
         self.log.info("{}: kernel launched. Kernel image: {}, KernelID: {}, cmd: '{}'"
                       .format(self.__class__.__name__, self.kernel_image, self.kernel_id, kernel_cmd))
-        self.confirm_remote_startup()
-
-        return self
+        yield self.confirm_remote_startup()
+        raise gen.Return(self)
 
     def _enforce_uid_gid_blacklists(self, **kwargs):
         """Determine UID and GID with which to launch container and ensure they do not appear in blacklist."""
@@ -153,6 +155,7 @@ def cleanup(self):
             self.kill()
         super(ContainerProcessProxy, self).cleanup()
 
+    @gen.coroutine
     def confirm_remote_startup(self):
         """Confirms the container has started and returned necessary connection information."""
         self.start_time = RemoteProcessProxy.get_current_time()
@@ -160,7 +163,7 @@ def confirm_remote_startup(self):
         ready_to_connect = False  # we're ready to connect when we have a connection file to use
         while not ready_to_connect:
             i += 1
-            self.handle_timeout()
+            yield self.handle_timeout()
 
             container_status = self.get_container_status(str(i))
             if container_status:
diff --git a/enterprise_gateway/services/processproxies/distributed.py b/enterprise_gateway/services/processproxies/distributed.py
index 77a5053e1..d310aef85 100644
--- a/enterprise_gateway/services/processproxies/distributed.py
+++ b/enterprise_gateway/services/processproxies/distributed.py
@@ -4,8 +4,8 @@
 
 import os
 import json
-import time
 
+from tornado import gen
 from subprocess import STDOUT
 from socket import gethostbyname
 
@@ -29,6 +29,7 @@ def __init__(self, kernel_manager, proxy_config):
         else:
             self.hosts = kernel_manager.parent.parent.remote_hosts  # from command line or env
 
+    @gen.coroutine
     def launch_process(self, kernel_cmd, **kwargs):
         """Launches a kernel process on a selected host."""
         super(DistributedProcessProxy, self).launch_process(kernel_cmd, **kwargs)
@@ -48,9 +49,8 @@ def launch_process(self, kernel_cmd, **kwargs):
         self.log.info("Kernel launched on '{}', pid: {}, ID: {}, Log file: {}:{}, Command: '{}'. ".
                       format(self.assigned_host, self.pid, self.kernel_id, self.assigned_host,
                              self.kernel_log, kernel_cmd))
-        self.confirm_remote_startup()
-
-        return self
+        yield self.confirm_remote_startup()
+        raise gen.Return(self)
 
     def _launch_remote_process(self, kernel_cmd, **kwargs):
         """
@@ -121,6 +121,7 @@ def _determine_next_host(self):
         DistributedProcessProxy.host_index += 1
         return next_host
 
+    @gen.coroutine
     def confirm_remote_startup(self):
         """ Confirms the remote kernel has started by obtaining connection information from the remote host."""
         self.start_time = RemoteProcessProxy.get_current_time()
@@ -128,23 +129,10 @@ def confirm_remote_startup(self):
         ready_to_connect = False  # we're ready to connect when we have a connection file to use
         while not ready_to_connect:
             i += 1
-            self.handle_timeout()
+            yield self.handle_timeout()
 
             self.log.debug("{}: Waiting to connect. Host: '{}', KernelID: '{}'".
                            format(i, self.assigned_host, self.kernel_id))
 
             if self.assigned_host != '':
                 ready_to_connect = self.receive_connection_info()
-
-    def handle_timeout(self):
-        """Checks to see if the kernel launch timeout has been exceeded while awaiting connection info."""
-        time.sleep(poll_interval)
-        time_interval = RemoteProcessProxy.get_time_diff(self.start_time, RemoteProcessProxy.get_current_time())
-
-        if time_interval > self.kernel_launch_timeout:
-            reason = "Waited too long ({}s) to get connection file. Check Enterprise Gateway log and kernel " \
-                     "log ({}:{}) for more information.".\
-                format(self.kernel_launch_timeout, self.assigned_host, self.kernel_log)
-            timeout_message = "KernelID: '{}' launch timeout due to: {}".format(self.kernel_id, reason)
-            self.kill()
-            self.log_and_raise(http_status_code=500, reason=timeout_message)
diff --git a/enterprise_gateway/services/processproxies/processproxy.py b/enterprise_gateway/services/processproxies/processproxy.py
index 10e93956b..e07658136 100644
--- a/enterprise_gateway/services/processproxies/processproxy.py
+++ b/enterprise_gateway/services/processproxies/processproxy.py
@@ -19,7 +19,7 @@
 import random
 
 from socket import timeout, socket, gethostbyname, gethostname, AF_INET, SOCK_STREAM, SHUT_RDWR, SHUT_WR
-from tornado import web
+from tornado import web, gen
 from calendar import timegm
 from ipython_genutils.py3compat import with_metaclass
 from jupyter_client import launch_kernel, localinterfaces
@@ -150,6 +150,7 @@ def __init__(self, kernel_manager, proxy_config):
         self.pgid = 0
 
     @abc.abstractmethod
+    @gen.coroutine
     def launch_process(self, kernel_cmd, **kwargs):
         """Provides basic implementation for launching the process corresponding to the process proxy.
 
@@ -208,17 +209,18 @@ def poll(self):
 
         return self.send_signal(0)
 
+    @gen.coroutine
     def wait(self):
         """Wait for the process to become inactive."""
         # If we have a local_proc, call its wait method. This will cleanup any defunct processes when the kernel
         # is shutdown (when using waitAppCompletion = false). Otherwise (if no local_proc) we'll use polling to
         # determine if a (remote or revived) process is still active.
         if self.local_proc:
-            return self.local_proc.wait()
+            raise gen.Return(self.local_proc.wait())
 
         for i in range(max_poll_attempts):
             if self.poll():
-                time.sleep(poll_interval)
+                yield gen.sleep(poll_interval)
             else:
                 break
         else:
@@ -252,6 +254,7 @@ def send_signal(self, signum):
             result = self.remote_signal(signum)
         return result
 
+    @gen.coroutine
     def kill(self):
         """Terminate the process proxy process.
 
@@ -263,7 +266,7 @@ def kill(self):
         result = self.terminate()  # Send -15 signal first
         i = 1
         while self.poll() is None and i <= max_poll_attempts:
-            time.sleep(poll_interval)
+            yield gen.sleep(poll_interval)
             i = i + 1
         if i > max_poll_attempts:  # Send -9 signal if process is still alive
             if self.local_proc:
@@ -276,7 +279,7 @@ def kill(self):
             else:
                 result = self.remote_signal(signal.SIGKILL)
             self.log.debug("SIGKILL signal sent to pid: {}".format(self.pid))
-        return result
+        raise gen.Return(result)
 
     def terminate(self):
         """Gracefully terminate the process proxy process.
@@ -632,11 +635,12 @@ def __init__(self, kernel_manager, proxy_config):
         super(LocalProcessProxy, self).__init__(kernel_manager, proxy_config)
         kernel_manager.ip = localinterfaces.LOCALHOST
 
+    @gen.coroutine
     def launch_process(self, kernel_cmd, **kwargs):
         super(LocalProcessProxy, self).launch_process(kernel_cmd, **kwargs)
 
         # launch the local run.sh
-        self.local_proc = launch_kernel(kernel_cmd, **kwargs)
+        self.local_proc = yield gen.maybe_future(launch_kernel(kernel_cmd, **kwargs))
         self.pid = self.local_proc.pid
         if hasattr(os, "getpgid"):
             try:
@@ -646,7 +650,7 @@ def launch_process(self, kernel_cmd, **kwargs):
         self.ip = local_ip
         self.log.info("Local kernel launched on '{}', pid: {}, pgid: {}, KernelID: {}, cmd: '{}'"
                       .format(self.ip, self.pid, self.pgid, self.kernel_id, kernel_cmd))
-        return self
+        raise gen.Return(self)
 
 
 class RemoteProcessProxy(with_metaclass(abc.ABCMeta, BaseProcessProxyABC)):
@@ -664,6 +668,7 @@ def __init__(self, kernel_manager, proxy_config):
         self.tunnel_processes = {}
         self._prepare_response_socket()
 
+    @gen.coroutine
     def launch_process(self, kernel_cmd, **kwargs):
         # Pass along port-range info to kernels...
         kwargs['env']['EG_MIN_PORT_RANGE_SIZE'] = str(min_port_range_size)
@@ -968,9 +973,10 @@ def _extract_pid_info(self, connect_info):
             self.ip = self.assigned_ip
             self.local_proc = None
 
+    @gen.coroutine
     def handle_timeout(self):
         """Checks to see if the kernel launch timeout has been exceeded while awaiting connection info."""
-        time.sleep(poll_interval)
+        yield gen.sleep(poll_interval)
         time_interval = RemoteProcessProxy.get_time_diff(self.start_time, RemoteProcessProxy.get_current_time())
 
         if time_interval > self.kernel_launch_timeout:
diff --git a/enterprise_gateway/services/processproxies/yarn.py b/enterprise_gateway/services/processproxies/yarn.py
index c458f83a0..70f564715 100644
--- a/enterprise_gateway/services/processproxies/yarn.py
+++ b/enterprise_gateway/services/processproxies/yarn.py
@@ -9,6 +9,7 @@
 import errno
 import socket
 
+from tornado import gen
 from jupyter_client import launch_kernel, localinterfaces
 from yarn_api_client.resource_manager import ResourceManager
 
@@ -49,6 +50,7 @@ def __init__(self, kernel_manager, proxy_config):
         else:
             self.resource_mgr = ResourceManager(address=yarn_master)
 
+    @gen.coroutine
     def launch_process(self, kernel_cmd, **kwargs):
         """Launches the specified process within a YARN cluster environment."""
         super(YarnClusterProcessProxy, self).launch_process(kernel_cmd, **kwargs)
@@ -60,9 +62,8 @@ def launch_process(self, kernel_cmd, **kwargs):
 
         self.log.debug("Yarn cluster kernel launched using YARN endpoint: {}, pid: {}, Kernel ID: {}, cmd: '{}'"
                        .format(self.yarn_endpoint, self.local_proc.pid, self.kernel_id, kernel_cmd))
-        self.confirm_remote_startup()
-
-        return self
+        yield self.confirm_remote_startup()
+        raise gen.Return(self)
 
     def poll(self):
         """Submitting a new kernel/app to YARN will take a while to be ACCEPTED.
@@ -105,7 +106,6 @@ def kill(self):
         :return: None if the application existed and is not in RUNNING state, False otherwise.
         """
         state = None
-        result = False
         if self._get_application_id():
             resp = self._kill_app_by_id(self.application_id)
             self.log.debug(
@@ -119,10 +119,10 @@ def kill(self):
                 state = self._query_app_state_by_id(self.application_id)
                 i = i + 1
 
-            if state in YarnClusterProcessProxy.final_states:
-                result = None
-
-        super(YarnClusterProcessProxy, self).kill()
+        if state in YarnClusterProcessProxy.final_states:
+            result = None
+        else:
+            result = super(YarnClusterProcessProxy, self).kill()
 
         self.log.debug("YarnClusterProcessProxy.kill, application ID: {}, kernel ID: {}, state: {}"
                        .format(self.application_id, self.kernel_id, state))
@@ -146,6 +146,7 @@ def cleanup(self):
         # for cleanup, we should call the superclass last
         super(YarnClusterProcessProxy, self).cleanup()
 
+    @gen.coroutine
     def confirm_remote_startup(self):
         """ Confirms the yarn application is in a started state before returning. Should post-RUNNING states be
             unexpectedly encountered (FINISHED, KILLED) then we must throw, otherwise the rest of the gateway will
@@ -156,7 +157,7 @@ def confirm_remote_startup(self):
         ready_to_connect = False  # we're ready to connect when we have a connection file to use
         while not ready_to_connect:
             i += 1
-            self.handle_timeout()
+            yield self.handle_timeout()
 
             if self._get_application_id(True):
                 # Once we have an application ID, start monitoring state, obtain assigned host and get connection info
@@ -191,9 +192,10 @@ def _get_application_state(self):
                 self.assigned_ip = socket.gethostbyname(self.assigned_host)
         return app_state
 
+    @gen.coroutine
     def handle_timeout(self):
         """Checks to see if the kernel launch timeout has been exceeded while awaiting connection info."""
-        time.sleep(poll_interval)
+        yield gen.sleep(poll_interval)
         time_interval = RemoteProcessProxy.get_time_diff(self.start_time, RemoteProcessProxy.get_current_time())
 
        if time_interval > self.kernel_launch_timeout: