Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exposing trace record meta info in the task context #5402

Merged
Changes from 12 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions docs/process.md
Original file line number Diff line number Diff line change
@@ -1283,6 +1283,25 @@ If the task execution fail reporting an exit status in the range between 137 and

The directive {ref}`process-maxretries` set the maximum number of time the same task can be re-executed.

### Dynamic task resources with previous execution trace
:::{versionadded} 24.10.0
:::

Similarly to the previous scenario, task resources can be also updated according to the metrics included in the {ref}`trace record <trace-report>` of the previous task attempt. The metrics can be accessed through the `task.previousTrace` variable. For example:
jorgee marked this conversation as resolved.
Show resolved Hide resolved

```groovy
process foo {
memory { task.attempt > 1 ? task.previousTrace.memory * 2 : (1.GB) }
errorStrategy { task.exitStatus in 137..140 ? 'retry' : 'terminate' }
maxRetries 3

script:
<your job here>
}
```
In the above example the {ref}`process-memory` is set according to previous trace record metrics. In the first attempt, when no trace metrics are available, it is set to one GB, while in the other attempts it is doubling the previously allocated memory. The available metrics in the trace record can be found in the {ref}`Trace Report section <trace-report>`.
jorgee marked this conversation as resolved.
Show resolved Hide resolved


### Dynamic retry with backoff

There are cases in which the required execution resources may be temporary unavailable e.g. network congestion. In these cases immediately re-executing the task will likely result in the identical error. A retry with an exponential backoff delay can better recover these error conditions:
12 changes: 12 additions & 0 deletions docs/reference/process.md
Original file line number Diff line number Diff line change
@@ -26,6 +26,18 @@ The following task properties are defined in the process body:
: *Available only in `exec:` blocks*
: The current task name.

`task.previousError`
: :::{versionadded} 24.09.2-edge
pditommaso marked this conversation as resolved.
Show resolved Hide resolved
:::
: The error message of the task execution.
: Since the error message is only available after the task has been executed, it can only be used when `task.attempt` is bigger than 1.

`task.previousTrace`
: :::{versionadded} 24.09.2-edge
pditommaso marked this conversation as resolved.
Show resolved Hide resolved
:::
: The trace record of the task execution.
: Since the trace record is only available after the task has been executed, it can only be used when `task.attempt` is bigger than 1.

`task.process`
: The current process name.

Original file line number Diff line number Diff line change
@@ -16,6 +16,8 @@

package nextflow.processor

import static nextflow.processor.TaskProcessor.*

import java.util.concurrent.ExecutorService
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
@@ -617,7 +619,7 @@ class TaskPollingMonitor implements TaskMonitor {
if (evict(handler)) {
handler.decProcessForks()
}
fault = handler.task.processor.resumeOrDie(handler?.task, error)
fault = handler.task.processor.resumeOrDie(handler?.task, error, handler.getTraceRecord())
log.trace "Task fault (1): $fault"
}
finally {
@@ -683,7 +685,7 @@ class TaskPollingMonitor implements TaskMonitor {

protected void finalizeTask( TaskHandler handler ) {
// finalize the task execution
final fault = handler.task.processor.finalizeTask(handler.task)
final fault = handler.task.processor.finalizeTask(handler)

// notify task completion
session.notifyTaskComplete(handler)
Original file line number Diff line number Diff line change
@@ -15,6 +15,8 @@
*/
package nextflow.processor

import nextflow.trace.TraceRecord

import static nextflow.processor.ErrorStrategy.*

import java.lang.reflect.InvocationTargetException
@@ -1016,7 +1018,7 @@ class TaskProcessor {
* a {@link ErrorStrategy#TERMINATE})
*/
@PackageScope
final synchronized resumeOrDie( TaskRun task, Throwable error ) {
final synchronized resumeOrDie( TaskRun task, Throwable error, TraceRecord traceRecord = null) {
log.debug "Handling unexpected condition for\n task: name=${safeTaskName(task)}; work-dir=${task?.workDirStr}\n error [${error?.class?.name}]: ${error?.getMessage()?:error}"

ErrorStrategy errorStrategy = TERMINATE
@@ -1061,6 +1063,10 @@ class TaskProcessor {
task.config.exitStatus = task.exitStatus
task.config.errorCount = procErrCount
task.config.retryCount = taskErrCount
//Add trace of the previous execution in the task context for next execution
if ( traceRecord )
task.config.previousTrace = traceRecord
task.config.previousError = error.getMessage()
jorgee marked this conversation as resolved.
Show resolved Hide resolved

errorStrategy = checkErrorStrategy(task, error, taskErrCount, procErrCount, submitRetries)
if( errorStrategy.soft ) {
@@ -2361,7 +2367,8 @@ class TaskProcessor {
* @param task The {@code TaskRun} instance to finalize
*/
@PackageScope
final finalizeTask( TaskRun task ) {
final finalizeTask( TaskHandler handler) {
def task = handler.task
log.trace "finalizing process > ${safeTaskName(task)} -- $task"

def fault = null
@@ -2384,7 +2391,7 @@ class TaskProcessor {
collectOutputs(task)
}
catch ( Throwable error ) {
fault = resumeOrDie(task, error)
fault = resumeOrDie(task, error, handler.getTraceRecord())
log.trace "Task fault (3): $fault"
}

Original file line number Diff line number Diff line change
@@ -175,7 +175,7 @@ class MockTaskHandler extends TaskHandler {
task.code.call()
}
status = TaskStatus.COMPLETED
task.processor.finalizeTask(task)
task.processor.finalizeTask(this)
}

@Override
11 changes: 11 additions & 0 deletions tests/checks/trace-access.nf/.checks
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
set -e

echo ''
$NXF_RUN | tee stdout

[[ `grep 'INFO' .nextflow.log | grep -c 'Submitted process > foo'` == 1 ]] || false
[[ `grep 'INFO' .nextflow.log | grep -c 'Re-submitted process > foo'` == 3 ]] || false

[[ `grep -c 'mem: 8 GB (previous: 4294967296) (error: Process .* terminated with an error exit status (137))' stdout` == 1 ]] || false


24 changes: 24 additions & 0 deletions tests/trace-access.nf
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
process foo {
memory { task.attempt > 1 ? task.previousTrace.memory * 2 : (1.GB) }
errorStrategy 'retry'
maxRetries 3
input:
val i
output:
stdout
script:
if( task.attempt <= 3 ){
"""
exit 137
"""
} else {
"""
echo 'mem: $task.memory (previous: $task.previousTrace.memory) (error: $task.previousError)'
exit 0
"""
}
}

workflow {
foo(channel.of(1)).view()
}