From 74bdf21eb4fb96de06e517fc0436d2d9277155a8 Mon Sep 17 00:00:00 2001 From: "kedia,Akanksha" Date: Tue, 17 Oct 2023 17:40:12 +0530 Subject: [PATCH] Update Avro version 1.11.3 --- presto-bigquery/pom.xml | 8 +- presto-kafka/pom.xml | 8 +- presto-record-decoder/pom.xml | 8 +- .../presto/decoder/avro/TestAvroDecoder.java | 162 +++++++++++------- 4 files changed, 117 insertions(+), 69 deletions(-) diff --git a/presto-bigquery/pom.xml b/presto-bigquery/pom.xml index d52cfb3021d12..41796cb3cd9ec 100644 --- a/presto-bigquery/pom.xml +++ b/presto-bigquery/pom.xml @@ -207,7 +207,13 @@ org.apache.avro avro - 1.9.2 + 1.11.3 + + + org.slf4j + slf4j-api + + diff --git a/presto-kafka/pom.xml b/presto-kafka/pom.xml index effe35caa6d00..902b26fa799b3 100644 --- a/presto-kafka/pom.xml +++ b/presto-kafka/pom.xml @@ -15,7 +15,7 @@ ${project.parent.basedir} 2.12.2 - 1.9.0 + 1.11.3 @@ -94,6 +94,12 @@ org.apache.avro avro ${dep.avro.version} + + + org.slf4j + slf4j-api + + diff --git a/presto-record-decoder/pom.xml b/presto-record-decoder/pom.xml index b3cf6f2ab7c88..00f8eafb90703 100644 --- a/presto-record-decoder/pom.xml +++ b/presto-record-decoder/pom.xml @@ -54,7 +54,13 @@ org.apache.avro avro - 1.8.1 + 1.11.3 + + + org.slf4j + slf4j-api + + diff --git a/presto-record-decoder/src/test/java/com/facebook/presto/decoder/avro/TestAvroDecoder.java b/presto-record-decoder/src/test/java/com/facebook/presto/decoder/avro/TestAvroDecoder.java index 40db5d3dcabfc..e9aa0639ec38c 100644 --- a/presto-record-decoder/src/test/java/com/facebook/presto/decoder/avro/TestAvroDecoder.java +++ b/presto-record-decoder/src/test/java/com/facebook/presto/decoder/avro/TestAvroDecoder.java @@ -48,7 +48,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; import static com.facebook.presto.common.type.BigintType.BIGINT; import static com.facebook.presto.common.type.BooleanType.BOOLEAN; @@ -81,35 +80,55 @@ public class TestAvroDecoder private static final Type DOUBLE_MAP_TYPE = FUNCTION_AND_TYPE_MANAGER.getType(parseTypeSignature("map")); private static final Type REAL_MAP_TYPE = FUNCTION_AND_TYPE_MANAGER.getType(parseTypeSignature("map")); - private static String getAvroSchema(String name, String dataType) + private static Schema getAvroSchema(String name, String dataType) { return getAvroSchema(ImmutableMap.of(name, dataType)); } - private static String getAvroSchema(Map fields) - { - String fieldSchema = fields.entrySet().stream() - .map(entry -> "{\"name\": \"" + entry.getKey() + "\",\"type\": " + entry.getValue() + ",\"default\": null}") - .collect(Collectors.joining(",")); - - return "{\"type\" : \"record\"," + - " \"name\" : \"test_schema\"," + - " \"namespace\" : \"com.facebook.presto.decoder.avro\"," + - " \"fields\" :" + - " [" + - fieldSchema + - " ]}"; + private static Schema getAvroSchema(Map fields) + { + Schema.Parser parser = new Schema.Parser(); + SchemaBuilder.FieldAssembler fieldAssembler = getFieldBuilder(); + for (Map.Entry field : fields.entrySet()) { + SchemaBuilder.FieldBuilder fieldBuilder = fieldAssembler.name(field.getKey()); + Schema fieldSchema = parser.parse(field.getValue()); + SchemaBuilder.GenericDefault genericDefault = fieldBuilder.type(fieldSchema); + switch (fieldSchema.getType()) { + case ARRAY: + genericDefault.withDefault(ImmutableList.of()); + break; + case MAP: + genericDefault.withDefault(ImmutableMap.of()); + break; + case UNION: + if (fieldSchema.getTypes().stream() + .map(Schema::getType) + .anyMatch(Schema.Type.NULL::equals)) { + genericDefault.withDefault(null); + } + else { + genericDefault.noDefault(); + } + break; + case NULL: + genericDefault.withDefault(null); + break; + default: + genericDefault.noDefault(); + } + } + return fieldAssembler.endRecord(); } private Map buildAndDecodeColumns(Set columns, Map fieldSchema, Map fieldValue) { - String schema = getAvroSchema(fieldSchema); - byte[] avroData = buildAvroData(new Schema.Parser().parse(schema), fieldValue); + Schema schema = getAvroSchema(fieldSchema); + byte[] avroData = buildAvroData(schema, fieldValue); return decodeRow( avroData, columns, - ImmutableMap.of(DATA_SCHEMA, schema)); + ImmutableMap.of(DATA_SCHEMA, schema.toString())); } private Map buildAndDecodeColumn(DecoderTestColumnHandle column, String columnName, String columnType, Object actualValue) @@ -146,12 +165,9 @@ private static GenericData.Record buildAvroRecord(Schema schema, ByteArrayOutput { GenericData.Record record = new GenericData.Record(schema); values.forEach(record::put); - try { - DataFileWriter dataFileWriter = new DataFileWriter<>(new GenericDatumWriter<>(schema)); - + try (DataFileWriter dataFileWriter = new DataFileWriter<>(new GenericDatumWriter<>(schema))) { dataFileWriter.create(schema, outputStream); dataFileWriter.append(record); - dataFileWriter.close(); } catch (IOException e) { throw new RuntimeException("Failed to convert to Avro.", e); @@ -177,12 +193,14 @@ public void testSchemaEvolutionAddingColumn() DecoderTestColumnHandle newlyAddedColumn = new DecoderTestColumnHandle(1, "row1", VARCHAR, "string_field_added", null, null, false, false, false); // the decoded avro data file does not have string_field_added - byte[] originalData = buildAvroData(new Schema.Parser().parse( - getAvroSchema("string_field", "\"string\"")), + byte[] originalData = buildAvroData(getFieldBuilder() + .name("string_field").type().stringType().noDefault() + .endRecord(), "string_field", "string_field_value"); - String addedColumnSchema = getAvroSchema(ImmutableMap.of( - "string_field", "\"string\"", - "string_field_added", "[\"null\", \"string\"]")); + String addedColumnSchema = getFieldBuilder() + .name("string_field").type().stringType().noDefault() + .name("string_field_added").type().optional().stringType() + .endRecord().toString(); Map decodedRow = decodeRow( originalData, ImmutableSet.of(originalColumn, newlyAddedColumn), @@ -216,12 +234,16 @@ public void testEnumDecodedAsVarchar() public void testSchemaEvolutionRenamingColumn() throws Exception { - byte[] originalData = buildAvroData(new Schema.Parser().parse( - getAvroSchema("string_field", "\"string\"")), + byte[] originalData = buildAvroData(getFieldBuilder() + .name("string_field").type().stringType().noDefault() + .endRecord(), "string_field", "string_field_value"); DecoderTestColumnHandle renamedColumn = new DecoderTestColumnHandle(0, "row0", VARCHAR, "string_field_renamed", null, null, false, false, false); - String renamedColumnSchema = getAvroSchema("string_field_renamed", "[\"null\", \"string\"]"); + String renamedColumnSchema = getFieldBuilder() + .name("string_field_renamed").type().optional().stringType() + .endRecord() + .toString(); Map decodedEvolvedRow = decodeRow( originalData, ImmutableSet.of(renamedColumn), @@ -235,16 +257,19 @@ public void testSchemaEvolutionRenamingColumn() public void testSchemaEvolutionRemovingColumn() throws Exception { - byte[] originalData = buildAvroData(new Schema.Parser().parse( - getAvroSchema(ImmutableMap.of( - "string_field", "\"string\"", - "string_field_to_be_removed", "[\"null\", \"string\"]"))), + byte[] originalData = buildAvroData(getFieldBuilder() + .name("string_field").type().stringType().noDefault() + .name("string_field_to_be_removed").type().optional().stringType() + .endRecord(), ImmutableMap.of( "string_field", "string_field_value", "string_field_to_be_removed", "removed_field_value")); DecoderTestColumnHandle evolvedColumn = new DecoderTestColumnHandle(0, "row0", VARCHAR, "string_field", null, null, false, false, false); - String removedColumnSchema = getAvroSchema("string_field", "\"string\""); + String removedColumnSchema = getFieldBuilder() + .name("string_field").type().stringType().noDefault() + .endRecord() + .toString(); Map decodedEvolvedRow = decodeRow( originalData, ImmutableSet.of(evolvedColumn), @@ -258,12 +283,16 @@ public void testSchemaEvolutionRemovingColumn() public void testSchemaEvolutionIntToLong() throws Exception { - byte[] originalIntData = buildAvroData(new Schema.Parser().parse( - getAvroSchema("int_to_long_field", "\"int\"")), + byte[] originalIntData = buildAvroData(getFieldBuilder() + .name("int_to_long_field").type().intType().noDefault() + .endRecord(), "int_to_long_field", 100); DecoderTestColumnHandle longColumnReadingIntData = new DecoderTestColumnHandle(0, "row0", BIGINT, "int_to_long_field", null, null, false, false, false); - String changedTypeSchema = getAvroSchema("int_to_long_field", "\"long\""); + String changedTypeSchema = getFieldBuilder() + .name("int_to_long_field").type().longType().noDefault() + .endRecord() + .toString(); Map decodedEvolvedRow = decodeRow( originalIntData, ImmutableSet.of(longColumnReadingIntData), @@ -277,12 +306,16 @@ public void testSchemaEvolutionIntToLong() public void testSchemaEvolutionIntToDouble() throws Exception { - byte[] originalIntData = buildAvroData(new Schema.Parser().parse( - getAvroSchema("int_to_double_field", "\"int\"")), + byte[] originalIntData = buildAvroData(getFieldBuilder() + .name("int_to_double_field").type().intType().noDefault() + .endRecord(), "int_to_double_field", 100); DecoderTestColumnHandle doubleColumnReadingIntData = new DecoderTestColumnHandle(0, "row0", DOUBLE, "int_to_double_field", null, null, false, false, false); - String changedTypeSchema = getAvroSchema("int_to_double_field", "\"double\""); + String changedTypeSchema = getFieldBuilder() + .name("int_to_double_field").type().doubleType().noDefault() + .endRecord() + .toString(); Map decodedEvolvedRow = decodeRow( originalIntData, ImmutableSet.of(doubleColumnReadingIntData), @@ -296,12 +329,16 @@ public void testSchemaEvolutionIntToDouble() public void testSchemaEvolutionToIncompatibleType() throws Exception { - byte[] originalIntData = buildAvroData(new Schema.Parser().parse( - getAvroSchema("int_to_string_field", "\"int\"")), + byte[] originalIntData = buildAvroData(getFieldBuilder() + .name("int_to_string_field").type().intType().noDefault() + .endRecord(), "int_to_string_field", 100); DecoderTestColumnHandle stringColumnReadingIntData = new DecoderTestColumnHandle(0, "row0", VARCHAR, "int_to_string_field", null, null, false, false, false); - String changedTypeSchema = getAvroSchema("int_to_string_field", "\"string\""); + String changedTypeSchema = getFieldBuilder() + .name("int_to_string_field").type().stringType().noDefault() + .endRecord() + .toString(); assertThatThrownBy(() -> decodeRow(originalIntData, ImmutableSet.of(stringColumnReadingIntData), ImmutableMap.of(DATA_SCHEMA, changedTypeSchema))) .isInstanceOf(PrestoException.class) @@ -465,26 +502,6 @@ public void testNestedRecord() checkValue(decodedRow, row, 98247748); } - @Test - public void testNonExistentFieldsAreNull() - throws Exception - { - DecoderTestColumnHandle row1 = new DecoderTestColumnHandle(0, "row1", createVarcharType(100), "very/deep/varchar", null, null, false, false, false); - DecoderTestColumnHandle row2 = new DecoderTestColumnHandle(1, "row2", BIGINT, "no_bigint", null, null, false, false, false); - DecoderTestColumnHandle row3 = new DecoderTestColumnHandle(2, "row3", DOUBLE, "double_record/is_missing", null, null, false, false, false); - DecoderTestColumnHandle row4 = new DecoderTestColumnHandle(3, "row4", BOOLEAN, "hello", null, null, false, false, false); - - Map decodedRow1 = buildAndDecodeColumn(row1, "dummy", "\"long\"", 0L); - Map decodedRow2 = buildAndDecodeColumn(row2, "dummy", "\"long\"", 0L); - Map decodedRow3 = buildAndDecodeColumn(row3, "dummy", "\"long\"", 0L); - Map decodedRow4 = buildAndDecodeColumn(row4, "dummy", "\"long\"", 0L); - - checkIsNull(decodedRow1, row1); - checkIsNull(decodedRow2, row2); - checkIsNull(decodedRow3, row3); - checkIsNull(decodedRow4, row4); - } - @Test public void testRuntimeDecodingFailure() { @@ -632,15 +649,28 @@ private void assertUnsupportedColumnTypeException(ThrowableAssert.ThrowingCallab .hasMessageMatching("Unsupported column type .* for column .*"); } + private static SchemaBuilder.FieldAssembler getFieldBuilder() + { + return SchemaBuilder.record("test_schema") + .namespace("com.facebook.presto.decoder.avro") + .fields(); + } + private void singleColumnDecoder(Type columnType) { - String someSchema = getAvroSchema("dummy", "\"long\""); + String someSchema = getFieldBuilder() + .name("dummy").type().longType().noDefault() + .endRecord() + .toString(); DECODER_FACTORY.create(ImmutableMap.of(DATA_SCHEMA, someSchema), ImmutableSet.of(new DecoderTestColumnHandle(0, "some_column", columnType, "0", null, null, false, false, false))); } private void singleColumnDecoder(Type columnType, String mapping, String dataFormat, String formatHint, boolean keyDecoder, boolean hidden, boolean internal) { - String someSchema = getAvroSchema("dummy", "\"long\""); + String someSchema = getFieldBuilder() + .name("dummy").type().longType().noDefault() + .endRecord() + .toString(); DECODER_FACTORY.create(ImmutableMap.of(DATA_SCHEMA, someSchema), ImmutableSet.of(new DecoderTestColumnHandle(0, "some_column", columnType, mapping, dataFormat, formatHint, keyDecoder, hidden, internal))); } }