From 482f90c2581e42b7aaa8c9c60e92110d34029e41 Mon Sep 17 00:00:00 2001 From: Sanjay Soundarajan Date: Wed, 4 Dec 2024 17:14:19 -0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20fix:=20add=20overall=20time=20lo?= =?UTF-8?q?gging?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cirrus_pipeline_parallel.py | 28 +++--- utils/logwatch.py | 188 +++++++++++++++--------------------- 2 files changed, 91 insertions(+), 125 deletions(-) diff --git a/cirrus_pipeline_parallel.py b/cirrus_pipeline_parallel.py index 4f7135b..be5c13b 100644 --- a/cirrus_pipeline_parallel.py +++ b/cirrus_pipeline_parallel.py @@ -73,10 +73,10 @@ def worker( should_process = file_processor.file_should_process(path, input_last_modified) if not should_process: - logger.threadDebug( + logger.debug( f"The file {path} has not been modified since the last time it was processed", ) - logger.threadDebug(f"Skipping {path} - File has not been modified") + logger.debug(f"Skipping {path} - File has not been modified") logger.threadTime(time_estimator.step()) overall_logger.time(overall_time_estimator.step()) @@ -86,7 +86,7 @@ def worker( file_processor.clear_errors(path) - logger.threadDebug(f"Processing {path}") + logger.debug(f"Processing {path}") with tempfile.TemporaryDirectory(prefix="cirrus_pipeline_") as temp_folder_path: step_1_folder = os.path.join(temp_folder_path, "step1", device) @@ -94,7 +94,7 @@ def worker( download_path = os.path.join(step_1_folder, file_name) - logger.threadDebug(f"Downloading {file_name} to {download_path}") + logger.debug(f"Downloading {file_name} to {download_path}") with open(file=download_path, mode="wb") as f: f.write(input_file_client.download_file().readall()) @@ -104,7 +104,7 @@ def worker( step2_folder = os.path.join(temp_folder_path, "step2") os.makedirs(step2_folder, exist_ok=True) - logger.threadDebug(f"Unzipping {download_path} to {step2_folder}") + logger.debug(f"Unzipping {download_path} to {step2_folder}") zip_files = imaging_utils.list_zip_files(step_1_folder) @@ -123,7 +123,7 @@ def worker( # process the files cirrus_instance = Cirrus.Cirrus() - logger.threadDebug(f"Organizing {file_name}") + logger.debug(f"Organizing {file_name}") try: for step2_data_folder in step2_data_folders: @@ -159,7 +159,7 @@ def worker( "cirrus_onh_optic_disc_cube", ] - logger.threadDebug("Converting to nema compliant dicom files") + logger.debug("Converting to nema compliant dicom files") try: for protocol in protocols: @@ -200,7 +200,7 @@ def worker( metadata_folder = os.path.join(temp_folder_path, "metadata") os.makedirs(metadata_folder, exist_ok=True) - logger.threadDebug("Formatting files and generating metadata") + logger.debug("Formatting files and generating metadata") try: for device_folder in device_list: @@ -228,7 +228,7 @@ def worker( logger.threadInfo(f"Formatted {file_name}") # Upload the processed files to the output folder - logger.threadDebug( + logger.debug( f"Uploading outputs of {file_name} to {processed_data_output_folder}" ) @@ -239,13 +239,13 @@ def worker( file_processor.delete_preexisting_output_files(path) - logger.threadDebug(f"Uploading outputs for {file_name}") + logger.debug(f"Uploading outputs for {file_name}") for root, dirs, files in os.walk(destination_folder): for file in files: full_file_path = os.path.join(root, file) - logger.threadDebug(f"Found file {full_file_path}") + logger.debug(f"Found file {full_file_path}") f2 = full_file_path.split("/")[-5:] @@ -255,7 +255,7 @@ def worker( f"{processed_data_output_folder}/{combined_file_name}" ) - logger.threadDebug( + logger.debug( f"Uploading {combined_file_name} to {output_file_path}" ) @@ -284,7 +284,7 @@ def worker( logger.threadInfo(f"Uploaded outputs for {file_name}") - logger.threadDebug(f"Uploading metadata for {file_name}") + logger.debug(f"Uploading metadata for {file_name}") for root, dirs, files in os.walk(metadata_folder): for file in files: @@ -298,7 +298,7 @@ def worker( f"{processed_metadata_output_folder}/{combined_file_name}" ) - logger.threadDebug( + logger.debug( f"Uploading {full_file_path} to {processed_metadata_output_folder}" ) diff --git a/utils/logwatch.py b/utils/logwatch.py index 08cd624..123f6f6 100644 --- a/utils/logwatch.py +++ b/utils/logwatch.py @@ -2,6 +2,7 @@ import contextlib import config import threading +import json from colorama import just_fix_windows_console, Fore, Back, Style just_fix_windows_console() @@ -74,65 +75,55 @@ def noPrintTrace(self, message: str): ), ).start() - def debug(self, message: str): + def debug(self, message: str): # sourcery skip: class-extract-method """Send a debug message to the logwatch server""" if self.print: - print(Fore.BLUE + message + Style.RESET_ALL) - with contextlib.suppress(Exception): - threading.Thread( - target=requests.post, - args=( - self.drain, - {"level": "debug", "message": message, "type": "text"}, - ), - ).start() + if self.thread_id != 0: + print(f"{Fore.BLUE}[{self.thread_id}] {message}{Style.RESET_ALL}") + else: + print(f"{Fore.BLUE}{message}{Style.RESET_ALL}") - def threadDebug(self, message: str): - """Send a debug message to the logwatch server""" - if self.print: - print(f"{Fore.BLUE}[{self.thread_id}] {message}{Style.RESET_ALL}") with contextlib.suppress(Exception): + args = { + "level": "debug", + "message": message, + "type": "text", + } + + if self.thread_id != 0: + args["thread"] = self.thread_id + threading.Thread( target=requests.post, args=( self.drain, - { - "level": "debug", - "message": message, - "type": "text", - "thread": self.thread_id, - }, + json.dumps(args), ), ).start() def info(self, message: str): """Send an info message to the logwatch server""" if self.print: - print(Fore.CYAN + message + Style.RESET_ALL) - with contextlib.suppress(Exception): - threading.Thread( - target=requests.post, - args=( - self.drain, - {"level": "info", "message": message, "type": "text"}, - ), - ).start() + if self.thread_id != 0: + print(f"{Fore.CYAN}[{self.thread_id}] {message}{Style.RESET_ALL}") + else: + print(f"{Fore.CYAN}{message}{Style.RESET_ALL}") - def threadInfo(self, message: str): - """Send a threaded info message to the logwatch server""" - if self.print: - print(f"{Fore.CYAN}[{self.thread_id}] {message}{Style.RESET_ALL}") with contextlib.suppress(Exception): + args = { + "level": "info", + "message": message, + "type": "text", + } + + if self.thread_id != 0: + args["thread"] = self.thread_id + threading.Thread( target=requests.post, args=( self.drain, - { - "level": "info", - "message": message, - "type": "text", - "thread": self.thread_id, - }, + json.dumps(args), ), ).start() @@ -152,26 +143,22 @@ def fastInfo(self, message: str): def error(self, message: str): """Send an error message to the logwatch server""" if self.print: - print(Fore.RED + message + Style.RESET_ALL) - with contextlib.suppress(Exception): - requests.post( - self.drain, json={"level": "error", "message": message, "type": "text"} - ) + if self.thread_id != 0: + print(f"{Fore.RED}[{self.thread_id}] {message}{Style.RESET_ALL}") + else: + print(f"{Fore.RED}{message}{Style.RESET_ALL}") - def threadError(self, message: str): - """Send an error message to the logwatch server""" - if self.print: - print(f"{Fore.RED}[{self.thread_id}] {message}{Style.RESET_ALL}") with contextlib.suppress(Exception): - requests.post( - self.drain, - json={ - "level": "error", - "message": message, - "type": "text", - "thread": self.thread_id, - }, - ) + args = { + "level": "error", + "message": message, + "type": "text", + } + + if self.thread_id != 0: + args["thread"] = self.thread_id + + requests.post(self.drain, json.dumps(args)) def warn(self, message: str): """Send a warning message to the logwatch server""" @@ -196,69 +183,48 @@ def critical(self, message: str): def time(self, message: str): """Send a time message to the logwatch server""" if self.print: - print(Back.GREEN + Fore.WHITE + message + Style.RESET_ALL) - with contextlib.suppress(Exception): - # Not threaded because it's a time message - requests.post( - self.drain, json={"level": "time", "message": message, "type": "text"} - ) + if self.thread_id != 0: + print( + f"{Back.GREEN}{Fore.WHITE}[{self.thread_id}] {message}{Style.RESET_ALL}" + ) + else: + print(f"{Back.GREEN}{Fore.WHITE}{message}{Style.RESET_ALL}") - def threadTime(self, message: str): - """Send a time message to the logwatch server""" - if self.print: - print( - Back.GREEN - + Fore.WHITE - + f"[{self.thread_id}] " - + message - + Style.RESET_ALL - ) with contextlib.suppress(Exception): - requests.post( - self.drain, - json={ - "level": "time", - "message": message, - "type": "text", - "thread": self.thread_id, - }, - ) + args = { + "level": "time", + "message": message, + "type": "text", + } + + if self.thread_id != 0: + args["thread"] = self.thread_id + + # Not threaded because it's a time message + requests.post(self.drain, json.dumps(args)) def fastTime(self, message: str): """Send a threaded time message to the logwatch server. Used for items that need to be processed quickly""" if self.print: - print(Back.GREEN + Fore.WHITE + message + Style.RESET_ALL) - with contextlib.suppress(Exception): - threading.Thread( - target=requests.post, - args=( - self.drain, - {"level": "time", "message": message, "type": "text"}, - ), - ).start() + if self.thread_id != 0: + print( + f"{Back.GREEN}{Fore.WHITE}[{self.thread_id}] {message}{Style.RESET_ALL}" + ) + else: + print(f"{Back.GREEN}{Fore.WHITE}{message}{Style.RESET_ALL}") + with contextlib.suppress(Exception): + args = { + "level": "time", + "message": message, + "type": "text", + } + + if self.thread_id != 0: + args["thread"] = self.thread_id - def threadFastTime(self, message: str): - """Send a threaded time message to the logwatch server. Used for items that need to be processed quickly""" - if self.print: - print( - Back.GREEN - + Fore.WHITE - + f"[{self.thread_id}] " - + message - + Style.RESET_ALL - ) - with contextlib.suppress(Exception): threading.Thread( target=requests.post, - args=( - self.drain, - { - "level": "time", - "message": message, - "type": "text", - "thread": self.thread_id, - }, - ), + args=(self.drain, json.dumps(args)), ).start() def noPrintTime(self, message: str):