Schema Registry: Support custom schemaId framing #1688

Open
wolfchimneyrock opened this issue Dec 11, 2023 · 0 comments · May be fixed by #1689

wolfchimneyrock commented Dec 11, 2023

Description

Currently, the code that writes the schema ID into the payload is duplicated across the three supported schema_registry serdes (Avro, Protobuf, and JSON) and is hardcoded to the Confluent wire format.

Interest in making this more flexible has come up before in #679 and #1119. Our organization is now interested in deploying Apicurio Registry as a service (as a generic, not-just-Kafka solution) alongside some existing Confluent-compatible (Kafka-specific) usage.

It would be useful to make this configurable in the serdes, both to support Apicurio's framing and to open the door to other custom framings. My proposal, which I will open as a PR:

Interface:

The serdes classes get a new configuration option, schemaid.location, whose value must be a callable that takes ctx and returns a 2-tuple of functions: the first reads the schema ID and the second writes it.

Two such callables are defined initially: confluent_payload_framing and apicurio_payload_framing. The Confluent one is the default when the option isn't specified, to maintain backwards compatibility.
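
A minimal sketch of how a serde might consume the option, to make the contract concrete. SketchSerializer is a hypothetical stand-in and not one of the real serdes, the framing function is a simplified local copy of the proposed default, and invoking the factory once per message is an assumption about how ctx would be supplied:

```python
import io
import struct

_MAGIC_BYTE = 0  # Confluent wire-format magic byte


def confluent_payload_framing(ctx):
    """Simplified stand-in for the proposed default: magic byte + 4-byte big-endian ID."""
    def reader(payload):
        magic, schema_id = struct.unpack('>bI', payload.read(5))
        if magic != _MAGIC_BYTE:
            raise ValueError("Unexpected magic byte {}".format(magic))
        return schema_id

    def writer(fo, schema_id):
        fo.write(struct.pack('>bI', _MAGIC_BYTE, schema_id))

    return reader, writer


class SketchSerializer:
    """Hypothetical serde; shows only how 'schemaid.location' could be consumed."""

    def __init__(self, schema_id, conf=None):
        conf = dict(conf or {})
        # Default to Confluent framing when the option is absent.
        self._framing = conf.pop('schemaid.location', confluent_payload_framing)
        self._schema_id = schema_id

    def __call__(self, encoded_record, ctx=None):
        # Assumption: the factory is invoked per message so it can see ctx.
        _, write_schema_id = self._framing(ctx)
        with io.BytesIO() as fo:
            write_schema_id(fo, self._schema_id)  # framing first
            fo.write(encoded_record)              # then the record bytes
            return fo.getvalue()


framed = SketchSerializer(schema_id=42)(b'record')
read_schema_id, _ = confluent_payload_framing(None)
stream = io.BytesIO(framed)
print(read_schema_id(stream), stream.read())
```

Because the serde only ever calls the two returned functions, the three serdes could share this one code path instead of each packing the header themselves.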

An example client that wants to use Apicurio framing:

from confluent_kafka.schema_registry import apicurio_payload_framing
...
avro_conf = {'schemaid.location': apicurio_payload_framing}
avro_serializer = AvroSerializer(schema_registry_client, schema_str, conf=avro_conf)

The contents of confluent_payload_framing are simple:

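import struct

from confluent_kafka.serialization import SerializationError

# _MAGIC_BYTE is the constant the existing serdes already use for the Confluent wire format.


def confluent_payload_framing(ctx):
    def reader(payload):
        # NOTE: payload needs both len() and read() here; a plain io.BytesIO has no len().
        if len(payload) <= 5:
            raise SerializationError("Expecting data framing of length 6 bytes or "
                                     "more but total data size is {} bytes. This "
                                     "message was not produced with a Confluent "
                                     "Schema Registry serializer".format(len(payload)))
        # 1-byte magic followed by a 4-byte big-endian unsigned schema ID
        magic, schema_id = struct.unpack('>bI', payload.read(5))
        if magic != _MAGIC_BYTE:
            raise SerializationError("Unexpected magic byte {}. This message "
                                     "was not produced with a Confluent "
                                     "Schema Registry serializer".format(magic))
        return schema_id

    def writer(fo, schema_id):
        fo.write(struct.pack('>bI', _MAGIC_BYTE, schema_id))

    return reader, writer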

Note that ctx is passed in so that a future framing could locate the schema ID in Kafka headers instead of the payload.
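
A rough sketch of what such a header-based framing could look like under this interface. Everything here is an assumption for illustration: Ctx is a stand-in for a serialization context that carries a mutable list of (key, value) headers, 'schema-id' is a made-up header name, and the 4-byte big-endian encoding is arbitrary:

```python
import io
from dataclasses import dataclass, field

SCHEMA_ID_HEADER = 'schema-id'  # hypothetical header name


@dataclass
class Ctx:
    """Stand-in for a serialization context that carries Kafka headers."""
    topic: str
    headers: list = field(default_factory=list)  # list of (key, bytes) tuples


def header_framing(ctx):
    def reader(payload):
        # The payload is left untouched; the ID comes from the headers.
        for key, value in ctx.headers:
            if key == SCHEMA_ID_HEADER:
                return int.from_bytes(value, 'big')
        raise LookupError("No {!r} header on this message".format(SCHEMA_ID_HEADER))

    def writer(fo, schema_id):
        # Nothing is written to the payload stream.
        ctx.headers.append((SCHEMA_ID_HEADER, schema_id.to_bytes(4, 'big')))

    return reader, writer


ctx = Ctx(topic='orders')
reader, writer = header_framing(ctx)
out = io.BytesIO()
writer(out, 7)
```

Closing over ctx is what makes this work, which is why the factory takes it as an argument even though the two payload framings ignore it.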

As an alternative to returning a 2-tuple of callables, the factory could return an instance of a class, but I thought the tuple was simpler.
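
For comparison, the class-based alternative might look roughly like this. SchemaIdFraming and ConfluentFraming are hypothetical names, not part of the proposal or the library:

```python
import io
import struct
from abc import ABC, abstractmethod


class SchemaIdFraming(ABC):
    """Hypothetical base class: one object per ctx instead of a (reader, writer) tuple."""

    def __init__(self, ctx=None):
        self.ctx = ctx

    @abstractmethod
    def read(self, payload):
        """Consume the framing from the stream and return the schema ID."""

    @abstractmethod
    def write(self, fo, schema_id):
        """Write the framing for schema_id to the stream."""


class ConfluentFraming(SchemaIdFraming):
    _MAGIC_BYTE = 0

    def read(self, payload):
        magic, schema_id = struct.unpack('>bI', payload.read(5))
        if magic != self._MAGIC_BYTE:
            raise ValueError("Unexpected magic byte {}".format(magic))
        return schema_id

    def write(self, fo, schema_id):
        fo.write(struct.pack('>bI', self._MAGIC_BYTE, schema_id))


framing = ConfluentFraming(ctx=None)
buf = io.BytesIO()
framing.write(buf, 99)
buf.seek(0)
schema_id = framing.read(buf)
```

The class form gives a natural place for shared state and documentation, at the cost of a little more ceremony than the tuple for simple framings.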
