-
Notifications
You must be signed in to change notification settings - Fork 72
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Protobuf usage example #16
Comments
@leosunmo I have not tried protobuf, but as per confluent documentation you will have to use wire format. On the consumer side you have to ignore first 5 bytes of the message and then deserialize it. |
Yeah I've got that far. The approach I'll be taking is to compile all the schemas I care about, and then use the Schema Registry as supplementary information I think. |
Hi @leosunmo Did you get a chance to get this working ? |
I've ended up using https://developers.google.com/protocol-buffers/docs/proto3#any instead to communicate what type the message is. I rely on having compiled the types already, so no "runtime" dynamic fetching of protobuf files. |
@dineshgowda24 It's been a while since I looked at this, but I utilise Google's Any format, https://developers.google.com/protocol-buffers/docs/reference/csharp/class/google/protobuf/well-known-types/any. On the producer side I do something like this: func makeNotifEventMessage() []byte {
event := &metrics.Event{ # My Protobuf message
Desc: "Email got sent successfully",
SourceSystem: "test-producer",
Ts: timestamppb.Now(),
Type: metrics.Event_EMAIL_SUCCESS,
},
}
metricAny, _ := anypb.New(event) # This wraps the "event" protobuf object in an "Any" protobuf type.
out, _ := proto.Marshal(metricAny)
return out
} And on the consumer side it ends up looking something like this: # Handle getting Kafka messages
# Unmarshal in to an "Any" type, which is wrapping the "real" message.
wrapper := &anypb.Any{}
if err := proto.Unmarshal(kafkaMessage.Value, wrapper); err != nil {
c.log.Error("Failed to parse event", zap.Error(err))
continue
}
// This is probably where you could involve protoregistry in some fancy way
// and anypb.UnmarshalNew(), but for now we can keep it simple.
var msgBytes []byte
// Iterate over the possible message types and process them.
switch wrapper.MessageName() { # The "Any" type has a MessageName field which allows you to see what kind of protobuf message the wrapped object is.
case "metrics.Event":
me := &metrics.Event{}
anypb.UnmarshalTo(wrapper, me, proto.UnmarshalOptions{})
msgBytes = processMetricsEvent(me)
default:
c.log.Info(fmt.Sprintf("Unknown message: %s, skipping.", wrapper.MessageName()))
c.commitMessages(ctx, []kafka.Message{})
continue
} I hope this makes some sense. |
@leosunmo Thanks, I get this part. I wanted to know how you are registering the |
I am not registering anything with the Schema Registry in these examples. This is a way to work with multiple schemas without using Schema Registry. If you wanted to use the Schema Registry you probably wouldn't use Any at all here. |
Got it, thanks 👍🏽 |
Should Proto serialization/deserialization contributions be made to this repository? Something along the lines of what is available with Codec() for Avro. |
I welcome any change that adds value to this library. 😊 What do you have in mind? |
👍🏽 If you look at the Wire Format document and scroll past the table to the fine print, there is a Then we get to publishing and reading the schema itself, which involves multiple schemas when there are references to external types, as documented in their Protobuf Serializer docs. I haven’t had the need for this yet and have yet to look into solving it. |
Good idea. 👍🏻 |
I'm closing this issue as we have included existing examples in the repo and the rest is on PR at #85 |
Hey, I am trying to figure out if I can use this client to fetch Protobuf schemas in a Go consumer. Does anyone have any examples of how you could accomplish this?
Without running the Proto cli to pre-generate the code I am not sure how this would work at runtime.
Any pointers or example would be appreciated!
The text was updated successfully, but these errors were encountered: