
Does Confluent kafka python lib support different registry? #1119

Closed
5 of 7 tasks
Akhilj786 opened this issue May 11, 2021 · 2 comments
@Akhilj786

Description

Versions:
Python 3.8.8

Does the confluent-kafka-python library support a different schema registry?
We plan to use the following registry:
https://www.apicur.io/registry/docs/apicurio-registry/2.0.0.Final/index.html
https://developers.redhat.com/blog/2019/12/17/replacing-confluent-schema-registry-with-red-hat-integration-service-registry/

How to reproduce


Sample code:


import asyncio
from threading import Thread

from confluent_kafka import KafkaException, SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.protobuf import ProtobufSerializer
from confluent_kafka.serialization import StringSerializer


class ProtoBufAIOProducer:
    def __init__(self, configs, value_ser=None, loop=None):
        self._loop = loop or asyncio.get_event_loop()
        schema_registry_client = SchemaRegistryClient(
            {'url': configs['schema.registry.url'],
             'ssl.ca.location': configs['ssl.ca.location']})
        protobuf_serializer = ProtobufSerializer(value_ser, schema_registry_client)

        self.producer_conf = {'bootstrap.servers': configs['bootstrap.servers'],
                              'key.serializer': StringSerializer('utf_8'),
                              'value.serializer': protobuf_serializer}

        self._producer = SerializingProducer(self.producer_conf)
        self._cancelled = False
        # Serve delivery callbacks from a dedicated poll thread.
        self._poll_thread = Thread(target=self._poll_loop)
        self._poll_thread.start()

    def _poll_loop(self):
        while not self._cancelled:
            self._producer.poll(0.1)

    def close(self):
        self._cancelled = True
        self._poll_thread.join()

    def produce(self, topic, key, value, on_delivery):
        """
        A produce method in which delivery notifications are made available
        via both the returned future and on_delivery callback (if specified).
        """
        result = self._loop.create_future()

        def ack(err, msg):
            # Delivery callbacks fire on the poll thread, so results must be
            # handed back to the event loop thread-safely.
            if err:
                self._loop.call_soon_threadsafe(
                    result.set_exception, KafkaException(err))
            else:
                self._loop.call_soon_threadsafe(
                    result.set_result, msg)
            if on_delivery:
                self._loop.call_soon_threadsafe(
                    on_delivery, err, msg)

        self._producer.produce(topic=topic, key=key, value=value, on_delivery=ack)
        self._producer.flush()  # blocks until the message is delivered
        return result
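As a side note, the callback-to-future bridge used in produce() above can be exercised without a broker. A minimal, self-contained sketch of the same pattern, where a worker thread completes an asyncio future via call_soon_threadsafe (the err/msg values are stand-ins for what librdkafka would pass to a delivery callback):

```python
import asyncio
from threading import Thread

async def main():
    loop = asyncio.get_running_loop()
    result = loop.create_future()

    def ack(err, msg):
        # Runs on the worker thread; futures may only be touched from the
        # loop's own thread, so hop back via call_soon_threadsafe.
        if err:
            loop.call_soon_threadsafe(result.set_exception, RuntimeError(err))
        else:
            loop.call_soon_threadsafe(result.set_result, msg)

    # Simulate librdkafka invoking the callback from its poll thread.
    Thread(target=ack, args=(None, "delivered")).start()
    return await result

print(asyncio.run(main()))  # prints "delivered"
```

This is why the ack closure in the class above never calls result.set_result directly: delivery callbacks run on the poll thread, not the event loop thread.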

Checklist

Please provide the following information:

  • confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): 1.6.1
  • Apache Kafka broker version: 2.7.0 (Commit:448719dc99a19793)
  • Client configuration:
bootstrap.servers=localhost:9092
schema.registry.url=http://localhost:8081/api/ccompat
cdc.input.topic=pbf-meal
group.id=pbf-meal
  • Operating system: macOS Big Sur
  • Provide client logs (with 'debug': '..' as necessary):
Traceback (most recent call last):
 File "/opt/anaconda3/envs/fast_ksql_ccloud/lib/python3.8/site-packages/confluent_kafka/serializing_producer.py", line 172, in produce
   value = self._value_serializer(value, ctx)
 File "/opt/anaconda3/envs/fast_ksql_ccloud/lib/python3.8/site-packages/confluent_kafka/schema_registry/protobuf.py", line 320, in __call__
   self._schema_id = self._registry.register_schema(subject,
 File "/opt/anaconda3/envs/fast_ksql_ccloud/lib/python3.8/site-packages/confluent_kafka/schema_registry/schema_registry_client.py", line 336, in register_schema
   response = self._rest_client.post(
 File "/opt/anaconda3/envs/fast_ksql_ccloud/lib/python3.8/site-packages/confluent_kafka/schema_registry/schema_registry_client.py", line 127, in post
   return self.send_request(url, method='POST', body=body)
 File "/opt/anaconda3/envs/fast_ksql_ccloud/lib/python3.8/site-packages/confluent_kafka/schema_registry/schema_registry_client.py", line 179, in send_request
   raise SchemaRegistryError(response.status_code,
confluent_kafka.schema_registry.error.SchemaRegistryError: Unknown Schema Registry Error: b'Unrecognized field "references" (class io.apicurio.registry.ccompat.dto.SchemaInfo), not marked as ignorable' (HTTP status code 400, SR code -1)
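For context, the traceback suggests the 400 comes from the JSON body register_schema() posts to the registry's /subjects/{subject}/versions endpoint. A sketch of that kind of body is below (field names are from the Confluent Schema Registry REST API; the schema string is a made-up example): the "references" key appears to be what Apicurio's ccompat SchemaInfo DTO rejects.

```python
import json

# Sketch of a schema-registration request body as sent by the Confluent
# Schema Registry REST API client; the schema string is illustrative only.
body = {
    "schema": 'syntax = "proto3"; message Meal { string name = 1; }',
    "schemaType": "PROTOBUF",
    "references": [],  # unrecognized by io.apicurio.registry.ccompat.dto.SchemaInfo
}
payload = json.dumps(body)
```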
  • Provide broker log excerpts
  • Critical issue
@jliunyu
Contributor

jliunyu commented Mar 24, 2022

Hi @Akhilj786, thanks for asking. We don't support schema registry references yet, but there is already a PR for it.

For the other registry you mentioned, we haven't looked at that yet.

@nhaq-confluent

The question posed has been answered, so I am closing this issue. Please open a new issue for any follow-up questions.
