feat: introduce JSON_SR format #4596
Conversation
@agavra it seems that an incorrect value format is shown in the output. We have JSON_SR but KAFKA (String) is shown.

@hjafarpour - yes, see the note right above 😂 "NOTE: still needs support for print topic"
Force-pushed from 479c786 to 1a8cc29
 * using the schema registry format (first byte magic byte, then
 * four bytes for the schemaID).
 */
public static InputStream asStandardJson(@Nonnull final byte[] jsonWithMagic) {
Why does this need to return an InputStream? It seems that all the callers really just want the bytes.
Would it be simpler (and more efficient) just to slice the byte[] and return another one with the prefix removed?
Also, I know this InputStream won't block, but as we move to a more reactive / non-blocking model in the server we should avoid blocking constructs such as input and output streams and favour buffers.
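For illustration, a minimal sketch of the slicing alternative (the method name here is illustrative, not from the PR; SIZE_OF_SR_PREFIX is assumed to be the five-byte magic-byte-plus-schema-ID prefix described in the javadoc above):

```java
import java.util.Arrays;

// 1 magic byte + 4 bytes of schema ID, per the Schema Registry wire format.
private static final int SIZE_OF_SR_PREFIX = 1 + Integer.BYTES;

// Return a copy of the payload with the SR prefix stripped; callers get a
// plain JSON byte[] rather than an InputStream.
public static byte[] asStandardJsonBytes(final byte[] jsonWithMagic) {
  return Arrays.copyOfRange(jsonWithMagic, SIZE_OF_SR_PREFIX, jsonWithMagic.length);
}
```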
I think Almog is trying to avoid the array copy on every message when deserializing. This could be premature optimisation, but I'm guessing the copy would hurt performance, as we already know serialization is a large cost to us.
An alternative to copying the data into a new buffer would be to build a parser over the original buffer with a suitable offset, i.e. change a line like:
// Original code that deserialized the whole byte array:
MAPPER.readTree(bytes);
to
// Build a parser with the whole array and appropriate offset into that array:
// This avoids the array copy at the cost of slightly more complex code.
final int offset = isJsonSchema ? JsonSerdeUtils.SIZE_OF_SR_PREFIX : 0;
MAPPER.readTree(MAPPER.getFactory().createParser(bytes, offset, bytes.length - offset));
Copying small arrays is super fast in Java. I would bet that it would be a lot faster than constructing an InputStream around it and reading bytes one by one.
Also if you use ByteBuffer or Vert.x Buffers you can avoid the copy altogether as slice just references the original array with different offset and length.
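As a sketch of that zero-copy approach using the JDK's ByteBuffer (names as above; not code from the PR itself):

```java
import java.nio.ByteBuffer;

// Wrap the original array starting just past the SR prefix; slice() creates a
// view over the same backing array, so no bytes are copied.
final ByteBuffer jsonView = ByteBuffer
    .wrap(jsonWithMagic, SIZE_OF_SR_PREFIX, jsonWithMagic.length - SIZE_OF_SR_PREFIX)
    .slice();
```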
Replaced with the following inside the method; I believe this should work:
return mapper.readValue(
    jsonWithMagic,
    SIZE_OF_SR_PREFIX,
    jsonWithMagic.length - SIZE_OF_SR_PREFIX,
    clazz
);
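Put together, the replacement might look roughly like this (a sketch; the method name and surrounding wiring are illustrative, while Jackson's `ObjectMapper.readValue(byte[], int, int, Class)` overload handles the offset):

```java
import java.io.IOException;
import com.fasterxml.jackson.databind.ObjectMapper;

// Deserialize directly from the offset past the SR prefix, avoiding both an
// InputStream wrapper and an intermediate array copy.
public static <T> T readJsonSr(
    final ObjectMapper mapper,
    final byte[] jsonWithMagic,
    final Class<T> clazz
) throws IOException {
  return mapper.readValue(
      jsonWithMagic,
      SIZE_OF_SR_PREFIX,
      jsonWithMagic.length - SIZE_OF_SR_PREFIX,
      clazz
  );
}
```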
Thanks @agavra
I've got to admit, I really dislike the fact we've got magic bytes in our JSON data :(

So we've now got JSON, which should just be pure JSON text, and JSON_SR, which is JSON data prefixed with SR data. This makes sense, even if it's fecking ugly!

However, we've got other formats that utilise the schema registry, and I think we should be using consistent naming, i.e. AVRO should really be AVRO_SR and PROTOBUF should be PROTOBUF_SR, as they are both prefixed data. This consistent naming will make things much easier for the user to understand. In the future we may want to support pure AVRO or PROTOBUF without SR integration. We should make such a change now, as it will only get harder later. cc @derekjn, @MichaelDrogalis for a Product perspective on this.

Other thoughts on this JSON_SR format:
- Intermediate and sink topics will inherit this schema, meaning there'll be schemas added to the schema registry. For Avro we clean these up when things are deleted. Is this happening for the other schema-registry-enabled formats?
- There don't seem to be any doc updates for this yet. Are they coming later?

Also, have we considered any other approaches to handling the SR integration, rather than appending _SR to our formats? Thinking out loud, we could have a WITH property that controlled the integration?

Don't we still also encode the schema ID in AVRO_SCHEMA_ID in the WITH clause? We should also change this to SCHEMA_ID as part of this work.
Thanks for the review @big-andy-coates
The idea is that this format is "temporary". When schema registry implements support for headers-based serialization instead of magic bytes, we will remove JSON_SR and have all formats be just JSON. At that point, we will also be able to support vanilla AVRO and PROTOBUF. I understand what you're getting at, but I don't think renaming the formats today has an ROI that justifies that backwards-incompatible change.
Good call, I'll double check this.
Stay tuned :)
I think they are different formats, and as such, should be treated as different formats. A WITH clause would (in my opinion) be more confusing but I'm happy to open that up to product folk (@derekjn)
Already opened a ticket for this: #4556
@big-andy-coates big +1 from me :(
I'm in agreement with @agavra that these should be treated as different formats, especially if this specific format is an interim solution until magic bytes are moved to the message header. Adding a WITH property seems like it would be more confusing. I'm in favor of keeping this as simple and isolated as possible by just introducing the single format JSON_SR.
@big-andy-coates I double checked this, and they are cleaned up (it's independent of whether or not it's an AVRO topic)
  private String target = "?";

  public KsqlJsonDeserializer(
-     final PersistenceSchema physicalSchema
+     final PersistenceSchema physicalSchema,
+     final boolean isJsonSchema
  ) {
    this.physicalSchema = JsonSerdeUtils.validateSchema(physicalSchema);
Null check here?
I've got another PR following up this one to fix loose ends and will make sure to add it there! (I want to get this in since the build is green)
Thanks for the review @vpapavas!
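For reference, a minimal sketch of what the requested null check could look like in the follow-up (assuming a plain Objects.requireNonNull guard; this is not the actual follow-up PR):

```java
import java.util.Objects;

public KsqlJsonDeserializer(
    final PersistenceSchema physicalSchema,
    final boolean isJsonSchema
) {
  // Fail fast with a clear message if the schema is missing.
  this.physicalSchema = JsonSerdeUtils.validateSchema(
      Objects.requireNonNull(physicalSchema, "physicalSchema"));
  this.isJsonSchema = isJsonSchema;
}
```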
Thank you @agavra! LGTM since you plan to add documentation and fix the print topic.
@big-andy-coates, I'm going to go ahead and merge this because it seems most of your points of contention are about product considerations which I hope @derekjn cleared up. Given we still have some time before code freeze/release, if you feel strongly that this isn't the right way to do it let's sync up offline and come to an agreement on the best way forward and I'll address it in a future PR.
Description
Introduce a new format to be able to read the data produced by Confluent's JSON serializers, which prepend a magic byte and the schema ID to standard JSON. Note that we can't just use the standard deserializer, because we require the mapper to use USE_BIG_DECIMAL_FOR_FLOATS to avoid deserializing decimals as doubles and losing precision.
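A minimal sketch of that mapper configuration (standard Jackson API; the PR's actual mapper setup may differ):

```java
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

// Read floating-point JSON numbers as BigDecimal rather than Double, so
// decimal values round-trip without losing precision.
private static final ObjectMapper MAPPER = new ObjectMapper()
    .enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
```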
Testing done
Unit testing and local testing.
NOTE: still needs support for print topic