Skip to content

Commit

Permalink
TEZ-4295: Could not decompress data. Buffer length is too small.
Browse files Browse the repository at this point in the history
  • Loading branch information
abstractdog committed Jun 12, 2021
1 parent 0af54df commit 0d04a2f
Show file tree
Hide file tree
Showing 5 changed files with 286 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.nio.ByteBuffer;

import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.http.BaseHttpConnection;
import org.apache.tez.http.HttpConnection;
Expand All @@ -37,8 +36,6 @@
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.TaskContext;
Expand Down Expand Up @@ -262,29 +259,4 @@ public static int deserializeShuffleProviderMetaData(ByteBuffer meta)
in.close();
}
}

public static String getBufferSizeProperty(CompressionCodec codec) {
return getBufferSizeProperty(codec.getClass().getName());
}

public static String getBufferSizeProperty(String className) {
switch (className) {
case "org.apache.hadoop.io.compress.DefaultCodec":
case "org.apache.hadoop.io.compress.BZip2Codec":
case "org.apache.hadoop.io.compress.GzipCodec":
return CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
case "org.apache.hadoop.io.compress.SnappyCodec":
return CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY;
case "org.apache.hadoop.io.compress.ZStandardCodec":
return CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY;
case "org.apache.hadoop.io.compress.LzoCodec":
return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY;
case "com.hadoop.compression.lzo.LzoCodec":
return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY;
case "org.apache.hadoop.io.compress.Lz4Codec":
return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY;
default:
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ public Writer(Serialization keySerialization, Serialization valSerialization, FS
void setupOutputStream(CompressionCodec codec) throws IOException {
this.checksumOut = new IFileOutputStream(this.rawOut);
if (codec != null) {
this.compressor = CodecPool.getCompressor(codec);
this.compressor = CodecUtils.getCompressor(codec);
if (this.compressor != null) {
this.compressor.reset();
this.compressedOut = codec.createOutputStream(checksumOut, compressor);
Expand Down Expand Up @@ -773,7 +773,7 @@ public Reader(InputStream in, long length,
checksumIn = new IFileInputStream(in, length, readAhead,
readAheadLength/* , isCompressed */);
if (isCompressed && codec != null) {
decompressor = CodecPool.getDecompressor(codec);
decompressor = CodecUtils.getDecompressor(codec);
if (decompressor != null) {
this.in = codec.createInputStream(checksumIn, decompressor);
} else {
Expand Down Expand Up @@ -818,7 +818,7 @@ public static void readToMemory(byte[] buffer, InputStream in, int compressedLen
in = checksumIn;
Decompressor decompressor = null;
if (isCompressed && codec != null) {
decompressor = CodecPool.getDecompressor(codec);
decompressor = CodecUtils.getDecompressor(codec);
if (decompressor != null) {
decompressor.reset();
in = CodecUtils.getDecompressedInputStreamWithBufferSize(codec, checksumIn, decompressor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.Compressor;
Expand All @@ -31,16 +34,18 @@
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
import org.apache.tez.runtime.library.common.sort.impl.IFile;
import org.apache.tez.runtime.library.common.sort.impl.IFileInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;

public final class CodecUtils {

private static final Logger LOG = LoggerFactory.getLogger(IFile.class);
private static final int DEFAULT_BUFFER_SIZE = 128 * 1024;
@VisibleForTesting
static final int DEFAULT_BUFFER_SIZE = 256 * 1024;

private CodecUtils() {
}
Expand Down Expand Up @@ -76,20 +81,21 @@ public static CompressionCodec getCodec(Configuration conf) throws IOException {
public static InputStream getDecompressedInputStreamWithBufferSize(CompressionCodec codec,
IFileInputStream checksumIn, Decompressor decompressor, int compressedLength)
throws IOException {
String bufferSizeProp = TezRuntimeUtils.getBufferSizeProperty(codec);
Configurable configurableCodec = (Configurable) codec;
int originalSize = bufferSizeProp == null ? DEFAULT_BUFFER_SIZE :
configurableCodec.getConf().getInt(bufferSizeProp, DEFAULT_BUFFER_SIZE);

String bufferSizeProp = getBufferSizeProperty(codec);
CompressionInputStream in = null;

if (bufferSizeProp != null) {
Configurable configurableCodec = (Configurable) codec;
Configuration conf = configurableCodec.getConf();
int newBufSize = Math.min(compressedLength, DEFAULT_BUFFER_SIZE);
LOG.trace("buffer size was set according to min(compressedLength, {}): {}={}",
DEFAULT_BUFFER_SIZE, bufferSizeProp, newBufSize);

synchronized (codec) {
synchronized (conf) {
int defaultBufferSize = getDefaultBufferSize(conf, codec);
int originalSize = conf.getInt(bufferSizeProp, defaultBufferSize);

int newBufSize = Math.min(compressedLength, defaultBufferSize);
LOG.info("buffer size was set according to min({}, {}) => {}={}", compressedLength,
defaultBufferSize, bufferSizeProp, newBufSize);

conf.setInt(bufferSizeProp, newBufSize);

in = codec.createInputStream(checksumIn, decompressor);
Expand Down Expand Up @@ -117,12 +123,76 @@ public static InputStream getDecompressedInputStreamWithBufferSize(CompressionCo
* issues above for Compressor instances as well, even when we tried to leverage from
* smaller buffer size only on decompression paths.
*/
configurableCodec.getConf().setInt(bufferSizeProp, originalSize);
conf.setInt(bufferSizeProp, originalSize);
}
} else {
in = codec.createInputStream(checksumIn, decompressor);
}

return in;
}

public static Compressor getCompressor(CompressionCodec codec) {
Configurable configurableCodec = (Configurable) codec;
Configuration conf = configurableCodec.getConf();
String bufferSizeProp = CodecUtils.getBufferSizeProperty(codec);
LOG.info("getting compressor with buffer size: {}", conf.getInt(bufferSizeProp, getDefaultBufferSize(conf, codec)));
return CodecPool.getCompressor(codec);
}

public static Decompressor getDecompressor(CompressionCodec codec) {
Configurable configurableCodec = (Configurable) codec;
Configuration conf = configurableCodec.getConf();
String bufferSizeProp = CodecUtils.getBufferSizeProperty(codec);
LOG.info("getting decompressor with buffer size: {}", conf.getInt(bufferSizeProp, getDefaultBufferSize(conf, codec)));
return CodecPool.getDecompressor(codec);
}

public static String getBufferSizeProperty(CompressionCodec codec) {
return getBufferSizeProperty(codec.getClass().getName());
}

public static String getBufferSizeProperty(String codecClassName) {
switch (codecClassName) {
case "org.apache.hadoop.io.compress.DefaultCodec":
case "org.apache.hadoop.io.compress.BZip2Codec":
case "org.apache.hadoop.io.compress.GzipCodec":
return CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
case "org.apache.hadoop.io.compress.SnappyCodec":
return CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY;
case "org.apache.hadoop.io.compress.ZStandardCodec":
return CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY;
case "org.apache.hadoop.io.compress.LzoCodec":
case "com.hadoop.compression.lzo.LzoCodec":
return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY;
case "org.apache.hadoop.io.compress.Lz4Codec":
return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY;
default:
return null;
}
}

public static int getDefaultBufferSize(Configuration conf, CompressionCodec codec) {
return getDefaultBufferSize(conf, codec.getClass().getName());
}

public static int getDefaultBufferSize(Configuration conf, String codecClassName) {
switch (codecClassName) {
case "org.apache.hadoop.io.compress.DefaultCodec":
case "org.apache.hadoop.io.compress.BZip2Codec":
case "org.apache.hadoop.io.compress.GzipCodec":
return CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
case "org.apache.hadoop.io.compress.SnappyCodec":
return CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT;
case "org.apache.hadoop.io.compress.ZStandardCodec":
return CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT;
case "org.apache.hadoop.io.compress.LzoCodec":
case "com.hadoop.compression.lzo.LzoCodec":
return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT;
case "org.apache.hadoop.io.compress.Lz4Codec":
return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT;
default:
return DEFAULT_BUFFER_SIZE;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.InMemoryReader;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.InMemoryWriter;
import org.apache.tez.runtime.library.common.sort.impl.IFile.Reader;
Expand All @@ -66,6 +65,7 @@
import org.apache.tez.runtime.library.testutils.KVDataGen;
import org.apache.tez.runtime.library.testutils.KVDataGen.KVPair;
import org.apache.tez.runtime.library.utils.BufferUtils;
import org.apache.tez.runtime.library.utils.CodecUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
Expand Down Expand Up @@ -734,23 +734,23 @@ public void testReadToDisk() throws IOException {
public void testInMemoryBufferSize() throws IOException {
Configurable configurableCodec = (Configurable) codec;
int originalCodecBufferSize =
configurableCodec.getConf().getInt(TezRuntimeUtils.getBufferSizeProperty(codec), -1);
configurableCodec.getConf().getInt(CodecUtils.getBufferSizeProperty(codec), -1);

// for smaller amount of data, codec buffer should be sized according to compressed data length
List<KVPair> data = KVDataGen.generateTestData(false, rnd.nextInt(100));
Writer writer = writeTestFile(false, false, data, codec);
readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);

Assert.assertEquals(originalCodecBufferSize, // original size is repaired
configurableCodec.getConf().getInt(TezRuntimeUtils.getBufferSizeProperty(codec), 0));
configurableCodec.getConf().getInt(CodecUtils.getBufferSizeProperty(codec), 0));

// buffer size cannot grow infinitely with compressed data size
data = KVDataGen.generateTestDataOfKeySize(false, 20000, rnd.nextInt(100));
writer = writeTestFile(false, false, data, codec);
readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);

Assert.assertEquals(originalCodecBufferSize, // original size is repaired
configurableCodec.getConf().getInt(TezRuntimeUtils.getBufferSizeProperty(codec), 0));
configurableCodec.getConf().getInt(CodecUtils.getBufferSizeProperty(codec), 0));
}

@Test(expected = IllegalArgumentException.class)
Expand All @@ -766,7 +766,7 @@ private void tryWriteFileWithBufferSize(int bufferSize, String codecClassName)
Configuration conf = new Configuration();

System.out.println("trying with buffer size: " + bufferSize);
conf.set(TezRuntimeUtils.getBufferSizeProperty(codecClassName), Integer.toString(bufferSize));
conf.set(CodecUtils.getBufferSizeProperty(codecClassName), Integer.toString(bufferSize));
CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
CompressionCodec codecToTest =
codecFactory.getCodecByClassName(codecClassName);
Expand Down
Loading

0 comments on commit 0d04a2f

Please sign in to comment.