TEZ-4295: Could not decompress data. Buffer length is too small. #130

Merged: 1 commit, Jun 16, 2021
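Background for the change: on the read path Tez clamps a codec's buffer-size property to min(compressedLength, default) before creating a decompression stream, then restores the original value. A codec's Configuration can be shared by every task in the JVM, so doing that dance without a common lock lets one task build its stream while another holds the shrunk value, which later surfaces as "Could not decompress data. Buffer length is too small." A minimal standalone sketch of the interleaving (illustrative property name and sizes, not Tez code):

```java
import org.apache.hadoop.conf.Configuration;

public class BufferSizeRaceSketch {
  private static final String PROP = "io.file.buffer.size"; // illustrative key
  private static final int DEFAULT = 256 * 1024;

  public static void main(String[] args) throws InterruptedException {
    Configuration conf = new Configuration(false); // shared, like a codec's conf
    conf.setInt(PROP, DEFAULT);

    // Task A: the old, unsynchronized shrink -> create -> restore dance.
    Thread shrinker = new Thread(() -> {
      for (int i = 0; i < 1_000_000; i++) {
        int original = conf.getInt(PROP, DEFAULT);
        conf.setInt(PROP, 4 * 1024);   // shrink for a small compressed segment
        // ... codec.createInputStream(...) would snapshot the 4K value here ...
        conf.setInt(PROP, original);   // restore
      }
    });

    // Task B: another task creating its own stream off the same conf.
    Thread victim = new Thread(() -> {
      for (int i = 0; i < 1_000_000; i++) {
        if (conf.getInt(PROP, DEFAULT) != DEFAULT) {
          System.out.println("observed shrunk value: this stream would be "
              + "built with a too-small decompressor buffer");
          return;
        }
      }
    });

    shrinker.start();
    victim.start();
    shrinker.join();
    victim.join();
  }
}
```

The restore step is racy too: an originalSize read outside the lock can capture another thread's temporarily shrunk value and then "restore" it permanently. Hence the patch below both wraps every Configuration access in synchronized (conf) and moves the originalSize read inside the synchronized block.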
TezRuntimeUtils.java
@@ -26,7 +26,6 @@
import java.nio.ByteBuffer;

import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.http.BaseHttpConnection;
import org.apache.tez.http.HttpConnection;
@@ -37,8 +36,6 @@
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.TaskContext;
@@ -262,29 +259,4 @@ public static int deserializeShuffleProviderMetaData(ByteBuffer meta)
in.close();
}
}

public static String getBufferSizeProperty(CompressionCodec codec) {
return getBufferSizeProperty(codec.getClass().getName());
}

public static String getBufferSizeProperty(String className) {
switch (className) {
case "org.apache.hadoop.io.compress.DefaultCodec":
case "org.apache.hadoop.io.compress.BZip2Codec":
case "org.apache.hadoop.io.compress.GzipCodec":
return CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
case "org.apache.hadoop.io.compress.SnappyCodec":
return CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY;
case "org.apache.hadoop.io.compress.ZStandardCodec":
return CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY;
case "org.apache.hadoop.io.compress.LzoCodec":
return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY;
case "com.hadoop.compression.lzo.LzoCodec":
return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY;
case "org.apache.hadoop.io.compress.Lz4Codec":
return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY;
default:
return null;
}
}
}
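The getBufferSizeProperty helpers removed here are not dropped: they move (with the duplicate LzoCodec cases collapsed into one fall-through) to CodecUtils below, alongside a new getDefaultBufferSize counterpart, so all codec buffer-size handling sits next to the new synchronized accessors.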
IFile.java
@@ -363,10 +363,10 @@ public Writer(Serialization keySerialization, Serialization valSerialization, FS
void setupOutputStream(CompressionCodec codec) throws IOException {
this.checksumOut = new IFileOutputStream(this.rawOut);
if (codec != null) {
this.compressor = CodecPool.getCompressor(codec);
this.compressor = CodecUtils.getCompressor(codec);
if (this.compressor != null) {
this.compressor.reset();
this.compressedOut = codec.createOutputStream(checksumOut, compressor);
this.compressedOut = CodecUtils.createOutputStream(codec, checksumOut, compressor);
this.out = new FSDataOutputStream(this.compressedOut, null);
this.compressOutput = true;
} else {
@@ -773,9 +773,9 @@ public Reader(InputStream in, long length,
checksumIn = new IFileInputStream(in, length, readAhead,
readAheadLength/* , isCompressed */);
if (isCompressed && codec != null) {
decompressor = CodecPool.getDecompressor(codec);
decompressor = CodecUtils.getDecompressor(codec);
if (decompressor != null) {
this.in = codec.createInputStream(checksumIn, decompressor);
this.in = CodecUtils.createInputStream(codec, checksumIn, decompressor);
} else {
LOG.warn("Could not obtain decompressor from CodecPool");
this.in = checksumIn;
@@ -818,7 +818,7 @@ public static void readToMemory(byte[] buffer, InputStream in, int compressedLength
in = checksumIn;
Decompressor decompressor = null;
if (isCompressed && codec != null) {
decompressor = CodecPool.getDecompressor(codec);
decompressor = CodecUtils.getDecompressor(codec);
if (decompressor != null) {
decompressor.reset();
in = CodecUtils.getDecompressedInputStreamWithBufferSize(codec, checksumIn, decompressor,
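With this change, every compressor/decompressor checkout and stream construction in IFile funnels through CodecUtils, so these operations take the same lock as the buffer-size clamp in getDecompressedInputStreamWithBufferSize (next file).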
CodecUtils.java
@@ -20,27 +20,33 @@

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
import org.apache.tez.runtime.library.common.sort.impl.IFile;
import org.apache.tez.runtime.library.common.sort.impl.IFileInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;

public final class CodecUtils {

private static final Logger LOG = LoggerFactory.getLogger(IFile.class);
private static final int DEFAULT_BUFFER_SIZE = 128 * 1024;
private static final Logger LOG = LoggerFactory.getLogger(CodecUtils.class);
@VisibleForTesting
static final int DEFAULT_BUFFER_SIZE = 256 * 1024;

private CodecUtils() {
}
@@ -76,20 +82,21 @@ public static CompressionCodec getCodec(Configuration conf) throws IOException {
public static InputStream getDecompressedInputStreamWithBufferSize(CompressionCodec codec,
IFileInputStream checksumIn, Decompressor decompressor, int compressedLength)
throws IOException {
String bufferSizeProp = TezRuntimeUtils.getBufferSizeProperty(codec);
Configurable configurableCodec = (Configurable) codec;
int originalSize = bufferSizeProp == null ? DEFAULT_BUFFER_SIZE :
configurableCodec.getConf().getInt(bufferSizeProp, DEFAULT_BUFFER_SIZE);

String bufferSizeProp = getBufferSizeProperty(codec);
CompressionInputStream in = null;

if (bufferSizeProp != null) {
Configurable configurableCodec = (Configurable) codec;
Configuration conf = configurableCodec.getConf();
int newBufSize = Math.min(compressedLength, DEFAULT_BUFFER_SIZE);
LOG.trace("buffer size was set according to min(compressedLength, {}): {}={}",
DEFAULT_BUFFER_SIZE, bufferSizeProp, newBufSize);

synchronized (codec) {
synchronized (conf) {
int defaultBufferSize = getDefaultBufferSize(conf, codec);
int originalSize = conf.getInt(bufferSizeProp, defaultBufferSize);

int newBufSize = Math.min(compressedLength, defaultBufferSize);
LOG.debug("buffer size was set according to min({}, {}) => {}={}", compressedLength,
defaultBufferSize, bufferSizeProp, newBufSize);

conf.setInt(bufferSizeProp, newBufSize);

in = codec.createInputStream(checksumIn, decompressor);
@@ -117,12 +124,86 @@ public static InputStream getDecompressedInputStreamWithBufferSize(CompressionCodec
* issues above for Compressor instances as well, even when we tried to leverage from
* smaller buffer size only on decompression paths.
*/
configurableCodec.getConf().setInt(bufferSizeProp, originalSize);
conf.setInt(bufferSizeProp, originalSize);
}
} else {
in = codec.createInputStream(checksumIn, decompressor);
}

return in;
}

public static Compressor getCompressor(CompressionCodec codec) {
synchronized (((Configurable) codec).getConf()) {
return CodecPool.getCompressor(codec);
}
}

public static Decompressor getDecompressor(CompressionCodec codec) {
synchronized (((Configurable) codec).getConf()) {
return CodecPool.getDecompressor(codec);
}
}

public static CompressionInputStream createInputStream(CompressionCodec codec,
InputStream checksumIn, Decompressor decompressor) throws IOException {
synchronized (((Configurable) codec).getConf()) {
return codec.createInputStream(checksumIn, decompressor);
}
}

public static CompressionOutputStream createOutputStream(CompressionCodec codec,
OutputStream checksumOut, Compressor compressor) throws IOException {
synchronized (((Configurable) codec).getConf()) {
return codec.createOutputStream(checksumOut, compressor);
}
}

public static String getBufferSizeProperty(CompressionCodec codec) {
return getBufferSizeProperty(codec.getClass().getName());
}

public static String getBufferSizeProperty(String codecClassName) {
switch (codecClassName) {
case "org.apache.hadoop.io.compress.DefaultCodec":
case "org.apache.hadoop.io.compress.BZip2Codec":
case "org.apache.hadoop.io.compress.GzipCodec":
return CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
case "org.apache.hadoop.io.compress.SnappyCodec":
return CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_KEY;
case "org.apache.hadoop.io.compress.ZStandardCodec":
return CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY;
case "org.apache.hadoop.io.compress.LzoCodec":
case "com.hadoop.compression.lzo.LzoCodec":
return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY;
case "org.apache.hadoop.io.compress.Lz4Codec":
return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY;
default:
return null;
}
}

public static int getDefaultBufferSize(Configuration conf, CompressionCodec codec) {
return getDefaultBufferSize(conf, codec.getClass().getName());
}

public static int getDefaultBufferSize(Configuration conf, String codecClassName) {
switch (codecClassName) {
case "org.apache.hadoop.io.compress.DefaultCodec":
case "org.apache.hadoop.io.compress.BZip2Codec":
case "org.apache.hadoop.io.compress.GzipCodec":
return CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
case "org.apache.hadoop.io.compress.SnappyCodec":
return CommonConfigurationKeys.IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT;
case "org.apache.hadoop.io.compress.ZStandardCodec":
return CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT;
case "org.apache.hadoop.io.compress.LzoCodec":
case "com.hadoop.compression.lzo.LzoCodec":
return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT;
case "org.apache.hadoop.io.compress.Lz4Codec":
return CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT;
default:
return DEFAULT_BUFFER_SIZE;
}
}
}
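Taken together, the helpers enforce one rule: every read or write of a Configurable codec's Configuration happens inside synchronized (conf), whether it is a pool checkout, a stream construction, or the buffer-size clamp. The lock also moved from the codec instance to the conf object, presumably because distinct codec instances can be backed by the same Configuration, so the conf is the shared state that actually needs guarding. A condensed sketch of the clamp pattern, simplified from the code above (generic InputStream in place of IFileInputStream, and a try/finally added here for illustration; the patch restores the property with a plain statement inside the same block):

```java
import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.tez.runtime.library.utils.CodecUtils;

final class ClampedStreamSketch {
  static CompressionInputStream createWithClampedBuffer(CompressionCodec codec,
      InputStream checksumIn, Decompressor decompressor, int compressedLength)
      throws IOException {
    String prop = CodecUtils.getBufferSizeProperty(codec);
    if (prop == null) {
      // Unknown codec: no buffer-size property to juggle, create directly.
      return codec.createInputStream(checksumIn, decompressor);
    }
    Configuration conf = ((Configurable) codec).getConf();
    synchronized (conf) { // same lock as getCompressor/getDecompressor/create*Stream
      int def = CodecUtils.getDefaultBufferSize(conf, codec);
      int original = conf.getInt(prop, def);
      try {
        conf.setInt(prop, Math.min(compressedLength, def)); // clamp, never grow
        return codec.createInputStream(checksumIn, decompressor);
      } finally {
        conf.setInt(prop, original); // restore under the same lock
      }
    }
  }
}
```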
DummyCompressionCodec.java
@@ -18,12 +18,16 @@

package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;

import com.google.common.annotations.VisibleForTesting;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -33,7 +37,10 @@
/**
* A dummy codec. It passes everything to underlying stream
*/
public class DummyCompressionCodec implements CompressionCodec {
public class DummyCompressionCodec implements CompressionCodec, Configurable {
@VisibleForTesting
int createInputStreamCalled = 0;
private Configuration conf;

@Override
public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
@@ -62,6 +69,7 @@ public CompressionInputStream createInputStream(InputStream in) throws IOException

@Override
public CompressionInputStream createInputStream(InputStream in, Decompressor decompressor) throws IOException {
createInputStreamCalled += 1;
return new DummyCompressionInputStream(in);
}

@@ -128,4 +136,14 @@ public void resetState() throws IOException {
//no-op
}
}

@Override
public void setConf(Configuration conf) {
this.conf = conf;
}

@Override
public Configuration getConf() {
return conf;
}
}
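Implementing Configurable here is load-bearing: the CodecUtils helpers cast every codec to Configurable and synchronize on its conf, so a test codec must carry a Configuration (note the setConf call in the test below) or stream creation through the helpers would fail.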
TestMergeManager.java
@@ -21,7 +21,6 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -37,7 +36,6 @@

import com.google.common.collect.Sets;

import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.serializer.WritableSerialization;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -273,7 +271,8 @@ public void testDiskMergeWithCodec() throws Throwable {
InputContext inputContext = createMockInputContext(UUID.randomUUID().toString());

// Create a mock compressor. We will check if it is used.
CompressionCodec dummyCodec = spy(new DummyCompressionCodec());
DummyCompressionCodec dummyCodec = new DummyCompressionCodec();
dummyCodec.setConf(conf);

MergeManager mergeManager =
new MergeManager(conf, localFs, localDirAllocator, inputContext, null, null, null, null,
@@ -312,7 +311,7 @@ public void testDiskMergeWithCodec() throws Throwable {
mo4.commit();

mergeManager.close(true);
verify(dummyCodec, atLeastOnce()).createOutputStream(any(), any());
Assert.assertTrue(dummyCodec.createInputStreamCalled > 0);
}

@Test(timeout = 60000l)
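On the test change: the Mockito spy is gone in favor of a real DummyCompressionCodec with an explicit conf (needed for the synchronized (conf) path), and the verify() call is replaced by the hand-rolled createInputStreamCalled counter on the dummy codec.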
TestIFile.java
@@ -57,7 +57,6 @@
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.InMemoryReader;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.InMemoryWriter;
import org.apache.tez.runtime.library.common.sort.impl.IFile.Reader;
@@ -66,6 +65,7 @@
import org.apache.tez.runtime.library.testutils.KVDataGen;
import org.apache.tez.runtime.library.testutils.KVDataGen.KVPair;
import org.apache.tez.runtime.library.utils.BufferUtils;
import org.apache.tez.runtime.library.utils.CodecUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
@@ -734,23 +734,23 @@ public void testReadToDisk() throws IOException {
public void testInMemoryBufferSize() throws IOException {
Configurable configurableCodec = (Configurable) codec;
int originalCodecBufferSize =
configurableCodec.getConf().getInt(TezRuntimeUtils.getBufferSizeProperty(codec), -1);
configurableCodec.getConf().getInt(CodecUtils.getBufferSizeProperty(codec), -1);

// for smaller amount of data, codec buffer should be sized according to compressed data length
List<KVPair> data = KVDataGen.generateTestData(false, rnd.nextInt(100));
Writer writer = writeTestFile(false, false, data, codec);
readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);

Assert.assertEquals(originalCodecBufferSize, // original size is repaired
configurableCodec.getConf().getInt(TezRuntimeUtils.getBufferSizeProperty(codec), 0));
configurableCodec.getConf().getInt(CodecUtils.getBufferSizeProperty(codec), 0));

// buffer size cannot grow infinitely with compressed data size
data = KVDataGen.generateTestDataOfKeySize(false, 20000, rnd.nextInt(100));
writer = writeTestFile(false, false, data, codec);
readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);

Assert.assertEquals(originalCodecBufferSize, // original size is repaired
configurableCodec.getConf().getInt(TezRuntimeUtils.getBufferSizeProperty(codec), 0));
configurableCodec.getConf().getInt(CodecUtils.getBufferSizeProperty(codec), 0));
}

@Test(expected = IllegalArgumentException.class)
@@ -766,7 +766,7 @@ private void tryWriteFileWithBufferSize(int bufferSize, String codecClassName)
Configuration conf = new Configuration();

System.out.println("trying with buffer size: " + bufferSize);
conf.set(TezRuntimeUtils.getBufferSizeProperty(codecClassName), Integer.toString(bufferSize));
conf.set(CodecUtils.getBufferSizeProperty(codecClassName), Integer.toString(bufferSize));
CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
CompressionCodec codecToTest =
codecFactory.getCodecByClassName(codecClassName);
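For completeness, a small illustration of the relocated lookup helpers these tests call. The printed key is whatever the Hadoop constant resolves to; for Snappy that is expected to be io.compression.codec.snappy.buffersize with stock hadoop-common, stated here as an assumption:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.runtime.library.utils.CodecUtils;

public class BufferSizePropertyDemo {
  public static void main(String[] args) {
    String snappy = "org.apache.hadoop.io.compress.SnappyCodec";

    // Per-codec property name, e.g. "io.compression.codec.snappy.buffersize"
    String prop = CodecUtils.getBufferSizeProperty(snappy);

    // Codec-specific default when the property is unset;
    // falls back to DEFAULT_BUFFER_SIZE for unknown codec class names.
    int def = CodecUtils.getDefaultBufferSize(new Configuration(false), snappy);

    System.out.println(prop + " defaults to " + def);
  }
}
```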