Convert airbyte-workers to Micronaut (airbytehq#16434)
* WIP Convert airbyte-workers to Micronaut framework

* Rebase cleanup

* Fix broken tests

* Simplify code

* Support control vs data plane configuration

* make WORFKLOW_PROXY_CACHE non-static to avoid caching mocks in unit tests

* Formatting

* Pairing on Worker Micronaut (airbytehq#16364)

* add RouteToSyncTaskQueue activity

* use new route activity in connection manager workflow

* format

* call router service for task queue

* Revert temporal proxy changes

* Formatting

* Fix default value

* register new route activity in test

* fix SyncWorkflowTest now that it isn't doing any routing

* Update dependencies

* More dependency updates

* Update dependencies

* Improve conditional bean check

* Match existing Optional functionality

* Add notEquals check

* Add missing env var to Helm chart

* Fix typo

* Mark LogConfigs as Singleton

* Env vars for log/state storage type

* Remove use of Optional in bean declarations

* Fix typo in config property name

* Support Temporal Cloud namespace

* Change to @Value

* Use correct value for conditional check

* Upgrade Micronaut

* Fix merge conflict

* Formatting

* Add missing env var

* Use sync task queue environment variable

* Handle sync task queue as set

* format and force http

* Handle case where sync task queue is empty

* Add correct path to config property

* Remove unused import

* Remove unused parameter

* Formatting

* Use pattern for conditional process factory beans

* Cleanup

* PR feedback

* Revert hack for testing

Co-authored-by: pmossman <[email protected]>
2 people authored and jhammarstedt committed Oct 31, 2022
1 parent a990577 commit 76c9fd6
Showing 111 changed files with 4,995 additions and 1,794 deletions.
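
Several of the commit messages above concern routing syncs to a plane-specific Temporal task queue: a new RouteToSyncTaskQueue activity that the connection manager workflow calls, backed by a router service. As a rough illustration only (the method name and signature below are assumptions, not the commit's actual interface), such an activity could be declared like this:

import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import java.util.UUID;

// Hypothetical sketch of the routing activity named in the commit messages: given a
// connection id, return the Temporal task queue its sync should run on (for example,
// a control-plane default queue versus a data-plane-specific queue).
@ActivityInterface
public interface RouteToSyncTaskQueueActivity {

  @ActivityMethod
  String routeToSyncTaskQueue(UUID connectionId);

}
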
4 changes: 2 additions & 2 deletions .env
@@ -81,6 +81,7 @@ LOG_LEVEL=INFO

### APPLICATIONS ###
# Worker #
WORKERS_MICRONAUT_ENVIRONMENTS=control
# Relevant to scaling.
MAX_SYNC_WORKERS=5
MAX_SPEC_WORKERS=5
@@ -92,7 +93,6 @@ ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS=
ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS=
WORKFLOW_FAILURE_RESTART_DELAY_SECONDS=


### FEATURE FLAGS ###
AUTO_DISABLE_FAILING_CONNECTIONS=false
EXPOSE_SECRETS_IN_EXPORT=false
@@ -104,4 +104,4 @@ METRIC_CLIENT=
# Useful only when metric client is set to be otel. Must start with http:// or https://.
OTEL_COLLECTOR_ENDPOINT="http://host.docker.internal:4317"

USE_STREAM_CAPABLE_STATE=true
USE_STREAM_CAPABLE_STATE=true
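
The new WORKERS_MICRONAUT_ENVIRONMENTS=control entry selects the Micronaut environment the worker starts with, which is what allows beans to differ between the control plane and the data plane. A minimal sketch of an environment-gated bean, assuming the variable is mapped onto Micronaut's active environments (the class below is illustrative and not part of this commit):

import io.micronaut.context.annotation.Requires;
import javax.inject.Singleton;

// Hypothetical example: this bean is only instantiated when the worker runs with the
// "control" Micronaut environment active (assumed here to be driven by the
// WORKERS_MICRONAUT_ENVIRONMENTS value shown above).
@Singleton
@Requires(env = "control")
public class ControlPlaneOnlyService {

  public String plane() {
    return "control";
  }

}
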
6 changes: 6 additions & 0 deletions .env.dev
@@ -4,6 +4,7 @@ VERSION=dev
DATABASE_USER=docker
DATABASE_PASSWORD=docker
DATABASE_DB=airbyte
DATABASE_URL=jdbc:postgresql://db:5432/airbyte
CONFIG_ROOT=/data
WORKSPACE_ROOT=/tmp/workspace
DATA_DOCKER_MOUNT=airbyte_data_dev
@@ -24,6 +25,11 @@ API_URL=/api/v1/
INTERNAL_API_HOST=airbyte-server:8001
SYNC_JOB_MAX_ATTEMPTS=3
SYNC_JOB_MAX_TIMEOUT_DAYS=3
WORKERS_MICRONAUT_ENVIRONMENTS=control

# Sentry
SENTRY_DSN=""

# Migration Configuration
CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION=0.35.15.001
JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION=0.29.15.001
3 changes: 3 additions & 0 deletions airbyte-config/config-models/build.gradle
@@ -5,6 +5,9 @@ plugins {
}

dependencies {
annotationProcessor libs.bundles.micronaut.annotation.processor
implementation libs.bundles.micronaut.annotation

implementation project(':airbyte-json-validation')
implementation project(':airbyte-protocol:protocol-models')
implementation project(':airbyte-commons')

@@ -227,7 +227,7 @@ public EnvConfigs() {
public EnvConfigs(final Map<String, String> envMap) {
this.getEnv = envMap::get;
this.getAllEnvKeys = envMap::keySet;
this.logConfigs = new LogConfigs(getLogConfiguration().orElse(null));
this.logConfigs = new LogConfigs(getLogConfiguration());
this.stateStorageCloudConfigs = getStateStorageConfiguration().orElse(null);

validateSyncWorkflowConfigs();

@@ -5,19 +5,23 @@
package io.airbyte.config.helpers;

import io.airbyte.config.storage.CloudStorageConfigs;
import java.util.Optional;
import javax.inject.Named;
import javax.inject.Singleton;

/**
* Describes logging configuration. For now it just contains configuration around storage medium,
* but in the future will have other configuration options (e.g. json logging, etc).
*/
@Singleton
public class LogConfigs {

public final static LogConfigs EMPTY = new LogConfigs(null);
public final static LogConfigs EMPTY = new LogConfigs(Optional.empty());

private final CloudStorageConfigs storageConfigs;

public LogConfigs(final CloudStorageConfigs storageConfigs) {
this.storageConfigs = storageConfigs;
public LogConfigs(@Named("logStorageConfigs") final Optional<CloudStorageConfigs> storageConfigs) {
this.storageConfigs = storageConfigs.orElse(null);
}

public CloudStorageConfigs getStorageConfigs() {
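
LogConfigs is now a @Singleton whose constructor injects a named Optional<CloudStorageConfigs>. One way such a named bean could be produced, sketched here with an invented factory class, invented property names, and only an S3 branch, is a conditional factory method that is registered only when a matching storage type is configured; when no variant matches, no bean exists and the injected Optional resolves to empty:

import io.airbyte.config.storage.CloudStorageConfigs;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.annotation.Value;
import javax.inject.Named;
import javax.inject.Singleton;

// Hypothetical factory for the @Named("logStorageConfigs") bean consumed by the
// LogConfigs constructor above. The property names are assumptions for illustration.
@Factory
public class LogStorageBeanFactory {

  @Singleton
  @Named("logStorageConfigs")
  @Requires(property = "airbyte.log.storage-type", value = "S3")
  public CloudStorageConfigs s3LogStorageConfigs(
      @Value("${airbyte.log.s3.bucket}") final String bucket,
      @Value("${airbyte.log.s3.access-key}") final String accessKey,
      @Value("${airbyte.log.s3.secret-access-key}") final String secretAccessKey,
      @Value("${airbyte.log.s3.region}") final String region) {
    return CloudStorageConfigs.s3(
        new CloudStorageConfigs.S3Config(bucket, accessKey, secretAccessKey, region));
  }

}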

@@ -10,38 +10,39 @@
import io.airbyte.config.storage.CloudStorageConfigs.GcsConfig;
import io.airbyte.config.storage.CloudStorageConfigs.MinioConfig;
import io.airbyte.config.storage.CloudStorageConfigs.S3Config;
import java.util.Optional;
import org.junit.jupiter.api.Test;

@SuppressWarnings("PMD.JUnitTestsShouldIncludeAssert")
class CloudLogsClientTest {

@Test
void createCloudLogClientTestMinio() {
final var configs = new LogConfigs(CloudStorageConfigs.minio(new MinioConfig(
final var configs = new LogConfigs(Optional.of(CloudStorageConfigs.minio(new MinioConfig(
"test-bucket",
"access-key",
"access-key-secret",
"minio-endpoint")));
"minio-endpoint"))));

assertEquals(S3Logs.class, CloudLogs.createCloudLogClient(configs).getClass());
}

@Test
void createCloudLogClientTestAws() {
final var configs = new LogConfigs(CloudStorageConfigs.s3(new S3Config(
final var configs = new LogConfigs(Optional.of(CloudStorageConfigs.s3(new S3Config(
"test-bucket",
"access-key",
"access-key-secret",
"us-east-1")));
"us-east-1"))));

assertEquals(S3Logs.class, CloudLogs.createCloudLogClient(configs).getClass());
}

@Test
void createCloudLogClientTestGcs() {
final var configs = new LogConfigs(CloudStorageConfigs.gcs(new GcsConfig(
final var configs = new LogConfigs(Optional.of(CloudStorageConfigs.gcs(new GcsConfig(
"storage-bucket",
"path/to/google/secret")));
"path/to/google/secret"))));

assertEquals(GcsLogs.class, CloudLogs.createCloudLogClient(configs).getClass());
}

@@ -12,6 +12,7 @@
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
@@ -27,11 +28,11 @@ class S3LogsTest {
private static final Region REGION = Region.of(REGION_STRING);
private static final String BUCKET_NAME = "airbyte-kube-integration-logging-test";

private static final LogConfigs LOG_CONFIGS = new LogConfigs(CloudStorageConfigs.s3(new CloudStorageConfigs.S3Config(
private static final LogConfigs LOG_CONFIGS = new LogConfigs(Optional.of(CloudStorageConfigs.s3(new CloudStorageConfigs.S3Config(
System.getenv(LogClientSingleton.S3_LOG_BUCKET),
System.getenv(LogClientSingleton.AWS_ACCESS_KEY_ID),
System.getenv(LogClientSingleton.AWS_SECRET_ACCESS_KEY),
System.getenv(LogClientSingleton.S3_LOG_BUCKET_REGION))));
System.getenv(LogClientSingleton.S3_LOG_BUCKET_REGION)))));

private S3Client s3Client;


@@ -12,7 +12,6 @@
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.scheduler.models.JobRunConfig;
import io.airbyte.workers.WorkerApp;
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.process.AsyncKubePodStatus;
@@ -23,7 +22,6 @@
import io.airbyte.workers.process.KubePortManagerSingleton;
import io.airbyte.workers.process.KubeProcessFactory;
import io.airbyte.workers.process.ProcessFactory;
import io.airbyte.workers.process.WorkerHeartbeatServer;
import io.airbyte.workers.storage.StateClients;
import io.airbyte.workers.temporal.sync.DbtLauncherWorker;
import io.airbyte.workers.temporal.sync.NormalizationLauncherWorker;
@@ -58,6 +56,13 @@ public class ContainerOrchestratorApp {

public static final int MAX_SECONDS_TO_WAIT_FOR_FILE_COPY = 60;

// TODO Move the following to configuration once converted to a Micronaut service

// IMPORTANT: Changing the storage location will orphan already existing kube pods when the new
// version is deployed!
private static final Path STATE_STORAGE_PREFIX = Path.of("/state");
private static final Integer KUBE_HEARTBEAT_PORT = 9000;

private final String application;
private final Map<String, String> envMap;
private final JobRunConfig jobRunConfig;
@@ -114,7 +119,7 @@ private void runInternal(final DefaultAsyncStateManager asyncStateManager) {
throw new IllegalStateException("Could not find job orchestrator for application: " + application);
}

final var heartbeatServer = new WorkerHeartbeatServer(WorkerApp.KUBE_HEARTBEAT_PORT);
final var heartbeatServer = new WorkerHeartbeatServer(KUBE_HEARTBEAT_PORT);
heartbeatServer.startBackground();

asyncStateManager.write(kubePodInfo, AsyncKubePodStatus.RUNNING);
@@ -146,7 +151,7 @@ public void run() {

// IMPORTANT: Changing the storage location will orphan already existing kube pods when the new
// version is deployed!
final var documentStoreClient = StateClients.create(configs.getStateStorageCloudConfigs(), WorkerApp.STATE_STORAGE_PREFIX);
final var documentStoreClient = StateClients.create(configs.getStateStorageCloudConfigs(), STATE_STORAGE_PREFIX);
final var asyncStateManager = new DefaultAsyncStateManager(documentStoreClient);

runInternal(asyncStateManager);
@@ -212,7 +217,8 @@ private static ProcessFactory getProcessBuilderFactory(final Configs configs, fi
if (configs.getWorkerEnvironment() == Configs.WorkerEnvironment.KUBERNETES) {
final KubernetesClient fabricClient = new DefaultKubernetesClient();
final String localIp = InetAddress.getLocalHost().getHostAddress();
final String kubeHeartbeatUrl = localIp + ":" + WorkerApp.KUBE_HEARTBEAT_PORT;
// TODO move port to configuration
final String kubeHeartbeatUrl = localIp + ":" + KUBE_HEARTBEAT_PORT;
log.info("Using Kubernetes namespace: {}", configs.getJobKubeNamespace());

// this needs to have two ports for the source and two ports for the destination (all four must be
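
The orchestrator keeps STATE_STORAGE_PREFIX and KUBE_HEARTBEAT_PORT as local constants for now; the TODO above notes they should move to configuration once this app is also converted to Micronaut. A hypothetical sketch of what that could look like (class name and property names are invented for illustration):

import io.micronaut.context.annotation.Value;
import java.nio.file.Path;
import javax.inject.Singleton;

// Hypothetical follow-up: the two constants above expressed as injected configuration,
// once ContainerOrchestratorApp itself runs under Micronaut.
@Singleton
public class OrchestratorConfig {

  private final Path stateStoragePrefix;
  private final int kubeHeartbeatPort;

  public OrchestratorConfig(@Value("${airbyte.state.storage-prefix:/state}") final String stateStoragePrefix,
                            @Value("${airbyte.kube.heartbeat-port:9000}") final int kubeHeartbeatPort) {
    this.stateStoragePrefix = Path.of(stateStoragePrefix);
    this.kubeHeartbeatPort = kubeHeartbeatPort;
  }

  public Path getStateStoragePrefix() {
    return stateStoragePrefix;
  }

  public int getKubeHeartbeatPort() {
    return kubeHeartbeatPort;
  }

}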

@@ -2,7 +2,7 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.process;
package io.airbyte.container_orchestrator;

import com.google.common.collect.ImmutableMap;
import com.google.common.net.HttpHeaders;

@@ -24,10 +24,10 @@
import java.sql.SQLException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import javax.sql.DataSource;
import org.elasticsearch.common.collect.Map;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
13 changes: 11 additions & 2 deletions airbyte-server/build.gradle
@@ -15,10 +15,19 @@ dependencies {
implementation project(':airbyte-notification')
implementation project(':airbyte-oauth')
implementation project(':airbyte-protocol:protocol-models')
implementation project(':airbyte-scheduler:client')
implementation(project(':airbyte-scheduler:client')) {
exclude module: 'airbyte-workers'
}
implementation project(':airbyte-scheduler:scheduler-models')
implementation project(':airbyte-scheduler:scheduler-persistence')
implementation project(':airbyte-workers')
implementation(project(':airbyte-workers')) {
// Temporary hack to avoid dependency conflicts
exclude group: 'io.micronaut'
exclude group: 'io.micronaut.flyway'
exclude group: 'io.micronaut.jaxrs'
exclude group: 'io.micronaut.security'
exclude group: 'io.micronaut.sql'
}

implementation libs.flyway.core
implementation 'com.github.slugify:slugify:2.4'
22 changes: 19 additions & 3 deletions airbyte-server/src/main/java/io/airbyte/server/ServerApp.java
@@ -56,8 +56,11 @@
import io.airbyte.server.handlers.DbMigrationHandler;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.normalization.NormalizationRunnerFactory;
import io.airbyte.workers.temporal.ConnectionManagerUtils;
import io.airbyte.workers.temporal.StreamResetRecordsHelper;
import io.airbyte.workers.temporal.TemporalClient;
import io.airbyte.workers.temporal.TemporalUtils;
import io.airbyte.workers.temporal.TemporalWorkflowUtils;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.io.IOException;
import java.net.http.HttpClient;
@@ -224,14 +227,27 @@ public static ServerRunnable getServer(final ServerFactory apiFactory,
webUrlHelper,
jobErrorReportingClient);

final TemporalUtils temporalUtils = new TemporalUtils(
configs.getTemporalCloudClientCert(),
configs.getTemporalCloudClientKey(),
configs.temporalCloudEnabled(),
configs.getTemporalCloudHost(),
configs.getTemporalCloudNamespace(),
configs.getTemporalHost(),
configs.getTemporalRetentionInDays());

final StreamResetPersistence streamResetPersistence = new StreamResetPersistence(configsDatabase);
final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService();
final WorkflowServiceStubs temporalService = temporalUtils.createTemporalService();
final ConnectionManagerUtils connectionManagerUtils = new ConnectionManagerUtils();
final StreamResetRecordsHelper streamResetRecordsHelper = new StreamResetRecordsHelper(jobPersistence, streamResetPersistence);

final TemporalClient temporalClient = new TemporalClient(
TemporalUtils.createWorkflowClient(temporalService, TemporalUtils.getNamespace()),
configs.getWorkspaceRoot(),
TemporalWorkflowUtils.createWorkflowClient(temporalService, temporalUtils.getNamespace()),
temporalService,
streamResetPersistence);
streamResetPersistence,
connectionManagerUtils,
streamResetRecordsHelper);

final OAuthConfigSupplier oAuthConfigSupplier = new OAuthConfigSupplier(configRepository, trackingClient);
final DefaultSynchronousSchedulerClient syncSchedulerClient =
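
ServerApp now builds TemporalUtils by hand from EnvConfigs getters, since TemporalUtils is no longer a collection of static helpers. On the worker side the same constructor arguments are presumably bound through Micronaut configuration; a sketch of how such a binding could look (the factory class and property names are assumptions, not code from this commit):

import io.airbyte.workers.temporal.TemporalUtils;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Value;
import javax.inject.Singleton;

// Hypothetical worker-side counterpart: the same seven TemporalUtils constructor
// arguments sourced from configuration properties instead of EnvConfigs getters.
@Factory
public class TemporalBeanFactory {

  @Singleton
  public TemporalUtils temporalUtils(
      @Value("${temporal.cloud.client.cert:}") final String temporalCloudClientCert,
      @Value("${temporal.cloud.client.key:}") final String temporalCloudClientKey,
      @Value("${temporal.cloud.enabled:false}") final Boolean temporalCloudEnabled,
      @Value("${temporal.cloud.host:}") final String temporalCloudHost,
      @Value("${temporal.cloud.namespace:}") final String temporalCloudNamespace,
      @Value("${temporal.host}") final String temporalHost,
      @Value("${temporal.retention:30}") final Integer temporalRetentionInDays) {
    return new TemporalUtils(temporalCloudClientCert, temporalCloudClientKey, temporalCloudEnabled,
        temporalCloudHost, temporalCloudNamespace, temporalHost, temporalRetentionInDays);
  }

}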

@@ -98,6 +98,7 @@ class ArchiveHandlerTest {
private JsonSecretsProcessor jsonSecretsProcessor;
private ConfigRepository configRepository;
private ArchiveHandler archiveHandler;
private WorkspaceHelper workspaceHelper;

private static class NoOpFileTtlManager extends FileTtlManager {

@@ -146,14 +147,16 @@ void setup() throws Exception {

jobPersistence.setVersion(VERSION.serialize());

workspaceHelper = new WorkspaceHelper(configRepository, jobPersistence);

archiveHandler = new ArchiveHandler(
VERSION,
configRepository,
secretsRepositoryReader,
secretsRepositoryWriter,
jobPersistence,
seedPersistence,
new WorkspaceHelper(configRepository, jobPersistence),
workspaceHelper,
new NoOpFileTtlManager(),
true);
}

@@ -63,6 +63,7 @@
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.test.airbyte_test_container.AirbyteTestContainer;
import io.airbyte.workers.temporal.TemporalUtils;
import io.airbyte.workers.temporal.TemporalWorkflowUtils;
import io.airbyte.workers.temporal.scheduling.ConnectionManagerWorkflow;
import io.airbyte.workers.temporal.scheduling.state.WorkflowState;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
@@ -315,8 +316,9 @@ private void assignEnvVars() {
}

private WorkflowClient getWorkflowClient() {
final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService(
TemporalUtils.getAirbyteTemporalOptions("localhost:7233"),
final TemporalUtils temporalUtils = new TemporalUtils(null, null, null, null, null, null, null);
final WorkflowServiceStubs temporalService = temporalUtils.createTemporalService(
TemporalWorkflowUtils.getAirbyteTemporalOptions("localhost:7233"),
TemporalUtils.DEFAULT_NAMESPACE);
return WorkflowClient.newInstance(temporalService);
}