Skip to content

Commit

Permalink
fix: minor fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Aydawka committed Jan 8, 2025
1 parent 65ce740 commit 3b593ea
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 13 deletions.
2 changes: 1 addition & 1 deletion eidon_pipeline_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def worker(
and contains core operations: downloading, processing, and uploading files."""

logger = logging.Logwatch(
"cirrus",
"eidon",
print=True,
thread_id=worker_id,
overall_time_estimator=overall_time_estimator,
Expand Down
2 changes: 1 addition & 1 deletion env_sensor_pipeline_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def worker(
and contains core operations: downloading, processing, and uploading files."""

logger = logging.Logwatch(
"cirrus",
"env_sensor",
print=True,
thread_id=worker_id,
overall_time_estimator=overall_time_estimator,
Expand Down
15 changes: 6 additions & 9 deletions flio_pipeline_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@

from imaging.imaging_flio_root import Flio

from functools import partial
from multiprocessing.pool import ThreadPool
import argparse
import os
import tempfile
Expand Down Expand Up @@ -61,10 +59,6 @@ def worker(
total_files = len(file_paths)
time_estimator = TimeEstimator(total_files)

# logger.debug(f"Getting batch folder paths in {input_folder}")
#
# logger.info(f"Found {len(file_paths)} items in {input_folder}")

for file_item in file_paths:
path = file_item["file_path"]

Expand Down Expand Up @@ -341,13 +335,16 @@ def worker(
logger.time(time_estimator.step())


def pipeline(study_id: str, workers: int = 4):
def pipeline(study_id: str, workers: int = 4, args: list = None):
"""The function contains the work done by
the main thread, which runs only once for each operation."""

if args is None:
args = []

global overall_time_estimator

# Process cgm data files for a study. Args:study_id (str): the study id
# Process cirrus data files for a study. Args:study_id (str): the study id
if study_id is None or not study_id:
raise ValueError("study_id is required")
# takes an optional argument
Expand Down Expand Up @@ -462,7 +459,7 @@ def pipeline(study_id: str, workers: int = 4):
logger.debug(f"Found {total_files} files in {input_folder}")

workflow_file_dependencies = deps.WorkflowFileDependencies()
file_processor = FileMapProcessor(dependency_folder, ignore_file)
file_processor = FileMapProcessor(dependency_folder, ignore_file, args)

# Guarantees that all paths are considered, even if the number of items is not evenly divisible by workers.
chunk_size = (len(file_paths) + workers - 1) // workers
Expand Down
2 changes: 1 addition & 1 deletion garmin_pipeline_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def worker(
and contains core operations: downloading, processing, and uploading files."""

logger = logging.Logwatch(
"cirrus",
"garmin",
print=True,
thread_id=worker_id,
overall_time_estimator=overall_time_estimator,
Expand Down
2 changes: 1 addition & 1 deletion optomed_pipeline_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def worker(
and contains core operations: downloading, processing, and uploading files."""

logger = logging.Logwatch(
"cirrus",
"optomed",
print=True,
thread_id=worker_id,
overall_time_estimator=overall_time_estimator,
Expand Down

0 comments on commit 3b593ea

Please sign in to comment.