From a280113ccef130c1479a881fef617530f221e76d Mon Sep 17 00:00:00 2001 From: VitaliiMaltsev <39538064+VitaliiMaltsev@users.noreply.github.com> Date: Wed, 10 Aug 2022 20:44:51 +0300 Subject: [PATCH] Destination S3: add LZO compression support (#15394) * Fixed bucket naming for S3 * Destination S3: add LZO compression support for parquet files * Destination S3: add LZO compression support for parquet files * implemented logic for aarch64 * removed redundant logging * updated changelog * moved intstall of native-lzo lib to Dockerfile * removed redundant logging * add unit test for aarch64 * bump version * auto-bump connector version [ci skip] Co-authored-by: Octavia Squidington III --- .../seed/destination_definitions.yaml | 2 +- .../resources/seed/destination_specs.yaml | 2 +- .../connectors/destination-s3/Dockerfile | 29 +++++++++++- .../connectors/destination-s3/build.gradle | 8 +++- .../s3/util/JavaProcessRunner.java | 30 ++++++++++++ .../parquet/ParquetSerializedBufferTest.java | 46 +++++++++++++++++++ docs/integrations/destinations/s3.md | 1 + 7 files changed, 113 insertions(+), 5 deletions(-) create mode 100644 airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/util/JavaProcessRunner.java diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 63f6fe788372..0fa910b2a198 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -250,7 +250,7 @@ - name: S3 destinationDefinitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362 dockerRepository: airbyte/destination-s3 - dockerImageTag: 0.3.12 + dockerImageTag: 0.3.13 documentationUrl: https://docs.airbyte.io/integrations/destinations/s3 icon: s3.svg resourceRequirements: diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index 2ddd01880f89..ad360c84386f 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -3974,7 +3974,7 @@ supported_destination_sync_modes: - "append" - "overwrite" -- dockerImage: "airbyte/destination-s3:0.3.12" +- dockerImage: "airbyte/destination-s3:0.3.13" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/s3" connectionSpecification: diff --git a/airbyte-integrations/connectors/destination-s3/Dockerfile b/airbyte-integrations/connectors/destination-s3/Dockerfile index d00a1aa7085e..d0a8e9d0a05b 100644 --- a/airbyte-integrations/connectors/destination-s3/Dockerfile +++ b/airbyte-integrations/connectors/destination-s3/Dockerfile @@ -15,6 +15,31 @@ WORKDIR /airbyte ENV APPLICATION destination-s3 COPY --from=build /airbyte /airbyte - -LABEL io.airbyte.version=0.3.12 +RUN /bin/bash -c 'set -e && \ + ARCH=`uname -m` && \ + if [ "$ARCH" == "x86_64" ] || [ "$ARCH" = "amd64" ]; then \ + echo "$ARCH" && \ + apt-get update; \ + apt-get install lzop liblzo2-2 liblzo2-dev -y; \ + elif [ "$ARCH" == "aarch64" ] || [ "$ARCH" = "arm64" ]; then \ + echo "$ARCH" && \ + apt-get update; \ + apt-get install lzop liblzo2-2 liblzo2-dev wget curl unzip zip build-essential maven git -y; \ + wget http://www.oberhumer.com/opensource/lzo/download/lzo-2.10.tar.gz -P /tmp; \ + cd /tmp && tar xvfz lzo-2.10.tar.gz; \ + cd /tmp/lzo-2.10/ && ./configure --enable-shared --prefix /usr/local/lzo-2.10; \ + cd /tmp/lzo-2.10/ && make; \ + cd /tmp/lzo-2.10/ && make install; \ + git clone https://github.com/twitter/hadoop-lzo.git /usr/lib/hadoop/lib/hadoop-lzo/; \ + curl -s "https://get.sdkman.io" | bash; \ + source /root/.sdkman/bin/sdkman-init.sh; \ + sdk install java 8.0.342-librca; \ + sdk use java 8.0.342-librca; \ + cd /usr/lib/hadoop/lib/hadoop-lzo/ && C_INCLUDE_PATH=/usr/local/lzo-2.10/include LIBRARY_PATH=/usr/local/lzo-2.10/lib mvn clean package; \ + find /usr/lib/hadoop/lib/hadoop-lzo/ -name '*libgplcompression*' -exec cp {} /usr/lib/ \; ;\ + else \ + echo "unknown arch" ;\ + fi' + +LABEL io.airbyte.version=0.3.13 LABEL io.airbyte.name=airbyte/destination-s3 diff --git a/airbyte-integrations/connectors/destination-s3/build.gradle b/airbyte-integrations/connectors/destination-s3/build.gradle index b99d787bd6bd..6eb28cccadad 100644 --- a/airbyte-integrations/connectors/destination-s3/build.gradle +++ b/airbyte-integrations/connectors/destination-s3/build.gradle @@ -3,6 +3,12 @@ plugins { id 'airbyte-docker' id 'airbyte-integration-test-java' } +repositories { + mavenCentral() + maven { + url = uri("https://maven.twttr.com") + } +} application { mainClass = 'io.airbyte.integrations.destination.s3.S3Destination' @@ -32,7 +38,7 @@ dependencies { } implementation ('org.apache.parquet:parquet-avro:1.12.3') { exclude group: 'org.slf4j', module: 'slf4j-log4j12'} implementation ('com.github.airbytehq:json-avro-converter:1.0.1') { exclude group: 'ch.qos.logback', module: 'logback-classic'} - + implementation group: 'com.hadoop.gplcompression', name: 'hadoop-lzo', version: '0.4.20' testImplementation 'org.apache.commons:commons-lang3:3.11' testImplementation 'org.xerial.snappy:snappy-java:1.1.8.4' testImplementation "org.mockito:mockito-inline:4.1.0" diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/util/JavaProcessRunner.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/util/JavaProcessRunner.java new file mode 100644 index 000000000000..d14a18738858 --- /dev/null +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/util/JavaProcessRunner.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.s3.util; + +import io.airbyte.commons.io.LineGobbler; +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JavaProcessRunner { + + private static final Logger LOGGER = LoggerFactory.getLogger(JavaProcessRunner.class); + + public static void runProcess(final String path, final Runtime run, final String... commands) throws IOException, InterruptedException { + LOGGER.info("Running process: " + Arrays.asList(commands)); + final Process pr = path.equals(System.getProperty("user.dir")) ? run.exec(commands) : run.exec(commands, null, new File(path)); + LineGobbler.gobble(pr.getErrorStream(), LOGGER::error); + LineGobbler.gobble(pr.getInputStream(), LOGGER::info); + if (!pr.waitFor(10, TimeUnit.MINUTES)) { + pr.destroy(); + throw new RuntimeException("Timeout while executing: " + Arrays.toString(commands)); + } + } + +} diff --git a/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/parquet/ParquetSerializedBufferTest.java b/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/parquet/ParquetSerializedBufferTest.java index b94c007f816a..8cb1a6f7d91e 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/parquet/ParquetSerializedBufferTest.java +++ b/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/parquet/ParquetSerializedBufferTest.java @@ -4,6 +4,7 @@ package io.airbyte.integrations.destination.s3.parquet; +import static io.airbyte.integrations.destination.s3.util.JavaProcessRunner.runProcess; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -77,6 +78,51 @@ public void testCompressedParquetWriter() throws Exception { runTest(195L, 215L, config, getExpectedString()); } + private static String resolveArchitecture() { + return System.getProperty("os.name").replace(' ', '_') + "-" + System.getProperty("os.arch") + "-" + System.getProperty("sun.arch.data.model"); + } + + @Test + public void testLzoCompressedParquet() throws Exception { + final String currentDir = System.getProperty("user.dir"); + Runtime runtime = Runtime.getRuntime(); + final String architecture = resolveArchitecture(); + if (architecture.equals("Linux-amd64-64") || architecture.equals("Linux-x86_64-64")) { + runProcess(currentDir, runtime, "/bin/sh", "-c", "apt-get update"); + runProcess(currentDir, runtime, "/bin/sh", "-c", "apt-get install lzop liblzo2-2 liblzo2-dev -y"); + runLzoParquetTest(); + } else if (architecture.equals("Linux-aarch64-64") || architecture.equals("Linux-arm64-64")) { + runProcess(currentDir, runtime, "/bin/sh", "-c", "apt-get update"); + runProcess(currentDir, runtime, "/bin/sh", "-c", "apt-get install lzop liblzo2-2 liblzo2-dev " + + "wget curl unzip zip build-essential maven git -y"); + runProcess(currentDir, runtime, "/bin/sh", "-c", "wget http://www.oberhumer.com/opensource/lzo/download/lzo-2.10.tar.gz -P /usr/local/tmp"); + runProcess("/usr/local/tmp/", runtime, "/bin/sh", "-c", "tar xvfz lzo-2.10.tar.gz"); + runProcess("/usr/local/tmp/lzo-2.10/", runtime, "/bin/sh", "-c", "./configure --enable-shared --prefix /usr/local/lzo-2.10"); + runProcess("/usr/local/tmp/lzo-2.10/", runtime, "/bin/sh", "-c", "make && make install"); + runProcess(currentDir, runtime, "/bin/sh", "-c", "git clone https://github.com/twitter/hadoop-lzo.git /usr/lib/hadoop/lib/hadoop-lzo/"); + runProcess(currentDir, runtime, "/bin/sh", "-c", "curl -s https://get.sdkman.io | bash"); + runProcess(currentDir, runtime, "/bin/bash", "-c", "source /root/.sdkman/bin/sdkman-init.sh;" + + " sdk install java 8.0.342-librca;" + + " sdk use java 8.0.342-librca;" + + " cd /usr/lib/hadoop/lib/hadoop-lzo/ " + + "&& C_INCLUDE_PATH=/usr/local/lzo-2.10/include " + + "LIBRARY_PATH=/usr/local/lzo-2.10/lib mvn clean package"); + runProcess(currentDir, runtime, "/bin/sh", "-c", + "find /usr/lib/hadoop/lib/hadoop-lzo/ -name '*libgplcompression*' -exec cp {} /usr/lib/ \\;"); + runLzoParquetTest(); + } + } + + private void runLzoParquetTest() throws Exception { + final S3DestinationConfig config = S3DestinationConfig.getS3DestinationConfig(Jsons.jsonNode(Map.of( + "format", Map.of( + "format_type", "parquet", + "compression_codec", "LZO"), + "s3_bucket_name", "test", + "s3_bucket_region", "us-east-2"))); + runTest(195L, 215L, config, getExpectedString()); + } + private static String getExpectedString() { return "{\"_airbyte_ab_id\": \"\", \"_airbyte_emitted_at\": \"\", " + "\"field1\": 10000.0, \"another_field\": true, " diff --git a/docs/integrations/destinations/s3.md b/docs/integrations/destinations/s3.md index b021c4427067..22e9d9c4c94d 100644 --- a/docs/integrations/destinations/s3.md +++ b/docs/integrations/destinations/s3.md @@ -320,6 +320,7 @@ In order for everything to work correctly, it is also necessary that the user wh | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.3.13 | 2022-08-09 | [\#15394](https://github.com/airbytehq/airbyte/pull/15394) | Added LZO compression support to Parquet format | | 0.3.12 | 2022-08-05 | [\#14801](https://github.com/airbytehq/airbyte/pull/14801) | Fix multiple log bindings | | 0.3.11 | 2022-07-15 | [\#14494](https://github.com/airbytehq/airbyte/pull/14494) | Make S3 output filename configurable. | | 0.3.10 | 2022-06-30 | [\#14332](https://github.com/airbytehq/airbyte/pull/14332) | Change INSTANCE_PROFILE to use `AWSDefaultProfileCredential`, which supports more authentications on AWS |