Skip to content

Commit

Permalink
fix: sarama version
Browse files Browse the repository at this point in the history
Signed-off-by: Yvonnick Esnault <[email protected]>
  • Loading branch information
yesnault committed Nov 30, 2021
1 parent bbbcde3 commit 1652357
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 2 deletions.
2 changes: 1 addition & 1 deletion engine/api/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func getBroker(ctx context.Context, t string, option interface{}) (Broker, error
k := &KafkaClient{}
return k.initialize(ctx, option)
}
return nil, fmt.Errorf("Invalid Broker Type %s", t)
return nil, fmt.Errorf("invalid Broker Type %s", t)
}

// ResetPublicIntegrations load all integration of type Event and creates kafka brokers
Expand Down
6 changes: 5 additions & 1 deletion engine/api/event/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type KafkaConfig struct {
BrokerAddresses string
User string
Password string
Version string
Topic string
MaxMessageByte int
DisableTLS bool
Expand All @@ -35,7 +36,7 @@ type KafkaConfig struct {
func (c *KafkaClient) initialize(ctx context.Context, options interface{}) (Broker, error) {
conf, ok := options.(KafkaConfig)
if !ok {
return nil, fmt.Errorf("Invalid Kafka Initialization")
return nil, fmt.Errorf("invalid Kafka Initialization")
}

if conf.BrokerAddresses == "" ||
Expand Down Expand Up @@ -70,6 +71,9 @@ func (c *KafkaClient) initProducer() error {
config.Net.SASL.User = c.options.User
config.Net.SASL.Password = c.options.Password
}
if c.options.Version == "" {
config.Version = sarama.V0_10_2_0
}

config.ClientID = c.options.ClientID
config.Producer.Return.Successes = true
Expand Down

0 comments on commit 1652357

Please sign in to comment.