diff --git a/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/deserializers/avro/AvroDeserializer.java b/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/deserializers/avro/AvroDeserializer.java index fa39e588..9d6faf0e 100644 --- a/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/deserializers/avro/AvroDeserializer.java +++ b/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/deserializers/avro/AvroDeserializer.java @@ -17,24 +17,21 @@ import com.amazonaws.services.schemaregistry.common.GlueSchemaRegistryDataFormatDeserializer; import com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration; import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializerDataParser; -import com.amazonaws.services.schemaregistry.utils.AVROUtils; -import com.amazonaws.services.schemaregistry.utils.AvroRecordType; import com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException; +import com.amazonaws.services.schemaregistry.utils.AvroRecordType; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; import lombok.Builder; import lombok.Getter; import lombok.NonNull; import lombok.Setter; import lombok.extern.slf4j.Slf4j; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.DatumReader; import org.apache.avro.io.DecoderFactory; -import org.apache.avro.specific.SpecificData; -import org.apache.avro.specific.SpecificDatumReader; -import org.apache.avro.specific.SpecificRecord; -import java.io.IOException; import java.nio.ByteBuffer; /** @@ -44,8 +41,9 @@ @Slf4j public class AvroDeserializer implements GlueSchemaRegistryDataFormatDeserializer { private static final GlueSchemaRegistryDeserializerDataParser DESERIALIZER_DATA_PARSER = - GlueSchemaRegistryDeserializerDataParser.getInstance(); - private static final AVROUtils AVRO_UTILS = AVROUtils.getInstance(); + GlueSchemaRegistryDeserializerDataParser.getInstance(); + //TODO: Make this configurable if requested by customers. + private static final long MAX_DATUM_READER_CACHE_SIZE = 100; @Getter @Setter @@ -53,6 +51,11 @@ public class AvroDeserializer implements GlueSchemaRegistryDataFormatDeserialize @Setter private AvroRecordType avroRecordType; + @NonNull + @Getter + @VisibleForTesting + protected final LoadingCache> datumReaderCache; + /** * Constructor accepting various dependencies. * @@ -62,6 +65,11 @@ public class AvroDeserializer implements GlueSchemaRegistryDataFormatDeserialize public AvroDeserializer(GlueSchemaRegistryConfiguration configs) { this.schemaRegistrySerDeConfigs = configs; this.avroRecordType = configs.getAvroRecordType(); + this.datumReaderCache = + CacheBuilder + .newBuilder() + .maximumSize(MAX_DATUM_READER_CACHE_SIZE) + .build(new DatumReaderCache()); } /** @@ -80,16 +88,16 @@ public Object deserialize(@NonNull ByteBuffer buffer, @NonNull String schema) { log.debug("Length of actual message: {}", data.length); - Schema schemaDefinition = AVRO_UTILS.parseSchema(schema); - DatumReader datumReader = createDatumReader(schemaDefinition); + DatumReader datumReader = datumReaderCache.get(schema); + BinaryDecoder binaryDecoder = getBinaryDecoder(data, 0, data.length); Object result = datumReader.read(null, binaryDecoder); log.debug("Finished de-serializing Avro message"); return result; - } catch (IOException | InstantiationException | IllegalAccessException e) { - String message = String.format("Exception occurred while de-serializing Avro message"); + } catch (Exception e) { + String message = "Exception occurred while de-serializing Avro message"; throw new AWSSchemaRegistryException(message, e); } } @@ -98,41 +106,10 @@ private BinaryDecoder getBinaryDecoder(byte[] data, int start, int end) { return DecoderFactory.get().binaryDecoder(data, start, end, null); } - /** - * This method is used to create Avro datum reader for deserialization. By - * default, it is GenericDatumReader; SpecificDatumReader will only be created - * if the user specifies. In this case, the program will check if the user have - * those specific code-generated schema class locally. ReaderSchema will be - * supplied if the user wants to use a specific schema to deserialize the - * message. (Compatibility check will be invoked) - * - * @param writerSchema schema that writes the Avro message - * @return Avro datum reader for de-serialization - * @throws InstantiationException can be thrown for readerClass.newInstance() - * from java.lang.Class implementation - * @throws IllegalAccessException can be thrown readerClass.newInstance() from - * java.lang.Class implementation - */ - public DatumReader createDatumReader(Schema writerSchema) - throws InstantiationException, IllegalAccessException { - - switch (this.avroRecordType) { - case SPECIFIC_RECORD: - @SuppressWarnings("unchecked") - Class readerClass = SpecificData.get().getClass(writerSchema); - - Schema readerSchema = readerClass.newInstance().getSchema(); - log.debug("Using SpecificDatumReader for de-serializing Avro message, schema: {})", readerSchema.toString()); - return new SpecificDatumReader<>(writerSchema, readerSchema); - - case GENERIC_RECORD: - log.debug("Using GenericDatumReader for de-serializing Avro message, schema: {})", writerSchema.toString()); - return new GenericDatumReader<>(writerSchema); - - default: - String message = String.format("Data Format in configuration is not supported, Data Format: %s ", - this.avroRecordType.getName()); - throw new UnsupportedOperationException(message); + private class DatumReaderCache extends CacheLoader> { + @Override + public DatumReader load(String schema) throws Exception { + return DatumReaderInstance.from(schema, avroRecordType); } } } diff --git a/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/deserializers/avro/DatumReaderInstance.java b/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/deserializers/avro/DatumReaderInstance.java new file mode 100644 index 00000000..718d7cd5 --- /dev/null +++ b/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/deserializers/avro/DatumReaderInstance.java @@ -0,0 +1,58 @@ +package com.amazonaws.services.schemaregistry.deserializers.avro; + +import com.amazonaws.services.schemaregistry.utils.AVROUtils; +import com.amazonaws.services.schemaregistry.utils.AvroRecordType; +import lombok.extern.slf4j.Slf4j; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.io.DatumReader; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificRecord; + +@Slf4j +public class DatumReaderInstance { + private static final AVROUtils AVRO_UTILS = AVROUtils.getInstance(); + + /** + * This method is used to create Avro datum reader for deserialization. By + * default, it is GenericDatumReader; SpecificDatumReader will only be created + * if the user specifies. In this case, the program will check if the user have + * those specific code-generated schema class locally. ReaderSchema will be + * supplied if the user wants to use a specific schema to deserialize the + * message. (Compatibility check will be invoked) + * + * @param writerSchemaDefinition Avro record writer schema. + * @return Avro datum reader for de-serialization + * @throws InstantiationException can be thrown for readerClass.newInstance() + * from java.lang.Class implementation + * @throws IllegalAccessException can be thrown readerClass.newInstance() from + * java.lang.Class implementation + */ + public static DatumReader from(String writerSchemaDefinition, AvroRecordType avroRecordType) + throws InstantiationException, IllegalAccessException { + + Schema writerSchema = AVRO_UTILS.parseSchema(writerSchemaDefinition); + + switch (avroRecordType) { + case SPECIFIC_RECORD: + @SuppressWarnings("unchecked") + Class readerClass = SpecificData.get().getClass(writerSchema); + + Schema readerSchema = readerClass.newInstance().getSchema(); + log.debug("Using SpecificDatumReader for de-serializing Avro message, schema: {})", + readerSchema.toString()); + return new SpecificDatumReader<>(writerSchema, readerSchema); + + case GENERIC_RECORD: + log.debug("Using GenericDatumReader for de-serializing Avro message, schema: {})", + writerSchema.toString()); + return new GenericDatumReader<>(writerSchema); + + default: + String message = String.format("Unsupported AvroRecordType: %s", + avroRecordType.getName()); + throw new UnsupportedOperationException(message); + } + } +} diff --git a/serializer-deserializer/src/test/java/com/amazonaws/services/schemaregistry/deserializers/avro/AvroDeserializerTest.java b/serializer-deserializer/src/test/java/com/amazonaws/services/schemaregistry/deserializers/avro/AvroDeserializerTest.java index a6a0cb73..a12e5b91 100644 --- a/serializer-deserializer/src/test/java/com/amazonaws/services/schemaregistry/deserializers/avro/AvroDeserializerTest.java +++ b/serializer-deserializer/src/test/java/com/amazonaws/services/schemaregistry/deserializers/avro/AvroDeserializerTest.java @@ -38,7 +38,6 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; import org.mockito.Mock; -import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; @@ -58,9 +57,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; /** * Unit tests for testing Avro related serialization and de-serialization. @@ -247,7 +244,10 @@ public void testDeserialize_incompleteData_throwsException() { Schema schema = SchemaLoader.loadAvroSchema(AVRO_USER_SCHEMA_FILE); AvroDeserializer avroDeserializer = createAvroDeserializer(AvroRecordType.SPECIFIC_RECORD); - assertThrows(GlueSchemaRegistryIncompatibleDataException.class, () -> avroDeserializer.deserialize(ByteBuffer.wrap(serializedData), schema.toString())); + Exception ex = assertThrows(AWSSchemaRegistryException.class, () -> avroDeserializer.deserialize(ByteBuffer.wrap(serializedData), schema.toString())); + Throwable rootCause = ex.getCause(); + assertTrue(rootCause instanceof GlueSchemaRegistryIncompatibleDataException); + assertEquals("Data is not compatible with schema registry size: 2", rootCause.getMessage()); } /** @@ -261,7 +261,10 @@ public void testDeserialize_invalidHeaderVersionByte_throwsException() { Schema schema = SchemaLoader.loadAvroSchema(AVRO_USER_SCHEMA_FILE); AvroDeserializer avroDeserializer = createAvroDeserializer(AvroRecordType.SPECIFIC_RECORD); - assertThrows(GlueSchemaRegistryIncompatibleDataException.class, () -> avroDeserializer.deserialize(serializedData, schema.toString())); + Exception ex = assertThrows(AWSSchemaRegistryException.class, () -> avroDeserializer.deserialize(serializedData, schema.toString())); + Throwable rootCause = ex.getCause(); + assertTrue(rootCause instanceof GlueSchemaRegistryIncompatibleDataException); + assertEquals("Invalid schema registry header version byte in data", rootCause.getMessage()); } /** @@ -275,7 +278,10 @@ public void testDeserialize_invalidCompressionByte_throwsException() { Schema schema = SchemaLoader.loadAvroSchema(AVRO_USER_SCHEMA_FILE); AvroDeserializer avroDeserializer = createAvroDeserializer(AvroRecordType.SPECIFIC_RECORD); - assertThrows(GlueSchemaRegistryIncompatibleDataException.class, () -> avroDeserializer.deserialize(serializedData, schema.toString())); + Exception ex = assertThrows(AWSSchemaRegistryException.class, () -> avroDeserializer.deserialize(serializedData, schema.toString())); + Throwable rootCause = ex.getCause(); + assertTrue(rootCause instanceof GlueSchemaRegistryIncompatibleDataException); + assertEquals("Invalid schema registry compression byte in data", rootCause.getMessage()); } /** @@ -294,6 +300,8 @@ public void testDeserialize_genericRecord_equalsOriginal(AWSSchemaRegistryConsta Object deserializedObject = avroDeserializer.deserialize( serializedData, schema.toString()); assertGenericRecord(genericRecord, deserializedObject); + //Assert the instance is getting cached. + assertEquals(1, avroDeserializer.getDatumReaderCache().size()); } public void assertGenericRecord(GenericRecord genericRecord, Object deserializedObject) { @@ -318,6 +326,8 @@ public void testDeserialize_genericRecordWithSpecificMode_equalsOriginal(AWSSche Object deserializedObject = avroDeserializer.deserialize(serializedData, schema.toString()); + //Assert the instance is getting cached. + assertEquals(1, avroDeserializer.getDatumReaderCache().size()); assertGenericRecordWithSpecificRecordMode(genericRecord, deserializedObject); } @@ -649,65 +659,11 @@ public void testDeserialize_unknownRecordType_throwsException(AWSSchemaRegistryC Schema schema = SchemaLoader.loadAvroSchema(AVRO_USER_SCHEMA_FILE); AvroDeserializer avroDeserializer = createAvroDeserializer(AvroRecordType.UNKNOWN); - assertThrows(UnsupportedOperationException.class, + Exception ex = assertThrows(AWSSchemaRegistryException.class, () -> deserialize(avroDeserializer, serializedData.array(), schema.toString())); - } - - /** - * Tests the de-serialization for createDatumReader IllegalAccessException which - * will be wrapper under AWSSchemaRegistryException. - */ - @Test - public void testDeserialize_datumReaderIllegalAccess_throwsException() { - GenericRecord genericRecord = RecordGenerator.createGenericAvroRecord(); - ByteBuffer serializedData = - createBasicSerializedData(genericRecord, AWSSchemaRegistryConstants.COMPRESSION.NONE.name(), - DataFormat.AVRO); - - Schema schema = SchemaLoader.loadAvroSchema(AVRO_USER_SCHEMA_FILE); - AvroDeserializer avroDeserializerMock = mock(AvroDeserializer.class); - when(avroDeserializerMock.deserialize(Mockito.any(ByteBuffer.class), - Mockito.anyString())).thenCallRealMethod(); - - GlueSchemaRegistryConfiguration config = mock(GlueSchemaRegistryConfiguration.class); - avroDeserializerMock.setSchemaRegistrySerDeConfigs(config); - - try { - when(avroDeserializerMock.createDatumReader(Mockito.any(Schema.class))).thenThrow(new IllegalAccessException("Illegal access!")); - } catch (Exception e) { - fail("Test failed with exception", e); - } - - assertThrows(AWSSchemaRegistryException.class, () -> avroDeserializerMock.deserialize( - serializedData, schema.toString())); - } - - /** - * Tests the de-serialization for createDatumReader InstantiationException which - * will be wrapper under AWSSchemaRegistryException. - */ - @Test - public void testDeserialize_datumReaderInstantiationFails_throwsException() { - GenericRecord genericRecord = RecordGenerator.createGenericAvroRecord(); - ByteBuffer serializedData = createBasicSerializedData(genericRecord, - AWSSchemaRegistryConstants.COMPRESSION.NONE.name(), DataFormat.AVRO); - - Schema schema = SchemaLoader.loadAvroSchema(AVRO_USER_SCHEMA_FILE); - AvroDeserializer avroDeserializerMock = mock(AvroDeserializer.class); - when(avroDeserializerMock.deserialize(Mockito.any(ByteBuffer.class), - Mockito.anyString())).thenCallRealMethod(); - - GlueSchemaRegistryConfiguration config = mock(GlueSchemaRegistryConfiguration.class); - when(config.getCompressionType()).thenReturn(AWSSchemaRegistryConstants.COMPRESSION.NONE); - avroDeserializerMock.setSchemaRegistrySerDeConfigs(config); - - try { - when(avroDeserializerMock.createDatumReader(Mockito.any(Schema.class))).thenThrow(new InstantiationException("Instantiation errors!")); - } catch (Exception e) { - fail("Test failed with exception", e); - } - - assertThrows(AWSSchemaRegistryException.class, () -> avroDeserializerMock.deserialize(serializedData, schema.toString())); + Throwable rootCause = ex.getCause().getCause(); + assertTrue(rootCause instanceof UnsupportedOperationException); + assertEquals("Unsupported AvroRecordType: UNKNOWN", rootCause.getMessage()); } /**