You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Currently the code in the schema_registry serdes to write the schemaId into the payload is duplicated across the three supported serdes (avro, protobuf, and json) and also hardcoded to work with the confluent wire format.
There has been interest in the past to add flexibility to this in #679 and #1119. Now our organization is interested in deploying apicurio registry as a service (as a generic not-just-kafka solution) alongside some existing confluent compatible (kafka-specific) usage.
It would be useful to be able to configure this in the serdes to add support for apicurio's framing, as well as open this up to future customizations for others. My proposal, which I will open as a PR:
Interface:
the serdes classes get a new configuration option schemaid.location that requires a callable that returns a 2-tuple of functions that perform the reading and writing of the schemaid respectively.
two such callables are defined initially: confluent_payload_framing and apicurio_payload_framing with the confluent one being the default if the config isn't specified (to maintain backwards compatibility)
an example client wanting to use apicurio framing:
defconfluent_payload_framing(ctx):
defreader(payload):
iflen(payload) <=5:
raiseSerializationError("Expecting data framing of length 6 bytes or ""more but total data size is {} bytes. This ""message was not produced with a Confluent ""Schema Registry serializer".format(len(data)))
magic, schema_id=struct.unpack('>bI', payload.read(5))
ifmagic!=_MAGIC_BYTE:
raiseSerializationError("Unexpected magic byte {}. This message ""was not produced with a Confluent ""Schema Registry serializer".format(magic))
returnschema_iddefwriter(fo, schema_id):
fo.write(struct.pack('>bI', _MAGIC_BYTE, schema_id))
returnreader, writer
Notice that ctx is passed in. This is to enable possible future kafka header based schemaid location support.
As an alternative to the 2-tuple of callable return, I could also work this as a class returned, but I thought this was simpler.
The text was updated successfully, but these errors were encountered:
Description
Currently the code in the schema_registry serdes to write the schemaId into the payload is duplicated across the three supported serdes (avro, protobuf, and json) and also hardcoded to work with the confluent wire format.
There has been interest in the past to add flexibility to this in #679 and #1119. Now our organization is interested in deploying apicurio registry as a service (as a generic not-just-kafka solution) alongside some existing confluent compatible (kafka-specific) usage.
It would be useful to be able to configure this in the serdes to add support for apicurio's framing, as well as open this up to future customizations for others. My proposal, which I will open as a PR:
Interface:
the serdes classes get a new configuration option
schemaid.location
that requires a callable that returns a 2-tuple of functions that perform the reading and writing of the schemaid respectively.two such callables are defined initially:
confluent_payload_framing
andapicurio_payload_framing
with the confluent one being the default if the config isn't specified (to maintain backwards compatibility)an example client wanting to use apicurio framing:
the simple contents of
confluent_payload_framing
:Notice that ctx is passed in. This is to enable possible future kafka header based schemaid location support.
As an alternative to the 2-tuple of callable return, I could also work this as a class returned, but I thought this was simpler.
The text was updated successfully, but these errors were encountered: