#!/usr/bin/env nextflow

/*
 * Metagenomics pre-processing pipeline (Nextflow DSL1).
 *
 * Steps:
 *   2 - quality trimming with Trimmomatic (PBS array jobs)
 *   3 - Kraken classification (standard DB), batched
 *   4 - human-genome alignment with bowtie2; unaligned pairs extracted
 *   5 - PanPhlAn profiling of the host-depleted reads
 *   6 - Kraken classification against a GTDB database, batched
 *
 * Each step has a companion *_code process that archives the exact script
 * version (git commit ordinal) used to produce the outputs, so results can
 * be traced back to the code that made them.
 */

// Guard (currently warn-only: the throws are commented out) against running
// from a dirty or out-of-sync working tree, so the commit-based version tags
// recorded below actually describe the code that ran.
git_status = "${workflow.projectDir}/check_clean.sh".execute().text
if (git_status =~ "dirty") {
    //throw new Exception("The script directory is dirty. Revert or commit before running!")
}

equal = "${workflow.projectDir}/check_equal.sh".execute().text
if (equal =~ "inequal") {
    //throw new Exception("main.nf is out-of-sync with individual script files.")
}

// FIX: Process.text keeps the trailing newline, so `branch == "master"` could
// never be true and branch_str embedded a stray "\n" in output file names.
// Trim the shell output before comparing.
branch = "${workflow.projectDir}/get_branch.sh".execute().text.trim()
println("Branch: " + branch)
if (branch == "master") {
    branch_str = ""
}
else {
    branch_str = branch + "-"
}

// NOTE(review): this metaClass extension appears inert -- `source` is not
// defined inside the closure and `customUnique` is never invoked anywhere
// below; confirm whether it can be deleted.
channel.metaClass.customUnique { unique(source){ it.tokenize('.')[0] } }

// Pair up R1/R2 fastq files, keyed by the file name with the read-pair and
// fastq markers stripped, then sort the list of (key, [r1, r2]) entries.
Channel
    .fromFilePairs( "$params.inputDir/*R{1,2}.*.fastq.gz", size: 2 ){
        file -> file.baseName.replaceAll("_R[12]", "").replaceAll("[.]fastq", "")
    }
    .toSortedList( { a, b -> b[1][1] <=> a[1][1] } )
    .set{ all_files }

// Keep one (sample, R1, R2) entry per sample prefix (text before the first
// '.'), capped at params.sample_limit entries.
process removeDuplicates {
    label 'code'

    input:
    val all_files

    output:
    val uf into (file_pairs_1, file_pairs_2, file_pairs_3, file_pairs_4)

    exec:
    uf = all_files.unique{ it[0].tokenize('.')[0] }.take(params.sample_limit)
}

// Map each commit hash to an ordinal (1 = oldest), used as a short version tag
// in output file names.
i = 0
commits = "./get_git_commits.sh".execute().text.tokenize("\n").reverse().collectEntries{ [it, ++i] }

// Resolve the version ordinal for a step script's last commit.
// FIX: the raw `.text` of get_last_commit_for_file.sh ends with "\n", while
// the keys of `commits` are newline-free tokens -- every lookup therefore
// returned null. Trim before indexing; shared helper so all steps resolve
// versions the same way.
commit_for = { script_base ->
    commits["${workflow.projectDir}/get_last_commit_for_file.sh ${workflow.projectDir}/${script_base}.nf".execute().text.trim()]
}

step_2_script = "step_2_trim_quality"

// Quality trimming with Trimmomatic, run as one PBS array job over all
// samples; trimmed paired (P) and unpaired (U) reads are gzipped per sample.
process step_2_trim_quality {
    storeDir "${params.outputDir}/${step_2_script}"
    label 'throughput'
    // NOTE(review): `files_in` is a task input referenced from a directive
    // GString; this relies on lazy GString evaluation to resolve per task --
    // confirm it behaves as intended under this Nextflow version.
    clusterOptions "-l walltime=24:00:00 -l select=1:ncpus=8:mem=96gb -J 1-${files_in.size()}"

    when:
    params.run.qualitytrim

    input:
    val(version) from branch_str + commit_for(step_2_script)
    val(files_in) from file_pairs_1.flatten().collate(3).toList()

    output:
    tuple (val(samples), file(P1), file(P2), file(U1), file(U2)) into step_2_trim_quality_output_grouped
    val version into s2v

    script:
    // Flatten the batched (sample, r1, r2) triples into parallel lists for the
    // shell loop, and precompute the per-sample output file names.
    samples = []; pair_1 = []; pair_2 = []
    files_in.forEach{ samples += it[0]; pair_1 += it[1].toString(); pair_2 += it[2].toString() }
    P1 = []; P2 = []; U1 = []; U2 = []
    files_in.forEach{ P1 += it[0] + "_${version}_1P.fq.gz"; P2 += it[0] + "_${version}_2P.fq.gz"; U1 += it[0] + "_${version}_1U.fq.gz"; U2 += it[0] + "_${version}_2U.fq.gz" }
    output_dir = params.outputDir + '/' + step_2_script
    n = files_in.size()
    """
    n=$n
    source ${workflow.projectDir}/array_job_start.sh
    module load trimmomatic
    for i in `seq \$start \$end`; do
        sample=`echo '${samples.join(" ")}' | cut -d' ' -f\$i`
        pair_1=`echo '${pair_1.join(" ")}' | cut -d' ' -f\$i`
        pair_2=`echo '${pair_2.join(" ")}' | cut -d' ' -f\$i`
        ln -fs \$pair_1 \${sample}_R1_001.fq.gz
        ln -fs \$pair_2 \${sample}_R2_001.fq.gz
        trimmomatic PE -threads ${task.cpus} \
            -basein \${sample}_R1_001.fq.gz \
            -baseout \${sample}_${version}.fq \
            LEADING:3 TRAILING:3 SLIDINGWINDOW:4:15 MINLEN:50
        pigz -p ${task.cpus} \${sample}_*.fq
    done
    source ${workflow.projectDir}/array_job_cleanup.sh
    """
}

// Transpose the batched output ([samples], [P1s], [P2s], ...) back into one
// tuple per sample.
step_2_trim_quality_output_grouped
    .flatMap{ batch ->
        res = []
        rows = batch[0].size() - 1
        (0..rows).each{ row ->
            item = []
            batch.each{ item += it[row] }
            res.add(item)
        }
        res
    }
    .set{ step_2_trim_quality_output }

s2v
    .first()
    .set{ step_2_version }

// Archive the exact step-2 script version alongside its outputs.
process step_2_code {
    label 'code'
    storeDir "${params.outputDir}/${script_name}/code"

    input:
    val(version) from step_2_version
    val(prior_versions) from ""
    path(code) from "${workflow.projectDir}/${step_2_script}.nf"
    val(prior_code) from ""
    val(script_name) from step_2_script

    output:
    path("${script_name}.${version}.nf") into (step_2_code, step_2_cumulative_code)

    script:
    template 'nf_export_code.sh'
}

step_2_version
    .set{ step_2_cumulative_versions }

step_3_script = "step_3_kraken_batch"
step_3_batch_size = 50

// Kraken classification against the standard database, in batches of
// step_3_batch_size samples; already-classified samples are restored from the
// storeDir instead of being re-run.
process step_3_kraken_batch {
    storeDir "${params.outputDir}/${script_name}"
    label 'general'

    input:
    val(files_in) from file_pairs_2.flatten().collate(3).collate(step_3_batch_size)
    val(version) from commit_for(step_3_script)
    val(script_name) from step_3_script

    output:
    tuple(val(samples), file(reads_out), file(reports_out)) into step_3_kraken_output_grouped
    val(version) into s3v

    script:
    samples = []
    files_in.forEach{ samples += it[0] }
    pair_1 = []
    pair_2 = []
    files_in.forEach{ pair_1 += it[1].toString(); pair_2 += it[2].toString() }
    reads_out = []
    reports_out = []
    files_in.forEach{ reads_out += it[0] + "_${version}_kraken_reads.txt"; reports_out += it[0] + "_${version}_kraken_report.txt" }
    output_dir = params.outputDir + '/' + script_name
    // FIX: `files_in.size` -> `files_in.size()` for consistency with the
    // method-call form used by every other step.
    """
    for i in {1..${files_in.size()}}; do
        sample=`echo '${samples.join(" ")}' | cut -d' ' -f\$i`
        pair_1=`echo '${pair_1.join(" ")}' | cut -d' ' -f\$i`
        pair_2=`echo '${pair_2.join(" ")}' | cut -d' ' -f\$i`
        if [[ -f ${output_dir}/\${sample}_${version}_kraken_reads.txt && -f ${output_dir}/\${sample}_${version}_kraken_report.txt ]]; then
            cp ${output_dir}/\${sample}_${version}_kraken_reads.txt ./
            cp ${output_dir}/\${sample}_${version}_kraken_report.txt ./
        else
            ${params.kraken_executable} --db ${params.kraken_db} --threads ${task.cpus} \
                --output \${sample}_${version}_kraken_reads.txt \
                --report \${sample}_${version}_kraken_report.txt \
                --paired \$pair_1 \$pair_2
        fi
    done
    """
}

// Un-batch into one (sample, reads, report) tuple per sample.
// FIX: iterate over the actual batch size rather than step_3_batch_size -- the
// final batch may be smaller, and the old fixed bound emitted null-padded
// tuples for the missing slots.
step_3_kraken_output_grouped
    .flatMap{ o ->
        res = []
        n = o[0].size() - 1
        (0..n).each{ res.add([o[0][it], o[1][it], o[2][it]]) }
        res
    }
    .set{ step_3_kraken_output }

s3v
    .first()
    .set{ step_3_version }

// Archive the exact step-3 script version alongside its outputs.
process step_3_code {
    label 'code'
    storeDir "${params.outputDir}/${script_name}/code"

    input:
    val(version) from step_3_version
    path(code) from "${workflow.projectDir}/${step_3_script}.nf"
    val(prior_versions) from ""
    val(prior_code) from ""
    val(script_name) from step_3_script

    output:
    path("${script_name}.${version}.nf") into (step_3_code, step_3_cumulative_code)

    script:
    template 'nf_export_code.sh'
}

step_3_version
    .set{ step_3_cumulative_versions }

step_4_script = "step_4_hg_alignment"

// Align read pairs to the human genome with bowtie2 (PBS array job), keep the
// sorted/indexed BAM, and extract the pairs where neither mate aligned
// (samtools flag filters 76/140) for downstream microbial profiling.
process step_4_hg_alignment {
    storeDir "${params.outputDir}/${script_name}"
    // NOTE(review): as in step 2, `files_in` is a task input referenced from a
    // directive GString -- confirm lazy evaluation resolves it per task.
    clusterOptions "-l walltime=24:00:00 -l select=1:ncpus=8:mem=96gb -J 1-${files_in.size()}"
    label 'throughput'

    input:
    val(files_in) from file_pairs_3.flatten().collate(3).toList()
    val(version) from commit_for(step_4_script)
    val(script_name) from step_4_script

    output:
    tuple(val(samples), file(bam_out), file(bai_out), file(unaligned_pairs_out_1), file(unaligned_pairs_out_2)) into (step_4_hg_alignment_grouped_1, step_4_hg_alignment_grouped_2)
    val(version) into s4v

    script:
    samples = []; pair_1 = []; pair_2 = []
    files_in.forEach{ samples += it[0]; pair_1 += it[1].toString(); pair_2 += it[2].toString() }
    bam_out = []; bai_out = []; fastq = []; unaligned_pairs_out_1 = []; unaligned_pairs_out_2 = []
    files_in.forEach{ bam_out += it[0] + "_${version}_hg.bam"; bai_out += it[0] + "_${version}_hg.bam.bai"; fastq += it[0] + "_${version}_hg_unaligned_pairs"; unaligned_pairs_out_1 += it[0] + "_${version}_hg_unaligned_pairs.1.fastq.gz"; unaligned_pairs_out_2 += it[0] + "_${version}_hg_unaligned_pairs.2.fastq.gz" }
    output_dir = params.outputDir + '/' + script_name
    n = files_in.size()
    // FIX: the cached-restore branch looked for `\${fastq}.gz`, a file this
    // process never produces; it emits `\${fastq}.1.fastq.gz` and
    // `\${fastq}.2.fastq.gz`, so the cache check never matched and restored
    // runs could not stage the declared outputs.
    // NOTE(review): `samtools sort <in> <out-prefix>` is the pre-1.0 CLI;
    // confirm the loaded samtools module still accepts it (modern versions
    // require -o).
    """
    module load anaconda3/personal
    source activate bowtie
    module load samtools
    n=$n
    source ${workflow.projectDir}/array_job_start.sh
    for i in `seq \$start \$end`; do
        sample=`echo '${samples.join(' ')}' | cut -d' ' -f\$i`
        pair_1=`echo '${pair_1.join(' ')}' | cut -d' ' -f\$i`
        pair_2=`echo '${pair_2.join(' ')}' | cut -d' ' -f\$i`
        bam=`echo '${bam_out.join(' ')}' | cut -d' ' -f\$i`
        bai=`echo '${bai_out.join(' ')}' | cut -d' ' -f\$i`
        fastq=`echo '${fastq.join(' ')}' | cut -d' ' -f\$i`
        if [[ -f ${output_dir}/\$bam && -f ${output_dir}/\$bai && -f ${output_dir}/\${fastq}.1.fastq.gz && -f ${output_dir}/\${fastq}.2.fastq.gz ]]; then
            cp ${output_dir}/\$bam ./
            cp ${output_dir}/\$bai ./
            cp ${output_dir}/\${fastq}.1.fastq.gz ./
            cp ${output_dir}/\${fastq}.2.fastq.gz ./
        else
            bowtie2 -x ${params.hg_bowtie_index} \
                -1 \$pair_1 -2 \$pair_2 \
                -p ${task.cpus} -X 1000 -S \${TMPDIR}/\${sample}.sam
            samtools view -b \${TMPDIR}/\${sample}.sam -@ ${task.cpus} > \${TMPDIR}/\${sample}_hg.bam
            samtools sort -@ ${task.cpus} \${TMPDIR}/\${sample}_hg.bam `basename \$bam .bam`
            samtools index \$bam
            samtools view -u -f 76 -@ ${task.cpus} \$bam | samtools bam2fq - > \${fastq}.1.fastq
            samtools view -u -f 140 -@ ${task.cpus} \$bam | samtools bam2fq - > \${fastq}.2.fastq
            pigz -p ${task.cpus} \${fastq}.?.fastq
        fi
    done
    source ${workflow.projectDir}/array_job_cleanup.sh
    """
}

// Un-batch into one (sample, bam, bai) tuple per sample.
step_4_hg_alignment_grouped_1
    .flatMap{ o ->
        items = []
        rows = o[0].size() - 1
        (0..rows).each{ items.add([o[0][it], o[1][it], o[2][it]]) }
        items
    }
    .set{ step_4_hg_alignment_output }

// Un-batch into one (sample, unaligned_1, unaligned_2) tuple per sample.
step_4_hg_alignment_grouped_2
    .flatMap{ o ->
        items = []
        rows = o[0].size() - 1
        (0..rows).each{ items.add([o[0][it], o[3][it], o[4][it]]) }
        items
    }
    .into{ step_4_hg_unaligned_pairs_output; step_4_hg_unaligned_pairs_output_2 }

s4v
    .first()
    .set{ step_4_version }

// Archive the exact step-4 script version alongside its outputs.
process step_4_code {
    label 'code'
    storeDir "${params.outputDir}/${script_name}/code"

    input:
    val(version) from step_4_version
    val(prior_versions) from ""
    path(code) from "${workflow.projectDir}/${step_4_script}.nf"
    val(prior_code) from ""
    val(script_name) from step_4_script

    output:
    path("${script_name}.${version}.nf") into (step_4_code, step_4_cumulative_code)

    script:
    template 'nf_export_code.sh'
}

step_4_version
    .set{ step_4_cumulative_versions }

step_5_script = "step_5_panphlan"

// PanPhlAn gene-family profiling of the host-depleted reads for one organism:
// per-sample mapping CSVs, then a combined presence/coverage matrix and plot.
process step_5_panphlan {
    when:
    params.run.panphlan

    storeDir "${params.outputDir}/${script_name}"
    label 'throughput'

    input:
    val(files_in) from step_4_hg_unaligned_pairs_output.toList()
    val(version) from commit_for(step_5_script)
    val(prior_versions) from step_4_version
    val(script_name) from step_5_script
    val(organism) from Channel.of("Prevotella_melaninogenica")

    output:
    tuple(file("${organism}_${prior_versions}-${version}_samples.txt"), file("${organism}_${prior_versions}-${version}_matrix.txt"), file("${organism}_${prior_versions}-${version}_covmat.txt"), file("${organism}_${prior_versions}-${version}_covplot.png")) into step_5_panphlan_output
    file(csvs) into panphlan_csvs
    val(version) into s5v

    script:
    println files_in
    samples = []; pair_1 = []; pair_2 = []
    files_in.forEach{ samples += it[0]; pair_1 += it[1].toString(); pair_2 += it[2].toString() }
    csvs = []
    files_in.forEach{ csvs += 'output/' + it[0] + "_${organism}_${prior_versions}-${version}.csv" }
    output_dir = params.outputDir + '/' + script_name
    n = files_in.size()
    // FIX: the cached csv was copied to ./ while the declared `csvs` outputs
    // and `panphlan_profiling.py -i ./output` both expect it under output/,
    // so restored csvs were invisible to profiling and output staging.
    """
    module load anaconda3/personal
    module load bowtie2
    module load samtools
    mkdir output
    for i in `seq 1 ${n}`; do
        sample=`echo "${samples.join(' ')}" | cut -d' ' -f\$i`
        pair_1=`echo "${pair_1.join(' ')}" | cut -d' ' -f\$i`
        pair_2=`echo "${pair_2.join(' ')}" | cut -d' ' -f\$i`
        if [ -f ${output_dir}/\${sample}_${organism}_${prior_versions}-${version}.csv ]; then
            cp ${output_dir}/\${sample}_${organism}_${prior_versions}-${version}.csv ./output/
        else
            cat \$pair_1 \$pair_2 > \${sample}.fastq.gz
            ~/panphlan3/panphlan_map.py -p ${params.panphlan_pangenomes}/${organism}/${organism}_pangenome.tsv \
                --indexes ${params.panphlan_pangenomes}/${organism}/${organism} \
                -i \${sample}.fastq.gz \
                -o ./output/\${sample}_${organism}_${prior_versions}-${version}.csv \
                --nproc ${task.cpus} -m ${task.memory.toUnit('GB')}
            rm \${sample}.fastq.gz
        fi
    done
    ~/panphlan3/panphlan_profiling.py -i ./output -p ${params.panphlan_pangenomes}/${organism}/${organism}_pangenome.tsv \
        --o_matrix ${organism}_${prior_versions}-${version}_matrix.txt --o_covmat ${organism}_${prior_versions}-${version}_covmat.txt \
        --o_covplot_normed ${organism}_${prior_versions}-${version}_covplot.png -v --add_ref
    echo "${samples.join(' ')}" > ${organism}_${prior_versions}-${version}_samples.txt
    """
}

/*step_5_panphlan_grouped .flatMap{ o -> items=[]; rows=o[0].size()-1; (0..rows).each{items.add([o[0][it], o[1][it], o[2][it]])}; items;} .set{ step_5_panphlan_output }*/

s5v
    .first()
    .set{ step_5_version }

// Archive the step-5 script version plus the cumulative code of prior steps.
process step_5_code {
    label 'code'
    storeDir "${params.outputDir}/${script_name}/code"

    input:
    val(version) from step_5_version
    val(prior_versions) from step_4_cumulative_versions
    path(code) from "${workflow.projectDir}/${step_5_script}.nf"
    path(prior_code) from step_4_cumulative_code
    val(script_name) from step_5_script

    output:
    path("${script_name}.${version}.nf") into step_5_code
    path("cumulative_code.${prior_versions}-${version}.nf") into step_5_cumulative_code

    script:
    template 'nf_export_code.sh'
}

// Cumulative version tag: "<step4 versions>-<step5 version>".
step_4_cumulative_versions
    .merge(step_5_version)
    .map{ it[0] + '-' + it[1] }
    .set{ step_5_cumulative_versions }

step_6_script = "step_6_kraken_GTDB_batch"
step_6_batch_size = 50

// Kraken classification against the GTDB database, in batches of
// step_6_batch_size samples.
process step_6_kraken_batch {
    storeDir "${params.outputDir}/${script_name}"
    label 'large'

    input:
    val(files_in) from file_pairs_4.flatten().collate(3).collate(step_6_batch_size)
    val(version) from commit_for(step_6_script)
    val(script_name) from step_6_script

    output:
    tuple(val(samples), file(reads_out), file(reports_out)) into step_6_kraken_output_grouped
    val(version) into s6v

    script:
    samples = []
    files_in.forEach{ samples += it[0] }
    pair_1 = []
    pair_2 = []
    files_in.forEach{ pair_1 += it[1].toString(); pair_2 += it[2].toString() }
    reads_out = []
    reports_out = []
    files_in.forEach{ reads_out += it[0] + "_${version}_kraken_gtdb_reads.txt"; reports_out += it[0] + "_${version}_kraken_gtdb_report.txt" }
    output_dir = params.outputDir + '/' + script_name
    // FIX: the cached-restore branch checked/copied the step-3 file names
    // (_kraken_reads / _kraken_report) instead of the _kraken_gtdb_ names this
    // process declares and produces, so restored runs could never stage their
    // outputs. Also `size` -> `size()` for consistency with the other steps.
    """
    module load gcc/8.2.0
    for i in {1..${files_in.size()}}; do
        sample=`echo '${samples.join(" ")}' | cut -d' ' -f\$i`
        pair_1=`echo '${pair_1.join(" ")}' | cut -d' ' -f\$i`
        pair_2=`echo '${pair_2.join(" ")}' | cut -d' ' -f\$i`
        if [[ -f ${output_dir}/\${sample}_${version}_kraken_gtdb_reads.txt && -f ${output_dir}/\${sample}_${version}_kraken_gtdb_report.txt ]]; then
            cp ${output_dir}/\${sample}_${version}_kraken_gtdb_reads.txt ./
            cp ${output_dir}/\${sample}_${version}_kraken_gtdb_report.txt ./
        else
            ${params.kraken_executable} --db ${params.kraken_db_gtdb} --threads ${task.cpus} \
                --output \${sample}_${version}_kraken_gtdb_reads.txt \
                --report \${sample}_${version}_kraken_gtdb_report.txt \
                --paired \$pair_1 \$pair_2
        fi
    done
    """
}

// Un-batch into one (sample, reads, report) tuple per sample.
// FIX: bound by the actual batch size, not step_6_batch_size, so a short final
// batch no longer emits null-padded tuples.
step_6_kraken_output_grouped
    .flatMap{ o ->
        res = []
        n = o[0].size() - 1
        (0..n).each{ res.add([o[0][it], o[1][it], o[2][it]]) }
        res
    }
    .set{ step_6_kraken_output }

s6v
    .first()
    .set{ step_6_version }

// Archive the exact step-6 script version alongside its outputs.
process step_6_code {
    label 'code'
    storeDir "${params.outputDir}/${script_name}/code"

    input:
    val(version) from step_6_version
    path(code) from "${workflow.projectDir}/${step_6_script}.nf"
    val(prior_versions) from ""
    val(prior_code) from ""
    val(script_name) from step_6_script

    output:
    path("${script_name}.${version}.nf") into (step_6_code, step_6_cumulative_code)

    script:
    template 'nf_export_code.sh'
}

step_6_version
    .set{ step_6_cumulative_versions }