Skip to content

Commit

Permalink
feat(core): introduce an finally block on flow & flowable (#6686)
Browse files Browse the repository at this point in the history
close #6649
  • Loading branch information
tchiotludo authored Jan 13, 2025
1 parent 55b6720 commit 0d1710d
Show file tree
Hide file tree
Showing 47 changed files with 1,273 additions and 1,141 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ void run() {
Integer call = PicocliRunner.call(FlowDotCommand.class, ctx, args);

assertThat(call, is(0));
assertThat(out.toString(), containsString("\"root.date\"[shape=box,label=\"date\"];"));
assertThat(out.toString(), containsString("\"root.date\"[shape=box];"));
}
}
}
39 changes: 31 additions & 8 deletions core/src/main/java/io/kestra/core/models/executions/Execution.java
Original file line number Diff line number Diff line change
Expand Up @@ -335,11 +335,15 @@ public TaskRun findTaskRunByTaskIdAndValue(String id, List<String> values)
*
* @param resolvedTasks normal tasks
* @param resolvedErrors errors tasks
* @param resolvedErrors afters tasks
* @return the flow we need to follow
*/
public List<ResolvedTask> findTaskDependingFlowState(List<ResolvedTask> resolvedTasks,
List<ResolvedTask> resolvedErrors) {
return this.findTaskDependingFlowState(resolvedTasks, resolvedErrors, null);
public List<ResolvedTask> findTaskDependingFlowState(
List<ResolvedTask> resolvedTasks,
List<ResolvedTask> resolvedErrors,
List<ResolvedTask> resolvedAfters
) {
return this.findTaskDependingFlowState(resolvedTasks, resolvedErrors, resolvedAfters, null);
}

/**
Expand All @@ -349,15 +353,28 @@ public List<ResolvedTask> findTaskDependingFlowState(List<ResolvedTask> resolved
*
* @param resolvedTasks normal tasks
* @param resolvedErrors errors tasks
* @param resolvedFinally afters tasks
* @param parentTaskRun the parent task
* @return the flow we need to follow
*/
public List<ResolvedTask> findTaskDependingFlowState(List<ResolvedTask> resolvedTasks,
@Nullable List<ResolvedTask> resolvedErrors, TaskRun parentTaskRun) {
public List<ResolvedTask> findTaskDependingFlowState(
List<ResolvedTask> resolvedTasks,
@Nullable List<ResolvedTask> resolvedErrors,
@Nullable List<ResolvedTask> resolvedFinally,
TaskRun parentTaskRun
) {
resolvedTasks = removeDisabled(resolvedTasks);
resolvedErrors = removeDisabled(resolvedErrors);
resolvedFinally = removeDisabled(resolvedFinally);


List<TaskRun> errorsFlow = this.findTaskRunByTasks(resolvedErrors, parentTaskRun);
List<TaskRun> finallyFlow = this.findTaskRunByTasks(resolvedFinally, parentTaskRun);

// finally is already started, just continue theses finally
if (!finallyFlow.isEmpty()) {
return resolvedFinally == null ? Collections.emptyList() : resolvedFinally;
}

// Check if flow has failed task
if (!errorsFlow.isEmpty() || this.hasFailed(resolvedTasks, parentTaskRun)) {
Expand All @@ -366,8 +383,15 @@ public List<ResolvedTask> findTaskDependingFlowState(List<ResolvedTask> resolved
return Collections.emptyList();
}

return resolvedErrors == null ? Collections.emptyList() : resolvedErrors;
if (resolvedFinally != null && resolvedErrors != null && !this.isTerminated(resolvedErrors, parentTaskRun)) {
return resolvedErrors;
} else if (resolvedFinally == null) {
return resolvedErrors == null ? Collections.emptyList() : resolvedErrors;
}
}

if (this.isTerminated(resolvedTasks, parentTaskRun) && resolvedFinally != null) {
return resolvedFinally;
}

return resolvedTasks;
Expand All @@ -390,8 +414,7 @@ private List<ResolvedTask> removeDisabled(List<ResolvedTask> tasks) {
.toList();
}

public List<TaskRun> findTaskRunByTasks(List<ResolvedTask> resolvedTasks,
TaskRun parentTaskRun) {
public List<TaskRun> findTaskRunByTasks(List<ResolvedTask> resolvedTasks, TaskRun parentTaskRun) {
if (resolvedTasks == null || this.taskRunList == null) {
return Collections.emptyList();
}
Expand Down
16 changes: 12 additions & 4 deletions core/src/main/java/io/kestra/core/models/flows/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
Expand Down Expand Up @@ -32,10 +33,7 @@
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -81,6 +79,15 @@ public boolean hasIgnoreMarker(final AnnotatedMember m) {
@Valid
List<Task> errors;

@Valid
@JsonProperty("finally")
@Getter(AccessLevel.NONE)
protected List<Task> _finally;

public List<Task> getFinally() {
return this._finally;
}

@Valid
@Deprecated
List<Listener> listeners;
Expand Down Expand Up @@ -188,6 +195,7 @@ public Stream<Task> allTasks() {
return Stream.of(
this.tasks != null ? this.tasks : new ArrayList<Task>(),
this.errors != null ? this.errors : new ArrayList<Task>(),
this._finally != null ? this._finally : new ArrayList<Task>(),
this.listenersTasks()
)
.flatMap(Collection::stream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ public class FlowForExecution extends AbstractFlow {
@Valid
List<TaskForExecution> errors;

@Valid
List<TaskForExecution> _finally;

@Valid
List<AbstractTriggerForExecution> triggers;

Expand All @@ -36,6 +39,7 @@ public static FlowForExecution of(Flow flow) {
.inputs(flow.getInputs())
.tasks(flow.getTasks().stream().map(TaskForExecution::of).toList())
.errors(ListUtils.emptyOnNull(flow.getErrors()).stream().map(TaskForExecution::of).toList())
._finally(ListUtils.emptyOnNull(flow.getFinally()).stream().map(TaskForExecution::of).toList())
.triggers(ListUtils.emptyOnNull(flow.getTriggers()).stream().map(AbstractTriggerForExecution::of).toList())
.disabled(flow.isDisabled())
.deleted(flow.isDeleted())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public Flow toFlow() {
.variables(this.variables)
.tasks(this.tasks)
.errors(this.errors)
._finally(this._finally)
.listeners(this.listeners)
.triggers(this.triggers)
.pluginDefaults(this.pluginDefaults)
Expand Down Expand Up @@ -70,6 +71,7 @@ public static FlowWithSource of(Flow flow, String source) {
.variables(flow.variables)
.tasks(flow.tasks)
.errors(flow.errors)
._finally(flow._finally)
.listeners(flow.listeners)
.triggers(flow.triggers)
.pluginDefaults(flow.pluginDefaults)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.micronaut.core.annotation.Introspected;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
Expand All @@ -19,7 +18,7 @@ public abstract class AbstractGraph {
@JsonInclude
protected String type;
@Setter
protected boolean error;
protected BranchType branchType;

public AbstractGraph() {
this.type = this.getClass().getName();
Expand All @@ -39,8 +38,8 @@ public void updateUidWithChildren(String uid) {
this.uid = uid;
}

public void updateErrorWithChildren(boolean error) {
this.error = error;
public void updateWithChildren(BranchType branchType) {
this.branchType = branchType;
}

public AbstractGraph forExecution() {
Expand All @@ -53,4 +52,9 @@ public boolean equals(Object o) {
if (!(o instanceof AbstractGraph)) return false;
return o.hashCode() == this.hashCode();
}

public enum BranchType {
ERROR,
FINALLY
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ public Graph<T, V> addEdge(T previous, T next, V value) {
return this;
}

public Graph<T, V> removeEdge(T previous, T next) {
this.graph.removeEdge(previous, next);

return this;
}

public Set<T> nodes() {
return this.graph.nodes();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.tasks.Task;
import lombok.Builder;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;

Expand All @@ -25,6 +25,14 @@ public class GraphCluster extends AbstractGraph {
@JsonIgnore
private final GraphClusterRoot root;

@JsonIgnore
@Getter(AccessLevel.NONE)
private final GraphClusterFinally _finally;

public GraphClusterFinally getFinally() {
return _finally;
}

@JsonIgnore
private final GraphClusterEnd end;

Expand All @@ -41,17 +49,22 @@ public GraphCluster(String uid) {

this.relationType = null;
this.root = new GraphClusterRoot();
this._finally = new GraphClusterFinally();
this.end = new GraphClusterEnd();
this.taskNode = null;

this.addNode(this.root);
this.addNode(this._finally);
this.addNode(this.end);

this.addEdge(this.getFinally(), this.getEnd(), new Relation());
}

public GraphCluster(Task task, TaskRun taskRun, List<String> values, RelationType relationType) {
this(new GraphTask(task.getId(), task, taskRun, values, relationType), task.getId(), relationType);

this.addNode(this.taskNode, false);

this.addEdge(this.getRoot(), this.taskNode, new Relation());
}

Expand All @@ -60,11 +73,15 @@ protected GraphCluster(AbstractGraphTask taskNode, String uid, RelationType rela

this.relationType = relationType;
this.root = new GraphClusterRoot();
this._finally = new GraphClusterFinally();
this.end = new GraphClusterEnd();
this.taskNode = taskNode;

this.addNode(this.root);
this.addNode(this._finally);
this.addNode(this.end);

this.addEdge(this.getFinally(), this.getEnd(), new Relation());
}

public void addNode(AbstractGraph node) {
Expand Down Expand Up @@ -122,12 +139,12 @@ public void updateUidWithChildren(String uid) {
}

@Override
public void updateErrorWithChildren(boolean error) {
this.error = error;
public void updateWithChildren(BranchType branchType) {
this.branchType = branchType;

this.taskNode.error = error;
this.root.error = error;
this.end.error = error;
this.taskNode.branchType = branchType;
this.root.branchType = branchType;
this.end.branchType = branchType;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@
@Getter
public class GraphClusterEnd extends AbstractGraph {
public GraphClusterEnd() {
super(IdUtils.create());
super("end-" + IdUtils.create());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.kestra.core.models.hierarchies;

import io.kestra.core.utils.IdUtils;
import lombok.Getter;

@Getter
public class GraphClusterFinally extends AbstractGraph {
public GraphClusterFinally() {
super("finally-" + IdUtils.create());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@
@Getter
public class GraphClusterRoot extends AbstractGraph {
public GraphClusterRoot() {
super(IdUtils.create());
super("root-" + IdUtils.create());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ public enum RelationType {
SEQUENTIAL,
CHOICE,
ERROR,
FINALLY,
PARALLEL,
DYNAMIC
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ public interface FlowableTask <T extends Output> {
@PluginProperty
List<Task> getErrors();

@Schema(
title = "List of tasks to run after any tasks failed or success on this FlowableTask."
)
@PluginProperty
List<Task> getFinally();

/**
* Create the topology representation of a flowable task.
* <p>
Expand Down Expand Up @@ -71,6 +77,7 @@ default Optional<State.Type> resolveState(RunContext runContext, Execution execu
execution,
this.childTasks(runContext, parentTaskRun),
FlowableUtils.resolveTasks(this.getErrors(), parentTaskRun),
FlowableUtils.resolveTasks(this.getFinally(), parentTaskRun),
parentTaskRun,
runContext,
isAllowFailure(),
Expand Down
11 changes: 11 additions & 0 deletions core/src/main/java/io/kestra/core/models/templates/Template.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.introspect.AnnotatedMember;
Expand Down Expand Up @@ -67,6 +68,15 @@ public boolean hasIgnoreMarker(final AnnotatedMember m) {
@Valid
private List<Task> errors;

@Valid
@JsonProperty("finally")
@Getter(AccessLevel.NONE)
protected List<Task> _finally;

public List<Task> getFinally() {
return this._finally;
}

@NotNull
@Builder.Default
private final boolean deleted = false;
Expand Down Expand Up @@ -138,6 +148,7 @@ public Template toDeleted() {
this.description,
this.tasks,
this.errors,
this._finally,
true
);
}
Expand Down
Loading

0 comments on commit 0d1710d

Please sign in to comment.