diff --git a/airbyte-integrations/base-java/Dockerfile b/airbyte-integrations/base-java/Dockerfile index a523a45fc32ad..5271b3d58e9b9 100644 --- a/airbyte-integrations/base-java/Dockerfile +++ b/airbyte-integrations/base-java/Dockerfile @@ -10,6 +10,6 @@ ENV AIRBYTE_SPEC_CMD "./javabase.sh --spec" ENV AIRBYTE_CHECK_CMD "./javabase.sh --check" ENV AIRBYTE_DISCOVER_CMD "./javabase.sh --discover" ENV AIRBYTE_READ_CMD "./javabase.sh --read" -ENV AIRBYTE_READ_CMD "./javabase.sh --write" +ENV AIRBYTE_WRITE_CMD "./javabase.sh --write" ENTRYPOINT ["/airbyte/base.sh"] diff --git a/airbyte-integrations/base-java/javabase.sh b/airbyte-integrations/base-java/javabase.sh index 48ffa37c92276..ea43f1a711813 100755 --- a/airbyte-integrations/base-java/javabase.sh +++ b/airbyte-integrations/base-java/javabase.sh @@ -2,6 +2,7 @@ set -e -# wrap run script in a script so that we can lazy evaluate the value of APPLICATION. APPLICATION is +# Wrap run script in a script so that we can lazy evaluate the value of APPLICATION. APPLICATION is # set by the dockerfile that inherits base-java, so it cannot be evaluated when base-java is built. -bin/"$APPLICATION" "$@" +# We also need to make sure that stdin of the script is piped to the stdin of the java application. +cat <&0 | bin/"$APPLICATION" "$@" diff --git a/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/FailureTrackingConsumer.java b/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/FailureTrackingConsumer.java new file mode 100644 index 0000000000000..c32a03d8ac8ce --- /dev/null +++ b/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/FailureTrackingConsumer.java @@ -0,0 +1,54 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.base; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class FailureTrackingConsumer implements DestinationConsumer { + + private static final Logger LOGGER = LoggerFactory.getLogger(FailureTrackingConsumer.class); + + private boolean hasFailed = false; + + protected abstract void acceptTracked(T t) throws Exception; + + public void accept(T t) throws Exception { + try { + acceptTracked(t); + } catch (Exception e) { + hasFailed = true; + throw e; + } + } + + protected abstract void close(boolean hasFailed) throws Exception; + + public void close() throws Exception { + LOGGER.info("hasFailed: {}.", hasFailed); + close(hasFailed); + } + +} diff --git a/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/JavaBaseConstants.java b/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/JavaBaseConstants.java index cc94543c45112..15dadf9fe4db1 100644 --- a/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/JavaBaseConstants.java +++ b/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/JavaBaseConstants.java @@ -24,8 +24,6 @@ package io.airbyte.integrations.base; -import java.nio.file.Path; - public class JavaBaseConstants { public static String ARGS_CONFIG_KEY = "config"; @@ -36,8 +34,4 @@ public class JavaBaseConstants { public static String ARGS_CATALOG_DESC = "input path for the catalog"; public static String ARGS_PATH_DESC = "path to the json-encoded state file"; - // todo (cgardens) - this mount path should be passed in by the worker and read as an arg or - // environment variable by the runner. - public static Path LOCAL_MOUNT = Path.of("/local"); - } diff --git a/airbyte-integrations/base-java/src/test/java/io/airbyte/integrations/base/FailureTrackingConsumerTest.java b/airbyte-integrations/base-java/src/test/java/io/airbyte/integrations/base/FailureTrackingConsumerTest.java new file mode 100644 index 0000000000000..487fd5754cfab --- /dev/null +++ b/airbyte-integrations/base-java/src/test/java/io/airbyte/integrations/base/FailureTrackingConsumerTest.java @@ -0,0 +1,71 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.base; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +import org.junit.jupiter.api.Test; + +class FailureTrackingConsumerTest { + + @Test + void testNoFailure() throws Exception { + final TestConsumer consumer = spy(new TestConsumer()); + consumer.accept(""); + consumer.close(); + + verify(consumer).close(false); + } + + @Test + void testWithFailure() throws Exception { + final TestConsumer consumer = spy(new TestConsumer()); + doThrow(new RuntimeException()).when(consumer).acceptTracked(""); + + // verify the exception still gets thrown. + assertThrows(RuntimeException.class, () -> consumer.accept("")); + consumer.close(); + + verify(consumer).close(true); + } + + static class TestConsumer extends FailureTrackingConsumer { + + @Override + protected void acceptTracked(String s) { + + } + + @Override + protected void close(boolean hasFailed) { + + } + + } + +} diff --git a/airbyte-integrations/base/base.sh b/airbyte-integrations/base/base.sh index d27d1096cc18a..869ca581f33e0 100755 --- a/airbyte-integrations/base/base.sh +++ b/airbyte-integrations/base/base.sh @@ -51,6 +51,9 @@ function main() { # todo: state should be optional: --state "$STATE_FILE" eval "$AIRBYTE_READ_CMD" --config "$CONFIG_FILE" --catalog "$CATALOG_FILE" ;; + write) + eval "$AIRBYTE_WRITE_CMD" --config "$CONFIG_FILE" --catalog "$CATALOG_FILE" + ;; *) error "Unknown command: $CMD" ;; diff --git a/airbyte-integrations/csv-destination/.dockerignore b/airbyte-integrations/csv-destination/.dockerignore new file mode 100644 index 0000000000000..65c7d0ad3e73c --- /dev/null +++ b/airbyte-integrations/csv-destination/.dockerignore @@ -0,0 +1,3 @@ +* +!Dockerfile +!build diff --git a/airbyte-integrations/csv-destination/Dockerfile b/airbyte-integrations/csv-destination/Dockerfile new file mode 100644 index 0000000000000..f82c9be477826 --- /dev/null +++ b/airbyte-integrations/csv-destination/Dockerfile @@ -0,0 +1,8 @@ +FROM airbyte/base-java:dev + +WORKDIR /airbyte +ENV APPLICATION csv-destination + +COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar + +RUN tar xf ${APPLICATION}.tar --strip-components=1 diff --git a/airbyte-integrations/csv-destination/build.gradle b/airbyte-integrations/csv-destination/build.gradle new file mode 100644 index 0000000000000..81223f4551346 --- /dev/null +++ b/airbyte-integrations/csv-destination/build.gradle @@ -0,0 +1,31 @@ +import com.bmuschko.gradle.docker.tasks.image.DockerBuildImage +plugins { + id 'com.bmuschko.docker-remote-api' + id 'application' +} +dependencies { + implementation project(':airbyte-config:models') + implementation project(':airbyte-singer') + implementation project(':airbyte-integrations:base-java') + + implementation 'org.apache.commons:commons-csv:1.4' +} + +application { + mainClass = 'io.airbyte.integrations.destination.csv.CsvDestination' +} + + +def image = 'airbyte/airbyte-csv-destination:dev' + +task imageName { + doLast { + println "IMAGE $image" + } +} + +task buildImage(type: DockerBuildImage) { + inputDir = projectDir + images.add(image) + dependsOn ':airbyte-integrations:base-java:buildImage' +} diff --git a/airbyte-integrations/csv-destination/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java b/airbyte-integrations/csv-destination/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java new file mode 100644 index 0000000000000..70b8c6ac6c26c --- /dev/null +++ b/airbyte-integrations/csv-destination/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java @@ -0,0 +1,215 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.csv; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.base.Preconditions; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.resources.MoreResources; +import io.airbyte.config.DestinationConnectionSpecification; +import io.airbyte.config.Schema; +import io.airbyte.config.StandardCheckConnectionOutput; +import io.airbyte.config.StandardCheckConnectionOutput.Status; +import io.airbyte.config.StandardDiscoverSchemaOutput; +import io.airbyte.config.Stream; +import io.airbyte.integrations.base.Destination; +import io.airbyte.integrations.base.DestinationConsumer; +import io.airbyte.integrations.base.FailureTrackingConsumer; +import io.airbyte.integrations.base.IntegrationRunner; +import io.airbyte.singer.SingerMessage; +import io.airbyte.singer.SingerMessage.Type; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVPrinter; +import org.apache.commons.io.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CsvDestination implements Destination { + + private static final Logger LOGGER = LoggerFactory.getLogger(CsvDestination.class); + + static final String COLUMN_NAME = "data"; // we output all data as a blob to a single column. + static final String DESTINATION_PATH_FIELD = "destination_path"; + + @Override + public DestinationConnectionSpecification spec() throws IOException { + final String resourceString = MoreResources.readResource("spec.json"); + return Jsons.deserialize(resourceString, DestinationConnectionSpecification.class); + } + + @Override + public StandardCheckConnectionOutput check(JsonNode config) { + try { + FileUtils.forceMkdir(getDestinationPath(config).toFile()); + } catch (Exception e) { + return new StandardCheckConnectionOutput().withStatus(Status.FAILURE).withMessage(e.getMessage()); + } + return new StandardCheckConnectionOutput().withStatus(Status.SUCCESS); + } + + // todo (cgardens) - we currently don't leverage discover in our destinations, so skipping + // implementing it... for now. + @Override + public StandardDiscoverSchemaOutput discover(JsonNode config) { + throw new RuntimeException("Not Implemented"); + } + + /** + * @param config - csv destination config. + * @param schema - schema of the incoming messages. + * @return - a consumer to handle writing records to the filesystem. + * @throws IOException - exception throw in manipulating the filesytem. + */ + @Override + public DestinationConsumer write(JsonNode config, Schema schema) throws IOException { + final Path destinationDir = getDestinationPath(config); + + FileUtils.forceMkdir(destinationDir.toFile()); + + final long now = Instant.now().toEpochMilli(); + final Map writeConfigs = new HashMap<>(); + for (final Stream stream : schema.getStreams()) { + final Path tmpPath = destinationDir.resolve(stream.getName() + "_" + now + ".csv"); + final Path finalPath = destinationDir.resolve(stream.getName() + ".csv"); + final FileWriter fileWriter = new FileWriter(tmpPath.toFile()); + final CSVPrinter printer = new CSVPrinter(fileWriter, CSVFormat.DEFAULT.withHeader(COLUMN_NAME)); + writeConfigs.put(stream.getName(), new WriteConfig(printer, tmpPath, finalPath)); + } + + return new CsvConsumer(writeConfigs, schema); + } + + /** + * Extract provided relative path from csv config object and append to local mount path. + * + * @param config - csv config object + * @return absolute path with the relative path appended to the local volume mount. + */ + private Path getDestinationPath(JsonNode config) { + final String destinationRelativePath = config.get(DESTINATION_PATH_FIELD).asText(); + Preconditions.checkNotNull(destinationRelativePath); + + return Path.of(destinationRelativePath); + } + + /** + * This consumer writes individual records to temporary files. If all of the messages are written + * successfully, it moves the tmp files to files named by their respective stream. If there are any + * failures, nothing is written. + */ + private static class CsvConsumer extends FailureTrackingConsumer { + + private final Map writeConfigs; + private final Schema schema; + + public CsvConsumer(Map writeConfigs, Schema schema) { + this.schema = schema; + LOGGER.info("initializing consumer."); + + this.writeConfigs = writeConfigs; + } + + @Override + protected void acceptTracked(SingerMessage singerMessage) throws Exception { + + // ignore other message types. + if (singerMessage.getType() == Type.RECORD) { + if (!writeConfigs.containsKey(singerMessage.getStream())) { + throw new IllegalArgumentException( + String.format("Message contained record from a stream that was not in the catalog. \ncatalog: %s , \nmessage: %s", + Jsons.serialize(schema), Jsons.serialize(singerMessage))); + } + + writeConfigs.get(singerMessage.getStream()).getWriter().printRecord(Jsons.serialize(singerMessage.getRecord())); + } + } + + @Override + protected void close(boolean hasFailed) throws IOException { + LOGGER.info("finalizing consumer."); + + for (final Map.Entry entries : writeConfigs.entrySet()) { + try { + entries.getValue().getWriter().flush(); + entries.getValue().getWriter().close(); + } catch (Exception e) { + hasFailed = true; + LOGGER.error("failed to close writer for: {}.", entries.getKey()); + } + } + // do not persist the data, if there are any failures. + if (!hasFailed) { + for (final WriteConfig writeConfig : writeConfigs.values()) { + Files.move(writeConfig.getTmpPath(), writeConfig.getFinalPath(), StandardCopyOption.REPLACE_EXISTING); + } + } + // clean up tmp files. + for (final WriteConfig writeConfig : writeConfigs.values()) { + Files.deleteIfExists(writeConfig.getTmpPath()); + } + + } + + } + + private static class WriteConfig { + + private final CSVPrinter writer; + private final Path tmpPath; + private final Path finalPath; + + public WriteConfig(CSVPrinter writer, Path tmpPath, Path finalPath) { + this.writer = writer; + this.tmpPath = tmpPath; + this.finalPath = finalPath; + } + + public CSVPrinter getWriter() { + return writer; + } + + public Path getTmpPath() { + return tmpPath; + } + + public Path getFinalPath() { + return finalPath; + } + + } + + public static void main(String[] args) throws Exception { + new IntegrationRunner(new CsvDestination()).run(args); + } + +} diff --git a/airbyte-integrations/csv-destination/src/main/resources/spec.json b/airbyte-integrations/csv-destination/src/main/resources/spec.json new file mode 100644 index 0000000000000..99c54a269b0c2 --- /dev/null +++ b/airbyte-integrations/csv-destination/src/main/resources/spec.json @@ -0,0 +1,20 @@ +{ + "destinationId": "", + "destinationSpecificationId": "", + "documentationUrl": "https://docs.airbyte.io/integrations/destinations/local-csv", + "specification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "CSV Destination Spec", + "type": "object", + "required": ["destination_path"], + "additionalProperties": false, + "properties": { + "destination_path": { + "description": "Path to the directory where csv files will be written. Must start with the local mount \"/local\". Any other directory appended on the end will be placed inside that local mount. For more information check out our docs", + "type": "string", + "examples": ["/local"], + "pattern": "(^\\/local\\/.*)|(^\\/local$)" + } + } + } +} diff --git a/airbyte-integrations/csv-destination/src/test/java/io/airbyte/integrations/destination/csv/CsvDestinationTest.java b/airbyte-integrations/csv-destination/src/test/java/io/airbyte/integrations/destination/csv/CsvDestinationTest.java new file mode 100644 index 0000000000000..40c72b293c55a --- /dev/null +++ b/airbyte-integrations/csv-destination/src/test/java/io/airbyte/integrations/destination/csv/CsvDestinationTest.java @@ -0,0 +1,204 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.csv; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.spy; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.resources.MoreResources; +import io.airbyte.config.DataType; +import io.airbyte.config.DestinationConnectionSpecification; +import io.airbyte.config.Field; +import io.airbyte.config.Schema; +import io.airbyte.config.StandardCheckConnectionOutput; +import io.airbyte.config.StandardCheckConnectionOutput.Status; +import io.airbyte.config.Stream; +import io.airbyte.integrations.base.DestinationConsumer; +import io.airbyte.singer.SingerMessage; +import io.airbyte.singer.SingerMessage.Type; +import java.io.FileReader; +import java.io.IOException; +import java.io.Reader; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVRecord; +import org.apache.commons.io.FileUtils; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class CsvDestinationTest { + + private static final ObjectMapper objectMapper = new ObjectMapper(); + + private static final String USERS_STREAM_NAME = "users"; + private static final String TASKS_STREAM_NAME = "tasks"; + private static final String USERS_FILE = USERS_STREAM_NAME + ".csv"; + private static final String TASKS_FILE = TASKS_STREAM_NAME + ".csv"; + private static final SingerMessage SINGER_MESSAGE_USERS1 = new SingerMessage().withType(Type.RECORD).withStream(USERS_STREAM_NAME) + .withRecord(objectMapper.createObjectNode().put("name", "john").put("id", "10")); + private static final SingerMessage SINGER_MESSAGE_USERS2 = new SingerMessage().withType(Type.RECORD).withStream(USERS_STREAM_NAME) + .withRecord(objectMapper.createObjectNode().put("name", "susan").put("id", "30")); + private static final SingerMessage SINGER_MESSAGE_TASKS1 = new SingerMessage().withType(Type.RECORD).withStream(TASKS_STREAM_NAME) + .withRecord(objectMapper.createObjectNode().put("goal", "announce the game.")); + private static final SingerMessage SINGER_MESSAGE_TASKS2 = new SingerMessage().withType(Type.RECORD).withStream(TASKS_STREAM_NAME) + .withRecord(objectMapper.createObjectNode().put("goal", "ship some code.")); + private static final SingerMessage SINGER_MESSAGE_RECORD = new SingerMessage().withType(Type.STATE) + .withValue(objectMapper.createObjectNode().put("checkpoint", "now!")); + + private static final Schema CATALOG = new Schema().withStreams(Lists.newArrayList( + new Stream().withName(USERS_STREAM_NAME) + .withFields(Lists.newArrayList(new Field().withName("name").withDataType(DataType.STRING).withSelected(true), + new Field().withName("id").withDataType(DataType.STRING).withSelected(true))), + new Stream().withName(TASKS_STREAM_NAME) + .withFields(Lists.newArrayList(new Field().withName("goal").withDataType(DataType.STRING).withSelected(true))))); + + private Path destinationPath; + private ObjectNode config; + + @BeforeEach + void setup() throws IOException { + destinationPath = Files.createTempDirectory("test"); + config = objectMapper.createObjectNode().put(CsvDestination.DESTINATION_PATH_FIELD, destinationPath.toString()); + } + + @Test + void testSpec() throws IOException { + final DestinationConnectionSpecification actual = new CsvDestination().spec(); + final String resourceString = MoreResources.readResource("spec.json"); + final DestinationConnectionSpecification expected = Jsons.deserialize(resourceString, DestinationConnectionSpecification.class); + + assertEquals(expected, actual); + } + + @Test + void testCheckSuccess() { + final StandardCheckConnectionOutput actual = new CsvDestination().check(config); + final StandardCheckConnectionOutput expected = new StandardCheckConnectionOutput().withStatus(Status.SUCCESS); + assertEquals(expected, actual); + } + + @Test + void testCheckFailure() throws IOException { + final Path looksLikeADirectoryButIsAFile = destinationPath.resolve("file"); + FileUtils.touch(looksLikeADirectoryButIsAFile.toFile()); + final ObjectNode config = objectMapper.createObjectNode().put(CsvDestination.DESTINATION_PATH_FIELD, looksLikeADirectoryButIsAFile.toString()); + final StandardCheckConnectionOutput actual = new CsvDestination().check(config); + final StandardCheckConnectionOutput expected = new StandardCheckConnectionOutput().withStatus(Status.FAILURE); + + // the message includes the random file path, so just verify it exists and then remove it when we do + // rest of the comparison. + assertNotNull(actual.getMessage()); + actual.setMessage(null); + assertEquals(expected, actual); + } + + @Test + void testWriteSuccess() throws Exception { + final DestinationConsumer consumer = new CsvDestination().write(config, CATALOG); + + consumer.accept(SINGER_MESSAGE_USERS1); + consumer.accept(SINGER_MESSAGE_TASKS1); + consumer.accept(SINGER_MESSAGE_USERS2); + consumer.accept(SINGER_MESSAGE_TASKS2); + consumer.accept(SINGER_MESSAGE_RECORD); + consumer.close(); + + // verify contents of CSV file + final List usersActual = Files.readAllLines(destinationPath.resolve(USERS_FILE)); + // csv adds all of these goofy quotes. + final List usersExpected = Lists.newArrayList( + CsvDestination.COLUMN_NAME, + "\"{\"\"name\"\":\"\"john\"\",\"\"id\"\":\"\"10\"\"}\"", + "\"{\"\"name\"\":\"\"susan\"\",\"\"id\"\":\"\"30\"\"}\""); + + assertEquals(usersExpected, usersActual); + + final List tasksActual = Files.readAllLines(destinationPath.resolve(TASKS_FILE)); + final List tasksExpected = Lists.newArrayList( + CsvDestination.COLUMN_NAME, + "\"{\"\"goal\"\":\"\"announce the game.\"\"}\"", + "\"{\"\"goal\"\":\"\"ship some code.\"\"}\""); + + assertEquals(tasksActual, tasksExpected); + + // verify that the file is parsable as json (sanity check since the quoting is so goofy). + final List actualUsersJson = csvToJson(destinationPath.resolve(USERS_FILE)); + final List expectedUsersJson = Lists.newArrayList(SINGER_MESSAGE_USERS1.getRecord(), SINGER_MESSAGE_USERS2.getRecord()); + assertEquals(expectedUsersJson, actualUsersJson); + + final List actualTasksJson = csvToJson(destinationPath.resolve(TASKS_FILE)); + final List expectedTasksJson = Lists.newArrayList(SINGER_MESSAGE_TASKS1.getRecord(), SINGER_MESSAGE_TASKS2.getRecord()); + assertEquals(expectedTasksJson, actualTasksJson); + + // verify tmp files are cleaned up + final Set actualFilenames = Files.list(destinationPath).map(Path::getFileName).map(Path::toString).collect(Collectors.toSet()); + final Set expectedFilenames = Sets.newHashSet(USERS_FILE, TASKS_FILE); + assertEquals(expectedFilenames, actualFilenames); + } + + @SuppressWarnings("ResultOfMethodCallIgnored") + @Test + void testWriteFailure() throws Exception { + // hack to force an exception to be thrown from within the consumer. + final SingerMessage spiedMessage = spy(SINGER_MESSAGE_USERS1); + doThrow(new RuntimeException()).when(spiedMessage).getStream(); + + final DestinationConsumer consumer = spy(new CsvDestination().write(config, CATALOG)); + + assertThrows(RuntimeException.class, () -> consumer.accept(spiedMessage)); + consumer.accept(SINGER_MESSAGE_USERS2); + consumer.close(); + + // verify tmp files are cleaned up and no files are output at all + final Set actualFilenames = Files.list(destinationPath).map(Path::getFileName).map(Path::toString).collect(Collectors.toSet()); + assertEquals(Collections.emptySet(), actualFilenames); + } + + private List csvToJson(Path csvPath) throws IOException { + final Reader in = new FileReader(csvPath.toFile()); + final Iterable records = CSVFormat.DEFAULT + .withHeader(CsvDestination.COLUMN_NAME) + .withFirstRecordAsHeader() + .parse(in); + + return StreamSupport.stream(records.spliterator(), false) + .map(record -> Jsons.deserialize(record.toMap().get(CsvDestination.COLUMN_NAME))) + .collect(Collectors.toList()); + } + +} diff --git a/docs/integrations/destinations/local-csv2.md b/docs/integrations/destinations/local-csv2.md new file mode 100644 index 0000000000000..c816908435ebc --- /dev/null +++ b/docs/integrations/destinations/local-csv2.md @@ -0,0 +1,34 @@ +# Local CSV + +## Overview + +This destination writes data to a directory on the _local_ filesystem on the host running Airbyte. By default, data is written to `/tmp/airbyte_local`. To change this location, modify the `LOCAL_ROOT` environment variable for Airbyte in the `.env` file. + +### Sync Overview + +#### Output schema + +This destination outputs files with the name of the stream. Each row will be written as a new line in the output CSV file. + +#### Data Type Mapping + +The output file will have a single column called `data` which will be populated by the full record as a json blob. + +#### Features + +This section should contain a table with the following format: + +| Feature | Supported | +| :--- | :--- | +| Full Refresh Sync | Yes | + +#### Performance considerations + +This integration will be constrained by the speed at which your filesystem accepts writes. + +## Getting Started + +### Requirements: + +* The `destination_path` field must start with `/local` which is the name of the local mount that points to `LOCAL_ROOT`. Any other directories in this path will be placed inside the `LOCAL_ROOT`. By default, the value of `LOCAL_ROOT` is `/tmp/airbyte_local`. e.g. if `destination_path` is `/local/my/data`, the output will be written to `/tmp/airbyte_local/my/data`. +