From 003ab5b467b837cd18f83e45594100bfa1e93d3c Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Mon, 26 Sep 2022 16:42:46 -0700 Subject: [PATCH] Revert "Bmoric/temporal cleaning cron (#16414)" This reverts commit bbb59d8675603e708736717b461d3ce562266abf. --- airbyte-commons-temporal/build.gradle | 42 ---- .../temporal/ConnectionManagerUtils.java | 53 ----- .../commons/temporal/TemporalClient.java | 135 ----------- .../temporal/config/TemporalBeanFactory.java | 32 --- .../commons/temporal/TemporalClientTest.java | 89 ------- airbyte-container-orchestrator/build.gradle | 1 - airbyte-cron/build.gradle | 14 +- .../io/airbyte/cron/selfhealing/Temporal.java | 14 +- .../main/resources/application-control.yml | 40 ---- .../src/main/resources/application.yml | 219 ------------------ .../src/main/resources/micronaut-banner.txt | 1 - airbyte-server/build.gradle | 5 - .../java/io/airbyte/server/ServerApp.java | 4 +- airbyte-test-utils/build.gradle | 1 - .../utils/AirbyteAcceptanceTestHarness.java | 8 +- airbyte-tests/build.gradle | 1 - .../test/acceptance/BasicAcceptanceTests.java | 2 +- airbyte-workers/build.gradle | 1 - .../workers/ApplicationInitializer.java | 4 +- .../workers/config/ActivityBeanFactory.java | 2 +- .../workers/config/TemporalBeanFactory.java | 15 +- .../workers/run/TemporalWorkerRunFactory.java | 2 +- .../temporal/CancellationHandler.java | 2 +- .../temporal/ConnectionManagerUtils.java | 8 +- .../temporal/TemporalAttemptExecution.java | 2 - .../workers/temporal/TemporalClient.java | 32 ++- .../workers}/temporal/TemporalJobType.java | 2 +- .../workers}/temporal/TemporalUtils.java | 2 +- .../temporal/TemporalWorkflowUtils.java | 4 +- .../CheckConnectionActivityImpl.java | 2 +- .../catalog/DiscoverCatalogActivityImpl.java | 2 +- .../scheduling/ConnectionManagerWorkflow.java | 4 +- .../ConnectionManagerWorkflowImpl.java | 12 +- .../scheduling/ConnectionUpdaterInput.java | 6 +- .../activities/GenerateInputActivityImpl.java | 2 +- .../activities/RecordMetricActivity.java | 2 +- .../activities/RecordMetricActivityImpl.java | 2 +- .../state/WorkflowInternalState.java | 4 +- .../scheduling/state/WorkflowState.java | 8 +- .../state/listener/NoopStateListener.java | 2 +- .../state/listener/TestStateListener.java | 2 +- .../WorkflowStateChangedListener.java | 2 +- .../temporal/spec/SpecActivityImpl.java | 2 +- .../temporal/sync/DbtLauncherWorker.java | 2 +- .../sync/DbtTransformationActivityImpl.java | 4 +- .../workers/temporal/sync/LauncherWorker.java | 2 +- .../sync/NormalizationActivityImpl.java | 4 +- .../sync/NormalizationLauncherWorker.java | 2 +- .../sync/ReplicationActivityImpl.java | 4 +- .../sync/ReplicationLauncherWorker.java | 2 +- .../workers/temporal/sync/RouterService.java | 2 +- .../workers_models/JobRunConfig.yaml | 0 .../temporal/CancellationHandlerTest.java | 9 +- .../TemporalAttemptExecutionTest.java | 1 - .../workers/temporal/TemporalClientTest.java | 46 +++- .../workers}/temporal/TemporalUtilsTest.java | 9 +- .../CheckConnectionWorkflowTest.java | 2 +- .../ConnectionManagerWorkflowTest.java | 12 +- .../RecordMetricActivityImplTest.java | 2 +- .../temporal/stubs/HeartbeatWorkflow.java | 4 +- .../temporal/sync/RouterServiceTest.java | 2 +- .../temporal/sync/SyncWorkflowTest.java | 2 +- docker-compose.yaml | 13 -- settings.gradle | 4 +- 64 files changed, 158 insertions(+), 759 deletions(-) delete mode 100644 airbyte-commons-temporal/build.gradle delete mode 100644 airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/ConnectionManagerUtils.java delete mode 100644 airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalClient.java delete mode 100644 airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/config/TemporalBeanFactory.java delete mode 100644 airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/TemporalClientTest.java delete mode 100644 airbyte-cron/src/main/resources/application-control.yml delete mode 100644 airbyte-cron/src/main/resources/application.yml delete mode 100644 airbyte-cron/src/main/resources/micronaut-banner.txt rename {airbyte-commons-temporal/src/main/java/io/airbyte/commons => airbyte-workers/src/main/java/io/airbyte/workers}/temporal/CancellationHandler.java (98%) rename {airbyte-commons-temporal/src/main/java/io/airbyte/commons => airbyte-workers/src/main/java/io/airbyte/workers}/temporal/TemporalJobType.java (85%) rename {airbyte-commons-temporal/src/main/java/io/airbyte/commons => airbyte-workers/src/main/java/io/airbyte/workers}/temporal/TemporalUtils.java (99%) rename {airbyte-commons-temporal/src/main/java/io/airbyte/commons => airbyte-workers/src/main/java/io/airbyte/workers}/temporal/TemporalWorkflowUtils.java (96%) rename {airbyte-commons-temporal/src/main/java/io/airbyte/commons => airbyte-workers/src/main/java/io/airbyte/workers}/temporal/scheduling/ConnectionManagerWorkflow.java (95%) rename {airbyte-commons-temporal/src/main/java/io/airbyte/commons => airbyte-workers/src/main/java/io/airbyte/workers}/temporal/scheduling/ConnectionUpdaterInput.java (86%) rename {airbyte-commons-temporal/src/main/java/io/airbyte/commons => airbyte-workers/src/main/java/io/airbyte/workers}/temporal/scheduling/state/WorkflowInternalState.java (80%) rename {airbyte-commons-temporal/src/main/java/io/airbyte/commons => airbyte-workers/src/main/java/io/airbyte/workers}/temporal/scheduling/state/WorkflowState.java (94%) rename {airbyte-commons-temporal/src/main/java/io/airbyte/commons => airbyte-workers/src/main/java/io/airbyte/workers}/temporal/scheduling/state/listener/NoopStateListener.java (86%) rename {airbyte-commons-temporal/src/main/java/io/airbyte/commons => airbyte-workers/src/main/java/io/airbyte/workers}/temporal/scheduling/state/listener/TestStateListener.java (93%) rename {airbyte-commons-temporal/src/main/java/io/airbyte/commons => airbyte-workers/src/main/java/io/airbyte/workers}/temporal/scheduling/state/listener/WorkflowStateChangedListener.java (94%) rename {airbyte-commons-temporal => airbyte-workers}/src/main/resources/workers_models/JobRunConfig.yaml (100%) rename {airbyte-commons-temporal/src/test/java/io/airbyte/commons => airbyte-workers/src/test/java/io/airbyte/workers}/temporal/CancellationHandlerTest.java (85%) rename {airbyte-commons-temporal/src/test/java/io/airbyte/commons => airbyte-workers/src/test/java/io/airbyte/workers}/temporal/TemporalUtilsTest.java (98%) rename {airbyte-commons-temporal/src/test/java/io/airbyte/commons => airbyte-workers/src/test/java/io/airbyte/workers}/temporal/stubs/HeartbeatWorkflow.java (93%) diff --git a/airbyte-commons-temporal/build.gradle b/airbyte-commons-temporal/build.gradle deleted file mode 100644 index 3b4e25641aa3f..0000000000000 --- a/airbyte-commons-temporal/build.gradle +++ /dev/null @@ -1,42 +0,0 @@ -import org.jsonschema2pojo.SourceType - -plugins { - id "java-library" - id 'com.github.eirnym.js2p' version '1.0' -} - -dependencies { - annotationProcessor platform(libs.micronaut.bom) - annotationProcessor libs.bundles.micronaut.annotation.processor - - implementation platform(libs.micronaut.bom) - implementation libs.bundles.micronaut - - implementation 'io.temporal:temporal-sdk:1.8.1' - implementation 'io.temporal:temporal-serviceclient:1.8.1' - - testAnnotationProcessor platform(libs.micronaut.bom) - testAnnotationProcessor libs.bundles.micronaut.test.annotation.processor - - - implementation project(':airbyte-config:config-models') - implementation project(':airbyte-metrics:metrics-lib') - - testImplementation 'io.temporal:temporal-testing:1.8.1' - // Needed to be able to mock final class - testImplementation 'org.mockito:mockito-inline:4.7.0' -} - -jsonSchema2Pojo { - sourceType = SourceType.YAMLSCHEMA - source = files("${sourceSets.main.output.resourcesDir}/workers_models") - targetDirectory = new File(project.buildDir, 'generated/src/gen/java/') - removeOldOutput = true - - targetPackage = 'io.airbyte.persistence.job.models' - - useLongIntegers = true - generateBuilders = true - includeConstructors = false - includeSetters = true -} diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/ConnectionManagerUtils.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/ConnectionManagerUtils.java deleted file mode 100644 index 435f7bac41412..0000000000000 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/ConnectionManagerUtils.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.commons.temporal; - -import io.airbyte.commons.temporal.scheduling.ConnectionManagerWorkflow; -import io.airbyte.commons.temporal.scheduling.ConnectionUpdaterInput; -import io.temporal.client.WorkflowClient; -import java.util.UUID; -import javax.inject.Singleton; -import lombok.NoArgsConstructor; -import lombok.extern.slf4j.Slf4j; - -@NoArgsConstructor -@Singleton -@Slf4j -public class ConnectionManagerUtils { - - void safeTerminateWorkflow(final WorkflowClient client, final String workflowId, final String reason) { - log.info("Attempting to terminate existing workflow for workflowId {}.", workflowId); - try { - client.newUntypedWorkflowStub(workflowId).terminate(reason); - } catch (final Exception e) { - log.warn( - "Could not terminate temporal workflow due to the following error; " - + "this may be because there is currently no running workflow for this connection.", - e); - } - } - - public void safeTerminateWorkflow(final WorkflowClient client, final UUID connectionId, final String reason) { - safeTerminateWorkflow(client, getConnectionManagerName(connectionId), reason); - } - - public String getConnectionManagerName(final UUID connectionId) { - return "connection_manager_" + connectionId; - } - - public ConnectionManagerWorkflow startConnectionManagerNoSignal(final WorkflowClient client, final UUID connectionId) { - final ConnectionManagerWorkflow connectionManagerWorkflow = newConnectionManagerWorkflowStub(client, connectionId); - final ConnectionUpdaterInput input = TemporalWorkflowUtils.buildStartWorkflowInput(connectionId); - WorkflowClient.start(connectionManagerWorkflow::run, input); - - return connectionManagerWorkflow; - } - - public ConnectionManagerWorkflow newConnectionManagerWorkflowStub(final WorkflowClient client, final UUID connectionId) { - return client.newWorkflowStub(ConnectionManagerWorkflow.class, - TemporalWorkflowUtils.buildWorkflowOptions(TemporalJobType.CONNECTION_UPDATER, getConnectionManagerName(connectionId))); - } - -} diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalClient.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalClient.java deleted file mode 100644 index 21242ce9bbe64..0000000000000 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalClient.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.commons.temporal; - -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.ByteString; -import io.airbyte.commons.temporal.scheduling.ConnectionManagerWorkflow; -import io.micronaut.context.annotation.Requires; -import io.temporal.api.common.v1.WorkflowType; -import io.temporal.api.enums.v1.WorkflowExecutionStatus; -import io.temporal.api.workflowservice.v1.ListClosedWorkflowExecutionsRequest; -import io.temporal.api.workflowservice.v1.ListClosedWorkflowExecutionsResponse; -import io.temporal.api.workflowservice.v1.ListOpenWorkflowExecutionsRequest; -import io.temporal.api.workflowservice.v1.ListOpenWorkflowExecutionsResponse; -import io.temporal.client.WorkflowClient; -import io.temporal.serviceclient.WorkflowServiceStubs; -import java.util.HashSet; -import java.util.Optional; -import java.util.Set; -import java.util.UUID; -import java.util.stream.Collectors; -import javax.inject.Inject; -import javax.inject.Singleton; -import lombok.AllArgsConstructor; -import lombok.NoArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; - -@AllArgsConstructor -@NoArgsConstructor -@Slf4j -@Singleton -@Requires(property = "airbyte.worker.plane", - notEquals = "DATA_PLANE") -public class TemporalClient { - - @Inject - private WorkflowClient client; - @Inject - private WorkflowServiceStubs service; - @Inject - private ConnectionManagerUtils connectionManagerUtils; - - private final Set workflowNames = new HashSet<>(); - - public void restartClosedWorkflowByStatus(final WorkflowExecutionStatus executionStatus) { - final Set workflowExecutionInfos = fetchClosedWorkflowsByStatus(executionStatus); - - final Set nonRunningWorkflow = filterOutRunningWorkspaceId(workflowExecutionInfos); - - nonRunningWorkflow.forEach(connectionId -> { - connectionManagerUtils.safeTerminateWorkflow(client, connectionId, "Terminating workflow in " - + "unreachable state before starting a new workflow for this connection"); - connectionManagerUtils.startConnectionManagerNoSignal(client, connectionId); - }); - } - - Set fetchClosedWorkflowsByStatus(final WorkflowExecutionStatus executionStatus) { - ByteString token; - ListClosedWorkflowExecutionsRequest workflowExecutionsRequest = - ListClosedWorkflowExecutionsRequest.newBuilder() - .setNamespace(client.getOptions().getNamespace()) - .build(); - - final Set workflowExecutionInfos = new HashSet<>(); - do { - final ListClosedWorkflowExecutionsResponse listOpenWorkflowExecutionsRequest = - service.blockingStub().listClosedWorkflowExecutions(workflowExecutionsRequest); - final WorkflowType connectionManagerWorkflowType = WorkflowType.newBuilder().setName(ConnectionManagerWorkflow.class.getSimpleName()).build(); - workflowExecutionInfos.addAll(listOpenWorkflowExecutionsRequest.getExecutionsList().stream() - .filter(workflowExecutionInfo -> workflowExecutionInfo.getType() == connectionManagerWorkflowType || - workflowExecutionInfo.getStatus() == executionStatus) - .flatMap((workflowExecutionInfo -> extractConnectionIdFromWorkflowId(workflowExecutionInfo.getExecution().getWorkflowId()).stream())) - .collect(Collectors.toSet())); - token = listOpenWorkflowExecutionsRequest.getNextPageToken(); - - workflowExecutionsRequest = - ListClosedWorkflowExecutionsRequest.newBuilder() - .setNamespace(client.getOptions().getNamespace()) - .setNextPageToken(token) - .build(); - - } while (token != null && token.size() > 0); - - return workflowExecutionInfos; - } - - @VisibleForTesting - Set filterOutRunningWorkspaceId(final Set workflowIds) { - refreshRunningWorkflow(); - - final Set runningWorkflowByUUID = - workflowNames.stream().flatMap(name -> extractConnectionIdFromWorkflowId(name).stream()).collect(Collectors.toSet()); - - return workflowIds.stream().filter(workflowId -> !runningWorkflowByUUID.contains(workflowId)).collect(Collectors.toSet()); - } - - @VisibleForTesting - void refreshRunningWorkflow() { - workflowNames.clear(); - ByteString token; - ListOpenWorkflowExecutionsRequest openWorkflowExecutionsRequest = - ListOpenWorkflowExecutionsRequest.newBuilder() - .setNamespace(client.getOptions().getNamespace()) - .build(); - do { - final ListOpenWorkflowExecutionsResponse listOpenWorkflowExecutionsRequest = - service.blockingStub().listOpenWorkflowExecutions(openWorkflowExecutionsRequest); - final Set workflowExecutionInfos = listOpenWorkflowExecutionsRequest.getExecutionsList().stream() - .map((workflowExecutionInfo -> workflowExecutionInfo.getExecution().getWorkflowId())) - .collect(Collectors.toSet()); - workflowNames.addAll(workflowExecutionInfos); - token = listOpenWorkflowExecutionsRequest.getNextPageToken(); - - openWorkflowExecutionsRequest = - ListOpenWorkflowExecutionsRequest.newBuilder() - .setNamespace(client.getOptions().getNamespace()) - .setNextPageToken(token) - .build(); - - } while (token != null && token.size() > 0); - } - - Optional extractConnectionIdFromWorkflowId(final String workflowId) { - if (!workflowId.startsWith("connection_manager_")) { - return Optional.empty(); - } - return Optional.ofNullable(StringUtils.removeStart(workflowId, "connection_manager_")) - .map( - stringUUID -> UUID.fromString(stringUUID)); - } - -} diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/config/TemporalBeanFactory.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/config/TemporalBeanFactory.java deleted file mode 100644 index a82774bdb120d..0000000000000 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/config/TemporalBeanFactory.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.commons.temporal.config; - -import io.airbyte.commons.temporal.TemporalUtils; -import io.airbyte.commons.temporal.TemporalWorkflowUtils; -import io.micronaut.context.annotation.Factory; -import io.temporal.client.WorkflowClient; -import io.temporal.serviceclient.WorkflowServiceStubs; -import javax.inject.Singleton; - -/** - * Micronaut bean factory for Temporal-related singletons. - */ -@Factory -public class TemporalBeanFactory { - - @Singleton - public WorkflowServiceStubs temporalService(final TemporalUtils temporalUtils) { - return temporalUtils.createTemporalService(); - } - - @Singleton - public WorkflowClient workflowClient( - final TemporalUtils temporalUtils, - final WorkflowServiceStubs temporalService) { - return TemporalWorkflowUtils.createWorkflowClient(temporalService, temporalUtils.getNamespace()); - } - -} diff --git a/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/TemporalClientTest.java b/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/TemporalClientTest.java deleted file mode 100644 index 0ddff18dc49b1..0000000000000 --- a/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/TemporalClientTest.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.commons.temporal; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import io.airbyte.commons.temporal.scheduling.ConnectionManagerWorkflow; -import io.temporal.api.enums.v1.WorkflowExecutionStatus; -import io.temporal.api.workflow.v1.WorkflowExecutionInfo; -import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse; -import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc; -import io.temporal.client.WorkflowClient; -import io.temporal.client.WorkflowClientOptions; -import io.temporal.serviceclient.WorkflowServiceStubs; -import java.util.Set; -import java.util.UUID; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Nested; -import org.junit.jupiter.api.Test; - -public class TemporalClientTest { - - private static final String NAMESPACE = "namespace"; - - private WorkflowClient workflowClient; - private TemporalClient temporalClient; - private WorkflowServiceStubs workflowServiceStubs; - private WorkflowServiceGrpc.WorkflowServiceBlockingStub workflowServiceBlockingStub; - - @BeforeEach - void setup() { - workflowClient = mock(WorkflowClient.class); - when(workflowClient.getOptions()).thenReturn(WorkflowClientOptions.newBuilder().setNamespace(NAMESPACE).build()); - - workflowServiceStubs = mock(WorkflowServiceStubs.class); - workflowServiceBlockingStub = mock(WorkflowServiceGrpc.WorkflowServiceBlockingStub.class); - when(workflowServiceStubs.blockingStub()).thenReturn(workflowServiceBlockingStub); - } - - @Nested - class RestartPerStatus { - - private ConnectionManagerUtils mConnectionManagerUtils; - - @BeforeEach - public void init() { - mConnectionManagerUtils = mock(ConnectionManagerUtils.class); - - temporalClient = spy( - new TemporalClient(workflowClient, workflowServiceStubs, mConnectionManagerUtils)); - } - - @Test - void testRestartFailed() { - final ConnectionManagerWorkflow mConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class); - - when(workflowClient.newWorkflowStub(any(), anyString())).thenReturn(mConnectionManagerWorkflow); - final UUID connectionId = UUID.fromString("ebbfdc4c-295b-48a0-844f-88551dfad3db"); - final Set workflowIds = Set.of(connectionId); - - doReturn(workflowIds) - .when(temporalClient).fetchClosedWorkflowsByStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED); - doReturn(workflowIds) - .when(temporalClient).filterOutRunningWorkspaceId(workflowIds); - mockWorkflowStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED); - temporalClient.restartClosedWorkflowByStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED); - verify(mConnectionManagerUtils).safeTerminateWorkflow(eq(workflowClient), eq(connectionId), - anyString()); - verify(mConnectionManagerUtils).startConnectionManagerNoSignal(eq(workflowClient), eq(connectionId)); - } - - } - - private void mockWorkflowStatus(final WorkflowExecutionStatus status) { - when(workflowServiceBlockingStub.describeWorkflowExecution(any())).thenReturn( - DescribeWorkflowExecutionResponse.newBuilder().setWorkflowExecutionInfo( - WorkflowExecutionInfo.newBuilder().setStatus(status).buildPartial()).build()); - } - -} diff --git a/airbyte-container-orchestrator/build.gradle b/airbyte-container-orchestrator/build.gradle index d088356a5047a..4695dc4b4d92b 100644 --- a/airbyte-container-orchestrator/build.gradle +++ b/airbyte-container-orchestrator/build.gradle @@ -13,7 +13,6 @@ dependencies { implementation project(':airbyte-api') implementation project(':airbyte-config:config-models') implementation project(':airbyte-config:config-persistence') - implementation project(':airbyte-commons-temporal') implementation project(':airbyte-db:db-lib') implementation project(':airbyte-json-validation') implementation project(':airbyte-protocol:protocol-models') diff --git a/airbyte-cron/build.gradle b/airbyte-cron/build.gradle index ca20952dc2708..0388fbd6b0286 100644 --- a/airbyte-cron/build.gradle +++ b/airbyte-cron/build.gradle @@ -3,19 +3,9 @@ plugins { } dependencies { - implementation 'com.auth0:java-jwt:3.19.2' - implementation 'io.fabric8:kubernetes-client:5.12.2' - implementation 'io.sentry:sentry:6.3.1' - implementation 'io.temporal:temporal-sdk:1.8.1' - implementation 'io.temporal:temporal-serviceclient:1.8.1' - - implementation project(':airbyte-api') - implementation project(':airbyte-analytics') - implementation project(':airbyte-commons-temporal') implementation project(':airbyte-config:config-models') - implementation project(':airbyte-config:config-persistence') - implementation project(':airbyte-db:db-lib') - implementation project(':airbyte-metrics:metrics-lib') + + runtimeOnly 'io.micronaut:micronaut-http-server-netty:3.6.0' annotationProcessor platform(libs.micronaut.bom) annotationProcessor libs.bundles.micronaut.annotation.processor diff --git a/airbyte-cron/src/main/java/io/airbyte/cron/selfhealing/Temporal.java b/airbyte-cron/src/main/java/io/airbyte/cron/selfhealing/Temporal.java index 69b0e029b26ac..38787b128a16f 100644 --- a/airbyte-cron/src/main/java/io/airbyte/cron/selfhealing/Temporal.java +++ b/airbyte-cron/src/main/java/io/airbyte/cron/selfhealing/Temporal.java @@ -4,10 +4,7 @@ package io.airbyte.cron.selfhealing; -import io.airbyte.commons.temporal.TemporalClient; import io.micronaut.scheduling.annotation.Scheduled; -import io.temporal.api.enums.v1.WorkflowExecutionStatus; -import javax.inject.Named; import javax.inject.Singleton; import lombok.extern.slf4j.Slf4j; @@ -15,16 +12,11 @@ @Slf4j public class Temporal { - private final TemporalClient temporalClient; - - public Temporal(@Named("temporalClient") final TemporalClient temporalClient) { - log.debug("Creating temporal self-healing"); - this.temporalClient = temporalClient; + public Temporal() { + log.info("Creating temporal self-healing"); } @Scheduled(fixedRate = "10s") - void cleanTemporal() { - temporalClient.restartClosedWorkflowByStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED); - } + void cleanTemporal() {} } diff --git a/airbyte-cron/src/main/resources/application-control.yml b/airbyte-cron/src/main/resources/application-control.yml deleted file mode 100644 index 752bc85a5f112..0000000000000 --- a/airbyte-cron/src/main/resources/application-control.yml +++ /dev/null @@ -1,40 +0,0 @@ -datasources: - config: - connection-test-query: SELECT 1 - connection-timeout: 30000 - idle-timeout: 600000 - maximum-pool-size: 10 - url: ${DATABASE_URL} - driverClassName: org.postgresql.Driver - username: ${DATABASE_USER} - password: ${DATABASE_PASSWORD} - jobs: - connection-test-query: SELECT 1 - connection-timeout: 30000 - idle-timeout: 600000 - maximum-pool-size: 10 - url: ${DATABASE_URL} - driverClassName: org.postgresql.Driver - username: ${DATABASE_USER} - password: ${DATABASE_PASSWORD} - -flyway: - enabled: true - datasources: - config: - enabled: false - locations: - - 'classpath:io/airbyte/db/instance/configs/migrations' - jobs: - enabled: false - locations: - - 'classpath:io/airbyte/db/instance/jobs/migrations' - -jooq: - datasources: - config: - jackson-converter-enabled: true - sql-dialect: POSTGRES - jobs: - jackson-converter-enabled: true - sql-dialect: POSTGRES \ No newline at end of file diff --git a/airbyte-cron/src/main/resources/application.yml b/airbyte-cron/src/main/resources/application.yml deleted file mode 100644 index 6961c8a3801c6..0000000000000 --- a/airbyte-cron/src/main/resources/application.yml +++ /dev/null @@ -1,219 +0,0 @@ -micronaut: - application: - name: airbyte-workers - security: - intercept-url-map: - - pattern: /** - httpMethod: GET - access: - - isAnonymous() - server: - port: 9000 - -airbyte: - activity: - initial-delay: ${ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS:30} - max-attempts: ${ACTIVITY_MAX_ATTEMPT:5} - max-delay: ${ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS:600} - max-timeout: ${ACTIVITY_MAX_TIMEOUT_SECOND:120} - cloud: - storage: - logs: - type: ${WORKER_LOGS_STORAGE_TYPE:} - gcs: - application-credentials: ${GOOGLE_APPLICATION_CREDENTIALS:} - bucket: ${GCS_LOG_BUCKET:} - minio: - access-key: ${AWS_ACCESS_KEY_ID:} - bucket: ${S3_LOG_BUCKET:} - endpoint: ${S3_MINIO_ENDPOINT:} - secret-access-key: ${AWS_SECRET_ACCESS_KEY:} - s3: - access-key: ${AWS_ACCESS_KEY_ID:} - bucket: ${S3_LOG_BUCKET:} - region: ${S3_LOG_BUCKET_REGION:} - secret-access-key: ${AWS_SECRET_ACCESS_KEY:} - state: - type: ${WORKER_STATE_STORAGE_TYPE:} - gcs: - application-credentials: ${STATE_STORAGE_GCS_APPLICATION_CREDENTIALS:} - bucket: ${STATE_STORAGE_GCS_BUCKET_NAME:} - minio: - access-key: ${STATE_STORAGE_MINIO_ACCESS_KEY:} - bucket: ${STATE_STORAGE_MINIO_BUCKET_NAME:} - endpoint: ${STATE_STORAGE_MINIO_ENDPOINT:} - secret-access-key: ${STATE_STORAGE_MINIO_SECRET_ACCESS_KEY:} - s3: - access-key: ${STATE_STORAGE_S3_ACCESS_KEY:} - bucket: ${STATE_STORAGE_S3_BUCKET_NAME:} - region: ${STATE_STORAGE_S3_BUCKET_REGION:} - secret-access-key: ${STATE_STORAGE_S3_SECRET_ACCESS_KEY:} - connector: - specific-resource-defaults-enabled: ${CONNECTOR_SPECIFIC_RESOURCE_DEFAULTS_ENABLED:false} - container: - orchestrator: - enabled: ${CONTAINER_ORCHESTRATOR_ENABLED:false} - image: ${CONTAINER_ORCHESTRATOR_IMAGE:} - secret-mount-path: ${CONTAINER_ORCHESTRATOR_SECRET_MOUNT_PATH:} - secret-name: ${CONTAINER_ORCHESTRATOR_SECRET_NAME:} - control: - plane: - auth-endpoint: ${CONTROL_PLANE_AUTH_ENDPOINT:} - data: - sync: - task-queue: ${DATA_SYNC_TASK_QUEUES:SYNC} - plane: - connection-ids-mvp: ${CONNECTION_IDS_FOR_MVP_DATA_PLANE:} - service-account: - credentials-path: ${DATA_PLANE_SERVICE_ACCOUNT_CREDENTIALS_PATH:} - email: ${DATA_PLANE_SERVICE_ACCOUNT_EMAIL:} - deployment-mode: ${DEPLOYMENT_MODE:OSS} - flyway: - configs: - initialization-timeout-ms: ${CONFIGS_DATABASE_INITIALIZATION_TIMEOUT_MS:60000} - minimum-migration-version: ${CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION} - jobs: - initialization-timeout-ms: ${JOBS_DATABASE_INITIALIZATION_TIMEOUT_MS:60000} - minimum-migration-version: ${JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION} - internal: - api: - auth-header: - name: ${AIRBYTE_API_AUTH_HEADER_NAME:} - value: ${AIRBYTE_API_AUTH_HEADER_VALUE:} - host: ${INTERNAL_API_HOST} - local: - docker-mount: ${LOCAL_DOCKER_MOUNT} - root: ${LOCAL_ROOT} - worker: - env: ${WORKER_ENVIRONMENT:DOCKER} - check: - enabled: ${SHOULD_RUN_CHECK_CONNECTION_WORKFLOWS:true} - kube: - annotations: ${CHECK_JOB_KUBE_ANNOTATION:} - node-selectors: ${CHECK_JOB_KUBE_NODE_SELECTORS:} - max-workers: ${MAX_CHECK_WORKERS:5} - main: - container: - cpu: - limit: ${CHECK_JOB_MAIN_CONTAINER_CPU_LIMIT:} - request: ${CHECK_JOB_MAIN_CONTAINER_CPU_REQUEST:} - memory: - limit: ${CHECK_JOB_MAIN_CONTAINER_MEMORY_LIMIT:} - request: ${CHECK_JOB_MAIN_CONTAINER_MEMORY_REQUEST:} - connection: - enabled: ${SHOULD_RUN_CONNECTION_MANAGER_WORKFLOWS:true} - discover: - enabled: ${SHOULD_RUN_DISCOVER_WORKFLOWS:true} - kube: - annotations: ${DISCOVER_JOB_KUBE_ANNOTATIONS:} - node-selectors: ${DISCOVER_JOB_KUBE_NODE_SELECTORS:} - max-workers: ${MAX_DISCOVER_WORKERS:5} - job: - error-reporting: - sentry: - dsn: ${JOB_ERROR_REPORTING_SENTRY_DSN} - strategy: ${JOB_ERROR_REPORTING_STRATEGY:LOGGING} - failed: - max-days: ${MAX_DAYS_OF_ONLY_FAILED_JOBS_BEFORE_CONNECTION_DISABLE:14} - max-jobs: ${MAX_FAILED_JOBS_IN_A_ROW_BEFORE_CONNECTION_DISABLE:100} - kube: - annotations: ${JOB_KUBE_ANNOTATIONS:} - images: - busybox: ${JOB_KUBE_BUSYBOX_IMAGE:`busybox:1.28`} - curl: ${JOB_KUBE_CURL_IMAGE:`curlimages/curl:7.83.1`} - socat: ${JOB_KUBE_SOCAT_IMAGE:`alpine/socat:1.7.4.3-r0`} - main: - container: - image-pull-policy: ${JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY:IfNotPresent} - image-pull-secret: ${JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_SECRET:} - namespace: ${JOB_KUBE_NAMESPACE:default} - node-selectors: ${JOB_KUBE_NODE_SELECTORS:} - sidecar: - container: - image-pull-policy: ${JOB_KUBE_SIDECAR_CONTAINER_IMAGE_PULL_POLICY:IfNotPresent} - tolerations: ${JOB_KUBE_TOLERATIONS:} - main: - container: - cpu: - limit: ${JOB_MAIN_CONTAINER_CPU_LIMIT:} - request: ${JOB_MAIN_CONTAINER_CPU_REQUEST:} - memory: - limit: ${JOB_MAIN_CONTAINER_MEMORY_LIMIT:} - request: ${JOB_MAIN_CONTAINER_MEMORY_REQUEST:} - normalization: - main: - container: - cpu: - limit: ${NORMALIZATION_JOB_MAIN_CONTAINER_CPU_LIMIT:} - request: ${NORMALIZATION_JOB_MAIN_CONTAINER_CPU_REQUEST:} - memory: - limit: ${NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_LIMIT:} - request: ${NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_REQUEST:} - plane: ${WORKER_PLANE:CONTROL_PLANE} - replication: - orchestrator: - cpu: - limit: ${REPLICATION_ORCHESTRATOR_CPU_LIMIT:} - request: ${REPLICATION_ORCHESTRATOR_CPU_REQUEST:} - memory: - limit: ${REPLICATION_ORCHESTRATOR_MEMORY_LIMIT:} - request: ${REPLICATION_ORCHESTRATOR_MEMORY_REQUEST:} - spec: - enabled: ${SHOULD_RUN_GET_SPEC_WORKFLOWS:true} - kube: - annotations: ${SPEC_JOB_KUBE_ANNOTATIONS:} - node-selectors: ${SPEC_JOB_KUBE_NODE_SELECTORS:} - max-workers: ${MAX_SPEC_WORKERS:5} - sync: - enabled: ${SHOULD_RUN_SYNC_WORKFLOWS:true} - max-workers: ${MAX_SYNC_WORKERS:5} - max-attempts: ${SYNC_JOB_MAX_ATTEMPTS:3} - max-timeout: ${SYNC_JOB_MAX_TIMEOUT_DAYS:3} - role: ${AIRBYTE_ROLE:} - secret: - persistence: ${SECRET_PERSISTENCE:TESTING_CONFIG_DB_TABLE} - store: - gcp: - credentials: ${SECRET_STORE_GCP_CREDENTIALS:} - project-id: ${SECRET_STORE_GCP_PROJECT_ID:} - vault: - address: ${VAULT_ADDRESS:} - prefix: ${VAULT_PREFIX:} - token: ${VAULT_AUTH_TOKEN:} - temporal: - worker: - ports: ${TEMPORAL_WORKER_PORTS:} - tracking-strategy: ${TRACKING_STRATEGY:LOGGING} - version: ${AIRBYTE_VERSION} - web-app: - url: ${WEBAPP_URL:} - workflow: - failure: - restart-delay: ${WORKFLOW_FAILURE_RESTART_DELAY_SECONDS:600} - workspace: - docker-mount: ${WORKSPACE_DOCKER_MOUNT:} - root: ${WORKSPACE_ROOT} - -docker: - network: ${DOCKER_NETWORK:host} - -endpoints: - all: - enabled: true - -temporal: - cloud: - client: - cert: ${TEMPORAL_CLOUD_CLIENT_CERT:} - key: ${TEMPORAL_CLOUD_CLIENT_KEY:} - enabled: ${TEMPORAL_CLOUD_ENABLED:false} - host: ${TEMPORAL_CLOUD_HOST:} - namespace: ${TEMPORAL_CLOUD_NAMESPACE:} - host: ${TEMPORAL_HOST:`airbyte-temporal:7233`} - retention: ${TEMPORAL_HISTORY_RETENTION_IN_DAYS:30} - -logger: - levels: - io.airbyte.bootloader: DEBUG -# Uncomment to help resolve issues with conditional beans -# io.micronaut.context.condition: DEBUG diff --git a/airbyte-cron/src/main/resources/micronaut-banner.txt b/airbyte-cron/src/main/resources/micronaut-banner.txt deleted file mode 100644 index 713ab3df590b5..0000000000000 --- a/airbyte-cron/src/main/resources/micronaut-banner.txt +++ /dev/null @@ -1 +0,0 @@ - : airbyte-cron : diff --git a/airbyte-server/build.gradle b/airbyte-server/build.gradle index e3dddbe3ca504..af56de876b02a 100644 --- a/airbyte-server/build.gradle +++ b/airbyte-server/build.gradle @@ -2,15 +2,10 @@ plugins { id 'application' } -configurations.all { - exclude group: 'io.micronaut.jaxrs' -} - dependencies { implementation project(':airbyte-analytics') implementation project(':airbyte-api') implementation project(':airbyte-commons-docker') - implementation project(':airbyte-commons-temporal') implementation project(':airbyte-config:init') implementation project(':airbyte-config:config-models') implementation project(':airbyte-config:config-persistence') diff --git a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java index b304e2c008bbb..bbb5e4c45bf82 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java @@ -12,8 +12,6 @@ import io.airbyte.commons.features.FeatureFlags; import io.airbyte.commons.lang.CloseableShutdownHook; import io.airbyte.commons.resources.MoreResources; -import io.airbyte.commons.temporal.TemporalUtils; -import io.airbyte.commons.temporal.TemporalWorkflowUtils; import io.airbyte.commons.version.AirbyteVersion; import io.airbyte.config.Configs; import io.airbyte.config.EnvConfigs; @@ -61,6 +59,8 @@ import io.airbyte.workers.temporal.ConnectionManagerUtils; import io.airbyte.workers.temporal.StreamResetRecordsHelper; import io.airbyte.workers.temporal.TemporalClient; +import io.airbyte.workers.temporal.TemporalUtils; +import io.airbyte.workers.temporal.TemporalWorkflowUtils; import io.temporal.serviceclient.WorkflowServiceStubs; import java.io.IOException; import java.net.http.HttpClient; diff --git a/airbyte-test-utils/build.gradle b/airbyte-test-utils/build.gradle index f6c3dfa412d02..8518cb7a5ade5 100644 --- a/airbyte-test-utils/build.gradle +++ b/airbyte-test-utils/build.gradle @@ -5,7 +5,6 @@ plugins { dependencies { api project(':airbyte-db:db-lib') implementation project(':airbyte-api') - implementation project(':airbyte-commons-temporal') implementation project(':airbyte-workers') implementation 'io.fabric8:kubernetes-client:5.12.2' diff --git a/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java b/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java index 5f5bd4589fe02..882e88861b343 100644 --- a/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java +++ b/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java @@ -57,14 +57,14 @@ import io.airbyte.api.client.model.generated.WebBackendOperationCreateOrUpdate; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.resources.MoreResources; -import io.airbyte.commons.temporal.TemporalUtils; -import io.airbyte.commons.temporal.TemporalWorkflowUtils; -import io.airbyte.commons.temporal.scheduling.ConnectionManagerWorkflow; -import io.airbyte.commons.temporal.scheduling.state.WorkflowState; import io.airbyte.commons.util.MoreProperties; import io.airbyte.db.Database; import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.test.airbyte_test_container.AirbyteTestContainer; +import io.airbyte.workers.temporal.TemporalUtils; +import io.airbyte.workers.temporal.TemporalWorkflowUtils; +import io.airbyte.workers.temporal.scheduling.ConnectionManagerWorkflow; +import io.airbyte.workers.temporal.scheduling.state.WorkflowState; import io.fabric8.kubernetes.client.DefaultKubernetesClient; import io.fabric8.kubernetes.client.KubernetesClient; import io.temporal.client.WorkflowClient; diff --git a/airbyte-tests/build.gradle b/airbyte-tests/build.gradle index 8600ff04a42b9..5931cac20e380 100644 --- a/airbyte-tests/build.gradle +++ b/airbyte-tests/build.gradle @@ -43,7 +43,6 @@ dependencies { acceptanceTestsImplementation project(':airbyte-api') acceptanceTestsImplementation project(':airbyte-commons') - acceptanceTestsImplementation project(':airbyte-commons-temporal') acceptanceTestsImplementation project(':airbyte-config:config-models') acceptanceTestsImplementation project(':airbyte-config:config-persistence') acceptanceTestsImplementation project(':airbyte-db:db-lib') diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java index 68b7a9c64f424..29c994fc4841a 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java @@ -72,12 +72,12 @@ import io.airbyte.api.client.model.generated.SyncMode; import io.airbyte.api.client.model.generated.WebBackendConnectionUpdate; import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.temporal.scheduling.state.WorkflowState; import io.airbyte.db.Database; import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.test.utils.AirbyteAcceptanceTestHarness; import io.airbyte.test.utils.PostgreSQLContainerHelper; import io.airbyte.test.utils.SchemaTableNamePair; +import io.airbyte.workers.temporal.scheduling.state.WorkflowState; import java.io.IOException; import java.net.URISyntaxException; import java.sql.SQLException; diff --git a/airbyte-workers/build.gradle b/airbyte-workers/build.gradle index 4f3303c94df7f..1e93f058e9b1c 100644 --- a/airbyte-workers/build.gradle +++ b/airbyte-workers/build.gradle @@ -42,7 +42,6 @@ dependencies { implementation project(':airbyte-analytics') implementation project(':airbyte-api') implementation project(':airbyte-commons-docker') - implementation project(':airbyte-commons-temporal') implementation project(':airbyte-config:config-models') implementation project(':airbyte-config:config-persistence') implementation project(':airbyte-db:jooq') diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/ApplicationInitializer.java b/airbyte-workers/src/main/java/io/airbyte/workers/ApplicationInitializer.java index e0de9ccc832aa..ef2cea7d4ddc4 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/ApplicationInitializer.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/ApplicationInitializer.java @@ -6,8 +6,6 @@ import io.airbyte.analytics.Deployment; import io.airbyte.analytics.TrackingClientSingleton; -import io.airbyte.commons.temporal.TemporalJobType; -import io.airbyte.commons.temporal.TemporalUtils; import io.airbyte.commons.version.AirbyteVersion; import io.airbyte.config.Configs.DeploymentMode; import io.airbyte.config.Configs.TrackingStrategy; @@ -24,6 +22,8 @@ import io.airbyte.metrics.lib.MetricEmittingApps; import io.airbyte.persistence.job.JobPersistence; import io.airbyte.workers.process.KubePortManagerSingleton; +import io.airbyte.workers.temporal.TemporalJobType; +import io.airbyte.workers.temporal.TemporalUtils; import io.airbyte.workers.temporal.check.connection.CheckConnectionWorkflowImpl; import io.airbyte.workers.temporal.discover.catalog.DiscoverCatalogWorkflowImpl; import io.airbyte.workers.temporal.scheduling.ConnectionManagerWorkflowImpl; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java index 67093ddd82b76..a4fbbf66ef50c 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java @@ -4,8 +4,8 @@ package io.airbyte.workers.config; -import io.airbyte.commons.temporal.TemporalUtils; import io.airbyte.workers.exception.WorkerException; +import io.airbyte.workers.temporal.TemporalUtils; import io.airbyte.workers.temporal.check.connection.CheckConnectionActivity; import io.airbyte.workers.temporal.discover.catalog.DiscoverCatalogActivity; import io.airbyte.workers.temporal.scheduling.activities.AutoDisableConnectionActivity; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/config/TemporalBeanFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/config/TemporalBeanFactory.java index 5ed8168cc8b89..b4c332373f8e9 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/config/TemporalBeanFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/config/TemporalBeanFactory.java @@ -7,7 +7,6 @@ import io.airbyte.analytics.TrackingClient; import io.airbyte.analytics.TrackingClientSingleton; import io.airbyte.commons.features.FeatureFlags; -import io.airbyte.commons.temporal.TemporalUtils; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.persistence.job.DefaultJobCreator; import io.airbyte.persistence.job.factory.DefaultSyncJobFactory; @@ -15,6 +14,8 @@ import io.airbyte.persistence.job.factory.SyncJobFactory; import io.airbyte.workers.run.TemporalWorkerRunFactory; import io.airbyte.workers.temporal.TemporalClient; +import io.airbyte.workers.temporal.TemporalUtils; +import io.airbyte.workers.temporal.TemporalWorkflowUtils; import io.micronaut.context.annotation.Factory; import io.micronaut.context.annotation.Property; import io.micronaut.context.annotation.Requires; @@ -54,6 +55,18 @@ public SyncJobFactory jobFactory( new OAuthConfigSupplier(configRepository, trackingClient)); } + @Singleton + public WorkflowServiceStubs temporalService(final TemporalUtils temporalUtils) { + return temporalUtils.createTemporalService(); + } + + @Singleton + public WorkflowClient workflowClient( + final TemporalUtils temporalUtils, + final WorkflowServiceStubs temporalService) { + return TemporalWorkflowUtils.createWorkflowClient(temporalService, temporalUtils.getNamespace()); + } + @Singleton @Requires(property = "airbyte.worker.plane", pattern = "(?i)^(?!data_plane).*") diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/run/TemporalWorkerRunFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/run/TemporalWorkerRunFactory.java index 44bcb8a1a400d..4c33b816c6502 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/run/TemporalWorkerRunFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/run/TemporalWorkerRunFactory.java @@ -7,7 +7,6 @@ import io.airbyte.commons.features.FeatureFlags; import io.airbyte.commons.functional.CheckedSupplier; import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.temporal.TemporalJobType; import io.airbyte.config.JobConfig.ConfigType; import io.airbyte.config.JobOutput; import io.airbyte.config.JobResetConnectionConfig; @@ -19,6 +18,7 @@ import io.airbyte.workers.OutputAndStatus; import io.airbyte.workers.WorkerConstants; import io.airbyte.workers.temporal.TemporalClient; +import io.airbyte.workers.temporal.TemporalJobType; import io.airbyte.workers.temporal.TemporalResponse; import java.nio.file.Path; import java.util.UUID; diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/CancellationHandler.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/CancellationHandler.java similarity index 98% rename from airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/CancellationHandler.java rename to airbyte-workers/src/main/java/io/airbyte/workers/temporal/CancellationHandler.java index 9745497b6ae38..aad52655f85b1 100644 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/CancellationHandler.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/CancellationHandler.java @@ -2,7 +2,7 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.commons.temporal; +package io.airbyte.workers.temporal; import io.temporal.activity.ActivityExecutionContext; import io.temporal.client.ActivityCompletionException; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/ConnectionManagerUtils.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/ConnectionManagerUtils.java index e8190f1e2a791..f468ea5275fe1 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/ConnectionManagerUtils.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/ConnectionManagerUtils.java @@ -4,14 +4,12 @@ package io.airbyte.workers.temporal; -import io.airbyte.commons.temporal.TemporalJobType; -import io.airbyte.commons.temporal.TemporalWorkflowUtils; -import io.airbyte.commons.temporal.scheduling.ConnectionManagerWorkflow; -import io.airbyte.commons.temporal.scheduling.ConnectionUpdaterInput; -import io.airbyte.commons.temporal.scheduling.state.WorkflowState; import io.airbyte.workers.temporal.exception.DeletedWorkflowException; import io.airbyte.workers.temporal.exception.UnreachableWorkflowException; +import io.airbyte.workers.temporal.scheduling.ConnectionManagerWorkflow; import io.airbyte.workers.temporal.scheduling.ConnectionManagerWorkflowImpl; +import io.airbyte.workers.temporal.scheduling.ConnectionUpdaterInput; +import io.airbyte.workers.temporal.scheduling.state.WorkflowState; import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.api.enums.v1.WorkflowExecutionStatus; import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionRequest; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java index a15b109bbb3c9..5f920830afb37 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java @@ -9,8 +9,6 @@ import io.airbyte.api.client.invoker.generated.ApiException; import io.airbyte.api.client.model.generated.SetWorkflowInAttemptRequestBody; import io.airbyte.commons.functional.CheckedSupplier; -import io.airbyte.commons.temporal.CancellationHandler; -import io.airbyte.commons.temporal.TemporalUtils; import io.airbyte.config.Configs.WorkerEnvironment; import io.airbyte.config.helpers.LogClientSingleton; import io.airbyte.config.helpers.LogConfigs; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java index c35f98700d8ab..feb70822d83a5 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java @@ -8,9 +8,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; -import io.airbyte.commons.temporal.TemporalJobType; -import io.airbyte.commons.temporal.TemporalWorkflowUtils; -import io.airbyte.commons.temporal.scheduling.ConnectionManagerWorkflow; import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.JobCheckConnectionConfig; import io.airbyte.config.JobDiscoverCatalogConfig; @@ -29,15 +26,16 @@ import io.airbyte.workers.temporal.discover.catalog.DiscoverCatalogWorkflow; import io.airbyte.workers.temporal.exception.DeletedWorkflowException; import io.airbyte.workers.temporal.exception.UnreachableWorkflowException; +import io.airbyte.workers.temporal.scheduling.ConnectionManagerWorkflow; import io.airbyte.workers.temporal.spec.SpecWorkflow; import io.airbyte.workers.temporal.sync.SyncWorkflow; import io.micronaut.context.annotation.Requires; import io.temporal.api.common.v1.WorkflowType; import io.temporal.api.enums.v1.WorkflowExecutionStatus; -import io.temporal.api.workflowservice.v1.ListClosedWorkflowExecutionsRequest; -import io.temporal.api.workflowservice.v1.ListClosedWorkflowExecutionsResponse; import io.temporal.api.workflowservice.v1.ListOpenWorkflowExecutionsRequest; import io.temporal.api.workflowservice.v1.ListOpenWorkflowExecutionsResponse; +import io.temporal.api.workflowservice.v1.ListWorkflowExecutionsRequest; +import io.temporal.api.workflowservice.v1.ListWorkflowExecutionsResponse; import io.temporal.client.WorkflowClient; import io.temporal.serviceclient.WorkflowServiceStubs; import java.io.IOException; @@ -466,6 +464,18 @@ public ManualOperationResult synchronousResetConnection(final UUID connectionId, Optional.of(resetJobId), Optional.empty()); } + public void restartWorkflowByStatus(final WorkflowExecutionStatus executionStatus) { + final Set workflowExecutionInfos = fetchWorkflowsByStatus(executionStatus); + + final Set nonRunningWorkflow = filterOutRunningWorkspaceId(workflowExecutionInfos); + + nonRunningWorkflow.forEach(connectionId -> { + connectionManagerUtils.safeTerminateWorkflow(client, connectionId, "Terminating workflow in " + + "unreachable state before starting a new workflow for this connection"); + connectionManagerUtils.startConnectionManagerNoSignal(client, connectionId); + }); + } + /** * This should be in the class {@li} * @@ -481,17 +491,17 @@ Optional extractConnectionIdFromWorkflowId(final String workflowId) { stringUUID -> UUID.fromString(stringUUID)); } - Set fetchClosedWorkflowsByStatus(final WorkflowExecutionStatus executionStatus) { + Set fetchWorkflowsByStatus(final WorkflowExecutionStatus executionStatus) { ByteString token; - ListClosedWorkflowExecutionsRequest workflowExecutionsRequest = - ListClosedWorkflowExecutionsRequest.newBuilder() + ListWorkflowExecutionsRequest workflowExecutionsRequest = + ListWorkflowExecutionsRequest.newBuilder() .setNamespace(client.getOptions().getNamespace()) .build(); final Set workflowExecutionInfos = new HashSet<>(); do { - final ListClosedWorkflowExecutionsResponse listOpenWorkflowExecutionsRequest = - service.blockingStub().listClosedWorkflowExecutions(workflowExecutionsRequest); + final ListWorkflowExecutionsResponse listOpenWorkflowExecutionsRequest = + service.blockingStub().listWorkflowExecutions(workflowExecutionsRequest); final WorkflowType connectionManagerWorkflowType = WorkflowType.newBuilder().setName(ConnectionManagerWorkflow.class.getSimpleName()).build(); workflowExecutionInfos.addAll(listOpenWorkflowExecutionsRequest.getExecutionsList().stream() .filter(workflowExecutionInfo -> workflowExecutionInfo.getType() == connectionManagerWorkflowType || @@ -501,7 +511,7 @@ Set fetchClosedWorkflowsByStatus(final WorkflowExecutionStatus executionSt token = listOpenWorkflowExecutionsRequest.getNextPageToken(); workflowExecutionsRequest = - ListClosedWorkflowExecutionsRequest.newBuilder() + ListWorkflowExecutionsRequest.newBuilder() .setNamespace(client.getOptions().getNamespace()) .setNextPageToken(token) .build(); diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalJobType.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalJobType.java similarity index 85% rename from airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalJobType.java rename to airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalJobType.java index cb8f66f3f6306..d098a1049f033 100644 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalJobType.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalJobType.java @@ -2,7 +2,7 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.commons.temporal; +package io.airbyte.workers.temporal; public enum TemporalJobType { GET_SPEC, diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalUtils.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java similarity index 99% rename from airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalUtils.java rename to airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java index 68a5e369c7b8e..2bf6e9004936a 100644 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalUtils.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java @@ -2,7 +2,7 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.commons.temporal; +package io.airbyte.workers.temporal; import com.uber.m3.tally.RootScopeBuilder; import com.uber.m3.tally.Scope; diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalWorkflowUtils.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalWorkflowUtils.java similarity index 96% rename from airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalWorkflowUtils.java rename to airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalWorkflowUtils.java index aeeb018d94a1f..f92120c45bae1 100644 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalWorkflowUtils.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalWorkflowUtils.java @@ -2,11 +2,11 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.commons.temporal; +package io.airbyte.workers.temporal; import com.google.common.annotations.VisibleForTesting; -import io.airbyte.commons.temporal.scheduling.ConnectionUpdaterInput; import io.airbyte.persistence.job.models.JobRunConfig; +import io.airbyte.workers.temporal.scheduling.ConnectionUpdaterInput; import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowClientOptions; import io.temporal.client.WorkflowOptions; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java index fff3ddb5f6eb7..0053aa0d2fe23 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java @@ -7,7 +7,6 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.api.client.AirbyteApiClient; import io.airbyte.commons.functional.CheckedSupplier; -import io.airbyte.commons.temporal.CancellationHandler; import io.airbyte.config.Configs.WorkerEnvironment; import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.StandardCheckConnectionInput; @@ -22,6 +21,7 @@ import io.airbyte.workers.process.AirbyteIntegrationLauncher; import io.airbyte.workers.process.IntegrationLauncher; import io.airbyte.workers.process.ProcessFactory; +import io.airbyte.workers.temporal.CancellationHandler; import io.airbyte.workers.temporal.TemporalAttemptExecution; import io.micronaut.context.annotation.Requires; import io.micronaut.context.annotation.Value; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java index d10c7b94723f0..5d201d95d44bc 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java @@ -7,7 +7,6 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.api.client.AirbyteApiClient; import io.airbyte.commons.functional.CheckedSupplier; -import io.airbyte.commons.temporal.CancellationHandler; import io.airbyte.config.Configs.WorkerEnvironment; import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.StandardDiscoverCatalogInput; @@ -23,6 +22,7 @@ import io.airbyte.workers.process.AirbyteIntegrationLauncher; import io.airbyte.workers.process.IntegrationLauncher; import io.airbyte.workers.process.ProcessFactory; +import io.airbyte.workers.temporal.CancellationHandler; import io.airbyte.workers.temporal.TemporalAttemptExecution; import io.micronaut.context.annotation.Requires; import io.micronaut.context.annotation.Value; diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/ConnectionManagerWorkflow.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflow.java similarity index 95% rename from airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/ConnectionManagerWorkflow.java rename to airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflow.java index dcd7fc6637ecc..8c732e3183288 100644 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/ConnectionManagerWorkflow.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflow.java @@ -2,9 +2,9 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.commons.temporal.scheduling; +package io.airbyte.workers.temporal.scheduling; -import io.airbyte.commons.temporal.scheduling.state.WorkflowState; +import io.airbyte.workers.temporal.scheduling.state.WorkflowState; import io.temporal.workflow.QueryMethod; import io.temporal.workflow.SignalMethod; import io.temporal.workflow.WorkflowInterface; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java index 3e263bc9472ed..fd0bf6998d68b 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java @@ -5,13 +5,6 @@ package io.airbyte.workers.temporal.scheduling; import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.commons.temporal.TemporalJobType; -import io.airbyte.commons.temporal.TemporalWorkflowUtils; -import io.airbyte.commons.temporal.scheduling.ConnectionManagerWorkflow; -import io.airbyte.commons.temporal.scheduling.ConnectionUpdaterInput; -import io.airbyte.commons.temporal.scheduling.state.WorkflowInternalState; -import io.airbyte.commons.temporal.scheduling.state.WorkflowState; -import io.airbyte.commons.temporal.scheduling.state.listener.NoopStateListener; import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.ConnectorJobOutput.OutputType; import io.airbyte.config.FailureReason; @@ -28,6 +21,8 @@ import io.airbyte.persistence.job.models.JobRunConfig; import io.airbyte.workers.WorkerConstants; import io.airbyte.workers.helper.FailureHelper; +import io.airbyte.workers.temporal.TemporalJobType; +import io.airbyte.workers.temporal.TemporalWorkflowUtils; import io.airbyte.workers.temporal.annotations.TemporalActivityStub; import io.airbyte.workers.temporal.check.connection.CheckConnectionActivity; import io.airbyte.workers.temporal.check.connection.CheckConnectionActivity.CheckConnectionInput; @@ -68,6 +63,9 @@ import io.airbyte.workers.temporal.scheduling.activities.StreamResetActivity; import io.airbyte.workers.temporal.scheduling.activities.StreamResetActivity.DeleteStreamResetRecordsForJobInput; import io.airbyte.workers.temporal.scheduling.activities.WorkflowConfigActivity; +import io.airbyte.workers.temporal.scheduling.state.WorkflowInternalState; +import io.airbyte.workers.temporal.scheduling.state.WorkflowState; +import io.airbyte.workers.temporal.scheduling.state.listener.NoopStateListener; import io.airbyte.workers.temporal.sync.SyncWorkflow; import io.temporal.api.enums.v1.ParentClosePolicy; import io.temporal.failure.ActivityFailure; diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/ConnectionUpdaterInput.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionUpdaterInput.java similarity index 86% rename from airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/ConnectionUpdaterInput.java rename to airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionUpdaterInput.java index e6bf75f12c542..40fc047816872 100644 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/ConnectionUpdaterInput.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionUpdaterInput.java @@ -2,9 +2,9 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.commons.temporal.scheduling; +package io.airbyte.workers.temporal.scheduling; -import io.airbyte.commons.temporal.scheduling.state.WorkflowState; +import io.airbyte.workers.temporal.scheduling.state.WorkflowState; import java.util.UUID; import javax.annotation.Nullable; import lombok.AllArgsConstructor; @@ -39,7 +39,7 @@ public class ConnectionUpdaterInput { private WorkflowState workflowState; private boolean resetConnection; @Builder.Default - private final boolean fromJobResetFailure = false; + private boolean fromJobResetFailure = false; @Builder.Default private boolean skipScheduling = false; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java index 5ed4748ec0577..5b41c150828a4 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java @@ -5,7 +5,6 @@ package io.airbyte.workers.temporal.scheduling.activities; import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.temporal.TemporalWorkflowUtils; import io.airbyte.config.JobConfig.ConfigType; import io.airbyte.config.JobResetConnectionConfig; import io.airbyte.config.JobSyncConfig; @@ -16,6 +15,7 @@ import io.airbyte.persistence.job.models.Job; import io.airbyte.persistence.job.models.JobRunConfig; import io.airbyte.workers.WorkerConstants; +import io.airbyte.workers.temporal.TemporalWorkflowUtils; import io.airbyte.workers.temporal.exception.RetryableException; import io.micronaut.context.annotation.Requires; import java.util.List; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RecordMetricActivity.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RecordMetricActivity.java index 7de9bdd9857d3..e8eac26d0725c 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RecordMetricActivity.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RecordMetricActivity.java @@ -4,9 +4,9 @@ package io.airbyte.workers.temporal.scheduling.activities; -import io.airbyte.commons.temporal.scheduling.ConnectionUpdaterInput; import io.airbyte.metrics.lib.MetricAttribute; import io.airbyte.metrics.lib.OssMetricsRegistry; +import io.airbyte.workers.temporal.scheduling.ConnectionUpdaterInput; import io.temporal.activity.ActivityInterface; import io.temporal.activity.ActivityMethod; import java.util.Optional; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RecordMetricActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RecordMetricActivityImpl.java index b185a4a85aa89..1ec253ecacd53 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RecordMetricActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RecordMetricActivityImpl.java @@ -4,10 +4,10 @@ package io.airbyte.workers.temporal.scheduling.activities; -import io.airbyte.commons.temporal.scheduling.ConnectionUpdaterInput; import io.airbyte.metrics.lib.MetricAttribute; import io.airbyte.metrics.lib.MetricClient; import io.airbyte.metrics.lib.MetricTags; +import io.airbyte.workers.temporal.scheduling.ConnectionUpdaterInput; import io.micronaut.context.annotation.Requires; import java.util.ArrayList; import java.util.List; diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/WorkflowInternalState.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/WorkflowInternalState.java similarity index 80% rename from airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/WorkflowInternalState.java rename to airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/WorkflowInternalState.java index e3d25f7fee486..6a672f54ee0d4 100644 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/WorkflowInternalState.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/WorkflowInternalState.java @@ -2,7 +2,7 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.commons.temporal.scheduling.state; +package io.airbyte.workers.temporal.scheduling.state; import io.airbyte.config.FailureReason; import java.util.HashSet; @@ -20,7 +20,7 @@ public class WorkflowInternalState { private Integer attemptNumber = null; // StandardSyncOutput standardSyncOutput = null; - private Set failures = new HashSet<>(); + private final Set failures = new HashSet<>(); private Boolean partialSuccess = null; } diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/WorkflowState.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/WorkflowState.java similarity index 94% rename from airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/WorkflowState.java rename to airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/WorkflowState.java index b345700fcfa02..edb49645fcf62 100644 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/WorkflowState.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/WorkflowState.java @@ -2,11 +2,11 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.commons.temporal.scheduling.state; +package io.airbyte.workers.temporal.scheduling.state; -import io.airbyte.commons.temporal.scheduling.state.listener.WorkflowStateChangedListener; -import io.airbyte.commons.temporal.scheduling.state.listener.WorkflowStateChangedListener.ChangedStateEvent; -import io.airbyte.commons.temporal.scheduling.state.listener.WorkflowStateChangedListener.StateField; +import io.airbyte.workers.temporal.scheduling.state.listener.WorkflowStateChangedListener; +import io.airbyte.workers.temporal.scheduling.state.listener.WorkflowStateChangedListener.ChangedStateEvent; +import io.airbyte.workers.temporal.scheduling.state.listener.WorkflowStateChangedListener.StateField; import java.util.UUID; import lombok.Getter; import lombok.NoArgsConstructor; diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/listener/NoopStateListener.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/listener/NoopStateListener.java similarity index 86% rename from airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/listener/NoopStateListener.java rename to airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/listener/NoopStateListener.java index efb63c09782c0..69cf8583c9bf0 100644 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/listener/NoopStateListener.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/listener/NoopStateListener.java @@ -2,7 +2,7 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.commons.temporal.scheduling.state.listener; +package io.airbyte.workers.temporal.scheduling.state.listener; import java.util.LinkedList; import java.util.Queue; diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/listener/TestStateListener.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/listener/TestStateListener.java similarity index 93% rename from airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/listener/TestStateListener.java rename to airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/listener/TestStateListener.java index 9abeeb3115d37..214ad92dac94d 100644 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/listener/TestStateListener.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/listener/TestStateListener.java @@ -2,7 +2,7 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.commons.temporal.scheduling.state.listener; +package io.airbyte.workers.temporal.scheduling.state.listener; import java.util.LinkedList; import java.util.Optional; diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/listener/WorkflowStateChangedListener.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/listener/WorkflowStateChangedListener.java similarity index 94% rename from airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/listener/WorkflowStateChangedListener.java rename to airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/listener/WorkflowStateChangedListener.java index 86866bf62da15..ade23612909cd 100644 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/listener/WorkflowStateChangedListener.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/listener/WorkflowStateChangedListener.java @@ -2,7 +2,7 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.commons.temporal.scheduling.state.listener; +package io.airbyte.workers.temporal.scheduling.state.listener; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonSubTypes.Type; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java index e57dfd82c88fe..c85be72876103 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java @@ -6,7 +6,6 @@ import io.airbyte.api.client.AirbyteApiClient; import io.airbyte.commons.functional.CheckedSupplier; -import io.airbyte.commons.temporal.CancellationHandler; import io.airbyte.config.Configs.WorkerEnvironment; import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.JobGetSpecConfig; @@ -19,6 +18,7 @@ import io.airbyte.workers.process.AirbyteIntegrationLauncher; import io.airbyte.workers.process.IntegrationLauncher; import io.airbyte.workers.process.ProcessFactory; +import io.airbyte.workers.temporal.CancellationHandler; import io.airbyte.workers.temporal.TemporalAttemptExecution; import io.micronaut.context.annotation.Requires; import io.micronaut.context.annotation.Value; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtLauncherWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtLauncherWorker.java index 52ebaa9ce3a93..c0624496cd812 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtLauncherWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtLauncherWorker.java @@ -5,12 +5,12 @@ package io.airbyte.workers.temporal.sync; import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.temporal.TemporalUtils; import io.airbyte.config.OperatorDbtInput; import io.airbyte.persistence.job.models.IntegrationLauncherConfig; import io.airbyte.persistence.job.models.JobRunConfig; import io.airbyte.workers.ContainerOrchestratorConfig; import io.airbyte.workers.WorkerConfigs; +import io.airbyte.workers.temporal.TemporalUtils; import io.temporal.activity.ActivityExecutionContext; import java.util.Map; import java.util.UUID; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java index b10232c5c73ab..f56c5fb566afa 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java @@ -9,8 +9,6 @@ import io.airbyte.api.client.model.generated.JobIdRequestBody; import io.airbyte.commons.functional.CheckedSupplier; import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.temporal.CancellationHandler; -import io.airbyte.commons.temporal.TemporalUtils; import io.airbyte.config.AirbyteConfigValidator; import io.airbyte.config.ConfigSchema; import io.airbyte.config.Configs.WorkerEnvironment; @@ -27,7 +25,9 @@ import io.airbyte.workers.general.DbtTransformationWorker; import io.airbyte.workers.normalization.NormalizationRunnerFactory; import io.airbyte.workers.process.ProcessFactory; +import io.airbyte.workers.temporal.CancellationHandler; import io.airbyte.workers.temporal.TemporalAttemptExecution; +import io.airbyte.workers.temporal.TemporalUtils; import io.micronaut.context.annotation.Value; import io.temporal.activity.Activity; import io.temporal.activity.ActivityExecutionContext; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/LauncherWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/LauncherWorker.java index eb07bfbc452d4..b9be375f8879e 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/LauncherWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/LauncherWorker.java @@ -8,7 +8,6 @@ import io.airbyte.commons.features.EnvVariableFeatureFlags; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.lang.Exceptions; -import io.airbyte.commons.temporal.TemporalUtils; import io.airbyte.config.ResourceRequirements; import io.airbyte.persistence.job.models.JobRunConfig; import io.airbyte.workers.ContainerOrchestratorConfig; @@ -20,6 +19,7 @@ import io.airbyte.workers.process.KubePodInfo; import io.airbyte.workers.process.KubePodResourceHelper; import io.airbyte.workers.process.KubeProcessFactory; +import io.airbyte.workers.temporal.TemporalUtils; import io.fabric8.kubernetes.api.model.DeletionPropagation; import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.client.KubernetesClientException; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java index c42d394dbb57e..a233089b30e0e 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java @@ -9,8 +9,6 @@ import io.airbyte.api.client.model.generated.JobIdRequestBody; import io.airbyte.commons.functional.CheckedSupplier; import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.temporal.CancellationHandler; -import io.airbyte.commons.temporal.TemporalUtils; import io.airbyte.config.AirbyteConfigValidator; import io.airbyte.config.ConfigSchema; import io.airbyte.config.Configs.WorkerEnvironment; @@ -29,7 +27,9 @@ import io.airbyte.workers.general.DefaultNormalizationWorker; import io.airbyte.workers.normalization.NormalizationRunnerFactory; import io.airbyte.workers.process.ProcessFactory; +import io.airbyte.workers.temporal.CancellationHandler; import io.airbyte.workers.temporal.TemporalAttemptExecution; +import io.airbyte.workers.temporal.TemporalUtils; import io.micronaut.context.annotation.Value; import io.temporal.activity.Activity; import io.temporal.activity.ActivityExecutionContext; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationLauncherWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationLauncherWorker.java index 62bbaadbdb363..11daed6de2776 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationLauncherWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationLauncherWorker.java @@ -5,13 +5,13 @@ package io.airbyte.workers.temporal.sync; import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.temporal.TemporalUtils; import io.airbyte.config.NormalizationInput; import io.airbyte.config.NormalizationSummary; import io.airbyte.persistence.job.models.IntegrationLauncherConfig; import io.airbyte.persistence.job.models.JobRunConfig; import io.airbyte.workers.ContainerOrchestratorConfig; import io.airbyte.workers.WorkerConfigs; +import io.airbyte.workers.temporal.TemporalUtils; import io.temporal.activity.ActivityExecutionContext; import java.util.Map; import java.util.UUID; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java index 7fec50021204a..20e05c14f6629 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java @@ -10,8 +10,6 @@ import io.airbyte.commons.features.FeatureFlags; import io.airbyte.commons.functional.CheckedSupplier; import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.temporal.CancellationHandler; -import io.airbyte.commons.temporal.TemporalUtils; import io.airbyte.config.AirbyteConfigValidator; import io.airbyte.config.ConfigSchema; import io.airbyte.config.Configs.WorkerEnvironment; @@ -44,7 +42,9 @@ import io.airbyte.workers.process.AirbyteIntegrationLauncher; import io.airbyte.workers.process.IntegrationLauncher; import io.airbyte.workers.process.ProcessFactory; +import io.airbyte.workers.temporal.CancellationHandler; import io.airbyte.workers.temporal.TemporalAttemptExecution; +import io.airbyte.workers.temporal.TemporalUtils; import io.micronaut.context.annotation.Value; import io.temporal.activity.Activity; import io.temporal.activity.ActivityExecutionContext; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationLauncherWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationLauncherWorker.java index c8da41af2bf3f..ec2a1f530d23e 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationLauncherWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationLauncherWorker.java @@ -5,13 +5,13 @@ package io.airbyte.workers.temporal.sync; import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.temporal.TemporalUtils; import io.airbyte.config.ReplicationOutput; import io.airbyte.config.ResourceRequirements; import io.airbyte.config.StandardSyncInput; import io.airbyte.persistence.job.models.IntegrationLauncherConfig; import io.airbyte.persistence.job.models.JobRunConfig; import io.airbyte.workers.ContainerOrchestratorConfig; +import io.airbyte.workers.temporal.TemporalUtils; import io.temporal.activity.ActivityExecutionContext; import java.util.Map; import java.util.UUID; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RouterService.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RouterService.java index aa030f20a8ce1..84017b83e7785 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RouterService.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RouterService.java @@ -5,7 +5,7 @@ package io.airbyte.workers.temporal.sync; import com.google.common.annotations.VisibleForTesting; -import io.airbyte.commons.temporal.TemporalJobType; +import io.airbyte.workers.temporal.TemporalJobType; import io.micronaut.context.annotation.Value; import io.micronaut.core.util.StringUtils; import java.util.Arrays; diff --git a/airbyte-commons-temporal/src/main/resources/workers_models/JobRunConfig.yaml b/airbyte-workers/src/main/resources/workers_models/JobRunConfig.yaml similarity index 100% rename from airbyte-commons-temporal/src/main/resources/workers_models/JobRunConfig.yaml rename to airbyte-workers/src/main/resources/workers_models/JobRunConfig.yaml diff --git a/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/CancellationHandlerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/CancellationHandlerTest.java similarity index 85% rename from airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/CancellationHandlerTest.java rename to airbyte-workers/src/test/java/io/airbyte/workers/temporal/CancellationHandlerTest.java index 8f157760c76a6..56a37adcb3d83 100644 --- a/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/CancellationHandlerTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/CancellationHandlerTest.java @@ -2,16 +2,17 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.commons.temporal; +package io.airbyte.workers.temporal; -import io.airbyte.commons.temporal.stubs.HeartbeatWorkflow; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; + +import io.airbyte.workers.temporal.stubs.HeartbeatWorkflow; import io.temporal.activity.Activity; import io.temporal.activity.ActivityExecutionContext; import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowOptions; import io.temporal.testing.TestWorkflowEnvironment; import io.temporal.worker.Worker; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; class CancellationHandlerTest { @@ -39,7 +40,7 @@ void testCancellationHandler() { .setTaskQueue("task-queue") .build()); - Assertions.assertDoesNotThrow(heartbeatWorkflow::execute); + assertDoesNotThrow(heartbeatWorkflow::execute); } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalAttemptExecutionTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalAttemptExecutionTest.java index f88252e5003aa..70d309886bfca 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalAttemptExecutionTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalAttemptExecutionTest.java @@ -17,7 +17,6 @@ import io.airbyte.api.client.AirbyteApiClient; import io.airbyte.api.client.generated.AttemptApi; import io.airbyte.commons.functional.CheckedSupplier; -import io.airbyte.commons.temporal.CancellationHandler; import io.airbyte.config.Configs; import io.airbyte.db.init.DatabaseInitializationException; import io.airbyte.persistence.job.models.JobRunConfig; diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java index 37455ce758b99..5be8bd629a85a 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java @@ -11,6 +11,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -22,11 +23,6 @@ import com.google.common.collect.Sets; import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.temporal.TemporalJobType; -import io.airbyte.commons.temporal.TemporalWorkflowUtils; -import io.airbyte.commons.temporal.scheduling.ConnectionManagerWorkflow; -import io.airbyte.commons.temporal.scheduling.ConnectionManagerWorkflow.JobInformation; -import io.airbyte.commons.temporal.scheduling.state.WorkflowState; import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.FailureReason; import io.airbyte.config.JobCheckConnectionConfig; @@ -45,6 +41,9 @@ import io.airbyte.workers.temporal.TemporalClient.ManualOperationResult; import io.airbyte.workers.temporal.check.connection.CheckConnectionWorkflow; import io.airbyte.workers.temporal.discover.catalog.DiscoverCatalogWorkflow; +import io.airbyte.workers.temporal.scheduling.ConnectionManagerWorkflow; +import io.airbyte.workers.temporal.scheduling.ConnectionManagerWorkflow.JobInformation; +import io.airbyte.workers.temporal.scheduling.state.WorkflowState; import io.airbyte.workers.temporal.spec.SpecWorkflow; import io.airbyte.workers.temporal.sync.SyncWorkflow; import io.temporal.api.enums.v1.WorkflowExecutionStatus; @@ -62,6 +61,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.List; +import java.util.Set; import java.util.UUID; import java.util.function.Supplier; import org.junit.jupiter.api.BeforeEach; @@ -741,6 +741,42 @@ void testResetConnectionDeletedWorkflow() throws IOException { } + @Nested + class RestartPerStatus { + + private ConnectionManagerUtils mConnectionManagerUtils; + + @BeforeEach + public void init() throws IOException { + mConnectionManagerUtils = mock(ConnectionManagerUtils.class); + + final Path workspaceRoot = Files.createTempDirectory(Path.of("/tmp"), "temporal_client_test"); + temporalClient = spy( + new TemporalClient(workspaceRoot, workflowClient, workflowServiceStubs, streamResetPersistence, mConnectionManagerUtils, + streamResetRecordsHelper)); + } + + @Test + void testRestartFailed() { + final ConnectionManagerWorkflow mConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class); + + when(workflowClient.newWorkflowStub(any(), anyString())).thenReturn(mConnectionManagerWorkflow); + final UUID connectionId = UUID.fromString("ebbfdc4c-295b-48a0-844f-88551dfad3db"); + final Set workflowIds = Set.of(connectionId); + + doReturn(workflowIds) + .when(temporalClient).fetchWorkflowsByStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED); + doReturn(workflowIds) + .when(temporalClient).filterOutRunningWorkspaceId(workflowIds); + mockWorkflowStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED); + temporalClient.restartWorkflowByStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED); + verify(mConnectionManagerUtils).safeTerminateWorkflow(eq(workflowClient), eq(connectionId), + anyString()); + verify(mConnectionManagerUtils).startConnectionManagerNoSignal(eq(workflowClient), eq(connectionId)); + } + + } + @Test @DisplayName("Test manual operation on quarantined workflow causes a restart") void testManualOperationOnQuarantinedWorkflow() { diff --git a/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/TemporalUtilsTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalUtilsTest.java similarity index 98% rename from airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/TemporalUtilsTest.java rename to airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalUtilsTest.java index 6d99f56299002..b4fd33ce429fc 100644 --- a/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/TemporalUtilsTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalUtilsTest.java @@ -2,7 +2,7 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.commons.temporal; +package io.airbyte.workers.temporal; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -13,7 +13,8 @@ import static org.mockito.Mockito.when; import io.airbyte.commons.concurrency.VoidCallable; -import io.airbyte.commons.temporal.stubs.HeartbeatWorkflow; +import io.airbyte.workers.exception.WorkerException; +import io.airbyte.workers.temporal.stubs.HeartbeatWorkflow; import io.temporal.activity.Activity; import io.temporal.activity.ActivityCancellationType; import io.temporal.activity.ActivityExecutionContext; @@ -306,7 +307,7 @@ interface Activity1 { class Activity1Impl implements Activity1 { - private static final Logger LOGGER = LoggerFactory.getLogger(Activity1Impl.class); + private static final Logger LOGGER = LoggerFactory.getLogger(TestWorkflow.Activity1Impl.class); private static final String ACTIVITY1 = "activity1"; private final VoidCallable callable; @@ -398,7 +399,7 @@ public void activity(final String arg) { Thread.sleep(10000); return null; } else { - throw new Exception("failed"); + throw new WorkerException("failed"); } } else { return null; diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflowTest.java index c4768ed6acf07..52d21fe19d222 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflowTest.java @@ -7,7 +7,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import io.airbyte.commons.temporal.TemporalUtils; +import io.airbyte.workers.temporal.TemporalUtils; import io.airbyte.workers.temporal.support.TemporalProxyHelper; import io.micronaut.context.BeanRegistration; import io.micronaut.inject.BeanIdentifier; diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java index 37aa510782658..c51e6575c6826 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java @@ -8,13 +8,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import io.airbyte.commons.temporal.TemporalJobType; -import io.airbyte.commons.temporal.scheduling.ConnectionManagerWorkflow; -import io.airbyte.commons.temporal.scheduling.ConnectionUpdaterInput; -import io.airbyte.commons.temporal.scheduling.state.WorkflowState; -import io.airbyte.commons.temporal.scheduling.state.listener.TestStateListener; -import io.airbyte.commons.temporal.scheduling.state.listener.WorkflowStateChangedListener.ChangedStateEvent; -import io.airbyte.commons.temporal.scheduling.state.listener.WorkflowStateChangedListener.StateField; import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.ConnectorJobOutput.OutputType; import io.airbyte.config.FailureReason; @@ -26,6 +19,7 @@ import io.airbyte.persistence.job.models.IntegrationLauncherConfig; import io.airbyte.persistence.job.models.JobRunConfig; import io.airbyte.workers.WorkerConstants; +import io.airbyte.workers.temporal.TemporalJobType; import io.airbyte.workers.temporal.check.connection.CheckConnectionActivity; import io.airbyte.workers.temporal.scheduling.activities.AutoDisableConnectionActivity; import io.airbyte.workers.temporal.scheduling.activities.AutoDisableConnectionActivity.AutoDisableConnectionActivityInput; @@ -48,6 +42,10 @@ import io.airbyte.workers.temporal.scheduling.activities.RouteToSyncTaskQueueActivity.RouteToSyncTaskQueueOutput; import io.airbyte.workers.temporal.scheduling.activities.StreamResetActivity; import io.airbyte.workers.temporal.scheduling.activities.WorkflowConfigActivity; +import io.airbyte.workers.temporal.scheduling.state.WorkflowState; +import io.airbyte.workers.temporal.scheduling.state.listener.TestStateListener; +import io.airbyte.workers.temporal.scheduling.state.listener.WorkflowStateChangedListener.ChangedStateEvent; +import io.airbyte.workers.temporal.scheduling.state.listener.WorkflowStateChangedListener.StateField; import io.airbyte.workers.temporal.scheduling.testsyncworkflow.DbtFailureSyncWorkflow; import io.airbyte.workers.temporal.scheduling.testsyncworkflow.EmptySyncWorkflow; import io.airbyte.workers.temporal.scheduling.testsyncworkflow.NormalizationFailureSyncWorkflow; diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/RecordMetricActivityImplTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/RecordMetricActivityImplTest.java index 3559b434ed053..da8786a54de89 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/RecordMetricActivityImplTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/RecordMetricActivityImplTest.java @@ -9,11 +9,11 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import io.airbyte.commons.temporal.scheduling.ConnectionUpdaterInput; import io.airbyte.metrics.lib.MetricAttribute; import io.airbyte.metrics.lib.MetricClient; import io.airbyte.metrics.lib.MetricTags; import io.airbyte.metrics.lib.OssMetricsRegistry; +import io.airbyte.workers.temporal.scheduling.ConnectionUpdaterInput; import io.airbyte.workers.temporal.scheduling.activities.RecordMetricActivity.FailureCause; import io.airbyte.workers.temporal.scheduling.activities.RecordMetricActivity.RecordMetricInput; import java.util.Optional; diff --git a/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/stubs/HeartbeatWorkflow.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/stubs/HeartbeatWorkflow.java similarity index 93% rename from airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/stubs/HeartbeatWorkflow.java rename to airbyte-workers/src/test/java/io/airbyte/workers/temporal/stubs/HeartbeatWorkflow.java index 3152300ef2477..306b6b9612aa0 100644 --- a/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/stubs/HeartbeatWorkflow.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/stubs/HeartbeatWorkflow.java @@ -2,9 +2,9 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.commons.temporal.stubs; +package io.airbyte.workers.temporal.stubs; -import io.airbyte.commons.temporal.TemporalUtils; +import io.airbyte.workers.temporal.TemporalUtils; import io.temporal.activity.ActivityCancellationType; import io.temporal.activity.ActivityInterface; import io.temporal.activity.ActivityMethod; diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/RouterServiceTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/RouterServiceTest.java index 0f9165e99b158..9eecf2dcab731 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/RouterServiceTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/RouterServiceTest.java @@ -7,7 +7,7 @@ import static io.airbyte.workers.temporal.sync.RouterService.MVP_DATA_PLANE_TASK_QUEUE; import static org.junit.jupiter.api.Assertions.assertEquals; -import io.airbyte.commons.temporal.TemporalJobType; +import io.airbyte.workers.temporal.TemporalJobType; import java.util.UUID; import org.junit.jupiter.api.Test; diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java index fe60a4415ae0c..1979cb9a879e1 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java @@ -15,7 +15,6 @@ import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; -import io.airbyte.commons.temporal.TemporalUtils; import io.airbyte.config.NormalizationInput; import io.airbyte.config.NormalizationSummary; import io.airbyte.config.OperatorDbtInput; @@ -29,6 +28,7 @@ import io.airbyte.persistence.job.models.JobRunConfig; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.workers.TestConfigHelpers; +import io.airbyte.workers.temporal.TemporalUtils; import io.airbyte.workers.temporal.support.TemporalProxyHelper; import io.micronaut.context.BeanRegistration; import io.micronaut.inject.BeanIdentifier; diff --git a/docker-compose.yaml b/docker-compose.yaml index 0f2bd38af1d68..2e88f4a637a8a 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -183,27 +183,14 @@ services: container_name: airbyte-cron restart: unless-stopped environment: - - AIRBYTE_VERSION=${VERSION} - - CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION=${CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION:-} - - DATABASE_PASSWORD=${DATABASE_PASSWORD} - - DATABASE_URL=${DATABASE_URL} - - DATABASE_USER=${DATABASE_USER} - DB=postgresql - DB_PORT=${DATABASE_PORT} - - JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION=${JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION:-} - - JOB_ERROR_REPORTING_STRATEGY=${JOB_ERROR_REPORTING_STRATEGY} - - JOB_ERROR_REPORTING_SENTRY_DSN=${JOB_ERROR_REPORTING_SENTRY_DSN} - - LOCAL_DOCKER_MOUNT=${LOCAL_DOCKER_MOUNT} - - LOCAL_ROOT=${LOCAL_ROOT} - LOG_LEVEL=${LOG_LEVEL} - - INTERNAL_API_HOST=${INTERNAL_API_HOST} - POSTGRES_PWD=${DATABASE_PASSWORD} - POSTGRES_SEEDS=${DATABASE_HOST} - POSTGRES_USER=${DATABASE_USER} - TEMPORAL_HISTORY_RETENTION_IN_DAYS=${TEMPORAL_HISTORY_RETENTION_IN_DAYS} - - SECRET_PERSISTENCE=${SECRET_PERSISTENCE} - WORKSPACE_ROOT=${WORKSPACE_ROOT} - - MICRONAUT_ENVIRONMENTS=${WORKERS_MICRONAUT_ENVIRONMENTS} volumes: - workspace:${WORKSPACE_ROOT} volumes: diff --git a/settings.gradle b/settings.gradle index 4206edff4c211..09a5cbf56b822 100644 --- a/settings.gradle +++ b/settings.gradle @@ -76,7 +76,6 @@ include ':airbyte-test-utils' // airbyte-workers has a lot of dependencies. include ':airbyte-workers' // reused by acceptance tests in connector base. include ':airbyte-analytics' // transitively used by airbyte-workers. -include ':airbyte-commons-temporal' include ':airbyte-config:config-persistence' // transitively used by airbyte-workers. include ':airbyte-persistence:job-persistence' // transitively used by airbyte-workers. include ':airbyte-db:jooq' // transitively used by airbyte-workers. @@ -89,13 +88,13 @@ if (!System.getenv().containsKey("SUB_BUILD") || System.getenv().get("SUB_BUILD" include ':airbyte-config:init' include ':airbyte-config:specs' include ':airbyte-container-orchestrator' - include ':airbyte-cron' include ':airbyte-metrics:reporter' include ':airbyte-server' include ':airbyte-temporal' include ':airbyte-tests' include ':airbyte-webapp' include ':airbyte-webapp-e2e-tests' + include ':airbyte-cron' } // connectors base @@ -150,4 +149,3 @@ if (!System.getenv().containsKey("SUB_BUILD") || System.getenv().get("SUB_BUILD" } } } -