Skip to content

Commit

Permalink
🐛 fix: add overall time logging
Browse files Browse the repository at this point in the history
  • Loading branch information
megasanjay committed Dec 5, 2024
1 parent 162229d commit 482f90c
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 125 deletions.
28 changes: 14 additions & 14 deletions cirrus_pipeline_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@ def worker(
should_process = file_processor.file_should_process(path, input_last_modified)

if not should_process:
logger.threadDebug(
logger.debug(
f"The file {path} has not been modified since the last time it was processed",
)
logger.threadDebug(f"Skipping {path} - File has not been modified")
logger.debug(f"Skipping {path} - File has not been modified")

logger.threadTime(time_estimator.step())
overall_logger.time(overall_time_estimator.step())
Expand All @@ -86,15 +86,15 @@ def worker(

file_processor.clear_errors(path)

logger.threadDebug(f"Processing {path}")
logger.debug(f"Processing {path}")

with tempfile.TemporaryDirectory(prefix="cirrus_pipeline_") as temp_folder_path:
step_1_folder = os.path.join(temp_folder_path, "step1", device)
os.makedirs(step_1_folder, exist_ok=True)

download_path = os.path.join(step_1_folder, file_name)

logger.threadDebug(f"Downloading {file_name} to {download_path}")
logger.debug(f"Downloading {file_name} to {download_path}")

with open(file=download_path, mode="wb") as f:
f.write(input_file_client.download_file().readall())
Expand All @@ -104,7 +104,7 @@ def worker(
step2_folder = os.path.join(temp_folder_path, "step2")
os.makedirs(step2_folder, exist_ok=True)

logger.threadDebug(f"Unzipping {download_path} to {step2_folder}")
logger.debug(f"Unzipping {download_path} to {step2_folder}")

zip_files = imaging_utils.list_zip_files(step_1_folder)

Expand All @@ -123,7 +123,7 @@ def worker(
# process the files
cirrus_instance = Cirrus.Cirrus()

logger.threadDebug(f"Organizing {file_name}")
logger.debug(f"Organizing {file_name}")

try:
for step2_data_folder in step2_data_folders:
Expand Down Expand Up @@ -159,7 +159,7 @@ def worker(
"cirrus_onh_optic_disc_cube",
]

logger.threadDebug("Converting to nema compliant dicom files")
logger.debug("Converting to nema compliant dicom files")

try:
for protocol in protocols:
Expand Down Expand Up @@ -200,7 +200,7 @@ def worker(
metadata_folder = os.path.join(temp_folder_path, "metadata")
os.makedirs(metadata_folder, exist_ok=True)

logger.threadDebug("Formatting files and generating metadata")
logger.debug("Formatting files and generating metadata")

try:
for device_folder in device_list:
Expand Down Expand Up @@ -228,7 +228,7 @@ def worker(
logger.threadInfo(f"Formatted {file_name}")

# Upload the processed files to the output folder
logger.threadDebug(
logger.debug(
f"Uploading outputs of {file_name} to {processed_data_output_folder}"
)

Expand All @@ -239,13 +239,13 @@ def worker(

file_processor.delete_preexisting_output_files(path)

logger.threadDebug(f"Uploading outputs for {file_name}")
logger.debug(f"Uploading outputs for {file_name}")

for root, dirs, files in os.walk(destination_folder):
for file in files:
full_file_path = os.path.join(root, file)

logger.threadDebug(f"Found file {full_file_path}")
logger.debug(f"Found file {full_file_path}")

f2 = full_file_path.split("/")[-5:]

Expand All @@ -255,7 +255,7 @@ def worker(
f"{processed_data_output_folder}/{combined_file_name}"
)

logger.threadDebug(
logger.debug(
f"Uploading {combined_file_name} to {output_file_path}"
)

Expand Down Expand Up @@ -284,7 +284,7 @@ def worker(

logger.threadInfo(f"Uploaded outputs for {file_name}")

logger.threadDebug(f"Uploading metadata for {file_name}")
logger.debug(f"Uploading metadata for {file_name}")

for root, dirs, files in os.walk(metadata_folder):
for file in files:
Expand All @@ -298,7 +298,7 @@ def worker(
f"{processed_metadata_output_folder}/{combined_file_name}"
)

logger.threadDebug(
logger.debug(
f"Uploading {full_file_path} to {processed_metadata_output_folder}"
)

Expand Down
188 changes: 77 additions & 111 deletions utils/logwatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import contextlib
import config
import threading
import json
from colorama import just_fix_windows_console, Fore, Back, Style

just_fix_windows_console()
Expand Down Expand Up @@ -74,65 +75,55 @@ def noPrintTrace(self, message: str):
),
).start()

def debug(self, message: str):
def debug(self, message: str): # sourcery skip: class-extract-method
"""Send a debug message to the logwatch server"""
if self.print:
print(Fore.BLUE + message + Style.RESET_ALL)
with contextlib.suppress(Exception):
threading.Thread(
target=requests.post,
args=(
self.drain,
{"level": "debug", "message": message, "type": "text"},
),
).start()
if self.thread_id != 0:
print(f"{Fore.BLUE}[{self.thread_id}] {message}{Style.RESET_ALL}")
else:
print(f"{Fore.BLUE}{message}{Style.RESET_ALL}")

def threadDebug(self, message: str):
"""Send a debug message to the logwatch server"""
if self.print:
print(f"{Fore.BLUE}[{self.thread_id}] {message}{Style.RESET_ALL}")
with contextlib.suppress(Exception):
args = {
"level": "debug",
"message": message,
"type": "text",
}

if self.thread_id != 0:
args["thread"] = self.thread_id

threading.Thread(
target=requests.post,
args=(
self.drain,
{
"level": "debug",
"message": message,
"type": "text",
"thread": self.thread_id,
},
json.dumps(args),
),
).start()

def info(self, message: str):
"""Send an info message to the logwatch server"""
if self.print:
print(Fore.CYAN + message + Style.RESET_ALL)
with contextlib.suppress(Exception):
threading.Thread(
target=requests.post,
args=(
self.drain,
{"level": "info", "message": message, "type": "text"},
),
).start()
if self.thread_id != 0:
print(f"{Fore.CYAN}[{self.thread_id}] {message}{Style.RESET_ALL}")
else:
print(f"{Fore.CYAN}{message}{Style.RESET_ALL}")

def threadInfo(self, message: str):
"""Send a threaded info message to the logwatch server"""
if self.print:
print(f"{Fore.CYAN}[{self.thread_id}] {message}{Style.RESET_ALL}")
with contextlib.suppress(Exception):
args = {
"level": "info",
"message": message,
"type": "text",
}

if self.thread_id != 0:
args["thread"] = self.thread_id

threading.Thread(
target=requests.post,
args=(
self.drain,
{
"level": "info",
"message": message,
"type": "text",
"thread": self.thread_id,
},
json.dumps(args),
),
).start()

Expand All @@ -152,26 +143,22 @@ def fastInfo(self, message: str):
def error(self, message: str):
"""Send an error message to the logwatch server"""
if self.print:
print(Fore.RED + message + Style.RESET_ALL)
with contextlib.suppress(Exception):
requests.post(
self.drain, json={"level": "error", "message": message, "type": "text"}
)
if self.thread_id != 0:
print(f"{Fore.RED}[{self.thread_id}] {message}{Style.RESET_ALL}")
else:
print(f"{Fore.RED}{message}{Style.RESET_ALL}")

def threadError(self, message: str):
"""Send an error message to the logwatch server"""
if self.print:
print(f"{Fore.RED}[{self.thread_id}] {message}{Style.RESET_ALL}")
with contextlib.suppress(Exception):
requests.post(
self.drain,
json={
"level": "error",
"message": message,
"type": "text",
"thread": self.thread_id,
},
)
args = {
"level": "error",
"message": message,
"type": "text",
}

if self.thread_id != 0:
args["thread"] = self.thread_id

requests.post(self.drain, json.dumps(args))

def warn(self, message: str):
"""Send a warning message to the logwatch server"""
Expand All @@ -196,69 +183,48 @@ def critical(self, message: str):
def time(self, message: str):
"""Send a time message to the logwatch server"""
if self.print:
print(Back.GREEN + Fore.WHITE + message + Style.RESET_ALL)
with contextlib.suppress(Exception):
# Not threaded because it's a time message
requests.post(
self.drain, json={"level": "time", "message": message, "type": "text"}
)
if self.thread_id != 0:
print(
f"{Back.GREEN}{Fore.WHITE}[{self.thread_id}] {message}{Style.RESET_ALL}"
)
else:
print(f"{Back.GREEN}{Fore.WHITE}{message}{Style.RESET_ALL}")

def threadTime(self, message: str):
"""Send a time message to the logwatch server"""
if self.print:
print(
Back.GREEN
+ Fore.WHITE
+ f"[{self.thread_id}] "
+ message
+ Style.RESET_ALL
)
with contextlib.suppress(Exception):
requests.post(
self.drain,
json={
"level": "time",
"message": message,
"type": "text",
"thread": self.thread_id,
},
)
args = {
"level": "time",
"message": message,
"type": "text",
}

if self.thread_id != 0:
args["thread"] = self.thread_id

# Not threaded because it's a time message
requests.post(self.drain, json.dumps(args))

def fastTime(self, message: str):
"""Send a threaded time message to the logwatch server. Used for items that need to be processed quickly"""
if self.print:
print(Back.GREEN + Fore.WHITE + message + Style.RESET_ALL)
with contextlib.suppress(Exception):
threading.Thread(
target=requests.post,
args=(
self.drain,
{"level": "time", "message": message, "type": "text"},
),
).start()
if self.thread_id != 0:
print(
f"{Back.GREEN}{Fore.WHITE}[{self.thread_id}] {message}{Style.RESET_ALL}"
)
else:
print(f"{Back.GREEN}{Fore.WHITE}{message}{Style.RESET_ALL}")
with contextlib.suppress(Exception):
args = {
"level": "time",
"message": message,
"type": "text",
}

if self.thread_id != 0:
args["thread"] = self.thread_id

def threadFastTime(self, message: str):
"""Send a threaded time message to the logwatch server. Used for items that need to be processed quickly"""
if self.print:
print(
Back.GREEN
+ Fore.WHITE
+ f"[{self.thread_id}] "
+ message
+ Style.RESET_ALL
)
with contextlib.suppress(Exception):
threading.Thread(
target=requests.post,
args=(
self.drain,
{
"level": "time",
"message": message,
"type": "text",
"thread": self.thread_id,
},
),
args=(self.drain, json.dumps(args)),
).start()

def noPrintTime(self, message: str):
Expand Down

0 comments on commit 482f90c

Please sign in to comment.