Commit

fix: thread id
Aydawka committed Nov 27, 2024
1 parent c998dbf commit 7308d58
Showing 1 changed file with 8 additions and 9 deletions.
cgm_pipeline_parallel.py (17 changes: 8 additions & 9 deletions)
@@ -38,14 +38,13 @@ def worker(study_id: str,
     processed_data_qc_folder,
     processed_data_output_folder,
     file_paths: list,
-): # sourcery skip: low-code-quality
+    worker_id: int
+): # sourcery skip: low-code-quality
     """Process cgm data files for a study
     Args:
         study_id (str): the study id
     """
-    thread_id = threading.get_ident()
-
-    logger = logging.Logwatch("cgm", print=True, thread_id=thread_id)
+    logger = logging.Logwatch("cgm", print=True, thread_id=worker_id)
 
     # Get the list of blobs in the input folder
     file_system_client = azurelake.FileSystemClient.from_connection_string(
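
The first hunk replaces the OS-level identifier from threading.get_ident() with an explicit worker_id supplied by the caller, so log lines carry a small, stable index (1 through the number of workers) rather than an opaque thread id. Below is a minimal sketch of that pattern; it assumes the standard logging module as a stand-in for the project's logging.Logwatch wrapper, which is not shown in this commit.

```python
# Minimal sketch of the explicit worker_id logging pattern.
# Assumption: the standard logging module stands in for the project's
# logging.Logwatch wrapper, which is not part of this diff.
import logging

logging.basicConfig(level=logging.INFO, format="%(name)s: %(message)s")


def worker(file_paths: list, worker_id: int):
    # The caller assigns worker_id (1..N); the worker no longer derives an
    # identifier from threading.get_ident().
    logger = logging.getLogger(f"cgm.worker-{worker_id}")
    for path in file_paths:
        logger.info("processing %s", path)


worker(["a.csv", "b.csv"], worker_id=1)
```

The second hunk, below, is where those worker ids are assigned when chunks are dispatched.
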
@@ -372,24 +371,24 @@ def pipeline(study_id: str):
     file_processor = FileMapProcessor(dependency_folder, ignore_file)
 
     manifest = cgm_manifest.CGMManifest()
     workers = 4
 
-    workers = 4
+    # This guarantees all paths are considered, even if the number of items is not evenly divisible by workers.
     chunk_size = (len(file_paths) + workers - 1) // workers
     chunks = [file_paths[i:i + chunk_size] for i in range(0, total_files, chunk_size)]
+    args = [(chunk, index + 1) for index, chunk in enumerate(chunks)]
     pipe = partial(worker,
                    study_id,
                    workflow_file_dependencies,
                    file_processor,
                    manifest,
                    participant_filter_list,
                    processed_data_qc_folder,
-                   processed_data_output_folder)
-    # for chunk in chunks:
-    #     pipe(chunk)
+                   processed_data_output_folder
+                   )
 
     pool = ThreadPool(workers)
-    pool.map(pipe, chunks)
+    pool.starmap(pipe, args)
 
     file_processor.delete_out_of_date_output_files()
     file_processor.remove_seen_flag_from_map()
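
The second hunk splits file_paths into one chunk per worker with ceiling division (so any remainder still gets a chunk), pairs each chunk with a 1-based worker index, and switches from pool.map to pool.starmap so that each (chunk, worker_id) tuple is unpacked into separate worker arguments. Below is a self-contained sketch of that dispatch pattern; the stub worker and the study id are hypothetical stand-ins for the pipeline's real worker and the arguments it binds via functools.partial.

```python
# Self-contained sketch of the chunk + starmap dispatch used in this commit.
# Assumptions: a stub worker and a made-up study id replace the pipeline's
# real worker and the arguments it binds with functools.partial.
from functools import partial
from multiprocessing.pool import ThreadPool


def worker(study_id: str, file_paths: list, worker_id: int):
    print(f"[{study_id}] worker {worker_id} got {len(file_paths)} files")


def dispatch(file_paths: list, workers: int = 4):
    # Ceiling division: every path lands in a chunk even when len(file_paths)
    # is not evenly divisible by workers.
    chunk_size = (len(file_paths) + workers - 1) // workers
    chunks = [file_paths[i:i + chunk_size] for i in range(0, len(file_paths), chunk_size)]
    # Pair each chunk with a 1-based worker id; starmap unpacks each tuple,
    # so the call becomes worker(study_id, chunk, worker_id).
    args = [(chunk, index + 1) for index, chunk in enumerate(chunks)]
    pipe = partial(worker, "study-001")  # hypothetical study id
    with ThreadPool(workers) as pool:
        pool.starmap(pipe, args)


if __name__ == "__main__":
    dispatch([f"file_{n}.csv" for n in range(10)])
```

pool.map would hand each tuple to the worker as a single positional argument; starmap is what unpacks it, which is why the argument list changed from chunks to args.
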

0 comments on commit 7308d58
