diff --git a/airbyte-config/models/src/main/resources/types/StandardDiscoverSchemaInput.yaml b/airbyte-config/models/src/main/resources/types/StandardDiscoverSchemaInput.yaml index 032088f5d7d78..2eb0820da56ec 100644 --- a/airbyte-config/models/src/main/resources/types/StandardDiscoverSchemaInput.yaml +++ b/airbyte-config/models/src/main/resources/types/StandardDiscoverSchemaInput.yaml @@ -1,18 +1,14 @@ -{ - "$schema": "http://json-schema.org/draft-07/schema#", - "$id": "https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/StandardDiscoverSchemaOutput.yaml", - "title": "StandardDiscoverSchemaOutput", - "description": "information required for connection.", - "type": "object", - "required": ["connectionConfiguration"], - "additionalProperties": false, - "properties": - { - "connectionConfiguration": - { - "description": "Integration specific blob. Must be a valid JSON string.", - "type": "object", - "existingJavaType": "com.fasterxml.jackson.databind.JsonNode", - }, - }, -} +--- +"$schema": http://json-schema.org/draft-07/schema# +"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/StandardDiscoverSchemaOutput.yaml +title: StandardDiscoverSchemaOutput +description: information required for connection. +type: object +required: + - connectionConfiguration +additionalProperties: false +properties: + connectionConfiguration: + description: Integration specific blob. Must be a valid JSON string. + type: object + existingJavaType: com.fasterxml.jackson.databind.JsonNode diff --git a/airbyte-scheduler/src/main/java/io/airbyte/scheduler/WorkerRunFactory.java b/airbyte-scheduler/src/main/java/io/airbyte/scheduler/WorkerRunFactory.java index 6be7234659cc6..b9ea097d9df50 100644 --- a/airbyte-scheduler/src/main/java/io/airbyte/scheduler/WorkerRunFactory.java +++ b/airbyte-scheduler/src/main/java/io/airbyte/scheduler/WorkerRunFactory.java @@ -26,11 +26,14 @@ import io.airbyte.config.JobCheckConnectionConfig; import io.airbyte.config.JobDiscoverSchemaConfig; +import io.airbyte.config.JobGetSpecConfig; import io.airbyte.config.JobOutput; import io.airbyte.config.JobSyncConfig; import io.airbyte.config.StandardCheckConnectionInput; import io.airbyte.config.StandardDiscoverSchemaInput; import io.airbyte.config.StandardSyncInput; +import io.airbyte.workers.DefaultGetSpecWorker; +import io.airbyte.workers.GetSpecWorker; import io.airbyte.workers.Worker; import io.airbyte.workers.process.ProcessBuilderFactory; import io.airbyte.workers.protocols.singer.DefaultSingerTap; @@ -40,6 +43,7 @@ import io.airbyte.workers.protocols.singer.SingerSyncWorker; import io.airbyte.workers.wrappers.JobOutputCheckConnectionWorker; import io.airbyte.workers.wrappers.JobOutputDiscoverSchemaWorker; +import io.airbyte.workers.wrappers.JobOutputGetSpecWorker; import io.airbyte.workers.wrappers.JobOutputSyncWorker; import java.nio.file.Path; import org.slf4j.Logger; @@ -78,22 +82,23 @@ public WorkerRun create(final Job job) { LOGGER.info("job root: {}", jobRoot); switch (job.getConfig().getConfigType()) { - case CHECK_CONNECTION_SOURCE: - case CHECK_CONNECTION_DESTINATION: + case CHECK_CONNECTION_SOURCE, CHECK_CONNECTION_DESTINATION -> { final StandardCheckConnectionInput checkConnectionInput = getCheckConnectionInput(job.getConfig().getCheckConnection()); return creator.create( jobRoot, checkConnectionInput, new JobOutputCheckConnectionWorker( new SingerCheckConnectionWorker(new SingerDiscoverSchemaWorker(job.getConfig().getCheckConnection().getDockerImage(), pbf)))); - case DISCOVER_SCHEMA: + } + case DISCOVER_SCHEMA -> { final StandardDiscoverSchemaInput discoverSchemaInput = getDiscoverSchemaInput(job.getConfig().getDiscoverSchema()); return creator.create( jobRoot, discoverSchemaInput, new JobOutputDiscoverSchemaWorker( new SingerDiscoverSchemaWorker(job.getConfig().getDiscoverSchema().getDockerImage(), pbf))); - case SYNC: + } + case SYNC -> { final StandardSyncInput syncInput = getSyncInput(job.getConfig().getSync()); final SingerDiscoverSchemaWorker discoverSchemaWorker = new SingerDiscoverSchemaWorker(job.getConfig().getSync().getSourceDockerImage(), pbf); return creator.create( @@ -107,8 +112,16 @@ public WorkerRun create(final Job job) { new SingerSyncWorker( new DefaultSingerTap(job.getConfig().getSync().getSourceDockerImage(), pbf, discoverSchemaWorker), new DefaultSingerTarget(job.getConfig().getSync().getDestinationDockerImage(), pbf)))); - default: - throw new RuntimeException("Unexpected config type: " + job.getConfig().getConfigType()); + } + case GET_SPEC -> { + final JobGetSpecConfig getSpecInput = job.getConfig().getGetSpec(); + final GetSpecWorker worker = new DefaultGetSpecWorker(pbf); + return creator.create( + jobRoot, + getSpecInput, + new JobOutputGetSpecWorker(worker)); + } + default -> throw new RuntimeException("Unexpected config type: " + job.getConfig().getConfigType()); } } diff --git a/airbyte-scheduler/src/test/java/io/airbyte/scheduler/WorkerRunFactoryTest.java b/airbyte-scheduler/src/test/java/io/airbyte/scheduler/WorkerRunFactoryTest.java index c74dcc77a2591..0942c103993ce 100644 --- a/airbyte-scheduler/src/test/java/io/airbyte/scheduler/WorkerRunFactoryTest.java +++ b/airbyte-scheduler/src/test/java/io/airbyte/scheduler/WorkerRunFactoryTest.java @@ -33,6 +33,7 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.commons.json.Jsons; import io.airbyte.config.JobConfig; +import io.airbyte.config.JobGetSpecConfig; import io.airbyte.config.JobOutput; import io.airbyte.config.StandardCheckConnectionInput; import io.airbyte.config.StandardDiscoverSchemaInput; @@ -41,6 +42,7 @@ import io.airbyte.workers.process.ProcessBuilderFactory; import io.airbyte.workers.wrappers.JobOutputCheckConnectionWorker; import io.airbyte.workers.wrappers.JobOutputDiscoverSchemaWorker; +import io.airbyte.workers.wrappers.JobOutputGetSpecWorker; import io.airbyte.workers.wrappers.JobOutputSyncWorker; import java.io.IOException; import java.nio.file.Files; @@ -121,4 +123,18 @@ void testSync() { Assertions.assertTrue(argument.getValue() instanceof JobOutputSyncWorker); } + @SuppressWarnings("unchecked") + @Test + void testGetSpec() { + when(job.getConfig().getConfigType()).thenReturn(JobConfig.ConfigType.GET_SPEC); + JobGetSpecConfig expectedConfig = new JobGetSpecConfig().withDockerImage("notarealimage"); + when(job.getConfig().getGetSpec()).thenReturn(expectedConfig); + + factory.create(job); + + ArgumentCaptor> argument = ArgumentCaptor.forClass(Worker.class); + verify(creator).create(eq(rootPath.resolve("1").resolve("2")), eq(expectedConfig), argument.capture()); + Assertions.assertTrue(argument.getValue() instanceof JobOutputGetSpecWorker); + } + } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/wrappers/JobOutputGetSpecWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/wrappers/JobOutputGetSpecWorker.java new file mode 100644 index 0000000000000..bec140d056b1b --- /dev/null +++ b/airbyte-workers/src/main/java/io/airbyte/workers/wrappers/JobOutputGetSpecWorker.java @@ -0,0 +1,40 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.workers.wrappers; + +import io.airbyte.config.JobGetSpecConfig; +import io.airbyte.config.JobOutput; +import io.airbyte.config.StandardGetSpecOutput; +import io.airbyte.workers.GetSpecWorker; + +public class JobOutputGetSpecWorker extends OutputConvertingWorker { + + public JobOutputGetSpecWorker(GetSpecWorker innerWorker) { + super( + innerWorker, + standardGetSpecOutput -> new JobOutput().withOutputType(JobOutput.OutputType.GET_SPEC).withGetSpec(standardGetSpecOutput)); + } + +}