diff --git a/batch/README.md b/batch/README.md new file mode 100644 index 0000000..04de002 --- /dev/null +++ b/batch/README.md @@ -0,0 +1,11 @@ +## Batch Data Processing + +The script in this folder can be used to achieve batch data processing with Google's Pipelines API. + +Usage: +``` +bash submit_batch.sh batch.json batch.tsv +``` + +Options that apply to all samples in the batch are read from the JSON file. The tsv file (tab-separated) can be used to modify specific fields for each sample. In `submit_batch.sh`, the variable `n_concurrent` controls the number of concurrent jobs while `polling_interval` controls the polling interval. + diff --git a/batch/batch.json b/batch/batch.json new file mode 100644 index 0000000..2978912 --- /dev/null +++ b/batch/batch.json @@ -0,0 +1,5 @@ +{ + "REF": "gs://sentieon-test/pipeline_test/reference/hs37d5.fa", + "ZONES": "us-central1-a,us-central1-b,us-central1-c,us-central1-f", + "PROJECT_ID": "{PROJECT_ID}" +} diff --git a/batch/batch.tsv b/batch/batch.tsv new file mode 100644 index 0000000..4ebeddf --- /dev/null +++ b/batch/batch.tsv @@ -0,0 +1,6 @@ +FQ1 FQ2 OUTPUT_BUCKET +gs://sentieon-test/pipeline_test/inputs/test_normal_1.fastq.gz gs://sentieon-test/pipeline_test/inputs/test_normal_2.fastq.gz gs://YOUR_BUCKET/folder/s1 +gs://sentieon-test/pipeline_test/inputs/test_normal2_1.fastq.gz gs://sentieon-test/pipeline_test/inputs/test_normal2_2.fastq.gz gs://YOUR_BUCKET/folder/s2 +gs://sentieon-test/pipeline_test/inputs/test_tumor_1.fastq.gz gs://sentieon-test/pipeline_test/inputs/test_tumor_2.fastq.gz gs://YOUR_BUCKET/folder/s3 +gs://sentieon-test/pipeline_test/inputs/test_tumor2_1.fastq.gz gs://sentieon-test/pipeline_test/inputs/test_tumor2_2.fastq.gz gs://YOUR_BUCKET/folder/s4 +gs://sentieon-test/pipeline_test/inputs/test1_1.fastq.gz gs://sentieon-test/pipeline_test/inputs/test1_2.fastq.gz gs://YOUR_BUCKET/folder/s5 diff --git a/batch/submit_batch.sh b/batch/submit_batch.sh new file mode 100644 index 0000000..597532d --- 
#!/usr/bin/env bash
#
# submit_batch.sh — submit a batch of pipeline jobs with bounded concurrency.
#
# Usage:
#   bash submit_batch.sh <base.json> <batch.tsv>
#
# The base JSON supplies options common to every sample; each data row of the
# tab-separated TSV overrides/adds per-sample fields (column names in the TSV
# header become JSON keys). At most $n_concurrent runner processes execute at
# once; liveness is polled every $polling_interval seconds.

set -e -o pipefail   # -u omitted: empty-array "${arr[@]}" errors under -u on bash < 4.4

n_concurrent=2                          # max simultaneous runner processes
runner_script=../runner/sentieon_runner.py
polling_interval=20                     # seconds between liveness checks

if [[ $# -lt 2 ]]; then
  printf 'Usage: %s <base.json> <batch.tsv>\n' "${0##*/}" >&2
  exit 2
fi
base_json=$1; shift
batch_tsv=$1; shift

# TSV header row supplies the JSON key names — keys must not contain
# spaces or double quotes.
read -r header_line < "$batch_tsv"
IFS=$'\t' read -r -a header <<< "$header_line"

# Strip the closing brace of the base JSON so per-sample keys can be appended.
# NOTE(review): this assumes a flat base JSON with '}' only on its final line.
job_base=$(grep -v "}" "$base_json")

# Build one single-line JSON job description per TSV data row.
jobs_to_run=()
while IFS='' read -r line || [[ -n "$line" ]]; do
  [[ -n "$line" ]] || continue          # ignore blank rows
  IFS=$'\t' read -r -a fields <<< "$line"
  job="$job_base"
  for idx in "${!fields[@]}"; do
    job+=", \"${header[$idx]}\":\"${fields[$idx]}\""
  done
  job+=" }"
  jobs_to_run+=("$job")
done < <(tail -n +2 "$batch_tsv")

echo "${jobs_to_run[@]}"

# Remove PIDs of exited runners from the global $running array.
reap_finished() {
  local idx
  for idx in "${!running[@]}"; do
    if ! ps -p "${running[$idx]}" > /dev/null; then
      echo "finished job with PID ${running[$idx]}"
      unset "running[$idx]"
    fi
  done
  running=( "${running[@]}" )           # re-pack to contiguous indices
}

running=()
for to_run in "${jobs_to_run[@]}"; do
  # Block while the concurrency limit is reached.
  while (( ${#running[@]} >= n_concurrent )); do
    sleep "$polling_interval"
    reap_finished
  done

  # Feed the generated JSON to the runner on stdin.
  printf '%s\n' "$to_run" | python2 "$runner_script" /dev/stdin &
  running+=( "$!" )

  sleep "$polling_interval"
done

# Original exited here, orphaning the last <= n_concurrent runners;
# wait so the script's lifetime covers every submitted job.
wait