Skip to content

Commit

Permalink
Destination S3: add LZO compression support (#15394)
Browse files Browse the repository at this point in the history
* Fixed bucket naming for S3

* Destination S3: add LZO compression support for parquet files

* Destination S3: add LZO compression support for parquet files

* implemented logic for aarch64

* removed redundant logging

* updated changelog

* moved intstall of native-lzo lib to Dockerfile

* removed redundant logging

* add unit test for aarch64

* bump version

* auto-bump connector version [ci skip]

Co-authored-by: Octavia Squidington III <[email protected]>
  • Loading branch information
VitaliiMaltsev and octavia-squidington-iii authored Aug 10, 2022
1 parent 29c3426 commit a280113
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@
- name: S3
destinationDefinitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
dockerRepository: airbyte/destination-s3
dockerImageTag: 0.3.12
dockerImageTag: 0.3.13
documentationUrl: https://docs.airbyte.io/integrations/destinations/s3
icon: s3.svg
resourceRequirements:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3974,7 +3974,7 @@
supported_destination_sync_modes:
- "append"
- "overwrite"
- dockerImage: "airbyte/destination-s3:0.3.12"
- dockerImage: "airbyte/destination-s3:0.3.13"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/s3"
connectionSpecification:
Expand Down
29 changes: 27 additions & 2 deletions airbyte-integrations/connectors/destination-s3/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,31 @@ WORKDIR /airbyte
ENV APPLICATION destination-s3

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.3.12
RUN /bin/bash -c 'set -e && \
ARCH=`uname -m` && \
if [ "$ARCH" == "x86_64" ] || [ "$ARCH" = "amd64" ]; then \
echo "$ARCH" && \
apt-get update; \
apt-get install lzop liblzo2-2 liblzo2-dev -y; \
elif [ "$ARCH" == "aarch64" ] || [ "$ARCH" = "arm64" ]; then \
echo "$ARCH" && \
apt-get update; \
apt-get install lzop liblzo2-2 liblzo2-dev wget curl unzip zip build-essential maven git -y; \
wget http://www.oberhumer.com/opensource/lzo/download/lzo-2.10.tar.gz -P /tmp; \
cd /tmp && tar xvfz lzo-2.10.tar.gz; \
cd /tmp/lzo-2.10/ && ./configure --enable-shared --prefix /usr/local/lzo-2.10; \
cd /tmp/lzo-2.10/ && make; \
cd /tmp/lzo-2.10/ && make install; \
git clone https://github.com/twitter/hadoop-lzo.git /usr/lib/hadoop/lib/hadoop-lzo/; \
curl -s "https://get.sdkman.io" | bash; \
source /root/.sdkman/bin/sdkman-init.sh; \
sdk install java 8.0.342-librca; \
sdk use java 8.0.342-librca; \
cd /usr/lib/hadoop/lib/hadoop-lzo/ && C_INCLUDE_PATH=/usr/local/lzo-2.10/include LIBRARY_PATH=/usr/local/lzo-2.10/lib mvn clean package; \
find /usr/lib/hadoop/lib/hadoop-lzo/ -name '*libgplcompression*' -exec cp {} /usr/lib/ \; ;\
else \
echo "unknown arch" ;\
fi'

LABEL io.airbyte.version=0.3.13
LABEL io.airbyte.name=airbyte/destination-s3
8 changes: 7 additions & 1 deletion airbyte-integrations/connectors/destination-s3/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ plugins {
id 'airbyte-docker'
id 'airbyte-integration-test-java'
}
repositories {
mavenCentral()
maven {
url = uri("https://maven.twttr.com")
}
}

application {
mainClass = 'io.airbyte.integrations.destination.s3.S3Destination'
Expand Down Expand Up @@ -32,7 +38,7 @@ dependencies {
}
implementation ('org.apache.parquet:parquet-avro:1.12.3') { exclude group: 'org.slf4j', module: 'slf4j-log4j12'}
implementation ('com.github.airbytehq:json-avro-converter:1.0.1') { exclude group: 'ch.qos.logback', module: 'logback-classic'}

implementation group: 'com.hadoop.gplcompression', name: 'hadoop-lzo', version: '0.4.20'
testImplementation 'org.apache.commons:commons-lang3:3.11'
testImplementation 'org.xerial.snappy:snappy-java:1.1.8.4'
testImplementation "org.mockito:mockito-inline:4.1.0"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.s3.util;

import io.airbyte.commons.io.LineGobbler;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JavaProcessRunner {

private static final Logger LOGGER = LoggerFactory.getLogger(JavaProcessRunner.class);

public static void runProcess(final String path, final Runtime run, final String... commands) throws IOException, InterruptedException {
LOGGER.info("Running process: " + Arrays.asList(commands));
final Process pr = path.equals(System.getProperty("user.dir")) ? run.exec(commands) : run.exec(commands, null, new File(path));
LineGobbler.gobble(pr.getErrorStream(), LOGGER::error);
LineGobbler.gobble(pr.getInputStream(), LOGGER::info);
if (!pr.waitFor(10, TimeUnit.MINUTES)) {
pr.destroy();
throw new RuntimeException("Timeout while executing: " + Arrays.toString(commands));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.integrations.destination.s3.parquet;

import static io.airbyte.integrations.destination.s3.util.JavaProcessRunner.runProcess;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand Down Expand Up @@ -77,6 +78,51 @@ public void testCompressedParquetWriter() throws Exception {
runTest(195L, 215L, config, getExpectedString());
}

private static String resolveArchitecture() {
return System.getProperty("os.name").replace(' ', '_') + "-" + System.getProperty("os.arch") + "-" + System.getProperty("sun.arch.data.model");
}

@Test
public void testLzoCompressedParquet() throws Exception {
final String currentDir = System.getProperty("user.dir");
Runtime runtime = Runtime.getRuntime();
final String architecture = resolveArchitecture();
if (architecture.equals("Linux-amd64-64") || architecture.equals("Linux-x86_64-64")) {
runProcess(currentDir, runtime, "/bin/sh", "-c", "apt-get update");
runProcess(currentDir, runtime, "/bin/sh", "-c", "apt-get install lzop liblzo2-2 liblzo2-dev -y");
runLzoParquetTest();
} else if (architecture.equals("Linux-aarch64-64") || architecture.equals("Linux-arm64-64")) {
runProcess(currentDir, runtime, "/bin/sh", "-c", "apt-get update");
runProcess(currentDir, runtime, "/bin/sh", "-c", "apt-get install lzop liblzo2-2 liblzo2-dev " +
"wget curl unzip zip build-essential maven git -y");
runProcess(currentDir, runtime, "/bin/sh", "-c", "wget http://www.oberhumer.com/opensource/lzo/download/lzo-2.10.tar.gz -P /usr/local/tmp");
runProcess("/usr/local/tmp/", runtime, "/bin/sh", "-c", "tar xvfz lzo-2.10.tar.gz");
runProcess("/usr/local/tmp/lzo-2.10/", runtime, "/bin/sh", "-c", "./configure --enable-shared --prefix /usr/local/lzo-2.10");
runProcess("/usr/local/tmp/lzo-2.10/", runtime, "/bin/sh", "-c", "make && make install");
runProcess(currentDir, runtime, "/bin/sh", "-c", "git clone https://github.com/twitter/hadoop-lzo.git /usr/lib/hadoop/lib/hadoop-lzo/");
runProcess(currentDir, runtime, "/bin/sh", "-c", "curl -s https://get.sdkman.io | bash");
runProcess(currentDir, runtime, "/bin/bash", "-c", "source /root/.sdkman/bin/sdkman-init.sh;" +
" sdk install java 8.0.342-librca;" +
" sdk use java 8.0.342-librca;" +
" cd /usr/lib/hadoop/lib/hadoop-lzo/ " +
"&& C_INCLUDE_PATH=/usr/local/lzo-2.10/include " +
"LIBRARY_PATH=/usr/local/lzo-2.10/lib mvn clean package");
runProcess(currentDir, runtime, "/bin/sh", "-c",
"find /usr/lib/hadoop/lib/hadoop-lzo/ -name '*libgplcompression*' -exec cp {} /usr/lib/ \\;");
runLzoParquetTest();
}
}

private void runLzoParquetTest() throws Exception {
final S3DestinationConfig config = S3DestinationConfig.getS3DestinationConfig(Jsons.jsonNode(Map.of(
"format", Map.of(
"format_type", "parquet",
"compression_codec", "LZO"),
"s3_bucket_name", "test",
"s3_bucket_region", "us-east-2")));
runTest(195L, 215L, config, getExpectedString());
}

private static String getExpectedString() {
return "{\"_airbyte_ab_id\": \"<UUID>\", \"_airbyte_emitted_at\": \"<timestamp>\", "
+ "\"field1\": 10000.0, \"another_field\": true, "
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/destinations/s3.md
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ In order for everything to work correctly, it is also necessary that the user wh

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.3.13 | 2022-08-09 | [\#15394](https://github.com/airbytehq/airbyte/pull/15394) | Added LZO compression support to Parquet format |
| 0.3.12 | 2022-08-05 | [\#14801](https://github.com/airbytehq/airbyte/pull/14801) | Fix multiple log bindings |
| 0.3.11 | 2022-07-15 | [\#14494](https://github.com/airbytehq/airbyte/pull/14494) | Make S3 output filename configurable. |
| 0.3.10 | 2022-06-30 | [\#14332](https://github.com/airbytehq/airbyte/pull/14332) | Change INSTANCE_PROFILE to use `AWSDefaultProfileCredential`, which supports more authentications on AWS |
Expand Down

0 comments on commit a280113

Please sign in to comment.