Skip to content

Commit

Permalink
TEZ-4295: Could not decompress data. Buffer length is too small.
Browse files Browse the repository at this point in the history
  • Loading branch information
abstractdog committed Jun 4, 2021
1 parent 0af54df commit 7b9eba8
Show file tree
Hide file tree
Showing 3 changed files with 185 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ public Writer(Serialization keySerialization, Serialization valSerialization, FS
void setupOutputStream(CompressionCodec codec) throws IOException {
this.checksumOut = new IFileOutputStream(this.rawOut);
if (codec != null) {
this.compressor = CodecPool.getCompressor(codec);
this.compressor = CodecUtils.getCompressor(codec);
if (this.compressor != null) {
this.compressor.reset();
this.compressedOut = codec.createOutputStream(checksumOut, compressor);
Expand Down Expand Up @@ -773,7 +773,7 @@ public Reader(InputStream in, long length,
checksumIn = new IFileInputStream(in, length, readAhead,
readAheadLength/* , isCompressed */);
if (isCompressed && codec != null) {
decompressor = CodecPool.getDecompressor(codec);
decompressor = CodecUtils.getDecompressor(codec);
if (decompressor != null) {
this.in = codec.createInputStream(checksumIn, decompressor);
} else {
Expand Down Expand Up @@ -818,7 +818,7 @@ public static void readToMemory(byte[] buffer, InputStream in, int compressedLen
in = checksumIn;
Decompressor decompressor = null;
if (isCompressed && codec != null) {
decompressor = CodecPool.getDecompressor(codec);
decompressor = CodecUtils.getDecompressor(codec);
if (decompressor != null) {
decompressor.reset();
in = CodecUtils.getDecompressedInputStreamWithBufferSize(codec, checksumIn, decompressor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.Compressor;
Expand Down Expand Up @@ -77,19 +78,20 @@ public static InputStream getDecompressedInputStreamWithBufferSize(CompressionCo
IFileInputStream checksumIn, Decompressor decompressor, int compressedLength)
throws IOException {
String bufferSizeProp = TezRuntimeUtils.getBufferSizeProperty(codec);
Configurable configurableCodec = (Configurable) codec;
int originalSize = bufferSizeProp == null ? DEFAULT_BUFFER_SIZE :
configurableCodec.getConf().getInt(bufferSizeProp, DEFAULT_BUFFER_SIZE);

CompressionInputStream in = null;

if (bufferSizeProp != null) {
Configurable configurableCodec = (Configurable) codec;
Configuration conf = configurableCodec.getConf();
int newBufSize = Math.min(compressedLength, DEFAULT_BUFFER_SIZE);
LOG.trace("buffer size was set according to min(compressedLength, {}): {}={}",
DEFAULT_BUFFER_SIZE, bufferSizeProp, newBufSize);

synchronized (codec) {
synchronized (conf) {
int originalSize = bufferSizeProp == null ? DEFAULT_BUFFER_SIZE
: configurableCodec.getConf().getInt(bufferSizeProp, DEFAULT_BUFFER_SIZE);

int newBufSize = Math.min(compressedLength, DEFAULT_BUFFER_SIZE);
LOG.trace("buffer size was set according to min(compressedLength, {}): {}={}",
DEFAULT_BUFFER_SIZE, bufferSizeProp, newBufSize);

conf.setInt(bufferSizeProp, newBufSize);

in = codec.createInputStream(checksumIn, decompressor);
Expand Down Expand Up @@ -125,4 +127,12 @@ public static InputStream getDecompressedInputStreamWithBufferSize(CompressionCo

return in;
}

public static Compressor getCompressor(CompressionCodec codec) {
return CodecPool.getCompressor(codec);
}

public static Decompressor getDecompressor(CompressionCodec codec) {
return CodecPool.getDecompressor(codec);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.runtime.library.utils;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.sort.impl.IFileInputStream;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

public class TestCodecUtils {

@Test
public void testConcurrentDecompressorCreationWithModifiedBuffersize() throws Exception {
int modifiedBufferSize = 1000;
int numberOfThreads = 1000;

ExecutorService service = Executors.newFixedThreadPool(numberOfThreads);

Configuration conf = new Configuration();
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, true);
CompressionCodec codec = CodecUtils.getCodec(conf);

Future<?>[] futures = new Future[numberOfThreads];
final CountDownLatch latch = new CountDownLatch(1);

for (int i = 0; i < numberOfThreads; i++) {
futures[i] = service.submit(() -> {
try {
waitForLatch(latch);
Assert.assertEquals(Integer.toString(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_DEFAULT),
conf.get(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY));

Decompressor decompressor = CodecUtils.getDecompressor(codec);
CodecUtils.getDecompressedInputStreamWithBufferSize(codec,
Mockito.mock(IFileInputStream.class), decompressor, modifiedBufferSize);

Assert.assertEquals(Integer.toString(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_DEFAULT),
conf.get(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY));

CodecPool.returnDecompressor(decompressor);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}
latch.countDown();

for (Future<?> f : futures) {
f.get();
}
}

@Test
public void testConcurrentCompressorDecompressorCreation() throws Exception {
int modifiedBufferSize = 1000;
int numberOfThreads = 1000;

ExecutorService service = Executors.newFixedThreadPool(numberOfThreads);

Configuration conf = new Configuration();
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, true);
CompressionCodec codec = CodecUtils.getCodec(conf);

Future<?>[] futures = new Future[numberOfThreads];
final CountDownLatch latch = new CountDownLatch(1);

for (int i = 0; i < numberOfThreads; i++) {
// let's "randomly" choose from scenarios and test them concurrently
// 1. getDecompressedInputStreamWithBufferSize
if (i % 3 == 0) {
futures[i] = service.submit(() -> {
try {
waitForLatch(latch);
Assert.assertEquals(
Integer.toString(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_DEFAULT),
conf.get(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY));

Decompressor decompressor = CodecUtils.getDecompressor(codec);
CodecUtils.getDecompressedInputStreamWithBufferSize(codec,
Mockito.mock(IFileInputStream.class), decompressor, modifiedBufferSize);

Assert.assertEquals(
Integer.toString(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_DEFAULT),
conf.get(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY));

CodecPool.returnDecompressor(decompressor);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
// 2. getCompressor
} else if (i % 3 == 1) {
futures[i] = service.submit(() -> {
waitForLatch(latch);
Assert.assertEquals(Integer.toString(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_DEFAULT),
conf.get(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY));

Compressor compressor = CodecUtils.getCompressor(codec);

Assert.assertEquals(Integer.toString(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_DEFAULT),
conf.get(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY));

CodecPool.returnCompressor(compressor);

});
// 3. getDecompressor
} else if (i % 3 == 2) {
futures[i] = service.submit(() -> {
waitForLatch(latch);
Assert.assertEquals(Integer.toString(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_DEFAULT),
conf.get(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY));

Decompressor decompressor = CodecUtils.getDecompressor(codec);

Assert.assertEquals(Integer.toString(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_DEFAULT),
conf.get(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY));

CodecPool.returnDecompressor(decompressor);
});
}
}
latch.countDown();

for (Future<?> f : futures) {
f.get();
}
}

private void waitForLatch(CountDownLatch latch) {
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}

0 comments on commit 7b9eba8

Please sign in to comment.