Skip to content

Commit

Permalink
Pairing on Worker Micronaut (#16364)
Browse files Browse the repository at this point in the history
* add RouteToSyncTaskQueue activity

* use new route activity in connection manager workflow

* format

* call router service for task queue
  • Loading branch information
pmossman committed Sep 8, 2022
1 parent d58af58 commit 253e191
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@
import io.airbyte.workers.temporal.scheduling.activities.RecordMetricActivity;
import io.airbyte.workers.temporal.scheduling.activities.RecordMetricActivity.FailureCause;
import io.airbyte.workers.temporal.scheduling.activities.RecordMetricActivity.RecordMetricInput;
import io.airbyte.workers.temporal.scheduling.activities.RouteToSyncTaskQueueActivity;
import io.airbyte.workers.temporal.scheduling.activities.RouteToSyncTaskQueueActivity.RouteToSyncTaskQueueInput;
import io.airbyte.workers.temporal.scheduling.activities.RouteToSyncTaskQueueActivity.RouteToSyncTaskQueueOutput;
import io.airbyte.workers.temporal.scheduling.activities.StreamResetActivity;
import io.airbyte.workers.temporal.scheduling.activities.StreamResetActivity.DeleteStreamResetRecordsForJobInput;
import io.airbyte.workers.temporal.scheduling.activities.WorkflowConfigActivity;
Expand Down Expand Up @@ -109,6 +112,9 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow
private static final String WORKFLOW_CONFIG_TAG = "workflow_config";
private static final int WORKFLOW_CONFIG_CURRENT_VERSION = 1;

private static final String ROUTE_ACTIVITY_TAG = "route_activity";
private static final int ROUTE_ACTIVITY_CURRENT_VERSION = 1;

private WorkflowState workflowState = new WorkflowState(UUID.randomUUID(), new NoopStateListener());

private final WorkflowInternalState workflowInternalState = new WorkflowInternalState();
Expand All @@ -131,6 +137,8 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow
private RecordMetricActivity recordMetricActivity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private WorkflowConfigActivity workflowConfigActivity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private RouteToSyncTaskQueueActivity routeToSyncTaskQueueActivity;

private CancellationScope cancellableSyncWorkflow;

Expand Down Expand Up @@ -709,6 +717,28 @@ private GeneratedJobInput getJobInput() {
return syncWorkflowInputs;
}

private String getSyncTaskQueue() {
final int taskQueueChangeVersion =
Workflow.getVersion("task_queue_change_from_connection_updater_to_sync", Workflow.DEFAULT_VERSION, TASK_QUEUE_CHANGE_CURRENT_VERSION);

if (taskQueueChangeVersion < TASK_QUEUE_CHANGE_CURRENT_VERSION) {
return TemporalJobType.CONNECTION_UPDATER.name();
}

final int routeActivityVersion = Workflow.getVersion(ROUTE_ACTIVITY_TAG, Workflow.DEFAULT_VERSION, ROUTE_ACTIVITY_CURRENT_VERSION);

if (routeActivityVersion < ROUTE_ACTIVITY_CURRENT_VERSION) {
return TemporalJobType.SYNC.name();
}

final RouteToSyncTaskQueueInput routeToSyncTaskQueueInput = new RouteToSyncTaskQueueInput(connectionId);
final RouteToSyncTaskQueueOutput routeToSyncTaskQueueOutput = runMandatoryActivityWithOutput(
routeToSyncTaskQueueActivity::route,
routeToSyncTaskQueueInput);

return routeToSyncTaskQueueOutput.getTaskQueue();
}

/**
* Report the job as started in the job tracker and set it as running in the workflow internal
* state.
Expand All @@ -734,14 +764,8 @@ private void reportJobStarting() {
* make sense.
*/
private StandardSyncOutput runChildWorkflow(final GeneratedJobInput jobInputs) {
final int taskQueueChangeVersion =
Workflow.getVersion("task_queue_change_from_connection_updater_to_sync", Workflow.DEFAULT_VERSION, TASK_QUEUE_CHANGE_CURRENT_VERSION);

String taskQueue = TemporalJobType.SYNC.name();
final String taskQueue = getSyncTaskQueue();

if (taskQueueChangeVersion < TASK_QUEUE_CHANGE_CURRENT_VERSION) {
taskQueue = TemporalJobType.CONNECTION_UPDATER.name();
}
final SyncWorkflow childSync = Workflow.newChildWorkflowStub(SyncWorkflow.class,
ChildWorkflowOptions.newBuilder()
.setWorkflowId("sync_" + workflowInternalState.getJobId())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.scheduling.activities;

import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import java.util.UUID;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@ActivityInterface
public interface RouteToSyncTaskQueueActivity {

@ActivityMethod
RouteToSyncTaskQueueOutput route(RouteToSyncTaskQueueInput input);

@Data
@NoArgsConstructor
@AllArgsConstructor
class RouteToSyncTaskQueueInput {

private UUID connectionId;

}

@Data
@NoArgsConstructor
@AllArgsConstructor
class RouteToSyncTaskQueueOutput {

private String taskQueue;

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.scheduling.activities;

import io.airbyte.workers.temporal.sync.RouterService;
import javax.inject.Inject;
import javax.inject.Singleton;

@Singleton
public class RouteToSyncTaskQueueActivityImpl implements RouteToSyncTaskQueueActivity {

@Inject
private RouterService routerService;

@Override
public RouteToSyncTaskQueueOutput route(final RouteToSyncTaskQueueInput input) {
final String taskQueueForConnectionId = routerService.getTaskQueue(input.getConnectionId());

return new RouteToSyncTaskQueueOutput(taskQueueForConnectionId);
}

}

0 comments on commit 253e191

Please sign in to comment.