Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issues in channel serialization #112

Merged
merged 5 commits into from
Aug 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 39 additions & 30 deletions src/main/resources/com/askimed/nf/test/lang/process/WorkflowMock.nf
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ import groovy.json.JsonGenerator.Converter

nextflow.enable.dsl=2


// comes from testflight to find json files
// comes from nf-test to store json files
params.nf_test_output = ""

// process mapping
Expand All @@ -15,7 +14,6 @@ ${mapping}
// include test process
include { ${process} } from '${script}'


// define custom rules for JSON that will be generated.
def jsonOutput =
new JsonGenerator.Options()
Expand All @@ -26,42 +24,53 @@ def jsonOutput =

workflow {

${process}(*input)
//run process
${process}(*input)

if (${process}.output){
// consumes all output channels and stores items in a json
def channel = Channel.empty()
for (def name in ${process}.out.getNames()) {
channel << tuple(name, ${process}.out.getProperty(name))
}

def array = ${process}.out as Object[]
for (def i = 0; i < array.length ; i++) {
channel << tuple(i, array[i])
}
if (${process}.output){

// consumes all named output channels and stores items in a json file
for (def name in ${process}.out.getNames()) {
serializeChannel(name, ${process}.out.getProperty(name), jsonOutput)
}

// consumes all unnamed output channels and stores items in a json file
def array = ${process}.out as Object[]
for (def i = 0; i < array.length ; i++) {
serializeChannel(i, array[i], jsonOutput)
}

channel.subscribe { outputTupel ->
def sortedList = outputTupel[1].toList()
sortedList.subscribe { list ->
def map = new HashMap()
def outputName = outputTupel[0]
map[outputName] = list
new File("\${params.nf_test_output}/output_\${outputName}.json").text = jsonOutput.toJson(map)
}
}
}

}

def serializeChannel(name, channel, jsonOutput) {
def _name = name
println "Process channel \${_name}..."
def list = [ ]
channel.subscribe(
onNext: {
list.add(it)
},
onComplete: {
def map = new HashMap()
map[_name] = list
def filename = "\${params.nf_test_output}/output_\${_name}.json"
new File(filename).text = jsonOutput.toJson(map)
println "Wrote channel \${_name} to \${filename}"
}
)
}


workflow.onComplete {

def result = [
success: workflow.success,
exitStatus: workflow.exitStatus,
errorMessage: workflow.errorMessage,
errorReport: workflow.errorReport
]
def result = [
success: workflow.success,
exitStatus: workflow.exitStatus,
errorMessage: workflow.errorMessage,
errorReport: workflow.errorReport
]
new File("\${params.nf_test_output}/workflow.json").text = jsonOutput.toJson(result)

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ import groovy.json.JsonGenerator.Converter

nextflow.enable.dsl=2


// comes from testflight to find json files
// comes from nf-test to store json files
params.nf_test_output = ""

// process mapping
Expand All @@ -25,42 +24,53 @@ def jsonOutput =

workflow {

${workflow}(*input)

if (${workflow}.output){
// consumes all output channels and stores items in a json
def channel = Channel.empty()
for (def name in ${workflow}.out.getNames()) {
channel << tuple(name, ${workflow}.out.getProperty(name))
}
//run workflow
${workflow}(*input)

if (${workflow}.output){

// consumes all named output channels and stores items in a json file
for (def name in ${workflow}.out.getNames()) {
serializeChannel(name, ${workflow}.out.getProperty(name), jsonOutput)
}

def array = ${workflow}.out as Object[]
for (def i = 0; i < array.length ; i++) {
channel << tuple(i, array[i])
}

channel.subscribe { outputTupel ->
def sortedList = outputTupel[1].toList()
sortedList.subscribe { list ->
def map = new HashMap()
def outputName = outputTupel[0]
map[outputName] = list
new File("\${params.nf_test_output}/output_\${outputName}.json").text = jsonOutput.toJson(map)
}
}
}
// consumes all unnamed output channels and stores items in a json file
def array = ${workflow}.out as Object[]
for (def i = 0; i < array.length ; i++) {
serializeChannel(i, array[i], jsonOutput)
}

}
}


def serializeChannel(name, channel, jsonOutput) {
def _name = name
println "Process channel \${_name}..."
def list = [ ]
channel.subscribe(
onNext: {
list.add(it)
},
onComplete: {
def map = new HashMap()
map[_name] = list
def filename = "\${params.nf_test_output}/output_\${_name}.json"
new File(filename).text = jsonOutput.toJson(map)
println "Wrote channel \${_name} to \${filename}"
}
)
}


workflow.onComplete {

def result = [
success: workflow.success,
exitStatus: workflow.exitStatus,
errorMessage: workflow.errorMessage,
errorReport: workflow.errorReport
]
def result = [
success: workflow.success,
exitStatus: workflow.exitStatus,
errorMessage: workflow.errorMessage,
errorReport: workflow.errorReport
]
new File("\${params.nf_test_output}/workflow.json").text = jsonOutput.toJson(result)

}
17 changes: 17 additions & 0 deletions src/test/java/com/askimed/nf/test/lang/WorkflowTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,15 @@ public void testWorkflowWithRelativePath() throws Exception {
assertEquals(0, exitCode);

}

@Test
public void testWorkflowUnamedOutputs() throws Exception {
App app = new App();
int exitCode = app.run(new String[] { "test", "test-data/workflow/unnamed/trial.unnamed.nf.test" });
assertEquals(0, exitCode);

}

@Test
public void testWorkflowWithNoOutputs() throws Exception {

Expand Down Expand Up @@ -88,5 +96,14 @@ public void testParamsIssue34Setup() throws Exception {
assertEquals(0, exitCode);

}

@Test
public void testHangingWorkflowIssue57() throws Exception {

App app = new App();
int exitCode = app.run(new String[] { "test", "test-data/workflow/hanging/meaningless_workflow.nf.test","--debug"});
assertEquals(0, exitCode);

}

}
10 changes: 10 additions & 0 deletions test-data/workflow/hanging/meaningless_workflow.nf
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
workflow PipeWf {
take:
inputCh

main:
inputCh.set { outputCh }

emit:
outputCh
}
28 changes: 28 additions & 0 deletions test-data/workflow/hanging/meaningless_workflow.nf.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
nextflow_workflow {

name "Test workflow"
script "test-data/workflow/hanging/meaningless_workflow.nf"
workflow "PipeWf"

test("PipeWf will hang") {

when {
workflow {
"""
input[0] = Channel.from([
[
["patientID": "patientA"],
'test_file_1.txt'
]
])
"""
}
}

then {
assert workflow.success
}

}

}
30 changes: 30 additions & 0 deletions test-data/workflow/unnamed/trial.unnamed.nf
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#!/usr/bin/env nextflow
nextflow.enable.dsl=2

process sayHello {
input:
val cheers

output:
stdout emit: verbiage
path "*.txt", emit: output_files

script:
"""
echo -n $cheers
echo -n $cheers > ${cheers}.txt
"""
}

workflow trial {
take: things
main:
sayHello(things)
emit:
sayHello.out.verbiage
sayHello.out.output_files
}

workflow {
Channel.from('a','b') | trial
}
26 changes: 26 additions & 0 deletions test-data/workflow/unnamed/trial.unnamed.nf.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
nextflow_workflow {

name "Test workflow"
script "test-data/workflow/unnamed/trial.unnamed.nf"
workflow "trial"

test("Should run without failures") {
when {
params {
outdir = "tests/results"
}
workflow {
"""
input[0] = Channel.of('a','b')
"""
}
}

then {
//check if test case succeeded
assert workflow.success
assert workflow.out[0].size() == 2
assert workflow.out[1].size() == 2
}
}
}