From 85f3d7ed005087a33aab7097a148ea689e4e9918 Mon Sep 17 00:00:00 2001 From: cgardens Date: Tue, 6 Oct 2020 16:11:40 -0700 Subject: [PATCH 01/17] mvp of csv destination write --- airbyte-integrations/base-java/Dockerfile | 2 +- airbyte-integrations/base-java/javabase.sh | 2 +- .../integrations/base/StatefulConsumer.java | 54 +++++ airbyte-integrations/base/base.sh | 3 + .../csv-destination/.dockerignore | 3 + .../csv-destination/Dockerfile | 8 + .../csv-destination/build.gradle | 31 +++ .../destination/csv/CsvDestination.java | 194 ++++++++++++++++++ .../src/main/resources/spec.json | 28 +++ 9 files changed, 323 insertions(+), 2 deletions(-) create mode 100644 airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/StatefulConsumer.java create mode 100644 airbyte-integrations/csv-destination/.dockerignore create mode 100644 airbyte-integrations/csv-destination/Dockerfile create mode 100644 airbyte-integrations/csv-destination/build.gradle create mode 100644 airbyte-integrations/csv-destination/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java create mode 100644 airbyte-integrations/csv-destination/src/main/resources/spec.json diff --git a/airbyte-integrations/base-java/Dockerfile b/airbyte-integrations/base-java/Dockerfile index a523a45fc32ad..5271b3d58e9b9 100644 --- a/airbyte-integrations/base-java/Dockerfile +++ b/airbyte-integrations/base-java/Dockerfile @@ -10,6 +10,6 @@ ENV AIRBYTE_SPEC_CMD "./javabase.sh --spec" ENV AIRBYTE_CHECK_CMD "./javabase.sh --check" ENV AIRBYTE_DISCOVER_CMD "./javabase.sh --discover" ENV AIRBYTE_READ_CMD "./javabase.sh --read" -ENV AIRBYTE_READ_CMD "./javabase.sh --write" +ENV AIRBYTE_WRITE_CMD "./javabase.sh --write" ENTRYPOINT ["/airbyte/base.sh"] diff --git a/airbyte-integrations/base-java/javabase.sh b/airbyte-integrations/base-java/javabase.sh index 48ffa37c92276..82e5138f50768 100755 --- a/airbyte-integrations/base-java/javabase.sh +++ b/airbyte-integrations/base-java/javabase.sh @@ -4,4 +4,4 @@ set -e # wrap run script in a script so that we can lazy evaluate the value of APPLICATION. APPLICATION is # set by the dockerfile that inherits base-java, so it cannot be evaluated when base-java is built. -bin/"$APPLICATION" "$@" +cat <&0 | bin/"$APPLICATION" "$@" diff --git a/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/StatefulConsumer.java b/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/StatefulConsumer.java new file mode 100644 index 0000000000000..9894280ada080 --- /dev/null +++ b/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/StatefulConsumer.java @@ -0,0 +1,54 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.base; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class StatefulConsumer implements DestinationConsumer { + + private static final Logger LOGGER = LoggerFactory.getLogger(StatefulConsumer.class); + + private boolean hasFailed = false; + + protected abstract void acceptInternal(T t) throws Exception; + + public void accept(T t) throws Exception { + try { + acceptInternal(t); + } catch (Exception e) { + hasFailed = true; + throw e; + } + } + + protected abstract void close(boolean hasFailed) throws Exception; + + public void close() throws Exception { + LOGGER.info("hasFailed: {}.", hasFailed); + close(hasFailed); + } + +} diff --git a/airbyte-integrations/base/base.sh b/airbyte-integrations/base/base.sh index d27d1096cc18a..869ca581f33e0 100755 --- a/airbyte-integrations/base/base.sh +++ b/airbyte-integrations/base/base.sh @@ -51,6 +51,9 @@ function main() { # todo: state should be optional: --state "$STATE_FILE" eval "$AIRBYTE_READ_CMD" --config "$CONFIG_FILE" --catalog "$CATALOG_FILE" ;; + write) + eval "$AIRBYTE_WRITE_CMD" --config "$CONFIG_FILE" --catalog "$CATALOG_FILE" + ;; *) error "Unknown command: $CMD" ;; diff --git a/airbyte-integrations/csv-destination/.dockerignore b/airbyte-integrations/csv-destination/.dockerignore new file mode 100644 index 0000000000000..65c7d0ad3e73c --- /dev/null +++ b/airbyte-integrations/csv-destination/.dockerignore @@ -0,0 +1,3 @@ +* +!Dockerfile +!build diff --git a/airbyte-integrations/csv-destination/Dockerfile b/airbyte-integrations/csv-destination/Dockerfile new file mode 100644 index 0000000000000..f82c9be477826 --- /dev/null +++ b/airbyte-integrations/csv-destination/Dockerfile @@ -0,0 +1,8 @@ +FROM airbyte/base-java:dev + +WORKDIR /airbyte +ENV APPLICATION csv-destination + +COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar + +RUN tar xf ${APPLICATION}.tar --strip-components=1 diff --git a/airbyte-integrations/csv-destination/build.gradle b/airbyte-integrations/csv-destination/build.gradle new file mode 100644 index 0000000000000..81223f4551346 --- /dev/null +++ b/airbyte-integrations/csv-destination/build.gradle @@ -0,0 +1,31 @@ +import com.bmuschko.gradle.docker.tasks.image.DockerBuildImage +plugins { + id 'com.bmuschko.docker-remote-api' + id 'application' +} +dependencies { + implementation project(':airbyte-config:models') + implementation project(':airbyte-singer') + implementation project(':airbyte-integrations:base-java') + + implementation 'org.apache.commons:commons-csv:1.4' +} + +application { + mainClass = 'io.airbyte.integrations.destination.csv.CsvDestination' +} + + +def image = 'airbyte/airbyte-csv-destination:dev' + +task imageName { + doLast { + println "IMAGE $image" + } +} + +task buildImage(type: DockerBuildImage) { + inputDir = projectDir + images.add(image) + dependsOn ':airbyte-integrations:base-java:buildImage' +} diff --git a/airbyte-integrations/csv-destination/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java b/airbyte-integrations/csv-destination/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java new file mode 100644 index 0000000000000..7bc186f98ecc0 --- /dev/null +++ b/airbyte-integrations/csv-destination/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java @@ -0,0 +1,194 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.csv; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.base.Preconditions; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.resources.MoreResources; +import io.airbyte.config.DestinationConnectionSpecification; +import io.airbyte.config.Schema; +import io.airbyte.config.StandardCheckConnectionOutput; +import io.airbyte.config.StandardCheckConnectionOutput.Status; +import io.airbyte.config.StandardDiscoverSchemaOutput; +import io.airbyte.config.Stream; +import io.airbyte.integrations.base.Destination; +import io.airbyte.integrations.base.DestinationConsumer; +import io.airbyte.integrations.base.IntegrationRunner; +import io.airbyte.integrations.base.JavaBaseConstants; +import io.airbyte.integrations.base.StatefulConsumer; +import io.airbyte.singer.SingerMessage; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVPrinter; +import org.apache.commons.io.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CsvDestination implements Destination { + + private static final String COLUMN_NAME = "data"; + + private static final Logger LOGGER = LoggerFactory.getLogger(CsvDestination.class); + + private static final String DESTINATION_PATH_FIELD = "destination_path"; + + @Override + public DestinationConnectionSpecification spec() throws IOException { + final String resourceString = MoreResources.readResource("spec.json"); + return Jsons.deserialize(resourceString, DestinationConnectionSpecification.class); + } + + @Override + public StandardCheckConnectionOutput check(JsonNode config) { + try { + FileUtils.forceMkdir(getDestinationPath(config).toFile()); + } catch (IOException e) { + return new StandardCheckConnectionOutput().withStatus(Status.FAILURE).withMessage(e.getMessage()); + } + return new StandardCheckConnectionOutput().withStatus(Status.SUCCESS); + } + + @Override + public StandardDiscoverSchemaOutput discover(JsonNode config) { + throw new RuntimeException("Not Implemented"); + } + + @Override + public DestinationConsumer write(JsonNode config, Schema schema) throws IOException { + final Path destinationDir = getDestinationPath(config); + + FileUtils.forceMkdir(destinationDir.toFile()); + + final long now = Instant.now().toEpochMilli(); + final Map writeConfigs = new HashMap<>(); + for (final Stream stream : schema.getStreams()) { + final Path tmpPath = destinationDir.resolve(stream.getName() + "_" + now + ".csv"); + final Path finalPath = destinationDir.resolve(stream.getName() + ".csv"); + final FileWriter fileWriter = new FileWriter(tmpPath.toFile()); + final CSVPrinter printer = new CSVPrinter(fileWriter, CSVFormat.DEFAULT.withHeader(COLUMN_NAME)); + writeConfigs.put(stream.getName(), new WriteConfig(printer, tmpPath, finalPath)); + } + + return new CsvConsumer(writeConfigs, schema); + } + + /** + * Extract provided relative path from csv config object and append to local mount path. + * + * @param config - csv config object + * @return absolute path with the relative path appended to the local volume mount. + */ + private Path getDestinationPath(JsonNode config) { + final String destinationRelativePath = config.get(DESTINATION_PATH_FIELD).asText(); + Preconditions.checkNotNull(destinationRelativePath); + + // append destination path to the local mount. + return JavaBaseConstants.LOCAL_MOUNT.resolve(destinationRelativePath); + } + + public static class WriteConfig { + + private final CSVPrinter writer; + private final Path tmpPath; + private final Path finalPath; + + public WriteConfig(CSVPrinter writer, Path tmpPath, Path finalPath) { + this.writer = writer; + this.tmpPath = tmpPath; + this.finalPath = finalPath; + } + + public CSVPrinter getWriter() { + return writer; + } + + public Path getTmpPath() { + return tmpPath; + } + + public Path getFinalPath() { + return finalPath; + } + + } + + public static class CsvConsumer extends StatefulConsumer { + + private final Map writeConfigs; + private final Schema schema; + + public CsvConsumer(Map writeConfigs, Schema schema) { + this.schema = schema; + LOGGER.info("initializing consumer."); + + this.writeConfigs = writeConfigs; + } + + @Override + protected void acceptInternal(SingerMessage singerMessage) throws Exception { + if (writeConfigs.containsKey(singerMessage.getStream())) { + writeConfigs.get(singerMessage.getStream()).getWriter().printRecord(Jsons.serialize(singerMessage.getRecord())); + } else { + throw new IllegalArgumentException( + String.format("Message contained record from a stream that was not in the catalog. \ncatalog: %s , \nmessage: %s", + Jsons.serialize(schema), Jsons.serialize(singerMessage))); + } + } + + @Override + protected void close(boolean hasFailed) throws IOException { + LOGGER.info("finalizing consumer."); + + for (final Map.Entry entries : writeConfigs.entrySet()) { + try { + entries.getValue().getWriter().flush(); + entries.getValue().getWriter().close(); + } catch (Exception e) { + hasFailed = true; + LOGGER.error("failed to close writer for: {}.", entries.getKey()); + } + } + if (!hasFailed) { + for (final WriteConfig writeConfig : writeConfigs.values()) { + Files.move(writeConfig.getTmpPath(), writeConfig.getFinalPath(), StandardCopyOption.REPLACE_EXISTING); + } + } + } + + } + + public static void main(String[] args) throws Exception { + new IntegrationRunner(new CsvDestination()).run(args); + } + +} diff --git a/airbyte-integrations/csv-destination/src/main/resources/spec.json b/airbyte-integrations/csv-destination/src/main/resources/spec.json new file mode 100644 index 0000000000000..547590d7801be --- /dev/null +++ b/airbyte-integrations/csv-destination/src/main/resources/spec.json @@ -0,0 +1,28 @@ +{ + "destinationId": "8be1cf83-fde1-477f-a4ad-318d23c9f3c6", + "destinationSpecificationId": "8442ee76-cc1d-419a-bd8b-859a090366d4", + "documentationUrl": "https://docs.airbyte.io/integrations/destinations/local-csv", + "specification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Singer CSV Target Spec", + "type": "object", + "required": ["delimiter", "quotechar"], + "additionalProperties": false, + "properties": { + "delimiter": { + "description": "Delimiter used to separate fields.", + "type": "string", + "examples": [","] + }, + "quotechar": { + "description": "The character used to quote strings containing special characters in the CSV. See python docs for more details.", + "type": "string", + "examples": ["\""] + }, + "destination_path": { + "description": "Path to the directory where csv files will be written. Check out the docs for more details on the root of this path.", + "type": "string" + } + } + } +} From 8787d7c73abd9846e36788758ee365b7e0069724 Mon Sep 17 00:00:00 2001 From: cgardens Date: Wed, 7 Oct 2020 15:44:47 -0700 Subject: [PATCH 02/17] rename the consumer --- .../{StatefulConsumer.java => FailureTrackingConsumer.java} | 4 ++-- .../airbyte/integrations/destination/csv/CsvDestination.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) rename airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/{StatefulConsumer.java => FailureTrackingConsumer.java} (90%) diff --git a/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/StatefulConsumer.java b/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/FailureTrackingConsumer.java similarity index 90% rename from airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/StatefulConsumer.java rename to airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/FailureTrackingConsumer.java index 9894280ada080..20c70e283bcca 100644 --- a/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/StatefulConsumer.java +++ b/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/FailureTrackingConsumer.java @@ -27,9 +27,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public abstract class StatefulConsumer implements DestinationConsumer { +public abstract class FailureTrackingConsumer implements DestinationConsumer { - private static final Logger LOGGER = LoggerFactory.getLogger(StatefulConsumer.class); + private static final Logger LOGGER = LoggerFactory.getLogger(FailureTrackingConsumer.class); private boolean hasFailed = false; diff --git a/airbyte-integrations/csv-destination/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java b/airbyte-integrations/csv-destination/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java index 7bc186f98ecc0..d9bbe2146ab04 100644 --- a/airbyte-integrations/csv-destination/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java +++ b/airbyte-integrations/csv-destination/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java @@ -36,9 +36,9 @@ import io.airbyte.config.Stream; import io.airbyte.integrations.base.Destination; import io.airbyte.integrations.base.DestinationConsumer; +import io.airbyte.integrations.base.FailureTrackingConsumer; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.JavaBaseConstants; -import io.airbyte.integrations.base.StatefulConsumer; import io.airbyte.singer.SingerMessage; import java.io.FileWriter; import java.io.IOException; @@ -142,7 +142,7 @@ public Path getFinalPath() { } - public static class CsvConsumer extends StatefulConsumer { + public static class CsvConsumer extends FailureTrackingConsumer { private final Map writeConfigs; private final Schema schema; From a100b2cb7284549924020278ae6998183f0cd14f Mon Sep 17 00:00:00 2001 From: cgardens Date: Wed, 7 Oct 2020 15:51:24 -0700 Subject: [PATCH 03/17] clean up --- .../integrations/destination/csv/CsvDestination.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/csv-destination/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java b/airbyte-integrations/csv-destination/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java index d9bbe2146ab04..04a206c11d0f4 100644 --- a/airbyte-integrations/csv-destination/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java +++ b/airbyte-integrations/csv-destination/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java @@ -56,10 +56,9 @@ public class CsvDestination implements Destination { - private static final String COLUMN_NAME = "data"; - private static final Logger LOGGER = LoggerFactory.getLogger(CsvDestination.class); + private static final String COLUMN_NAME = "data"; // we output all data as a blog to a single column calle data. private static final String DESTINATION_PATH_FIELD = "destination_path"; @Override @@ -78,6 +77,7 @@ public StandardCheckConnectionOutput check(JsonNode config) { return new StandardCheckConnectionOutput().withStatus(Status.SUCCESS); } + // todo (cgardens) - we currently don't leverage discover in our destinations, so skipping implementing it... for now. @Override public StandardDiscoverSchemaOutput discover(JsonNode config) { throw new RuntimeException("Not Implemented"); @@ -183,6 +183,10 @@ protected void close(boolean hasFailed) throws IOException { Files.move(writeConfig.getTmpPath(), writeConfig.getFinalPath(), StandardCopyOption.REPLACE_EXISTING); } } + for (final WriteConfig writeConfig : writeConfigs.values()) { + Files.deleteIfExists(writeConfig.getTmpPath()); + } + } } From e8cc2b995d03fb64557656ecac11af24b238afa1 Mon Sep 17 00:00:00 2001 From: cgardens Date: Wed, 7 Oct 2020 17:09:13 -0700 Subject: [PATCH 04/17] remove local mount from integration --- .../io/airbyte/integrations/base/JavaBaseConstants.java | 5 ----- .../integrations/destination/csv/CsvDestination.java | 6 +++--- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/JavaBaseConstants.java b/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/JavaBaseConstants.java index cc94543c45112..afa1fc858352b 100644 --- a/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/JavaBaseConstants.java +++ b/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/JavaBaseConstants.java @@ -35,9 +35,4 @@ public class JavaBaseConstants { public static String ARGS_CONFIG_DESC = "path to the json configuration file"; public static String ARGS_CATALOG_DESC = "input path for the catalog"; public static String ARGS_PATH_DESC = "path to the json-encoded state file"; - - // todo (cgardens) - this mount path should be passed in by the worker and read as an arg or - // environment variable by the runner. - public static Path LOCAL_MOUNT = Path.of("/local"); - } diff --git a/airbyte-integrations/csv-destination/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java b/airbyte-integrations/csv-destination/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java index 04a206c11d0f4..dc502c36084a0 100644 --- a/airbyte-integrations/csv-destination/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java +++ b/airbyte-integrations/csv-destination/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java @@ -77,7 +77,8 @@ public StandardCheckConnectionOutput check(JsonNode config) { return new StandardCheckConnectionOutput().withStatus(Status.SUCCESS); } - // todo (cgardens) - we currently don't leverage discover in our destinations, so skipping implementing it... for now. + // todo (cgardens) - we currently don't leverage discover in our destinations, so skipping + // implementing it... for now. @Override public StandardDiscoverSchemaOutput discover(JsonNode config) { throw new RuntimeException("Not Implemented"); @@ -112,8 +113,7 @@ private Path getDestinationPath(JsonNode config) { final String destinationRelativePath = config.get(DESTINATION_PATH_FIELD).asText(); Preconditions.checkNotNull(destinationRelativePath); - // append destination path to the local mount. - return JavaBaseConstants.LOCAL_MOUNT.resolve(destinationRelativePath); + return Path.of(destinationRelativePath); } public static class WriteConfig { From 54bf8ef07eb261fd8fc56c8a0bd211f755d39160 Mon Sep 17 00:00:00 2001 From: cgardens Date: Wed, 7 Oct 2020 17:09:24 -0700 Subject: [PATCH 05/17] update spec --- .../src/main/resources/spec.json | 24 +++++++------------ 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/airbyte-integrations/csv-destination/src/main/resources/spec.json b/airbyte-integrations/csv-destination/src/main/resources/spec.json index 547590d7801be..b94852dd22d9e 100644 --- a/airbyte-integrations/csv-destination/src/main/resources/spec.json +++ b/airbyte-integrations/csv-destination/src/main/resources/spec.json @@ -1,27 +1,19 @@ { - "destinationId": "8be1cf83-fde1-477f-a4ad-318d23c9f3c6", - "destinationSpecificationId": "8442ee76-cc1d-419a-bd8b-859a090366d4", + "destinationId": "", + "destinationSpecificationId": "", "documentationUrl": "https://docs.airbyte.io/integrations/destinations/local-csv", "specification": { "$schema": "http://json-schema.org/draft-07/schema#", - "title": "Singer CSV Target Spec", + "title": "CSV Destination Spec", "type": "object", - "required": ["delimiter", "quotechar"], + "required": ["destination_path"], "additionalProperties": false, "properties": { - "delimiter": { - "description": "Delimiter used to separate fields.", - "type": "string", - "examples": [","] - }, - "quotechar": { - "description": "The character used to quote strings containing special characters in the CSV. See python docs for more details.", - "type": "string", - "examples": ["\""] - }, "destination_path": { - "description": "Path to the directory where csv files will be written. Check out the docs for more details on the root of this path.", - "type": "string" + "description": "Path to the directory where csv files will be written. Must start with the local mount \"/local\". Any other directory appended on the end will be placed inside that local mount.", + "type": "string", + "examples": ["/local"], + "pattern": "(^\\/local\\/.*)|(^\\/local$)" } } } From 121d9b423c039f801a59f6797cf62c2217b296b0 Mon Sep 17 00:00:00 2001 From: cgardens Date: Wed, 7 Oct 2020 17:11:47 -0700 Subject: [PATCH 06/17] clean up --- .../java/io/airbyte/integrations/base/JavaBaseConstants.java | 3 +-- .../airbyte/integrations/destination/csv/CsvDestination.java | 1 - 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/JavaBaseConstants.java b/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/JavaBaseConstants.java index afa1fc858352b..15dadf9fe4db1 100644 --- a/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/JavaBaseConstants.java +++ b/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/JavaBaseConstants.java @@ -24,8 +24,6 @@ package io.airbyte.integrations.base; -import java.nio.file.Path; - public class JavaBaseConstants { public static String ARGS_CONFIG_KEY = "config"; @@ -35,4 +33,5 @@ public class JavaBaseConstants { public static String ARGS_CONFIG_DESC = "path to the json configuration file"; public static String ARGS_CATALOG_DESC = "input path for the catalog"; public static String ARGS_PATH_DESC = "path to the json-encoded state file"; + } diff --git a/airbyte-integrations/csv-destination/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java b/airbyte-integrations/csv-destination/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java index dc502c36084a0..d86ec340dee62 100644 --- a/airbyte-integrations/csv-destination/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java +++ b/airbyte-integrations/csv-destination/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java @@ -38,7 +38,6 @@ import io.airbyte.integrations.base.DestinationConsumer; import io.airbyte.integrations.base.FailureTrackingConsumer; import io.airbyte.integrations.base.IntegrationRunner; -import io.airbyte.integrations.base.JavaBaseConstants; import io.airbyte.singer.SingerMessage; import java.io.FileWriter; import java.io.IOException; From 1300a65c803bd63c4a7b73e58962274f217c6dee Mon Sep 17 00:00:00 2001 From: cgardens Date: Wed, 7 Oct 2020 17:19:15 -0700 Subject: [PATCH 07/17] add rough docs --- docs/integrations/destinations/local-csv2.md | 34 ++++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 docs/integrations/destinations/local-csv2.md diff --git a/docs/integrations/destinations/local-csv2.md b/docs/integrations/destinations/local-csv2.md new file mode 100644 index 0000000000000..f1eafe6b7ae13 --- /dev/null +++ b/docs/integrations/destinations/local-csv2.md @@ -0,0 +1,34 @@ +# Local CSV + +## Overview + +This destination writes data to a directory on the _local_ filesystem on the host running Airbyte. By default, data is written to `/tmp/airbyte_local`. To change this location, modify the `LOCAL_ROOT` environment variable for Airbyte. + +### Sync Overview + +#### Output schema + +This destination outputs files with the name of the stream. Each row will be written as a new line in the output CSV file. + +#### Data Type Mapping + +The output file will have a single column called `data` which will be populated by the full record as a json blob. + +#### Features + +This section should contain a table with the following format: + +| Feature | Supported | +| :--- | :--- | +| Full Refresh Sync | Yes | + +#### Performance considerations + +This integration will be constrained by the speed at which your filesystem accepts writes. + +## Getting Started + +### Requirements: + +* The `destination_path` field must start with `/local` which is the name of the local mount that points to `LOCAL_ROOT`. Any other directories in this path will be placed inside the `LOCAL_ROOT`. By default, the value of `LOCAL_ROOT` is `/tmp/airbyte_local`. e.g. if `destination_path` is `/local/my/data`, the output will be written to `/tmp/airbyte_local/my/data`. + From ddf557af0f08ab2c962cbe0f8c027fefa5341508 Mon Sep 17 00:00:00 2001 From: cgardens Date: Wed, 7 Oct 2020 17:38:13 -0700 Subject: [PATCH 08/17] various clean up --- .../base/FailureTrackingConsumer.java | 4 +- .../destination/csv/CsvDestination.java | 68 +++++++++++-------- 2 files changed, 43 insertions(+), 29 deletions(-) diff --git a/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/FailureTrackingConsumer.java b/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/FailureTrackingConsumer.java index 20c70e283bcca..c32a03d8ac8ce 100644 --- a/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/FailureTrackingConsumer.java +++ b/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/FailureTrackingConsumer.java @@ -33,11 +33,11 @@ public abstract class FailureTrackingConsumer implements DestinationConsumer< private boolean hasFailed = false; - protected abstract void acceptInternal(T t) throws Exception; + protected abstract void acceptTracked(T t) throws Exception; public void accept(T t) throws Exception { try { - acceptInternal(t); + acceptTracked(t); } catch (Exception e) { hasFailed = true; throw e; diff --git a/airbyte-integrations/csv-destination/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java b/airbyte-integrations/csv-destination/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java index d86ec340dee62..93556b6f70852 100644 --- a/airbyte-integrations/csv-destination/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java +++ b/airbyte-integrations/csv-destination/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java @@ -83,6 +83,13 @@ public StandardDiscoverSchemaOutput discover(JsonNode config) { throw new RuntimeException("Not Implemented"); } + /** + * + * @param config - csv destination config. + * @param schema - schema of the incoming messages. + * @return - a consumer to handle writing records to the filesystem. + * @throws IOException - exception throw in manipulating the filesytem. + */ @Override public DestinationConsumer write(JsonNode config, Schema schema) throws IOException { final Path destinationDir = getDestinationPath(config); @@ -115,32 +122,11 @@ private Path getDestinationPath(JsonNode config) { return Path.of(destinationRelativePath); } - public static class WriteConfig { - - private final CSVPrinter writer; - private final Path tmpPath; - private final Path finalPath; - - public WriteConfig(CSVPrinter writer, Path tmpPath, Path finalPath) { - this.writer = writer; - this.tmpPath = tmpPath; - this.finalPath = finalPath; - } - - public CSVPrinter getWriter() { - return writer; - } - - public Path getTmpPath() { - return tmpPath; - } - - public Path getFinalPath() { - return finalPath; - } - - } - + /** + * This consumer writes individual records to temporary files. If all of the messages are written + * successfully, it moves the tmp files to files named by their respective stream. If there are any + * failures, nothing is written. + */ public static class CsvConsumer extends FailureTrackingConsumer { private final Map writeConfigs; @@ -154,7 +140,7 @@ public CsvConsumer(Map writeConfigs, Schema schema) { } @Override - protected void acceptInternal(SingerMessage singerMessage) throws Exception { + protected void acceptTracked(SingerMessage singerMessage) throws Exception { if (writeConfigs.containsKey(singerMessage.getStream())) { writeConfigs.get(singerMessage.getStream()).getWriter().printRecord(Jsons.serialize(singerMessage.getRecord())); } else { @@ -177,11 +163,13 @@ protected void close(boolean hasFailed) throws IOException { LOGGER.error("failed to close writer for: {}.", entries.getKey()); } } + // do not persist the data, if there are any failures. if (!hasFailed) { for (final WriteConfig writeConfig : writeConfigs.values()) { Files.move(writeConfig.getTmpPath(), writeConfig.getFinalPath(), StandardCopyOption.REPLACE_EXISTING); } } + // clean up tmp files. for (final WriteConfig writeConfig : writeConfigs.values()) { Files.deleteIfExists(writeConfig.getTmpPath()); } @@ -190,6 +178,32 @@ protected void close(boolean hasFailed) throws IOException { } + public static class WriteConfig { + + private final CSVPrinter writer; + private final Path tmpPath; + private final Path finalPath; + + public WriteConfig(CSVPrinter writer, Path tmpPath, Path finalPath) { + this.writer = writer; + this.tmpPath = tmpPath; + this.finalPath = finalPath; + } + + public CSVPrinter getWriter() { + return writer; + } + + public Path getTmpPath() { + return tmpPath; + } + + public Path getFinalPath() { + return finalPath; + } + + } + public static void main(String[] args) throws Exception { new IntegrationRunner(new CsvDestination()).run(args); } From c09b25a11d892c9135ea241f078562e788d3174d Mon Sep 17 00:00:00 2001 From: cgardens Date: Wed, 7 Oct 2020 17:51:06 -0700 Subject: [PATCH 09/17] add FailureTrackingConsumerTest --- .../base/FailureTrackingConsumerTest.java | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 airbyte-integrations/base-java/src/test/java/io/airbyte/integrations/base/FailureTrackingConsumerTest.java diff --git a/airbyte-integrations/base-java/src/test/java/io/airbyte/integrations/base/FailureTrackingConsumerTest.java b/airbyte-integrations/base-java/src/test/java/io/airbyte/integrations/base/FailureTrackingConsumerTest.java new file mode 100644 index 0000000000000..d33561028efcc --- /dev/null +++ b/airbyte-integrations/base-java/src/test/java/io/airbyte/integrations/base/FailureTrackingConsumerTest.java @@ -0,0 +1,45 @@ +package io.airbyte.integrations.base; + + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +import org.junit.jupiter.api.Test; + +class FailureTrackingConsumerTest { + @Test + void testNoFailure() throws Exception { + final TestConsumer consumer = spy(new TestConsumer()); + consumer.accept(""); + consumer.close(); + + verify(consumer).close(false); + } + + @Test + void testWithFailure() throws Exception { + final TestConsumer consumer = spy(new TestConsumer()); + doThrow(new RuntimeException()).when(consumer).acceptTracked(""); + + // verify the exception still gets thrown. + assertThrows(RuntimeException.class, () -> consumer.accept("")); + consumer.close(); + + verify(consumer).close(true); + } + + static class TestConsumer extends FailureTrackingConsumer { + + @Override + protected void acceptTracked(String s) { + + } + + @Override + protected void close(boolean hasFailed) { + + } + } +} From 9a3a1db7f9ccf3832f79431a3a1c6852e9cce96d Mon Sep 17 00:00:00 2001 From: cgardens Date: Wed, 7 Oct 2020 19:38:31 -0700 Subject: [PATCH 10/17] CsvDestinationTest --- .../base/FailureTrackingConsumerTest.java | 28 ++- .../destination/csv/CsvDestination.java | 10 +- .../destination/csv/CsvDestinationTest.java | 203 ++++++++++++++++++ 3 files changed, 235 insertions(+), 6 deletions(-) create mode 100644 airbyte-integrations/csv-destination/src/test/java/io/airbyte/integrations/destination/csv/CsvDestinationTest.java diff --git a/airbyte-integrations/base-java/src/test/java/io/airbyte/integrations/base/FailureTrackingConsumerTest.java b/airbyte-integrations/base-java/src/test/java/io/airbyte/integrations/base/FailureTrackingConsumerTest.java index d33561028efcc..487fd5754cfab 100644 --- a/airbyte-integrations/base-java/src/test/java/io/airbyte/integrations/base/FailureTrackingConsumerTest.java +++ b/airbyte-integrations/base-java/src/test/java/io/airbyte/integrations/base/FailureTrackingConsumerTest.java @@ -1,5 +1,28 @@ -package io.airbyte.integrations.base; +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ +package io.airbyte.integrations.base; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.doThrow; @@ -9,6 +32,7 @@ import org.junit.jupiter.api.Test; class FailureTrackingConsumerTest { + @Test void testNoFailure() throws Exception { final TestConsumer consumer = spy(new TestConsumer()); @@ -41,5 +65,7 @@ protected void acceptTracked(String s) { protected void close(boolean hasFailed) { } + } + } diff --git a/airbyte-integrations/csv-destination/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java b/airbyte-integrations/csv-destination/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java index 93556b6f70852..b45a69e681eb5 100644 --- a/airbyte-integrations/csv-destination/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java +++ b/airbyte-integrations/csv-destination/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java @@ -57,8 +57,8 @@ public class CsvDestination implements Destination { private static final Logger LOGGER = LoggerFactory.getLogger(CsvDestination.class); - private static final String COLUMN_NAME = "data"; // we output all data as a blog to a single column calle data. - private static final String DESTINATION_PATH_FIELD = "destination_path"; + static final String COLUMN_NAME = "data"; // we output all data as a blog to a single column. + static final String DESTINATION_PATH_FIELD = "destination_path"; @Override public DestinationConnectionSpecification spec() throws IOException { @@ -70,7 +70,7 @@ public DestinationConnectionSpecification spec() throws IOException { public StandardCheckConnectionOutput check(JsonNode config) { try { FileUtils.forceMkdir(getDestinationPath(config).toFile()); - } catch (IOException e) { + } catch (Exception e) { return new StandardCheckConnectionOutput().withStatus(Status.FAILURE).withMessage(e.getMessage()); } return new StandardCheckConnectionOutput().withStatus(Status.SUCCESS); @@ -127,7 +127,7 @@ private Path getDestinationPath(JsonNode config) { * successfully, it moves the tmp files to files named by their respective stream. If there are any * failures, nothing is written. */ - public static class CsvConsumer extends FailureTrackingConsumer { + private static class CsvConsumer extends FailureTrackingConsumer { private final Map writeConfigs; private final Schema schema; @@ -178,7 +178,7 @@ protected void close(boolean hasFailed) throws IOException { } - public static class WriteConfig { + private static class WriteConfig { private final CSVPrinter writer; private final Path tmpPath; diff --git a/airbyte-integrations/csv-destination/src/test/java/io/airbyte/integrations/destination/csv/CsvDestinationTest.java b/airbyte-integrations/csv-destination/src/test/java/io/airbyte/integrations/destination/csv/CsvDestinationTest.java new file mode 100644 index 0000000000000..ce39d01969218 --- /dev/null +++ b/airbyte-integrations/csv-destination/src/test/java/io/airbyte/integrations/destination/csv/CsvDestinationTest.java @@ -0,0 +1,203 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.csv; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.spy; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.resources.MoreResources; +import io.airbyte.config.DataType; +import io.airbyte.config.DestinationConnectionSpecification; +import io.airbyte.config.Field; +import io.airbyte.config.Schema; +import io.airbyte.config.StandardCheckConnectionOutput; +import io.airbyte.config.StandardCheckConnectionOutput.Status; +import io.airbyte.config.Stream; +import io.airbyte.integrations.base.DestinationConsumer; +import io.airbyte.singer.SingerMessage; +import io.airbyte.singer.SingerMessage.Type; +import java.io.FileReader; +import java.io.IOException; +import java.io.Reader; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVRecord; +import org.apache.commons.io.FileUtils; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class CsvDestinationTest { + + private static final ObjectMapper objectMapper = new ObjectMapper(); + + private static final String USERS_STREAM_NAME = "users"; + private static final String TASKS_STREAM_NAME = "tasks"; + private static final String USERS_FILE = USERS_STREAM_NAME + ".csv"; + private static final String TASKS_FILE = TASKS_STREAM_NAME + ".csv"; + private static final SingerMessage SINGER_MESSAGE_USERS1 = new SingerMessage().withType(Type.RECORD).withStream(USERS_STREAM_NAME) + .withRecord(objectMapper.createObjectNode().put("name", "john").put("id", "10")); + private static final SingerMessage SINGER_MESSAGE_USERS2 = new SingerMessage().withType(Type.RECORD).withStream(USERS_STREAM_NAME) + .withRecord(objectMapper.createObjectNode().put("name", "susan").put("id", "30")); + private static final SingerMessage SINGER_MESSAGE_TASKS1 = new SingerMessage().withType(Type.RECORD).withStream(TASKS_STREAM_NAME) + .withRecord(objectMapper.createObjectNode().put("goal", "announce the game.")); + private static final SingerMessage SINGER_MESSAGE_TASKS2 = new SingerMessage().withType(Type.RECORD).withStream(TASKS_STREAM_NAME) + .withRecord(objectMapper.createObjectNode().put("goal", "ship some code.")); + + private static final Schema CATALOG = new Schema().withStreams(Lists.newArrayList( + new Stream().withName(USERS_STREAM_NAME) + .withFields(Lists.newArrayList(new Field().withName("name").withDataType(DataType.STRING).withSelected(true), + new Field().withName("id").withDataType(DataType.STRING).withSelected(true))), + new Stream().withName(TASKS_STREAM_NAME) + .withFields(Lists.newArrayList(new Field().withName("goal").withDataType(DataType.STRING).withSelected(true))))); + + private Path destinationPath; + private ObjectNode config; + + @BeforeEach + void setup() throws IOException { + destinationPath = Files.createTempDirectory("test"); + config = objectMapper.createObjectNode().put(CsvDestination.DESTINATION_PATH_FIELD, destinationPath.toString()); + } + + @Test + void testSpec() throws IOException { + final DestinationConnectionSpecification actual = new CsvDestination().spec(); + final String resourceString = MoreResources.readResource("spec.json"); + final DestinationConnectionSpecification expected = Jsons.deserialize(resourceString, DestinationConnectionSpecification.class); + + assertEquals(expected, actual); + } + + @Test + void testCheckSuccess() { + final StandardCheckConnectionOutput actual = new CsvDestination().check(config); + final StandardCheckConnectionOutput expected = new StandardCheckConnectionOutput().withStatus(Status.SUCCESS); + assertEquals(expected, actual); + } + + @Test + void testCheckFailure() throws IOException { + Path looksLikeADirectoryButIsAFile = destinationPath.resolve("file"); + FileUtils.touch(looksLikeADirectoryButIsAFile.toFile()); + final ObjectNode config = objectMapper.createObjectNode().put(CsvDestination.DESTINATION_PATH_FIELD, looksLikeADirectoryButIsAFile.toString()); + final StandardCheckConnectionOutput actual = new CsvDestination().check(config); + final StandardCheckConnectionOutput expected = new StandardCheckConnectionOutput().withStatus(Status.FAILURE); + + // the message includes the random file path, so just verify it exists and then remove it when we do + // rest of the comparison. + assertNotNull(actual.getMessage()); + actual.setMessage(null); + assertEquals(expected, actual); + } + + @Test + void testWriteSuccess() throws Exception { + final DestinationConsumer consumer = new CsvDestination().write(config, CATALOG); + + consumer.accept(SINGER_MESSAGE_USERS1); + consumer.accept(SINGER_MESSAGE_TASKS1); + consumer.accept(SINGER_MESSAGE_USERS2); + consumer.accept(SINGER_MESSAGE_TASKS2); + consumer.close(); + + // verify contents of CSV file + final List usersActual = Files.readAllLines(destinationPath.resolve(USERS_FILE)); + // csv add all of these goofy quotes. + final List usersExpected = Lists.newArrayList( + CsvDestination.COLUMN_NAME, + "\"{\"\"name\"\":\"\"john\"\",\"\"id\"\":\"\"10\"\"}\"", + "\"{\"\"name\"\":\"\"susan\"\",\"\"id\"\":\"\"30\"\"}\""); + + assertEquals(usersExpected, usersActual); + + final List tasksActual = Files.readAllLines(destinationPath.resolve(TASKS_FILE)); + final List tasksExpected = Lists.newArrayList( + CsvDestination.COLUMN_NAME, + "\"{\"\"goal\"\":\"\"announce the game.\"\"}\"", + "\"{\"\"goal\"\":\"\"ship some code.\"\"}\""); + + assertEquals(tasksActual, tasksExpected); + + // verify that the file is parsable as json (sanity check since the quoting is so goofy). + List actualUsersJson = csvToJson(destinationPath.resolve(USERS_FILE)); + List expectedUsersJson = Lists.newArrayList(SINGER_MESSAGE_USERS1.getRecord(), SINGER_MESSAGE_USERS2.getRecord()); + assertEquals(expectedUsersJson, actualUsersJson); + + List actualTasksJson = csvToJson(destinationPath.resolve(TASKS_FILE)); + List expectedTasksJson = Lists.newArrayList(SINGER_MESSAGE_TASKS1.getRecord(), SINGER_MESSAGE_TASKS2.getRecord()); + assertEquals(expectedTasksJson, actualTasksJson); + + // verify tmp files are cleaned up + final Set actualFilenames = Files.list(destinationPath).map(Path::getFileName).map(Path::toString).collect(Collectors.toSet()); + final Set expectedFilenames = Sets.newHashSet(USERS_FILE, TASKS_FILE); + assertEquals(expectedFilenames, actualFilenames); + } + + @SuppressWarnings("ResultOfMethodCallIgnored") + @Test + void testWriteFailure() throws Exception { + // hack to force an exception to be thrown from within the consumer. + final SingerMessage spiedMessage = spy(SINGER_MESSAGE_USERS1); + doThrow(new RuntimeException()).when(spiedMessage).getStream(); + + final DestinationConsumer consumer = spy(new CsvDestination().write(config, CATALOG)); + + assertThrows(RuntimeException.class, () -> consumer.accept(spiedMessage)); + consumer.accept(SINGER_MESSAGE_USERS2); + consumer.close(); + + // verify tmp files are cleaned up and no file are output + final Set actualFilenames = Files.list(destinationPath).map(Path::getFileName).map(Path::toString).collect(Collectors.toSet()); + assertEquals(Collections.emptySet(), actualFilenames); + } + + private List csvToJson(Path csvPath) throws IOException { + Reader in = new FileReader(csvPath.toFile()); + Iterable records = CSVFormat.DEFAULT + .withHeader(CsvDestination.COLUMN_NAME) + .withFirstRecordAsHeader() + .parse(in); + + final List jsonRecords = new ArrayList<>(); + for (final CSVRecord record : records) { + jsonRecords.add(Jsons.deserialize(record.toMap().get(CsvDestination.COLUMN_NAME))); + } + return jsonRecords; + } + +} From 39bc4c748edd7abf79ebe3f2f48a41d8f922c1e5 Mon Sep 17 00:00:00 2001 From: cgardens Date: Wed, 7 Oct 2020 19:41:18 -0700 Subject: [PATCH 11/17] clean up --- .../integrations/destination/csv/CsvDestinationTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/csv-destination/src/test/java/io/airbyte/integrations/destination/csv/CsvDestinationTest.java b/airbyte-integrations/csv-destination/src/test/java/io/airbyte/integrations/destination/csv/CsvDestinationTest.java index ce39d01969218..f8793da7aaabe 100644 --- a/airbyte-integrations/csv-destination/src/test/java/io/airbyte/integrations/destination/csv/CsvDestinationTest.java +++ b/airbyte-integrations/csv-destination/src/test/java/io/airbyte/integrations/destination/csv/CsvDestinationTest.java @@ -137,7 +137,7 @@ void testWriteSuccess() throws Exception { // verify contents of CSV file final List usersActual = Files.readAllLines(destinationPath.resolve(USERS_FILE)); - // csv add all of these goofy quotes. + // csv adds all of these goofy quotes. final List usersExpected = Lists.newArrayList( CsvDestination.COLUMN_NAME, "\"{\"\"name\"\":\"\"john\"\",\"\"id\"\":\"\"10\"\"}\"", @@ -181,7 +181,7 @@ void testWriteFailure() throws Exception { consumer.accept(SINGER_MESSAGE_USERS2); consumer.close(); - // verify tmp files are cleaned up and no file are output + // verify tmp files are cleaned up and no files are output at all final Set actualFilenames = Files.list(destinationPath).map(Path::getFileName).map(Path::toString).collect(Collectors.toSet()); assertEquals(Collections.emptySet(), actualFilenames); } From 6afcb678e5eb410c13714957128f29a1584f3f26 Mon Sep 17 00:00:00 2001 From: cgardens Date: Wed, 7 Oct 2020 19:45:39 -0700 Subject: [PATCH 12/17] clean up --- .../destination/csv/CsvDestinationTest.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/airbyte-integrations/csv-destination/src/test/java/io/airbyte/integrations/destination/csv/CsvDestinationTest.java b/airbyte-integrations/csv-destination/src/test/java/io/airbyte/integrations/destination/csv/CsvDestinationTest.java index f8793da7aaabe..16c0fae8d1d32 100644 --- a/airbyte-integrations/csv-destination/src/test/java/io/airbyte/integrations/destination/csv/CsvDestinationTest.java +++ b/airbyte-integrations/csv-destination/src/test/java/io/airbyte/integrations/destination/csv/CsvDestinationTest.java @@ -112,7 +112,7 @@ void testCheckSuccess() { @Test void testCheckFailure() throws IOException { - Path looksLikeADirectoryButIsAFile = destinationPath.resolve("file"); + final Path looksLikeADirectoryButIsAFile = destinationPath.resolve("file"); FileUtils.touch(looksLikeADirectoryButIsAFile.toFile()); final ObjectNode config = objectMapper.createObjectNode().put(CsvDestination.DESTINATION_PATH_FIELD, looksLikeADirectoryButIsAFile.toString()); final StandardCheckConnectionOutput actual = new CsvDestination().check(config); @@ -154,12 +154,12 @@ void testWriteSuccess() throws Exception { assertEquals(tasksActual, tasksExpected); // verify that the file is parsable as json (sanity check since the quoting is so goofy). - List actualUsersJson = csvToJson(destinationPath.resolve(USERS_FILE)); - List expectedUsersJson = Lists.newArrayList(SINGER_MESSAGE_USERS1.getRecord(), SINGER_MESSAGE_USERS2.getRecord()); + final List actualUsersJson = csvToJson(destinationPath.resolve(USERS_FILE)); + final List expectedUsersJson = Lists.newArrayList(SINGER_MESSAGE_USERS1.getRecord(), SINGER_MESSAGE_USERS2.getRecord()); assertEquals(expectedUsersJson, actualUsersJson); - List actualTasksJson = csvToJson(destinationPath.resolve(TASKS_FILE)); - List expectedTasksJson = Lists.newArrayList(SINGER_MESSAGE_TASKS1.getRecord(), SINGER_MESSAGE_TASKS2.getRecord()); + final List actualTasksJson = csvToJson(destinationPath.resolve(TASKS_FILE)); + final List expectedTasksJson = Lists.newArrayList(SINGER_MESSAGE_TASKS1.getRecord(), SINGER_MESSAGE_TASKS2.getRecord()); assertEquals(expectedTasksJson, actualTasksJson); // verify tmp files are cleaned up @@ -187,8 +187,8 @@ void testWriteFailure() throws Exception { } private List csvToJson(Path csvPath) throws IOException { - Reader in = new FileReader(csvPath.toFile()); - Iterable records = CSVFormat.DEFAULT + final Reader in = new FileReader(csvPath.toFile()); + final Iterable records = CSVFormat.DEFAULT .withHeader(CsvDestination.COLUMN_NAME) .withFirstRecordAsHeader() .parse(in); From 020e1f859a735c01699576966a67dc2451b39cd5 Mon Sep 17 00:00:00 2001 From: cgardens Date: Thu, 8 Oct 2020 08:32:41 -0700 Subject: [PATCH 13/17] add toRuntime --- .../concurrency/LifecycledCallable.java | 1 + .../airbyte/commons/exception/Exceptions.java | 52 ++++++++++++++ .../VoidCallable.java | 3 +- .../io/airbyte/commons/io/LineGobbler.java | 2 +- .../concurrency/LifecycledCallableTest.java | 1 + .../commons/exception/ExceptionsTest.java | 70 +++++++++++++++++++ 6 files changed, 127 insertions(+), 2 deletions(-) create mode 100644 airbyte-commons/src/main/java/io/airbyte/commons/exception/Exceptions.java rename airbyte-commons/src/main/java/io/airbyte/commons/{concurrency => functional}/VoidCallable.java (95%) create mode 100644 airbyte-commons/src/test/java/io/airbyte/commons/exception/ExceptionsTest.java diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/concurrency/LifecycledCallable.java b/airbyte-commons/src/main/java/io/airbyte/commons/concurrency/LifecycledCallable.java index 2e7e942b49ba8..4e2a1f18f8803 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/concurrency/LifecycledCallable.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/concurrency/LifecycledCallable.java @@ -25,6 +25,7 @@ package io.airbyte.commons.concurrency; import io.airbyte.commons.functional.CheckedConsumer; +import io.airbyte.commons.functional.VoidCallable; import java.util.concurrent.Callable; public class LifecycledCallable implements Callable { diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/exception/Exceptions.java b/airbyte-commons/src/main/java/io/airbyte/commons/exception/Exceptions.java new file mode 100644 index 0000000000000..3b926dde718a2 --- /dev/null +++ b/airbyte-commons/src/main/java/io/airbyte/commons/exception/Exceptions.java @@ -0,0 +1,52 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.commons.exception; + +import io.airbyte.commons.functional.VoidCallable; +import java.util.concurrent.Callable; + +public class Exceptions { + + public static T toRuntime(Callable callable) { + try { + return callable.call(); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static void toRuntimeVoid(VoidCallable callable) { + try { + callable.call(); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } + }; + +} diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/concurrency/VoidCallable.java b/airbyte-commons/src/main/java/io/airbyte/commons/functional/VoidCallable.java similarity index 95% rename from airbyte-commons/src/main/java/io/airbyte/commons/concurrency/VoidCallable.java rename to airbyte-commons/src/main/java/io/airbyte/commons/functional/VoidCallable.java index 0ef4a31946796..6b8e846248f72 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/concurrency/VoidCallable.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/functional/VoidCallable.java @@ -22,10 +22,11 @@ * SOFTWARE. */ -package io.airbyte.commons.concurrency; +package io.airbyte.commons.functional; import java.util.concurrent.Callable; +@FunctionalInterface public interface VoidCallable extends Callable { default @Override Void call() throws Exception { diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/io/LineGobbler.java b/airbyte-commons/src/main/java/io/airbyte/commons/io/LineGobbler.java index afe35617efcdc..e15fe1ad4f683 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/io/LineGobbler.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/io/LineGobbler.java @@ -24,7 +24,7 @@ package io.airbyte.commons.io; -import io.airbyte.commons.concurrency.VoidCallable; +import io.airbyte.commons.functional.VoidCallable; import java.io.BufferedReader; import java.io.InputStream; import java.util.Map; diff --git a/airbyte-commons/src/test/java/io/airbyte/commons/concurrency/LifecycledCallableTest.java b/airbyte-commons/src/test/java/io/airbyte/commons/concurrency/LifecycledCallableTest.java index 834bfd4204571..a21907db04ad9 100644 --- a/airbyte-commons/src/test/java/io/airbyte/commons/concurrency/LifecycledCallableTest.java +++ b/airbyte-commons/src/test/java/io/airbyte/commons/concurrency/LifecycledCallableTest.java @@ -31,6 +31,7 @@ import static org.mockito.Mockito.when; import io.airbyte.commons.functional.CheckedConsumer; +import io.airbyte.commons.functional.VoidCallable; import java.util.concurrent.Callable; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; diff --git a/airbyte-commons/src/test/java/io/airbyte/commons/exception/ExceptionsTest.java b/airbyte-commons/src/test/java/io/airbyte/commons/exception/ExceptionsTest.java new file mode 100644 index 0000000000000..a2ee6f31dadae --- /dev/null +++ b/airbyte-commons/src/test/java/io/airbyte/commons/exception/ExceptionsTest.java @@ -0,0 +1,70 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.commons.exception; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.junit.jupiter.api.Test; + +class ExceptionsTest { + + @Test + void testToRuntime() { + assertEquals("hello", Exceptions.toRuntime(() -> callable("hello", false))); + assertThrows(RuntimeException.class, () -> Exceptions.toRuntime(() -> callable("goodbye", true))); + } + + @Test + void testToRuntimeVoid() { + List list = new ArrayList<>(); + assertThrows(RuntimeException.class, () -> Exceptions.toRuntimeVoid(() -> voidCallable(list, "hello", true))); + assertEquals(0, list.size()); + + Exceptions.toRuntimeVoid(() -> voidCallable(list, "goodbye", false)); + assertEquals(1, list.size()); + assertEquals("goodbye", list.get(0)); + } + + private String callable(String input, boolean shouldThrow) throws IOException { + if (shouldThrow) { + throw new IOException(); + } else { + return input; + } + } + + private void voidCallable(List list, String input, boolean shouldThrow) throws IOException { + if (shouldThrow) { + throw new IOException(); + } else { + list.add(input); + } + } + +} From dc8179472945305df2e31561542e795ef359b884 Mon Sep 17 00:00:00 2001 From: cgardens Date: Thu, 8 Oct 2020 11:59:53 -0700 Subject: [PATCH 14/17] fix destination --- .../destination/csv/CsvDestination.java | 16 ++++++++++------ .../destination/csv/CsvDestinationTest.java | 15 +++++++++------ 2 files changed, 19 insertions(+), 12 deletions(-) diff --git a/airbyte-integrations/csv-destination/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java b/airbyte-integrations/csv-destination/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java index b45a69e681eb5..0a6e979e4f0b7 100644 --- a/airbyte-integrations/csv-destination/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java +++ b/airbyte-integrations/csv-destination/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java @@ -39,6 +39,7 @@ import io.airbyte.integrations.base.FailureTrackingConsumer; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.singer.SingerMessage; +import io.airbyte.singer.SingerMessage.Type; import java.io.FileWriter; import java.io.IOException; import java.nio.file.Files; @@ -84,7 +85,6 @@ public StandardDiscoverSchemaOutput discover(JsonNode config) { } /** - * * @param config - csv destination config. * @param schema - schema of the incoming messages. * @return - a consumer to handle writing records to the filesystem. @@ -141,12 +141,16 @@ public CsvConsumer(Map writeConfigs, Schema schema) { @Override protected void acceptTracked(SingerMessage singerMessage) throws Exception { - if (writeConfigs.containsKey(singerMessage.getStream())) { + + // ignore other message types. + if (singerMessage.getType() == Type.RECORD) { + if (!writeConfigs.containsKey(singerMessage.getStream())) { + throw new IllegalArgumentException( + String.format("Message contained record from a stream that was not in the catalog. \ncatalog: %s , \nmessage: %s", + Jsons.serialize(schema), Jsons.serialize(singerMessage))); + } + writeConfigs.get(singerMessage.getStream()).getWriter().printRecord(Jsons.serialize(singerMessage.getRecord())); - } else { - throw new IllegalArgumentException( - String.format("Message contained record from a stream that was not in the catalog. \ncatalog: %s , \nmessage: %s", - Jsons.serialize(schema), Jsons.serialize(singerMessage))); } } diff --git a/airbyte-integrations/csv-destination/src/test/java/io/airbyte/integrations/destination/csv/CsvDestinationTest.java b/airbyte-integrations/csv-destination/src/test/java/io/airbyte/integrations/destination/csv/CsvDestinationTest.java index 16c0fae8d1d32..18dc857a6aaee 100644 --- a/airbyte-integrations/csv-destination/src/test/java/io/airbyte/integrations/destination/csv/CsvDestinationTest.java +++ b/airbyte-integrations/csv-destination/src/test/java/io/airbyte/integrations/destination/csv/CsvDestinationTest.java @@ -50,11 +50,11 @@ import java.io.Reader; import java.nio.file.Files; import java.nio.file.Path; -import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVRecord; import org.apache.commons.io.FileUtils; @@ -77,6 +77,10 @@ class CsvDestinationTest { .withRecord(objectMapper.createObjectNode().put("goal", "announce the game.")); private static final SingerMessage SINGER_MESSAGE_TASKS2 = new SingerMessage().withType(Type.RECORD).withStream(TASKS_STREAM_NAME) .withRecord(objectMapper.createObjectNode().put("goal", "ship some code.")); + // todo (cgardens) - may want to codify this in the integration scaffold. like there's a setting + // that says it just ignores state. + private static final SingerMessage SINGER_MESSAGE_RECORD = new SingerMessage().withType(Type.STATE) + .withValue(objectMapper.createObjectNode().put("checkpoint", "now!")); private static final Schema CATALOG = new Schema().withStreams(Lists.newArrayList( new Stream().withName(USERS_STREAM_NAME) @@ -133,6 +137,7 @@ void testWriteSuccess() throws Exception { consumer.accept(SINGER_MESSAGE_TASKS1); consumer.accept(SINGER_MESSAGE_USERS2); consumer.accept(SINGER_MESSAGE_TASKS2); + consumer.accept(SINGER_MESSAGE_RECORD); consumer.close(); // verify contents of CSV file @@ -193,11 +198,9 @@ private List csvToJson(Path csvPath) throws IOException { .withFirstRecordAsHeader() .parse(in); - final List jsonRecords = new ArrayList<>(); - for (final CSVRecord record : records) { - jsonRecords.add(Jsons.deserialize(record.toMap().get(CsvDestination.COLUMN_NAME))); - } - return jsonRecords; + return StreamSupport.stream(records.spliterator(), false) + .map(record -> Jsons.deserialize(record.toMap().get(CsvDestination.COLUMN_NAME))) + .collect(Collectors.toList()); } } From 1aba6bd4812f90dab385c6a6bb47c7252b2baa43 Mon Sep 17 00:00:00 2001 From: cgardens Date: Thu, 8 Oct 2020 12:03:12 -0700 Subject: [PATCH 15/17] remove fake news comment --- .../integrations/destination/csv/CsvDestinationTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/airbyte-integrations/csv-destination/src/test/java/io/airbyte/integrations/destination/csv/CsvDestinationTest.java b/airbyte-integrations/csv-destination/src/test/java/io/airbyte/integrations/destination/csv/CsvDestinationTest.java index 18dc857a6aaee..40c72b293c55a 100644 --- a/airbyte-integrations/csv-destination/src/test/java/io/airbyte/integrations/destination/csv/CsvDestinationTest.java +++ b/airbyte-integrations/csv-destination/src/test/java/io/airbyte/integrations/destination/csv/CsvDestinationTest.java @@ -77,8 +77,6 @@ class CsvDestinationTest { .withRecord(objectMapper.createObjectNode().put("goal", "announce the game.")); private static final SingerMessage SINGER_MESSAGE_TASKS2 = new SingerMessage().withType(Type.RECORD).withStream(TASKS_STREAM_NAME) .withRecord(objectMapper.createObjectNode().put("goal", "ship some code.")); - // todo (cgardens) - may want to codify this in the integration scaffold. like there's a setting - // that says it just ignores state. private static final SingerMessage SINGER_MESSAGE_RECORD = new SingerMessage().withType(Type.STATE) .withValue(objectMapper.createObjectNode().put("checkpoint", "now!")); From 4addc80275d5469c3538e6d0d6f72b623cf124f5 Mon Sep 17 00:00:00 2001 From: cgardens Date: Thu, 8 Oct 2020 13:36:19 -0700 Subject: [PATCH 16/17] feedback --- airbyte-integrations/base-java/javabase.sh | 3 ++- .../airbyte/integrations/destination/csv/CsvDestination.java | 2 +- .../csv-destination/src/main/resources/spec.json | 2 +- docs/integrations/destinations/local-csv2.md | 2 +- 4 files changed, 5 insertions(+), 4 deletions(-) diff --git a/airbyte-integrations/base-java/javabase.sh b/airbyte-integrations/base-java/javabase.sh index 82e5138f50768..ea43f1a711813 100755 --- a/airbyte-integrations/base-java/javabase.sh +++ b/airbyte-integrations/base-java/javabase.sh @@ -2,6 +2,7 @@ set -e -# wrap run script in a script so that we can lazy evaluate the value of APPLICATION. APPLICATION is +# Wrap run script in a script so that we can lazy evaluate the value of APPLICATION. APPLICATION is # set by the dockerfile that inherits base-java, so it cannot be evaluated when base-java is built. +# We also need to make sure that stdin of the script is piped to the stdin of the java application. cat <&0 | bin/"$APPLICATION" "$@" diff --git a/airbyte-integrations/csv-destination/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java b/airbyte-integrations/csv-destination/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java index 0a6e979e4f0b7..70b8c6ac6c26c 100644 --- a/airbyte-integrations/csv-destination/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java +++ b/airbyte-integrations/csv-destination/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java @@ -58,7 +58,7 @@ public class CsvDestination implements Destination { private static final Logger LOGGER = LoggerFactory.getLogger(CsvDestination.class); - static final String COLUMN_NAME = "data"; // we output all data as a blog to a single column. + static final String COLUMN_NAME = "data"; // we output all data as a blob to a single column. static final String DESTINATION_PATH_FIELD = "destination_path"; @Override diff --git a/airbyte-integrations/csv-destination/src/main/resources/spec.json b/airbyte-integrations/csv-destination/src/main/resources/spec.json index b94852dd22d9e..99c54a269b0c2 100644 --- a/airbyte-integrations/csv-destination/src/main/resources/spec.json +++ b/airbyte-integrations/csv-destination/src/main/resources/spec.json @@ -10,7 +10,7 @@ "additionalProperties": false, "properties": { "destination_path": { - "description": "Path to the directory where csv files will be written. Must start with the local mount \"/local\". Any other directory appended on the end will be placed inside that local mount.", + "description": "Path to the directory where csv files will be written. Must start with the local mount \"/local\". Any other directory appended on the end will be placed inside that local mount. For more information check out our docs", "type": "string", "examples": ["/local"], "pattern": "(^\\/local\\/.*)|(^\\/local$)" diff --git a/docs/integrations/destinations/local-csv2.md b/docs/integrations/destinations/local-csv2.md index f1eafe6b7ae13..c816908435ebc 100644 --- a/docs/integrations/destinations/local-csv2.md +++ b/docs/integrations/destinations/local-csv2.md @@ -2,7 +2,7 @@ ## Overview -This destination writes data to a directory on the _local_ filesystem on the host running Airbyte. By default, data is written to `/tmp/airbyte_local`. To change this location, modify the `LOCAL_ROOT` environment variable for Airbyte. +This destination writes data to a directory on the _local_ filesystem on the host running Airbyte. By default, data is written to `/tmp/airbyte_local`. To change this location, modify the `LOCAL_ROOT` environment variable for Airbyte in the `.env` file. ### Sync Overview From d35e67034858c109f49579c5b1cf3358e634ae87 Mon Sep 17 00:00:00 2001 From: cgardens Date: Thu, 8 Oct 2020 13:46:23 -0700 Subject: [PATCH 17/17] Revert "add toRuntime" This reverts commit 020e1f859a735c01699576966a67dc2451b39cd5. --- .../concurrency/LifecycledCallable.java | 1 - .../VoidCallable.java | 3 +- .../airbyte/commons/exception/Exceptions.java | 52 -------------- .../io/airbyte/commons/io/LineGobbler.java | 2 +- .../concurrency/LifecycledCallableTest.java | 1 - .../commons/exception/ExceptionsTest.java | 70 ------------------- 6 files changed, 2 insertions(+), 127 deletions(-) rename airbyte-commons/src/main/java/io/airbyte/commons/{functional => concurrency}/VoidCallable.java (95%) delete mode 100644 airbyte-commons/src/main/java/io/airbyte/commons/exception/Exceptions.java delete mode 100644 airbyte-commons/src/test/java/io/airbyte/commons/exception/ExceptionsTest.java diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/concurrency/LifecycledCallable.java b/airbyte-commons/src/main/java/io/airbyte/commons/concurrency/LifecycledCallable.java index 4e2a1f18f8803..2e7e942b49ba8 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/concurrency/LifecycledCallable.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/concurrency/LifecycledCallable.java @@ -25,7 +25,6 @@ package io.airbyte.commons.concurrency; import io.airbyte.commons.functional.CheckedConsumer; -import io.airbyte.commons.functional.VoidCallable; import java.util.concurrent.Callable; public class LifecycledCallable implements Callable { diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/functional/VoidCallable.java b/airbyte-commons/src/main/java/io/airbyte/commons/concurrency/VoidCallable.java similarity index 95% rename from airbyte-commons/src/main/java/io/airbyte/commons/functional/VoidCallable.java rename to airbyte-commons/src/main/java/io/airbyte/commons/concurrency/VoidCallable.java index 6b8e846248f72..0ef4a31946796 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/functional/VoidCallable.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/concurrency/VoidCallable.java @@ -22,11 +22,10 @@ * SOFTWARE. */ -package io.airbyte.commons.functional; +package io.airbyte.commons.concurrency; import java.util.concurrent.Callable; -@FunctionalInterface public interface VoidCallable extends Callable { default @Override Void call() throws Exception { diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/exception/Exceptions.java b/airbyte-commons/src/main/java/io/airbyte/commons/exception/Exceptions.java deleted file mode 100644 index 3b926dde718a2..0000000000000 --- a/airbyte-commons/src/main/java/io/airbyte/commons/exception/Exceptions.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 Airbyte - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package io.airbyte.commons.exception; - -import io.airbyte.commons.functional.VoidCallable; -import java.util.concurrent.Callable; - -public class Exceptions { - - public static T toRuntime(Callable callable) { - try { - return callable.call(); - } catch (RuntimeException e) { - throw e; - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - public static void toRuntimeVoid(VoidCallable callable) { - try { - callable.call(); - } catch (RuntimeException e) { - throw e; - } catch (Exception e) { - throw new RuntimeException(e); - } - }; - -} diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/io/LineGobbler.java b/airbyte-commons/src/main/java/io/airbyte/commons/io/LineGobbler.java index e15fe1ad4f683..afe35617efcdc 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/io/LineGobbler.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/io/LineGobbler.java @@ -24,7 +24,7 @@ package io.airbyte.commons.io; -import io.airbyte.commons.functional.VoidCallable; +import io.airbyte.commons.concurrency.VoidCallable; import java.io.BufferedReader; import java.io.InputStream; import java.util.Map; diff --git a/airbyte-commons/src/test/java/io/airbyte/commons/concurrency/LifecycledCallableTest.java b/airbyte-commons/src/test/java/io/airbyte/commons/concurrency/LifecycledCallableTest.java index a21907db04ad9..834bfd4204571 100644 --- a/airbyte-commons/src/test/java/io/airbyte/commons/concurrency/LifecycledCallableTest.java +++ b/airbyte-commons/src/test/java/io/airbyte/commons/concurrency/LifecycledCallableTest.java @@ -31,7 +31,6 @@ import static org.mockito.Mockito.when; import io.airbyte.commons.functional.CheckedConsumer; -import io.airbyte.commons.functional.VoidCallable; import java.util.concurrent.Callable; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; diff --git a/airbyte-commons/src/test/java/io/airbyte/commons/exception/ExceptionsTest.java b/airbyte-commons/src/test/java/io/airbyte/commons/exception/ExceptionsTest.java deleted file mode 100644 index a2ee6f31dadae..0000000000000 --- a/airbyte-commons/src/test/java/io/airbyte/commons/exception/ExceptionsTest.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 Airbyte - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package io.airbyte.commons.exception; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import org.junit.jupiter.api.Test; - -class ExceptionsTest { - - @Test - void testToRuntime() { - assertEquals("hello", Exceptions.toRuntime(() -> callable("hello", false))); - assertThrows(RuntimeException.class, () -> Exceptions.toRuntime(() -> callable("goodbye", true))); - } - - @Test - void testToRuntimeVoid() { - List list = new ArrayList<>(); - assertThrows(RuntimeException.class, () -> Exceptions.toRuntimeVoid(() -> voidCallable(list, "hello", true))); - assertEquals(0, list.size()); - - Exceptions.toRuntimeVoid(() -> voidCallable(list, "goodbye", false)); - assertEquals(1, list.size()); - assertEquals("goodbye", list.get(0)); - } - - private String callable(String input, boolean shouldThrow) throws IOException { - if (shouldThrow) { - throw new IOException(); - } else { - return input; - } - } - - private void voidCallable(List list, String input, boolean shouldThrow) throws IOException { - if (shouldThrow) { - throw new IOException(); - } else { - list.add(input); - } - } - -}