Commit
fix: remove spectralis file
Aydawka committed Feb 7, 2025
1 parent bb60e08 commit 7992fcb
Showing 2 changed files with 197 additions and 710 deletions.
330 changes: 197 additions & 133 deletions spectralis_pipeline.py
@@ -1,154 +1,55 @@
"""Process spectralis data files"""

import contextlib
from imaging.imaging_spectralis_root import Spectralis

import argparse
import os
import tempfile
import shutil
from imaging.imaging_spectralis_root import Spectralis
import azure.storage.filedatalake as azurelake
import config
import contextlib
import time
import csv
from traceback import format_exc
import json
import sys
import imaging.imaging_utils as imaging_utils
import azure.storage.filedatalake as azurelake
import config
import utils.dependency as deps
from traceback import format_exc
import csv
import utils.logwatch as logging
from utils.file_map_processor import FileMapProcessor
from utils.time_estimator import TimeEstimator


def pipeline(
study_id: str,
): # sourcery skip: low-code-quality, move-assign, use-named-expression
"""Process spectralis data files for a study
Args:
study_id (str): the study id
"""

if study_id is None or not study_id:
raise ValueError("study_id is required")

input_folder = f"{study_id}/pooled-data/Spectralis"
processed_data_output_folder = f"{study_id}/pooled-data/Spectralis-processed"
processed_metadata_output_folder = f"{study_id}/pooled-data/Spectralis-metadata"
dependency_folder = f"{study_id}/dependency/Spectralis"
pipeline_workflow_log_folder = f"{study_id}/logs/Spectralis"
ignore_file = f"{study_id}/ignore/spectralis.ignore"
participant_filter_list_file = f"{study_id}/dependency/PatientID/AllParticipantIDs07-01-2023through07-31-2024.csv"

logger = logging.Logwatch("spectralis", print=True)

overall_time_estimator = TimeEstimator(1) # default to 1 for now
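# Placeholder estimator; pipeline() re-binds this module-level name via its
# `global overall_time_estimator` statement once the real file count is known.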


def worker(
workflow_file_dependencies,
file_processor,
processed_data_output_folder,
processed_metadata_output_folder,
file_paths: list,
worker_id: int,
): # sourcery skip: low-code-quality
"""This function handles the work done by the worker threads,
and contains core operations: downloading, processing, and uploading files."""

logger = logging.Logwatch(
"spectralis",
print=True,
thread_id=worker_id,
overall_time_estimator=overall_time_estimator,
)
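# Each worker gets its own Logwatch tagged with its worker id and holding a
# reference to the shared overall_time_estimator, presumably so per-thread
# progress rolls up into the overall estimate.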

# Get the list of blobs in the input folder
file_system_client = azurelake.FileSystemClient.from_connection_string(
config.AZURE_STORAGE_CONNECTION_STRING,
file_system_name="stage-1-container",
)

with contextlib.suppress(Exception):
file_system_client.delete_directory(processed_data_output_folder)

with contextlib.suppress(Exception):
file_system_client.delete_directory(processed_metadata_output_folder)

with contextlib.suppress(Exception):
file_system_client.delete_file(f"{dependency_folder}/file_map.json")

file_paths = []
participant_filter_list = []

# Create a temporary folder on the local machine
meta_temp_folder_path = tempfile.mkdtemp(prefix="spectralis_meta_")

# Get the participant filter list file
with contextlib.suppress(Exception):
file_client = file_system_client.get_file_client(
file_path=participant_filter_list_file
)

temp_participant_filter_list_file = os.path.join(
meta_temp_folder_path, "filter_file.csv"
)

with open(file=temp_participant_filter_list_file, mode="wb") as f:
f.write(file_client.download_file().readall())

with open(file=temp_participant_filter_list_file, mode="r") as f:
reader = csv.reader(f)
for row in reader:
participant_filter_list.append(row[0])

# remove the first row
participant_filter_list.pop(0)

batch_folder_paths = file_system_client.get_paths(
path=input_folder, recursive=False
)

logger.debug(f"Getting batch folder paths in {input_folder}")

for batch_folder_path in batch_folder_paths:
t = str(batch_folder_path.name)

batch_folder = t.split("/")[-1]

# Check if the folder name is in the format siteName_dataType_startDate-endDate
if len(batch_folder.split("_")) != 3:
continue

site_name, data_type, start_date_end_date = batch_folder.split("_")

start_date, end_date = start_date_end_date.split("-")

# For each batch folder, get the list of files in the /DICOM folder

dicom_folder_path = f"{input_folder}/{batch_folder}/DICOM"

logger.debug(f"Getting dicom file paths in {dicom_folder_path}")

dicom_file_paths = file_system_client.get_paths(
path=dicom_folder_path, recursive=True
)

count = 0

for dicom_file_path in dicom_file_paths:
count += 1

q = str(dicom_file_path.name)

file_paths.append(
{
"file_path": q,
"status": "failed",
"processed": False,
"batch_folder": batch_folder,
"site_name": site_name,
"data_type": data_type,
"start_date": start_date,
"end_date": end_date,
"organize_result": "",
"organize_error": True,
"convert_error": True,
"format_error": True,
"output_uploaded": False,
"output_files": [],
}
)

logger.debug(f"Added {count} items to the file map - Total: {len(file_paths)}")

logger.info(f"Found {len(file_paths)} items in {input_folder}")

# Create a temporary folder on the local machine
temp_folder_path = tempfile.mkdtemp(prefix="spectralis_")

total_files = len(file_paths)

file_processor = FileMapProcessor(dependency_folder, ignore_file)

workflow_file_dependencies = deps.WorkflowFileDependencies()

time_estimator = TimeEstimator(total_files)

for file_item in file_paths:
@@ -441,8 +342,157 @@ def pipeline(

logger.time(time_estimator.step())

file_processor.delete_out_of_date_output_files()

def pipeline(study_id: str, workers: int = 4, args: list = None):
"""Process spectralis data files for a study.

Runs once on the main thread: it discovers the input files and dispatches
them to the worker threads.

Args:
    study_id (str): the study id
    workers (int): number of worker threads to use
    args (list): extra command-line arguments forwarded to FileMapProcessor
"""

if args is None:
args = []

global overall_time_estimator

if study_id is None or not study_id:
raise ValueError("study_id is required")

input_folder = f"{study_id}/pooled-data/Spectralis"
processed_data_output_folder = f"{study_id}/pooled-data/Spectralis-processed"
processed_metadata_output_folder = f"{study_id}/pooled-data/Spectralis-metadata"
dependency_folder = f"{study_id}/dependency/Spectralis"
pipeline_workflow_log_folder = f"{study_id}/logs/Spectralis"
ignore_file = f"{study_id}/ignore/spectralis.ignore"
participant_filter_list_file = f"{study_id}/dependency/PatientID/AllParticipantIDs07-01-2023through07-31-2024.csv"

logger = logging.Logwatch("spectralis", print=True)

# Get the list of blobs in the input folder
file_system_client = azurelake.FileSystemClient.from_connection_string(
config.AZURE_STORAGE_CONNECTION_STRING,
file_system_name="stage-1-container",
)

with contextlib.suppress(Exception):
file_system_client.delete_directory(processed_data_output_folder)

with contextlib.suppress(Exception):
file_system_client.delete_directory(processed_metadata_output_folder)

with contextlib.suppress(Exception):
file_system_client.delete_file(f"{dependency_folder}/file_map.json")
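# The three suppress() blocks above clear any previous outputs and the old
# file map so the run starts from a clean slate; contextlib.suppress(Exception)
# simply ignores the failure when a target does not exist yet.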

file_paths = []
participant_filter_list = []

# Create a temporary folder on the local machine
meta_temp_folder_path = tempfile.mkdtemp(prefix="spectralis_meta_")

# Get the participant filter list file
with contextlib.suppress(Exception):
file_client = file_system_client.get_file_client(
file_path=participant_filter_list_file
)

temp_participant_filter_list_file = os.path.join(
meta_temp_folder_path, "filter_file.csv"
)

with open(file=temp_participant_filter_list_file, mode="wb") as f:
f.write(file_client.download_file().readall())

with open(file=temp_participant_filter_list_file, mode="r") as f:
reader = csv.reader(f)
for row in reader:
participant_filter_list.append(row[0])

# remove the first row
participant_filter_list.pop(0)
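# Note: pop(0) drops what is presumably the CSV header row; the remaining
# entries are the participant IDs used for filtering.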

batch_folder_paths = file_system_client.get_paths(
path=input_folder, recursive=False
)

logger.debug(f"Getting batch folder paths in {input_folder}")

for batch_folder_path in batch_folder_paths:
t = str(batch_folder_path.name)

batch_folder = t.split("/")[-1]

# Check if the folder name is in the format siteName_dataType_startDate-endDate
if len(batch_folder.split("_")) != 3:
continue

site_name, data_type, start_date_end_date = batch_folder.split("_")

start_date, end_date = start_date_end_date.split("-")
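# Illustrative (hypothetical) example: a batch folder named
# "SiteA_Spectralis_20230701-20230731" parses as site_name="SiteA",
# data_type="Spectralis", start_date="20230701", end_date="20230731".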

# For each batch folder, get the list of files in the /DICOM folder

dicom_folder_path = f"{input_folder}/{batch_folder}/DICOM"

logger.debug(f"Getting dicom file paths in {dicom_folder_path}")

dicom_file_paths = file_system_client.get_paths(
path=dicom_folder_path, recursive=True
)

count = 0

for dicom_file_path in dicom_file_paths:
count += 1

q = str(dicom_file_path.name)

file_paths.append(
{
"file_path": q,
"status": "failed",
"processed": False,
"batch_folder": batch_folder,
"site_name": site_name,
"data_type": data_type,
"start_date": start_date,
"end_date": end_date,
"organize_result": "",
"organize_error": True,
"convert_error": True,
"format_error": True,
"output_uploaded": False,
"output_files": [],
}
)

logger.debug(f"Added {count} items to the file map - Total: {len(file_paths)}")

total_files = len(file_paths)

logger.info(f"Found {len(file_paths)} items in {input_folder}")

workflow_file_dependencies = deps.WorkflowFileDependencies()
file_processor = FileMapProcessor(dependency_folder, ignore_file, args)
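# When launched from the __main__ block below, `args` is the raw sys.argv list,
# handed to FileMapProcessor as-is.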

overall_time_estimator = TimeEstimator(total_files)

# Ceiling division guarantees every path is assigned, even when the number of
# items is not evenly divisible by the number of workers.
chunk_size = (len(file_paths) + workers - 1) // workers
# Build the (chunk, worker_id) tuples that supply the final two arguments of worker()
chunks = [file_paths[i : i + chunk_size] for i in range(0, total_files, chunk_size)]
args = [(chunk, index + 1) for index, chunk in enumerate(chunks)]
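# Worked example (hypothetical counts): 10 files and 4 workers give
# chunk_size = (10 + 3) // 4 = 3, i.e. chunks of 3, 3, 3 and 1 files with
# worker ids 1 through 4. Note this assumes file_paths is non-empty: with zero
# files chunk_size is 0 and range() would raise a ValueError.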
pipe = partial(
worker,
workflow_file_dependencies,
file_processor,
processed_data_output_folder,
processed_metadata_output_folder,
)

# Create the thread pool and distribute the chunks across the worker threads
pool = ThreadPool(workers)
pool.starmap(pipe, args)
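# starmap unpacks each (chunk, worker_id) tuple, so together with the partial
# above every call is effectively:
#   worker(workflow_file_dependencies, file_processor,
#          processed_data_output_folder, processed_metadata_output_folder,
#          chunk, worker_id)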

file_processor.delete_out_of_date_output_files()
file_processor.remove_seen_flag_from_map()

logger.debug(f"Uploading file map to {dependency_folder}/file_map.json")
@@ -510,4 +560,18 @@ def pipeline(


if __name__ == "__main__":
pipeline("AI-READI")
sys_args = sys.argv

workers = 4

parser = argparse.ArgumentParser(description="Process spectralis data files")
parser.add_argument(
"--workers", type=int, default=workers, help="Number of workers to use"
)
args = parser.parse_args()

workers = args.workers

print(f"Using {workers} workers to process spectralis data files")

pipeline("AI-READI", workers, sys_args)