Skip to content

Commit

Permalink
Fixed an issue where logging names were not being used correctly, cho…
Browse files Browse the repository at this point in the history
…se more appropriate messaging levels, and added a more complete resource cleanup mechanism
  • Loading branch information
christophertubbs committed Sep 10, 2024
1 parent 903bf73 commit fb76c69
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 51 deletions.
166 changes: 119 additions & 47 deletions python/services/evaluationservice/dmod/evaluationservice/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
from utilities.common import ErrorCounter
from utilities import streams

CT = typing.TypeVar("CT")
"""A type of object that may be cleaned up"""

EXCEPTION_LIMIT: typing.Final[int] = 10
"""
The maximum number of a specific type of error to catch before exiting. If an error occurs 11 times in rapid
Expand Down Expand Up @@ -80,9 +83,7 @@ def signal_handler(signum: int, frame):
signal_description: str = signal.strsignal(signum).split(":")[0]
service.error(f"Received external signal: {signal_description}")

if REDIS_PARAMETERS_FOR_PROCESS.is_valid:
service.error("Cleaning up redis resources...")
cleanup(redis_parameters=REDIS_PARAMETERS_FOR_PROCESS)
Cleanupable.cleanup_all()

service.error("Now exiting...")
sys.exit(signum)
Expand Down Expand Up @@ -332,16 +333,17 @@ def mark_complete(self, connection: redis.Redis, object_manager: DMODObjectManag
try:
object_manager.free(self.evaluation_id, fail_on_missing_scope=False)
except KeyError:
service.info(f"There is no scope for '{self.evaluation_id}' in the object manager to close")
service.error(f"There is no scope for '{self.evaluation_id}' in the object manager to close")
except Exception as exception:
service.error(
f"Failed to clear the scope of shared objects in evaluation '{self.evaluation_id}'",
exception=exception
)

try:
service.info(
f"{self.stream_name}:{self.group_name}:{self.consumer_name} will now delete message '{self.message_id}' since it has been consumed"
service.debug(
f"{self.stream_name}:{self.group_name}:{self.consumer_name} will now delete message '{self.message_id}' "
f"since it has been consumed"
)
confirmation = connection.xdel(self.stream_name, self.message_id)
return bool(confirmation)
Expand All @@ -350,6 +352,63 @@ def mark_complete(self, connection: redis.Redis, object_manager: DMODObjectManag
return False


class Cleanupable(typing.Generic[CT]):
    """
    A wrapper that pairs an object with the function that knows how to clean it up

    Instances are normally created through `schedule_for_cleanup`, which records them in a class level
    collection so that `cleanup_all` may clean up everything that was registered in a single pass.

    Clean up is performed at most once per instance, no matter how many times `cleanup`, `__call__`,
    `cleanup_all`, or the finalizer are triggered.
    """
    # A string forward reference is used because the class name is not bound yet while the class body executes
    items_to_cleanup: typing.List["Cleanupable"] = []
    """A class level collection of objects marked to be cleaned"""

    _cleaned: bool = False
    """Whether the clean up method has already been invoked for this instance"""

    @classmethod
    def cleanup_all(cls):
        """
        Call the cleanup function for all marked objects

        Failures are logged and do not prevent the remaining objects from being cleaned up
        """
        # Iterate over a snapshot so a clean up method that schedules more work can't alter the list mid-loop
        for item in tuple(cls.items_to_cleanup):
            try:
                service.debug(f"Calling '{item}'...")
                item.cleanup()
            except Exception as exception:
                service.error(f"Could not call '{item.method}' to clean up '{item.item}'", exception)

    @classmethod
    def schedule_for_cleanup(cls, item: CT, method: typing.Callable[[CT], typing.Any] = None):
        """
        Record an object that needs to be cleaned up later

        Args:
            item: The item to clean up
            method: How to clean the item up. Nothing will be called for the item if this is not given
        """
        cls.items_to_cleanup.append(cls(item=item, method=method))

    def __init__(self, item: CT, method: typing.Callable[[CT], typing.Any]):
        """
        Constructor

        Args:
            item: The item to clean up
            method: How to clean the item up
        """
        self.item = item
        self.method = method
        self._cleaned = False

    def cleanup(self):
        """
        Call the method that will perform clean up operations

        Only the first call does anything; later calls (including the one made by the finalizer) return
        immediately so that a resource is never cleaned up twice
        """
        if self._cleaned:
            return

        # Mark the instance before calling out so that a failing method is not retried by `__del__`
        self._cleaned = True

        # `schedule_for_cleanup` allows the method to be omitted - there is nothing to call in that case
        if self.method is not None:
            self.method(self.item)

    def __call__(self):
        self.cleanup()

    def __del__(self):
        # Last chance clean up for instances that were never explicitly cleaned; a no-op otherwise
        self.cleanup()

    def __str__(self):
        # Not every callable has a `__qualname__` (`functools.partial`, for instance), so fall back to its repr
        method_name = getattr(self.method, "__qualname__", None) or repr(self.method)
        return f"{method_name}({self.item})"


def launch_evaluation(
stream_message: streams.StreamMessage,
worker_pool: futures.Executor,
Expand All @@ -369,8 +428,18 @@ def launch_evaluation(
payload = stream_message.payload
evaluation_id = payload.get('evaluation_id')
scope = object_manager.establish_scope(evaluation_id)

service.debug(f"Launching an evaluation for {evaluation_id}...")
instructions = payload.get("instructions")

if not instructions:
raise ValueError(f"Cannot launch an evaluation with no instructions: {stream_message}")

if isinstance(instructions, dict):
instructions = json.dumps(instructions, indent=4)

try:
# Build communicators that will communicate evaluation updates outside of the evaluation process
communicators: CommunicatorGroup = utilities.get_communicators(
communicator_id=evaluation_id,
verbosity=payload.get("verbosity"),
Expand All @@ -380,23 +449,14 @@ def launch_evaluation(
password=service.REDIS_PASSWORD,
include_timestamp=False
)
service.info(f"Communicators have been created for the evaluation named '{evaluation_id}'")
service.debug(f"Communicators have been created for the evaluation named '{evaluation_id}'")
except Exception as exception:
service.error(
message=f"Could not create communicators for evaluation: {evaluation_id} due to {exception}",
exception=exception
)
return None

service.info(f"Launching an evaluation for {evaluation_id}...")
instructions = payload.get("instructions")

if not instructions:
raise ValueError(f"Cannot launch an evaluation with no instructions: {stream_message}")

if isinstance(instructions, dict):
instructions = json.dumps(instructions, indent=4)

arguments = WorkerProcessArguments(
evaluation_id=payload['evaluation_id'],
instructions=instructions,
Expand All @@ -406,7 +466,7 @@ def launch_evaluation(
)

try:
service.info(f"Submitting the evaluation job for {evaluation_id}...")
service.debug(f"Submitting the evaluation job for {evaluation_id}...")
evaluation_job: futures.Future = worker_pool.submit(
worker.evaluate,
**arguments.kwargs
Expand All @@ -428,10 +488,9 @@ def launch_evaluation(
job=new_job
)

service.info(f"Preparing to monitor objects for {evaluation_id}...")
service.debug(f"Preparing to monitor objects for {evaluation_id}...")
try:
object_manager.monitor_operation(evaluation_id, evaluation_job)
service.info(f"Evaluation for {evaluation_id} has been launched.")
except BaseException as exception:
service.error(f"Could not monitor {evaluation_id} due to: {exception}")
traceback.print_exc()
Expand Down Expand Up @@ -466,7 +525,7 @@ def interpret_message(
)

if 'purpose' not in stream_message.payload:
service.info(f"A purpose was not communicated through the {service.EVALUATION_QUEUE_NAME} stream")
service.debug(f"A purpose was not communicated through the {service.EVALUATION_QUEUE_NAME} stream")
return None

purpose = stream_message.payload.get("purpose").lower()
Expand All @@ -477,7 +536,7 @@ def interpret_message(
stop_signal.set()
service.info("Exit message received. Closing the runner.")
else:
service.info(
service.error(
f"Runner => The purpose was not to launch or terminate. Only launching is handled through the runner."
f"{os.linesep}Message: {json.dumps(stream_message.payload)}"
)
Expand Down Expand Up @@ -554,6 +613,16 @@ def get_consumer_name() -> str:
return f"Listener {os.getpid()}"


def shutdown_pool(pool: futures.Executor):
    """
    Stop an executor, provided that the given object is able to be shut down

    Nothing happens if the pool is falsy or if it lacks a callable `shutdown` attribute

    Args:
        pool: The executor to shut down
    """
    if not pool:
        return

    shutdown = getattr(pool, "shutdown", None)

    if callable(shutdown):
        shutdown()

def listen(
stream_parameters: streams.StreamParameters,
job_limit: int = None
Expand All @@ -565,7 +634,6 @@ def listen(
stream_parameters: The means to connect to redis
job_limit: The maximum number of jobs that may be run at once
"""

job_limit = job_limit or int(float(os.environ.get("MAXIMUM_RUNNING_JOBS", os.cpu_count())))
"""The maximum number of jobs that may be actively running from this listener"""

Expand Down Expand Up @@ -600,6 +668,7 @@ def listen(

try:
with get_object_manager(monitor_scope=True) as object_manager, executor_type() as worker_pool:
Cleanupable.schedule_for_cleanup(worker_pool, shutdown_pool)
object_manager.logger = get_logger()

monitoring_thread = threading.Thread(
Expand All @@ -617,7 +686,7 @@ def listen(

while not stop_signal.is_set():
if already_listening:
service.info(f"{consumer_name}: Starting to listen for evaluation jobs again")
service.warn(f"{consumer_name}: Starting to listen for evaluation jobs again")
else:
already_listening = True

Expand All @@ -632,7 +701,7 @@ def listen(
)

for message in message_stream:
service.info(
service.debug(
f"{message.stream_name}:{message.group_name}:{consumer_name}: Received message "
f"'{message.message_id}'"
)
Expand All @@ -651,13 +720,19 @@ def listen(
else:
# Since this message isn't considered one for the runner, acknowledge that it's been seen
# and move on so something else may view the message
service.info(f"{message.stream_name}:{message.group_name}:{consumer_name}: Acknowledging message '{message.message_id}'")
service.debug(
f"{message.stream_name}:{message.group_name}:{consumer_name}: "
f"Acknowledging message '{message.message_id}'"
)
connection.xack(
stream_parameters.stream_name,
stream_parameters.group_name,
message.message_id
)
service.info(f"{message.stream_name}:{message.group_name} will no longer use message '{message.message_id}'")
service.debug(
f"{message.stream_name}:{message.group_name} will no longer use message "
f"'{message.message_id}'"
)

if stop_signal.is_set():
break
Expand All @@ -672,7 +747,6 @@ def listen(
monitoring_thread.join(timeout=5)



def initialize_consumer(stream_name: str, group_name: str, consumer_name: str, redis_connection: redis.Redis) -> None:
"""
Create a consumer that will retrieve messages for a group. Generated streams will have a limited lifespan
Expand All @@ -683,12 +757,12 @@ def initialize_consumer(stream_name: str, group_name: str, consumer_name: str, r
consumer_name: The name of the consumer to create
redis_connection: A redis connection to communicate through
"""
service.info(
service.debug(
f"initializing a consumer named '{consumer_name}' for the '{group_name}' group for the {stream_name} stream"
)

try:
service.info(
service.debug(
f"Making sure that the '{group_name}' group has been added to the '{stream_name}' stream"
)
# Create a group on the given stream, creating the stream if it doesn't exist, and allow it to read all
Expand All @@ -698,32 +772,34 @@ def initialize_consumer(stream_name: str, group_name: str, consumer_name: str, r
if 'consumer group name already exists' not in str(group_create_error).lower():
raise group_create_error

service.info(f"The '{group_name}' consumer group is active on the '{stream_name}' stream")
service.info(
service.debug(f"The '{group_name}' consumer group is active on the '{stream_name}' stream")
service.debug(
f"Adding the '{consumer_name}' consumer to the '{group_name}' group on the '{stream_name}' stream"
)

# Create a consumer for a group that may read from the stream and add data to the group
redis_connection.xgroup_createconsumer(name=stream_name, groupname=group_name, consumername=consumer_name)


def cleanup(redis_parameters: streams.StreamParameters) -> None:
def cleanup_redis(redis_parameters: streams.StreamParameters) -> None:
"""
Clean up any leftover artifacts that might be considered no longer needed
Args:
redis_parameters: The means to connect to redis and access a stream
"""
connection = redis_parameters.get_connection()
if redis_parameters.is_valid:
service.debug("Cleaning up redis resources...")
connection = redis_parameters.get_connection()

try:
connection.xgroup_delconsumer(
name=redis_parameters.stream_name,
groupname=redis_parameters.group_name,
consumername=get_consumer_name()
)
except Exception as exception:
service.error(exception)
try:
connection.xgroup_delconsumer(
name=redis_parameters.stream_name,
groupname=redis_parameters.group_name,
consumername=get_consumer_name()
)
except Exception as exception:
service.error(exception)


def main(*args):
Expand All @@ -748,23 +824,19 @@ def main(*args):
group_name=arguments.group_name,
)

# Basic information needs to be accessible globally for the cleanup process to run in case of an interrupt
REDIS_PARAMETERS_FOR_PROCESS.kvstore_parameters = redis_parameters.kvstore_parameters
REDIS_PARAMETERS_FOR_PROCESS.stream_name = redis_parameters.stream_name
REDIS_PARAMETERS_FOR_PROCESS.group_name = redis_parameters.group_name
Cleanupable.schedule_for_cleanup(redis_parameters, cleanup_redis)

try:
listen(stream_parameters=redis_parameters, job_limit=arguments.limit)
exit_code = SUCCESSFUL_EXIT
except KeyboardInterrupt:
exit_code = SUCCESSFUL_EXIT
cleanup(redis_parameters=REDIS_PARAMETERS_FOR_PROCESS)
except Exception as exception:
service.error(exception)
exit_code = ERROR_EXIT
finally:
try:
cleanup(redis_parameters=redis_parameters)
cleanup_redis(redis_parameters=redis_parameters)
except Exception as exception:
service.error(exception)
exit_code = ERROR_EXIT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ def info(message: MESSAGE, logger_name: str = None):
message: The message to log
logger_name: The name of the logger to use. The default is used if none is passed
"""
log(message, logger_name, level=logging.INFO)
log(message, logger_name=logger_name, level=logging.INFO)


def warn(message: MESSAGE, logger_name: str = None):
Expand All @@ -655,7 +655,7 @@ def warn(message: MESSAGE, logger_name: str = None):
message: The message to log
logger_name: The name of the logger to use. The default is used if none is passed
"""
log(message, logger_name, level=logging.WARNING)
log(message, logger_name=logger_name, level=logging.WARNING)


def warning(message: MESSAGE, logger_name: str = None):
Expand All @@ -666,7 +666,7 @@ def warning(message: MESSAGE, logger_name: str = None):
message: The message to log
logger_name: The name of the logger to use. The default is used if none is passed
"""
log(message, logger_name, level=logging.WARNING)
log(message, logger_name=logger_name, level=logging.WARNING)


def error(message: MESSAGE, exception: Exception = None, logger_name: str = None):
Expand All @@ -689,7 +689,7 @@ def debug(message: MESSAGE, logger_name: str = None):
message: A diagnostic message or exception to write to a log
logger_name: The name of the logger to write to
"""
log(message, logger_name, level=logging.DEBUG)
log(message, logger_name=logger_name, level=logging.DEBUG)


def get_logger(logger_name: str = None) -> ConfiguredLogger:
Expand Down

0 comments on commit fb76c69

Please sign in to comment.