Commit

fix: garmin pipeline
Aydawka committed Jan 9, 2025
1 parent 73c147a commit 924831a
Showing 1 changed file with 37 additions and 11 deletions.
48 changes: 37 additions & 11 deletions garmin_pipeline_parallel.py
@@ -747,12 +747,16 @@ def worker(
logger.time(time_estimator.step())


def pipeline(study_id: str): # sourcery skip: low-code-quality
"""Process fitness tracker data files for a study
Args:
study_id (str): the study id
"""
def pipeline(study_id: str, workers: int = 4, args: list = None):
"""The function contains the work done by
the main thread, which runs only once for each operation."""

if args is None:
args = []

global overall_time_estimator

# Process Garmin fitness tracker data files for the given study_id
if study_id is None or not study_id:
raise ValueError("study_id is required")

@@ -813,6 +817,8 @@ def pipeline(study_id: str): # sourcery skip: low-code-quality

logger.debug(f"Getting file paths in {input_folder}")

file_processor = FileMapProcessor(dependency_folder, ignore_file, args)

for path in paths:
t = str(path.name)

@@ -870,6 +876,7 @@ def pipeline(study_id: str): # sourcery skip: low-code-quality

# Create the output folder
file_system_client.create_directory(processed_data_output_folder)
workflow_file_dependencies = deps.WorkflowFileDependencies()

# Download the redcap export file
red_cap_export_file_path = os.path.join(meta_temp_folder_path, "redcap_export.tsv")
@@ -881,17 +888,36 @@ def pipeline(study_id: str): # sourcery skip: low-code-quality
with open(red_cap_export_file_path, "wb") as data:
red_cap_export_file_client.download_file().readinto(data)


workflow_file_dependencies = deps.WorkflowFileDependencies()
file_processor = FileMapProcessor(dependency_folder, ignore_file)

total_files = len(file_paths)


manifest = garmin_metadata.GarminManifest(processed_data_output_folder)

manifest.read_redcap_file(red_cap_export_file_path)

overall_time_estimator = TimeEstimator(total_files)

# Ceiling division guarantees that every path is covered, even when the file count is not evenly divisible by the number of workers.
chunk_size = (len(file_paths) + workers - 1) // workers
# Slice the paths into chunks and pair each chunk with a 1-based worker_id; these become the worker function's final two arguments.
chunks = [file_paths[i : i + chunk_size] for i in range(0, total_files, chunk_size)]
args = [(chunk, index + 1) for index, chunk in enumerate(chunks)]
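# Illustrative example (not part of the commit): with 10 files and 4 workers,
# chunk_size = (10 + 4 - 1) // 4 == 3, so the chunks hold 3, 3, 3 and 1 paths
# and args becomes [(chunk_1, 1), (chunk_2, 2), (chunk_3, 3), (chunk_4, 4)].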
pipe = partial(
worker,
workflow_file_dependencies,
file_processor,
processed_data_output_folder,
manifest
)
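# partial() pre-binds the shared dependencies (workflow file dependencies, file
# processor, output folder, manifest); starmap then supplies each worker's
# chunk and worker_id.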

# Create the thread pool
pool = ThreadPool(workers)
# Distribute the (chunk, worker_id) pairs across the pool
pool.starmap(pipe, args)

file_processor.delete_out_of_date_output_files()
file_processor.remove_seen_flag_from_map()


file_processor.delete_out_of_date_output_files()
file_processor.remove_seen_flag_from_map()

@@ -1052,7 +1078,7 @@ def pipeline(study_id: str): # sourcery skip: low-code-quality
if __name__ == "__main__":
sys_args = sys.argv

workers = 6
workers = 4

parser = argparse.ArgumentParser(description="Process garmin data files")
parser.add_argument(
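The substantive change in this commit is fanning file processing out across a thread pool: the file list is split with ceiling-division chunking, functools.partial pre-binds the shared objects, and ThreadPool.starmap hands each worker its (chunk, worker_id) pair. Below is a minimal, self-contained sketch of that pattern; process_chunk, shared_state and run are illustrative stand-ins rather than names from this repository, which binds worker() to workflow_file_dependencies, file_processor, the output folder and the manifest instead.

from functools import partial
from multiprocessing.pool import ThreadPool


def process_chunk(shared_state: dict, chunk: list, worker_id: int) -> int:
    """Stand-in for worker(): process one chunk of file paths."""
    for path in chunk:
        # The real pipeline downloads and converts each file here.
        shared_state.setdefault(worker_id, []).append(path)
    return len(chunk)


def run(file_paths: list, workers: int = 4) -> None:
    shared_state: dict = {}

    # Ceiling division so every path lands in exactly one chunk.
    chunk_size = (len(file_paths) + workers - 1) // workers
    chunks = [file_paths[i : i + chunk_size] for i in range(0, len(file_paths), chunk_size)]
    args = [(chunk, index + 1) for index, chunk in enumerate(chunks)]

    # Pre-bind the shared state; starmap supplies (chunk, worker_id) per call.
    pipe = partial(process_chunk, shared_state)

    with ThreadPool(workers) as pool:
        handled = pool.starmap(pipe, args)

    print(f"{sum(handled)} files handled by {len(chunks)} worker threads")


if __name__ == "__main__":
    run([f"file_{n}.fit" for n in range(10)], workers=4)

Using threads rather than processes lets every worker share the same file_processor and manifest objects without pickling; the sketch uses the standard-library multiprocessing.pool.ThreadPool, since the commit's own import lies outside the visible hunks.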
