diff --git a/sdk/pom.xml b/sdk/pom.xml index c49c17570901..5c6553b749a0 100644 --- a/sdk/pom.xml +++ b/sdk/pom.xml @@ -719,6 +719,12 @@ ${avro.version} + + org.xerial.snappy + snappy-java + 1.1.2.1 + + org.apache.commons commons-compress diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/SerializableUtils.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/SerializableUtils.java index cacba0ea1704..501b4307fc29 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/SerializableUtils.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/SerializableUtils.java @@ -23,6 +23,9 @@ import com.google.cloud.dataflow.sdk.coders.CoderException; import com.google.common.base.Preconditions; +import org.xerial.snappy.SnappyInputStream; +import org.xerial.snappy.SnappyOutputStream; + import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -43,7 +46,7 @@ public class SerializableUtils { public static byte[] serializeToByteArray(Serializable value) { try { ByteArrayOutputStream buffer = new ByteArrayOutputStream(); - try (ObjectOutputStream oos = new ObjectOutputStream(buffer)) { + try (ObjectOutputStream oos = new ObjectOutputStream(new SnappyOutputStream(buffer))) { oos.writeObject(value); } return buffer.toByteArray(); @@ -66,7 +69,7 @@ public static Object deserializeFromByteArray(byte[] encodedValue, String description) { try { try (ObjectInputStream ois = new ObjectInputStream( - new ByteArrayInputStream(encodedValue))) { + new SnappyInputStream(new ByteArrayInputStream(encodedValue)))) { return ois.readObject(); } } catch (IOException | ClassNotFoundException exn) {