From 75763ccb90cb854ddb0470900a0ee3434ba429fc Mon Sep 17 00:00:00 2001 From: Miguel Prieto Date: Wed, 11 Sep 2024 17:09:18 -0400 Subject: [PATCH] Fix flaky test which is failing because integration_task_2 is not COMPLETED immediately tasks[2].status == Task.Status.COMPLETED | | | | false class com.netflix.conductor.common.metadata.tasks.Task$Status SCHEDULED --- test-harness/build.gradle | 1 + ...rchicalForkJoinSubworkflowRerunSpec.groovy | 76 ++++++++++--------- 2 files changed, 43 insertions(+), 34 deletions(-) diff --git a/test-harness/build.gradle b/test-harness/build.gradle index db73b2706..5f8a64a59 100644 --- a/test-harness/build.gradle +++ b/test-harness/build.gradle @@ -40,4 +40,5 @@ dependencies { testImplementation "org.junit.vintage:junit-vintage-engine" testImplementation "jakarta.ws.rs:jakarta.ws.rs-api:${revJAXRS}" testImplementation "org.glassfish.jersey.core:jersey-common:${revJerseyCommon}" + testImplementation "org.awaitility:awaitility:${revAwaitility}" } diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRerunSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRerunSpec.groovy index 1a8f1dff4..508cb3f16 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRerunSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRerunSpec.groovy @@ -12,6 +12,8 @@ */ package com.netflix.conductor.test.integration +import java.util.concurrent.TimeUnit + import org.springframework.beans.factory.annotation.Autowired import com.netflix.conductor.common.metadata.tasks.Task @@ -30,6 +32,8 @@ import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_JOI import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_SUB_WORKFLOW import static com.netflix.conductor.test.util.WorkflowTestUtil.verifyPolledAndAcknowledgedTask +import static org.awaitility.Awaitility.await + class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification { @Shared @@ -217,17 +221,18 @@ class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification { workflowExecutor.rerun(reRunWorkflowRequest) then: "verify that the root workflow created a new execution" - with(workflowExecutionService.getExecutionStatus(rootWorkflowId, true)) { - status == Workflow.WorkflowStatus.RUNNING - tasks.size() == 4 - tasks[0].taskType == TASK_TYPE_FORK - tasks[0].status == Task.Status.COMPLETED - tasks[1].taskType == TASK_TYPE_SUB_WORKFLOW - tasks[1].status == Task.Status.IN_PROGRESS - tasks[2].taskType == 'integration_task_2' - tasks[2].status == Task.Status.SCHEDULED - tasks[3].taskType == TASK_TYPE_JOIN - tasks[3].status == Task.Status.IN_PROGRESS + await().atMost(10, TimeUnit.SECONDS).until { + def executionStatus = workflowExecutionService.getExecutionStatus(rootWorkflowId, true) + executionStatus.status == Workflow.WorkflowStatus.RUNNING && + executionStatus.tasks.size() == 4 && + executionStatus.tasks[0].taskType == TASK_TYPE_FORK && + executionStatus.tasks[0].status == Task.Status.COMPLETED && + executionStatus.tasks[1].taskType == TASK_TYPE_SUB_WORKFLOW && + executionStatus.tasks[1].status == Task.Status.IN_PROGRESS && + executionStatus.tasks[2].taskType == 'integration_task_2' && + executionStatus.tasks[2].status == Task.Status.SCHEDULED && + executionStatus.tasks[3].taskType == TASK_TYPE_JOIN && + executionStatus.tasks[3].status == Task.Status.IN_PROGRESS } when: "poll and complete the integration_task_2 task in the root workflow" @@ -238,17 +243,18 @@ class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification { then: "verify that a new mid level workflow is created and is in RUNNING state" newMidLevelWorkflowId != midLevelWorkflowId workflowExecutor.decide(newMidLevelWorkflowId) - with(workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true)) { - status == Workflow.WorkflowStatus.RUNNING - tasks.size() == 4 - tasks[0].taskType == TASK_TYPE_FORK - tasks[0].status == Task.Status.COMPLETED - tasks[1].taskType == TASK_TYPE_SUB_WORKFLOW - tasks[1].status == Task.Status.IN_PROGRESS - tasks[2].taskType == 'integration_task_2' - tasks[2].status == Task.Status.COMPLETED - tasks[3].taskType == TASK_TYPE_JOIN - tasks[3].status == Task.Status.IN_PROGRESS + await().atMost(10, TimeUnit.SECONDS).until { + def executionStatus = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true) + executionStatus.status == Workflow.WorkflowStatus.RUNNING && + executionStatus.tasks.size() == 4 && + executionStatus.tasks[0].taskType == TASK_TYPE_FORK && + executionStatus.tasks[0].status == Task.Status.COMPLETED && + executionStatus.tasks[1].taskType == TASK_TYPE_SUB_WORKFLOW && + executionStatus.tasks[1].status == Task.Status.IN_PROGRESS && + executionStatus.tasks[2].taskType == 'integration_task_2' && + executionStatus.tasks[2].status == Task.Status.COMPLETED && + executionStatus.tasks[3].taskType == TASK_TYPE_JOIN && + executionStatus.tasks[3].status == Task.Status.IN_PROGRESS } when: "poll and complete the integration_task_2 task in the root-level workflow" @@ -258,11 +264,12 @@ class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification { then: "verify that a new leaf workflow is created and is in RUNNING state" newLeafWorkflowId != leafWorkflowId - with(workflowExecutionService.getExecutionStatus(newLeafWorkflowId, true)) { - status == Workflow.WorkflowStatus.RUNNING - tasks.size() == 1 - tasks[0].taskType == 'integration_task_1' - tasks[0].status == Task.Status.SCHEDULED + await().atMost(10, TimeUnit.SECONDS).until { + def executionStatus = workflowExecutionService.getExecutionStatus(newLeafWorkflowId, true) + executionStatus.status == Workflow.WorkflowStatus.RUNNING && + executionStatus.tasks.size() == 1 && + executionStatus.tasks[0].taskType == 'integration_task_1' && + executionStatus.tasks[0].status == Task.Status.SCHEDULED } when: "poll and complete the two tasks in the leaf workflow" @@ -270,13 +277,14 @@ class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification { workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done']) then: "the new leaf workflow is in COMPLETED state" - with(workflowExecutionService.getExecutionStatus(newLeafWorkflowId, true)) { - status == Workflow.WorkflowStatus.COMPLETED - tasks.size() == 2 - tasks[0].taskType == 'integration_task_1' - tasks[0].status == Task.Status.COMPLETED - tasks[1].taskType == 'integration_task_2' - tasks[1].status == Task.Status.COMPLETED + await().atMost(10, TimeUnit.SECONDS).until { + def executionStatus = workflowExecutionService.getExecutionStatus(newLeafWorkflowId, true) + executionStatus.status == Workflow.WorkflowStatus.COMPLETED && + executionStatus.tasks.size() == 2 && + executionStatus.tasks[0].taskType == 'integration_task_1' && + executionStatus.tasks[0].status == Task.Status.COMPLETED && + executionStatus.tasks[1].taskType == 'integration_task_2' && + executionStatus.tasks[1].status == Task.Status.COMPLETED } when: "the new mid level and root workflows are 'decided'"