
chore: support PROTOBUF for PRINT TOPIC #4594

Merged: 2 commits merged into confluentinc:5.5.x on Feb 25, 2020
Conversation

@agavra (Contributor) commented Feb 19, 2020

fixes #4516

Description

Adds support for PRINT TOPIC for protobuf topics

Testing done

Unit testing and manual:

ksql> PRINT print_proto FROM BEGINNING;
Key format: UNDEFINED
Value format: PROTOBUF
rowtime: 2/19/20 9:40:02 AM PST, key: <null>, value: COL1: "a" COL2: 2 COL3: "c"
^CTopic printing ceased

Reviewer checklist

  • Ensure docs are updated if necessary (e.g. if a user-visible feature is being added or changed).
  • Ensure relevant issues are linked (the description should include text like "Fixes #").

@agavra agavra requested a review from a team as a code owner February 19, 2020 18:38
@hjafarpour (Contributor) commented Feb 19, 2020

Key format: UNDEFINED
Value format: PROTOBUF

@agavra usually the key and value have the same format when using Schema Registry. What does UNDEFINED mean in this context? Do we show UNDEFINED if the key format is PROTOBUF?

@agavra (Contributor, Author) commented Feb 19, 2020

@hjafarpour - non-KAFKA key formats are not supported at all yet. The format is UNDEFINED because every key seen so far was null; if a key were a long, int, or string, that format would have been shown instead.

@hjafarpour (Contributor) commented

Thanks @agavra, can't wait to have AVRO/JSON/PROTOBUF key support! ;)

@purplefox (Contributor) left a comment

LGTM

@big-andy-coates (Contributor) left a comment

Thanks @agavra.

LGTM, except I don't think we need to complicate the RecordFormatter code by adding a format method.

@@ -453,6 +456,7 @@ private static SqlType columnType(final ColumnName column, final LogicalSchema s
}
}

LOG.error("Could not serialize row.", e);
Contributor:

nit: generally avoid logging and throwing - it's an anti-pattern. Why isn't it sufficient just to throw here? Does the calling code not log it?

Contributor (Author):

The calling code doesn't log it: the error is returned to the user in the CLI as "could not serialize row", but without the full stack trace. This has bitten me several times, so I'd rather keep the log here, since at some point we should log it (we can debate where; I don't care that much).
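The trade-off being discussed can be sketched in isolation. All class and method names below are hypothetical illustrations, not ksqlDB code; the point is that log-and-throw reports the same failure twice, whereas throwing with a cause preserves the stack trace for whichever single layer ends up doing the logging:

```java
// Sketch of the log-and-throw anti-pattern vs. throwing with a cause.
// All names here are hypothetical, not from the ksqlDB codebase.
public class LogAndThrowSketch {

  // Anti-pattern: the same failure is reported twice, once by this log
  // line and once by whoever eventually catches the rethrown exception.
  static Object deserializeLoggingAndThrowing(final byte[] bytes) {
    try {
      return parse(bytes);
    } catch (RuntimeException e) {
      System.err.println("Could not serialize row: " + e); // log...
      throw e;                                             // ...and throw
    }
  }

  // Preferred: wrap with a cause and let exactly one layer (usually a
  // top-level handler) decide where the full stack trace gets logged.
  static Object deserializeThrowingOnly(final byte[] bytes) {
    try {
      return parse(bytes);
    } catch (RuntimeException e) {
      throw new IllegalStateException("Could not serialize row", e);
    }
  }

  // Stand-in for the real deserialization work.
  private static Object parse(final byte[] bytes) {
    if (bytes == null) {
      throw new IllegalArgumentException("null payload");
    }
    return new String(bytes);
  }

  public static void main(String[] args) {
    try {
      deserializeThrowingOnly(null);
    } catch (IllegalStateException e) {
      // The original cause (and its stack trace) is still available to
      // the single place that does the logging.
      System.out.println(e.getCause().getMessage()); // prints "null payload"
    }
  }
}
```

agavra's counterpoint above is that when no layer logs the trace at all, losing it is worse than double-reporting; the sketch only shows why one deliberate logging site is the usual goal.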

@@ -248,7 +251,7 @@ String format(final Bytes bytes) {
) {
  try {
    final Object result = deserializer.deserializer.deserialize(topicName, bytes.get());
-    return Optional.of(result == null ? "<null>" : result.toString());
+    return Optional.of(result == null ? "<null>" : deserializer.format(result));
Contributor:

This call to format seems completely superfluous. This method now calls deserialize on the deserializer and then immediately passes the result back to format on the same deserializer. Why not just have deserialize return the formatted result?

In the case of the new protobuf format, all we need to do is pass a factory method to the constructor of the PROTOBUF enum that wraps the KafkaProtobufDeserializer instance in one that applies the format, in a similar way to newJsonDeserializer.

  private static Deserializer<?> newProtobufDeserializer(final SchemaRegistryClient srClient) {
    final Printer printer = TextFormat.printer();
    final KafkaProtobufDeserializer<?> inner = new KafkaProtobufDeserializer<>(srClient);

    return (Deserializer<Object>) (topic, data) -> {
      final Message msg = inner.deserialize(topic, data);
      if (msg == null) {
        return null;
      }
      return printer.shortDebugString(msg);
    };
  }

...

  enum Format {
    AVRO(0, KafkaAvroDeserializer::new),
    PROTOBUF(0, RecordFormatter::newProtobufDeserializer),
    JSON(RecordFormatter::newJsonDeserializer),
    ...
  }

This avoids the need to complicate the code with another function call on all deserializers...
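The decoration the reviewer proposes, wrapping an inner deserializer so that formatting happens inside deserialize itself, can be sketched without any Kafka, protobuf, or Schema Registry dependencies. The `Deserializer` interface below is a hypothetical stand-in for Kafka's `org.apache.kafka.common.serialization.Deserializer`, and the formatter lambda stands in for `TextFormat.printer()::shortDebugString`:

```java
import java.util.function.Function;

// Self-contained sketch of the decorator pattern the reviewer describes.
// Deserializer here is a stand-in for Kafka's interface of the same name.
public class FormattingDeserializerSketch {

  @FunctionalInterface
  interface Deserializer<T> {
    T deserialize(String topic, byte[] data);
  }

  // Wraps an inner deserializer so callers get the already-formatted
  // string back from deserialize(), with no separate format() step.
  static Deserializer<Object> formatting(
      final Deserializer<?> inner,
      final Function<Object, String> formatter) {
    return (topic, data) -> {
      final Object msg = inner.deserialize(topic, data);
      return msg == null ? null : formatter.apply(msg);
    };
  }

  public static void main(String[] args) {
    // Hypothetical "inner" deserializer: bytes -> UTF-8 string.
    final Deserializer<String> inner = (topic, data) -> new String(data);
    final Deserializer<Object> wrapped =
        formatting(inner, msg -> "COL1: \"" + msg + "\"");
    System.out.println(wrapped.deserialize("t", "a".getBytes()));
    // prints: COL1: "a"
  }
}
```

The design benefit, as the comment above notes, is that the formatting concern lives entirely inside the per-format factory, so the shared printing loop never needs a second call on every deserializer.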

@agavra agavra merged commit 55cc0d0 into confluentinc:5.5.x Feb 25, 2020
@agavra agavra deleted the print_topic branch February 25, 2020 18:56