From 7ffc7832af935449554537b52b03e37120ac5799 Mon Sep 17 00:00:00 2001
From: Laszlo Bodor
Date: Fri, 4 Jun 2021 09:39:52 +0200
Subject: [PATCH] TEZ-4295: Could not decompress data. Buffer length is too
 small.

---
 .../library/common/sort/impl/IFile.java       |   6 +-
 .../tez/runtime/library/utils/CodecUtils.java |  27 ++-
 .../runtime/library/utils/TestCodecUtils.java | 168 ++++++++++++++++++
 3 files changed, 189 insertions(+), 12 deletions(-)
 create mode 100644 tez-runtime-library/src/test/java/org/apache/tez/runtime/library/utils/TestCodecUtils.java

diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
index a4bbf5aabf..4b91c34f8b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
@@ -363,7 +363,7 @@ public Writer(Serialization keySerialization, Serialization valSerialization, FS
     void setupOutputStream(CompressionCodec codec) throws IOException {
       this.checksumOut = new IFileOutputStream(this.rawOut);
       if (codec != null) {
-        this.compressor = CodecPool.getCompressor(codec);
+        this.compressor = CodecUtils.getCompressor(codec);
         if (this.compressor != null) {
           this.compressor.reset();
           this.compressedOut = codec.createOutputStream(checksumOut, compressor);
@@ -773,7 +773,7 @@ public Reader(InputStream in, long length,
       checksumIn = new IFileInputStream(in, length, readAhead,
           readAheadLength/* , isCompressed */);
       if (isCompressed && codec != null) {
-        decompressor = CodecPool.getDecompressor(codec);
+        decompressor = CodecUtils.getDecompressor(codec);
         if (decompressor != null) {
           this.in = codec.createInputStream(checksumIn, decompressor);
         } else {
@@ -818,7 +818,7 @@ public static void readToMemory(byte[] buffer, InputStream in, int compressedLen
     in = checksumIn;
     Decompressor decompressor = null;
     if (isCompressed && codec != null) {
-      decompressor = CodecPool.getDecompressor(codec);
+      decompressor = CodecUtils.getDecompressor(codec);
       if (decompressor != null) {
         decompressor.reset();
         in = CodecUtils.getDecompressedInputStreamWithBufferSize(codec, checksumIn, decompressor,
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/CodecUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/CodecUtils.java
index 8e5154f3b0..88c8eb5a1d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/CodecUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/CodecUtils.java
@@ -23,6 +23,7 @@
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionInputStream;
 import org.apache.hadoop.io.compress.Compressor;
@@ -40,7 +41,7 @@ public final class CodecUtils {
 
   private static final Logger LOG = LoggerFactory.getLogger(IFile.class);
 
-  private static final int DEFAULT_BUFFER_SIZE = 128 * 1024;
+  private static final int DEFAULT_BUFFER_SIZE = 256 * 1024;
 
   private CodecUtils() {
   }
@@ -77,19 +78,19 @@ public static InputStream getDecompressedInputStreamWithBufferSize(CompressionCo
       IFileInputStream checksumIn, Decompressor decompressor, int compressedLength)
       throws IOException {
     String bufferSizeProp = TezRuntimeUtils.getBufferSizeProperty(codec);
-    Configurable configurableCodec = (Configurable) codec;
-    int originalSize = bufferSizeProp == null ? DEFAULT_BUFFER_SIZE :
-        configurableCodec.getConf().getInt(bufferSizeProp, DEFAULT_BUFFER_SIZE);
-
     CompressionInputStream in = null;
 
     if (bufferSizeProp != null) {
+      Configurable configurableCodec = (Configurable) codec;
       Configuration conf = configurableCodec.getConf();
 
-      int newBufSize = Math.min(compressedLength, DEFAULT_BUFFER_SIZE);
-      LOG.trace("buffer size was set according to min(compressedLength, {}): {}={}",
-          DEFAULT_BUFFER_SIZE, bufferSizeProp, newBufSize);
-      synchronized (codec) {
+      synchronized (conf) {
+        int originalSize = configurableCodec.getConf().getInt(bufferSizeProp, DEFAULT_BUFFER_SIZE);
+
+        int newBufSize = Math.min(compressedLength, DEFAULT_BUFFER_SIZE);
+        LOG.trace("buffer size was set according to min(compressedLength, {}): {}={}",
+            DEFAULT_BUFFER_SIZE, bufferSizeProp, newBufSize);
+
         conf.setInt(bufferSizeProp, newBufSize);
 
         in = codec.createInputStream(checksumIn, decompressor);
@@ -125,4 +126,12 @@ public static InputStream getDecompressedInputStreamWithBufferSize(CompressionCo
 
     return in;
   }
+
+  public static Compressor getCompressor(CompressionCodec codec) {
+    return CodecPool.getCompressor(codec);
+  }
+
+  public static Decompressor getDecompressor(CompressionCodec codec) {
+    return CodecPool.getDecompressor(codec);
+  }
 }
\ No newline at end of file
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/utils/TestCodecUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/utils/TestCodecUtils.java
new file mode 100644
index 0000000000..f1500b2209
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/utils/TestCodecUtils.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.utils;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.sort.impl.IFileInputStream;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestCodecUtils {
+
+  @Test
+  public void testConcurrentDecompressorCreationWithModifiedBuffersize() throws Exception {
+    int modifiedBufferSize = 1000;
+    int numberOfThreads = 1000;
+
+    ExecutorService service = Executors.newFixedThreadPool(numberOfThreads);
+
+    Configuration conf = new Configuration();
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, true);
+    CompressionCodec codec = CodecUtils.getCodec(conf);
+
+    Future[] futures = new Future[numberOfThreads];
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    for (int i = 0; i < numberOfThreads; i++) {
+      futures[i] = service.submit(() -> {
+        try {
+          waitForLatch(latch);
+
+          Assert.assertEquals(Integer.toString(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_DEFAULT),
+              conf.get(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY));
+
+          Decompressor decompressor = CodecUtils.getDecompressor(codec);
+          CodecUtils.getDecompressedInputStreamWithBufferSize(codec,
+              Mockito.mock(IFileInputStream.class), decompressor, modifiedBufferSize);
+
+          Assert.assertEquals(Integer.toString(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_DEFAULT),
+              conf.get(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY));
+
+          CodecPool.returnDecompressor(decompressor);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      });
+    }
+    latch.countDown();
+
+    for (Future f : futures) {
+      f.get();
+    }
+  }
+
+  @Test
+  public void testConcurrentCompressorDecompressorCreation() throws Exception {
+    int modifiedBufferSize = 1000;
+    int numberOfThreads = 1000;
+
+    ExecutorService service = Executors.newFixedThreadPool(numberOfThreads);
+
+    Configuration conf = new Configuration();
+    conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, true);
+    CompressionCodec codec = CodecUtils.getCodec(conf);
+
+    Future[] futures = new Future[numberOfThreads];
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    for (int i = 0; i < numberOfThreads; i++) {
+      // let's "randomly" choose from scenarios and test them concurrently
+      // 1. getDecompressedInputStreamWithBufferSize
+      if (i % 3 == 0) {
+        futures[i] = service.submit(() -> {
+          try {
+            waitForLatch(latch);
+
+            Assert.assertEquals(
+                Integer.toString(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_DEFAULT),
+                conf.get(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY));
+
+            Decompressor decompressor = CodecUtils.getDecompressor(codec);
+            CodecUtils.getDecompressedInputStreamWithBufferSize(codec,
+                Mockito.mock(IFileInputStream.class), decompressor, modifiedBufferSize);
+
+            Assert.assertEquals(
+                Integer.toString(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_DEFAULT),
+                conf.get(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY));
+
+            CodecPool.returnDecompressor(decompressor);
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        });
+        // 2. getCompressor
+      } else if (i % 3 == 1) {
+        futures[i] = service.submit(() -> {
+          waitForLatch(latch);
+
+          Assert.assertEquals(Integer.toString(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_DEFAULT),
+              conf.get(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY));
+
+          Compressor compressor = CodecUtils.getCompressor(codec);
+
+          Assert.assertEquals(Integer.toString(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_DEFAULT),
+              conf.get(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY));
+
+          CodecPool.returnCompressor(compressor);
+
+        });
+        // 3. getDecompressor
+      } else if (i % 3 == 2) {
+        futures[i] = service.submit(() -> {
+          waitForLatch(latch);
+
+          Assert.assertEquals(Integer.toString(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_DEFAULT),
+              conf.get(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY));
+
+          Decompressor decompressor = CodecUtils.getDecompressor(codec);
+
+          Assert.assertEquals(Integer.toString(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_DEFAULT),
+              conf.get(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY));
+
+          CodecPool.returnDecompressor(decompressor);
+        });
+      }
+    }
+    latch.countDown();
+
+    for (Future f : futures) {
+      f.get();
+    }
+  }
+
+  private void waitForLatch(CountDownLatch latch) {
+    try {
+      latch.await();
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
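
Reviewer note (not part of the patch): a minimal sketch of the locking pattern this change applies, assuming a Hadoop Configuration shared through the codec; the class and method names below are illustrative only, and the restore of the original buffer size happens in CodecUtils code outside the hunks shown above.

// Illustrative sketch only, not Tez code: both the read of the original buffer
// size and the temporary override must happen while holding the lock on the
// shared Configuration; otherwise a concurrent thread can observe (and later
// re-apply) a shrunken value, which surfaces as
// "Could not decompress data. Buffer length is too small."
import org.apache.hadoop.conf.Configuration;

class BufferSizeRaceSketch {
  static final int DEFAULT_BUFFER_SIZE = 256 * 1024;

  static void withTemporaryBufferSize(Configuration conf, String bufferSizeProp,
      int compressedLength, Runnable createStream) {
    synchronized (conf) {
      // reading the original value inside the critical section guarantees we
      // never capture a size another thread has temporarily shrunk
      int originalSize = conf.getInt(bufferSizeProp, DEFAULT_BUFFER_SIZE);
      int newBufSize = Math.min(compressedLength, DEFAULT_BUFFER_SIZE);
      conf.setInt(bufferSizeProp, newBufSize);
      try {
        createStream.run(); // e.g. codec.createInputStream(...) picks up newBufSize
      } finally {
        conf.setInt(bufferSizeProp, originalSize); // restore before releasing the lock
      }
    }
  }
}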