Skip to content

Commit

Permalink
perf: Only look up index of new key field once, not per row processed (
Browse files Browse the repository at this point in the history
…#3020)

On a `selectKey` the old code looks up the index of the new key field in the value schema per-row processed. This index is constant and can be pre-calculated just the once.
  • Loading branch information
big-andy-coates authored Jun 28, 2019
1 parent 39af991 commit fda1c7f
Showing 1 changed file with 7 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -541,9 +541,12 @@ public SchemaKStream<?> selectKey(
);
}

final int keyIndexInValue = schema.valueFieldIndex(proposedKey.name())
.orElseThrow(IllegalStateException::new);

final KStream keyedKStream = kstream
.filter((key, value) -> value != null && extractColumn(proposedKey, value) != null)
.selectKey((key, value) -> extractColumn(proposedKey, value).toString())
.filter((key, value) -> value != null && extractColumn(keyIndexInValue, value) != null)
.selectKey((key, value) -> extractColumn(keyIndexInValue, value).toString())
.mapValues((key, row) -> {
if (updateRowKey) {
row.getColumns().set(SchemaUtil.ROWKEY_INDEX, key);
Expand Down Expand Up @@ -576,7 +579,7 @@ private static boolean isRowKey(final String fieldName) {
return SchemaUtil.isFieldName(fieldName, SchemaUtil.ROWKEY_NAME);
}

private Object extractColumn(final Field newKeyField, final GenericRow value) {
private Object extractColumn(final int keyIndexInValue, final GenericRow value) {
if (value.getColumns().size() != schema.valueFields().size()) {
throw new IllegalStateException("Field count mismatch. "
+ "Schema fields: " + schema
Expand All @@ -585,7 +588,7 @@ private Object extractColumn(final Field newKeyField, final GenericRow value) {

return value
.getColumns()
.get(schema.valueFieldIndex(newKeyField.name()).orElseThrow(IllegalStateException::new));
.get(keyIndexInValue);
}

private static String fieldNameFromExpression(final Expression expression) {
Expand Down

0 comments on commit fda1c7f

Please sign in to comment.