Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Avro version 1.11.3 #21174

Merged
merged 1 commit into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion presto-bigquery/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,13 @@
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.9.2</version>
<version>1.11.3</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- Presto SPI -->
Expand Down
8 changes: 7 additions & 1 deletion presto-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<properties>
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
<scala.version>2.12.2</scala.version>
<dep.avro.version>1.9.0</dep.avro.version>
<dep.avro.version>1.11.3</dep.avro.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -94,6 +94,12 @@
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${dep.avro.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- Presto SPI -->
Expand Down
8 changes: 7 additions & 1 deletion presto-record-decoder/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,13 @@
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.8.1</version>
<version>1.11.3</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.BooleanType.BOOLEAN;
Expand Down Expand Up @@ -81,35 +80,55 @@ public class TestAvroDecoder
private static final Type DOUBLE_MAP_TYPE = FUNCTION_AND_TYPE_MANAGER.getType(parseTypeSignature("map<varchar,double>"));
private static final Type REAL_MAP_TYPE = FUNCTION_AND_TYPE_MANAGER.getType(parseTypeSignature("map<varchar,real>"));

private static String getAvroSchema(String name, String dataType)
private static Schema getAvroSchema(String name, String dataType)
{
return getAvroSchema(ImmutableMap.of(name, dataType));
}

private static String getAvroSchema(Map<String, String> fields)
{
String fieldSchema = fields.entrySet().stream()
.map(entry -> "{\"name\": \"" + entry.getKey() + "\",\"type\": " + entry.getValue() + ",\"default\": null}")
.collect(Collectors.joining(","));

return "{\"type\" : \"record\"," +
" \"name\" : \"test_schema\"," +
" \"namespace\" : \"com.facebook.presto.decoder.avro\"," +
" \"fields\" :" +
" [" +
fieldSchema +
" ]}";
private static Schema getAvroSchema(Map<String, String> fields)
{
Schema.Parser parser = new Schema.Parser();
SchemaBuilder.FieldAssembler<Schema> fieldAssembler = getFieldBuilder();
for (Map.Entry<String, String> field : fields.entrySet()) {
SchemaBuilder.FieldBuilder<Schema> fieldBuilder = fieldAssembler.name(field.getKey());
Schema fieldSchema = parser.parse(field.getValue());
SchemaBuilder.GenericDefault<Schema> genericDefault = fieldBuilder.type(fieldSchema);
switch (fieldSchema.getType()) {
case ARRAY:
genericDefault.withDefault(ImmutableList.of());
break;
case MAP:
genericDefault.withDefault(ImmutableMap.of());
break;
case UNION:
if (fieldSchema.getTypes().stream()
.map(Schema::getType)
.anyMatch(Schema.Type.NULL::equals)) {
genericDefault.withDefault(null);
}
else {
genericDefault.noDefault();
}
break;
case NULL:
genericDefault.withDefault(null);
break;
default:
genericDefault.noDefault();
}
}
return fieldAssembler.endRecord();
}

private Map<DecoderColumnHandle, FieldValueProvider> buildAndDecodeColumns(Set<DecoderColumnHandle> columns, Map<String, String> fieldSchema, Map<String, Object> fieldValue)
{
String schema = getAvroSchema(fieldSchema);
byte[] avroData = buildAvroData(new Schema.Parser().parse(schema), fieldValue);
Schema schema = getAvroSchema(fieldSchema);
byte[] avroData = buildAvroData(schema, fieldValue);

return decodeRow(
avroData,
columns,
ImmutableMap.of(DATA_SCHEMA, schema));
ImmutableMap.of(DATA_SCHEMA, schema.toString()));
}

private Map<DecoderColumnHandle, FieldValueProvider> buildAndDecodeColumn(DecoderTestColumnHandle column, String columnName, String columnType, Object actualValue)
Expand Down Expand Up @@ -146,12 +165,9 @@ private static GenericData.Record buildAvroRecord(Schema schema, ByteArrayOutput
{
GenericData.Record record = new GenericData.Record(schema);
values.forEach(record::put);
try {
DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(new GenericDatumWriter<>(schema));

try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(new GenericDatumWriter<>(schema))) {
dataFileWriter.create(schema, outputStream);
dataFileWriter.append(record);
dataFileWriter.close();
}
catch (IOException e) {
throw new RuntimeException("Failed to convert to Avro.", e);
Expand All @@ -177,12 +193,14 @@ public void testSchemaEvolutionAddingColumn()
DecoderTestColumnHandle newlyAddedColumn = new DecoderTestColumnHandle(1, "row1", VARCHAR, "string_field_added", null, null, false, false, false);

// the decoded avro data file does not have string_field_added
byte[] originalData = buildAvroData(new Schema.Parser().parse(
getAvroSchema("string_field", "\"string\"")),
byte[] originalData = buildAvroData(getFieldBuilder()
.name("string_field").type().stringType().noDefault()
.endRecord(),
"string_field", "string_field_value");
String addedColumnSchema = getAvroSchema(ImmutableMap.of(
"string_field", "\"string\"",
"string_field_added", "[\"null\", \"string\"]"));
String addedColumnSchema = getFieldBuilder()
.name("string_field").type().stringType().noDefault()
.name("string_field_added").type().optional().stringType()
.endRecord().toString();
Map<DecoderColumnHandle, FieldValueProvider> decodedRow = decodeRow(
originalData,
ImmutableSet.of(originalColumn, newlyAddedColumn),
Expand Down Expand Up @@ -216,12 +234,16 @@ public void testEnumDecodedAsVarchar()
public void testSchemaEvolutionRenamingColumn()
throws Exception
{
byte[] originalData = buildAvroData(new Schema.Parser().parse(
getAvroSchema("string_field", "\"string\"")),
byte[] originalData = buildAvroData(getFieldBuilder()
.name("string_field").type().stringType().noDefault()
.endRecord(),
"string_field", "string_field_value");

DecoderTestColumnHandle renamedColumn = new DecoderTestColumnHandle(0, "row0", VARCHAR, "string_field_renamed", null, null, false, false, false);
String renamedColumnSchema = getAvroSchema("string_field_renamed", "[\"null\", \"string\"]");
String renamedColumnSchema = getFieldBuilder()
.name("string_field_renamed").type().optional().stringType()
.endRecord()
.toString();
Map<DecoderColumnHandle, FieldValueProvider> decodedEvolvedRow = decodeRow(
originalData,
ImmutableSet.of(renamedColumn),
Expand All @@ -235,16 +257,19 @@ public void testSchemaEvolutionRenamingColumn()
public void testSchemaEvolutionRemovingColumn()
throws Exception
{
byte[] originalData = buildAvroData(new Schema.Parser().parse(
getAvroSchema(ImmutableMap.of(
"string_field", "\"string\"",
"string_field_to_be_removed", "[\"null\", \"string\"]"))),
byte[] originalData = buildAvroData(getFieldBuilder()
.name("string_field").type().stringType().noDefault()
.name("string_field_to_be_removed").type().optional().stringType()
.endRecord(),
ImmutableMap.of(
"string_field", "string_field_value",
"string_field_to_be_removed", "removed_field_value"));

DecoderTestColumnHandle evolvedColumn = new DecoderTestColumnHandle(0, "row0", VARCHAR, "string_field", null, null, false, false, false);
String removedColumnSchema = getAvroSchema("string_field", "\"string\"");
String removedColumnSchema = getFieldBuilder()
.name("string_field").type().stringType().noDefault()
.endRecord()
.toString();
Map<DecoderColumnHandle, FieldValueProvider> decodedEvolvedRow = decodeRow(
originalData,
ImmutableSet.of(evolvedColumn),
Expand All @@ -258,12 +283,16 @@ public void testSchemaEvolutionRemovingColumn()
public void testSchemaEvolutionIntToLong()
throws Exception
{
byte[] originalIntData = buildAvroData(new Schema.Parser().parse(
getAvroSchema("int_to_long_field", "\"int\"")),
byte[] originalIntData = buildAvroData(getFieldBuilder()
.name("int_to_long_field").type().intType().noDefault()
.endRecord(),
"int_to_long_field", 100);

DecoderTestColumnHandle longColumnReadingIntData = new DecoderTestColumnHandle(0, "row0", BIGINT, "int_to_long_field", null, null, false, false, false);
String changedTypeSchema = getAvroSchema("int_to_long_field", "\"long\"");
String changedTypeSchema = getFieldBuilder()
.name("int_to_long_field").type().longType().noDefault()
.endRecord()
.toString();
Map<DecoderColumnHandle, FieldValueProvider> decodedEvolvedRow = decodeRow(
originalIntData,
ImmutableSet.of(longColumnReadingIntData),
Expand All @@ -277,12 +306,16 @@ public void testSchemaEvolutionIntToLong()
public void testSchemaEvolutionIntToDouble()
throws Exception
{
byte[] originalIntData = buildAvroData(new Schema.Parser().parse(
getAvroSchema("int_to_double_field", "\"int\"")),
byte[] originalIntData = buildAvroData(getFieldBuilder()
.name("int_to_double_field").type().intType().noDefault()
.endRecord(),
"int_to_double_field", 100);

DecoderTestColumnHandle doubleColumnReadingIntData = new DecoderTestColumnHandle(0, "row0", DOUBLE, "int_to_double_field", null, null, false, false, false);
String changedTypeSchema = getAvroSchema("int_to_double_field", "\"double\"");
String changedTypeSchema = getFieldBuilder()
.name("int_to_double_field").type().doubleType().noDefault()
.endRecord()
.toString();
Map<DecoderColumnHandle, FieldValueProvider> decodedEvolvedRow = decodeRow(
originalIntData,
ImmutableSet.of(doubleColumnReadingIntData),
Expand All @@ -296,12 +329,16 @@ public void testSchemaEvolutionIntToDouble()
public void testSchemaEvolutionToIncompatibleType()
throws Exception
{
byte[] originalIntData = buildAvroData(new Schema.Parser().parse(
getAvroSchema("int_to_string_field", "\"int\"")),
byte[] originalIntData = buildAvroData(getFieldBuilder()
.name("int_to_string_field").type().intType().noDefault()
.endRecord(),
"int_to_string_field", 100);

DecoderTestColumnHandle stringColumnReadingIntData = new DecoderTestColumnHandle(0, "row0", VARCHAR, "int_to_string_field", null, null, false, false, false);
String changedTypeSchema = getAvroSchema("int_to_string_field", "\"string\"");
String changedTypeSchema = getFieldBuilder()
.name("int_to_string_field").type().stringType().noDefault()
.endRecord()
.toString();

assertThatThrownBy(() -> decodeRow(originalIntData, ImmutableSet.of(stringColumnReadingIntData), ImmutableMap.of(DATA_SCHEMA, changedTypeSchema)))
.isInstanceOf(PrestoException.class)
Expand Down Expand Up @@ -465,26 +502,6 @@ public void testNestedRecord()
checkValue(decodedRow, row, 98247748);
}

@Test
public void testNonExistentFieldsAreNull()
throws Exception
{
DecoderTestColumnHandle row1 = new DecoderTestColumnHandle(0, "row1", createVarcharType(100), "very/deep/varchar", null, null, false, false, false);
DecoderTestColumnHandle row2 = new DecoderTestColumnHandle(1, "row2", BIGINT, "no_bigint", null, null, false, false, false);
DecoderTestColumnHandle row3 = new DecoderTestColumnHandle(2, "row3", DOUBLE, "double_record/is_missing", null, null, false, false, false);
DecoderTestColumnHandle row4 = new DecoderTestColumnHandle(3, "row4", BOOLEAN, "hello", null, null, false, false, false);

Map<DecoderColumnHandle, FieldValueProvider> decodedRow1 = buildAndDecodeColumn(row1, "dummy", "\"long\"", 0L);
Map<DecoderColumnHandle, FieldValueProvider> decodedRow2 = buildAndDecodeColumn(row2, "dummy", "\"long\"", 0L);
Map<DecoderColumnHandle, FieldValueProvider> decodedRow3 = buildAndDecodeColumn(row3, "dummy", "\"long\"", 0L);
Map<DecoderColumnHandle, FieldValueProvider> decodedRow4 = buildAndDecodeColumn(row4, "dummy", "\"long\"", 0L);

checkIsNull(decodedRow1, row1);
checkIsNull(decodedRow2, row2);
checkIsNull(decodedRow3, row3);
checkIsNull(decodedRow4, row4);
}

@Test
public void testRuntimeDecodingFailure()
{
Expand Down Expand Up @@ -632,15 +649,28 @@ private void assertUnsupportedColumnTypeException(ThrowableAssert.ThrowingCallab
.hasMessageMatching("Unsupported column type .* for column .*");
}

private static SchemaBuilder.FieldAssembler<Schema> getFieldBuilder()
{
return SchemaBuilder.record("test_schema")
.namespace("com.facebook.presto.decoder.avro")
.fields();
}

private void singleColumnDecoder(Type columnType)
{
String someSchema = getAvroSchema("dummy", "\"long\"");
String someSchema = getFieldBuilder()
.name("dummy").type().longType().noDefault()
.endRecord()
.toString();
DECODER_FACTORY.create(ImmutableMap.of(DATA_SCHEMA, someSchema), ImmutableSet.of(new DecoderTestColumnHandle(0, "some_column", columnType, "0", null, null, false, false, false)));
}

private void singleColumnDecoder(Type columnType, String mapping, String dataFormat, String formatHint, boolean keyDecoder, boolean hidden, boolean internal)
{
String someSchema = getAvroSchema("dummy", "\"long\"");
String someSchema = getFieldBuilder()
.name("dummy").type().longType().noDefault()
.endRecord()
.toString();
DECODER_FACTORY.create(ImmutableMap.of(DATA_SCHEMA, someSchema), ImmutableSet.of(new DecoderTestColumnHandle(0, "some_column", columnType, mapping, dataFormat, formatHint, keyDecoder, hidden, internal)));
}
}