From af19ca8fe7354b3406c5400c0a592ee1e40b8a68 Mon Sep 17 00:00:00 2001
From: "docmerlin (j. Emrys Landivar)"
Date: Wed, 15 Apr 2020 10:25:20 -0500
Subject: [PATCH] fix(kafka): update kafka lib; make kafka errors unsilent

Errors when sending to Kafka used to fail silently, which led to user
frustration. This updates the kafka client and injects a logger so that
send failures are now reported loudly.
---
 Gopkg.lock | 6 +-
 services/kafka/config.go | 9 +-
 services/kafka/service.go | 20 +-
 vendor/github.com/segmentio/kafka-go/batch.go | 14 +-
 vendor/github.com/segmentio/kafka-go/conn.go | 299 ++++++++++--------
 .../segmentio/kafka-go/consumergroup.go | 63 ++--
 .../segmentio/kafka-go/createtopics.go | 2 +-
 .../segmentio/kafka-go/deletetopics.go | 2 +-
 vendor/github.com/segmentio/kafka-go/error.go | 43 ++-
 .../segmentio/kafka-go/listgroups.go | 12 +-
 .../github.com/segmentio/kafka-go/logger.go | 17 +
 .../github.com/segmentio/kafka-go/message.go | 8 +-
 .../github.com/segmentio/kafka-go/metadata.go | 9 +-
 .../github.com/segmentio/kafka-go/protocol.go | 162 ++++++++--
 .../github.com/segmentio/kafka-go/reader.go | 71 +++--
 .../segmentio/kafka-go/recordbatch.go | 108 +++++++
 vendor/github.com/segmentio/kafka-go/write.go | 139 +-------
 .../github.com/segmentio/kafka-go/writer.go | 41 ++-
 18 files changed, 641 insertions(+), 384 deletions(-)
 create mode 100644 vendor/github.com/segmentio/kafka-go/logger.go
 create mode 100644 vendor/github.com/segmentio/kafka-go/recordbatch.go

diff --git a/Gopkg.lock b/Gopkg.lock
index e37cf5280..6995896d2 100644
--- a/Gopkg.lock
+++ b/Gopkg.lock
@@ -717,15 +717,15 @@
   revision = "1d7be4effb13d2d908342d349d71a284a7542693"

 [[projects]]
-  digest = "1:b4b2e722848c38227ef9b926b7f93ba150d80e18fcf4aa3d1ea4830ca0de61a4"
+  digest = "1:b1184e4b8e474f452b201392428a78f93471ec90a7ef72fd00bf2b11a2511b30"
   name = "github.com/segmentio/kafka-go"
   packages = [
     ".",
     "sasl",
   ]
   pruneopts = "NUT"
-  revision = "531c50bb11447b43c02f52718f9213631921fad3"
-  version = "v0.3.3"
+  revision = "9a956db8bd00245835f16007fbfe8ec58b31b8b9"
+  version = "v0.3.5"

 [[projects]]
   branch = "master"
diff --git a/services/kafka/config.go b/services/kafka/config.go
index 39c3e4914..5dad033cf 100644
--- a/services/kafka/config.go
+++ b/services/kafka/config.go
@@ -2,12 +2,13 @@ package kafka

 import (
 	"crypto/tls"
+	"fmt"
 	"time"

 	"github.com/influxdata/influxdb/toml"
 	"github.com/influxdata/kapacitor/tlsconfig"
 	"github.com/pkg/errors"
-	kafka "github.com/segmentio/kafka-go"
+	"github.com/segmentio/kafka-go"
 )

 const (
@@ -75,7 +76,7 @@ func (c *Config) ApplyConditionalDefaults() {
 	}
 }

-func (c Config) WriterConfig() (kafka.WriterConfig, error) {
+func (c Config) WriterConfig(diagnostic Diagnostic) (kafka.WriterConfig, error) {
 	var tlsCfg *tls.Config
 	if c.UseSSL {
 		t, err := tlsconfig.Create(c.SSLCA, c.SSLCert, c.SSLKey, c.InsecureSkipVerify)
@@ -88,6 +89,7 @@ func (c Config) WriterConfig() (kafka.WriterConfig, error) {
 		Timeout: time.Duration(c.Timeout),
 		TLS: tlsCfg,
 	}
+
 	return kafka.WriterConfig{
 		Brokers: c.Brokers,
 		Balancer: &kafka.LeastBytes{},
@@ -100,6 +102,9 @@ func (c Config) WriterConfig() (kafka.WriterConfig, error) {
 		// It also means that no errors will be captured from the WriteMessages method.
 		// As such we track the WriteStats for errors and report them with Kapacitor's normal diagnostics.
Async: true, + ErrorLogger: kafka.LoggerFunc(func(s string, x ...interface{}) { + diagnostic.Error("kafka client error", fmt.Errorf(s, x...)) + }), }, nil } diff --git a/services/kafka/service.go b/services/kafka/service.go index 6549f49a2..2a04bff2a 100644 --- a/services/kafka/service.go +++ b/services/kafka/service.go @@ -127,8 +127,8 @@ func NewCluster(c Config) *Cluster { } } -func (c *Cluster) WriteMessage(topic string, key, msg []byte) error { - w, err := c.writer(topic) +func (c *Cluster) WriteMessage(diagnostic Diagnostic, topic string, key, msg []byte) error { + w, err := c.writer(topic, diagnostic) if err != nil { return err } @@ -138,7 +138,7 @@ func (c *Cluster) WriteMessage(topic string, key, msg []byte) error { }) } -func (c *Cluster) writer(topic string) (*writer, error) { +func (c *Cluster) writer(topic string, diagnostic Diagnostic) (*writer, error) { c.mu.RLock() w, ok := c.writers[topic] c.mu.RUnlock() @@ -147,10 +147,13 @@ func (c *Cluster) writer(topic string) (*writer, error) { defer c.mu.Unlock() w, ok = c.writers[topic] if !ok { - wc, err := c.cfg.WriterConfig() + wc, err := c.cfg.WriterConfig(diagnostic) if err != nil { return nil, err } + if topic == "" { + return nil, errors.New("topic must not be empty") + } wc.Topic = topic kw := kafka.NewWriter(wc) // Create new writer @@ -315,7 +318,7 @@ func (s *Service) Test(options interface{}) error { if !ok { return fmt.Errorf("unknown cluster %q", o.Cluster) } - return c.WriteMessage(o.Topic, []byte(o.Key), []byte(o.Message)) + return c.WriteMessage(s.diag, o.Topic, []byte(o.Key), []byte(o.Message)) } type HandlerConfig struct { @@ -347,12 +350,15 @@ func (s *Service) Handler(c HandlerConfig, ctx ...keyvalue.T) (alert.Handler, er return nil, errors.Wrap(err, "failed to parse template") } } + + diag := s.diag.WithContext(ctx...) + return &handler{ s: s, cluster: cluster, topic: c.Topic, template: t, - diag: s.diag.WithContext(ctx...), + diag: diag, }, nil } @@ -361,7 +367,7 @@ func (h *handler) Handle(event alert.Event) { if err != nil { h.diag.Error("failed to prepare kafka message body", err) } - if err := h.cluster.WriteMessage(h.topic, []byte(event.State.ID), body); err != nil { + if err := h.cluster.WriteMessage(h.diag, h.topic, []byte(event.State.ID), body); err != nil { h.diag.Error("failed to write message to kafka", err) } } diff --git a/vendor/github.com/segmentio/kafka-go/batch.go b/vendor/github.com/segmentio/kafka-go/batch.go index 8fc6c4e33..ff9fae102 100644 --- a/vendor/github.com/segmentio/kafka-go/batch.go +++ b/vendor/github.com/segmentio/kafka-go/batch.go @@ -230,7 +230,12 @@ func (batch *Batch) readMessage( err = batch.msgs.discard() switch { case err != nil: - batch.err = err + // Since io.EOF is used by the batch to indicate that there is are + // no more messages to consume, it is crucial that any io.EOF errors + // on the underlying connection are repackaged. Otherwise, the + // caller can't tell the difference between a batch that was fully + // consumed or a batch whose connection is in an error state. + batch.err = dontExpectEOF(err) case batch.msgs.remaining() == 0: // Because we use the adjusted deadline we could end up returning // before the actual deadline occurred. This is necessary otherwise @@ -243,7 +248,12 @@ func (batch *Batch) readMessage( batch.err = err } default: - batch.err = err + // Since io.EOF is used by the batch to indicate that there is are + // no more messages to consume, it is crucial that any io.EOF errors + // on the underlying connection are repackaged. 
Otherwise, the + // caller can't tell the difference between a batch that was fully + // consumed or a batch whose connection is in an error state. + batch.err = dontExpectEOF(err) } return diff --git a/vendor/github.com/segmentio/kafka-go/conn.go b/vendor/github.com/segmentio/kafka-go/conn.go index 1b6e7649f..dcac23c9b 100644 --- a/vendor/github.com/segmentio/kafka-go/conn.go +++ b/vendor/github.com/segmentio/kafka-go/conn.go @@ -44,6 +44,9 @@ type Conn struct { // base network connection conn net.Conn + // number of inflight requests on the connection. + inflight int32 + // offset management (synchronized on the mutex field) mutex sync.Mutex offset int64 @@ -72,14 +75,30 @@ type Conn struct { correlationID int32 // number of replica acks required when publishing to a partition - requiredAcks int32 - apiVersions map[apiKey]ApiVersion - fetchVersion apiVersion - produceVersion apiVersion + requiredAcks int32 + + // lazily loaded API versions used by this connection + apiVersions atomic.Value // apiVersionMap transactionalID *string } +type apiVersionMap map[apiKey]ApiVersion + +func (v apiVersionMap) negotiate(key apiKey, sortedSupportedVersions ...apiVersion) apiVersion { + x := v[key] + + for i := len(sortedSupportedVersions) - 1; i >= 0; i-- { + s := sortedSupportedVersions[i] + + if apiVersion(x.MaxVersion) >= s { + return s + } + } + + return -1 +} + // ConnConfig is a configuration object used to create new instances of Conn. type ConnConfig struct { ClientID string @@ -175,48 +194,48 @@ func NewConnWith(conn net.Conn, config ConnConfig) *Conn { }}, }}, }).size() - c.selectVersions() c.fetchMaxBytes = math.MaxInt32 - c.fetchMinSize return c } -func (c *Conn) selectVersions() { - var err error - apiVersions, err := c.ApiVersions() +func (c *Conn) negotiateVersion(key apiKey, sortedSupportedVersions ...apiVersion) (apiVersion, error) { + v, err := c.loadVersions() if err != nil { - c.apiVersions = defaultApiVersions - } else { - c.apiVersions = make(map[apiKey]ApiVersion) - for _, v := range apiVersions { - c.apiVersions[apiKey(v.ApiKey)] = v - } + return -1, err } - for _, v := range c.apiVersions { - if apiKey(v.ApiKey) == fetchRequest { - switch version := v.MaxVersion; { - case version >= 10: - c.fetchVersion = 10 - case version >= 5: - c.fetchVersion = 5 - default: - c.fetchVersion = 2 - } - } - if apiKey(v.ApiKey) == produceRequest { - if v.MaxVersion >= 7 { - c.produceVersion = 7 - } else { - c.produceVersion = 2 - } - } + a := v.negotiate(key, sortedSupportedVersions...) 
+ if a < 0 { + return -1, fmt.Errorf("no matching versions were found between the client and the broker for API key %d", key) + } + return a, nil +} + +func (c *Conn) loadVersions() (apiVersionMap, error) { + v, _ := c.apiVersions.Load().(apiVersionMap) + if v != nil { + return v, nil + } + + brokerVersions, err := c.ApiVersions() + if err != nil { + return nil, err + } + + v = make(apiVersionMap, len(brokerVersions)) + + for _, a := range brokerVersions { + v[apiKey(a.ApiKey)] = a } + + c.apiVersions.Store(v) + return v, nil } // Controller requests kafka for the current controller and returns its URL func (c *Conn) Controller() (broker Broker, err error) { err = c.readOperation( func(deadline time.Time, id int32) error { - return c.writeRequest(metadataRequest, v1, id, topicMetadataRequestV1([]string{})) + return c.writeRequest(metadata, v1, id, topicMetadataRequestV1([]string{})) }, func(deadline time.Time, size int) error { var res metadataResponseV1 @@ -244,7 +263,7 @@ func (c *Conn) Brokers() ([]Broker, error) { var brokers []Broker err := c.readOperation( func(deadline time.Time, id int32) error { - return c.writeRequest(metadataRequest, v1, id, topicMetadataRequestV1([]string{})) + return c.writeRequest(metadata, v1, id, topicMetadataRequestV1([]string{})) }, func(deadline time.Time, size int) error { var res metadataResponseV1 @@ -284,7 +303,7 @@ func (c *Conn) describeGroups(request describeGroupsRequestV0) (describeGroupsRe err := c.readOperation( func(deadline time.Time, id int32) error { - return c.writeRequest(describeGroupsRequest, v0, id, request) + return c.writeRequest(describeGroups, v0, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { @@ -312,7 +331,7 @@ func (c *Conn) findCoordinator(request findCoordinatorRequestV0) (findCoordinato err := c.readOperation( func(deadline time.Time, id int32) error { - return c.writeRequest(groupCoordinatorRequest, v0, id, request) + return c.writeRequest(findCoordinator, v0, id, request) }, func(deadline time.Time, size int) error { @@ -339,7 +358,7 @@ func (c *Conn) heartbeat(request heartbeatRequestV0) (heartbeatResponseV0, error err := c.writeOperation( func(deadline time.Time, id int32) error { - return c.writeRequest(heartbeatRequest, v0, id, request) + return c.writeRequest(heartbeat, v0, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { @@ -365,7 +384,7 @@ func (c *Conn) joinGroup(request joinGroupRequestV1) (joinGroupResponseV1, error err := c.writeOperation( func(deadline time.Time, id int32) error { - return c.writeRequest(joinGroupRequest, v1, id, request) + return c.writeRequest(joinGroup, v1, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { @@ -391,7 +410,7 @@ func (c *Conn) leaveGroup(request leaveGroupRequestV0) (leaveGroupResponseV0, er err := c.writeOperation( func(deadline time.Time, id int32) error { - return c.writeRequest(leaveGroupRequest, v0, id, request) + return c.writeRequest(leaveGroup, v0, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { @@ -417,7 +436,7 @@ func (c *Conn) listGroups(request listGroupsRequestV1) (listGroupsResponseV1, er err := c.readOperation( func(deadline time.Time, id int32) error { - return c.writeRequest(listGroupsRequest, v1, id, request) + return c.writeRequest(listGroups, v1, id, request) }, func(deadline time.Time, size int) 
error { return expectZeroSize(func() (remain int, err error) { @@ -443,7 +462,7 @@ func (c *Conn) offsetCommit(request offsetCommitRequestV2) (offsetCommitResponse err := c.writeOperation( func(deadline time.Time, id int32) error { - return c.writeRequest(offsetCommitRequest, v2, id, request) + return c.writeRequest(offsetCommit, v2, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { @@ -474,7 +493,7 @@ func (c *Conn) offsetFetch(request offsetFetchRequestV1) (offsetFetchResponseV1, err := c.readOperation( func(deadline time.Time, id int32) error { - return c.writeRequest(offsetFetchRequest, v1, id, request) + return c.writeRequest(offsetFetch, v1, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { @@ -504,7 +523,7 @@ func (c *Conn) syncGroup(request syncGroupRequestV0) (syncGroupResponseV0, error err := c.readOperation( func(deadline time.Time, id int32) error { - return c.writeRequest(syncGroupRequest, v0, id, request) + return c.writeRequest(syncGroup, v0, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { @@ -764,10 +783,15 @@ func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch { return &Batch{err: dontExpectEOF(err)} } + fetchVersion, err := c.negotiateVersion(fetch, v2, v5, v10) + if err != nil { + return &Batch{err: dontExpectEOF(err)} + } + id, err := c.doRequest(&c.rdeadline, func(deadline time.Time, id int32) error { now := time.Now() deadline = adjustDeadlineForRTT(deadline, now, defaultRTT) - switch c.fetchVersion { + switch fetchVersion { case v10: return c.wb.writeFetchRequestV10( id, @@ -818,7 +842,7 @@ func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch { var highWaterMark int64 var remain int - switch c.fetchVersion { + switch fetchVersion { case v10: throttle, highWaterMark, remain, err = readFetchResponseHeaderV10(&c.rbuf, size) case v5: @@ -851,7 +875,11 @@ func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch { partition: int(c.partition), // partition is copied to Batch to prevent race with Batch.close offset: offset, highWaterMark: highWaterMark, - err: dontExpectEOF(err), + // there shouldn't be a short read on initially setting up the batch. + // as such, any io.EOF is re-mapped to an io.ErrUnexpectedEOF so that we + // don't accidentally signal that we successfully reached the end of the + // batch. + err: dontExpectEOF(err), } } @@ -928,15 +956,21 @@ func (c *Conn) readOffset(t int64) (offset int64, err error) { // connection. If there are none, the method fetches all partitions of the kafka // cluster. func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err error) { - defaultTopics := [...]string{c.topic} - if len(topics) == 0 && len(c.topic) != 0 { - topics = defaultTopics[:] + if len(topics) == 0 { + if len(c.topic) != 0 { + defaultTopics := [...]string{c.topic} + topics = defaultTopics[:] + } else { + // topics needs to be explicitly nil-ed out or the broker will + // interpret it as a request for 0 partitions instead of all. 
+ topics = nil + } } err = c.readOperation( func(deadline time.Time, id int32) error { - return c.writeRequest(metadataRequest, v1, id, topicMetadataRequestV1(topics)) + return c.writeRequest(metadata, v1, id, topicMetadataRequestV1(topics)) }, func(deadline time.Time, size int) error { var res metadataResponseV1 @@ -1026,7 +1060,6 @@ func (c *Conn) WriteCompressedMessagesAt(codec CompressionCodec, msgs ...Message } func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message) (nbytes int, partition int32, offset int64, appendTime time.Time, err error) { - if len(msgs) == 0 { return } @@ -1051,14 +1084,26 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message) nbytes += len(msg.Key) + len(msg.Value) } + var produceVersion apiVersion + if produceVersion, err = c.negotiateVersion(produce, v2, v3, v7); err != nil { + return + } + err = c.writeOperation( func(deadline time.Time, id int32) error { now := time.Now() deadline = adjustDeadlineForRTT(deadline, now, defaultRTT) - switch version := c.apiVersions[produceRequest].MaxVersion; { - case version >= 7: + switch produceVersion { + case v7: + recordBatch, err := + newRecordBatch( + codec, + msgs..., + ) + if err != nil { + return err + } return c.wb.writeProduceRequestV7( - codec, id, c.clientID, c.topic, @@ -1066,11 +1111,18 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message) deadlineToTimeout(deadline, now), int16(atomic.LoadInt32(&c.requiredAcks)), c.transactionalID, - msgs..., + recordBatch, ) - case version >= 3: + case v3: + recordBatch, err := + newRecordBatch( + codec, + msgs..., + ) + if err != nil { + return err + } return c.wb.writeProduceRequestV3( - codec, id, c.clientID, c.topic, @@ -1078,7 +1130,7 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message) deadlineToTimeout(deadline, now), int16(atomic.LoadInt32(&c.requiredAcks)), c.transactionalID, - msgs..., + recordBatch, ) default: return c.wb.writeProduceRequestV2( @@ -1105,7 +1157,7 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message) // Read the list of partitions, there should be only one since // we've produced a message to a single partition. size, err = readArrayWith(r, size, func(r *bufio.Reader, size int) (int, error) { - switch c.produceVersion { + switch produceVersion { case v7: var p produceResponsePartitionV7 size, err := p.readFrom(r, size) @@ -1218,6 +1270,18 @@ func (c *Conn) writeOperation(write func(time.Time, int32) error, read func(time return c.do(&c.wdeadline, write, read) } +func (c *Conn) enter() { + atomic.AddInt32(&c.inflight, +1) +} + +func (c *Conn) leave() { + atomic.AddInt32(&c.inflight, -1) +} + +func (c *Conn) concurrency() int { + return int(atomic.LoadInt32(&c.inflight)) +} + func (c *Conn) do(d *connDeadline, write func(time.Time, int32) error, read func(time.Time, int) error) error { id, err := c.doRequest(d, write) if err != nil { @@ -1243,6 +1307,7 @@ func (c *Conn) do(d *connDeadline, write func(time.Time, int32) error, read func } func (c *Conn) doRequest(d *connDeadline, write func(time.Time, int32) error) (id int32, err error) { + c.enter() c.wlock.Lock() c.correlationID++ id = c.correlationID @@ -1254,6 +1319,7 @@ func (c *Conn) doRequest(d *connDeadline, write func(time.Time, int32) error) (i // recoverable state so we're better off just giving up at this point to // avoid any risk of corrupting the following operations. 
c.conn.Close() + c.leave() } c.wlock.Unlock() @@ -1261,60 +1327,45 @@ func (c *Conn) doRequest(d *connDeadline, write func(time.Time, int32) error) (i } func (c *Conn) waitResponse(d *connDeadline, id int32) (deadline time.Time, size int, lock *sync.Mutex, err error) { - // I applied exactly zero scientific process to choose this value, - // it seemed to worked fine in practice tho. - // - // My guess is 100 iterations where the goroutine gets descheduled - // by calling runtime.Gosched() may end up on a wait of ~10ms to ~1s - // (if the programs is heavily CPU bound and has lots of goroutines), - // so it should allow for bailing quickly without taking too much risk - // to get false positives. - const maxAttempts = 100 - var lastID int32 - - for attempt := 0; attempt < maxAttempts; { + for { var rsz int32 var rid int32 c.rlock.Lock() deadline = d.setConnReadDeadline(c.conn) + rsz, rid, err = c.peekResponseSizeAndID() - if rsz, rid, err = c.peekResponseSizeAndID(); err != nil { + if err != nil { d.unsetConnReadDeadline() c.conn.Close() c.rlock.Unlock() - return + break } if id == rid { c.skipResponseSizeAndID() size, lock = int(rsz-4), &c.rlock - return + // Don't unlock the read mutex to yield ownership to the caller. + break + } + + if c.concurrency() == 1 { + // If the goroutine is the only one waiting on this connection it + // should be impossible to read a correlation id different from the + // one it expects. This is a sign that the data we are reading on + // the wire is corrupted and the connection needs to be closed. + err = io.ErrNoProgress + c.rlock.Unlock() + break } // Optimistically release the read lock if a response has already // been received but the current operation is not the target for it. c.rlock.Unlock() runtime.Gosched() - - // This check is a safety mechanism, if we make too many loop - // iterations and always draw the same id then we could be facing - // corrupted data on the wire, or the goroutine(s) sharing ownership - // of this connection may have panicked and therefore will not be able - // to participate in consuming bytes from the wire. To prevent entering - // an infinite loop which reads the same value over and over we bail - // with the uncommon io.ErrNoProgress error which should give a good - // enough signal about what is going wrong. 
- if rid != lastID { - attempt++ - } else { - attempt = 0 - } - - lastID = rid } - err = io.ErrNoProgress + c.leave() return } @@ -1327,44 +1378,26 @@ func (c *Conn) requestHeader(apiKey apiKey, apiVersion apiVersion, correlationID } } -type ApiVersion struct { - ApiKey int16 - MinVersion int16 - MaxVersion int16 -} - -var defaultApiVersions map[apiKey]ApiVersion = map[apiKey]ApiVersion{ - produceRequest: ApiVersion{int16(produceRequest), int16(v2), int16(v2)}, - fetchRequest: ApiVersion{int16(fetchRequest), int16(v2), int16(v2)}, - listOffsetRequest: ApiVersion{int16(listOffsetRequest), int16(v1), int16(v1)}, - metadataRequest: ApiVersion{int16(metadataRequest), int16(v1), int16(v1)}, - offsetCommitRequest: ApiVersion{int16(offsetCommitRequest), int16(v2), int16(v2)}, - offsetFetchRequest: ApiVersion{int16(offsetFetchRequest), int16(v1), int16(v1)}, - groupCoordinatorRequest: ApiVersion{int16(groupCoordinatorRequest), int16(v0), int16(v0)}, - joinGroupRequest: ApiVersion{int16(joinGroupRequest), int16(v1), int16(v1)}, - heartbeatRequest: ApiVersion{int16(heartbeatRequest), int16(v0), int16(v0)}, - leaveGroupRequest: ApiVersion{int16(leaveGroupRequest), int16(v0), int16(v0)}, - syncGroupRequest: ApiVersion{int16(syncGroupRequest), int16(v0), int16(v0)}, - describeGroupsRequest: ApiVersion{int16(describeGroupsRequest), int16(v1), int16(v1)}, - listGroupsRequest: ApiVersion{int16(listGroupsRequest), int16(v1), int16(v1)}, - apiVersionsRequest: ApiVersion{int16(apiVersionsRequest), int16(v0), int16(v0)}, - createTopicsRequest: ApiVersion{int16(createTopicsRequest), int16(v0), int16(v0)}, - deleteTopicsRequest: ApiVersion{int16(deleteTopicsRequest), int16(v1), int16(v1)}, -} - func (c *Conn) ApiVersions() ([]ApiVersion, error) { - id, err := c.doRequest(&c.rdeadline, func(deadline time.Time, id int32) error { - now := time.Now() - deadline = adjustDeadlineForRTT(deadline, now, defaultRTT) + deadline := &c.rdeadline + if deadline.deadline().IsZero() { + // ApiVersions is called automatically when API version negotiation + // needs to happen, so we are not garanteed that a read deadline has + // been set yet. Fallback to use the write deadline in case it was + // set, for example when version negotiation is initiated during a + // produce request. 
+ deadline = &c.wdeadline + } + + id, err := c.doRequest(deadline, func(_ time.Time, id int32) error { h := requestHeader{ - ApiKey: int16(apiVersionsRequest), + ApiKey: int16(apiVersions), ApiVersion: int16(v0), CorrelationID: id, ClientID: c.clientID, } h.Size = (h.size() - 4) - h.writeTo(&c.wb) return c.wbuf.Flush() }) @@ -1372,7 +1405,7 @@ func (c *Conn) ApiVersions() ([]ApiVersion, error) { return nil, err } - _, size, lock, err := c.waitResponse(&c.rdeadline, id) + _, size, lock, err := c.waitResponse(deadline, id) if err != nil { return nil, err } @@ -1483,14 +1516,15 @@ func (c *Conn) saslHandshake(mechanism string) error { // number will affect how the SASL authentication // challenge/responses are sent var resp saslHandshakeResponseV0 - version := v0 - if c.apiVersions[saslHandshakeRequest].MaxVersion >= 1 { - version = v1 + + version, err := c.negotiateVersion(saslHandshake, v0, v1) + if err != nil { + return err } - err := c.writeOperation( + err = c.writeOperation( func(deadline time.Time, id int32) error { - return c.writeRequest(saslHandshakeRequest, version, id, &saslHandshakeRequestV0{Mechanism: mechanism}) + return c.writeRequest(saslHandshake, version, id, &saslHandshakeRequestV0{Mechanism: mechanism}) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (int, error) { @@ -1512,13 +1546,17 @@ func (c *Conn) saslAuthenticate(data []byte) ([]byte, error) { // if we sent a v1 handshake, then we must encapsulate the authentication // request in a saslAuthenticateRequest. otherwise, we read and write raw // bytes. - if c.apiVersions[saslHandshakeRequest].MaxVersion >= 1 { + version, err := c.negotiateVersion(saslHandshake, v0, v1) + if err != nil { + return nil, err + } + if version == v1 { var request = saslAuthenticateRequestV0{Data: data} var response saslAuthenticateResponseV0 err := c.writeOperation( func(deadline time.Time, id int32) error { - return c.writeRequest(saslAuthenticateRequest, v0, id, request) + return c.writeRequest(saslAuthenticate, v0, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { @@ -1543,8 +1581,7 @@ func (c *Conn) saslAuthenticate(data []byte) ([]byte, error) { } var respLen int32 - _, err := readInt32(&c.rbuf, 4, &respLen) - if err != nil { + if _, err := readInt32(&c.rbuf, 4, &respLen); err != nil { return nil, err } diff --git a/vendor/github.com/segmentio/kafka-go/consumergroup.go b/vendor/github.com/segmentio/kafka-go/consumergroup.go index 8975d24d2..978253083 100644 --- a/vendor/github.com/segmentio/kafka-go/consumergroup.go +++ b/vendor/github.com/segmentio/kafka-go/consumergroup.go @@ -7,7 +7,6 @@ import ( "errors" "fmt" "io" - "log" "math" "strings" "sync" @@ -135,11 +134,11 @@ type ConsumerGroupConfig struct { // If not nil, specifies a logger used to report internal changes within the // reader. - Logger *log.Logger + Logger Logger // ErrorLogger is the logger used to report errors. If nil, the reader falls // back to using Logger instead. - ErrorLogger *log.Logger + ErrorLogger Logger // connect is a function for dialing the coordinator. This is provided for // unit testing to mock broker connections. 
@@ -304,8 +303,8 @@ type Generation struct { wg sync.WaitGroup retentionMillis int64 - log func(func(*log.Logger)) - logError func(func(*log.Logger)) + log func(func(Logger)) + logError func(func(Logger)) } // close stops the generation and waits for all functions launched via Start to @@ -374,7 +373,7 @@ func (g *Generation) CommitOffsets(offsets map[string]map[int]int64) error { _, err := g.conn.offsetCommit(request) if err == nil { // if logging is enabled, print out the partitions that were committed. - g.log(func(l *log.Logger) { + g.log(func(l Logger) { var report []string for _, t := range request.Topics { report = append(report, fmt.Sprintf("\ttopic: %s", t.Topic)) @@ -394,11 +393,11 @@ func (g *Generation) CommitOffsets(offsets map[string]map[int]int64) error { // end of the generation. func (g *Generation) heartbeatLoop(interval time.Duration) { g.Start(func(ctx context.Context) { - g.log(func(l *log.Logger) { + g.log(func(l Logger) { l.Printf("started heartbeat for group, %v [%v]", g.GroupID, interval) }) - defer g.log(func(l *log.Logger) { - l.Println("stopped heartbeat for group,", g.GroupID) + defer g.log(func(l Logger) { + l.Printf("stopped heartbeat for group %s\n", g.GroupID) }) ticker := time.NewTicker(interval) @@ -430,10 +429,10 @@ func (g *Generation) heartbeatLoop(interval time.Duration) { // establish a new connection to the coordinator. func (g *Generation) partitionWatcher(interval time.Duration, topic string) { g.Start(func(ctx context.Context) { - g.log(func(l *log.Logger) { + g.log(func(l Logger) { l.Printf("started partition watcher for group, %v, topic %v [%v]", g.GroupID, topic, interval) }) - defer g.log(func(l *log.Logger) { + defer g.log(func(l Logger) { l.Printf("stopped partition watcher for group, %v, topic %v", g.GroupID, topic) }) @@ -442,7 +441,7 @@ func (g *Generation) partitionWatcher(interval time.Duration, topic string) { ops, err := g.conn.ReadPartitions(topic) if err != nil { - g.logError(func(l *log.Logger) { + g.logError(func(l Logger) { l.Printf("Problem getting partitions during startup, %v\n, Returning and setting up nextGeneration", err) }) return @@ -457,13 +456,13 @@ func (g *Generation) partitionWatcher(interval time.Duration, topic string) { switch err { case nil, UnknownTopicOrPartition: if len(ops) != oParts { - g.log(func(l *log.Logger) { + g.log(func(l Logger) { l.Printf("Partition changes found, reblancing group: %v.", g.GroupID) }) return } default: - g.logError(func(l *log.Logger) { + g.logError(func(l Logger) { l.Printf("Problem getting partitions while checking for changes, %v", err) }) if _, ok := err.(Error); ok { @@ -632,7 +631,7 @@ func (cg *ConsumerGroup) nextGeneration(memberID string) (string, error) { // conditions. conn, err := cg.coordinator() if err != nil { - cg.withErrorLogger(func(log *log.Logger) { + cg.withErrorLogger(func(log Logger) { log.Printf("Unable to establish connection to consumer group coordinator for group %s: %v", cg.config.ID, err) }) return memberID, err // a prior memberID may still be valid, so don't return "" @@ -647,19 +646,19 @@ func (cg *ConsumerGroup) nextGeneration(memberID string) (string, error) { // consumer is elected leader. it may also change or assign the member ID. 
memberID, generationID, groupAssignments, err = cg.joinGroup(conn, memberID) if err != nil { - cg.withErrorLogger(func(log *log.Logger) { + cg.withErrorLogger(func(log Logger) { log.Printf("Failed to join group %s: %v", cg.config.ID, err) }) return memberID, err } - cg.withLogger(func(log *log.Logger) { + cg.withLogger(func(log Logger) { log.Printf("Joined group %s as member %s in generation %d", cg.config.ID, memberID, generationID) }) // sync group assignments, err = cg.syncGroup(conn, memberID, generationID, groupAssignments) if err != nil { - cg.withErrorLogger(func(log *log.Logger) { + cg.withErrorLogger(func(log Logger) { log.Printf("Failed to sync group %s: %v", cg.config.ID, err) }) return memberID, err @@ -669,7 +668,7 @@ func (cg *ConsumerGroup) nextGeneration(memberID string) (string, error) { var offsets map[string]map[int]int64 offsets, err = cg.fetchOffsets(conn, assignments) if err != nil { - cg.withErrorLogger(func(log *log.Logger) { + cg.withErrorLogger(func(log Logger) { log.Printf("Failed to fetch offsets for group %s: %v", cg.config.ID, err) }) return memberID, err @@ -788,7 +787,7 @@ func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, i memberID = response.MemberID generationID := response.GenerationID - cg.withLogger(func(l *log.Logger) { + cg.withLogger(func(l Logger) { l.Printf("joined group %s as member %s in generation %d", cg.config.ID, memberID, generationID) }) @@ -800,7 +799,7 @@ func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, i } assignments = v - cg.withLogger(func(l *log.Logger) { + cg.withLogger(func(l Logger) { for memberID, assignment := range assignments { for topic, partitions := range assignment { l.Printf("assigned member/topic/partitions %v/%v/%v", memberID, topic, partitions) @@ -809,7 +808,7 @@ func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, i }) } - cg.withLogger(func(l *log.Logger) { + cg.withLogger(func(l Logger) { l.Printf("joinGroup succeeded for response, %v. 
generationID=%v, memberID=%v", cg.config.ID, response.GenerationID, response.MemberID) }) @@ -848,8 +847,8 @@ func (cg *ConsumerGroup) makeJoinGroupRequestV1(memberID string) (joinGroupReque // assignTopicPartitions uses the selected GroupBalancer to assign members to // their various partitions func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroupResponseV1) (GroupMemberAssignments, error) { - cg.withLogger(func(l *log.Logger) { - l.Println("selected as leader for group,", cg.config.ID) + cg.withLogger(func(l Logger) { + l.Printf("selected as leader for group, %s\n", cg.config.ID) }) balancer, ok := findGroupBalancer(group.GroupProtocol, cg.config.GroupBalancers) @@ -876,7 +875,7 @@ func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroup return nil, err } - cg.withLogger(func(l *log.Logger) { + cg.withLogger(func(l Logger) { l.Printf("using '%v' balancer to assign group, %v", group.GroupProtocol, cg.config.ID) for _, member := range members { l.Printf("found member: %v/%#v", member.ID, member.UserData) @@ -935,12 +934,12 @@ func (cg *ConsumerGroup) syncGroup(conn coordinator, memberID string, generation } if len(assignments.Topics) == 0 { - cg.withLogger(func(l *log.Logger) { + cg.withLogger(func(l Logger) { l.Printf("received empty assignments for group, %v as member %s for generation %d", cg.config.ID, memberID, generationID) }) } - cg.withLogger(func(l *log.Logger) { + cg.withLogger(func(l Logger) { l.Printf("sync group finished for group, %v", cg.config.ID) }) @@ -975,7 +974,7 @@ func (cg *ConsumerGroup) makeSyncGroupRequestV0(memberID string, generationID in }) } - cg.withErrorLogger(func(logger *log.Logger) { + cg.withLogger(func(logger Logger) { logger.Printf("Syncing %d assignments for generation %d as member %s", len(request.GroupAssignments), generationID, memberID) }) } @@ -1048,7 +1047,7 @@ func (cg *ConsumerGroup) leaveGroup(memberID string) error { return nil } - cg.withLogger(func(log *log.Logger) { + cg.withLogger(func(log Logger) { log.Printf("Leaving group %s, member %s", cg.config.ID, memberID) }) @@ -1067,7 +1066,7 @@ func (cg *ConsumerGroup) leaveGroup(memberID string) error { MemberID: memberID, }) if err != nil { - cg.withErrorLogger(func(log *log.Logger) { + cg.withErrorLogger(func(log Logger) { log.Printf("leave group failed for group, %v, and member, %v: %v", cg.config.ID, memberID, err) }) } @@ -1077,13 +1076,13 @@ func (cg *ConsumerGroup) leaveGroup(memberID string) error { return err } -func (cg *ConsumerGroup) withLogger(do func(*log.Logger)) { +func (cg *ConsumerGroup) withLogger(do func(Logger)) { if cg.config.Logger != nil { do(cg.config.Logger) } } -func (cg *ConsumerGroup) withErrorLogger(do func(*log.Logger)) { +func (cg *ConsumerGroup) withErrorLogger(do func(Logger)) { if cg.config.ErrorLogger != nil { do(cg.config.ErrorLogger) } else { diff --git a/vendor/github.com/segmentio/kafka-go/createtopics.go b/vendor/github.com/segmentio/kafka-go/createtopics.go index a38e0801d..95619da2d 100644 --- a/vendor/github.com/segmentio/kafka-go/createtopics.go +++ b/vendor/github.com/segmentio/kafka-go/createtopics.go @@ -222,7 +222,7 @@ func (c *Conn) createTopics(request createTopicsRequestV0) (createTopicsResponse deadline = adjustDeadlineForRTT(deadline, now, defaultRTT) request.Timeout = milliseconds(deadlineToTimeout(deadline, now)) } - return c.writeRequest(createTopicsRequest, v0, id, request) + return c.writeRequest(createTopics, v0, id, request) }, func(deadline time.Time, size int) error { 
return expectZeroSize(func() (remain int, err error) { diff --git a/vendor/github.com/segmentio/kafka-go/deletetopics.go b/vendor/github.com/segmentio/kafka-go/deletetopics.go index c0af87db1..687c380c3 100644 --- a/vendor/github.com/segmentio/kafka-go/deletetopics.go +++ b/vendor/github.com/segmentio/kafka-go/deletetopics.go @@ -94,7 +94,7 @@ func (c *Conn) deleteTopics(request deleteTopicsRequestV0) (deleteTopicsResponse deadline = adjustDeadlineForRTT(deadline, now, defaultRTT) request.Timeout = milliseconds(deadlineToTimeout(deadline, now)) } - return c.writeRequest(deleteTopicsRequest, v0, id, request) + return c.writeRequest(deleteTopics, v0, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { diff --git a/vendor/github.com/segmentio/kafka-go/error.go b/vendor/github.com/segmentio/kafka-go/error.go index dc91e6e2c..744535dcd 100644 --- a/vendor/github.com/segmentio/kafka-go/error.go +++ b/vendor/github.com/segmentio/kafka-go/error.go @@ -6,6 +6,7 @@ import ( ) // Error represents the different error codes that may be returned by kafka. +// https://kafka.apache.org/protocol#protocol_error_codes type Error int const ( @@ -22,6 +23,7 @@ const ( MessageSizeTooLarge Error = 10 StaleControllerEpoch Error = 11 OffsetMetadataTooLarge Error = 12 + NetworkException Error = 13 GroupLoadInProgress Error = 14 GroupCoordinatorNotAvailable Error = 15 NotCoordinatorForGroup Error = 16 @@ -85,6 +87,12 @@ const ( FencedLeaderEpoch Error = 74 UnknownLeaderEpoch Error = 75 UnsupportedCompressionType Error = 76 + StaleBrokerEpoch Error = 77 + OffsetNotAvailable Error = 78 + MemberIDRequired Error = 79 + PreferredLeaderNotAvailable Error = 80 + GroupMaxSizeReached Error = 81 + FencedInstanceID Error = 82 ) // Error satisfies the error interface. @@ -99,14 +107,35 @@ func (e Error) Timeout() bool { // Temporary returns true if the operation that generated the error may succeed // if retried at a later time. +// Kafka error documentation specifies these as "retriable" +// https://kafka.apache.org/protocol#protocol_error_codes func (e Error) Temporary() bool { - return e == LeaderNotAvailable || - e == BrokerNotAvailable || - e == ReplicaNotAvailable || - e == GroupLoadInProgress || - e == GroupCoordinatorNotAvailable || - e == RebalanceInProgress || - e.Timeout() + switch e { + case InvalidMessage, + UnknownTopicOrPartition, + LeaderNotAvailable, + NotLeaderForPartition, + RequestTimedOut, + NetworkException, + GroupLoadInProgress, + GroupCoordinatorNotAvailable, + NotCoordinatorForGroup, + NotEnoughReplicas, + NotEnoughReplicasAfterAppend, + NotController, + KafkaStorageError, + FetchSessionIDNotFound, + InvalidFetchSessionEpoch, + ListenerNotFound, + FencedLeaderEpoch, + UnknownLeaderEpoch, + OffsetNotAvailable, + PreferredLeaderNotAvailable: + return true + + default: + return false + } } // Title returns a human readable title for the error. 
diff --git a/vendor/github.com/segmentio/kafka-go/listgroups.go b/vendor/github.com/segmentio/kafka-go/listgroups.go index 9f5e4438d..5e6619df5 100644 --- a/vendor/github.com/segmentio/kafka-go/listgroups.go +++ b/vendor/github.com/segmentio/kafka-go/listgroups.go @@ -14,22 +14,22 @@ func (t listGroupsRequestV1) size() int32 { func (t listGroupsRequestV1) writeTo(wb *writeBuffer) { } -type ListGroupsResponseGroupV1 struct { +type listGroupsResponseGroupV1 struct { // GroupID holds the unique group identifier GroupID string ProtocolType string } -func (t ListGroupsResponseGroupV1) size() int32 { +func (t listGroupsResponseGroupV1) size() int32 { return sizeofString(t.GroupID) + sizeofString(t.ProtocolType) } -func (t ListGroupsResponseGroupV1) writeTo(wb *writeBuffer) { +func (t listGroupsResponseGroupV1) writeTo(wb *writeBuffer) { wb.writeString(t.GroupID) wb.writeString(t.ProtocolType) } -func (t *ListGroupsResponseGroupV1) readFrom(r *bufio.Reader, size int) (remain int, err error) { +func (t *listGroupsResponseGroupV1) readFrom(r *bufio.Reader, size int) (remain int, err error) { if remain, err = readString(r, size, &t.GroupID); err != nil { return } @@ -47,7 +47,7 @@ type listGroupsResponseV1 struct { // ErrorCode holds response error code ErrorCode int16 - Groups []ListGroupsResponseGroupV1 + Groups []listGroupsResponseGroupV1 } func (t listGroupsResponseV1) size() int32 { @@ -71,7 +71,7 @@ func (t *listGroupsResponseV1) readFrom(r *bufio.Reader, size int) (remain int, } fn := func(withReader *bufio.Reader, withSize int) (fnRemain int, fnErr error) { - var item ListGroupsResponseGroupV1 + var item listGroupsResponseGroupV1 if fnRemain, fnErr = (&item).readFrom(withReader, withSize); err != nil { return } diff --git a/vendor/github.com/segmentio/kafka-go/logger.go b/vendor/github.com/segmentio/kafka-go/logger.go new file mode 100644 index 000000000..e18ab360a --- /dev/null +++ b/vendor/github.com/segmentio/kafka-go/logger.go @@ -0,0 +1,17 @@ +package kafka + +// Logger interface API for log.Logger +type Logger interface { + Printf(string, ...interface{}) +} + +// LoggerFunc is a bridge between Logger and any third party logger +// Usage: +// l := NewLogger() // some logger +// r := kafka.NewReader(kafka.ReaderConfig{ +// Logger: kafka.LoggerFunc(l.Infof), +// ErrorLogger: kafka.LoggerFunc(l.Errorf), +// }) +type LoggerFunc func(string, ...interface{}) + +func (f LoggerFunc) Printf(msg string, args ...interface{}) { f(msg, args...) 
} diff --git a/vendor/github.com/segmentio/kafka-go/message.go b/vendor/github.com/segmentio/kafka-go/message.go index aae06b5a9..32612ecce 100644 --- a/vendor/github.com/segmentio/kafka-go/message.go +++ b/vendor/github.com/segmentio/kafka-go/message.go @@ -38,6 +38,12 @@ func (msg Message) message(cw *crc32Writer) message { return m } +const timestampSize = 8 + +func (msg Message) size() int32 { + return 4 + 1 + 1 + sizeofBytes(msg.Key) + sizeofBytes(msg.Value) + timestampSize +} + type message struct { CRC int32 MagicByte int8 @@ -62,7 +68,7 @@ func (m message) crc32(cw *crc32Writer) int32 { func (m message) size() int32 { size := 4 + 1 + 1 + sizeofBytes(m.Key) + sizeofBytes(m.Value) if m.MagicByte != 0 { - size += 8 // Timestamp + size += timestampSize } return size } diff --git a/vendor/github.com/segmentio/kafka-go/metadata.go b/vendor/github.com/segmentio/kafka-go/metadata.go index 56e12d0e9..d524b9fd8 100644 --- a/vendor/github.com/segmentio/kafka-go/metadata.go +++ b/vendor/github.com/segmentio/kafka-go/metadata.go @@ -7,7 +7,14 @@ func (r topicMetadataRequestV1) size() int32 { } func (r topicMetadataRequestV1) writeTo(wb *writeBuffer) { - wb.writeStringArray([]string(r)) + // communicate nil-ness to the broker by passing -1 as the array length. + // for this particular request, the broker interpets a zero length array + // as a request for no topics whereas a nil array is for all topics. + if r == nil { + wb.writeArrayLen(-1) + } else { + wb.writeStringArray([]string(r)) + } } type metadataResponseV1 struct { diff --git a/vendor/github.com/segmentio/kafka-go/protocol.go b/vendor/github.com/segmentio/kafka-go/protocol.go index 449ad6e40..727cb2cf8 100644 --- a/vendor/github.com/segmentio/kafka-go/protocol.go +++ b/vendor/github.com/segmentio/kafka-go/protocol.go @@ -3,31 +3,102 @@ package kafka import ( "encoding/binary" "fmt" + "strconv" ) +type ApiVersion struct { + ApiKey int16 + MinVersion int16 + MaxVersion int16 +} + +func (v ApiVersion) Format(w fmt.State, r rune) { + switch r { + case 's': + fmt.Fprint(w, apiKey(v.ApiKey)) + case 'd': + switch { + case w.Flag('-'): + fmt.Fprint(w, v.MinVersion) + case w.Flag('+'): + fmt.Fprint(w, v.MaxVersion) + default: + fmt.Fprint(w, v.ApiKey) + } + case 'v': + switch { + case w.Flag('-'): + fmt.Fprintf(w, "v%d", v.MinVersion) + case w.Flag('+'): + fmt.Fprintf(w, "v%d", v.MaxVersion) + case w.Flag('#'): + fmt.Fprintf(w, "kafka.ApiVersion{ApiKey:%d MinVersion:%d MaxVersion:%d}", v.ApiKey, v.MinVersion, v.MaxVersion) + default: + fmt.Fprintf(w, "%s[v%d:v%d]", apiKey(v.ApiKey), v.MinVersion, v.MaxVersion) + } + } +} + type apiKey int16 const ( - produceRequest apiKey = 0 - fetchRequest apiKey = 1 - listOffsetRequest apiKey = 2 - metadataRequest apiKey = 3 - offsetCommitRequest apiKey = 8 - offsetFetchRequest apiKey = 9 - groupCoordinatorRequest apiKey = 10 - joinGroupRequest apiKey = 11 - heartbeatRequest apiKey = 12 - leaveGroupRequest apiKey = 13 - syncGroupRequest apiKey = 14 - describeGroupsRequest apiKey = 15 - listGroupsRequest apiKey = 16 - saslHandshakeRequest apiKey = 17 - apiVersionsRequest apiKey = 18 - createTopicsRequest apiKey = 19 - deleteTopicsRequest apiKey = 20 - saslAuthenticateRequest apiKey = 36 + produce apiKey = 0 + fetch apiKey = 1 + listOffsets apiKey = 2 + metadata apiKey = 3 + leaderAndIsr apiKey = 4 + stopReplica apiKey = 5 + updateMetadata apiKey = 6 + controlledShutdown apiKey = 7 + offsetCommit apiKey = 8 + offsetFetch apiKey = 9 + findCoordinator apiKey = 10 + joinGroup 
apiKey = 11 + heartbeat apiKey = 12 + leaveGroup apiKey = 13 + syncGroup apiKey = 14 + describeGroups apiKey = 15 + listGroups apiKey = 16 + saslHandshake apiKey = 17 + apiVersions apiKey = 18 + createTopics apiKey = 19 + deleteTopics apiKey = 20 + deleteRecords apiKey = 21 + initProducerId apiKey = 22 + offsetForLeaderEpoch apiKey = 23 + addPartitionsToTxn apiKey = 24 + addOffsetsToTxn apiKey = 25 + endTxn apiKey = 26 + writeTxnMarkers apiKey = 27 + txnOffsetCommit apiKey = 28 + describeAcls apiKey = 29 + createAcls apiKey = 30 + deleteAcls apiKey = 31 + describeConfigs apiKey = 32 + alterConfigs apiKey = 33 + alterReplicaLogDirs apiKey = 34 + describeLogDirs apiKey = 35 + saslAuthenticate apiKey = 36 + createPartitions apiKey = 37 + createDelegationToken apiKey = 38 + renewDelegationToken apiKey = 39 + expireDelegationToken apiKey = 40 + describeDelegationToken apiKey = 41 + deleteGroups apiKey = 42 + electLeaders apiKey = 43 + incrementalAlterConfigs apiKey = 44 + alterPartitionReassignments apiKey = 45 + listPartitionReassignments apiKey = 46 + offsetDelete apiKey = 47 ) +func (k apiKey) String() string { + if i := int(k); i >= 0 && i < len(apiKeyStrings) { + return apiKeyStrings[i] + } + return strconv.Itoa(int(k)) +} + type apiVersion int16 const ( @@ -35,11 +106,66 @@ const ( v1 apiVersion = 1 v2 apiVersion = 2 v3 apiVersion = 3 + v4 apiVersion = 4 v5 apiVersion = 5 + v6 apiVersion = 6 v7 apiVersion = 7 + v8 apiVersion = 8 + v9 apiVersion = 9 v10 apiVersion = 10 ) +var apiKeyStrings = [...]string{ + produce: "Produce", + fetch: "Fetch", + listOffsets: "ListOffsets", + metadata: "Metadata", + leaderAndIsr: "LeaderAndIsr", + stopReplica: "StopReplica", + updateMetadata: "UpdateMetadata", + controlledShutdown: "ControlledShutdown", + offsetCommit: "OffsetCommit", + offsetFetch: "OffsetFetch", + findCoordinator: "FindCoordinator", + joinGroup: "JoinGroup", + heartbeat: "Heartbeat", + leaveGroup: "LeaveGroup", + syncGroup: "SyncGroup", + describeGroups: "DescribeGroups", + listGroups: "ListGroups", + saslHandshake: "SaslHandshake", + apiVersions: "ApiVersions", + createTopics: "CreateTopics", + deleteTopics: "DeleteTopics", + deleteRecords: "DeleteRecords", + initProducerId: "InitProducerId", + offsetForLeaderEpoch: "OffsetForLeaderEpoch", + addPartitionsToTxn: "AddPartitionsToTxn", + addOffsetsToTxn: "AddOffsetsToTxn", + endTxn: "EndTxn", + writeTxnMarkers: "WriteTxnMarkers", + txnOffsetCommit: "TxnOffsetCommit", + describeAcls: "DescribeAcls", + createAcls: "CreateAcls", + deleteAcls: "DeleteAcls", + describeConfigs: "DescribeConfigs", + alterConfigs: "AlterConfigs", + alterReplicaLogDirs: "AlterReplicaLogDirs", + describeLogDirs: "DescribeLogDirs", + saslAuthenticate: "SaslAuthenticate", + createPartitions: "CreatePartitions", + createDelegationToken: "CreateDelegationToken", + renewDelegationToken: "RenewDelegationToken", + expireDelegationToken: "ExpireDelegationToken", + describeDelegationToken: "DescribeDelegationToken", + deleteGroups: "DeleteGroups", + electLeaders: "ElectLeaders", + incrementalAlterConfigs: "IncrementalAlfterConfigs", + alterPartitionReassignments: "AlterPartitionReassignments", + listPartitionReassignments: "ListPartitionReassignments", + offsetDelete: "OffsetDelete", +} + type requestHeader struct { Size int32 ApiKey int16 diff --git a/vendor/github.com/segmentio/kafka-go/reader.go b/vendor/github.com/segmentio/kafka-go/reader.go index 825f6922e..d8ab5316a 100644 --- a/vendor/github.com/segmentio/kafka-go/reader.go +++ 
b/vendor/github.com/segmentio/kafka-go/reader.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "io" - "log" "math" "sort" "strconv" @@ -16,7 +15,7 @@ import ( const ( LastOffset int64 = -1 // The most recent offset available for a partition. - FirstOffset = -2 // The least recent offset available for a partition. + FirstOffset int64 = -2 // The least recent offset available for a partition. ) const ( @@ -98,7 +97,7 @@ func (r *Reader) subscribe(assignments []PartitionAssignment) { r.start(offsetsByPartition) r.mutex.Unlock() - r.withLogger(func(l *log.Logger) { + r.withLogger(func(l Logger) { l.Printf("subscribed to partitions: %+v", offsetsByPartition) }) } @@ -195,7 +194,7 @@ func (r *Reader) commitLoopInterval(ctx context.Context, gen *Generation) { commit := func() { if err := r.commitOffsetsWithRetry(gen, offsets, defaultCommitRetries); err != nil { - r.withErrorLogger(func(l *log.Logger) { l.Print(err) }) + r.withErrorLogger(func(l Logger) { l.Printf(err.Error()) }) } else { offsets.reset() } @@ -227,11 +226,11 @@ func (r *Reader) commitLoopInterval(ctx context.Context, gen *Generation) { // commitLoop processes commits off the commit chan func (r *Reader) commitLoop(ctx context.Context, gen *Generation) { - r.withLogger(func(l *log.Logger) { - l.Println("started commit for group,", r.config.GroupID) + r.withLogger(func(l Logger) { + l.Printf("started commit for group %s\n", r.config.GroupID) }) - defer r.withLogger(func(l *log.Logger) { - l.Println("stopped commit for group,", r.config.GroupID) + defer r.withLogger(func(l Logger) { + l.Printf("stopped commit for group %s\n", r.config.GroupID) }) if r.config.CommitInterval == 0 { @@ -249,7 +248,7 @@ func (r *Reader) run(cg *ConsumerGroup) { defer close(r.done) defer cg.Close() - r.withLogger(func(l *log.Logger) { + r.withLogger(func(l Logger) { l.Printf("entering loop for consumer group, %v\n", r.config.GroupID) }) @@ -260,8 +259,8 @@ func (r *Reader) run(cg *ConsumerGroup) { return } r.stats.errors.observe(1) - r.withErrorLogger(func(l *log.Logger) { - l.Println(err) + r.withErrorLogger(func(l Logger) { + l.Printf(err.Error()) }) continue } @@ -415,11 +414,11 @@ type ReaderConfig struct { // If not nil, specifies a logger used to report internal changes within the // reader. - Logger *log.Logger + Logger Logger // ErrorLogger is the logger used to report errors. If nil, the reader falls // back to using Logger instead. - ErrorLogger *log.Logger + ErrorLogger Logger // IsolationLevel controls the visibility of transactional records. // ReadUncommitted makes all records visible. 
With ReadCommitted only @@ -883,7 +882,7 @@ func (r *Reader) Offset() int64 { r.mutex.Lock() offset := r.offset r.mutex.Unlock() - r.withLogger(func(log *log.Logger) { + r.withLogger(func(log Logger) { log.Printf("looking up offset of kafka reader for partition %d of %s: %d", r.config.Partition, r.config.Topic, offset) }) return offset @@ -921,7 +920,7 @@ func (r *Reader) SetOffset(offset int64) error { if r.closed { err = io.ErrClosedPipe } else if offset != r.offset { - r.withLogger(func(log *log.Logger) { + r.withLogger(func(log Logger) { log.Printf("setting the offset of the kafka reader for partition %d of %s from %d to %d", r.config.Partition, r.config.Topic, r.offset, offset) }) @@ -1007,13 +1006,13 @@ func (r *Reader) Stats() ReaderStats { return stats } -func (r *Reader) withLogger(do func(*log.Logger)) { +func (r *Reader) withLogger(do func(Logger)) { if r.config.Logger != nil { do(r.config.Logger) } } -func (r *Reader) withErrorLogger(do func(*log.Logger)) { +func (r *Reader) withErrorLogger(do func(Logger)) { if r.config.ErrorLogger != nil { do(r.config.ErrorLogger) } else { @@ -1042,7 +1041,7 @@ func (r *Reader) readLag(ctx context.Context) { if err != nil { r.stats.errors.observe(1) - r.withErrorLogger(func(log *log.Logger) { + r.withErrorLogger(func(log Logger) { log.Printf("kafka reader failed to read lag of partition %d of %s", r.config.Partition, r.config.Topic) }) } else { @@ -1101,8 +1100,8 @@ func (r *Reader) start(offsetsByPartition map[int]int64) { // them using the high level reader API. type reader struct { dialer *Dialer - logger *log.Logger - errorLogger *log.Logger + logger Logger + errorLogger Logger brokers []string topic string partition int @@ -1142,7 +1141,7 @@ func (r *reader) run(ctx context.Context, offset int64) { } } - r.withLogger(func(log *log.Logger) { + r.withLogger(func(log Logger) { log.Printf("initializing kafka reader for partition %d of %s starting at offset %d", r.partition, r.topic, offset) }) @@ -1153,7 +1152,7 @@ func (r *reader) run(ctx context.Context, offset int64) { // This would happen if the requested offset is passed the last // offset on the partition leader. In that case we're just going // to retry later hoping that enough data has been produced. - r.withErrorLogger(func(log *log.Logger) { + r.withErrorLogger(func(log Logger) { log.Printf("error initializing the kafka reader for partition %d of %s: %s", r.partition, r.topic, OffsetOutOfRange) }) continue @@ -1165,7 +1164,7 @@ func (r *reader) run(ctx context.Context, offset int64) { r.sendError(ctx, err) } else { r.stats.errors.observe(1) - r.withErrorLogger(func(log *log.Logger) { + r.withErrorLogger(func(log Logger) { log.Printf("error initializing the kafka reader for partition %d of %s: %s", r.partition, r.topic, err) }) } @@ -1192,8 +1191,14 @@ func (r *reader) run(ctx context.Context, offset int64) { switch offset, err = r.read(ctx, offset, conn); err { case nil: errcount = 0 + case io.EOF: + // done with this batch of messages...carry on. note that this + // block relies on the batch repackaging real io.EOF errors as + // io.UnexpectedEOF. otherwise, we would end up swallowing real + // errors here. 
+ break readLoop case UnknownTopicOrPartition: - r.withErrorLogger(func(log *log.Logger) { + r.withErrorLogger(func(log Logger) { log.Printf("failed to read from current broker for partition %d of %s at offset %d, topic or parition not found on this broker, %v", r.partition, r.topic, offset, r.brokers) }) @@ -1204,7 +1209,7 @@ func (r *reader) run(ctx context.Context, offset int64) { r.stats.rebalances.observe(1) break readLoop case NotLeaderForPartition: - r.withErrorLogger(func(log *log.Logger) { + r.withErrorLogger(func(log Logger) { log.Printf("failed to read from current broker for partition %d of %s at offset %d, not the leader", r.partition, r.topic, offset) }) @@ -1218,7 +1223,7 @@ func (r *reader) run(ctx context.Context, offset int64) { case RequestTimedOut: // Timeout on the kafka side, this can be safely retried. errcount = 0 - r.withLogger(func(log *log.Logger) { + r.withLogger(func(log Logger) { log.Printf("no messages received from kafka within the allocated time for partition %d of %s at offset %d", r.partition, r.topic, offset) }) r.stats.timeouts.observe(1) @@ -1228,7 +1233,7 @@ func (r *reader) run(ctx context.Context, offset int64) { first, last, err := r.readOffsets(conn) if err != nil { - r.withErrorLogger(func(log *log.Logger) { + r.withErrorLogger(func(log Logger) { log.Printf("the kafka reader got an error while attempting to determine whether it was reading before the first offset or after the last offset of partition %d of %s: %s", r.partition, r.topic, err) }) conn.Close() @@ -1237,7 +1242,7 @@ func (r *reader) run(ctx context.Context, offset int64) { switch { case offset < first: - r.withErrorLogger(func(log *log.Logger) { + r.withErrorLogger(func(log Logger) { log.Printf("the kafka reader is reading before the first offset for partition %d of %s, skipping from offset %d to %d (%d messages)", r.partition, r.topic, offset, first, first-offset) }) offset, errcount = first, 0 @@ -1249,7 +1254,7 @@ func (r *reader) run(ctx context.Context, offset int64) { default: // We may be reading past the last offset, will retry later. 
- r.withErrorLogger(func(log *log.Logger) { + r.withErrorLogger(func(log Logger) { log.Printf("the kafka reader is reading passed the last offset for partition %d of %s at offset %d", r.partition, r.topic, offset) }) } @@ -1270,7 +1275,7 @@ func (r *reader) run(ctx context.Context, offset int64) { if _, ok := err.(Error); ok { r.sendError(ctx, err) } else { - r.withErrorLogger(func(log *log.Logger) { + r.withErrorLogger(func(log Logger) { log.Printf("the kafka reader got an unknown error reading partition %d of %s at offset %d: %s", r.partition, r.topic, offset, err) }) r.stats.errors.observe(1) @@ -1316,7 +1321,7 @@ func (r *reader) initialize(ctx context.Context, offset int64) (conn *Conn, star offset = first } - r.withLogger(func(log *log.Logger) { + r.withLogger(func(log Logger) { log.Printf("the kafka reader for partition %d of %s is seeking to offset %d", r.partition, r.topic, offset) }) @@ -1418,13 +1423,13 @@ func (r *reader) sendError(ctx context.Context, err error) error { } } -func (r *reader) withLogger(do func(*log.Logger)) { +func (r *reader) withLogger(do func(Logger)) { if r.logger != nil { do(r.logger) } } -func (r *reader) withErrorLogger(do func(*log.Logger)) { +func (r *reader) withErrorLogger(do func(Logger)) { if r.errorLogger != nil { do(r.errorLogger) } else { diff --git a/vendor/github.com/segmentio/kafka-go/recordbatch.go b/vendor/github.com/segmentio/kafka-go/recordbatch.go new file mode 100644 index 000000000..59ab4937b --- /dev/null +++ b/vendor/github.com/segmentio/kafka-go/recordbatch.go @@ -0,0 +1,108 @@ +package kafka + +import ( + "bytes" + "time" +) + +const recordBatchHeaderSize int32 = 0 + + 8 + // base offset + 4 + // batch length + 4 + // partition leader epoch + 1 + // magic + 4 + // crc + 2 + // attributes + 4 + // last offset delta + 8 + // first timestamp + 8 + // max timestamp + 8 + // producer id + 2 + // producer epoch + 4 + // base sequence + 4 // msg count + +func recordBatchSize(msgs ...Message) (size int32) { + size = recordBatchHeaderSize + baseTime := msgs[0].Time + + for i := range msgs { + msg := &msgs[i] + msz := recordSize(msg, msg.Time.Sub(baseTime), int64(i)) + size += int32(msz + varIntLen(int64(msz))) + } + + return +} + +func compressRecordBatch(codec CompressionCodec, msgs ...Message) (compressed *bytes.Buffer, attributes int16, size int32, err error) { + compressed = acquireBuffer() + compressor := codec.NewWriter(compressed) + wb := &writeBuffer{w: compressor} + + for i, msg := range msgs { + wb.writeRecord(0, msgs[0].Time, int64(i), msg) + } + + if err = compressor.Close(); err != nil { + releaseBuffer(compressed) + return + } + + attributes = int16(codec.Code()) + size = recordBatchHeaderSize + int32(compressed.Len()) + return +} + +type recordBatch struct { + // required input parameters + codec CompressionCodec + attributes int16 + msgs []Message + + // parameters calculated during init + compressed *bytes.Buffer + size int32 +} + +func newRecordBatch(codec CompressionCodec, msgs ...Message) (r *recordBatch, err error) { + r = &recordBatch{ + codec: codec, + msgs: msgs, + } + if r.codec == nil { + r.size = recordBatchSize(r.msgs...) + } else { + r.compressed, r.attributes, r.size, err = compressRecordBatch(r.codec, r.msgs...) 
+ } + return +} + +func (r *recordBatch) writeTo(wb *writeBuffer) { + wb.writeInt32(r.size) + + baseTime := r.msgs[0].Time + lastTime := r.msgs[len(r.msgs)-1].Time + if r.compressed != nil { + wb.writeRecordBatch(r.attributes, r.size, len(r.msgs), baseTime, lastTime, func(wb *writeBuffer) { + wb.Write(r.compressed.Bytes()) + }) + releaseBuffer(r.compressed) + } else { + wb.writeRecordBatch(r.attributes, r.size, len(r.msgs), baseTime, lastTime, func(wb *writeBuffer) { + for i, msg := range r.msgs { + wb.writeRecord(0, r.msgs[0].Time, int64(i), msg) + } + }) + } +} + +func recordSize(msg *Message, timestampDelta time.Duration, offsetDelta int64) int { + return 1 + // attributes + varIntLen(int64(milliseconds(timestampDelta))) + + varIntLen(offsetDelta) + + varBytesLen(msg.Key) + + varBytesLen(msg.Value) + + varArrayLen(len(msg.Headers), func(i int) int { + h := &msg.Headers[i] + return varStringLen(h.Key) + varBytesLen(h.Value) + }) +} diff --git a/vendor/github.com/segmentio/kafka-go/write.go b/vendor/github.com/segmentio/kafka-go/write.go index 32ca49c88..3b806509c 100644 --- a/vendor/github.com/segmentio/kafka-go/write.go +++ b/vendor/github.com/segmentio/kafka-go/write.go @@ -167,7 +167,7 @@ type writable interface { func (wb *writeBuffer) writeFetchRequestV2(correlationID int32, clientID, topic string, partition int32, offset int64, minBytes, maxBytes int, maxWait time.Duration) error { h := requestHeader{ - ApiKey: int16(fetchRequest), + ApiKey: int16(fetch), ApiVersion: int16(v2), CorrelationID: correlationID, ClientID: clientID, @@ -203,7 +203,7 @@ func (wb *writeBuffer) writeFetchRequestV2(correlationID int32, clientID, topic func (wb *writeBuffer) writeFetchRequestV5(correlationID int32, clientID, topic string, partition int32, offset int64, minBytes, maxBytes int, maxWait time.Duration, isolationLevel int8) error { h := requestHeader{ - ApiKey: int16(fetchRequest), + ApiKey: int16(fetch), ApiVersion: int16(v5), CorrelationID: correlationID, ClientID: clientID, @@ -245,7 +245,7 @@ func (wb *writeBuffer) writeFetchRequestV5(correlationID int32, clientID, topic func (wb *writeBuffer) writeFetchRequestV10(correlationID int32, clientID, topic string, partition int32, offset int64, minBytes, maxBytes int, maxWait time.Duration, isolationLevel int8) error { h := requestHeader{ - ApiKey: int16(fetchRequest), + ApiKey: int16(fetch), ApiVersion: int16(v10), CorrelationID: correlationID, ClientID: clientID, @@ -297,7 +297,7 @@ func (wb *writeBuffer) writeFetchRequestV10(correlationID int32, clientID, topic func (wb *writeBuffer) writeListOffsetRequestV1(correlationID int32, clientID, topic string, partition int32, time int64) error { h := requestHeader{ - ApiKey: int16(listOffsetRequest), + ApiKey: int16(listOffsets), ApiVersion: int16(v1), CorrelationID: correlationID, ClientID: clientID, @@ -341,7 +341,7 @@ func (wb *writeBuffer) writeProduceRequestV2(codec CompressionCodec, correlation } h := requestHeader{ - ApiKey: int16(produceRequest), + ApiKey: int16(produce), ApiVersion: int16(v2), CorrelationID: correlationID, ClientID: clientID, @@ -379,22 +379,10 @@ func (wb *writeBuffer) writeProduceRequestV2(codec CompressionCodec, correlation return wb.Flush() } -func (wb *writeBuffer) writeProduceRequestV3(codec CompressionCodec, correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, transactionalID *string, msgs ...Message) (err error) { - var size int32 - var attributes int16 - var compressed *bytes.Buffer - - if codec == nil { - 
size = recordBatchSize(msgs...) - } else { - compressed, attributes, size, err = compressRecordBatch(codec, msgs...) - if err != nil { - return - } - } +func (wb *writeBuffer) writeProduceRequestV3(correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, transactionalID *string, recordBatch *recordBatch) (err error) { h := requestHeader{ - ApiKey: int16(produceRequest), + ApiKey: int16(produce), ApiVersion: int16(v3), CorrelationID: correlationID, ClientID: clientID, @@ -409,7 +397,7 @@ func (wb *writeBuffer) writeProduceRequestV3(codec CompressionCodec, correlation 4 + // partition array length 4 + // partition 4 + // message set size - size + recordBatch.size h.writeTo(wb) wb.writeNullableString(transactionalID) @@ -424,42 +412,15 @@ func (wb *writeBuffer) writeProduceRequestV3(codec CompressionCodec, correlation wb.writeArrayLen(1) wb.writeInt32(partition) - wb.writeInt32(size) - baseTime := msgs[0].Time - lastTime := msgs[len(msgs)-1].Time - - if compressed != nil { - wb.writeRecordBatch(attributes, size, len(msgs), baseTime, lastTime, func(wb *writeBuffer) { - wb.Write(compressed.Bytes()) - }) - releaseBuffer(compressed) - } else { - wb.writeRecordBatch(attributes, size, len(msgs), baseTime, lastTime, func(wb *writeBuffer) { - for i, msg := range msgs { - wb.writeRecord(0, msgs[0].Time, int64(i), msg) - } - }) - } + recordBatch.writeTo(wb) return wb.Flush() } -func (wb *writeBuffer) writeProduceRequestV7(codec CompressionCodec, correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, transactionalID *string, msgs ...Message) (err error) { - var size int32 - var attributes int16 - var compressed *bytes.Buffer - - if codec == nil { - size = recordBatchSize(msgs...) - } else { - compressed, attributes, size, err = compressRecordBatch(codec, msgs...) 
- if err != nil { - return - } - } +func (wb *writeBuffer) writeProduceRequestV7(correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, transactionalID *string, recordBatch *recordBatch) (err error) { h := requestHeader{ - ApiKey: int16(produceRequest), + ApiKey: int16(produce), ApiVersion: int16(v7), CorrelationID: correlationID, ClientID: clientID, @@ -473,7 +434,7 @@ func (wb *writeBuffer) writeProduceRequestV7(codec CompressionCodec, correlation 4 + // partition array length 4 + // partition 4 + // message set size - size + recordBatch.size h.writeTo(wb) wb.writeNullableString(transactionalID) @@ -488,22 +449,7 @@ func (wb *writeBuffer) writeProduceRequestV7(codec CompressionCodec, correlation wb.writeArrayLen(1) wb.writeInt32(partition) - wb.writeInt32(size) - baseTime := msgs[0].Time - lastTime := msgs[len(msgs)-1].Time - - if compressed != nil { - wb.writeRecordBatch(attributes, size, len(msgs), baseTime, lastTime, func(wb *writeBuffer) { - wb.Write(compressed.Bytes()) - }) - releaseBuffer(compressed) - } else { - wb.writeRecordBatch(attributes, size, len(msgs), baseTime, lastTime, func(wb *writeBuffer) { - for i, msg := range msgs { - wb.writeRecord(0, msgs[0].Time, int64(i), msg) - } - }) - } + recordBatch.writeTo(wb) return wb.Flush() } @@ -572,25 +518,6 @@ func compressMessageSet(codec CompressionCodec, msgs ...Message) (compressed *by return } -func compressRecordBatch(codec CompressionCodec, msgs ...Message) (compressed *bytes.Buffer, attributes int16, size int32, err error) { - compressed = acquireBuffer() - compressor := codec.NewWriter(compressed) - wb := &writeBuffer{w: compressor} - - for i, msg := range msgs { - wb.writeRecord(0, msgs[0].Time, int64(i), msg) - } - - if err = compressor.Close(); err != nil { - releaseBuffer(compressed) - return - } - - attributes = int16(codec.Code()) - size = recordBatchHeaderSize + int32(compressed.Len()) - return -} - func (wb *writeBuffer) writeMessage(offset int64, attributes int8, time time.Time, key, value []byte, cw *crc32Writer) { const magicByte = 1 // compatible with kafka 0.10.0.0+ @@ -685,43 +612,3 @@ func messageSetSize(msgs ...Message) (size int32) { } return } - -func recordSize(msg *Message, timestampDelta time.Duration, offsetDelta int64) int { - return 1 + // attributes - varIntLen(int64(milliseconds(timestampDelta))) + - varIntLen(offsetDelta) + - varBytesLen(msg.Key) + - varBytesLen(msg.Value) + - varArrayLen(len(msg.Headers), func(i int) int { - h := &msg.Headers[i] - return varStringLen(h.Key) + varBytesLen(h.Value) - }) -} - -const recordBatchHeaderSize int32 = 0 + - 8 + // base offset - 4 + // batch length - 4 + // partition leader epoch - 1 + // magic - 4 + // crc - 2 + // attributes - 4 + // last offset delta - 8 + // first timestamp - 8 + // max timestamp - 8 + // producer id - 2 + // producer epoch - 4 + // base sequence - 4 // msg count - -func recordBatchSize(msgs ...Message) (size int32) { - size = recordBatchHeaderSize - baseTime := msgs[0].Time - - for i := range msgs { - msg := &msgs[i] - msz := recordSize(msg, msg.Time.Sub(baseTime), int64(i)) - size += int32(msz + varIntLen(int64(msz))) - } - - return -} diff --git a/vendor/github.com/segmentio/kafka-go/writer.go b/vendor/github.com/segmentio/kafka-go/writer.go index d1c97c1c4..c07a0c519 100644 --- a/vendor/github.com/segmentio/kafka-go/writer.go +++ b/vendor/github.com/segmentio/kafka-go/writer.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "io" - "log" "math/rand" "sort" "sync" @@ -103,6 
+102,11 @@ type WriterConfig struct { // The default is to refresh partitions every 15 seconds. RebalanceInterval time.Duration + // Connections that were idle for this duration will not be reused. + // + // Defaults to 9 minutes. + IdleConnTimeout time.Duration + // Number of acknowledges from partition replicas required before receiving // a response to a produce request (default to -1, which means to wait for // all replicas). @@ -120,11 +124,11 @@ type WriterConfig struct { // If not nil, specifies a logger used to report internal changes within the // writer. - Logger *log.Logger + Logger Logger // ErrorLogger is the logger used to report errors. If nil, the writer falls // back to using Logger instead. - ErrorLogger *log.Logger + ErrorLogger Logger newPartitionWriter func(partition int, config WriterConfig, stats *writerStats) partitionWriter } @@ -248,6 +252,9 @@ func NewWriter(config WriterConfig) *Writer { if config.RebalanceInterval == 0 { config.RebalanceInterval = 15 * time.Second } + if config.IdleConnTimeout == 0 { + config.IdleConnTimeout = 9 * time.Minute + } w := &Writer{ config: config, @@ -311,7 +318,7 @@ func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error { } for i, msg := range msgs { - if int(msg.message(nil).size()) > w.config.BatchBytes { + if int(msg.size()) > w.config.BatchBytes { err := MessageTooLargeError{ Message: msg, Remaining: msgs[i+1:], @@ -556,13 +563,14 @@ type writer struct { maxMessageBytes int batchTimeout time.Duration writeTimeout time.Duration + idleConnTimeout time.Duration dialer *Dialer msgs chan writerMessage join sync.WaitGroup stats *writerStats codec CompressionCodec - logger *log.Logger - errorLogger *log.Logger + logger Logger + errorLogger Logger } func newWriter(partition int, config WriterConfig, stats *writerStats) *writer { @@ -575,6 +583,7 @@ func newWriter(partition int, config WriterConfig, stats *writerStats) *writer { maxMessageBytes: config.BatchBytes, batchTimeout: config.BatchTimeout, writeTimeout: config.WriteTimeout, + idleConnTimeout: config.IdleConnTimeout, dialer: config.Dialer, msgs: make(chan writerMessage, config.QueueCapacity), stats: stats, @@ -596,13 +605,13 @@ func (w *writer) messages() chan<- writerMessage { return w.msgs } -func (w *writer) withLogger(do func(*log.Logger)) { +func (w *writer) withLogger(do func(Logger)) { if w.logger != nil { do(w.logger) } } -func (w *writer) withErrorLogger(do func(*log.Logger)) { +func (w *writer) withErrorLogger(do func(Logger)) { if w.errorLogger != nil { do(w.errorLogger) } else { @@ -624,6 +633,7 @@ func (w *writer) run() { var resch = make([](chan<- error), 0, w.batchSize) var lastMsg writerMessage var batchSizeBytes int + var idleConnDeadline time.Time defer func() { if conn != nil { @@ -640,7 +650,7 @@ func (w *writer) run() { if lastMsg.res != nil { resch = append(resch, lastMsg.res) } - batchSizeBytes += int(lastMsg.msg.message(nil).size()) + batchSizeBytes += int(lastMsg.msg.size()) lastMsg = writerMessage{} if !batchTimerRunning { batchTimer.Reset(w.batchTimeout) @@ -652,7 +662,7 @@ func (w *writer) run() { if !ok { done, mustFlush = true, true } else { - if int(wm.msg.message(nil).size())+batchSizeBytes > w.maxMessageBytes { + if int(wm.msg.size())+batchSizeBytes > w.maxMessageBytes { // If the size of the current message puts us over the maxMessageBytes limit, // store the message but don't send it in this batch. 
mustFlush = true @@ -663,7 +673,7 @@ func (w *writer) run() { if wm.res != nil { resch = append(resch, wm.res) } - batchSizeBytes += int(wm.msg.message(nil).size()) + batchSizeBytes += int(wm.msg.size()) mustFlush = len(batch) >= w.batchSize || batchSizeBytes >= w.maxMessageBytes } if !batchTimerRunning { @@ -684,6 +694,10 @@ func (w *writer) run() { } batchTimerRunning = false } + if conn != nil && time.Now().After(idleConnDeadline) { + conn.Close() + conn = nil + } if len(batch) == 0 { continue } @@ -694,6 +708,7 @@ func (w *writer) run() { conn = nil } } + idleConnDeadline = time.Now().Add(w.idleConnTimeout) for i := range batch { batch[i] = Message{} } @@ -727,7 +742,7 @@ func (w *writer) write(conn *Conn, batch []Message, resch [](chan<- error)) (ret if conn == nil { if conn, err = w.dial(); err != nil { w.stats.errors.observe(1) - w.withErrorLogger(func(logger *log.Logger) { + w.withErrorLogger(func(logger Logger) { logger.Printf("error dialing kafka brokers for topic %s (partition %d): %s", w.topic, w.partition, err) }) for i, res := range resch { @@ -741,7 +756,7 @@ func (w *writer) write(conn *Conn, batch []Message, resch [](chan<- error)) (ret conn.SetWriteDeadline(time.Now().Add(w.writeTimeout)) if _, err = conn.WriteCompressedMessages(w.codec, batch...); err != nil { w.stats.errors.observe(1) - w.withErrorLogger(func(logger *log.Logger) { + w.withErrorLogger(func(logger Logger) { logger.Printf("error writing messages to %s (partition %d): %s", w.topic, w.partition, err) }) for i, res := range resch {
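
A note on the logging change that runs through the reader.go and writer.go hunks above: every withLogger/withErrorLogger hook now receives the kafka-go Logger interface instead of a concrete *log.Logger. The interface and its function adapter live in the new vendor/github.com/segmentio/kafka-go/logger.go added by this patch; that file is not reproduced in these hunks, but in v0.3.5 it amounts to roughly the following sketch:

package kafka

// Logger is the abstraction the reader and writer log through. Any type
// with a Printf method of this shape satisfies it, so a *log.Logger from
// the standard library can still be passed where one was used before.
type Logger interface {
	Printf(string, ...interface{})
}

// LoggerFunc adapts a plain function (for example a diagnostics callback)
// to the Logger interface.
type LoggerFunc func(msg string, args ...interface{})

func (f LoggerFunc) Printf(msg string, args ...interface{}) { f(msg, args...) }

Because *log.Logger already exposes a matching Printf method, code that previously supplied a standard-library logger keeps compiling; the interface only widens what can be injected.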
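Stepping back to the write.go and recordbatch.go changes earlier in the patch: batch sizing and compression now happen once, in newRecordBatch, and the resulting *recordBatch is handed to the version-specific produce writers instead of each of them recompressing the messages. A rough sketch of what a call site inside the kafka-go package could look like under that refactor follows; produceV3 is a hypothetical wrapper written for illustration, not a function from the library.

package kafka

import "time"

// produceV3 illustrates the new flow: the record batch is built (and
// compressed, when a codec is set) exactly once, then the request writer
// only serializes it.
func produceV3(wb *writeBuffer, codec CompressionCodec, correlationID int32,
	clientID, topic string, partition int32, timeout time.Duration,
	requiredAcks int16, transactionalID *string, msgs ...Message) error {

	rb, err := newRecordBatch(codec, msgs...) // sizing plus optional compression
	if err != nil {
		return err
	}
	// writeProduceRequestV7 takes the same *recordBatch, so one batch can
	// serve whichever protocol version the connection negotiated.
	return wb.writeProduceRequestV3(correlationID, clientID, topic, partition,
		timeout, requiredAcks, transactionalID, rb)
}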
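For users of the writer, the visible additions in writer.go are the Logger-typed Logger/ErrorLogger fields and the new IdleConnTimeout setting (defaulting to 9 minutes), after which an idle broker connection is closed by the flush loop instead of being reused. A minimal configuration exercising both knobs is sketched below; the broker address and topic are placeholders, and any broker reachable at that address would do.

package main

import (
	"context"
	"log"
	"os"
	"time"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	errLog := log.New(os.Stderr, "kafka-writer: ", log.LstdFlags)

	w := kafka.NewWriter(kafka.WriterConfig{
		Brokers:  []string{"localhost:9092"}, // placeholder broker
		Topic:    "example-topic",            // placeholder topic
		Balancer: &kafka.LeastBytes{},

		// *log.Logger satisfies the new Logger interface directly;
		// kafka.LoggerFunc(errLog.Printf) would work as well.
		ErrorLogger: errLog,

		// Connections idle for longer than this are closed rather than
		// reused on the next flush.
		IdleConnTimeout: 5 * time.Minute,
	})
	defer w.Close()

	// Synchronous write: errors surface here; with Async set they are
	// reported through ErrorLogger instead.
	if err := w.WriteMessages(context.Background(),
		kafka.Message{Key: []byte("k"), Value: []byte("v")},
	); err != nil {
		errLog.Printf("write failed: %v", err)
	}
}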