diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 8456fd0524..5fba3d637c 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -164,6 +164,7 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { for i := range options.Topics { options.Topics[i] = tns[i].Name } + options.Topics = distinct(options.Topics) return newMultiTopicConsumer(client, options, options.Topics, messageCh, dlq, rlq) } @@ -554,6 +555,18 @@ func generateRandomName() string { return string(bytes) } +func distinct(fqdnTopics []string) []string { + set := make(map[string]struct{}) + uniques := make([]string, 0, len(fqdnTopics)) + for _, topic := range fqdnTopics { + if _, ok := set[topic]; !ok { + set[topic] = struct{}{} + uniques = append(uniques, topic) + } + } + return uniques +} + func toProtoSubType(st SubscriptionType) pb.CommandSubscribe_SubType { switch st { case Exclusive: