Skip to content

Commit

Permalink
fix: env sensor pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
Aydawka committed Jan 9, 2025
1 parent 27cb41e commit e39b82a
Showing 1 changed file with 6 additions and 10 deletions.
16 changes: 6 additions & 10 deletions env_sensor_pipeline_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def worker(
manifest,
processed_data_output_folder,
red_cap_export_file_path,
data_plot_output_folder,
file_paths: list,
worker_id: int,
): # sourcery skip: low-code-quality
Expand Down Expand Up @@ -353,6 +354,7 @@ def pipeline(study_id: str, workers: int = 4, args: list = None):
participant_filter_list.pop(0)

paths = file_system_client.get_paths(path=input_folder, recursive=False)
file_processor = FileMapProcessor(dependency_folder, ignore_file, args)

for path in paths:
t = str(path.name)
Expand Down Expand Up @@ -401,27 +403,22 @@ def pipeline(study_id: str, workers: int = 4, args: list = None):
}
)

total_files = len(file_paths)

logger.info(f"Found {total_files} items in {input_folder}")

workflow_file_dependencies = deps.WorkflowFileDependencies()
file_processor = FileMapProcessor(dependency_folder, ignore_file, args)

# Download the redcap export file
red_cap_export_file_path = os.path.join(meta_temp_folder_path, "redcap_export.csv")

red_cap_export_file_client = file_system_client.get_file_client(
file_path=red_cap_export_file
)

with open(red_cap_export_file_path, "wb") as data:
red_cap_export_file_client.download_file().readinto(data)

total_files = len(file_paths)

logger.info(f"Found {total_files} items in {input_folder}")


manifest = es_metadata.ESManifest()


overall_time_estimator = TimeEstimator(total_files)

# Guarantees that all paths are considered, even if the number of items is not evenly divisible by workers.
Expand All @@ -443,7 +440,6 @@ def pipeline(study_id: str, workers: int = 4, args: list = None):
# Distributes the pipe function across the threads in the pool
pool.starmap(pipe, args)


file_processor.delete_out_of_date_output_files()
file_processor.remove_seen_flag_from_map()

Expand Down

0 comments on commit e39b82a

Please sign in to comment.