Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding total timeout to task models #313

Merged
merged 6 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,20 @@ public boolean isValid(TaskDef taskDef, ConstraintValidatorContext context) {
}
}

// Check if timeoutSeconds is greater than totalTimeoutSeconds
if (taskDef.getTimeoutSeconds() > 0
&& taskDef.getTotalTimeoutSeconds() > 0
&& taskDef.getTimeoutSeconds() > taskDef.getTotalTimeoutSeconds()) {
valid = false;
String message =
String.format(
"TaskDef: %s timeoutSeconds: %d must be less than or equal to totalTimeoutSeconds: %d",
taskDef.getName(),
taskDef.getTimeoutSeconds(),
taskDef.getTotalTimeoutSeconds());
context.buildConstraintViolationWithTemplate(message).addConstraintViolation();
}

return valid;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ public boolean isRetriable() {
@ProtoField(id = 42)
private boolean subworkflowChanged;

@ProtoField(id = 43)
private long firstStartTime;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be added to equals, hashcode and toString?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I think we should change the hashCode to be aware of the task's PK -- which is taskName. Thoughts @jmigueprieto

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@v1r3n hashCode is considering taskDefName

  @Override
    public int hashCode() {
        return Objects.hash(
                getTaskType(),
                getStatus(),
                getInputData(),
                getReferenceTaskName(),
                getWorkflowPriority(),
                getRetryCount(),
                getSeq(),
                getCorrelationId(),
                getPollCount(),
                getTaskDefName(),
                ...
}

Or do you want to use just "task name"?
PROS

  • It will be simpler
  • (small) Performance gain

CONS

  • We'll need to modify equals: if only id is used in hashCode but equals considers additional fields, the contract will be broken.
  • We'll have an incomplete equality: this might break existing code e.g.: tests that assert assertNotEquals(task0, task1). If both task have the same name but different fields, the assertion is true now but will be false is we just consider task name.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can analyze this further in another PR. But, for the moment I believe that we should stick to considering the added fields to equals and hashCode


// If the task is an event associated with a parent task, the id of the parent task
private String parentTaskId;

Expand Down Expand Up @@ -716,7 +719,9 @@ public boolean isLoopOverTask() {
return iteration > 0;
}

/** * @return the priority defined on workflow */
/**
* @return the priority defined on workflow
*/
public int getWorkflowPriority() {
return workflowPriority;
}
Expand Down Expand Up @@ -765,6 +770,14 @@ public void setParentTaskId(String parentTaskId) {
this.parentTaskId = parentTaskId;
}

public long getFirstStartTime() {
return firstStartTime;
}

public void setFirstStartTime(long firstStartTime) {
this.firstStartTime = firstStartTime;
}

public Task copy() {
Task copy = new Task();
copy.setCallbackAfterSeconds(callbackAfterSeconds);
Expand Down Expand Up @@ -798,6 +811,7 @@ public Task copy() {
copy.setSubWorkflowId(getSubWorkflowId());
copy.setSubworkflowChanged(subworkflowChanged);
copy.setParentTaskId(parentTaskId);
copy.setFirstStartTime(firstStartTime);
return copy;
}

Expand All @@ -819,6 +833,7 @@ public Task deepCopy() {
deepCopy.setReasonForIncompletion(reasonForIncompletion);
deepCopy.setSeq(seq);
deepCopy.setParentTaskId(parentTaskId);
deepCopy.setFirstStartTime(firstStartTime);
return deepCopy;
}

Expand Down Expand Up @@ -919,6 +934,9 @@ public String toString() {
+ ", subworkflowChanged='"
+ subworkflowChanged
+ '\''
+ ", firstStartTime='"
+ firstStartTime
+ '\''
+ '}';
}

Expand Down Expand Up @@ -973,7 +991,8 @@ && getWorkflowPriority() == task.getWorkflowPriority()
task.getExternalOutputPayloadStoragePath())
&& Objects.equals(getIsolationGroupId(), task.getIsolationGroupId())
&& Objects.equals(getExecutionNameSpace(), task.getExecutionNameSpace())
&& Objects.equals(getParentTaskId(), task.getParentTaskId());
&& Objects.equals(getParentTaskId(), task.getParentTaskId())
&& Objects.equals(getFirstStartTime(), task.getFirstStartTime());
}

@Override
Expand Down Expand Up @@ -1016,6 +1035,7 @@ public int hashCode() {
getExternalOutputPayloadStoragePath(),
getIsolationGroupId(),
getExecutionNameSpace(),
getParentTaskId());
getParentTaskId(),
getFirstStartTime());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ public enum RetryLogic {
@ProtoField(id = 21)
private String baseType;

@ProtoField(id = 22)
@NotNull
private long totalTimeoutSeconds;

private SchemaDef inputSchema;
private SchemaDef outputSchema;
private boolean enforceSchema;
Expand Down Expand Up @@ -464,6 +468,14 @@ public void setEnforceSchema(boolean enforceSchema) {
this.enforceSchema = enforceSchema;
}

public long getTotalTimeoutSeconds() {
return totalTimeoutSeconds;
}

public void setTotalTimeoutSeconds(@NotNull long totalTimeoutSeconds) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: a long (primitive type) cannot be null. The @NotNull is redundant.

this.totalTimeoutSeconds = totalTimeoutSeconds;
}

@Override
public String toString() {
return name;
Expand Down Expand Up @@ -497,7 +509,8 @@ && getRetryLogic() == taskDef.getRetryLogic()
&& Objects.equals(getOwnerEmail(), taskDef.getOwnerEmail())
&& Objects.equals(getBaseType(), taskDef.getBaseType())
&& Objects.equals(getInputSchema(), taskDef.getInputSchema())
&& Objects.equals(getOutputSchema(), taskDef.getOutputSchema());
&& Objects.equals(getOutputSchema(), taskDef.getOutputSchema())
&& Objects.equals(getTotalTimeoutSeconds(), taskDef.getTotalTimeoutSeconds());
}

@Override
Expand All @@ -523,6 +536,7 @@ public int hashCode() {
getOwnerEmail(),
getBaseType(),
getInputSchema(),
getOutputSchema());
getOutputSchema(),
getTotalTimeoutSeconds());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,28 @@ public void testTaskDef() {
assertTrue(validationErrors.contains("ownerEmail cannot be empty"));
}

@Test
public void testTaskDefTotalTimeOutSeconds() {
TaskDef taskDef = new TaskDef();
taskDef.setName("test-task");
taskDef.setRetryCount(1);
taskDef.setTimeoutSeconds(1000);
taskDef.setTotalTimeoutSeconds(900);
taskDef.setResponseTimeoutSeconds(1);
taskDef.setOwnerEmail("[email protected]");

Set<ConstraintViolation<Object>> result = validator.validate(taskDef);
assertEquals(1, result.size());

List<String> validationErrors = new ArrayList<>();
result.forEach(e -> validationErrors.add(e.getMessage()));

assertTrue(
validationErrors.toString(),
validationErrors.contains(
"TaskDef: test-task timeoutSeconds: 1000 must be less than or equal to totalTimeoutSeconds: 900"));
}

@Test
public void testTaskDefInvalidEmail() {
TaskDef taskDef = new TaskDef();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void testDeepCopyTask() {
final Task task = new Task();
// In order to avoid forgetting putting inside the copy method the newly added fields check
// the number of declared fields.
final int expectedTaskFieldsNumber = 41;
final int expectedTaskFieldsNumber = 42;
final int declaredFieldsNumber = task.getClass().getDeclaredFields().length;

assertEquals(expectedTaskFieldsNumber, declaredFieldsNumber);
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/java/com/netflix/conductor/model/TaskModel.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ public boolean isRetriable() {
/** Time when the task was last updated */
private long updateTime;

/** Time when first task started */
private long firstStartTime;

private int startDelayInSeconds;

private String retriedTaskId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,7 @@ public TaskPb.Task toProto(Task from) {
to.setSubWorkflowId( from.getSubWorkflowId() );
}
to.setSubworkflowChanged( from.isSubworkflowChanged() );
to.setFirstStartTime( from.getFirstStartTime() );
return to.build();
}

Expand Down Expand Up @@ -788,6 +789,7 @@ public Task fromProto(TaskPb.Task from) {
to.setIteration( from.getIteration() );
to.setSubWorkflowId( from.getSubWorkflowId() );
to.setSubworkflowChanged( from.getSubworkflowChanged() );
to.setFirstStartTime( from.getFirstStartTime() );
return to;
}

Expand Down Expand Up @@ -875,6 +877,7 @@ public TaskDefPb.TaskDef toProto(TaskDef from) {
if (from.getBaseType() != null) {
to.setBaseType( from.getBaseType() );
}
to.setTotalTimeoutSeconds( from.getTotalTimeoutSeconds() );
return to.build();
}

Expand Down Expand Up @@ -904,6 +907,7 @@ public TaskDef fromProto(TaskDefPb.TaskDef from) {
to.setPollTimeoutSeconds( from.getPollTimeoutSeconds() );
to.setBackoffScaleFactor( from.getBackoffScaleFactor() );
to.setBaseType( from.getBaseType() );
to.setTotalTimeoutSeconds( from.getTotalTimeoutSeconds() );
return to;
}

Expand Down
1 change: 1 addition & 0 deletions grpc/src/main/proto/model/task.proto
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,5 @@ message Task {
int32 iteration = 40;
string sub_workflow_id = 41;
bool subworkflow_changed = 42;
int64 first_start_time = 43;
}
1 change: 1 addition & 0 deletions grpc/src/main/proto/model/taskdef.proto
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,5 @@ message TaskDef {
int32 poll_timeout_seconds = 19;
int32 backoff_scale_factor = 20;
string base_type = 21;
int64 total_timeout_seconds = 22;
}
Loading