TEZ-4295: Could not decompress data. Buffer length is too small. (#130) (Laszlo Bodor reviewed by Ashutosh Chauhan)
abstractdog authored Jun 16, 2021
1 parent 0af54df commit b6c7fed
Showing 7 changed files with 351 additions and 57 deletions.
org/apache/tez/runtime/library/common/TezRuntimeUtils.java
@@ -26,7 +26,6 @@
import java.nio.ByteBuffer;

import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.http.BaseHttpConnection;
import org.apache.tez.http.HttpConnection;
@@ -37,8 +36,6 @@
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.TaskContext;
@@ -262,29 +259,4 @@ public static int deserializeShuffleProviderMetaData(ByteBuffer meta)
in.close();
}
}

public static String getBufferSizeProperty(CompressionCodec codec) {
return getBufferSizeProperty(codec.getClass().getName());
}

public static String getBufferSizeProperty(String className) {
switch (className) {
case "org.apache.hadoop.io.compress.DefaultCodec":
case "org.apache.hadoop.io.compress.BZip2Codec":
case "org.apache.hadoop.io.compress.GzipCodec":
return CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
case "org.apache.hadoop.io.compress.SnappyCodec":
return CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY;
case "org.apache.hadoop.io.compress.ZStandardCodec":
return CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY;
case "org.apache.hadoop.io.compress.LzoCodec":
return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY;
case "com.hadoop.compression.lzo.LzoCodec":
return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY;
case "org.apache.hadoop.io.compress.Lz4Codec":
return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY;
default:
return null;
}
}
}
org/apache/tez/runtime/library/common/sort/impl/IFile.java
@@ -363,10 +363,10 @@ public Writer(Serialization keySerialization, Serialization valSerialization, FS
void setupOutputStream(CompressionCodec codec) throws IOException {
this.checksumOut = new IFileOutputStream(this.rawOut);
if (codec != null) {
this.compressor = CodecPool.getCompressor(codec);
this.compressor = CodecUtils.getCompressor(codec);
if (this.compressor != null) {
this.compressor.reset();
this.compressedOut = codec.createOutputStream(checksumOut, compressor);
this.compressedOut = CodecUtils.createOutputStream(codec, checksumOut, compressor);
this.out = new FSDataOutputStream(this.compressedOut, null);
this.compressOutput = true;
} else {
@@ -773,9 +773,9 @@ public Reader(InputStream in, long length,
checksumIn = new IFileInputStream(in, length, readAhead,
readAheadLength/* , isCompressed */);
if (isCompressed && codec != null) {
decompressor = CodecPool.getDecompressor(codec);
decompressor = CodecUtils.getDecompressor(codec);
if (decompressor != null) {
this.in = codec.createInputStream(checksumIn, decompressor);
this.in = CodecUtils.createInputStream(codec, checksumIn, decompressor);
} else {
LOG.warn("Could not obtain decompressor from CodecPool");
this.in = checksumIn;
@@ -818,7 +818,7 @@ public static void readToMemory(byte[] buffer, InputStream in, int compressedLen
in = checksumIn;
Decompressor decompressor = null;
if (isCompressed && codec != null) {
decompressor = CodecPool.getDecompressor(codec);
decompressor = CodecUtils.getDecompressor(codec);
if (decompressor != null) {
decompressor.reset();
in = CodecUtils.getDecompressedInputStreamWithBufferSize(codec, checksumIn, decompressor,
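
Together, these IFile hunks move both halves of the codec lifecycle behind CodecUtils: compressor/decompressor checkout no longer calls CodecPool directly, and stream creation no longer calls the codec directly, so each step runs under the codec's configuration lock (see CodecUtils below). A minimal sketch of the resulting reader-side flow; the class name and codec choice here are illustrative, not part of the commit:

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.runtime.library.utils.CodecUtils;

public class ReaderFlowSketch {
  // Mirrors IFile.Reader: fetch a pooled decompressor and wrap the
  // checksummed stream, both under the codec's Configuration lock.
  static InputStream openCompressed(InputStream checksumIn) throws IOException {
    CompressionCodec codec =
        ReflectionUtils.newInstance(DefaultCodec.class, new Configuration());
    Decompressor decompressor = CodecUtils.getDecompressor(codec);
    if (decompressor == null) {
      return checksumIn; // same fallback IFile.Reader uses when the pool is empty
    }
    return CodecUtils.createInputStream(codec, checksumIn, decompressor);
  }
}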
org/apache/tez/runtime/library/utils/CodecUtils.java
@@ -20,27 +20,33 @@

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
import org.apache.tez.runtime.library.common.sort.impl.IFile;
import org.apache.tez.runtime.library.common.sort.impl.IFileInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;

public final class CodecUtils {

private static final Logger LOG = LoggerFactory.getLogger(IFile.class);
private static final int DEFAULT_BUFFER_SIZE = 128 * 1024;
private static final Logger LOG = LoggerFactory.getLogger(CodecUtils.class);
@VisibleForTesting
static final int DEFAULT_BUFFER_SIZE = 256 * 1024;

private CodecUtils() {
}
@@ -76,20 +82,21 @@ public static CompressionCodec getCodec(Configuration conf) throws IOException {
public static InputStream getDecompressedInputStreamWithBufferSize(CompressionCodec codec,
IFileInputStream checksumIn, Decompressor decompressor, int compressedLength)
throws IOException {
String bufferSizeProp = TezRuntimeUtils.getBufferSizeProperty(codec);
Configurable configurableCodec = (Configurable) codec;
int originalSize = bufferSizeProp == null ? DEFAULT_BUFFER_SIZE :
configurableCodec.getConf().getInt(bufferSizeProp, DEFAULT_BUFFER_SIZE);

String bufferSizeProp = getBufferSizeProperty(codec);
CompressionInputStream in = null;

if (bufferSizeProp != null) {
Configurable configurableCodec = (Configurable) codec;
Configuration conf = configurableCodec.getConf();
int newBufSize = Math.min(compressedLength, DEFAULT_BUFFER_SIZE);
LOG.trace("buffer size was set according to min(compressedLength, {}): {}={}",
DEFAULT_BUFFER_SIZE, bufferSizeProp, newBufSize);

synchronized (codec) {
synchronized (conf) {
int defaultBufferSize = getDefaultBufferSize(conf, codec);
int originalSize = conf.getInt(bufferSizeProp, defaultBufferSize);

int newBufSize = Math.min(compressedLength, defaultBufferSize);
LOG.debug("buffer size was set according to min({}, {}) => {}={}", compressedLength,
defaultBufferSize, bufferSizeProp, newBufSize);

conf.setInt(bufferSizeProp, newBufSize);

in = codec.createInputStream(checksumIn, decompressor);
@@ -117,12 +124,86 @@ public static InputStream getDecompressedInputStreamWithBufferSize(CompressionCo
* issues above for Compressor instances as well, even when we tried to leverage from
* smaller buffer size only on decompression paths.
*/
configurableCodec.getConf().setInt(bufferSizeProp, originalSize);
conf.setInt(bufferSizeProp, originalSize);
}
} else {
in = codec.createInputStream(checksumIn, decompressor);
}

return in;
}
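
This method is the heart of the fix: the buffer key is shrunk to min(compressedLength, codec default), the stream is created, and the original value is restored before the conf lock is released, so no other thread ever observes the temporary value. A worked example of the sizing arithmetic; the 256 KB figure is Hadoop's Snappy buffer default at the time of writing, assumed here for illustration:

public class BufferSizeMath {
  public static void main(String[] args) {
    int defaultBufferSize = 256 * 1024; // assumed: Hadoop's Snappy default
    int smallSegment = 1024;            // a 1 KB compressed shuffle segment
    int largeSegment = 64 * 1024 * 1024;

    // Small inputs get a small buffer instead of the 256 KB default ...
    System.out.println(Math.min(smallSegment, defaultBufferSize)); // 1024

    // ... and the buffer never grows past the default for large inputs.
    System.out.println(Math.min(largeSegment, defaultBufferSize)); // 262144
  }
}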

public static Compressor getCompressor(CompressionCodec codec) {
synchronized (((Configurable) codec).getConf()) {
return CodecPool.getCompressor(codec);
}
}

public static Decompressor getDecompressor(CompressionCodec codec) {
synchronized (((Configurable) codec).getConf()) {
return CodecPool.getDecompressor(codec);
}
}

public static CompressionInputStream createInputStream(CompressionCodec codec,
InputStream checksumIn, Decompressor decompressor) throws IOException {
synchronized (((Configurable) codec).getConf()) {
return codec.createInputStream(checksumIn, decompressor);
}
}

public static CompressionOutputStream createOutputStream(CompressionCodec codec,
OutputStream checksumOut, Compressor compressor) throws IOException {
synchronized (((Configurable) codec).getConf()) {
return codec.createOutputStream(checksumOut, compressor);
}
}
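
These wrapper methods exist because getDecompressedInputStreamWithBufferSize() temporarily mutates the buffer-size key on the codec's shared Configuration: every pool checkout and stream creation that reads that key must take the same lock, otherwise a concurrent reader can pick up a buffer sized for someone else's segment — exactly the "Could not decompress data. Buffer length is too small." failure. A hedged sketch of the sharing pattern being protected; the harness is hypothetical:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.runtime.library.utils.CodecUtils;

public class SharedCodecSketch {
  public static void main(String[] args) throws InterruptedException {
    // One codec instance, hence one Configuration, shared by many fetcher
    // threads, as in a Tez task JVM.
    CompressionCodec codec =
        ReflectionUtils.newInstance(DefaultCodec.class, new Configuration());
    Runnable fetcher = () -> {
      try {
        Decompressor d = CodecUtils.getDecompressor(codec);
        // Stream creation reads the buffer-size key; the conf lock inside
        // CodecUtils keeps that read consistent.
        CodecUtils.createInputStream(codec, new ByteArrayInputStream(new byte[0]), d)
            .close();
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    };
    Thread t1 = new Thread(fetcher);
    Thread t2 = new Thread(fetcher);
    t1.start();
    t2.start();
    t1.join();
    t2.join();
  }
}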

public static String getBufferSizeProperty(CompressionCodec codec) {
return getBufferSizeProperty(codec.getClass().getName());
}

public static String getBufferSizeProperty(String codecClassName) {
switch (codecClassName) {
case "org.apache.hadoop.io.compress.DefaultCodec":
case "org.apache.hadoop.io.compress.BZip2Codec":
case "org.apache.hadoop.io.compress.GzipCodec":
return CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
case "org.apache.hadoop.io.compress.SnappyCodec":
return CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY;
case "org.apache.hadoop.io.compress.ZStandardCodec":
return CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY;
case "org.apache.hadoop.io.compress.LzoCodec":
case "com.hadoop.compression.lzo.LzoCodec":
return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY;
case "org.apache.hadoop.io.compress.Lz4Codec":
return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY;
default:
return null;
}
}
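
The returned names are standard Hadoop configuration keys, so a null result simply means no per-codec buffer knob is known. For example (key strings quoted from Hadoop's CommonConfigurationKeys and assumed stable):

import org.apache.tez.runtime.library.utils.CodecUtils;

public class BufferKeyLookup {
  public static void main(String[] args) {
    // Gzip shares the generic file-buffer key ...
    System.out.println(CodecUtils.getBufferSizeProperty(
        "org.apache.hadoop.io.compress.GzipCodec"));   // io.file.buffer.size
    // ... Snappy has a dedicated one ...
    System.out.println(CodecUtils.getBufferSizeProperty(
        "org.apache.hadoop.io.compress.SnappyCodec")); // io.compression.codec.snappy.buffersize
    // ... and unknown codecs yield null.
    System.out.println(CodecUtils.getBufferSizeProperty("com.example.FooCodec"));
  }
}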

public static int getDefaultBufferSize(Configuration conf, CompressionCodec codec) {
return getDefaultBufferSize(conf, codec.getClass().getName());
}

public static int getDefaultBufferSize(Configuration conf, String codecClassName) {
switch (codecClassName) {
case "org.apache.hadoop.io.compress.DefaultCodec":
case "org.apache.hadoop.io.compress.BZip2Codec":
case "org.apache.hadoop.io.compress.GzipCodec":
return CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
case "org.apache.hadoop.io.compress.SnappyCodec":
return CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT;
case "org.apache.hadoop.io.compress.ZStandardCodec":
return CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT;
case "org.apache.hadoop.io.compress.LzoCodec":
case "com.hadoop.compression.lzo.LzoCodec":
return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT;
case "org.apache.hadoop.io.compress.Lz4Codec":
return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT;
default:
return DEFAULT_BUFFER_SIZE;
}
}
}
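
getDefaultBufferSize() mirrors the key table with Hadoop's shipped defaults and falls back to the class's own DEFAULT_BUFFER_SIZE (256 KB) for codecs it does not know. A hedged usage sketch; the 4096-byte figure is Hadoop's IO_FILE_BUFFER_SIZE_DEFAULT at the time of writing:

import org.apache.hadoop.conf.Configuration;
import org.apache.tez.runtime.library.utils.CodecUtils;

public class DefaultBufferLookup {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Known codec: Hadoop's shipped default (4096 for the io.file.buffer.size family).
    System.out.println(CodecUtils.getDefaultBufferSize(
        conf, "org.apache.hadoop.io.compress.GzipCodec"));
    // Unknown codec: CodecUtils.DEFAULT_BUFFER_SIZE, i.e. 256 * 1024.
    System.out.println(CodecUtils.getDefaultBufferSize(conf, "com.example.FooCodec"));
  }
}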
org/apache/tez/runtime/library/common/shuffle/orderedgrouped/DummyCompressionCodec.java
@@ -18,12 +18,16 @@

package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;

import com.google.common.annotations.VisibleForTesting;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -33,7 +37,10 @@
/**
* A dummy codec. It passes everything to underlying stream
*/
public class DummyCompressionCodec implements CompressionCodec {
public class DummyCompressionCodec implements CompressionCodec, Configurable {
@VisibleForTesting
int createInputStreamCalled = 0;
private Configuration conf;

@Override
public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
@@ -62,6 +69,7 @@ public CompressionInputStream createInputStream(InputStream in) throws IOExcepti

@Override
public CompressionInputStream createInputStream(InputStream in, Decompressor decompressor) throws IOException {
createInputStreamCalled += 1;
return new DummyCompressionInputStream(in);
}

@@ -128,4 +136,14 @@ public void resetState() throws IOException {
//no-op
}
}

@Override
public void setConf(Configuration conf) {
this.conf = conf;
}

@Override
public Configuration getConf() {
return conf;
}
}
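
DummyCompressionCodec implements Configurable from this commit on because every CodecUtils helper casts the codec to Configurable and synchronizes on getConf(); a codec whose conf is null would fail with a NullPointerException at that synchronized block. Minimal setup sketch for any test that routes this codec through CodecUtils; the output-stream path and the ByteArrayOutputStream are illustrative:

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.DummyCompressionCodec;
import org.apache.tez.runtime.library.utils.CodecUtils;

public class DummyCodecSetup {
  public static void main(String[] args) throws IOException {
    DummyCompressionCodec codec = new DummyCompressionCodec();
    codec.setConf(new Configuration()); // required: CodecUtils locks on this object
    // The dummy codec ignores the compressor, so null is acceptable here.
    CodecUtils.createOutputStream(codec, new ByteArrayOutputStream(), null).close();
  }
}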
org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
@@ -21,7 +21,6 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -37,7 +36,6 @@

import com.google.common.collect.Sets;

import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.serializer.WritableSerialization;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -273,7 +271,8 @@ public void testDiskMergeWithCodec() throws Throwable {
InputContext inputContext = createMockInputContext(UUID.randomUUID().toString());

// Create a mock compressor. We will check if it is used.
CompressionCodec dummyCodec = spy(new DummyCompressionCodec());
DummyCompressionCodec dummyCodec = new DummyCompressionCodec();
dummyCodec.setConf(conf);

MergeManager mergeManager =
new MergeManager(conf, localFs, localDirAllocator, inputContext, null, null, null, null,
@@ -312,7 +311,7 @@
mo4.commit();

mergeManager.close(true);
verify(dummyCodec, atLeastOnce()).createOutputStream(any(), any());
Assert.assertTrue(dummyCodec.createInputStreamCalled > 0);
}

@Test(timeout = 60000l)
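
Note the assertion swap above: the Mockito spy and its verify(dummyCodec, atLeastOnce()).createOutputStream(any(), any()) give way to a plain DummyCompressionCodec with a @VisibleForTesting counter, and the check now observes the read path (createInputStream) rather than the write path. The commit does not state why the spy was dropped; the counter keeps the check self-contained now that the codec must also act as a genuine Configurable. A condensed sketch of the counter mechanism — placed in the codec's package because the field is package-private:

package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;

import java.io.ByteArrayInputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

public class CounterSketch {
  public static void main(String[] args) throws IOException {
    DummyCompressionCodec codec = new DummyCompressionCodec();
    codec.setConf(new Configuration());
    // Exercise the read path once; the counter records the call without Mockito.
    codec.createInputStream(new ByteArrayInputStream(new byte[0]), null).close();
    if (codec.createInputStreamCalled == 0) {
      throw new AssertionError("codec was never asked for an input stream");
    }
  }
}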
org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
@@ -57,7 +57,6 @@
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.InMemoryReader;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.InMemoryWriter;
import org.apache.tez.runtime.library.common.sort.impl.IFile.Reader;
@@ -66,6 +65,7 @@
import org.apache.tez.runtime.library.testutils.KVDataGen;
import org.apache.tez.runtime.library.testutils.KVDataGen.KVPair;
import org.apache.tez.runtime.library.utils.BufferUtils;
import org.apache.tez.runtime.library.utils.CodecUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
@@ -734,23 +734,23 @@ public void testReadToDisk() throws IOException {
public void testInMemoryBufferSize() throws IOException {
Configurable configurableCodec = (Configurable) codec;
int originalCodecBufferSize =
configurableCodec.getConf().getInt(TezRuntimeUtils.getBufferSizeProperty(codec), -1);
configurableCodec.getConf().getInt(CodecUtils.getBufferSizeProperty(codec), -1);

// for smaller amount of data, codec buffer should be sized according to compressed data length
List<KVPair> data = KVDataGen.generateTestData(false, rnd.nextInt(100));
Writer writer = writeTestFile(false, false, data, codec);
readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);

Assert.assertEquals(originalCodecBufferSize, // original size is repaired
configurableCodec.getConf().getInt(TezRuntimeUtils.getBufferSizeProperty(codec), 0));
configurableCodec.getConf().getInt(CodecUtils.getBufferSizeProperty(codec), 0));

// buffer size cannot grow infinitely with compressed data size
data = KVDataGen.generateTestDataOfKeySize(false, 20000, rnd.nextInt(100));
writer = writeTestFile(false, false, data, codec);
readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);

Assert.assertEquals(originalCodecBufferSize, // original size is repaired
configurableCodec.getConf().getInt(TezRuntimeUtils.getBufferSizeProperty(codec), 0));
configurableCodec.getConf().getInt(CodecUtils.getBufferSizeProperty(codec), 0));
}

@Test(expected = IllegalArgumentException.class)
Expand All @@ -766,7 +766,7 @@ private void tryWriteFileWithBufferSize(int bufferSize, String codecClassName)
Configuration conf = new Configuration();

System.out.println("trying with buffer size: " + bufferSize);
conf.set(TezRuntimeUtils.getBufferSizeProperty(codecClassName), Integer.toString(bufferSize));
conf.set(CodecUtils.getBufferSizeProperty(codecClassName), Integer.toString(bufferSize));
CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
CompressionCodec codecToTest =
codecFactory.getCodecByClassName(codecClassName);
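
The TezRuntimeUtils-to-CodecUtils switches here simply track the method move; the substance of testInMemoryBufferSize is the restore invariant: after reading a compressed IFile, the codec's buffer key must hold exactly its pre-read value, however large or small the compressed data was. The invariant, distilled into a standalone helper; the codec is assumed to be one with a known buffer-size property, such as GzipCodec:

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.tez.runtime.library.utils.CodecUtils;

public class RestoreInvariantSketch {
  static void checkRestored(CompressionCodec codec, Runnable readCompressedFile) {
    Configurable cc = (Configurable) codec;
    String key = CodecUtils.getBufferSizeProperty(codec); // non-null for e.g. GzipCodec
    int before = cc.getConf().getInt(key, -1);
    readCompressedFile.run(); // shrinks the buffer key, then must restore it
    int after = cc.getConf().getInt(key, -1);
    if (before != after) {
      throw new AssertionError("buffer size not restored: " + key);
    }
  }
}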