Commit
fix: update files
Aydawka committed Feb 8, 2025
1 parent 7992fcb commit 6340f45
Showing 12 changed files with 968 additions and 3,435 deletions.
5 changes: 1 addition & 4 deletions cirrus_pipeline.py
@@ -459,11 +459,8 @@ def pipeline(study_id: str, workers: int = 4, args: list = None):
new_names_dict = dict([(val[4], tag) for tag, val in new_dict_items.items()])
keyword_dict.update(new_names_dict)

for idx, path in enumerate(paths):
if idx == 5:
break
for path in paths:
t = str(path.name)

file_name = t.split("/")[-1]

# Check if the item is an .fda.zip file
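For readers skimming the cirrus hunk: the old loop enumerated the paths and broke out after five entries, which reads like a debug-time cap (an assumption; the commit message does not say), while the new loop walks every path. A minimal standalone sketch of the two shapes, with made-up file names:

paths = ["A.fda.zip", "B.fda.zip", "C.fda.zip", "D.fda.zip", "E.fda.zip", "F.fda.zip"]

# Old shape: stop after the first five paths.
for idx, path in enumerate(paths):
    if idx == 5:
        break
    print("would process", path)

# New shape: consider every path.
for path in paths:
    print("would process", path)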
312 changes: 194 additions & 118 deletions ecg_pipeline.py
@@ -1,139 +1,57 @@
"""Process ecg data files"""

import contextlib
import ecg.ecg_root as ecg
import ecg.ecg_metadata as ecg_metadata

import argparse
import os
import tempfile
import shutil
import ecg.ecg_root as ecg
import ecg.ecg_metadata as ecg_metadata
import contextlib
import time
from traceback import format_exc
import sys

import azure.storage.filedatalake as azurelake
import config
import utils.dependency as deps
import time
import csv
from traceback import format_exc
import utils.logwatch as logging
from utils.file_map_processor import FileMapProcessor
from utils.time_estimator import TimeEstimator


def pipeline(study_id: str): # sourcery skip: low-code-quality
"""Process ecg data files for a study
Args:
study_id (str): the study id
"""

if study_id is None or not study_id:
raise ValueError("study_id is required")

input_folder = f"{study_id}/pooled-data/ECG"
processed_data_output_folder = f"{study_id}/pooled-data/ECG-processed"
dependency_folder = f"{study_id}/dependency/ECG"
participant_filter_list_file = f"{study_id}/dependency/PatientID/AllParticipantIDs07-01-2023through07-31-2024.csv"
pipeline_workflow_log_folder = f"{study_id}/logs/ECG"
data_plot_output_folder = f"{study_id}/pooled-data/ECG-dataplot"
ignore_file = f"{study_id}/ignore/ecg.ignore"

logger = logging.Logwatch("ecg", print=True)
from functools import partial
from multiprocessing.pool import ThreadPool

overall_time_estimator = TimeEstimator(1) # default to 1 for now


def worker(
workflow_file_dependencies,
file_processor,
processed_data_output_folder,
participant_filter_list,
data_plot_output_folder,
manifest,
file_paths: list,
worker_id: int,
): # sourcery skip: low-code-quality
"""This function handles the work done by the worker threads,
and contains core operations: downloading, processing, and uploading files."""

logger = logging.Logwatch(
"garmin",
print=True,
thread_id=worker_id,
overall_time_estimator=overall_time_estimator,
)

# Get the list of blobs in the input folder
file_system_client = azurelake.FileSystemClient.from_connection_string(
config.AZURE_STORAGE_CONNECTION_STRING,
file_system_name="stage-1-container",
)

with contextlib.suppress(Exception):
file_system_client.delete_directory(data_plot_output_folder)

with contextlib.suppress(Exception):
file_system_client.delete_directory(processed_data_output_folder)

with contextlib.suppress(Exception):
file_system_client.delete_file(f"{dependency_folder}/file_map.json")

paths = file_system_client.get_paths(path=input_folder)

file_paths = []
participant_filter_list = []

for path in paths:
t = str(path.name)

original_file_name = t.split("/")[-1]

# Check if the item is an .xml file
if original_file_name.split(".")[-1] != "xml":
continue

# Get the parent folder of the file.
# The name of this folder is in the format siteName_dataType_startDate-endDate
batch_folder = t.split("/")[-2]

# Check if the folder name is in the format siteName_dataType_startDate-endDate
if len(batch_folder.split("_")) != 3:
continue

site_name, data_type, start_date_end_date = batch_folder.split("_")

start_date = start_date_end_date.split("-")[0]
end_date = start_date_end_date.split("-")[1]

file_paths.append(
{
"file_path": t,
"status": "failed",
"processed": False,
"batch_folder": batch_folder,
"site_name": site_name,
"data_type": data_type,
"start_date": start_date,
"end_date": end_date,
"convert_error": True,
"output_uploaded": False,
"output_files": [],
}
)

logger.debug(f"Found {len(file_paths)} files in {input_folder}")

# Create a temporary folder on the local machine
temp_folder_path = tempfile.mkdtemp()

# Create the output folder
file_system_client.create_directory(processed_data_output_folder)

# Create a temporary folder on the local machine
meta_temp_folder_path = tempfile.mkdtemp()

# Get the participant filter list file
with contextlib.suppress(Exception):
file_client = file_system_client.get_file_client(
file_path=participant_filter_list_file
)

temp_participant_filter_list_file = os.path.join(
meta_temp_folder_path, "filter_file.csv"
)

with open(file=temp_participant_filter_list_file, mode="wb") as f:
f.write(file_client.download_file().readall())

with open(file=temp_participant_filter_list_file, mode="r") as f:
reader = csv.reader(f)
for row in reader:
participant_filter_list.append(row[0])

# remove the first row
participant_filter_list.pop(0)

file_processor = FileMapProcessor(dependency_folder, ignore_file)

workflow_file_dependencies = deps.WorkflowFileDependencies()

total_files = len(file_paths)

manifest = ecg_metadata.ECGManifest()

time_estimator = TimeEstimator(total_files)

for file_item in file_paths:
@@ -341,8 +259,151 @@ def pipeline(study_id: str): # sourcery skip: low-code-quality

logger.time(time_estimator.step())

file_processor.delete_out_of_date_output_files()
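As a rough illustration of the worker docstring above (each worker downloads, processes, and uploads the files in its chunk), here is a minimal standalone sketch of the per-file loop shape, reusing the status flags the file_item dictionaries carry; the stage bodies are placeholders, not the pipeline's real download/convert/upload calls:

from traceback import format_exc


def process_chunk(file_items: list, worker_id: int) -> None:
    """Sketch: walk one worker's chunk, flipping status flags as stages succeed."""
    for file_item in file_items:
        path = file_item["file_path"]
        try:
            # Placeholder stage; the real worker downloads, converts, and uploads each file.
            output_files = [f"{path}.converted"]

            file_item["convert_error"] = False
            file_item["processed"] = True
            file_item["output_files"] = output_files
            file_item["output_uploaded"] = True
            file_item["status"] = "success"
        except Exception:
            # Keep the dictionary's "failed" defaults and move on to the next file.
            print(f"worker {worker_id}: {path} failed\n{format_exc()}")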

def pipeline(study_id: str, workers: int = 4, args: list = None):
"""The function contains the work done by
the main thread, which runs only once for each operation."""

if args is None:
args = []

global overall_time_estimator

# Process ECG data files for a study. Args: study_id (str): the study id
if study_id is None or not study_id:
raise ValueError("study_id is required")

input_folder = f"{study_id}/pooled-data/ECG"
processed_data_output_folder = f"{study_id}/pooled-data/ECG-processed"
dependency_folder = f"{study_id}/dependency/ECG"
participant_filter_list_file = f"{study_id}/dependency/PatientID/AllParticipantIDs07-01-2023through07-31-2024.csv"
pipeline_workflow_log_folder = f"{study_id}/logs/ECG"
data_plot_output_folder = f"{study_id}/pooled-data/ECG-dataplot"
ignore_file = f"{study_id}/ignore/ecg.ignore"

logger = logging.Logwatch("ecg", print=True)

# Get the list of blobs in the input folder
file_system_client = azurelake.FileSystemClient.from_connection_string(
config.AZURE_STORAGE_CONNECTION_STRING,
file_system_name="stage-1-container",
)

with contextlib.suppress(Exception):
file_system_client.delete_directory(data_plot_output_folder)

with contextlib.suppress(Exception):
file_system_client.delete_directory(processed_data_output_folder)

with contextlib.suppress(Exception):
file_system_client.delete_file(f"{dependency_folder}/file_map.json")


file_paths = []
participant_filter_list = []

meta_temp_folder_path = tempfile.mkdtemp()

# Get the participant filter list file
with contextlib.suppress(Exception):
file_client = file_system_client.get_file_client(
file_path=participant_filter_list_file
)

temp_participant_filter_list_file = os.path.join(
meta_temp_folder_path, "filter_file.csv"
)

with open(file=temp_participant_filter_list_file, mode="wb") as f:
f.write(file_client.download_file().readall())

with open(file=temp_participant_filter_list_file, mode="r") as f:
reader = csv.reader(f)
for row in reader:
participant_filter_list.append(row[0])

# remove the first row
participant_filter_list.pop(0)
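The block above collects the first column of the filter CSV and then pops the header row off afterwards. A minimal sketch of the same idea (not the pipeline's code) that skips the header up front with next(); the file name in the example is hypothetical:

import csv


def load_participant_ids(csv_path: str) -> list:
    """Read the first column of a CSV, skipping the header row."""
    participant_ids = []
    with open(csv_path, mode="r", newline="") as f:
        reader = csv.reader(f)
        next(reader, None)  # skip the header instead of pop(0) afterwards
        for row in reader:
            if row:  # ignore blank lines
                participant_ids.append(row[0])
    return participant_ids


# Example usage: ids = load_participant_ids("filter_file.csv")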

paths = file_system_client.get_paths(path=input_folder)
file_processor = FileMapProcessor(dependency_folder, ignore_file)

for path in paths:
t = str(path.name)

original_file_name = t.split("/")[-1]

# Check if the item is an .xml file
if original_file_name.split(".")[-1] != "xml":
continue

# Get the parent folder of the file.
# The name of this folder is in the format siteName_dataType_startDate-endDate
batch_folder = t.split("/")[-2]

# Check if the folder name is in the format siteName_dataType_startDate-endDate
if len(batch_folder.split("_")) != 3:
continue

site_name, data_type, start_date_end_date = batch_folder.split("_")

start_date = start_date_end_date.split("-")[0]
end_date = start_date_end_date.split("-")[1]

file_paths.append(
{
"file_path": t,
"status": "failed",
"processed": False,
"batch_folder": batch_folder,
"site_name": site_name,
"data_type": data_type,
"start_date": start_date,
"end_date": end_date,
"convert_error": True,
"output_uploaded": False,
"output_files": [],
}
)
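The comments above describe the expected batch folder name, siteName_dataType_startDate-endDate, and the loop skips anything that does not split into exactly three parts. A small standalone sketch of that parsing step; the folder name and date format in the example are assumptions, not taken from the data:

def parse_batch_folder(batch_folder: str):
    """Split a folder named siteName_dataType_startDate-endDate into its parts."""
    parts = batch_folder.split("_")
    if len(parts) != 3:
        return None  # not in the expected format; the caller should skip it
    site_name, data_type, date_range = parts
    start_date, end_date = date_range.split("-", 1)
    return site_name, data_type, start_date, end_date


# Hypothetical folder name:
print(parse_batch_folder("UW_ECG_20230701-20240731"))
# -> ('UW', 'ECG', '20230701', '20240731')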

total_files = len(file_paths)

logger.debug(f"Found {len(file_paths)} files in {input_folder}")

workflow_file_dependencies = deps.WorkflowFileDependencies()

manifest = ecg_metadata.ECGManifest()

# Create a temporary folder on the local machine
temp_folder_path = tempfile.mkdtemp()

# Create the output folder
file_system_client.create_directory(processed_data_output_folder)


overall_time_estimator = TimeEstimator(total_files)

# Guarantees that all paths are considered, even if the number of items is not evenly divisible by workers.
chunk_size = (len(file_paths) + workers - 1) // workers
# Build the chunks and pair each with a worker_id; these become the final two arguments passed to the worker function
chunks = [file_paths[i : i + chunk_size] for i in range(0, total_files, chunk_size)]
args = [(chunk, index + 1) for index, chunk in enumerate(chunks)]
pipe = partial(
worker,
workflow_file_dependencies,
file_processor,
processed_data_output_folder,
participant_filter_list,
data_plot_output_folder,
manifest
)

# Create the thread pool
pool = ThreadPool(workers)
# Distribute the pipe function across the threads in the pool
pool.starmap(pipe, args)
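The chunking and dispatch above are the heart of the new threading model: the main thread (which, per its docstring, runs only once) splits the file list into ceiling-divided chunks, binds the shared arguments with functools.partial, and lets ThreadPool.starmap unpack each (chunk, worker_id) tuple into the worker's final two parameters. A self-contained sketch of the same mechanics with a toy worker and made-up file names:

from functools import partial
from multiprocessing.pool import ThreadPool


def toy_worker(shared_label: str, chunk: list, worker_id: int) -> None:
    # partial() binds shared_label; starmap supplies (chunk, worker_id).
    print(f"[{shared_label}] worker {worker_id} got {len(chunk)} items: {chunk}")


file_paths = [f"file_{i}.xml" for i in range(10)]
workers = 4

# Ceiling division so a remainder still gets its own (smaller) chunk.
chunk_size = (len(file_paths) + workers - 1) // workers
chunks = [file_paths[i : i + chunk_size] for i in range(0, len(file_paths), chunk_size)]
starmap_args = [(chunk, index + 1) for index, chunk in enumerate(chunks)]

pipe = partial(toy_worker, "ECG")
with ThreadPool(workers) as pool:
    pool.starmap(pipe, starmap_args)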

file_processor.delete_out_of_date_output_files()
file_processor.remove_seen_flag_from_map()

# Write the manifest to a file
@@ -425,7 +486,22 @@ def pipeline(study_id: str): # sourcery skip: low-code-quality


if __name__ == "__main__":
pipeline("AI-READI")
sys_args = sys.argv

workers = 4

parser = argparse.ArgumentParser(description="Process ecg data files")
parser.add_argument(
"--workers", type=int, default=workers, help="Number of workers to use"
)
args = parser.parse_args()

workers = args.workers

print(f"Using {workers} workers to process garmin data files")

pipeline("AI-READI", workers, sys_args)


# delete the ecg.log file
if os.path.exists("ecg.log"):