fix: parallelization files (#36)
* feat: add spectralis

* fix: data

* fix: thread number
Aydawka authored Jan 17, 2025
1 parent a1d68ed · commit a69337b
Showing 4 changed files with 623 additions and 46 deletions.
env_sensor_pipeline_parallel.py: 81 changes (43 additions, 38 deletions)
@@ -220,41 +220,41 @@ def worker(

file_processor.delete_preexisting_output_files(path)

with open(f"{output_file}", "rb") as data:
f2 = output_file.split("/")[-1]

output_file_path = f"{processed_data_output_folder}/environmental_sensor/leelab_anura/{pid}/{f2}"

logger.debug(f"Uploading {output_file} to {output_file_path}")

try:
output_file_client = file_system_client.get_file_client(
file_path=output_file_path
)

# Check if the file already exists. If it does, throw an exception
if output_file_client.exists():
raise Exception(
f"File {output_file_path} already exists. Throwing exception"
)

output_file_client.upload_data(data, overwrite=True)

logger.info(f"Uploaded {output_file_path}")
except Exception:
outputs_uploaded = False
logger.error(f"Failed to upload {output_file_path}")

error_exception = "".join(format_exc().splitlines())

logger.error(error_exception)
file_processor.append_errors(error_exception, path)

logger.time(time_estimator.step())
continue

file_item["output_files"].append(output_file_path)
workflow_output_files.append(output_file_path)
# with open(f"{output_file}", "rb") as data:
# f2 = output_file.split("/")[-1]
#
# output_file_path = f"{processed_data_output_folder}/environmental_sensor/leelab_anura/{pid}/{f2}"
#
# logger.debug(f"Uploading {output_file} to {output_file_path}")
#
# try:
# output_file_client = file_system_client.get_file_client(
# file_path=output_file_path
# )
#
# # Check if the file already exists. If it does, throw an exception
# if output_file_client.exists():
# raise Exception(
# f"File {output_file_path} already exists. Throwing exception"
# )
#
# output_file_client.upload_data(data, overwrite=True)
#
# logger.info(f"Uploaded {output_file_path}")
# except Exception:
# outputs_uploaded = False
# logger.error(f"Failed to upload {output_file_path}")
#
# error_exception = "".join(format_exc().splitlines())
#
# logger.error(error_exception)
# file_processor.append_errors(error_exception, path)
#
# logger.time(time_estimator.step())
# continue
#
# file_item["output_files"].append(output_file_path)
# workflow_output_files.append(output_file_path)

file_processor.confirm_output_files(path, workflow_output_files, "")

@@ -290,7 +290,7 @@ def pipeline(study_id: str, workers: int = 4, args: list = None):
raise ValueError("study_id is required")

input_folder = f"{study_id}/pooled-data/EnvSensor"
manual_input_folder = f"{study_id}/pooled-data/EnvSensor-manual-year2-parallel"
manual_input_folder = f"{study_id}/pooled-data/EnvSensor-manual-year2"
processed_data_output_folder = f"{study_id}/pooled-data/EnvSensor-processed-parallel"
dependency_folder = f"{study_id}/dependency/EnvSensor"
data_plot_output_folder = f"{study_id}/pooled-data/EnvSensor-dataplot-parallel"
@@ -356,7 +356,9 @@ def pipeline(study_id: str, workers: int = 4, args: list = None):
paths = file_system_client.get_paths(path=input_folder, recursive=False)
file_processor = FileMapProcessor(dependency_folder, ignore_file, args)

for path in paths:
for idx, path in enumerate(paths):
if idx == 50:
break
t = str(path.name)

file_name = t.split("/")[-1]
@@ -406,6 +408,7 @@ def pipeline(study_id: str, workers: int = 4, args: list = None):
total_files = len(file_paths)

logger.info(f"Found {total_files} items in {input_folder}")
file_processor = FileMapProcessor(dependency_folder, ignore_file, args)

workflow_file_dependencies = deps.WorkflowFileDependencies()

@@ -430,9 +433,10 @@ def pipeline(study_id: str, workers: int = 4, args: list = None):
worker,
workflow_file_dependencies,
file_processor,
manifest,
processed_data_output_folder,
data_plot_output_folder,
red_cap_export_file_path,
data_plot_output_folder,
)

# Thread pool created
@@ -597,6 +601,7 @@ def pipeline(study_id: str, workers: int = 4, args: list = None):
f"Uploaded dependencies to {dependency_folder}/file_dependencies/{json_file_name}"
)

# Clean up the temporary folder
shutil.rmtree(meta_temp_folder_path)


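Note: the upload block commented out above follows the Azure Data Lake client pattern get_file_client -> exists() -> upload_data. A minimal, self-contained sketch of that pattern is below; the connection-string variable and container name are hypothetical and not taken from this repository.

import os
import azure.storage.filedatalake as azurelake

# Hypothetical credentials/container; the repo's actual setup is not shown in this diff.
service_client = azurelake.DataLakeServiceClient.from_connection_string(
    os.environ["AZURE_STORAGE_CONNECTION_STRING"]
)
file_system_client = service_client.get_file_system_client(file_system="stage-1-container")

def upload_output_file(local_path: str, remote_path: str) -> None:
    """Upload a local file, refusing to overwrite a pre-existing remote file."""
    file_client = file_system_client.get_file_client(file_path=remote_path)
    # Same exists() guard as in the diff above
    if file_client.exists():
        raise FileExistsError(f"{remote_path} already exists")
    with open(local_path, "rb") as data:
        file_client.upload_data(data, overwrite=True)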
flio_pipeline_parallel.py: 6 changes (3 additions, 3 deletions)
@@ -345,8 +345,8 @@ def pipeline(study_id: str, workers: int = 4, args: list = None):
raise ValueError("study_id is required")

input_folder = f"{study_id}/pooled-data/Flio"
processed_data_output_folder = f"{study_id}/pooled-data/Flio-processed"
processed_metadata_output_folder = f"{study_id}/pooled-data/Flio-metadata"
processed_data_output_folder = f"{study_id}/pooled-data/Flio-processed_parallel"
processed_metadata_output_folder = f"{study_id}/pooled-data/Flio-metadata_parallel"
dependency_folder = f"{study_id}/dependency/Flio"
pipeline_workflow_log_folder = f"{study_id}/logs/Flio"
ignore_file = f"{study_id}/ignore/flio.ignore"
@@ -450,7 +450,7 @@ def pipeline(study_id: str, workers: int = 4, args: list = None):
logger.info(f"Found {len(file_paths)} items in {input_folder}")

workflow_file_dependencies = deps.WorkflowFileDependencies()
file_processor = FileMapProcessor(dependency_folder, ignore_file)
file_processor = FileMapProcessor(dependency_folder, ignore_file, args)

overall_time_estimator = TimeEstimator(total_files)

garmin_pipeline_parallel.py: 5 changes (0 additions, 5 deletions)
@@ -21,7 +21,6 @@
import contextlib
import time
from traceback import format_exc
import json
import sys

import azure.storage.filedatalake as azurelake
@@ -914,10 +913,6 @@ def pipeline(study_id: str, workers: int = 4, args: list = None):
# Distributes the pipe function across the threads in the pool
pool.starmap(pipe, args)

file_processor.delete_out_of_date_output_files()
file_processor.remove_seen_flag_from_map()


file_processor.delete_out_of_date_output_files()
file_processor.remove_seen_flag_from_map()

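Note: these parallel pipelines fan per-file work out with pool.starmap (see the comment above pool.starmap(pipe, args)), and env_sensor_pipeline_parallel.py binds the shared arguments with functools.partial. A minimal standard-library sketch of that combination follows; pipe, shared_config, and file_paths are hypothetical stand-ins, not the repository's actual objects.

from functools import partial
from multiprocessing.pool import ThreadPool

def pipe(shared_config: dict, file_path: str) -> None:
    # Per-file work; the fixed arguments arrive via functools.partial.
    print(f"processing {file_path} with {shared_config}")

shared_config = {"output_folder": "processed"}
file_paths = ["a.csv", "b.csv", "c.csv"]

with ThreadPool(processes=4) as pool:
    # starmap takes an iterable of argument tuples, one tuple per call
    pool.starmap(partial(pipe, shared_config), [(p,) for p in file_paths])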
