From 924831af51e93c0db3630930feda511fb470daef Mon Sep 17 00:00:00 2001 From: aydawka Date: Thu, 9 Jan 2025 12:52:40 -0800 Subject: [PATCH] fix: garmin pipeline --- garmin_pipeline_parallel.py | 48 ++++++++++++++++++++++++++++--------- 1 file changed, 37 insertions(+), 11 deletions(-) diff --git a/garmin_pipeline_parallel.py b/garmin_pipeline_parallel.py index d46daba..80d95e6 100644 --- a/garmin_pipeline_parallel.py +++ b/garmin_pipeline_parallel.py @@ -747,12 +747,16 @@ def worker( logger.time(time_estimator.step()) -def pipeline(study_id: str): # sourcery skip: low-code-quality - """Process fitness tracker data files for a study - Args: - study_id (str): the study id - """ +def pipeline(study_id: str, workers: int = 4, args: list = None): + """The function contains the work done by + the main thread, which runs only once for each operation.""" + if args is None: + args = [] + + global overall_time_estimator + + # Process cirrus data files for a study. Args:study_id (str): the study id if study_id is None or not study_id: raise ValueError("study_id is required") @@ -813,6 +817,8 @@ def pipeline(study_id: str): # sourcery skip: low-code-quality logger.debug(f"Getting file paths in {input_folder}") + file_processor = FileMapProcessor(dependency_folder, ignore_file, args) + for path in paths: t = str(path.name) @@ -870,6 +876,7 @@ def pipeline(study_id: str): # sourcery skip: low-code-quality # Create the output folder file_system_client.create_directory(processed_data_output_folder) + workflow_file_dependencies = deps.WorkflowFileDependencies() # Download the redcap export file red_cap_export_file_path = os.path.join(meta_temp_folder_path, "redcap_export.tsv") @@ -881,17 +888,36 @@ def pipeline(study_id: str): # sourcery skip: low-code-quality with open(red_cap_export_file_path, "wb") as data: red_cap_export_file_client.download_file().readinto(data) - - workflow_file_dependencies = deps.WorkflowFileDependencies() - file_processor = FileMapProcessor(dependency_folder, ignore_file) - total_files = len(file_paths) - manifest = garmin_metadata.GarminManifest(processed_data_output_folder) manifest.read_redcap_file(red_cap_export_file_path) + overall_time_estimator = TimeEstimator(total_files) + + # Guarantees that all paths are considered, even if the number of items is not evenly divisible by workers. + chunk_size = (len(file_paths) + workers - 1) // workers + # Comprehension that fills out and pass to worker func final 2 args: chunks and worker_id + chunks = [file_paths[i : i + chunk_size] for i in range(0, total_files, chunk_size)] + args = [(chunk, index + 1) for index, chunk in enumerate(chunks)] + pipe = partial( + worker, + workflow_file_dependencies, + file_processor, + processed_data_output_folder, + manifest + ) + + # Thread pool created + pool = ThreadPool(workers) + # Distributes the pipe function across the threads in the pool + pool.starmap(pipe, args) + + file_processor.delete_out_of_date_output_files() + file_processor.remove_seen_flag_from_map() + + file_processor.delete_out_of_date_output_files() file_processor.remove_seen_flag_from_map() @@ -1052,7 +1078,7 @@ def pipeline(study_id: str): # sourcery skip: low-code-quality if __name__ == "__main__": sys_args = sys.argv - workers = 6 + workers = 4 parser = argparse.ArgumentParser(description="Process garmin data files") parser.add_argument(