Skip to content

Commit

Permalink
TEZ-4309: TezUtils.addToConfFromByteString throws com.google.protobuf…
Browse files Browse the repository at this point in the history
….CodedInputStream exception (Ramesh Kumar Thangarajan via László Bodor)

Signed-off-by: Laszlo Bodor <[email protected]>
  • Loading branch information
ramesh0201 authored and abstractdog committed May 20, 2021
1 parent 4d37914 commit 0af54df
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 10 deletions.
14 changes: 9 additions & 5 deletions tez-api/src/main/java/org/apache/tez/common/TezUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ public static UserPayload createUserPayloadFromConf(Configuration conf) throws I
return UserPayload.create(ByteBuffer.wrap(createByteStringFromConf(conf).toByteArray()));
}

private static DAGProtos.ConfigurationProto createConfProto(SnappyInputStream uncompressIs) throws IOException {
CodedInputStream in = CodedInputStream.newInstance(uncompressIs);
in.setSizeLimit(Integer.MAX_VALUE);
return DAGProtos.ConfigurationProto.parseFrom(in);
}

/**
* Convert a byte string to a Configuration object
*
Expand All @@ -112,9 +118,7 @@ public static UserPayload createUserPayloadFromConf(Configuration conf) throws I
public static Configuration createConfFromByteString(ByteString byteString) throws IOException {
Objects.requireNonNull(byteString, "ByteString must be specified");
try(SnappyInputStream uncompressIs = new SnappyInputStream(byteString.newInput());) {
CodedInputStream in = CodedInputStream.newInstance(uncompressIs);
in.setSizeLimit(Integer.MAX_VALUE);
DAGProtos.ConfigurationProto confProto = DAGProtos.ConfigurationProto.parseFrom(in);
DAGProtos.ConfigurationProto confProto = createConfProto(uncompressIs);
Configuration conf = new Configuration(false);
readConfFromPB(confProto, conf);
TezClassLoader.setupForConfiguration(conf);
Expand All @@ -129,7 +133,7 @@ public static Configuration createConfFromBaseConfAndPayload(TaskContext context
UserPayload payload = context.getUserPayload();
ByteString byteString = ByteString.copyFrom(payload.getPayload());
try(SnappyInputStream uncompressIs = new SnappyInputStream(byteString.newInput())) {
DAGProtos.ConfigurationProto confProto = DAGProtos.ConfigurationProto.parseFrom(uncompressIs);
DAGProtos.ConfigurationProto confProto = createConfProto(uncompressIs);
readConfFromPB(confProto, configuration);
TezClassLoader.setupForConfiguration(configuration);
return configuration;
Expand All @@ -139,7 +143,7 @@ public static Configuration createConfFromBaseConfAndPayload(TaskContext context
public static void addToConfFromByteString(Configuration configuration, ByteString byteString)
throws IOException {
try(SnappyInputStream uncompressIs = new SnappyInputStream(byteString.newInput())) {
DAGProtos.ConfigurationProto confProto = DAGProtos.ConfigurationProto.parseFrom(uncompressIs);
DAGProtos.ConfigurationProto confProto = createConfProto(uncompressIs);
readConfFromPB(confProto, configuration);
TezClassLoader.setupForConfiguration(configuration);
}
Expand Down
31 changes: 26 additions & 5 deletions tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,10 @@ public void testByteStringToAndFromConf() throws IOException {
checkConf(conf);
}

@Test (timeout=20000)
public void testByteStringToAndFromLargeConf() throws IOException {
Configuration conf = getConf();
private String constructLargeValue() {
int largeSizeMinimum = 64 * 1024 * 1024;
final String alphaString = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
int largeSize = (largeSizeMinimum + alphaString.length() - 1) / alphaString.length();

largeSize *= alphaString.length();
assertTrue(largeSize >= alphaString.length());
StringBuilder sb = new StringBuilder(largeSize);
Expand All @@ -71,9 +68,20 @@ public void testByteStringToAndFromLargeConf() throws IOException {

String largeValue = sb.toString();
Assert.assertEquals(largeSize, largeValue.length());
return largeValue;
}

private ByteString createByteString(Configuration conf, String largeValue) throws IOException {
conf.set("testLargeValue", largeValue);
Assert.assertEquals(conf.size(), 7);
ByteString bsConf = TezUtils.createByteStringFromConf(conf);
return TezUtils.createByteStringFromConf(conf);
}

@Test (timeout=20000)
public void testByteStringToAndFromLargeConf() throws IOException {
Configuration conf = getConf();
String largeValue = constructLargeValue();
ByteString bsConf = createByteString(conf, largeValue);
conf.clear();
Assert.assertEquals(conf.size(), 0);
conf = TezUtils.createConfFromByteString(bsConf);
Expand All @@ -82,6 +90,19 @@ public void testByteStringToAndFromLargeConf() throws IOException {
Assert.assertEquals(conf.get("testLargeValue"), largeValue);
}

@Test (timeout=20000)
public void testByteStringAddToLargeConf() throws IOException {
Configuration conf = getConf();
String largeValue = constructLargeValue();
ByteString bsConf = createByteString(conf, largeValue);
conf.clear();
Assert.assertEquals(conf.size(), 0);
TezUtils.addToConfFromByteString(conf, bsConf);
Assert.assertEquals(conf.size(), 7);
checkConf(conf);
Assert.assertEquals(conf.get("testLargeValue"), largeValue);
}

@Test (timeout=2000)
public void testPayloadToAndFromConf() throws IOException {
Configuration conf = getConf();
Expand Down

0 comments on commit 0af54df

Please sign in to comment.