
feat: introduce JSON_SR format #4596

Merged (4 commits, Feb 26, 2020)
Conversation

@agavra (Contributor) commented Feb 20, 2020

Description

Introduce a new format to be able to read data produced by Confluent's JSON serializers, which prepend a magic byte and the schema ID to standard JSON. Note that we can't just use the standard deserializer because we require the mapper to use USE_BIG_DECIMAL_FOR_FLOATS to avoid deserializing decimals as doubles.
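
For orientation, here is a minimal, illustrative sketch (not the PR's actual code) of the wire format described above and of why the mapper needs USE_BIG_DECIMAL_FOR_FLOATS; the constant name SIZE_OF_SR_PREFIX mirrors the one referenced later in the review:

    import com.fasterxml.jackson.databind.DeserializationFeature;
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.io.IOException;

    public final class JsonSrSketch {

      // Confluent wire format: 1 magic byte (0x00) + 4-byte schema ID, then plain JSON.
      private static final int SIZE_OF_SR_PREFIX = 1 + Integer.BYTES;

      // USE_BIG_DECIMAL_FOR_FLOATS makes Jackson parse floating-point literals as
      // BigDecimal, so decimal values are not silently widened to double.
      private static final ObjectMapper MAPPER = new ObjectMapper()
          .enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);

      public static JsonNode readJsonSr(final byte[] bytes) throws IOException {
        if (bytes.length < SIZE_OF_SR_PREFIX || bytes[0] != 0x00) {
          throw new IllegalArgumentException("Not a schema-registry framed JSON value");
        }
        // Skip the magic byte and schema ID; the remainder is standard JSON text.
        return MAPPER.readTree(MAPPER.getFactory()
            .createParser(bytes, SIZE_OF_SR_PREFIX, bytes.length - SIZE_OF_SR_PREFIX));
      }
    }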

Testing done

Unit testing and local testing.

NOTE: still needs support for print topic

ksql> CREATE STREAM json_sr (col1 VARCHAR, col2 INT) WITH (kafka_topic='json_sr', value_format='JSON_SR', partitions=1);

 Message
----------------
 Stream created
----------------
ksql> INSERT INTO json_sr (col1, col2) VALUES ('foo', 1);
ksql> PRINT json_sr FROM BEGINNING;
Key format: UNDEFINED
Value format: KAFKA (STRING)
rowtime: 2/19/20 3:33:07 PM PST, key: <null>, value: {"COL1":"foo","COL2":1}
+----------------------+----------------------+----------------------+----------------------+
|ROWTIME               |ROWKEY                |COL1                  |COL2                  |
+----------------------+----------------------+----------------------+----------------------+
|1582155187492         |null                  |foo                   |1                     |

Reviewer checklist

  • Ensure docs are updated if necessary (e.g. if a user-visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

@hjafarpour (Contributor) commented:

@agavra it seems that the incorrect value format is shown in the output. We have JSON_SR, but KAFKA (STRING) is shown:

ksql> PRINT json_sr FROM BEGINNING;
Key format: UNDEFINED
Value format: KAFKA (STRING)
rowtime: 2/19/20 3:33:07 PM PST, key: <null>, value: {"COL1":"foo","COL2":1}
+----------------------+----------------------+----------------------+----------------------+
|ROWTIME               |ROWKEY                |COL1                  |COL2                  |
+----------------------+----------------------+----------------------+----------------------+
|1582155187492         |null                  |foo                   |1                     |

@agavra (Contributor, Author) commented Feb 20, 2020

@hjafarpour - yes, see the note right above 😂 "NOTE: still needs support for print topic"

@agavra force-pushed the json_schema branch 3 times, most recently from 479c786 to 1a8cc29 on February 20, 2020 22:30
@agavra marked this pull request as ready for review February 20, 2020 23:24
@agavra requested a review from a team as a code owner February 20, 2020 23:24
* using the schema registry format (first byte magic byte, then
* four bytes for the schemaID).
*/
public static InputStream asStandardJson(@Nonnull final byte[] jsonWithMagic) {
Contributor:

Why does this need to return an InputStream? It seems that all the callers really just want the bytes.

Would it be simpler (and more efficient) just to slice the byte[] and return another one with the prefix removed?

Also, I know this InputStream won't block, but as we move to a more reactive / non-blocking model in the server we should avoid blocking constructs such as input and output streams and favour buffers.

Contributor:

I think Almog is trying to avoid the array copy for every message when deserializing. This could be premature optimisation, but I'm guessing it will hurt performance as we already know serialization is a large cost to us.

An alternative to copying the data into a new buffer would be to build a parser over the original buffer with a suitable offset, i.e. change a line like:

// Original code that deserialized the whole byte array:
MAPPER.readTree(bytes);

to

// Build a parser with the whole array and appropriate offset into that array:
// This avoids the array copy at the cost of slightly more complex code.
final int offset = isJsonSchema ? JsonSerdeUtils.SIZE_OF_SR_PREFIX : 0;
MAPPER.readTree(MAPPER.getFactory().createParser(bytes, offset, bytes.length - offset));

@purplefox (Contributor) commented Feb 25, 2020:

Copying small arrays is super fast in Java. I would bet that it would be a lot faster than constructing an InputStream around it and reading bytes one by one.

Contributor:

Also if you use ByteBuffer or Vert.x Buffers you can avoid the copy altogether as slice just references the original array with different offset and length.
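
For reference, a minimal sketch (illustrative, not code from this PR) of the zero-copy slicing described above:

    import java.nio.ByteBuffer;

    final class SliceSketch {

      // Returns a view over the same backing array, skipping the SR prefix
      // (1 magic byte + 4-byte schema ID); no bytes are copied.
      static ByteBuffer stripSrPrefix(final byte[] jsonWithMagic, final int srPrefixSize) {
        return ByteBuffer
            .wrap(jsonWithMagic, srPrefixSize, jsonWithMagic.length - srPrefixSize)
            .slice();
      }
    }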

@agavra (Contributor, Author):

Replaced with the following, inside the method:

    return mapper.readValue(
        jsonWithMagic,
        SIZE_OF_SR_PREFIX,
        jsonWithMagic.length - SIZE_OF_SR_PREFIX,
        clazz
    );

I believe this should work.
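
For context, a sketch of how that call might sit in a complete helper (assumed shape, not the merged code); Jackson's ObjectMapper.readValue(byte[], offset, len, Class) parses the sub-range directly, so the SR prefix is skipped without copying the payload:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.io.IOException;

    final class ReadValueSketch {

      private static final int SIZE_OF_SR_PREFIX = 5; // 1 magic byte + 4-byte schema ID

      static <T> T readJsonSr(
          final ObjectMapper mapper,
          final byte[] jsonWithMagic,
          final Class<T> clazz
      ) throws IOException {
        return mapper.readValue(
            jsonWithMagic,
            SIZE_OF_SR_PREFIX,
            jsonWithMagic.length - SIZE_OF_SR_PREFIX,
            clazz
        );
      }
    }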

@big-andy-coates (Contributor) left a review comment:

Thanks @agavra

I've got to admit, I really dislike the fact we've got magic bytes in our JSON data :(

So we've now got the JSON, which should just be pure JSON text, and JSON_SR which is JSON data prefixed with SR data. This makes sense, even if it's fecking ugly!

However, we've other formats that utilise the schema registry and I think we should be using consistent naming. i.e. AVRO should really be AVRO_SR and PROTOBUF should be PROTOBUF_SR, as they are both prefixed data. This consistent naming will make things much easier for the user to understand. In the future we may want to support pure AVRO or PROTOBUF without SR integration. We should make such a change now, as it will only get harder later. cc @derekjn, @MichaelDrogalis for a Product perspective on this.

Other thoughts on this JSON_SR format:

  • Intermediate and sink topics will inherit this schema, meaning schemas will be added to the schema registry for them. For Avro we clean these up when things are deleted. Is this happening for the other schema-registry-enabled formats?
  • There don't seem to be any doc updates for this yet. Are they coming later?

Also, have we considered any other approaches to handling the SR integration, rather than appending _SR to our formats? Thinking out loud, we could have a WITH property that controlled the integration?

Don't we still also encode the schema Id in AVRO_SCHEMA_ID in the WITH clause? We should also change this to SCHEMA_ID as part of this work.


@agavra (Contributor, Author) commented Feb 25, 2020

Thanks for the review @big-andy-coates

> However, we've other formats that utilise the schema registry and I think we should be using consistent naming. i.e. AVRO should really be AVRO_SR and PROTOBUF should be PROTOBUF_SR, as they are both prefixed data. This consistent naming will make things much easier for the user to understand. In the future we may want to support pure AVRO or PROTOBUF without SR integration. We should make such a change now, as it will only get harder later. cc @derekjn, @MichaelDrogalis for a Product perspective on this.

The idea is that this format is "temporary". When schema registry implements support for headers-based serialization instead of magic bytes we will remove JSON_SR and have all formats be just JSON. At that point, we will also be able to support vanilla AVRO and vanilla PROTOBUF without introducing new formats.

I understand what you're getting at, but I don't think renaming the formats today has an ROI that justifies the backwards-incompatible change.

> Intermediate and sink topics will inherit this schema, meaning schemas will be added to the schema registry for them. For Avro we clean these up when things are deleted. Is this happening for the other schema-registry-enabled formats?

Good call, I'll double check this.

> There don't seem to be any doc updates for this yet. Are they coming later?

Stay tuned :)

> Also, have we considered any other approaches to handling the SR integration, rather than appending _SR to our formats? Thinking out loud, we could have a WITH property that controlled the integration?

I think they are different formats, and as such, should be treated as different formats. A WITH clause would (in my opinion) be more confusing but I'm happy to open that up to product folk (@derekjn)

> Don't we still also encode the schema Id in AVRO_SCHEMA_ID in the WITH clause? We should also change this to SCHEMA_ID as part of this work.

Already opened a ticket for this #4556

@derekjn (Contributor) commented Feb 26, 2020

> I've got to admit, I really dislike the fact we've got magic bytes in our JSON data :(

@big-andy-coates big +1 from me :(

> I think they are different formats, and as such, should be treated as different formats. A WITH clause would (in my opinion) be more confusing but I'm happy to open that up to product folk (@derekjn)

I'm in agreement with @agavra that these should be treated as different formats, especially if this specific format is an interim solution until magic bytes are moved to the message header. Adding a WITH parameter is an interesting approach here but I don't feel that it would be worth it given that this basically amounts to an interim workaround. It may also lead users to believe that other established value formats could be augmented using a WITH modifier.

I'm in favor of keeping this as simple and isolated as possible by just introducing the single format JSON_SR.

@agavra (Contributor, Author) commented Feb 26, 2020

> Intermediate and sink topics will inherit this schema, meaning schemas will be added to the schema registry for them. For Avro we clean these up when things are deleted. Is this happening for the other schema-registry-enabled formats?

@big-andy-coates I double checked this, and they are cleaned up (it's independent of whether or not it's an AVRO topic)

  private String target = "?";

  public KsqlJsonDeserializer(
      final PersistenceSchema physicalSchema,
      final boolean isJsonSchema
  ) {
    this.physicalSchema = JsonSerdeUtils.validateSchema(physicalSchema);
Member:

Null check here?
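
(For illustration only, a sketch of the kind of guard being asked about, reusing the names from the snippet above; this is not the merged code:)

    import java.util.Objects;

    // Sketch: fail fast with an explicit null check before validating the schema.
    public KsqlJsonDeserializer(
        final PersistenceSchema physicalSchema,
        final boolean isJsonSchema
    ) {
      this.physicalSchema = JsonSerdeUtils.validateSchema(
          Objects.requireNonNull(physicalSchema, "physicalSchema"));
      // ... remaining assignments as in the original constructor
    }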

@agavra (Author):

I've got another PR following up this one to fix loose ends and will make sure to add it there! (I want to get this in since the build is green)

Thanks for the review @vpapavas!

@vpapavas (Member) left a review comment:

Thank you @agavra! LGTM since you plan to add documentation and fix the print topic.

@agavra (Contributor, Author) commented Feb 26, 2020

@big-andy-coates, I'm going to go ahead and merge this because it seems most of your points of contention are about product considerations which I hope @derekjn cleared up.

Given we still have some time before code freeze/release, if you feel strongly that this isn't the right way to do it let's sync up offline and come to an agreement on the best way forward and I'll address it in a future PR.

@agavra merged commit daa04d2 into confluentinc:5.5.x Feb 26, 2020
@agavra deleted the json_schema branch February 26, 2020 20:57
@agavra mentioned this pull request Feb 26, 2020