Skip to content

Commit

Permalink
feat: substantially improve avro deserialization (#6201)
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra authored Sep 15, 2020
1 parent 2faf5fe commit 325a009
Showing 1 changed file with 27 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -305,33 +305,35 @@ private Struct toKsqlStruct(
final String pathStr
) {
final Struct ksqlStruct = new Struct(schema);
final Map<String, Field> caseInsensitiveFieldMap =
getCaseInsensitiveFieldMap(connectSchema);

schema.fields().forEach(field -> {
final String fieldNameUppercase = field.name().toUpperCase();
if (caseInsensitiveFieldMap.containsKey(fieldNameUppercase)) {
final Field connectField = caseInsensitiveFieldMap.get(fieldNameUppercase);
// make sure to get/put the field using the Field object to avoid a lookup in Struct
final Object fieldValue = connectStruct.get(connectField);
final Schema fieldSchema = connectField.schema();
ksqlStruct.put(
field,
toKsqlValue(
field.schema(),
fieldSchema,
fieldValue,
pathStr + PATH_SEPARATOR + field.name()));
for (final Field field : connectSchema.fields()) {
Field ksqlField = schema.field(field.name());
// WARNING: do not move the toUpperCase() outside of this if branch - upper
// casing strings is an expensive operation, whereas an extra lookup into
// a hash map with string keys is nearly free because of string interning
// (the hash code is cached for strings)
if (ksqlField == null) {
// check "case-insensitive" match - ksqlDB defines case insensitivity
// as being equivalent to a field with all of its chars upper-cased
ksqlField = schema.field(field.name().toUpperCase());
if (ksqlField == null) {
continue;
}
}
});
return ksqlStruct;
}

private static Map<String, Field> getCaseInsensitiveFieldMap(final Schema schema) {
final Map<String, Field> fieldsByName = new HashMap<>();
schema.fields().forEach(
field -> fieldsByName.put(field.name().toUpperCase(), field)
);
return fieldsByName;
final Object fieldValue = connectStruct.get(field);
final Schema fieldSchema = field.schema();
ksqlStruct.put(
ksqlField,
toKsqlValue(
ksqlField.schema(),
fieldSchema,
fieldValue,
pathStr + PATH_SEPARATOR + ksqlField.name()
)
);
}

return ksqlStruct;
}
}

0 comments on commit 325a009

Please sign in to comment.