From 7308d587609d2454f3802a462a5b1819669565bc Mon Sep 17 00:00:00 2001
From: aydawka
Date: Tue, 26 Nov 2024 23:51:26 -0800
Subject: [PATCH] fix: thread id

---
 cgm_pipeline_parallel.py | 17 ++++++++---------
 1 file changed, 8 insertions(+), 9 deletions(-)

diff --git a/cgm_pipeline_parallel.py b/cgm_pipeline_parallel.py
index ea78f24..a5e47c4 100644
--- a/cgm_pipeline_parallel.py
+++ b/cgm_pipeline_parallel.py
@@ -38,14 +38,13 @@ def worker(
     study_id: str,
     processed_data_qc_folder,
     processed_data_output_folder,
     file_paths: list,
-):  # sourcery skip: low-code-quality
+    worker_id: int
+):  # sourcery skip: low-code-quality
     """Process cgm data files for a study
     Args:
         study_id (str): the study id
     """
-    thread_id = threading.get_ident()
-
-    logger = logging.Logwatch("cgm", print=True, thread_id=thread_id)
+    logger = logging.Logwatch("cgm", print=True, thread_id=worker_id)
     # Get the list of blobs in the input folder
     file_system_client = azurelake.FileSystemClient.from_connection_string(
@@ -372,11 +371,12 @@ def pipeline(study_id: str):
     file_processor = FileMapProcessor(dependency_folder, ignore_file)
     manifest = cgm_manifest.CGMManifest()
-    workers = 4
+    workers = 4
     # This guarantees all paths are considered, even if the number of items is not evenly divisible by workers.
     chunk_size = (len(file_paths) + workers - 1) // workers
     chunks = [file_paths[i:i + chunk_size] for i in range(0, total_files, chunk_size)]
+    args = [(chunk, index + 1) for index, chunk in enumerate(chunks)]
     pipe = partial(
         worker,
         study_id,
         workflow_file_dependencies,
         file_processor,
@@ -384,12 +384,11 @@ def pipeline(study_id: str):
         manifest,
         participant_filter_list,
         processed_data_qc_folder,
-        processed_data_output_folder)
-    # for chunk in chunks:
-    #     pipe(chunk)
+        processed_data_output_folder
+    )

     pool = ThreadPool(workers)
-    pool.map(pipe, chunks)
+    pool.starmap(pipe, args)

     file_processor.delete_out_of_date_output_files()
     file_processor.remove_seen_flag_from_map()
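
Below is a minimal, self-contained sketch of the fan-out pattern this change introduces, assuming a simplified worker signature; in the real pipeline the folder, manifest, and filter arguments are bound with functools.partial, and the (chunk, worker_id) tuple supplied by starmap fills the last two parameters. pool.map passes only a single argument per call, so handing each worker both its chunk and an explicit 1-based id requires pool.starmap. The study id and file list in the sketch are placeholders.

from functools import partial
from multiprocessing.pool import ThreadPool


def worker(study_id: str, file_paths: list, worker_id: int) -> None:
    # worker_id is the stable 1-based index assigned below, not a thread ident.
    print(f"[worker {worker_id}] {study_id}: processing {len(file_paths)} files")


def pipeline(study_id: str, file_paths: list) -> None:
    workers = 4
    # Ceiling division so every path lands in a chunk, even when the file count
    # is not evenly divisible by the number of workers.
    chunk_size = (len(file_paths) + workers - 1) // workers
    chunks = [file_paths[i:i + chunk_size] for i in range(0, len(file_paths), chunk_size)]
    # Pair each chunk with a 1-based worker id; starmap unpacks each tuple into
    # the remaining positional arguments of the partial.
    args = [(chunk, index + 1) for index, chunk in enumerate(chunks)]
    pipe = partial(worker, study_id)
    with ThreadPool(workers) as pool:
        pool.starmap(pipe, args)


if __name__ == "__main__":
    pipeline("example-study", [f"file_{n}.csv" for n in range(10)])

A small sequential worker id also keeps log prefixes short and predictable, whereas threading.get_ident() yields an arbitrary OS-level value that differs on every run.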