Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(kafka): update kafka lib; make kafka errors unsilent #2319

Merged
merged 1 commit into from
Apr 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 7 additions & 2 deletions services/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package kafka

import (
"crypto/tls"
"fmt"
"time"

"github.com/influxdata/influxdb/toml"
"github.com/influxdata/kapacitor/tlsconfig"
"github.com/pkg/errors"
kafka "github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go"
)

const (
Expand Down Expand Up @@ -75,7 +76,7 @@ func (c *Config) ApplyConditionalDefaults() {
}
}

func (c Config) WriterConfig() (kafka.WriterConfig, error) {
func (c Config) WriterConfig(diagnostic Diagnostic) (kafka.WriterConfig, error) {
var tlsCfg *tls.Config
if c.UseSSL {
t, err := tlsconfig.Create(c.SSLCA, c.SSLCert, c.SSLKey, c.InsecureSkipVerify)
Expand All @@ -88,6 +89,7 @@ func (c Config) WriterConfig() (kafka.WriterConfig, error) {
Timeout: time.Duration(c.Timeout),
TLS: tlsCfg,
}

return kafka.WriterConfig{
Brokers: c.Brokers,
Balancer: &kafka.LeastBytes{},
Expand All @@ -100,6 +102,9 @@ func (c Config) WriterConfig() (kafka.WriterConfig, error) {
// It also means that no errors will be captured from the WriteMessages method.
// As such we track the WriteStats for errors and report them with Kapacitor's normal diagnostics.
Async: true,
ErrorLogger: kafka.LoggerFunc(func(s string, x ...interface{}) {
diagnostic.Error("kafka client error", fmt.Errorf(s, x...))
}),
}, nil
}

Expand Down
20 changes: 13 additions & 7 deletions services/kafka/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ func NewCluster(c Config) *Cluster {
}
}

func (c *Cluster) WriteMessage(topic string, key, msg []byte) error {
w, err := c.writer(topic)
func (c *Cluster) WriteMessage(diagnostic Diagnostic, topic string, key, msg []byte) error {
w, err := c.writer(topic, diagnostic)
if err != nil {
return err
}
Expand All @@ -138,7 +138,7 @@ func (c *Cluster) WriteMessage(topic string, key, msg []byte) error {
})
}

func (c *Cluster) writer(topic string) (*writer, error) {
func (c *Cluster) writer(topic string, diagnostic Diagnostic) (*writer, error) {
c.mu.RLock()
w, ok := c.writers[topic]
c.mu.RUnlock()
Expand All @@ -147,10 +147,13 @@ func (c *Cluster) writer(topic string) (*writer, error) {
defer c.mu.Unlock()
w, ok = c.writers[topic]
if !ok {
wc, err := c.cfg.WriterConfig()
wc, err := c.cfg.WriterConfig(diagnostic)
if err != nil {
return nil, err
}
if topic == "" {
return nil, errors.New("topic must not be empty")
}
wc.Topic = topic
kw := kafka.NewWriter(wc)
// Create new writer
Expand Down Expand Up @@ -315,7 +318,7 @@ func (s *Service) Test(options interface{}) error {
if !ok {
return fmt.Errorf("unknown cluster %q", o.Cluster)
}
return c.WriteMessage(o.Topic, []byte(o.Key), []byte(o.Message))
return c.WriteMessage(s.diag, o.Topic, []byte(o.Key), []byte(o.Message))
}

type HandlerConfig struct {
Expand Down Expand Up @@ -347,12 +350,15 @@ func (s *Service) Handler(c HandlerConfig, ctx ...keyvalue.T) (alert.Handler, er
return nil, errors.Wrap(err, "failed to parse template")
}
}

diag := s.diag.WithContext(ctx...)

return &handler{
s: s,
cluster: cluster,
topic: c.Topic,
template: t,
diag: s.diag.WithContext(ctx...),
diag: diag,
}, nil
}

Expand All @@ -361,7 +367,7 @@ func (h *handler) Handle(event alert.Event) {
if err != nil {
h.diag.Error("failed to prepare kafka message body", err)
}
if err := h.cluster.WriteMessage(h.topic, []byte(event.State.ID), body); err != nil {
if err := h.cluster.WriteMessage(h.diag, h.topic, []byte(event.State.ID), body); err != nil {
h.diag.Error("failed to write message to kafka", err)
}
}
Expand Down
14 changes: 12 additions & 2 deletions vendor/github.com/segmentio/kafka-go/batch.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading