Skip to content

Commit

Permalink
Add DatumReader Cache to improve de-serialization performance (#65)
Browse files Browse the repository at this point in the history
* Add DatumReader Cache to improve de-serialization performance

* Fix exception message

Co-authored-by: Ravindranath Kakarla <[email protected]>
  • Loading branch information
blacktooth and blacktooth authored Aug 5, 2021
1 parent c3159ce commit d8ec6a9
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,21 @@
import com.amazonaws.services.schemaregistry.common.GlueSchemaRegistryDataFormatDeserializer;
import com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration;
import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializerDataParser;
import com.amazonaws.services.schemaregistry.utils.AVROUtils;
import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
import com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException;
import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import lombok.Builder;
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecord;

import java.io.IOException;
import java.nio.ByteBuffer;

/**
Expand All @@ -44,15 +41,21 @@
@Slf4j
public class AvroDeserializer implements GlueSchemaRegistryDataFormatDeserializer {
private static final GlueSchemaRegistryDeserializerDataParser DESERIALIZER_DATA_PARSER =
GlueSchemaRegistryDeserializerDataParser.getInstance();
private static final AVROUtils AVRO_UTILS = AVROUtils.getInstance();
GlueSchemaRegistryDeserializerDataParser.getInstance();
//TODO: Make this configurable if requested by customers.
private static final long MAX_DATUM_READER_CACHE_SIZE = 100;

@Getter
@Setter
private GlueSchemaRegistryConfiguration schemaRegistrySerDeConfigs;
@Setter
private AvroRecordType avroRecordType;

@NonNull
@Getter
@VisibleForTesting
protected final LoadingCache<String, DatumReader<Object>> datumReaderCache;

/**
* Constructor accepting various dependencies.
*
Expand All @@ -62,6 +65,11 @@ public class AvroDeserializer implements GlueSchemaRegistryDataFormatDeserialize
public AvroDeserializer(GlueSchemaRegistryConfiguration configs) {
this.schemaRegistrySerDeConfigs = configs;
this.avroRecordType = configs.getAvroRecordType();
this.datumReaderCache =
CacheBuilder
.newBuilder()
.maximumSize(MAX_DATUM_READER_CACHE_SIZE)
.build(new DatumReaderCache());
}

/**
Expand All @@ -80,16 +88,16 @@ public Object deserialize(@NonNull ByteBuffer buffer, @NonNull String schema) {

log.debug("Length of actual message: {}", data.length);

Schema schemaDefinition = AVRO_UTILS.parseSchema(schema);
DatumReader<Object> datumReader = createDatumReader(schemaDefinition);
DatumReader<Object> datumReader = datumReaderCache.get(schema);

BinaryDecoder binaryDecoder = getBinaryDecoder(data, 0, data.length);
Object result = datumReader.read(null, binaryDecoder);

log.debug("Finished de-serializing Avro message");

return result;
} catch (IOException | InstantiationException | IllegalAccessException e) {
String message = String.format("Exception occurred while de-serializing Avro message");
} catch (Exception e) {
String message = "Exception occurred while de-serializing Avro message";
throw new AWSSchemaRegistryException(message, e);
}
}
Expand All @@ -98,41 +106,10 @@ private BinaryDecoder getBinaryDecoder(byte[] data, int start, int end) {
return DecoderFactory.get().binaryDecoder(data, start, end, null);
}

/**
* This method is used to create Avro datum reader for deserialization. By
* default, it is GenericDatumReader; SpecificDatumReader will only be created
* if the user specifies. In this case, the program will check if the user has
* those specific code-generated schema classes locally. ReaderSchema will be
* supplied if the user wants to use a specific schema to deserialize the
* message. (Compatibility check will be invoked)
*
* @param writerSchema schema that writes the Avro message
* @return Avro datum reader for de-serialization
* @throws InstantiationException can be thrown for readerClass.newInstance()
* from java.lang.Class implementation
* @throws IllegalAccessException can be thrown readerClass.newInstance() from
* java.lang.Class implementation
*/
public DatumReader<Object> createDatumReader(Schema writerSchema)
throws InstantiationException, IllegalAccessException {

switch (this.avroRecordType) {
case SPECIFIC_RECORD:
@SuppressWarnings("unchecked")
Class<SpecificRecord> readerClass = SpecificData.get().getClass(writerSchema);

Schema readerSchema = readerClass.newInstance().getSchema();
log.debug("Using SpecificDatumReader for de-serializing Avro message, schema: {})", readerSchema.toString());
return new SpecificDatumReader<>(writerSchema, readerSchema);

case GENERIC_RECORD:
log.debug("Using GenericDatumReader for de-serializing Avro message, schema: {})", writerSchema.toString());
return new GenericDatumReader<>(writerSchema);

default:
String message = String.format("Data Format in configuration is not supported, Data Format: %s ",
this.avroRecordType.getName());
throw new UnsupportedOperationException(message);
/**
 * Guava {@link CacheLoader} that builds a {@link DatumReader} on cache miss.
 * Delegates to {@link DatumReaderInstance#from}, using the enclosing
 * deserializer's configured {@code avroRecordType}.
 */
private class DatumReaderCache extends CacheLoader<String, DatumReader<Object>> {
    @Override
    public DatumReader<Object> load(String writerSchemaDefinition) throws Exception {
        final DatumReader<Object> datumReader =
                DatumReaderInstance.from(writerSchemaDefinition, avroRecordType);
        return datumReader;
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package com.amazonaws.services.schemaregistry.deserializers.avro;

import com.amazonaws.services.schemaregistry.utils.AVROUtils;
import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.DatumReader;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecord;

@Slf4j
public class DatumReaderInstance {
    private static final AVROUtils AVRO_UTILS = AVROUtils.getInstance();

    /** Utility class exposing only the static factory; not meant to be instantiated. */
    private DatumReaderInstance() {
    }

    /**
     * Creates an Avro datum reader for de-serialization. By default a
     * {@link GenericDatumReader} is built; a {@link SpecificDatumReader} is created
     * only when the caller configures {@code SPECIFIC_RECORD}, in which case the
     * code-generated schema class must be present locally. The reader schema is
     * taken from that generated class so Avro's schema-resolution (compatibility)
     * check is invoked on read.
     *
     * @param writerSchemaDefinition Avro record writer schema, as a schema-definition string.
     * @param avroRecordType         whether to build a generic or specific reader.
     * @return Avro datum reader for de-serialization
     * @throws InstantiationException        if the generated reader class cannot be instantiated
     * @throws IllegalAccessException        if the generated reader class's constructor is inaccessible
     * @throws UnsupportedOperationException if {@code avroRecordType} is neither
     *                                       SPECIFIC_RECORD nor GENERIC_RECORD
     */
    public static DatumReader<Object> from(String writerSchemaDefinition, AvroRecordType avroRecordType)
            throws InstantiationException, IllegalAccessException {

        Schema writerSchema = AVRO_UTILS.parseSchema(writerSchemaDefinition);

        switch (avroRecordType) {
            case SPECIFIC_RECORD:
                @SuppressWarnings("unchecked")
                Class<SpecificRecord> readerClass = SpecificData.get().getClass(writerSchema);

                // NOTE(review): Class#newInstance is deprecated since Java 9, but the replacement
                // (getDeclaredConstructor().newInstance()) declares checked exceptions outside this
                // method's signature; kept as-is to preserve the declared contract.
                // NOTE(review): getClass() may return null if the generated class is not on the
                // classpath — that would NPE here; confirm desired failure mode.
                Schema readerSchema = readerClass.newInstance().getSchema();
                // Pass the Schema itself: slf4j calls toString() only when DEBUG is enabled.
                log.debug("Using SpecificDatumReader for de-serializing Avro message, schema: {}",
                        readerSchema);
                return new SpecificDatumReader<>(writerSchema, readerSchema);

            case GENERIC_RECORD:
                log.debug("Using GenericDatumReader for de-serializing Avro message, schema: {}",
                        writerSchema);
                return new GenericDatumReader<>(writerSchema);

            default:
                throw new UnsupportedOperationException(
                        String.format("Unsupported AvroRecordType: %s", avroRecordType.getName()));
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
Expand All @@ -58,9 +57,7 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
* Unit tests for testing Avro related serialization and de-serialization.
Expand Down Expand Up @@ -247,7 +244,10 @@ public void testDeserialize_incompleteData_throwsException() {

Schema schema = SchemaLoader.loadAvroSchema(AVRO_USER_SCHEMA_FILE);
AvroDeserializer avroDeserializer = createAvroDeserializer(AvroRecordType.SPECIFIC_RECORD);
assertThrows(GlueSchemaRegistryIncompatibleDataException.class, () -> avroDeserializer.deserialize(ByteBuffer.wrap(serializedData), schema.toString()));
Exception ex = assertThrows(AWSSchemaRegistryException.class, () -> avroDeserializer.deserialize(ByteBuffer.wrap(serializedData), schema.toString()));
Throwable rootCause = ex.getCause();
assertTrue(rootCause instanceof GlueSchemaRegistryIncompatibleDataException);
assertEquals("Data is not compatible with schema registry size: 2", rootCause.getMessage());
}

/**
Expand All @@ -261,7 +261,10 @@ public void testDeserialize_invalidHeaderVersionByte_throwsException() {

Schema schema = SchemaLoader.loadAvroSchema(AVRO_USER_SCHEMA_FILE);
AvroDeserializer avroDeserializer = createAvroDeserializer(AvroRecordType.SPECIFIC_RECORD);
assertThrows(GlueSchemaRegistryIncompatibleDataException.class, () -> avroDeserializer.deserialize(serializedData, schema.toString()));
Exception ex = assertThrows(AWSSchemaRegistryException.class, () -> avroDeserializer.deserialize(serializedData, schema.toString()));
Throwable rootCause = ex.getCause();
assertTrue(rootCause instanceof GlueSchemaRegistryIncompatibleDataException);
assertEquals("Invalid schema registry header version byte in data", rootCause.getMessage());
}

/**
Expand All @@ -275,7 +278,10 @@ public void testDeserialize_invalidCompressionByte_throwsException() {

Schema schema = SchemaLoader.loadAvroSchema(AVRO_USER_SCHEMA_FILE);
AvroDeserializer avroDeserializer = createAvroDeserializer(AvroRecordType.SPECIFIC_RECORD);
assertThrows(GlueSchemaRegistryIncompatibleDataException.class, () -> avroDeserializer.deserialize(serializedData, schema.toString()));
Exception ex = assertThrows(AWSSchemaRegistryException.class, () -> avroDeserializer.deserialize(serializedData, schema.toString()));
Throwable rootCause = ex.getCause();
assertTrue(rootCause instanceof GlueSchemaRegistryIncompatibleDataException);
assertEquals("Invalid schema registry compression byte in data", rootCause.getMessage());
}

/**
Expand All @@ -294,6 +300,8 @@ public void testDeserialize_genericRecord_equalsOriginal(AWSSchemaRegistryConsta
Object deserializedObject = avroDeserializer.deserialize( serializedData,
schema.toString());
assertGenericRecord(genericRecord, deserializedObject);
//Assert the instance is getting cached.
assertEquals(1, avroDeserializer.getDatumReaderCache().size());
}

public void assertGenericRecord(GenericRecord genericRecord, Object deserializedObject) {
Expand All @@ -318,6 +326,8 @@ public void testDeserialize_genericRecordWithSpecificMode_equalsOriginal(AWSSche
Object deserializedObject = avroDeserializer.deserialize(serializedData,
schema.toString());

//Assert the instance is getting cached.
assertEquals(1, avroDeserializer.getDatumReaderCache().size());
assertGenericRecordWithSpecificRecordMode(genericRecord, deserializedObject);
}

Expand Down Expand Up @@ -649,65 +659,11 @@ public void testDeserialize_unknownRecordType_throwsException(AWSSchemaRegistryC
Schema schema = SchemaLoader.loadAvroSchema(AVRO_USER_SCHEMA_FILE);
AvroDeserializer avroDeserializer = createAvroDeserializer(AvroRecordType.UNKNOWN);

assertThrows(UnsupportedOperationException.class,
Exception ex = assertThrows(AWSSchemaRegistryException.class,
() -> deserialize(avroDeserializer, serializedData.array(), schema.toString()));
}

/**
* Tests the de-serialization path where createDatumReader throws IllegalAccessException,
* which will be wrapped in AWSSchemaRegistryException.
*/
@Test
public void testDeserialize_datumReaderIllegalAccess_throwsException() {
// Build a serialized Avro payload with no compression for the deserializer to consume.
GenericRecord genericRecord = RecordGenerator.createGenericAvroRecord();
ByteBuffer serializedData =
createBasicSerializedData(genericRecord, AWSSchemaRegistryConstants.COMPRESSION.NONE.name(),
DataFormat.AVRO);

Schema schema = SchemaLoader.loadAvroSchema(AVRO_USER_SCHEMA_FILE);
// Partial mock: deserialize() runs the real implementation while createDatumReader is stubbed.
AvroDeserializer avroDeserializerMock = mock(AvroDeserializer.class);
when(avroDeserializerMock.deserialize(Mockito.any(ByteBuffer.class),
Mockito.anyString())).thenCallRealMethod();

GlueSchemaRegistryConfiguration config = mock(GlueSchemaRegistryConfiguration.class);
avroDeserializerMock.setSchemaRegistrySerDeConfigs(config);

try {
// Force the reader-creation step to fail so the wrapping behavior can be asserted.
when(avroDeserializerMock.createDatumReader(Mockito.any(Schema.class))).thenThrow(new IllegalAccessException("Illegal access!"));
} catch (Exception e) {
fail("Test failed with exception", e);
}

// The checked IllegalAccessException must surface as AWSSchemaRegistryException.
assertThrows(AWSSchemaRegistryException.class, () -> avroDeserializerMock.deserialize(
serializedData, schema.toString()));
}

/**
* Tests the de-serialization path where createDatumReader throws InstantiationException,
* which will be wrapped in AWSSchemaRegistryException.
*/
@Test
public void testDeserialize_datumReaderInstantiationFails_throwsException() {
GenericRecord genericRecord = RecordGenerator.createGenericAvroRecord();
ByteBuffer serializedData = createBasicSerializedData(genericRecord,
AWSSchemaRegistryConstants.COMPRESSION.NONE.name(), DataFormat.AVRO);

Schema schema = SchemaLoader.loadAvroSchema(AVRO_USER_SCHEMA_FILE);
AvroDeserializer avroDeserializerMock = mock(AvroDeserializer.class);
when(avroDeserializerMock.deserialize(Mockito.any(ByteBuffer.class),
Mockito.anyString())).thenCallRealMethod();

GlueSchemaRegistryConfiguration config = mock(GlueSchemaRegistryConfiguration.class);
when(config.getCompressionType()).thenReturn(AWSSchemaRegistryConstants.COMPRESSION.NONE);
avroDeserializerMock.setSchemaRegistrySerDeConfigs(config);

try {
when(avroDeserializerMock.createDatumReader(Mockito.any(Schema.class))).thenThrow(new InstantiationException("Instantiation errors!"));
} catch (Exception e) {
fail("Test failed with exception", e);
}

assertThrows(AWSSchemaRegistryException.class, () -> avroDeserializerMock.deserialize(serializedData, schema.toString()));
Throwable rootCause = ex.getCause().getCause();
assertTrue(rootCause instanceof UnsupportedOperationException);
assertEquals("Unsupported AvroRecordType: UNKNOWN", rootCause.getMessage());
}

/**
Expand Down

0 comments on commit d8ec6a9

Please sign in to comment.