Decompress outputs into outputfolder (#199)
* Decompress outputs into `outputfolder`

Close: #175

* Ask for one CPU locally

* Move decompressed files into `outputfolder`

* Simplify README.md
dachengx authored Sep 6, 2024
1 parent 127c882 commit 832c5d7
Showing 3 changed files with 76 additions and 23 deletions.
18 changes: 7 additions & 11 deletions alea/submitters/README.md
@@ -39,8 +39,7 @@ htcondor_configurations:
  dagman_maxidle: 100000
  dagman_retry: 2
  dagman_maxjobs: 100000
-  pegasus_transfer_threads: 4
-  max_jobs_to_combine: 100
+  combine_n_outputs: 100
  singularity_image: "/cvmfs/singularity.opensciencegrid.org/xenonnt/montecarlo:2024.04.1"
  workflow_id: "lq_b8_cevns_30"
```
@@ -53,8 +52,7 @@ htcondor_configurations:
- `dagman_maxidle`: maximum number of jobs allowed to be idle. The default 100000 is good for most cases.
- `dagman_retry`: number of automatic retries for each job when it fails for whatever reason. Note that on every retry the resource requirements grow to `n_retry * request_memory` and `n_retry * request_disk`, to get past failures caused by resource shortage.
- `dagman_maxjobs`: maximum number of jobs allowed to be running. The default 100000 is good for most cases.
-- `pegasus_transfer_threads`: number of threads for transfering handled by `Pegasus`. The default 4 is good so in most cases you want to keep it.
-- `max_jobs_to_combine`: number of toymc job to combine when concluding. Be cautious to put a number larger than 200 here, since it might be too risky...
+- `combine_n_outputs`: number of toymc outputs to combine at the end. Be cautious about values larger than 200, since a single combine job then becomes risky.
- `singularity_image`: the jobs will run in this singularity image.
- `workflow_id`: name of the user's choice for this workflow. If not specified, the current datetime is used as `workflow_id`.

@@ -64,9 +62,7 @@ Make sure you configured the running config well, then you just simply pass `--h

At the end of its output, it should give you something like this:
```
-Worfklow written to
-/scratch/yuanlq/workflows/runs/lq_b8_cevns_30
+pegasus-status -l /scratch/yuanlq/workflows/lq_b8_cevns_30/runs
```
Keep this directory in mind, since all logs will go there; we call it the "run directory".

@@ -92,21 +88,21 @@ condor_rm 12973662

To check the status of the jobs:
```
-pegasus-status -l /scratch/yuanlq/workflows/runs/lq_b8_cevns_30
+pegasus-status -l /scratch/yuanlq/workflows/lq_b8_cevns_30/runs
```

If you want more detail, such as why a job failed, run this in your "run directory". It gives you a summary of the workflow, including any errors encountered.
```
-pegasus-analyzer /scratch/yuanlq/workflows/runs/lq_b8_cevns_30
+pegasus-analyzer /scratch/yuanlq/workflows/lq_b8_cevns_30/runs
```

Let's say the workflow has ended (you see nothing from `condor_q`). If it did not finish successfully due to some weird error, a good first step is simply to rerun it. Keep in mind that the workflow already retries each job automatically up to `dagman_retry` times (defined in your running config). To rerun only the failed jobs, do this:
```
-pegasus-run /scratch/yuanlq/workflows/runs/lq_b8_cevns_30
+pegasus-run /scratch/yuanlq/workflows/lq_b8_cevns_30/runs
```

To collect the final outputs, there are two ways:
-- Check your folder `/scratch/$USER/workflows/outputs/<workflow_id>/`. There should be a single tarball containing all toymc files and computation results.
+- Check your folder `/scratch/$USER/workflows/<workflow_id>/outputs/`. There should be a single tarball containing all toymc files and computation results.
- A redundant way is to fetch the files from dCache, which you have to access with the `gfal` commands. For example, list the directory with ```gfal-ls davs://xenon-gridftp.grid.uchicago.edu:2880/xenon/scratch/yuanlq/lq_b8_cevns_30/``` and its contents with ```gfal-ls davs://xenon-gridftp.grid.uchicago.edu:2880/xenon/scratch/yuanlq/lq_b8_cevns_30/00/00/```. This contains both the final tarball and all `.h5` files before tarballing. To download them, do something like ```gfal-copy davs://xenon-gridftp.grid.uchicago.edu:2880/xenon/scratch/yuanlq/lq_b8_cevns_30/00/00/lq_b8_cevns_30-combined_output.tar.gz . -t 7200```. Note that these commands also work on Midway/DaLI.

### Example Workflow
50 changes: 38 additions & 12 deletions alea/submitters/htcondor.py
@@ -357,8 +357,17 @@ def _generate_tc(self):
            arch=Arch.X86_64,
        )

+        # Wrappers that untar outputs
+        separate = Transformation(
+            name="separate",
+            site="local",
+            pfn=self.top_dir / "alea/submitters/separate.sh",
+            is_stageable=True,
+            arch=Arch.X86_64,
+        )
+
        tc = TransformationCatalog()
-        tc.add_transformations(run_toymc_wrapper, combine)
+        tc.add_transformations(run_toymc_wrapper, combine, separate)

        return tc

@@ -416,6 +425,13 @@ def _generate_rc(self):
            "combine.sh",
            "file://{}".format(self.top_dir / "alea/submitters/combine.sh"),
        )
+        # Add separate executable
+        self.f_separate = File("separate.sh")
+        rc.add_replica(
+            "local",
+            "separate.sh",
+            "file://{}".format(self.top_dir / "alea/submitters/separate.sh"),
+        )

        return rc
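The replica catalog above maps each logical file name (like `separate.sh`) to a physical `file://` location. A stdlib-only sketch of that PFN construction, with a plain dict standing in for the catalog and a hypothetical top-level path (not the Pegasus API):

```python
from pathlib import Path


def pfn_for(top_dir: str, rel: str) -> str:
    """Build the physical file name (PFN) string, in the same
    "file://{}".format(...) shape used in _generate_rc."""
    return "file://{}".format(Path(top_dir) / rel)


# Logical name -> physical location on the "local" site
replicas = {
    "combine.sh": pfn_for("/home/user/alea", "alea/submitters/combine.sh"),
    "separate.sh": pfn_for("/home/user/alea", "alea/submitters/separate.sh"),
}
```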

@@ -477,6 +493,25 @@ def _add_combine_job(self, combine_i):

        return combine_job

+    def _add_separate_job(self, combine_i):
+        """Add a separate job to the workflow."""
+        logger.info(f"Adding separate job {combine_i} to the workflow")
+        separate_name = "separate"
+        separate_job = self._initialize_job(
+            name=separate_name,
+            cores=1,
+            memory=self.request_memory * 2,
+            disk=self.combine_disk,
+            run_on_submit_node=True,
+        )
+
+        # Separate job configuration: untar the combined tarball into the output folder
+        separate_job.add_inputs(File(f"{self.workflow_id}-{combine_i}-combined_output.tar.gz"))
+        separate_job.add_args(f"{self.workflow_id}-{combine_i}", self.outputfolder)
+        self.wf.add_jobs(separate_job)
+
+        return separate_job
+
    def _add_limit_threshold(self):
        """Add the Neyman thresholds limit_threshold to the replica catalog."""
        self.f_limit_threshold = File(os.path.basename(self.limit_threshold))
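The separate job's staged input file and its script arguments have to agree with what `separate.sh` reconstructs from its `$1`. A minimal sketch of that naming contract, with hypothetical helper functions written only to mirror the diff:

```python
def separate_job_io(workflow_id: str, combine_i: int, outputfolder: str):
    """Input tarball and argument list for one separate job,
    as wired up in _add_separate_job."""
    prefix = f"{workflow_id}-{combine_i}"
    tarball = f"{prefix}-combined_output.tar.gz"
    return tarball, [prefix, outputfolder]


def tarball_seen_by_script(argv1: str) -> str:
    """separate.sh appends '-combined_output.tar.gz' to its first argument."""
    return f"{argv1}-combined_output.tar.gz"


tarball, args = separate_job_io("lq_b8_cevns_30", 0, "/scratch/outputs")
# The name the script rebuilds matches the staged input exactly
assert tarball_seen_by_script(args[0]) == tarball
```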
@@ -556,6 +591,7 @@ def _generate_workflow(self, name="run_toymc_wrapper"):
            # If the number of jobs to combine is reached, add a new combine job
            if new_to_combine:
                combine_job = self._add_combine_job(combine_i)
+                self._add_separate_job(combine_i)

            # Reorganize the script to get the executable and arguments,
            # in which the paths are corrected
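The hunk above fires once per batch: every `combine_n_outputs` toymc jobs get one combine job and now one separate job. A rough stdlib-only sketch of that batching, with a hypothetical helper (not the actual implementation):

```python
def batch_toymc_jobs(n_jobs: int, combine_n_outputs: int):
    """Group toymc job indices into batches; each batch feeds one
    combine job followed by one separate job."""
    return [
        list(range(start, min(start + combine_n_outputs, n_jobs)))
        for start in range(0, n_jobs, combine_n_outputs)
    ]


# 250 toymc jobs with combine_n_outputs=100 -> 3 combine/separate pairs
batches = batch_toymc_jobs(250, 100)
assert len(batches) == 3
```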
@@ -589,6 +625,7 @@ def _generate_workflow(self, name="run_toymc_wrapper"):
                self.f_run_toymc_wrapper,
                self.f_alea_run_toymc,
                self.f_combine,
+                self.f_separate,
            )
            if self.added_limit_threshold:
                job.add_inputs(self.f_limit_threshold)
@@ -652,16 +689,6 @@ def _plan_and_submit(self):
            **self.pegasus_config,
        )

-        print(f"Worfklow written to \n\n\t{self.runs_dir}\n\n")
-
-    def _warn_outputfolder(self):
-        """Warn users about the outputfolder in running config won't be really used."""
-        logger.warning(
-            "The outputfolder in the running configuration "
-            f"{self.outputfolder} won't be used in this submission."
-        )
-        logger.warning(f"Instead, you should find your outputs at {self.outputs_dir}")
-
    def _check_filename_unique(self):
        """Check if all the files in the template path are unique.
@@ -703,7 +730,6 @@ def submit(self, **kwargs):
        self.wf.graph(
            output=os.path.join(self.outputs_dir, "workflow_graph.svg"), label="xform-id"
        )
-        self._warn_outputfolder()


class Shell(object):
31 changes: 31 additions & 0 deletions alea/submitters/separate.sh
@@ -0,0 +1,31 @@
#!/bin/bash

set -e

# Extract the arguments
workflow_id=$1
outputfolder=$2

# Sanity check: these are the files in the current directory
ls -lh

# Build the input filename: the combined tarball produced by the combine job
input_filename="$workflow_id-combined_output.tar.gz"

# Make a temporary directory for the decompressed files
mkdir decompressed

# Untar the output file into the .h5 files
tar -xzf "$input_filename" -C decompressed

# Check the output
echo "Checking the output"
ls -lh

# Move the outputs into the output folder
mv decompressed/* "$outputfolder"/

# Goodbye
echo "Done. Exiting."
exit 0
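To see what `separate.sh` does end to end, here is a small self-contained demo that fakes a combined tarball and runs the same untar-and-move steps in a scratch directory (all file names invented):

```shell
set -e
workdir=$(mktemp -d)
cd "$workdir"
mkdir outputs

# Fake a combined tarball like the one the combine job stages in
echo "toymc result" > toymc_0.h5
tar -czf lq_demo-0-combined_output.tar.gz toymc_0.h5
rm toymc_0.h5

# Same steps as separate.sh: untar into a scratch dir, then move
mkdir decompressed
tar -xzf lq_demo-0-combined_output.tar.gz -C decompressed
mv decompressed/* outputs/

ls outputs   # prints "toymc_0.h5"
```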
