From b65e39e8680c8c3e4d2a53ddcf082c757f754722 Mon Sep 17 00:00:00 2001 From: Laszlo Bodor Date: Fri, 4 Jun 2021 09:39:52 +0200 Subject: [PATCH] TEZ-4295: Could not decompress data. Buffer length is too small. --- .../library/common/TezRuntimeUtils.java | 28 --- .../library/common/sort/impl/IFile.java | 10 +- .../tez/runtime/library/utils/CodecUtils.java | 109 ++++++-- .../orderedgrouped/DummyCompressionCodec.java | 17 +- .../orderedgrouped/TestMergeManager.java | 5 +- .../library/common/sort/impl/TestIFile.java | 10 +- .../runtime/library/utils/TestCodecUtils.java | 234 ++++++++++++++++++ 7 files changed, 358 insertions(+), 55 deletions(-) create mode 100644 tez-runtime-library/src/test/java/org/apache/tez/runtime/library/utils/TestCodecUtils.java diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java index a1df131121..9d9b8c16c0 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java @@ -26,7 +26,6 @@ import java.nio.ByteBuffer; import org.apache.hadoop.io.DataInputByteBuffer; -import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.http.BaseHttpConnection; import org.apache.tez.http.HttpConnection; @@ -37,8 +36,6 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.runtime.api.OutputContext; import org.apache.tez.runtime.api.TaskContext; @@ -262,29 +259,4 @@ public static int deserializeShuffleProviderMetaData(ByteBuffer meta) in.close(); } } - - public static String getBufferSizeProperty(CompressionCodec codec) { - return getBufferSizeProperty(codec.getClass().getName()); - } - - public static String getBufferSizeProperty(String className) { - switch (className) { - case "org.apache.hadoop.io.compress.DefaultCodec": - case "org.apache.hadoop.io.compress.BZip2Codec": - case "org.apache.hadoop.io.compress.GzipCodec": - return CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY; - case "org.apache.hadoop.io.compress.SnappyCodec": - return CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY; - case "org.apache.hadoop.io.compress.ZStandardCodec": - return CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY; - case "org.apache.hadoop.io.compress.LzoCodec": - return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY; - case "com.hadoop.compression.lzo.LzoCodec": - return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY; - case "org.apache.hadoop.io.compress.Lz4Codec": - return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY; - default: - return null; - } - } } diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java index a4bbf5aabf..8f673185b0 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java @@ -363,10 +363,10 @@ public Writer(Serialization keySerialization, Serialization valSerialization, FS void setupOutputStream(CompressionCodec codec) throws IOException { this.checksumOut = new IFileOutputStream(this.rawOut); if (codec != null) { - this.compressor = CodecPool.getCompressor(codec); + this.compressor = CodecUtils.getCompressor(codec); if (this.compressor != null) { this.compressor.reset(); - this.compressedOut = codec.createOutputStream(checksumOut, compressor); + this.compressedOut = CodecUtils.createOutputStream(codec, checksumOut, compressor); this.out = new FSDataOutputStream(this.compressedOut, null); this.compressOutput = true; } else { @@ -773,9 +773,9 @@ public Reader(InputStream in, long length, checksumIn = new IFileInputStream(in, length, readAhead, readAheadLength/* , isCompressed */); if (isCompressed && codec != null) { - decompressor = CodecPool.getDecompressor(codec); + decompressor = CodecUtils.getDecompressor(codec); if (decompressor != null) { - this.in = codec.createInputStream(checksumIn, decompressor); + this.in = CodecUtils.createInputStream(codec, checksumIn, decompressor); } else { LOG.warn("Could not obtain decompressor from CodecPool"); this.in = checksumIn; @@ -818,7 +818,7 @@ public static void readToMemory(byte[] buffer, InputStream in, int compressedLen in = checksumIn; Decompressor decompressor = null; if (isCompressed && codec != null) { - decompressor = CodecPool.getDecompressor(codec); + decompressor = CodecUtils.getDecompressor(codec); if (decompressor != null) { decompressor.reset(); in = CodecUtils.getDecompressedInputStreamWithBufferSize(codec, checksumIn, decompressor, diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/CodecUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/CodecUtils.java index 8e5154f3b0..340ecceea8 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/CodecUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/CodecUtils.java @@ -20,27 +20,33 @@ import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.io.compress.CodecPool; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionInputStream; +import org.apache.hadoop.io.compress.CompressionOutputStream; import org.apache.hadoop.io.compress.Compressor; import org.apache.hadoop.io.compress.Decompressor; import org.apache.hadoop.io.compress.DefaultCodec; import org.apache.hadoop.util.ReflectionUtils; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.ConfigUtils; -import org.apache.tez.runtime.library.common.TezRuntimeUtils; -import org.apache.tez.runtime.library.common.sort.impl.IFile; import org.apache.tez.runtime.library.common.sort.impl.IFileInputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; + public final class CodecUtils { - private static final Logger LOG = LoggerFactory.getLogger(IFile.class); - private static final int DEFAULT_BUFFER_SIZE = 128 * 1024; + private static final Logger LOG = LoggerFactory.getLogger(CodecUtils.class); + @VisibleForTesting + static final int DEFAULT_BUFFER_SIZE = 256 * 1024; private CodecUtils() { } @@ -76,20 +82,21 @@ public static CompressionCodec getCodec(Configuration conf) throws IOException { public static InputStream getDecompressedInputStreamWithBufferSize(CompressionCodec codec, IFileInputStream checksumIn, Decompressor decompressor, int compressedLength) throws IOException { - String bufferSizeProp = TezRuntimeUtils.getBufferSizeProperty(codec); - Configurable configurableCodec = (Configurable) codec; - int originalSize = bufferSizeProp == null ? DEFAULT_BUFFER_SIZE : - configurableCodec.getConf().getInt(bufferSizeProp, DEFAULT_BUFFER_SIZE); - + String bufferSizeProp = getBufferSizeProperty(codec); CompressionInputStream in = null; if (bufferSizeProp != null) { + Configurable configurableCodec = (Configurable) codec; Configuration conf = configurableCodec.getConf(); - int newBufSize = Math.min(compressedLength, DEFAULT_BUFFER_SIZE); - LOG.trace("buffer size was set according to min(compressedLength, {}): {}={}", - DEFAULT_BUFFER_SIZE, bufferSizeProp, newBufSize); - synchronized (codec) { + synchronized (conf) { + int defaultBufferSize = getDefaultBufferSize(conf, codec); + int originalSize = conf.getInt(bufferSizeProp, defaultBufferSize); + + int newBufSize = Math.min(compressedLength, defaultBufferSize); + LOG.debug("buffer size was set according to min({}, {}) => {}={}", compressedLength, + defaultBufferSize, bufferSizeProp, newBufSize); + conf.setInt(bufferSizeProp, newBufSize); in = codec.createInputStream(checksumIn, decompressor); @@ -117,7 +124,7 @@ public static InputStream getDecompressedInputStreamWithBufferSize(CompressionCo * issues above for Compressor instances as well, even when we tried to leverage from * smaller buffer size only on decompression paths. */ - configurableCodec.getConf().setInt(bufferSizeProp, originalSize); + conf.setInt(bufferSizeProp, originalSize); } } else { in = codec.createInputStream(checksumIn, decompressor); @@ -125,4 +132,78 @@ public static InputStream getDecompressedInputStreamWithBufferSize(CompressionCo return in; } + + public static Compressor getCompressor(CompressionCodec codec) { + synchronized (((Configurable) codec).getConf()) { + return CodecPool.getCompressor(codec); + } + } + + public static Decompressor getDecompressor(CompressionCodec codec) { + synchronized (((Configurable) codec).getConf()) { + return CodecPool.getDecompressor(codec); + } + } + + public static CompressionInputStream createInputStream(CompressionCodec codec, + InputStream checksumIn, Decompressor decompressor) throws IOException { + synchronized (((Configurable) codec).getConf()) { + return codec.createInputStream(checksumIn, decompressor); + } + } + + public static CompressionOutputStream createOutputStream(CompressionCodec codec, + OutputStream checksumOut, Compressor compressor) throws IOException { + synchronized (((Configurable) codec).getConf()) { + return codec.createOutputStream(checksumOut, compressor); + } + } + + public static String getBufferSizeProperty(CompressionCodec codec) { + return getBufferSizeProperty(codec.getClass().getName()); + } + + public static String getBufferSizeProperty(String codecClassName) { + switch (codecClassName) { + case "org.apache.hadoop.io.compress.DefaultCodec": + case "org.apache.hadoop.io.compress.BZip2Codec": + case "org.apache.hadoop.io.compress.GzipCodec": + return CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY; + case "org.apache.hadoop.io.compress.SnappyCodec": + return CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY; + case "org.apache.hadoop.io.compress.ZStandardCodec": + return CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY; + case "org.apache.hadoop.io.compress.LzoCodec": + case "com.hadoop.compression.lzo.LzoCodec": + return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY; + case "org.apache.hadoop.io.compress.Lz4Codec": + return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY; + default: + return null; + } + } + + public static int getDefaultBufferSize(Configuration conf, CompressionCodec codec) { + return getDefaultBufferSize(conf, codec.getClass().getName()); + } + + public static int getDefaultBufferSize(Configuration conf, String codecClassName) { + switch (codecClassName) { + case "org.apache.hadoop.io.compress.DefaultCodec": + case "org.apache.hadoop.io.compress.BZip2Codec": + case "org.apache.hadoop.io.compress.GzipCodec": + return CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT; + case "org.apache.hadoop.io.compress.SnappyCodec": + return CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT; + case "org.apache.hadoop.io.compress.ZStandardCodec": + return CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT; + case "org.apache.hadoop.io.compress.LzoCodec": + case "com.hadoop.compression.lzo.LzoCodec": + return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT; + case "org.apache.hadoop.io.compress.Lz4Codec": + return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT; + default: + return DEFAULT_BUFFER_SIZE; + } + } } \ No newline at end of file diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/DummyCompressionCodec.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/DummyCompressionCodec.java index 962a9e0207..ca0f8b8170 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/DummyCompressionCodec.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/DummyCompressionCodec.java @@ -18,6 +18,8 @@ package org.apache.tez.runtime.library.common.shuffle.orderedgrouped; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionInputStream; import org.apache.hadoop.io.compress.CompressionOutputStream; @@ -33,7 +35,9 @@ /** * A dummy codec. It passes everything to underlying stream */ -public class DummyCompressionCodec implements CompressionCodec { +public class DummyCompressionCodec implements CompressionCodec, Configurable { + int createInputStreamCalled = 0; + private Configuration conf; @Override public CompressionOutputStream createOutputStream(OutputStream out) throws IOException { @@ -62,6 +66,7 @@ public CompressionInputStream createInputStream(InputStream in) throws IOExcepti @Override public CompressionInputStream createInputStream(InputStream in, Decompressor decompressor) throws IOException { + createInputStreamCalled += 1; return new DummyCompressionInputStream(in); } @@ -128,4 +133,14 @@ public void resetState() throws IOException { //no-op } } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + @Override + public Configuration getConf() { + return conf; + } } diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java index 13f090cffb..2135b491f4 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java @@ -273,7 +273,8 @@ public void testDiskMergeWithCodec() throws Throwable { InputContext inputContext = createMockInputContext(UUID.randomUUID().toString()); // Create a mock compressor. We will check if it is used. - CompressionCodec dummyCodec = spy(new DummyCompressionCodec()); + DummyCompressionCodec dummyCodec = new DummyCompressionCodec(); + dummyCodec.setConf(conf); MergeManager mergeManager = new MergeManager(conf, localFs, localDirAllocator, inputContext, null, null, null, null, @@ -312,7 +313,7 @@ public void testDiskMergeWithCodec() throws Throwable { mo4.commit(); mergeManager.close(true); - verify(dummyCodec, atLeastOnce()).createOutputStream(any(), any()); + Assert.assertTrue(dummyCodec.createInputStreamCalled > 0); } @Test(timeout = 60000l) diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java index bf35955625..960aee345a 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java @@ -57,7 +57,6 @@ import org.apache.hadoop.util.NativeCodeLoader; import org.apache.tez.common.TezRuntimeFrameworkConfigs; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; -import org.apache.tez.runtime.library.common.TezRuntimeUtils; import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.InMemoryReader; import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.InMemoryWriter; import org.apache.tez.runtime.library.common.sort.impl.IFile.Reader; @@ -66,6 +65,7 @@ import org.apache.tez.runtime.library.testutils.KVDataGen; import org.apache.tez.runtime.library.testutils.KVDataGen.KVPair; import org.apache.tez.runtime.library.utils.BufferUtils; +import org.apache.tez.runtime.library.utils.CodecUtils; import org.junit.After; import org.junit.Assert; import org.junit.Assume; @@ -734,7 +734,7 @@ public void testReadToDisk() throws IOException { public void testInMemoryBufferSize() throws IOException { Configurable configurableCodec = (Configurable) codec; int originalCodecBufferSize = - configurableCodec.getConf().getInt(TezRuntimeUtils.getBufferSizeProperty(codec), -1); + configurableCodec.getConf().getInt(CodecUtils.getBufferSizeProperty(codec), -1); // for smaller amount of data, codec buffer should be sized according to compressed data length List data = KVDataGen.generateTestData(false, rnd.nextInt(100)); @@ -742,7 +742,7 @@ public void testInMemoryBufferSize() throws IOException { readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec); Assert.assertEquals(originalCodecBufferSize, // original size is repaired - configurableCodec.getConf().getInt(TezRuntimeUtils.getBufferSizeProperty(codec), 0)); + configurableCodec.getConf().getInt(CodecUtils.getBufferSizeProperty(codec), 0)); // buffer size cannot grow infinitely with compressed data size data = KVDataGen.generateTestDataOfKeySize(false, 20000, rnd.nextInt(100)); @@ -750,7 +750,7 @@ public void testInMemoryBufferSize() throws IOException { readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec); Assert.assertEquals(originalCodecBufferSize, // original size is repaired - configurableCodec.getConf().getInt(TezRuntimeUtils.getBufferSizeProperty(codec), 0)); + configurableCodec.getConf().getInt(CodecUtils.getBufferSizeProperty(codec), 0)); } @Test(expected = IllegalArgumentException.class) @@ -766,7 +766,7 @@ private void tryWriteFileWithBufferSize(int bufferSize, String codecClassName) Configuration conf = new Configuration(); System.out.println("trying with buffer size: " + bufferSize); - conf.set(TezRuntimeUtils.getBufferSizeProperty(codecClassName), Integer.toString(bufferSize)); + conf.set(CodecUtils.getBufferSizeProperty(codecClassName), Integer.toString(bufferSize)); CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf); CompressionCodec codecToTest = codecFactory.getCodecByClassName(codecClassName); diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/utils/TestCodecUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/utils/TestCodecUtils.java new file mode 100644 index 0000000000..684eed41a2 --- /dev/null +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/utils/TestCodecUtils.java @@ -0,0 +1,234 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.runtime.library.utils; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.lang.reflect.Field; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.io.compress.BZip2Codec; +import org.apache.hadoop.io.compress.CodecPool; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionInputStream; +import org.apache.hadoop.io.compress.CompressionOutputStream; +import org.apache.hadoop.io.compress.Compressor; +import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.io.compress.DecompressorStream; +import org.apache.hadoop.io.compress.DefaultCodec; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.io.compress.Lz4Codec; +import org.apache.hadoop.io.compress.SnappyCodec; +import org.apache.hadoop.io.compress.ZStandardCodec; +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; +import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.DummyCompressionCodec; +import org.apache.tez.runtime.library.common.sort.impl.IFileInputStream; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestCodecUtils { + private static final Logger LOG = LoggerFactory.getLogger(TestCodecUtils.class); + + @Test + public void testConcurrentDecompressorCreationWithModifiedBuffersize() throws Exception { + testConcurrentDecompressorCreationWithModifiedBuffersizeOnCodec(new DefaultCodec()); + } + + private void testConcurrentDecompressorCreationWithModifiedBuffersizeOnCodec( + CompressionCodec codec) throws InterruptedException, ExecutionException { + int modifiedBufferSize = 1000; + int numberOfThreads = 1000; + + ExecutorService service = Executors.newFixedThreadPool(numberOfThreads); + + Configuration conf = new Configuration(); + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, true); + ((Configurable) codec).setConf(conf); + + Future[] futures = new Future[numberOfThreads]; + final CountDownLatch latch = new CountDownLatch(1); + + for (int i = 0; i < numberOfThreads; i++) { + futures[i] = service.submit(() -> { + try { + waitForLatch(latch); + + Decompressor decompressor = CodecUtils.getDecompressor(codec); + DecompressorStream stream = + (DecompressorStream) CodecUtils.getDecompressedInputStreamWithBufferSize(codec, + Mockito.mock(IFileInputStream.class), decompressor, modifiedBufferSize); + + Assert.assertEquals("stream buffer size is incorrect", modifiedBufferSize, + getBufferSize(stream)); + + CodecPool.returnDecompressor(decompressor); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + latch.countDown(); + + for (Future f : futures) { + f.get(); + } + } + + @Test + public void testConcurrentCompressorDecompressorCreation() throws Exception { + testConcurrentCompressorDecompressorCreationOnCodec(new DefaultCodec()); + } + + private void testConcurrentCompressorDecompressorCreationOnCodec(CompressionCodec codec) + throws IOException, InterruptedException, ExecutionException { + int modifiedBufferSize = 1000; + int numberOfThreads = 1000; + + ExecutorService service = Executors.newFixedThreadPool(numberOfThreads); + + Configuration conf = new Configuration(); + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, true); + ((Configurable) codec).setConf(conf); + + Future[] futures = new Future[numberOfThreads]; + final CountDownLatch latch = new CountDownLatch(1); + + for (int i = 0; i < numberOfThreads; i++) { + // let's "randomly" choose from scenarios and test them concurrently + // 1. getDecompressedInputStreamWithBufferSize + if (i % 3 == 0) { + futures[i] = service.submit(() -> { + try { + waitForLatch(latch); + + Decompressor decompressor = CodecUtils.getDecompressor(codec); + CompressionInputStream stream = + (CompressionInputStream) CodecUtils.getDecompressedInputStreamWithBufferSize(codec, + Mockito.mock(IFileInputStream.class), decompressor, modifiedBufferSize); + + Assert.assertEquals("stream buffer size is incorrect", modifiedBufferSize, + getBufferSize(stream)); + + CodecPool.returnDecompressor(decompressor); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + // 2. getCompressor + } else if (i % 3 == 1) { + futures[i] = service.submit(() -> { + try { + waitForLatch(latch); + + Compressor compressor = CodecUtils.getCompressor(codec); + CompressionOutputStream stream = + CodecUtils.createOutputStream(codec, Mockito.mock(OutputStream.class), compressor); + + Assert.assertEquals("stream buffer size is incorrect", + CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT, getBufferSize(stream)); + + CodecPool.returnCompressor(compressor); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + // 3. getDecompressor + } else if (i % 3 == 2) { + futures[i] = service.submit(() -> { + try { + waitForLatch(latch); + + Decompressor decompressor = CodecUtils.getDecompressor(codec); + CompressionInputStream stream = + CodecUtils.createInputStream(codec, Mockito.mock(InputStream.class), decompressor); + + Assert.assertEquals("stream buffer size is incorrect", + CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT, getBufferSize(stream)); + + CodecPool.returnDecompressor(decompressor); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + } + latch.countDown(); + + for (Future f : futures) { + f.get(); + } + } + + @Test + public void testDefaultBufferSize() { + Configuration conf = new Configuration(); // config with no buffersize set + + Assert.assertEquals(CodecUtils.DEFAULT_BUFFER_SIZE, + CodecUtils.getDefaultBufferSize(conf, new DummyCompressionCodec())); + Assert.assertEquals(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT, + CodecUtils.getDefaultBufferSize(conf, new DefaultCodec())); + Assert.assertEquals(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT, + CodecUtils.getDefaultBufferSize(conf, new BZip2Codec())); + Assert.assertEquals(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT, + CodecUtils.getDefaultBufferSize(conf, new GzipCodec())); + Assert.assertEquals(CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT, + CodecUtils.getDefaultBufferSize(conf, new SnappyCodec())); + Assert.assertEquals(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT, + CodecUtils.getDefaultBufferSize(conf, new ZStandardCodec())); + Assert.assertEquals(CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT, + CodecUtils.getDefaultBufferSize(conf, new Lz4Codec())); + } + + private void waitForLatch(CountDownLatch latch) { + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + private int getBufferSize(Object stream) { + try { + Field field = stream.getClass().getDeclaredField("buffer"); + field.setAccessible(true); + byte[] buffer = (byte[]) field.get(stream); + return buffer.length; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private int getDirectBufferSize(Object compressorOrDecompressor) throws NoSuchFieldException, + SecurityException, IllegalArgumentException, IllegalAccessException { + Field field = compressorOrDecompressor.getClass().getDeclaredField("directBufferSize"); + field.setAccessible(true); + return (int) field.get(compressorOrDecompressor); + } +}