diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 0eece0307c7d..1bd83b1b8b5a 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -53,7 +53,11 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d
 - Configurable redis `keys` using filters and format strings. {pull}2169[2169]
 - Add format string support to `output.kafka.topic`. {pull}2188[2188]
 - Add `output.kafka.topics` for more advanced kafka topic selection per event. {pull}2188[2188]
-
+- Add support for kafka 0.10. {pull}2190[2190]
+- Add SASL/PLAIN authentication support to the kafka output. {pull}2190[2190]
+- Make Kafka metadata update configurable. {pull}2190[2190]
+- Add optional kafka version setting to enable kafka broker version support. {pull}2190[2190]
+- Add kafka message timestamps when at least version 0.10 is configured. {pull}2190[2190]

 *Metricbeat*

diff --git a/filebeat/filebeat.full.yml b/filebeat/filebeat.full.yml
index a67d54b4c244..af276081e1f5 100644
--- a/filebeat/filebeat.full.yml
+++ b/filebeat/filebeat.full.yml
@@ -487,6 +487,27 @@ output.elasticsearch:
   # using any event field. To set the topic from document type use `%{[type]}`.
   #topic: beats

+  # Authentication details. Password is required if username is set.
+  #username: ''
+  #password: ''
+
+  # Kafka version filebeat is assumed to run against. Defaults to the oldest
+  # supported stable version (currently version 0.8.2.0).
+  #version: 0.8.2
+
+  # Metadata update configuration. The metadata contains leader information
+  # used to decide which broker to use when publishing.
+  #metadata:
+    # Max metadata request retry attempts when the cluster is in the middle
+    # of a leader election. Defaults to 3 retries.
+    #retry.max: 3
+
+    # Waiting time between retries during leader elections. Default is 250ms.
+    #retry.backoff: 250ms
+
+    # Refresh metadata interval. Defaults to every 10 minutes.
+    #refresh_frequency: 10m
+
   # The number of concurrent load-balanced Kafka output workers.
   #worker: 1

diff --git a/glide.yaml b/glide.yaml
index f5455cbece1d..c5f4478a8e26 100644
--- a/glide.yaml
+++ b/glide.yaml
@@ -59,11 +59,13 @@ import:
 - package: github.com/miekg/dns
   version: 5d001d020961ae1c184f9f8152fdc73810481677
 - package: github.com/Shopify/sarama
-  version: v1.9.0
+  version: v1.10.0
 - package: github.com/klauspost/crc32
   version: v1.0
 - package: github.com/golang/snappy
   version: d9eb7a3d35ec988b8585d4a0068e462c27d28380
+- package: github.com/eapache/go-xerial-snappy
+  version: bb955e01b9346ac19dc29eb16586c90ded99a98c
 - package: github.com/eapache/queue
   version: ded5959c0d4e360646dc9e9908cff48666781367
 - package: github.com/eapache/go-resiliency

diff --git a/libbeat/_meta/config.full.yml b/libbeat/_meta/config.full.yml
index 6d10fc225bec..39a567914827 100644
--- a/libbeat/_meta/config.full.yml
+++ b/libbeat/_meta/config.full.yml
@@ -261,6 +261,27 @@ output.elasticsearch:
   # using any event field. To set the topic from document type use `%{[type]}`.
   #topic: beats

+  # Authentication details. Password is required if username is set.
+  #username: ''
+  #password: ''
+
+  # Kafka version beatname is assumed to run against. Defaults to the oldest
+  # supported stable version (currently version 0.8.2.0).
+  #version: 0.8.2
+
+  # Metadata update configuration. The metadata contains leader information
+  # used to decide which broker to use when publishing.
+  #metadata:
+    # Max metadata request retry attempts when the cluster is in the middle
+    # of a leader election. Defaults to 3 retries.
+    #retry.max: 3
+
+    # Waiting time between retries during leader elections. Default is 250ms.
+    #retry.backoff: 250ms
+
+    # Refresh metadata interval. Defaults to every 10 minutes.
+    #refresh_frequency: 10m
+
   # The number of concurrent load-balanced Kafka output workers.
   #worker: 1

diff --git a/libbeat/docs/outputconfig.asciidoc b/libbeat/docs/outputconfig.asciidoc
index c6e830e779e0..57985656104d 100644
--- a/libbeat/docs/outputconfig.asciidoc
+++ b/libbeat/docs/outputconfig.asciidoc
@@ -541,6 +541,24 @@ The default value is true.
 The list of Kafka broker addresses from where to fetch the cluster metadata.
 The cluster metadata contain the actual Kafka brokers events are published to.

+===== version
+
+Kafka version ${beatname_lc} is assumed to run against. Defaults to the oldest
+supported stable version (currently version 0.8.2.0).
+
+Event timestamps will be added if version 0.10.0.0 or newer is enabled.
+
+Valid values are `0.8.2.0`, `0.8.2.1`, `0.8.2.2`, `0.8.2`, `0.8`, `0.9.0.0`,
+`0.9.0.1`, `0.9.0`, `0.9`, `0.10.0.0`, `0.10.0`, and `0.10`.
+
+===== username
+
+The username for connecting to Kafka. If username is configured, the password must be configured as well. Only SASL/PLAIN is supported.
+
+===== password
+
+The password for connecting to Kafka.
+
 ===== topic

 The Kafka topic used for produced events. The setting can be a format string
@@ -554,6 +572,17 @@ The configurable ClientID used for logging, debugging, and auditing purposes. The default is "beats".

 The number of concurrent load-balanced Kafka output workers.

+===== metadata
+
+Kafka metadata update settings. The metadata contains information about
+brokers, topics, partitions, and active leaders used for publishing.
+
+*`refresh_frequency`*:: Metadata refresh interval. Defaults to 10 minutes.
+
+*`retry.max`*:: Total number of metadata update retries when the cluster is in the middle of a leader election. The default is 3.
+
+*`retry.backoff`*:: Waiting time between retries during leader elections. Default is 250ms.
+
 ===== max_retries

 The number of times to retry publishing an event after a publishing failure.
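For reference, a minimal sketch of how the settings documented above fit together in the `output.kafka` section of a beat configuration; the broker address, topic, and credentials are placeholders, not values taken from this change:

[source,yaml]
------------------------------------------------------------------------------
output.kafka:
  # Placeholder broker address; cluster metadata is fetched from these hosts.
  hosts: ["kafka1:9092"]
  topic: beats
  # Enables 0.10-only behaviour such as message timestamps.
  version: 0.10.0
  # SASL/PLAIN credentials (placeholders); password is required once username is set.
  username: beats
  password: changeme
  metadata:
    retry.max: 3            # retries while the cluster is electing a leader
    retry.backoff: 250ms
    refresh_frequency: 10m
------------------------------------------------------------------------------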
diff --git a/libbeat/outputs/kafka/client.go b/libbeat/outputs/kafka/client.go index bf780e84e6de..38439da6efe8 100644 --- a/libbeat/outputs/kafka/client.go +++ b/libbeat/outputs/kafka/client.go @@ -108,6 +108,19 @@ func (c *client) AsyncPublishEvents( for _, event := range events { topic, err := c.topic.Select(event) + var ts time.Time + + // message timestamps have been added to kafka with version 0.10.0.0 + if c.config.Version.IsAtLeast(sarama.V0_10_0_0) { + if tsRaw, ok := event["@timestamp"]; ok { + if tmp, ok := tsRaw.(common.Time); ok { + ts = time.Time(tmp) + } else if tmp, ok := tsRaw.(time.Time); ok { + ts = tmp + } + } + } + jsonEvent, err := json.Marshal(event) if err != nil { ref.done() @@ -115,9 +128,10 @@ func (c *client) AsyncPublishEvents( } msg := &sarama.ProducerMessage{ - Metadata: ref, - Topic: topic, - Value: sarama.ByteEncoder(jsonEvent), + Metadata: ref, + Topic: topic, + Value: sarama.ByteEncoder(jsonEvent), + Timestamp: ts, } ch <- msg diff --git a/libbeat/outputs/kafka/config.go b/libbeat/outputs/kafka/config.go index 883e8736c3e0..2094b4a776b3 100644 --- a/libbeat/outputs/kafka/config.go +++ b/libbeat/outputs/kafka/config.go @@ -14,30 +14,54 @@ type kafkaConfig struct { TLS *outputs.TLSConfig `config:"tls"` Timeout time.Duration `config:"timeout" validate:"min=1"` Worker int `config:"worker" validate:"min=1"` + Metadata metaConfig `config:"metadata"` KeepAlive time.Duration `config:"keep_alive" validate:"min=0"` MaxMessageBytes *int `config:"max_message_bytes" validate:"min=1"` RequiredACKs *int `config:"required_acks" validate:"min=-1"` BrokerTimeout time.Duration `config:"broker_timeout" validate:"min=1"` Compression string `config:"compression"` + Version string `config:"version"` MaxRetries int `config:"max_retries" validate:"min=-1,nonzero"` ClientID string `config:"client_id"` ChanBufferSize int `config:"channel_buffer_size" validate:"min=1"` + Username string `config:"username"` + Password string `config:"password"` +} + +type metaConfig struct { + Retry metaRetryConfig `config:"retry"` + RefreshFreq time.Duration `config:"refresh_frequency" validate:"min=0"` +} + +type metaRetryConfig struct { + Max int `config:"max" validate:"min=0"` + Backoff time.Duration `config:"backoff" validate:"min=0"` } var ( defaultConfig = kafkaConfig{ - Hosts: nil, - TLS: nil, - Timeout: 30 * time.Second, - Worker: 1, + Hosts: nil, + TLS: nil, + Timeout: 30 * time.Second, + Worker: 1, + Metadata: metaConfig{ + Retry: metaRetryConfig{ + Max: 3, + Backoff: 250 * time.Millisecond, + }, + RefreshFreq: 10 * time.Minute, + }, KeepAlive: 0, MaxMessageBytes: nil, // use library default RequiredACKs: nil, // use library default BrokerTimeout: 10 * time.Second, Compression: "gzip", + Version: "", MaxRetries: 3, ClientID: "beats", ChanBufferSize: 256, + Username: "", + Password: "", } ) @@ -50,5 +74,13 @@ func (c *kafkaConfig) Validate() error { return fmt.Errorf("compression mode '%v' unknown", c.Compression) } + if _, ok := kafkaVersions[c.Version]; !ok { + return fmt.Errorf("unknown/unsupported kafka version '%v'", c.Version) + } + + if c.Username != "" && c.Password == "" { + return fmt.Errorf("password must be set when username is configured") + } + return nil } diff --git a/libbeat/outputs/kafka/kafka.go b/libbeat/outputs/kafka/kafka.go index 0ae5efaeaba1..86a370919144 100644 --- a/libbeat/outputs/kafka/kafka.go +++ b/libbeat/outputs/kafka/kafka.go @@ -53,6 +53,25 @@ var ( "gzip": sarama.CompressionGZIP, "snappy": sarama.CompressionSnappy, } + + kafkaVersions = 
map[string]sarama.KafkaVersion{
+		"": sarama.V0_8_2_0,
+
+		"0.8.2.0": sarama.V0_8_2_0,
+		"0.8.2.1": sarama.V0_8_2_1,
+		"0.8.2.2": sarama.V0_8_2_2,
+		"0.8.2":   sarama.V0_8_2_2,
+		"0.8":     sarama.V0_8_2_2,
+
+		"0.9.0.0": sarama.V0_9_0_0,
+		"0.9.0.1": sarama.V0_9_0_1,
+		"0.9.0":   sarama.V0_9_0_1,
+		"0.9":     sarama.V0_9_0_1,
+
+		"0.10.0.0": sarama.V0_10_0_0,
+		"0.10.0":   sarama.V0_10_0_0,
+		"0.10":     sarama.V0_10_0_0,
+	}
 )

 // New instantiates a new kafka output instance.
@@ -211,8 +230,16 @@ func newKafkaConfig(config *kafkaConfig) (*sarama.Config, error) {
 	k.Net.TLS.Enable = tls != nil
 	k.Net.TLS.Config = tls

-	// TODO: configure metadata level properties
-	// use lib defaults
+	if config.Username != "" {
+		k.Net.SASL.Enable = true
+		k.Net.SASL.User = config.Username
+		k.Net.SASL.Password = config.Password
+	}
+
+	// configure metadata update properties
+	k.Metadata.Retry.Max = config.Metadata.Retry.Max
+	k.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff
+	k.Metadata.RefreshFrequency = config.Metadata.RefreshFreq

 	// configure producer API properties
 	if config.MaxMessageBytes != nil {
@@ -237,6 +264,7 @@ func newKafkaConfig(config *kafkaConfig) (*sarama.Config, error) {
 		retryMax = 1000
 	}
 	k.Producer.Retry.Max = retryMax
+	// TODO: k.Producer.Retry.Backoff = ?

 	// configure per broker go channel buffering
 	k.ChannelBufferSize = config.ChanBufferSize
@@ -247,5 +275,12 @@ func newKafkaConfig(config *kafkaConfig) (*sarama.Config, error) {
 		logp.Err("Invalid kafka configuration: %v", err)
 		return nil, err
 	}
+
+	version, ok := kafkaVersions[config.Version]
+	if !ok {
+		return nil, fmt.Errorf("unknown/unsupported kafka version: %v", config.Version)
+	}
+	k.Version = version
+
 	return k, nil
 }
diff --git a/metricbeat/metricbeat.full.yml b/metricbeat/metricbeat.full.yml
index 83d00880e2e1..333a01ccd92b 100644
--- a/metricbeat/metricbeat.full.yml
+++ b/metricbeat/metricbeat.full.yml
@@ -409,6 +409,27 @@ output.elasticsearch:
   # using any event field. To set the topic from document type use `%{[type]}`.
   #topic: beats

+  # Authentication details. Password is required if username is set.
+  #username: ''
+  #password: ''
+
+  # Kafka version metricbeat is assumed to run against. Defaults to the oldest
+  # supported stable version (currently version 0.8.2.0).
+  #version: 0.8.2
+
+  # Metadata update configuration. The metadata contains leader information
+  # used to decide which broker to use when publishing.
+  #metadata:
+    # Max metadata request retry attempts when the cluster is in the middle
+    # of a leader election. Defaults to 3 retries.
+    #retry.max: 3
+
+    # Waiting time between retries during leader elections. Default is 250ms.
+    #retry.backoff: 250ms
+
+    # Refresh metadata interval. Defaults to every 10 minutes.
+    #refresh_frequency: 10m
+
   # The number of concurrent load-balanced Kafka output workers.
   #worker: 1

diff --git a/packetbeat/packetbeat.full.yml b/packetbeat/packetbeat.full.yml
index 9e3900038541..b96c0acb2c5e 100644
--- a/packetbeat/packetbeat.full.yml
+++ b/packetbeat/packetbeat.full.yml
@@ -679,6 +679,27 @@ output.elasticsearch:
   # using any event field. To set the topic from document type use `%{[type]}`.
   #topic: beats

+  # Authentication details. Password is required if username is set.
+  #username: ''
+  #password: ''
+
+  # Kafka version packetbeat is assumed to run against. Defaults to the oldest
+  # supported stable version (currently version 0.8.2.0).
+  #version: 0.8.2
+
+  # Metadata update configuration. The metadata contains leader information
+  # used to decide which broker to use when publishing.
+  #metadata:
+    # Max metadata request retry attempts when the cluster is in the middle
+    # of a leader election. Defaults to 3 retries.
+    #retry.max: 3
+
+    # Waiting time between retries during leader elections. Default is 250ms.
+    #retry.backoff: 250ms
+
+    # Refresh metadata interval. Defaults to every 10 minutes.
+    #refresh_frequency: 10m
+
   # The number of concurrent load-balanced Kafka output workers.
   #worker: 1

diff --git a/vendor/github.com/Shopify/sarama/CONTRIBUTING.md b/vendor/github.com/Shopify/sarama/.github/CONTRIBUTING.md
similarity index 100%
rename from vendor/github.com/Shopify/sarama/CONTRIBUTING.md
rename to vendor/github.com/Shopify/sarama/.github/CONTRIBUTING.md
diff --git a/vendor/github.com/Shopify/sarama/.github/ISSUE_TEMPLATE.md b/vendor/github.com/Shopify/sarama/.github/ISSUE_TEMPLATE.md
new file mode 100644
index 000000000000..ee6b6f785cfa
--- /dev/null
+++ b/vendor/github.com/Shopify/sarama/.github/ISSUE_TEMPLATE.md
@@ -0,0 +1,19 @@
+##### Versions
+
+Sarama Version:
+Kafka Version:
+Go Version:
+
+##### Configuration
+
+What configuration values are you using for Sarama and Kafka?
+
+##### Logs
+
+When filing an issue please provide logs from Sarama and Kafka if at all
+possible. You can set `sarama.Logger` to a `log.Logger` to capture Sarama debug
+output.
+
+##### Problem Description
+
+
diff --git a/vendor/github.com/Shopify/sarama/CHANGELOG.md b/vendor/github.com/Shopify/sarama/CHANGELOG.md
index 0cc2f2dd329e..429f65aa3811 100644
--- a/vendor/github.com/Shopify/sarama/CHANGELOG.md
+++ b/vendor/github.com/Shopify/sarama/CHANGELOG.md
@@ -1,5 +1,54 @@
 # Changelog

+#### Version 1.10.0 (2016-08-02)
+
+_Important:_ As of Sarama 1.10 it is necessary to tell Sarama the version of
+Kafka you are running against (via the `config.Version` value) in order to use
+features that may not be compatible with old Kafka versions. If you don't
+specify this value it will default to 0.8.2 (the minimum supported), and trying
+to use more recent features (like the offset manager) will fail with an error.
+
+_Also:_ The offset-manager's behaviour has been changed to match the upstream
+java consumer (see [#705](https://github.com/Shopify/sarama/pull/705) and
+[#713](https://github.com/Shopify/sarama/pull/713)). If you use the
+offset-manager, please ensure that you are committing one *greater* than the
+last consumed message offset or else you may end up consuming duplicate
+messages.
+
+New Features:
+ - Support for Kafka 0.10
+   ([#672](https://github.com/Shopify/sarama/pull/672),
+   [#678](https://github.com/Shopify/sarama/pull/678),
+   [#681](https://github.com/Shopify/sarama/pull/681), and others).
+ - Support for configuring the target Kafka version
+   ([#676](https://github.com/Shopify/sarama/pull/676)).
+ - Batch producing support in the SyncProducer
+   ([#677](https://github.com/Shopify/sarama/pull/677)).
+ - Extend producer mock to allow setting expectations on message contents
+   ([#667](https://github.com/Shopify/sarama/pull/667)).
+
+Improvements:
+ - Support `nil` compressed messages for deleting in compacted topics
+   ([#634](https://github.com/Shopify/sarama/pull/634)).
+ - Pre-allocate decoding errors, greatly reducing heap usage and GC time against
+   misbehaving brokers ([#690](https://github.com/Shopify/sarama/pull/690)).
+ - Re-use consumer expiry timers, removing one allocation per consumed message
+   ([#707](https://github.com/Shopify/sarama/pull/707)).
+ +Bug Fixes: + - Actually default the client ID to "sarama" like we say we do + ([#664](https://github.com/Shopify/sarama/pull/664)). + - Fix a rare issue where `Client.Leader` could return the wrong error + ([#685](https://github.com/Shopify/sarama/pull/685)). + - Fix a possible tight loop in the consumer + ([#693](https://github.com/Shopify/sarama/pull/693)). + - Match upstream's offset-tracking behaviour + ([#705](https://github.com/Shopify/sarama/pull/705)). + - Report UnknownTopicOrPartition errors from the offset manager + ([#706](https://github.com/Shopify/sarama/pull/706)). + - Fix possible negative partition value from the HashPartitioner + ([#709](https://github.com/Shopify/sarama/pull/709)). + #### Version 1.9.0 (2016-05-16) New Features: diff --git a/vendor/github.com/Shopify/sarama/README.md b/vendor/github.com/Shopify/sarama/README.md index a4bd9afd8b46..bcbd3e9c13ce 100644 --- a/vendor/github.com/Shopify/sarama/README.md +++ b/vendor/github.com/Shopify/sarama/README.md @@ -18,8 +18,8 @@ Sarama is an MIT-licensed Go client library for [Apache Kafka](https://kafka.apa Sarama provides a "2 releases + 2 months" compatibility guarantee: we support the two latest stable releases of Kafka and Go, and we provide a two month grace period for older releases. This means we currently officially support -Go 1.6 and 1.5, and Kafka 0.9.0 and 0.8.2, although older releases are still -likely to work. +Go 1.6 and 1.5, and Kafka 0.10.0, 0.9.0 and 0.8.2, although older releases are +still likely to work. Sarama follows semantic versioning and provides API stability via the gopkg.in service. You can import a version with a guaranteed stable API via http://gopkg.in/Shopify/sarama.v1. diff --git a/vendor/github.com/Shopify/sarama/api_versions_request.go b/vendor/github.com/Shopify/sarama/api_versions_request.go new file mode 100644 index 000000000000..ab65f01ccff1 --- /dev/null +++ b/vendor/github.com/Shopify/sarama/api_versions_request.go @@ -0,0 +1,24 @@ +package sarama + +type ApiVersionsRequest struct { +} + +func (r *ApiVersionsRequest) encode(pe packetEncoder) error { + return nil +} + +func (r *ApiVersionsRequest) decode(pd packetDecoder, version int16) (err error) { + return nil +} + +func (r *ApiVersionsRequest) key() int16 { + return 18 +} + +func (r *ApiVersionsRequest) version() int16 { + return 0 +} + +func (r *ApiVersionsRequest) requiredVersion() KafkaVersion { + return V0_10_0_0 +} diff --git a/vendor/github.com/Shopify/sarama/api_versions_response.go b/vendor/github.com/Shopify/sarama/api_versions_response.go new file mode 100644 index 000000000000..16d62db2d305 --- /dev/null +++ b/vendor/github.com/Shopify/sarama/api_versions_response.go @@ -0,0 +1,86 @@ +package sarama + +type ApiVersionsResponseBlock struct { + ApiKey int16 + MinVersion int16 + MaxVersion int16 +} + +func (b *ApiVersionsResponseBlock) encode(pe packetEncoder) error { + pe.putInt16(b.ApiKey) + pe.putInt16(b.MinVersion) + pe.putInt16(b.MaxVersion) + return nil +} + +func (b *ApiVersionsResponseBlock) decode(pd packetDecoder) error { + var err error + + if b.ApiKey, err = pd.getInt16(); err != nil { + return err + } + + if b.MinVersion, err = pd.getInt16(); err != nil { + return err + } + + if b.MaxVersion, err = pd.getInt16(); err != nil { + return err + } + + return nil +} + +type ApiVersionsResponse struct { + Err KError + ApiVersions []*ApiVersionsResponseBlock +} + +func (r *ApiVersionsResponse) encode(pe packetEncoder) error { + pe.putInt16(int16(r.Err)) + if 
err := pe.putArrayLength(len(r.ApiVersions)); err != nil { + return err + } + for _, apiVersion := range r.ApiVersions { + if err := apiVersion.encode(pe); err != nil { + return err + } + } + return nil +} + +func (r *ApiVersionsResponse) decode(pd packetDecoder, version int16) error { + if kerr, err := pd.getInt16(); err != nil { + return err + } else { + r.Err = KError(kerr) + } + + numBlocks, err := pd.getArrayLength() + if err != nil { + return err + } + + r.ApiVersions = make([]*ApiVersionsResponseBlock, numBlocks) + for i := 0; i < numBlocks; i++ { + block := new(ApiVersionsResponseBlock) + if err := block.decode(pd); err != nil { + return err + } + r.ApiVersions[i] = block + } + + return nil +} + +func (r *ApiVersionsResponse) key() int16 { + return 18 +} + +func (r *ApiVersionsResponse) version() int16 { + return 0 +} + +func (r *ApiVersionsResponse) requiredVersion() KafkaVersion { + return V0_10_0_0 +} diff --git a/vendor/github.com/Shopify/sarama/async_producer.go b/vendor/github.com/Shopify/sarama/async_producer.go index e7ae8c2e5d19..e1ae5b0dab21 100644 --- a/vendor/github.com/Shopify/sarama/async_producer.go +++ b/vendor/github.com/Shopify/sarama/async_producer.go @@ -135,6 +135,11 @@ type ProducerMessage struct { // Partition is the partition that the message was sent to. This is only // guaranteed to be defined if the message was successfully delivered. Partition int32 + // Timestamp is the timestamp assigned to the message by the broker. This + // is only guaranteed to be defined if the message was successfully + // delivered, RequiredAcks is not NoResponse, and the Kafka broker is at + // least version 0.10.0. + Timestamp time.Time retries int flags flagSet @@ -722,6 +727,11 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo switch block.Err { // Success case ErrNoError: + if bp.parent.conf.Version.IsAtLeast(V0_10_0_0) && !block.Timestamp.IsZero() { + for _, msg := range msgs { + msg.Timestamp = block.Timestamp + } + } for i, msg := range msgs { msg.Offset = block.Offset + int64(i) } diff --git a/vendor/github.com/Shopify/sarama/broker.go b/vendor/github.com/Shopify/sarama/broker.go index a9b0cf437c38..bfcb82f37a18 100644 --- a/vendor/github.com/Shopify/sarama/broker.go +++ b/vendor/github.com/Shopify/sarama/broker.go @@ -317,7 +317,7 @@ func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroups return response, nil } -func (b *Broker) send(rb requestBody, promiseResponse bool) (*responsePromise, error) { +func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) { b.lock.Lock() defer b.lock.Unlock() @@ -328,6 +328,10 @@ func (b *Broker) send(rb requestBody, promiseResponse bool) (*responsePromise, e return nil, ErrNotConnected } + if !b.conf.Version.IsAtLeast(rb.requiredVersion()) { + return nil, ErrUnsupportedVersion + } + req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb} buf, err := encode(req) if err != nil { @@ -355,7 +359,7 @@ func (b *Broker) send(rb requestBody, promiseResponse bool) (*responsePromise, e return &promise, nil } -func (b *Broker) sendAndReceive(req requestBody, res decoder) error { +func (b *Broker) sendAndReceive(req protocolBody, res versionedDecoder) error { promise, err := b.send(req, res != nil) if err != nil { @@ -368,7 +372,7 @@ func (b *Broker) sendAndReceive(req requestBody, res decoder) error { select { case buf := <-promise.packets: - return decode(buf, res) + return versionedDecode(buf, res, 
req.version()) case err = <-promise.errors: return err } diff --git a/vendor/github.com/Shopify/sarama/client.go b/vendor/github.com/Shopify/sarama/client.go index 54a88973952f..b8fc37bc7023 100644 --- a/vendor/github.com/Shopify/sarama/client.go +++ b/vendor/github.com/Shopify/sarama/client.go @@ -290,7 +290,7 @@ func (client *client) Leader(topic string, partitionID int32) (*Broker, error) { leader, err := client.cachedLeader(topic, partitionID) if leader == nil { - err := client.RefreshMetadata(topic) + err = client.RefreshMetadata(topic) if err != nil { return nil, err } @@ -685,7 +685,7 @@ func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemainin } for broker := client.any(); broker != nil; broker = client.any() { - Logger.Printf("client/coordinator requesting coordinator for consumergoup %s from %s\n", consumerGroup, broker.Addr()) + Logger.Printf("client/coordinator requesting coordinator for consumergroup %s from %s\n", consumerGroup, broker.Addr()) request := new(ConsumerMetadataRequest) request.ConsumerGroup = consumerGroup @@ -707,7 +707,7 @@ func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemainin switch response.Err { case ErrNoError: - Logger.Printf("client/coordinator coordinator for consumergoup %s is #%d (%s)\n", consumerGroup, response.Coordinator.ID(), response.Coordinator.Addr()) + Logger.Printf("client/coordinator coordinator for consumergroup %s is #%d (%s)\n", consumerGroup, response.Coordinator.ID(), response.Coordinator.Addr()) return response, nil case ErrConsumerCoordinatorNotAvailable: diff --git a/vendor/github.com/Shopify/sarama/config.go b/vendor/github.com/Shopify/sarama/config.go index d9cf469fef4c..b61bf7ea4116 100644 --- a/vendor/github.com/Shopify/sarama/config.go +++ b/vendor/github.com/Shopify/sarama/config.go @@ -6,7 +6,9 @@ import ( "time" ) -var validID *regexp.Regexp = regexp.MustCompile(`\A[A-Za-z0-9._-]*\z`) +const defaultClientID = "sarama" + +var validID = regexp.MustCompile(`\A[A-Za-z0-9._-]+\z`) // Config is used to pass multiple configuration options to Sarama's constructors. type Config struct { @@ -189,7 +191,7 @@ type Config struct { // Return specifies what channels will be populated. If they are set to true, // you must read from them to prevent deadlock. Return struct { - // If enabled, any errors that occured while consuming are returned on + // If enabled, any errors that occurred while consuming are returned on // the Errors channel (default disabled). Errors bool } @@ -224,6 +226,13 @@ type Config struct { // in the background while user code is working, greatly improving throughput. // Defaults to 256. ChannelBufferSize int + // The version of Kafka that Sarama will assume it is running against. + // Defaults to the oldest supported stable version. Since Kafka provides + // backwards-compatibility, setting it to a version older than you have + // will not break anything, although it may prevent you from using the + // latest features. Setting it to a version greater than you are actually + // running may lead to random breakage. + Version KafkaVersion } // NewConfig returns a new configuration instance with sane defaults. 
@@ -256,7 +265,9 @@ func NewConfig() *Config { c.Consumer.Offsets.CommitInterval = 1 * time.Second c.Consumer.Offsets.Initial = OffsetNewest + c.ClientID = defaultClientID c.ChannelBufferSize = 256 + c.Version = minVersion return c } @@ -297,7 +308,7 @@ func (c *Config) Validate() error { if c.Consumer.Offsets.Retention%time.Millisecond != 0 { Logger.Println("Consumer.Offsets.Retention only supports millisecond precision; nanoseconds will be truncated.") } - if c.ClientID == "sarama" { + if c.ClientID == defaultClientID { Logger.Println("ClientID is the default of 'sarama', you should consider setting it to something application-specific.") } diff --git a/vendor/github.com/Shopify/sarama/consumer.go b/vendor/github.com/Shopify/sarama/consumer.go index b482371d1796..c70b528f0091 100644 --- a/vendor/github.com/Shopify/sarama/consumer.go +++ b/vendor/github.com/Shopify/sarama/consumer.go @@ -14,6 +14,7 @@ type ConsumerMessage struct { Topic string Partition int32 Offset int64 + Timestamp time.Time // only set if kafka is version 0.10+ } // ConsumerError is what is provided to the user when an error occurs. @@ -255,7 +256,7 @@ type PartitionConsumer interface { // the broker. Messages() <-chan *ConsumerMessage - // Errors returns a read channel of errors that occured during consuming, if + // Errors returns a read channel of errors that occurred during consuming, if // enabled. By default, errors are logged and not returned over this channel. // If you want to implement any custom error handling, set your config's // Consumer.Return.Errors setting to true, and read from this channel. @@ -412,15 +413,18 @@ func (child *partitionConsumer) HighWaterMarkOffset() int64 { func (child *partitionConsumer) responseFeeder() { var msgs []*ConsumerMessage + expiryTimer := time.NewTimer(child.conf.Consumer.MaxProcessingTime) feederLoop: for response := range child.feeder { msgs, child.responseResult = child.parseResponse(response) for i, msg := range msgs { + expiryTimer.Reset(child.conf.Consumer.MaxProcessingTime) + select { case child.messages <- msg: - case <-time.After(child.conf.Consumer.MaxProcessingTime): + case <-expiryTimer.C: child.responseResult = errTimedOut child.broker.acks.Done() for _, msg = range msgs[i:] { @@ -489,6 +493,7 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu Key: msg.Msg.Key, Value: msg.Msg.Value, Offset: msg.Offset, + Timestamp: msg.Msg.Timestamp, }) child.offset = msg.Offset + 1 } else { @@ -538,7 +543,7 @@ func (bc *brokerConsumer) subscriptionManager() { var buffer []*partitionConsumer // The subscriptionManager constantly accepts new subscriptions on `input` (even when the main subscriptionConsumer - // goroutine is in the middle of a network request) and batches it up. The main worker goroutine picks + // goroutine is in the middle of a network request) and batches it up. The main worker goroutine picks // up a batch of new subscriptions between every network request by reading from `newSubscriptions`, so we give // it nil if no new subscriptions are available. We also write to `wait` only when new subscriptions is available, // so the main goroutine can block waiting for work if it has none. 
@@ -669,8 +674,12 @@ func (bc *brokerConsumer) abort(err error) { child.trigger <- none{} } - for newSubscription := range bc.newSubscriptions { - for _, child := range newSubscription { + for newSubscriptions := range bc.newSubscriptions { + if len(newSubscriptions) == 0 { + <-bc.wait + continue + } + for _, child := range newSubscriptions { child.sendError(err) child.trigger <- none{} } @@ -682,6 +691,9 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) { MinBytes: bc.consumer.conf.Consumer.Fetch.Min, MaxWaitTime: int32(bc.consumer.conf.Consumer.MaxWaitTime / time.Millisecond), } + if bc.consumer.conf.Version.IsAtLeast(V0_10_0_0) { + request.Version = 2 + } for child := range bc.subscriptions { request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize) diff --git a/vendor/github.com/Shopify/sarama/consumer_metadata_request.go b/vendor/github.com/Shopify/sarama/consumer_metadata_request.go index 9b8fcd74e864..483be3354df5 100644 --- a/vendor/github.com/Shopify/sarama/consumer_metadata_request.go +++ b/vendor/github.com/Shopify/sarama/consumer_metadata_request.go @@ -8,7 +8,7 @@ func (r *ConsumerMetadataRequest) encode(pe packetEncoder) error { return pe.putString(r.ConsumerGroup) } -func (r *ConsumerMetadataRequest) decode(pd packetDecoder) (err error) { +func (r *ConsumerMetadataRequest) decode(pd packetDecoder, version int16) (err error) { r.ConsumerGroup, err = pd.getString() return err } @@ -20,3 +20,7 @@ func (r *ConsumerMetadataRequest) key() int16 { func (r *ConsumerMetadataRequest) version() int16 { return 0 } + +func (r *ConsumerMetadataRequest) requiredVersion() KafkaVersion { + return V0_8_2_0 +} diff --git a/vendor/github.com/Shopify/sarama/consumer_metadata_response.go b/vendor/github.com/Shopify/sarama/consumer_metadata_response.go index d6b5614b432f..6b9632bbafe6 100644 --- a/vendor/github.com/Shopify/sarama/consumer_metadata_response.go +++ b/vendor/github.com/Shopify/sarama/consumer_metadata_response.go @@ -13,7 +13,7 @@ type ConsumerMetadataResponse struct { CoordinatorPort int32 // deprecated: use Coordinator.Addr() } -func (r *ConsumerMetadataResponse) decode(pd packetDecoder) (err error) { +func (r *ConsumerMetadataResponse) decode(pd packetDecoder, version int16) (err error) { tmp, err := pd.getInt16() if err != nil { return err @@ -71,3 +71,15 @@ func (r *ConsumerMetadataResponse) encode(pe packetEncoder) error { pe.putInt32(r.CoordinatorPort) return nil } + +func (r *ConsumerMetadataResponse) key() int16 { + return 10 +} + +func (r *ConsumerMetadataResponse) version() int16 { + return 0 +} + +func (r *ConsumerMetadataResponse) requiredVersion() KafkaVersion { + return V0_8_2_0 +} diff --git a/vendor/github.com/Shopify/sarama/describe_groups_request.go b/vendor/github.com/Shopify/sarama/describe_groups_request.go index c9426a6b7b94..1fb356777085 100644 --- a/vendor/github.com/Shopify/sarama/describe_groups_request.go +++ b/vendor/github.com/Shopify/sarama/describe_groups_request.go @@ -8,7 +8,7 @@ func (r *DescribeGroupsRequest) encode(pe packetEncoder) error { return pe.putStringArray(r.Groups) } -func (r *DescribeGroupsRequest) decode(pd packetDecoder) (err error) { +func (r *DescribeGroupsRequest) decode(pd packetDecoder, version int16) (err error) { r.Groups, err = pd.getStringArray() return } @@ -21,6 +21,10 @@ func (r *DescribeGroupsRequest) version() int16 { return 0 } +func (r *DescribeGroupsRequest) requiredVersion() KafkaVersion { + return V0_9_0_0 +} + func (r 
*DescribeGroupsRequest) AddGroup(group string) { r.Groups = append(r.Groups, group) } diff --git a/vendor/github.com/Shopify/sarama/describe_groups_response.go b/vendor/github.com/Shopify/sarama/describe_groups_response.go index b4b32dd8b2aa..e78b8ce02108 100644 --- a/vendor/github.com/Shopify/sarama/describe_groups_response.go +++ b/vendor/github.com/Shopify/sarama/describe_groups_response.go @@ -18,7 +18,7 @@ func (r *DescribeGroupsResponse) encode(pe packetEncoder) error { return nil } -func (r *DescribeGroupsResponse) decode(pd packetDecoder) (err error) { +func (r *DescribeGroupsResponse) decode(pd packetDecoder, version int16) (err error) { n, err := pd.getArrayLength() if err != nil { return err @@ -35,6 +35,18 @@ func (r *DescribeGroupsResponse) decode(pd packetDecoder) (err error) { return nil } +func (r *DescribeGroupsResponse) key() int16 { + return 15 +} + +func (r *DescribeGroupsResponse) version() int16 { + return 0 +} + +func (r *DescribeGroupsResponse) requiredVersion() KafkaVersion { + return V0_9_0_0 +} + type GroupDescription struct { Err KError GroupId string diff --git a/vendor/github.com/Shopify/sarama/encoder_decoder.go b/vendor/github.com/Shopify/sarama/encoder_decoder.go index b91efaa0ee60..35a24c2d99d7 100644 --- a/vendor/github.com/Shopify/sarama/encoder_decoder.go +++ b/vendor/github.com/Shopify/sarama/encoder_decoder.go @@ -41,6 +41,10 @@ type decoder interface { decode(pd packetDecoder) error } +type versionedDecoder interface { + decode(pd packetDecoder, version int16) error +} + // Decode takes bytes and a Decoder and fills the fields of the decoder from the bytes, // interpreted using Kafka's encoding rules. func decode(buf []byte, in decoder) error { @@ -60,3 +64,21 @@ func decode(buf []byte, in decoder) error { return nil } + +func versionedDecode(buf []byte, in versionedDecoder, version int16) error { + if buf == nil { + return nil + } + + helper := realDecoder{raw: buf} + err := in.decode(&helper, version) + if err != nil { + return err + } + + if helper.off != len(buf) { + return PacketDecodingError{"invalid length"} + } + + return nil +} diff --git a/vendor/github.com/Shopify/sarama/errors.go b/vendor/github.com/Shopify/sarama/errors.go index a837087f1d76..cfb7006f7731 100644 --- a/vendor/github.com/Shopify/sarama/errors.go +++ b/vendor/github.com/Shopify/sarama/errors.go @@ -85,6 +85,7 @@ const ( ErrMessageSizeTooLarge KError = 10 ErrStaleControllerEpochCode KError = 11 ErrOffsetMetadataTooLarge KError = 12 + ErrNetworkException KError = 13 ErrOffsetsLoadInProgress KError = 14 ErrConsumerCoordinatorNotAvailable KError = 15 ErrNotCoordinatorForConsumer KError = 16 @@ -103,6 +104,10 @@ const ( ErrTopicAuthorizationFailed KError = 29 ErrGroupAuthorizationFailed KError = 30 ErrClusterAuthorizationFailed KError = 31 + ErrInvalidTimestamp KError = 32 + ErrUnsupportedSASLMechanism KError = 33 + ErrIllegalSASLState KError = 34 + ErrUnsupportedVersion KError = 35 ) func (err KError) Error() string { @@ -137,6 +142,8 @@ func (err KError) Error() string { return "kafka server: StaleControllerEpochCode (internal error code for broker-to-broker communication)." case ErrOffsetMetadataTooLarge: return "kafka server: Specified a string larger than the configured maximum for offset metadata." + case ErrNetworkException: + return "kafka server: The server disconnected before a response was received." 
case ErrOffsetsLoadInProgress: return "kafka server: The broker is still loading offsets after a leader change for that offset's topic partition." case ErrConsumerCoordinatorNotAvailable: @@ -173,6 +180,14 @@ func (err KError) Error() string { return "kafka server: The client is not authorized to access this group." case ErrClusterAuthorizationFailed: return "kafka server: The client is not authorized to send this request type." + case ErrInvalidTimestamp: + return "kafka server: The timestamp of the message is out of acceptable range." + case ErrUnsupportedSASLMechanism: + return "kafka server: The broker does not support the requested SASL mechanism." + case ErrIllegalSASLState: + return "kafka server: Request is not valid given the current SASL state." + case ErrUnsupportedVersion: + return "kafka server: The version of API is not supported." } return fmt.Sprintf("Unknown error, how did this happen? Error code = %d", err) diff --git a/vendor/github.com/Shopify/sarama/fetch_request.go b/vendor/github.com/Shopify/sarama/fetch_request.go index 3c00fad65302..ae701a3f29a5 100644 --- a/vendor/github.com/Shopify/sarama/fetch_request.go +++ b/vendor/github.com/Shopify/sarama/fetch_request.go @@ -5,17 +5,17 @@ type fetchRequestBlock struct { maxBytes int32 } -func (f *fetchRequestBlock) encode(pe packetEncoder) error { - pe.putInt64(f.fetchOffset) - pe.putInt32(f.maxBytes) +func (b *fetchRequestBlock) encode(pe packetEncoder) error { + pe.putInt64(b.fetchOffset) + pe.putInt32(b.maxBytes) return nil } -func (f *fetchRequestBlock) decode(pd packetDecoder) (err error) { - if f.fetchOffset, err = pd.getInt64(); err != nil { +func (b *fetchRequestBlock) decode(pd packetDecoder) (err error) { + if b.fetchOffset, err = pd.getInt64(); err != nil { return err } - if f.maxBytes, err = pd.getInt32(); err != nil { + if b.maxBytes, err = pd.getInt32(); err != nil { return err } return nil @@ -24,18 +24,19 @@ func (f *fetchRequestBlock) decode(pd packetDecoder) (err error) { type FetchRequest struct { MaxWaitTime int32 MinBytes int32 + Version int16 blocks map[string]map[int32]*fetchRequestBlock } -func (f *FetchRequest) encode(pe packetEncoder) (err error) { +func (r *FetchRequest) encode(pe packetEncoder) (err error) { pe.putInt32(-1) // replica ID is always -1 for clients - pe.putInt32(f.MaxWaitTime) - pe.putInt32(f.MinBytes) - err = pe.putArrayLength(len(f.blocks)) + pe.putInt32(r.MaxWaitTime) + pe.putInt32(r.MinBytes) + err = pe.putArrayLength(len(r.blocks)) if err != nil { return err } - for topic, blocks := range f.blocks { + for topic, blocks := range r.blocks { err = pe.putString(topic) if err != nil { return err @@ -55,14 +56,15 @@ func (f *FetchRequest) encode(pe packetEncoder) (err error) { return nil } -func (f *FetchRequest) decode(pd packetDecoder) (err error) { +func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) { + r.Version = version if _, err = pd.getInt32(); err != nil { return err } - if f.MaxWaitTime, err = pd.getInt32(); err != nil { + if r.MaxWaitTime, err = pd.getInt32(); err != nil { return err } - if f.MinBytes, err = pd.getInt32(); err != nil { + if r.MinBytes, err = pd.getInt32(); err != nil { return err } topicCount, err := pd.getArrayLength() @@ -72,7 +74,7 @@ func (f *FetchRequest) decode(pd packetDecoder) (err error) { if topicCount == 0 { return nil } - f.blocks = make(map[string]map[int32]*fetchRequestBlock) + r.blocks = make(map[string]map[int32]*fetchRequestBlock) for i := 0; i < topicCount; i++ { topic, err := pd.getString() if err != 
nil { @@ -82,7 +84,7 @@ func (f *FetchRequest) decode(pd packetDecoder) (err error) { if err != nil { return err } - f.blocks[topic] = make(map[int32]*fetchRequestBlock) + r.blocks[topic] = make(map[int32]*fetchRequestBlock) for j := 0; j < partitionCount; j++ { partition, err := pd.getInt32() if err != nil { @@ -92,32 +94,43 @@ func (f *FetchRequest) decode(pd packetDecoder) (err error) { if err = fetchBlock.decode(pd); err != nil { return nil } - f.blocks[topic][partition] = fetchBlock + r.blocks[topic][partition] = fetchBlock } } return nil } -func (f *FetchRequest) key() int16 { +func (r *FetchRequest) key() int16 { return 1 } -func (f *FetchRequest) version() int16 { - return 0 +func (r *FetchRequest) version() int16 { + return r.Version } -func (f *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int64, maxBytes int32) { - if f.blocks == nil { - f.blocks = make(map[string]map[int32]*fetchRequestBlock) +func (r *FetchRequest) requiredVersion() KafkaVersion { + switch r.Version { + case 1: + return V0_9_0_0 + case 2: + return V0_10_0_0 + default: + return minVersion + } +} + +func (r *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int64, maxBytes int32) { + if r.blocks == nil { + r.blocks = make(map[string]map[int32]*fetchRequestBlock) } - if f.blocks[topic] == nil { - f.blocks[topic] = make(map[int32]*fetchRequestBlock) + if r.blocks[topic] == nil { + r.blocks[topic] = make(map[int32]*fetchRequestBlock) } tmp := new(fetchRequestBlock) tmp.maxBytes = maxBytes tmp.fetchOffset = fetchOffset - f.blocks[topic][partitionID] = tmp + r.blocks[topic][partitionID] = tmp } diff --git a/vendor/github.com/Shopify/sarama/fetch_response.go b/vendor/github.com/Shopify/sarama/fetch_response.go index 1ac543921423..b56b166c2828 100644 --- a/vendor/github.com/Shopify/sarama/fetch_response.go +++ b/vendor/github.com/Shopify/sarama/fetch_response.go @@ -1,19 +1,21 @@ package sarama +import "time" + type FetchResponseBlock struct { Err KError HighWaterMarkOffset int64 MsgSet MessageSet } -func (pr *FetchResponseBlock) decode(pd packetDecoder) (err error) { +func (b *FetchResponseBlock) decode(pd packetDecoder) (err error) { tmp, err := pd.getInt16() if err != nil { return err } - pr.Err = KError(tmp) + b.Err = KError(tmp) - pr.HighWaterMarkOffset, err = pd.getInt64() + b.HighWaterMarkOffset, err = pd.getInt64() if err != nil { return err } @@ -27,35 +29,47 @@ func (pr *FetchResponseBlock) decode(pd packetDecoder) (err error) { if err != nil { return err } - err = (&pr.MsgSet).decode(msgSetDecoder) + err = (&b.MsgSet).decode(msgSetDecoder) return err } -type FetchResponse struct { - Blocks map[string]map[int32]*FetchResponseBlock -} - -func (pr *FetchResponseBlock) encode(pe packetEncoder) (err error) { - pe.putInt16(int16(pr.Err)) +func (b *FetchResponseBlock) encode(pe packetEncoder) (err error) { + pe.putInt16(int16(b.Err)) - pe.putInt64(pr.HighWaterMarkOffset) + pe.putInt64(b.HighWaterMarkOffset) pe.push(&lengthField{}) - err = pr.MsgSet.encode(pe) + err = b.MsgSet.encode(pe) if err != nil { return err } return pe.pop() } -func (fr *FetchResponse) decode(pd packetDecoder) (err error) { +type FetchResponse struct { + Blocks map[string]map[int32]*FetchResponseBlock + ThrottleTime time.Duration + Version int16 // v1 requires 0.9+, v2 requires 0.10+ +} + +func (r *FetchResponse) decode(pd packetDecoder, version int16) (err error) { + r.Version = version + + if r.Version >= 1 { + throttle, err := pd.getInt32() + if err != nil { + return err + } + 
r.ThrottleTime = time.Duration(throttle) * time.Millisecond + } + numTopics, err := pd.getArrayLength() if err != nil { return err } - fr.Blocks = make(map[string]map[int32]*FetchResponseBlock, numTopics) + r.Blocks = make(map[string]map[int32]*FetchResponseBlock, numTopics) for i := 0; i < numTopics; i++ { name, err := pd.getString() if err != nil { @@ -67,7 +81,7 @@ func (fr *FetchResponse) decode(pd packetDecoder) (err error) { return err } - fr.Blocks[name] = make(map[int32]*FetchResponseBlock, numBlocks) + r.Blocks[name] = make(map[int32]*FetchResponseBlock, numBlocks) for j := 0; j < numBlocks; j++ { id, err := pd.getInt32() @@ -80,20 +94,24 @@ func (fr *FetchResponse) decode(pd packetDecoder) (err error) { if err != nil { return err } - fr.Blocks[name][id] = block + r.Blocks[name][id] = block } } return nil } -func (fr *FetchResponse) encode(pe packetEncoder) (err error) { - err = pe.putArrayLength(len(fr.Blocks)) +func (r *FetchResponse) encode(pe packetEncoder) (err error) { + if r.Version >= 1 { + pe.putInt32(int32(r.ThrottleTime / time.Millisecond)) + } + + err = pe.putArrayLength(len(r.Blocks)) if err != nil { return err } - for topic, partitions := range fr.Blocks { + for topic, partitions := range r.Blocks { err = pe.putString(topic) if err != nil { return err @@ -116,26 +134,45 @@ func (fr *FetchResponse) encode(pe packetEncoder) (err error) { return nil } -func (fr *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock { - if fr.Blocks == nil { +func (r *FetchResponse) key() int16 { + return 1 +} + +func (r *FetchResponse) version() int16 { + return r.Version +} + +func (r *FetchResponse) requiredVersion() KafkaVersion { + switch r.Version { + case 1: + return V0_9_0_0 + case 2: + return V0_10_0_0 + default: + return minVersion + } +} + +func (r *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock { + if r.Blocks == nil { return nil } - if fr.Blocks[topic] == nil { + if r.Blocks[topic] == nil { return nil } - return fr.Blocks[topic][partition] + return r.Blocks[topic][partition] } -func (fr *FetchResponse) AddError(topic string, partition int32, err KError) { - if fr.Blocks == nil { - fr.Blocks = make(map[string]map[int32]*FetchResponseBlock) +func (r *FetchResponse) AddError(topic string, partition int32, err KError) { + if r.Blocks == nil { + r.Blocks = make(map[string]map[int32]*FetchResponseBlock) } - partitions, ok := fr.Blocks[topic] + partitions, ok := r.Blocks[topic] if !ok { partitions = make(map[int32]*FetchResponseBlock) - fr.Blocks[topic] = partitions + r.Blocks[topic] = partitions } frb, ok := partitions[partition] if !ok { @@ -145,14 +182,14 @@ func (fr *FetchResponse) AddError(topic string, partition int32, err KError) { frb.Err = err } -func (fr *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) { - if fr.Blocks == nil { - fr.Blocks = make(map[string]map[int32]*FetchResponseBlock) +func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) { + if r.Blocks == nil { + r.Blocks = make(map[string]map[int32]*FetchResponseBlock) } - partitions, ok := fr.Blocks[topic] + partitions, ok := r.Blocks[topic] if !ok { partitions = make(map[int32]*FetchResponseBlock) - fr.Blocks[topic] = partitions + r.Blocks[topic] = partitions } frb, ok := partitions[partition] if !ok { diff --git a/vendor/github.com/Shopify/sarama/heartbeat_request.go b/vendor/github.com/Shopify/sarama/heartbeat_request.go index b89d290f12bb..ce49c4739727 100644 --- 
a/vendor/github.com/Shopify/sarama/heartbeat_request.go +++ b/vendor/github.com/Shopify/sarama/heartbeat_request.go @@ -20,7 +20,7 @@ func (r *HeartbeatRequest) encode(pe packetEncoder) error { return nil } -func (r *HeartbeatRequest) decode(pd packetDecoder) (err error) { +func (r *HeartbeatRequest) decode(pd packetDecoder, version int16) (err error) { if r.GroupId, err = pd.getString(); err != nil { return } @@ -41,3 +41,7 @@ func (r *HeartbeatRequest) key() int16 { func (r *HeartbeatRequest) version() int16 { return 0 } + +func (r *HeartbeatRequest) requiredVersion() KafkaVersion { + return V0_9_0_0 +} diff --git a/vendor/github.com/Shopify/sarama/heartbeat_response.go b/vendor/github.com/Shopify/sarama/heartbeat_response.go index b48b8c1f3523..3c51163ad1f2 100644 --- a/vendor/github.com/Shopify/sarama/heartbeat_response.go +++ b/vendor/github.com/Shopify/sarama/heartbeat_response.go @@ -9,7 +9,7 @@ func (r *HeartbeatResponse) encode(pe packetEncoder) error { return nil } -func (r *HeartbeatResponse) decode(pd packetDecoder) error { +func (r *HeartbeatResponse) decode(pd packetDecoder, version int16) error { if kerr, err := pd.getInt16(); err != nil { return err } else { @@ -18,3 +18,15 @@ func (r *HeartbeatResponse) decode(pd packetDecoder) error { return nil } + +func (r *HeartbeatResponse) key() int16 { + return 12 +} + +func (r *HeartbeatResponse) version() int16 { + return 0 +} + +func (r *HeartbeatResponse) requiredVersion() KafkaVersion { + return V0_9_0_0 +} diff --git a/vendor/github.com/Shopify/sarama/join_group_request.go b/vendor/github.com/Shopify/sarama/join_group_request.go index 5884d79d4490..d95085b2deb6 100644 --- a/vendor/github.com/Shopify/sarama/join_group_request.go +++ b/vendor/github.com/Shopify/sarama/join_group_request.go @@ -35,7 +35,7 @@ func (r *JoinGroupRequest) encode(pe packetEncoder) error { return nil } -func (r *JoinGroupRequest) decode(pd packetDecoder) (err error) { +func (r *JoinGroupRequest) decode(pd packetDecoder, version int16) (err error) { if r.GroupId, err = pd.getString(); err != nil { return } @@ -85,6 +85,10 @@ func (r *JoinGroupRequest) version() int16 { return 0 } +func (r *JoinGroupRequest) requiredVersion() KafkaVersion { + return V0_9_0_0 +} + func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte) { if r.GroupProtocols == nil { r.GroupProtocols = make(map[string][]byte) diff --git a/vendor/github.com/Shopify/sarama/join_group_response.go b/vendor/github.com/Shopify/sarama/join_group_response.go index 16f6b9b40615..94c7a7fde072 100644 --- a/vendor/github.com/Shopify/sarama/join_group_response.go +++ b/vendor/github.com/Shopify/sarama/join_group_response.go @@ -52,7 +52,7 @@ func (r *JoinGroupResponse) encode(pe packetEncoder) error { return nil } -func (r *JoinGroupResponse) decode(pd packetDecoder) (err error) { +func (r *JoinGroupResponse) decode(pd packetDecoder, version int16) (err error) { if kerr, err := pd.getInt16(); err != nil { return err } else { @@ -100,3 +100,15 @@ func (r *JoinGroupResponse) decode(pd packetDecoder) (err error) { return nil } + +func (r *JoinGroupResponse) key() int16 { + return 11 +} + +func (r *JoinGroupResponse) version() int16 { + return 0 +} + +func (r *JoinGroupResponse) requiredVersion() KafkaVersion { + return V0_9_0_0 +} diff --git a/vendor/github.com/Shopify/sarama/leave_group_request.go b/vendor/github.com/Shopify/sarama/leave_group_request.go index cdb4d14fd60a..e177427482fd 100644 --- 
a/vendor/github.com/Shopify/sarama/leave_group_request.go +++ b/vendor/github.com/Shopify/sarama/leave_group_request.go @@ -16,7 +16,7 @@ func (r *LeaveGroupRequest) encode(pe packetEncoder) error { return nil } -func (r *LeaveGroupRequest) decode(pd packetDecoder) (err error) { +func (r *LeaveGroupRequest) decode(pd packetDecoder, version int16) (err error) { if r.GroupId, err = pd.getString(); err != nil { return } @@ -34,3 +34,7 @@ func (r *LeaveGroupRequest) key() int16 { func (r *LeaveGroupRequest) version() int16 { return 0 } + +func (r *LeaveGroupRequest) requiredVersion() KafkaVersion { + return V0_9_0_0 +} diff --git a/vendor/github.com/Shopify/sarama/leave_group_response.go b/vendor/github.com/Shopify/sarama/leave_group_response.go index bad1dba2f2b4..bd4a34f46cec 100644 --- a/vendor/github.com/Shopify/sarama/leave_group_response.go +++ b/vendor/github.com/Shopify/sarama/leave_group_response.go @@ -9,7 +9,7 @@ func (r *LeaveGroupResponse) encode(pe packetEncoder) error { return nil } -func (r *LeaveGroupResponse) decode(pd packetDecoder) (err error) { +func (r *LeaveGroupResponse) decode(pd packetDecoder, version int16) (err error) { if kerr, err := pd.getInt16(); err != nil { return err } else { @@ -18,3 +18,15 @@ func (r *LeaveGroupResponse) decode(pd packetDecoder) (err error) { return nil } + +func (r *LeaveGroupResponse) key() int16 { + return 13 +} + +func (r *LeaveGroupResponse) version() int16 { + return 0 +} + +func (r *LeaveGroupResponse) requiredVersion() KafkaVersion { + return V0_9_0_0 +} diff --git a/vendor/github.com/Shopify/sarama/list_groups_request.go b/vendor/github.com/Shopify/sarama/list_groups_request.go index 4d74c2665ae2..3b16abf7fa81 100644 --- a/vendor/github.com/Shopify/sarama/list_groups_request.go +++ b/vendor/github.com/Shopify/sarama/list_groups_request.go @@ -7,7 +7,7 @@ func (r *ListGroupsRequest) encode(pe packetEncoder) error { return nil } -func (r *ListGroupsRequest) decode(pd packetDecoder) (err error) { +func (r *ListGroupsRequest) decode(pd packetDecoder, version int16) (err error) { return nil } @@ -18,3 +18,7 @@ func (r *ListGroupsRequest) key() int16 { func (r *ListGroupsRequest) version() int16 { return 0 } + +func (r *ListGroupsRequest) requiredVersion() KafkaVersion { + return V0_9_0_0 +} diff --git a/vendor/github.com/Shopify/sarama/list_groups_response.go b/vendor/github.com/Shopify/sarama/list_groups_response.go index 2f5314902185..3a84f9b6c18d 100644 --- a/vendor/github.com/Shopify/sarama/list_groups_response.go +++ b/vendor/github.com/Shopify/sarama/list_groups_response.go @@ -23,7 +23,7 @@ func (r *ListGroupsResponse) encode(pe packetEncoder) error { return nil } -func (r *ListGroupsResponse) decode(pd packetDecoder) error { +func (r *ListGroupsResponse) decode(pd packetDecoder, version int16) error { if kerr, err := pd.getInt16(); err != nil { return err } else { @@ -54,3 +54,15 @@ func (r *ListGroupsResponse) decode(pd packetDecoder) error { return nil } + +func (r *ListGroupsResponse) key() int16 { + return 16 +} + +func (r *ListGroupsResponse) version() int16 { + return 0 +} + +func (r *ListGroupsResponse) requiredVersion() KafkaVersion { + return V0_9_0_0 +} diff --git a/vendor/github.com/Shopify/sarama/message.go b/vendor/github.com/Shopify/sarama/message.go index c4bdb9e9d77f..0f0ca5b6de08 100644 --- a/vendor/github.com/Shopify/sarama/message.go +++ b/vendor/github.com/Shopify/sarama/message.go @@ -5,6 +5,9 @@ import ( "compress/gzip" "fmt" "io/ioutil" + "time" + + 
"github.com/eapache/go-xerial-snappy" ) // CompressionCodec represents the various compression codecs recognized by Kafka in messages. @@ -19,15 +22,13 @@ const ( CompressionSnappy CompressionCodec = 2 ) -// The spec just says: "This is a version id used to allow backwards compatible evolution of the message -// binary format." but it doesn't say what the current value is, so presumably 0... -const messageFormat int8 = 0 - type Message struct { - Codec CompressionCodec // codec used to compress the message contents - Key []byte // the message key, may be nil - Value []byte // the message contents - Set *MessageSet // the message set a message might wrap + Codec CompressionCodec // codec used to compress the message contents + Key []byte // the message key, may be nil + Value []byte // the message contents + Set *MessageSet // the message set a message might wrap + Version int8 // v1 requires Kafka 0.10 + Timestamp time.Time // the timestamp of the message (version 1+ only) compressedCache []byte } @@ -35,11 +36,15 @@ type Message struct { func (m *Message) encode(pe packetEncoder) error { pe.push(&crc32Field{}) - pe.putInt8(messageFormat) + pe.putInt8(m.Version) attributes := int8(m.Codec) & compressionCodecMask pe.putInt8(attributes) + if m.Version >= 1 { + pe.putInt64(m.Timestamp.UnixNano() / int64(time.Millisecond)) + } + err := pe.putBytes(m.Key) if err != nil { return err @@ -50,7 +55,7 @@ func (m *Message) encode(pe packetEncoder) error { if m.compressedCache != nil { payload = m.compressedCache m.compressedCache = nil - } else { + } else if m.Value != nil { switch m.Codec { case CompressionNone: payload = m.Value @@ -66,7 +71,7 @@ func (m *Message) encode(pe packetEncoder) error { m.compressedCache = buf.Bytes() payload = m.compressedCache case CompressionSnappy: - tmp := snappyEncode(m.Value) + tmp := snappy.Encode(m.Value) m.compressedCache = tmp payload = m.compressedCache default: @@ -87,13 +92,10 @@ func (m *Message) decode(pd packetDecoder) (err error) { return err } - format, err := pd.getInt8() + m.Version, err = pd.getInt8() if err != nil { return err } - if format != messageFormat { - return PacketDecodingError{"unexpected messageFormat"} - } attribute, err := pd.getInt8() if err != nil { @@ -101,6 +103,14 @@ func (m *Message) decode(pd packetDecoder) (err error) { } m.Codec = CompressionCodec(attribute & compressionCodecMask) + if m.Version >= 1 { + millis, err := pd.getInt64() + if err != nil { + return err + } + m.Timestamp = time.Unix(millis/1000, (millis%1000)*int64(time.Millisecond)) + } + m.Key, err = pd.getBytes() if err != nil { return err @@ -116,7 +126,7 @@ func (m *Message) decode(pd packetDecoder) (err error) { // nothing to do case CompressionGZIP: if m.Value == nil { - return PacketDecodingError{"GZIP compression specified, but no data to uncompress"} + break } reader, err := gzip.NewReader(bytes.NewReader(m.Value)) if err != nil { @@ -130,9 +140,9 @@ func (m *Message) decode(pd packetDecoder) (err error) { } case CompressionSnappy: if m.Value == nil { - return PacketDecodingError{"Snappy compression specified, but no data to uncompress"} + break } - if m.Value, err = snappyDecode(m.Value); err != nil { + if m.Value, err = snappy.Decode(m.Value); err != nil { return err } if err := m.decodeSet(); err != nil { diff --git a/vendor/github.com/Shopify/sarama/metadata_request.go b/vendor/github.com/Shopify/sarama/metadata_request.go index 130cfd4f1196..9a26b55fd032 100644 --- a/vendor/github.com/Shopify/sarama/metadata_request.go +++ 
b/vendor/github.com/Shopify/sarama/metadata_request.go @@ -4,14 +4,14 @@ type MetadataRequest struct { Topics []string } -func (mr *MetadataRequest) encode(pe packetEncoder) error { - err := pe.putArrayLength(len(mr.Topics)) +func (r *MetadataRequest) encode(pe packetEncoder) error { + err := pe.putArrayLength(len(r.Topics)) if err != nil { return err } - for i := range mr.Topics { - err = pe.putString(mr.Topics[i]) + for i := range r.Topics { + err = pe.putString(r.Topics[i]) if err != nil { return err } @@ -19,7 +19,7 @@ func (mr *MetadataRequest) encode(pe packetEncoder) error { return nil } -func (mr *MetadataRequest) decode(pd packetDecoder) error { +func (r *MetadataRequest) decode(pd packetDecoder, version int16) error { topicCount, err := pd.getArrayLength() if err != nil { return err @@ -28,21 +28,25 @@ func (mr *MetadataRequest) decode(pd packetDecoder) error { return nil } - mr.Topics = make([]string, topicCount) - for i := range mr.Topics { + r.Topics = make([]string, topicCount) + for i := range r.Topics { topic, err := pd.getString() if err != nil { return err } - mr.Topics[i] = topic + r.Topics[i] = topic } return nil } -func (mr *MetadataRequest) key() int16 { +func (r *MetadataRequest) key() int16 { return 3 } -func (mr *MetadataRequest) version() int16 { +func (r *MetadataRequest) version() int16 { return 0 } + +func (r *MetadataRequest) requiredVersion() KafkaVersion { + return minVersion +} diff --git a/vendor/github.com/Shopify/sarama/metadata_response.go b/vendor/github.com/Shopify/sarama/metadata_response.go index b82221f7ed3e..f9d6a4271edc 100644 --- a/vendor/github.com/Shopify/sarama/metadata_response.go +++ b/vendor/github.com/Shopify/sarama/metadata_response.go @@ -118,16 +118,16 @@ type MetadataResponse struct { Topics []*TopicMetadata } -func (m *MetadataResponse) decode(pd packetDecoder) (err error) { +func (r *MetadataResponse) decode(pd packetDecoder, version int16) (err error) { n, err := pd.getArrayLength() if err != nil { return err } - m.Brokers = make([]*Broker, n) + r.Brokers = make([]*Broker, n) for i := 0; i < n; i++ { - m.Brokers[i] = new(Broker) - err = m.Brokers[i].decode(pd) + r.Brokers[i] = new(Broker) + err = r.Brokers[i].decode(pd) if err != nil { return err } @@ -138,10 +138,10 @@ func (m *MetadataResponse) decode(pd packetDecoder) (err error) { return err } - m.Topics = make([]*TopicMetadata, n) + r.Topics = make([]*TopicMetadata, n) for i := 0; i < n; i++ { - m.Topics[i] = new(TopicMetadata) - err = m.Topics[i].decode(pd) + r.Topics[i] = new(TopicMetadata) + err = r.Topics[i].decode(pd) if err != nil { return err } @@ -150,23 +150,23 @@ func (m *MetadataResponse) decode(pd packetDecoder) (err error) { return nil } -func (m *MetadataResponse) encode(pe packetEncoder) error { - err := pe.putArrayLength(len(m.Brokers)) +func (r *MetadataResponse) encode(pe packetEncoder) error { + err := pe.putArrayLength(len(r.Brokers)) if err != nil { return err } - for _, broker := range m.Brokers { + for _, broker := range r.Brokers { err = broker.encode(pe) if err != nil { return err } } - err = pe.putArrayLength(len(m.Topics)) + err = pe.putArrayLength(len(r.Topics)) if err != nil { return err } - for _, tm := range m.Topics { + for _, tm := range r.Topics { err = tm.encode(pe) if err != nil { return err @@ -176,16 +176,28 @@ func (m *MetadataResponse) encode(pe packetEncoder) error { return nil } +func (r *MetadataResponse) key() int16 { + return 3 +} + +func (r *MetadataResponse) version() int16 { + return 0 +} + +func (r 
*MetadataResponse) requiredVersion() KafkaVersion { + return minVersion +} + // testing API -func (m *MetadataResponse) AddBroker(addr string, id int32) { - m.Brokers = append(m.Brokers, &Broker{id: id, addr: addr}) +func (r *MetadataResponse) AddBroker(addr string, id int32) { + r.Brokers = append(r.Brokers, &Broker{id: id, addr: addr}) } -func (m *MetadataResponse) AddTopic(topic string, err KError) *TopicMetadata { +func (r *MetadataResponse) AddTopic(topic string, err KError) *TopicMetadata { var tmatch *TopicMetadata - for _, tm := range m.Topics { + for _, tm := range r.Topics { if tm.Name == topic { tmatch = tm goto foundTopic @@ -194,7 +206,7 @@ func (m *MetadataResponse) AddTopic(topic string, err KError) *TopicMetadata { tmatch = new(TopicMetadata) tmatch.Name = topic - m.Topics = append(m.Topics, tmatch) + r.Topics = append(r.Topics, tmatch) foundTopic: @@ -202,8 +214,8 @@ foundTopic: return tmatch } -func (m *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, err KError) { - tmatch := m.AddTopic(topic, ErrNoError) +func (r *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, err KError) { + tmatch := r.AddTopic(topic, ErrNoError) var pmatch *PartitionMetadata for _, pm := range tmatch.Partitions { diff --git a/vendor/github.com/Shopify/sarama/mockbroker.go b/vendor/github.com/Shopify/sarama/mockbroker.go index c422565707d2..36996a50cfb7 100644 --- a/vendor/github.com/Shopify/sarama/mockbroker.go +++ b/vendor/github.com/Shopify/sarama/mockbroker.go @@ -60,7 +60,7 @@ type MockBroker struct { // RequestResponse represents a Request/Response pair processed by MockBroker. type RequestResponse struct { - Request requestBody + Request protocolBody Response encoder } diff --git a/vendor/github.com/Shopify/sarama/mockresponses.go b/vendor/github.com/Shopify/sarama/mockresponses.go index 2f76df47b533..a203142094d0 100644 --- a/vendor/github.com/Shopify/sarama/mockresponses.go +++ b/vendor/github.com/Shopify/sarama/mockresponses.go @@ -17,7 +17,7 @@ type TestReporter interface { // allows generating a response based on a request body. MockResponses are used // to program behavior of MockBroker in tests. 
type MockResponse interface { - For(reqBody decoder) (res encoder) + For(reqBody versionedDecoder) (res encoder) } // MockWrapper is a mock response builder that returns a particular concrete @@ -26,7 +26,7 @@ type MockWrapper struct { res encoder } -func (mw *MockWrapper) For(reqBody decoder) (res encoder) { +func (mw *MockWrapper) For(reqBody versionedDecoder) (res encoder) { return mw.res } @@ -58,7 +58,7 @@ func NewMockSequence(responses ...interface{}) *MockSequence { return ms } -func (mc *MockSequence) For(reqBody decoder) (res encoder) { +func (mc *MockSequence) For(reqBody versionedDecoder) (res encoder) { res = mc.responses[0].For(reqBody) if len(mc.responses) > 1 { mc.responses = mc.responses[1:] @@ -96,7 +96,7 @@ func (mmr *MockMetadataResponse) SetBroker(addr string, brokerID int32) *MockMet return mmr } -func (mmr *MockMetadataResponse) For(reqBody decoder) encoder { +func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoder { metadataRequest := reqBody.(*MetadataRequest) metadataResponse := &MetadataResponse{} for addr, brokerID := range mmr.brokers { @@ -146,7 +146,7 @@ func (mor *MockOffsetResponse) SetOffset(topic string, partition int32, time, of return mor } -func (mor *MockOffsetResponse) For(reqBody decoder) encoder { +func (mor *MockOffsetResponse) For(reqBody versionedDecoder) encoder { offsetRequest := reqBody.(*OffsetRequest) offsetResponse := &OffsetResponse{} for topic, partitions := range offsetRequest.blocks { @@ -216,7 +216,7 @@ func (mfr *MockFetchResponse) SetHighWaterMark(topic string, partition int32, of return mfr } -func (mfr *MockFetchResponse) For(reqBody decoder) encoder { +func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoder { fetchRequest := reqBody.(*FetchRequest) res := &FetchResponse{} for topic, partitions := range fetchRequest.blocks { @@ -298,7 +298,7 @@ func (mr *MockConsumerMetadataResponse) SetError(group string, kerror KError) *M return mr } -func (mr *MockConsumerMetadataResponse) For(reqBody decoder) encoder { +func (mr *MockConsumerMetadataResponse) For(reqBody versionedDecoder) encoder { req := reqBody.(*ConsumerMetadataRequest) group := req.ConsumerGroup res := &ConsumerMetadataResponse{} @@ -340,7 +340,7 @@ func (mr *MockOffsetCommitResponse) SetError(group, topic string, partition int3 return mr } -func (mr *MockOffsetCommitResponse) For(reqBody decoder) encoder { +func (mr *MockOffsetCommitResponse) For(reqBody versionedDecoder) encoder { req := reqBody.(*OffsetCommitRequest) group := req.ConsumerGroup res := &OffsetCommitResponse{} @@ -391,7 +391,7 @@ func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KE return mr } -func (mr *MockProduceResponse) For(reqBody decoder) encoder { +func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoder { req := reqBody.(*ProduceRequest) res := &ProduceResponse{} for topic, partitions := range req.msgSets { @@ -442,7 +442,7 @@ func (mr *MockOffsetFetchResponse) SetOffset(group, topic string, partition int3 return mr } -func (mr *MockOffsetFetchResponse) For(reqBody decoder) encoder { +func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoder { req := reqBody.(*OffsetFetchRequest) group := req.ConsumerGroup res := &OffsetFetchResponse{} diff --git a/vendor/github.com/Shopify/sarama/mocks/async_producer.go b/vendor/github.com/Shopify/sarama/mocks/async_producer.go index 6ccf1f145547..d1d9ba416f8c 100644 --- a/vendor/github.com/Shopify/sarama/mocks/async_producer.go +++ 
b/vendor/github.com/Shopify/sarama/mocks/async_producer.go @@ -8,8 +8,10 @@ import ( // AsyncProducer implements sarama's Producer interface for testing purposes. // Before you can send messages to its Input channel, you have to set expectations -// so it knows how to handle the input. This way you can easily test success and -// failure scenarios. +// so it knows how to handle the input; it returns an error if the number of messages +// received is bigger than the number of expectations set. You can also set a +// function in each expectation so that the message value is checked by this function +// and an error is returned if the match fails. type AsyncProducer struct { l sync.Mutex t ErrorReporter @@ -52,6 +54,18 @@ func NewAsyncProducer(t ErrorReporter, config *sarama.Config) *AsyncProducer { } else { expectation := mp.expectations[0] mp.expectations = mp.expectations[1:] + if expectation.CheckFunction != nil { + if val, err := msg.Value.Encode(); err != nil { + mp.t.Errorf("Input message encoding failed: %s", err.Error()) + mp.errors <- &sarama.ProducerError{Err: err, Msg: msg} + } else { + err = expectation.CheckFunction(val) + if err != nil { + mp.t.Errorf("Check function returned an error: %s", err.Error()) + mp.errors <- &sarama.ProducerError{Err: err, Msg: msg} + } + } + } if expectation.Result == errProduceSuccess { mp.lastOffset++ if config.Producer.Return.Successes { @@ -122,21 +136,39 @@ func (mp *AsyncProducer) Errors() <-chan *sarama.ProducerError { // Setting expectations //////////////////////////////////////////////// +// ExpectInputWithCheckerFunctionAndSucceed sets an expectation on the mock producer that a message +// will be provided on the input channel. The mock producer will call the given function to check +// the message value. If an error is returned it will be made available on the Errors channel; +// otherwise the mock will handle the message as if it produced successfully, i.e. it will make +// it available on the Successes channel if the Producer.Return.Successes setting is set to true. +func (mp *AsyncProducer) ExpectInputWithCheckerFunctionAndSucceed(cf ValueChecker) { + mp.l.Lock() + defer mp.l.Unlock() + mp.expectations = append(mp.expectations, &producerExpectation{Result: errProduceSuccess, CheckFunction: cf}) +} + +// ExpectInputWithCheckerFunctionAndFail sets an expectation on the mock producer that a message +// will be provided on the input channel. The mock producer will first call the given function to +// check the message value. If an error is returned it will be made available on the Errors channel; +// otherwise the mock will handle the message as if it failed to produce successfully. This means +// it will make a ProducerError available on the Errors channel. +func (mp *AsyncProducer) ExpectInputWithCheckerFunctionAndFail(cf ValueChecker, err error) { + mp.l.Lock() + defer mp.l.Unlock() + mp.expectations = append(mp.expectations, &producerExpectation{Result: err, CheckFunction: cf}) +} + // ExpectInputAndSucceed sets an expectation on the mock producer that a message will be provided // on the input channel. The mock producer will handle the message as if it is produced successfully, // i.e. it will make it available on the Successes channel if the Producer.Return.Successes setting // is set to true.
func (mp *AsyncProducer) ExpectInputAndSucceed() { - mp.l.Lock() - defer mp.l.Unlock() - mp.expectations = append(mp.expectations, &producerExpectation{Result: errProduceSuccess}) + mp.ExpectInputWithCheckerFunctionAndSucceed(nil) } // ExpectInputAndFail sets an expectation on the mock producer that a message will be provided // on the input channel. The mock producer will handle the message as if it failed to produce // successfully. This means it will make a ProducerError available on the Errors channel. func (mp *AsyncProducer) ExpectInputAndFail(err error) { - mp.l.Lock() - defer mp.l.Unlock() - mp.expectations = append(mp.expectations, &producerExpectation{Result: err}) + mp.ExpectInputWithCheckerFunctionAndFail(nil, err) } diff --git a/vendor/github.com/Shopify/sarama/mocks/mocks.go b/vendor/github.com/Shopify/sarama/mocks/mocks.go index 96b79bc06265..4adb838d9965 100644 --- a/vendor/github.com/Shopify/sarama/mocks/mocks.go +++ b/vendor/github.com/Shopify/sarama/mocks/mocks.go @@ -25,6 +25,10 @@ type ErrorReporter interface { Errorf(string, ...interface{}) } +// ValueChecker is a function type to be set in each expectation of the producer mocks +// to check the value passed. +type ValueChecker func(val []byte) error + var ( errProduceSuccess error = nil errOutOfExpectations = errors.New("No more expectations set on mock") @@ -34,7 +38,8 @@ var ( const AnyOffset int64 = -1000 type producerExpectation struct { - Result error + Result error + CheckFunction ValueChecker } type consumerExpectation struct { diff --git a/vendor/github.com/Shopify/sarama/mocks/sync_producer.go b/vendor/github.com/Shopify/sarama/mocks/sync_producer.go index fa86b245cdf3..2ac7b5c32c42 100644 --- a/vendor/github.com/Shopify/sarama/mocks/sync_producer.go +++ b/vendor/github.com/Shopify/sarama/mocks/sync_producer.go @@ -34,7 +34,9 @@ func NewSyncProducer(t ErrorReporter, config *sarama.Config) *SyncProducer { // SendMessage corresponds with the SendMessage method of sarama's SyncProducer implementation. // You have to set expectations on the mock producer before calling SendMessage, so it knows -// how to handle them. If there is no more remaining expectations when SendMessage is called, +// how to handle them. You can set a function in each expectation so that the message value +// is checked by this function and an error is returned if the match fails. +// If there are no remaining expectations when SendMessage is called, // the mock producer will write an error to the test state object. func (sp *SyncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) { sp.l.Lock() @@ -43,7 +45,18 @@ func (sp *SyncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int3 if len(sp.expectations) > 0 { expectation := sp.expectations[0] sp.expectations = sp.expectations[1:] - + if expectation.CheckFunction != nil { + if val, err := msg.Value.Encode(); err != nil { + sp.t.Errorf("Input message encoding failed: %s", err.Error()) + return -1, -1, err + } else { + err := expectation.CheckFunction(val) + if err != nil { + sp.t.Errorf("Check function returned an error: %s", err.Error()) + return -1, -1, err + } + } + } if expectation.Result == errProduceSuccess { sp.lastOffset++ msg.Offset = sp.lastOffset @@ -57,6 +70,31 @@ func (sp *SyncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int3 } } +// SendMessages corresponds with the SendMessages method of sarama's SyncProducer implementation.
+// You have to set expectations on the mock producer before calling SendMessages, so it knows +// how to handle them. If there are no remaining expectations when SendMessages is called, +// the mock producer will write an error to the test state object. +func (sp *SyncProducer) SendMessages(msgs []*sarama.ProducerMessage) error { + sp.l.Lock() + defer sp.l.Unlock() + + if len(sp.expectations) >= len(msgs) { + expectations := sp.expectations[0:len(msgs)] + sp.expectations = sp.expectations[len(msgs):] + + for _, expectation := range expectations { + if expectation.Result != errProduceSuccess { + return expectation.Result + } + } + return nil + } else { + sp.t.Errorf("Insufficient expectations set on this mock producer to handle the input messages.") + return errOutOfExpectations + } +} + // Close corresponds with the Close method of sarama's SyncProducer implementation. // By closing a mock syncproducer, you also tell it that no more SendMessage calls will follow, // so it will write an error to the test state if there's any remaining expectations. @@ -75,20 +113,36 @@ func (sp *SyncProducer) Close() error { // Setting expectations //////////////////////////////////////////////// +// ExpectSendMessageWithCheckerFunctionAndSucceed sets an expectation on the mock producer that SendMessage +// will be called. The mock producer will first call the given function to check the message value. +// It will cascade the error of the function, if any, or handle the message as if it produced +// successfully, i.e. by returning a valid partition and offset, and a nil error. +func (sp *SyncProducer) ExpectSendMessageWithCheckerFunctionAndSucceed(cf ValueChecker) { + sp.l.Lock() + defer sp.l.Unlock() + sp.expectations = append(sp.expectations, &producerExpectation{Result: errProduceSuccess, CheckFunction: cf}) +} + +// ExpectSendMessageWithCheckerFunctionAndFail sets an expectation on the mock producer that SendMessage will be +// called. The mock producer will first call the given function to check the message value. +// It will cascade the error of the function, if any, or handle the message as if it failed +// to produce successfully, i.e. by returning the provided error. +func (sp *SyncProducer) ExpectSendMessageWithCheckerFunctionAndFail(cf ValueChecker, err error) { + sp.l.Lock() + defer sp.l.Unlock() + sp.expectations = append(sp.expectations, &producerExpectation{Result: err, CheckFunction: cf}) +} + // ExpectSendMessageAndSucceed sets an expectation on the mock producer that SendMessage will be // called. The mock producer will handle the message as if it produced successfully, i.e. by // returning a valid partition, and offset, and a nil error. func (sp *SyncProducer) ExpectSendMessageAndSucceed() { - sp.l.Lock() - defer sp.l.Unlock() - sp.expectations = append(sp.expectations, &producerExpectation{Result: errProduceSuccess}) + sp.ExpectSendMessageWithCheckerFunctionAndSucceed(nil) } // ExpectSendMessageAndFail sets an expectation on the mock producer that SendMessage will be // called. The mock producer will handle the message as if it failed to produce // successfully, i.e. by returning the provided error.
func (sp *SyncProducer) ExpectSendMessageAndFail(err error) { - sp.l.Lock() - defer sp.l.Unlock() - sp.expectations = append(sp.expectations, &producerExpectation{Result: err}) + sp.ExpectSendMessageWithCheckerFunctionAndFail(nil, err) } diff --git a/vendor/github.com/Shopify/sarama/offset_commit_request.go b/vendor/github.com/Shopify/sarama/offset_commit_request.go index f51815260317..b21ea634b024 100644 --- a/vendor/github.com/Shopify/sarama/offset_commit_request.go +++ b/vendor/github.com/Shopify/sarama/offset_commit_request.go @@ -16,27 +16,27 @@ type offsetCommitRequestBlock struct { metadata string } -func (r *offsetCommitRequestBlock) encode(pe packetEncoder, version int16) error { - pe.putInt64(r.offset) +func (b *offsetCommitRequestBlock) encode(pe packetEncoder, version int16) error { + pe.putInt64(b.offset) if version == 1 { - pe.putInt64(r.timestamp) - } else if r.timestamp != 0 { + pe.putInt64(b.timestamp) + } else if b.timestamp != 0 { Logger.Println("Non-zero timestamp specified for OffsetCommitRequest not v1, it will be ignored") } - return pe.putString(r.metadata) + return pe.putString(b.metadata) } -func (r *offsetCommitRequestBlock) decode(pd packetDecoder, version int16) (err error) { - if r.offset, err = pd.getInt64(); err != nil { +func (b *offsetCommitRequestBlock) decode(pd packetDecoder, version int16) (err error) { + if b.offset, err = pd.getInt64(); err != nil { return err } if version == 1 { - if r.timestamp, err = pd.getInt64(); err != nil { + if b.timestamp, err = pd.getInt64(); err != nil { return err } } - r.metadata, err = pd.getString() + b.metadata, err = pd.getString() return err } @@ -49,7 +49,7 @@ type OffsetCommitRequest struct { // Version can be: // - 0 (kafka 0.8.1 and later) // - 1 (kafka 0.8.2 and later) - // - 2 (kafka 0.8.3 and later) + // - 2 (kafka 0.9.0 and later) Version int16 blocks map[string]map[int32]*offsetCommitRequestBlock } @@ -103,7 +103,9 @@ func (r *OffsetCommitRequest) encode(pe packetEncoder) error { return nil } -func (r *OffsetCommitRequest) decode(pd packetDecoder) (err error) { +func (r *OffsetCommitRequest) decode(pd packetDecoder, version int16) (err error) { + r.Version = version + if r.ConsumerGroup, err = pd.getString(); err != nil { return err } @@ -164,6 +166,17 @@ func (r *OffsetCommitRequest) version() int16 { return r.Version } +func (r *OffsetCommitRequest) requiredVersion() KafkaVersion { + switch r.Version { + case 1: + return V0_8_2_0 + case 2: + return V0_9_0_0 + default: + return minVersion + } +} + func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset int64, timestamp int64, metadata string) { if r.blocks == nil { r.blocks = make(map[string]map[int32]*offsetCommitRequestBlock) diff --git a/vendor/github.com/Shopify/sarama/offset_commit_response.go b/vendor/github.com/Shopify/sarama/offset_commit_response.go index 573a3b6a100f..7f277e7753a1 100644 --- a/vendor/github.com/Shopify/sarama/offset_commit_response.go +++ b/vendor/github.com/Shopify/sarama/offset_commit_response.go @@ -35,7 +35,7 @@ func (r *OffsetCommitResponse) encode(pe packetEncoder) error { return nil } -func (r *OffsetCommitResponse) decode(pd packetDecoder) (err error) { +func (r *OffsetCommitResponse) decode(pd packetDecoder, version int16) (err error) { numTopics, err := pd.getArrayLength() if err != nil || numTopics == 0 { return err @@ -71,3 +71,15 @@ func (r *OffsetCommitResponse) decode(pd packetDecoder) (err error) { return nil } + +func (r *OffsetCommitResponse) key() int16 { + return 
8 +} + +func (r *OffsetCommitResponse) version() int16 { + return 0 +} + +func (r *OffsetCommitResponse) requiredVersion() KafkaVersion { + return minVersion +} diff --git a/vendor/github.com/Shopify/sarama/offset_fetch_request.go b/vendor/github.com/Shopify/sarama/offset_fetch_request.go index 30bbbbbd0d21..b19fe79ba7aa 100644 --- a/vendor/github.com/Shopify/sarama/offset_fetch_request.go +++ b/vendor/github.com/Shopify/sarama/offset_fetch_request.go @@ -28,7 +28,8 @@ func (r *OffsetFetchRequest) encode(pe packetEncoder) (err error) { return nil } -func (r *OffsetFetchRequest) decode(pd packetDecoder) (err error) { +func (r *OffsetFetchRequest) decode(pd packetDecoder, version int16) (err error) { + r.Version = version if r.ConsumerGroup, err = pd.getString(); err != nil { return err } @@ -62,6 +63,15 @@ func (r *OffsetFetchRequest) version() int16 { return r.Version } +func (r *OffsetFetchRequest) requiredVersion() KafkaVersion { + switch r.Version { + case 1: + return V0_8_2_0 + default: + return minVersion + } +} + func (r *OffsetFetchRequest) AddPartition(topic string, partitionID int32) { if r.partitions == nil { r.partitions = make(map[string][]int32) diff --git a/vendor/github.com/Shopify/sarama/offset_fetch_response.go b/vendor/github.com/Shopify/sarama/offset_fetch_response.go index 93078c350f8d..323220eac976 100644 --- a/vendor/github.com/Shopify/sarama/offset_fetch_response.go +++ b/vendor/github.com/Shopify/sarama/offset_fetch_response.go @@ -6,13 +6,13 @@ type OffsetFetchResponseBlock struct { Err KError } -func (r *OffsetFetchResponseBlock) decode(pd packetDecoder) (err error) { - r.Offset, err = pd.getInt64() +func (b *OffsetFetchResponseBlock) decode(pd packetDecoder) (err error) { + b.Offset, err = pd.getInt64() if err != nil { return err } - r.Metadata, err = pd.getString() + b.Metadata, err = pd.getString() if err != nil { return err } @@ -21,20 +21,20 @@ func (r *OffsetFetchResponseBlock) decode(pd packetDecoder) (err error) { if err != nil { return err } - r.Err = KError(tmp) + b.Err = KError(tmp) return nil } -func (r *OffsetFetchResponseBlock) encode(pe packetEncoder) (err error) { - pe.putInt64(r.Offset) +func (b *OffsetFetchResponseBlock) encode(pe packetEncoder) (err error) { + pe.putInt64(b.Offset) - err = pe.putString(r.Metadata) + err = pe.putString(b.Metadata) if err != nil { return err } - pe.putInt16(int16(r.Err)) + pe.putInt16(int16(b.Err)) return nil } @@ -64,7 +64,7 @@ func (r *OffsetFetchResponse) encode(pe packetEncoder) error { return nil } -func (r *OffsetFetchResponse) decode(pd packetDecoder) (err error) { +func (r *OffsetFetchResponse) decode(pd packetDecoder, version int16) (err error) { numTopics, err := pd.getArrayLength() if err != nil || numTopics == 0 { return err @@ -106,6 +106,18 @@ func (r *OffsetFetchResponse) decode(pd packetDecoder) (err error) { return nil } +func (r *OffsetFetchResponse) key() int16 { + return 9 +} + +func (r *OffsetFetchResponse) version() int16 { + return 0 +} + +func (r *OffsetFetchResponse) requiredVersion() KafkaVersion { + return minVersion +} + func (r *OffsetFetchResponse) GetBlock(topic string, partition int32) *OffsetFetchResponseBlock { if r.Blocks == nil { return nil diff --git a/vendor/github.com/Shopify/sarama/offset_manager.go b/vendor/github.com/Shopify/sarama/offset_manager.go index ebfd8b403dba..5e15cdafe3a3 100644 --- a/vendor/github.com/Shopify/sarama/offset_manager.go +++ b/vendor/github.com/Shopify/sarama/offset_manager.go @@ -136,11 +136,15 @@ type 
PartitionOffsetManager interface { // was committed for this partition yet. NextOffset() (int64, string) - // MarkOffset marks the provided offset as processed, alongside a metadata string + // MarkOffset marks the provided offset, alongside a metadata string // that represents the state of the partition consumer at that point in time. The // metadata string can be used by another consumer to restore that state, so it // can resume consumption. // + // To follow upstream conventions, you are expected to mark the offset of the + // next message to read, not the last message read. Thus, when calling `MarkOffset` + // you should typically add one to the offset of the last consumed message. + // // Note: calling MarkOffset does not necessarily commit the offset to the backend // store immediately for efficiency reasons, and it may never be committed if // your application crashes. This means that you may end up processing the same @@ -340,7 +344,7 @@ func (pom *partitionOffsetManager) NextOffset() (int64, string) { defer pom.lock.Unlock() if pom.offset >= 0 { - return pom.offset + 1, pom.metadata + return pom.offset, pom.metadata } return pom.parent.conf.Consumer.Offsets.Initial, "" @@ -457,7 +461,7 @@ func (bom *brokerOffsetManager) flushToBroker() { case ErrNoError: block := request.blocks[s.topic][s.partition] s.updateCommitted(block.offset, block.metadata) - case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable, + case ErrNotLeaderForPartition, ErrLeaderNotAvailable, ErrConsumerCoordinatorNotAvailable, ErrNotCoordinatorForConsumer: // not a critical error, we just need to redispatch delete(bom.subscriptions, s) @@ -468,6 +472,12 @@ func (bom *brokerOffsetManager) flushToBroker() { case ErrOffsetsLoadInProgress: // nothing wrong but we didn't commit, we'll get it next time round break + case ErrUnknownTopicOrPartition: + // let the user know *and* try redispatching - if topic-auto-create is + // enabled, redispatching should trigger a metadata request and create the + // topic; if not then re-dispatching won't help, but we've let the user + // know and it shouldn't hurt either (see https://github.com/Shopify/sarama/issues/706) + fallthrough default: // dunno, tell the user and try redispatching s.handleError(err) diff --git a/vendor/github.com/Shopify/sarama/offset_request.go b/vendor/github.com/Shopify/sarama/offset_request.go index 842d5c0f8360..c66d8f709114 100644 --- a/vendor/github.com/Shopify/sarama/offset_request.go +++ b/vendor/github.com/Shopify/sarama/offset_request.go @@ -5,17 +5,17 @@ type offsetRequestBlock struct { maxOffsets int32 } -func (r *offsetRequestBlock) encode(pe packetEncoder) error { - pe.putInt64(int64(r.time)) - pe.putInt32(r.maxOffsets) +func (b *offsetRequestBlock) encode(pe packetEncoder) error { + pe.putInt64(int64(b.time)) + pe.putInt32(b.maxOffsets) return nil } -func (r *offsetRequestBlock) decode(pd packetDecoder) (err error) { - if r.time, err = pd.getInt64(); err != nil { +func (b *offsetRequestBlock) decode(pd packetDecoder) (err error) { + if b.time, err = pd.getInt64(); err != nil { return err } - if r.maxOffsets, err = pd.getInt32(); err != nil { + if b.maxOffsets, err = pd.getInt32(); err != nil { return err } return nil @@ -50,7 +50,7 @@ func (r *OffsetRequest) encode(pe packetEncoder) error { return nil } -func (r *OffsetRequest) decode(pd packetDecoder) error { +func (r *OffsetRequest) decode(pd packetDecoder, version int16) error { // Ignore replica ID if _, err := pd.getInt32(); err != nil { return err 
@@ -96,6 +96,10 @@ func (r *OffsetRequest) version() int16 { return 0 } +func (r *OffsetRequest) requiredVersion() KafkaVersion { + return minVersion +} + func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time int64, maxOffsets int32) { if r.blocks == nil { r.blocks = make(map[string]map[int32]*offsetRequestBlock) diff --git a/vendor/github.com/Shopify/sarama/offset_response.go b/vendor/github.com/Shopify/sarama/offset_response.go index 07d71ca7218b..ad1a66974511 100644 --- a/vendor/github.com/Shopify/sarama/offset_response.go +++ b/vendor/github.com/Shopify/sarama/offset_response.go @@ -5,29 +5,29 @@ type OffsetResponseBlock struct { Offsets []int64 } -func (r *OffsetResponseBlock) decode(pd packetDecoder) (err error) { +func (b *OffsetResponseBlock) decode(pd packetDecoder) (err error) { tmp, err := pd.getInt16() if err != nil { return err } - r.Err = KError(tmp) + b.Err = KError(tmp) - r.Offsets, err = pd.getInt64Array() + b.Offsets, err = pd.getInt64Array() return err } -func (r *OffsetResponseBlock) encode(pe packetEncoder) (err error) { - pe.putInt16(int16(r.Err)) +func (b *OffsetResponseBlock) encode(pe packetEncoder) (err error) { + pe.putInt16(int16(b.Err)) - return pe.putInt64Array(r.Offsets) + return pe.putInt64Array(b.Offsets) } type OffsetResponse struct { Blocks map[string]map[int32]*OffsetResponseBlock } -func (r *OffsetResponse) decode(pd packetDecoder) (err error) { +func (r *OffsetResponse) decode(pd packetDecoder, version int16) (err error) { numTopics, err := pd.getArrayLength() if err != nil { return err @@ -115,6 +115,18 @@ func (r *OffsetResponse) encode(pe packetEncoder) (err error) { return nil } +func (r *OffsetResponse) key() int16 { + return 2 +} + +func (r *OffsetResponse) version() int16 { + return 0 +} + +func (r *OffsetResponse) requiredVersion() KafkaVersion { + return minVersion +} + // testing API func (r *OffsetResponse) AddTopicPartition(topic string, partition int32, offset int64) { diff --git a/vendor/github.com/Shopify/sarama/partitioner.go b/vendor/github.com/Shopify/sarama/partitioner.go index 8c2bca4138d1..3697ca82bde6 100644 --- a/vendor/github.com/Shopify/sarama/partitioner.go +++ b/vendor/github.com/Shopify/sarama/partitioner.go @@ -111,11 +111,11 @@ func (p *hashPartitioner) Partition(message *ProducerMessage, numPartitions int3 if err != nil { return -1, err } - hash := int32(p.hasher.Sum32()) - if hash < 0 { - hash = -hash + partition := int32(p.hasher.Sum32()) % numPartitions + if partition < 0 { + partition = -partition } - return hash % numPartitions, nil + return partition, nil } func (p *hashPartitioner) RequiresConsistency() bool { diff --git a/vendor/github.com/Shopify/sarama/produce_request.go b/vendor/github.com/Shopify/sarama/produce_request.go index 473513c6a2ef..f8a250946839 100644 --- a/vendor/github.com/Shopify/sarama/produce_request.go +++ b/vendor/github.com/Shopify/sarama/produce_request.go @@ -19,17 +19,18 @@ const ( type ProduceRequest struct { RequiredAcks RequiredAcks Timeout int32 + Version int16 // v1 requires Kafka 0.9, v2 requires Kafka 0.10 msgSets map[string]map[int32]*MessageSet } -func (p *ProduceRequest) encode(pe packetEncoder) error { - pe.putInt16(int16(p.RequiredAcks)) - pe.putInt32(p.Timeout) - err := pe.putArrayLength(len(p.msgSets)) +func (r *ProduceRequest) encode(pe packetEncoder) error { + pe.putInt16(int16(r.RequiredAcks)) + pe.putInt32(r.Timeout) + err := pe.putArrayLength(len(r.msgSets)) if err != nil { return err } - for topic, partitions := range 
p.msgSets { + for topic, partitions := range r.msgSets { err = pe.putString(topic) if err != nil { return err @@ -54,13 +55,13 @@ func (p *ProduceRequest) encode(pe packetEncoder) error { return nil } -func (p *ProduceRequest) decode(pd packetDecoder) error { +func (r *ProduceRequest) decode(pd packetDecoder, version int16) error { requiredAcks, err := pd.getInt16() if err != nil { return err } - p.RequiredAcks = RequiredAcks(requiredAcks) - if p.Timeout, err = pd.getInt32(); err != nil { + r.RequiredAcks = RequiredAcks(requiredAcks) + if r.Timeout, err = pd.getInt32(); err != nil { return err } topicCount, err := pd.getArrayLength() @@ -70,7 +71,7 @@ func (p *ProduceRequest) decode(pd packetDecoder) error { if topicCount == 0 { return nil } - p.msgSets = make(map[string]map[int32]*MessageSet) + r.msgSets = make(map[string]map[int32]*MessageSet) for i := 0; i < topicCount; i++ { topic, err := pd.getString() if err != nil { @@ -80,7 +81,7 @@ func (p *ProduceRequest) decode(pd packetDecoder) error { if err != nil { return err } - p.msgSets[topic] = make(map[int32]*MessageSet) + r.msgSets[topic] = make(map[int32]*MessageSet) for j := 0; j < partitionCount; j++ { partition, err := pd.getInt32() if err != nil { @@ -99,47 +100,58 @@ func (p *ProduceRequest) decode(pd packetDecoder) error { if err != nil { return err } - p.msgSets[topic][partition] = msgSet + r.msgSets[topic][partition] = msgSet } } return nil } -func (p *ProduceRequest) key() int16 { +func (r *ProduceRequest) key() int16 { return 0 } -func (p *ProduceRequest) version() int16 { - return 0 +func (r *ProduceRequest) version() int16 { + return r.Version +} + +func (r *ProduceRequest) requiredVersion() KafkaVersion { + switch r.Version { + case 1: + return V0_9_0_0 + case 2: + return V0_10_0_0 + default: + return minVersion + } } -func (p *ProduceRequest) AddMessage(topic string, partition int32, msg *Message) { - if p.msgSets == nil { - p.msgSets = make(map[string]map[int32]*MessageSet) +func (r *ProduceRequest) AddMessage(topic string, partition int32, msg *Message) { + if r.msgSets == nil { + r.msgSets = make(map[string]map[int32]*MessageSet) } - if p.msgSets[topic] == nil { - p.msgSets[topic] = make(map[int32]*MessageSet) + if r.msgSets[topic] == nil { + r.msgSets[topic] = make(map[int32]*MessageSet) } - set := p.msgSets[topic][partition] + set := r.msgSets[topic][partition] if set == nil { set = new(MessageSet) - p.msgSets[topic][partition] = set + r.msgSets[topic][partition] = set } set.addMessage(msg) } -func (p *ProduceRequest) AddSet(topic string, partition int32, set *MessageSet) { - if p.msgSets == nil { - p.msgSets = make(map[string]map[int32]*MessageSet) +func (r *ProduceRequest) AddSet(topic string, partition int32, set *MessageSet) { + if r.msgSets == nil { + r.msgSets = make(map[string]map[int32]*MessageSet) } - if p.msgSets[topic] == nil { - p.msgSets[topic] = make(map[int32]*MessageSet) + if r.msgSets[topic] == nil { + r.msgSets[topic] = make(map[int32]*MessageSet) } - p.msgSets[topic][partition] = set + r.msgSets[topic][partition] = set } diff --git a/vendor/github.com/Shopify/sarama/produce_response.go b/vendor/github.com/Shopify/sarama/produce_response.go index 1f49a85600fe..195abcb812e8 100644 --- a/vendor/github.com/Shopify/sarama/produce_response.go +++ b/vendor/github.com/Shopify/sarama/produce_response.go @@ -1,36 +1,52 @@ package sarama +import "time" + type ProduceResponseBlock struct { Err KError Offset int64 + // only provided if Version >= 2 and the broker is configured with `LogAppendTime` 
+ Timestamp time.Time } -func (pr *ProduceResponseBlock) decode(pd packetDecoder) (err error) { +func (b *ProduceResponseBlock) decode(pd packetDecoder, version int16) (err error) { tmp, err := pd.getInt16() if err != nil { return err } - pr.Err = KError(tmp) + b.Err = KError(tmp) - pr.Offset, err = pd.getInt64() + b.Offset, err = pd.getInt64() if err != nil { return err } + if version >= 2 { + if millis, err := pd.getInt64(); err != nil { + return err + } else if millis != -1 { + b.Timestamp = time.Unix(millis/1000, (millis%1000)*int64(time.Millisecond)) + } + } + return nil } type ProduceResponse struct { - Blocks map[string]map[int32]*ProduceResponseBlock + Blocks map[string]map[int32]*ProduceResponseBlock + Version int16 + ThrottleTime time.Duration // only provided if Version >= 1 } -func (pr *ProduceResponse) decode(pd packetDecoder) (err error) { +func (r *ProduceResponse) decode(pd packetDecoder, version int16) (err error) { + r.Version = version + numTopics, err := pd.getArrayLength() if err != nil { return err } - pr.Blocks = make(map[string]map[int32]*ProduceResponseBlock, numTopics) + r.Blocks = make(map[string]map[int32]*ProduceResponseBlock, numTopics) for i := 0; i < numTopics; i++ { name, err := pd.getString() if err != nil { @@ -42,7 +58,7 @@ func (pr *ProduceResponse) decode(pd packetDecoder) (err error) { return err } - pr.Blocks[name] = make(map[int32]*ProduceResponseBlock, numBlocks) + r.Blocks[name] = make(map[int32]*ProduceResponseBlock, numBlocks) for j := 0; j < numBlocks; j++ { id, err := pd.getInt32() @@ -51,23 +67,31 @@ func (pr *ProduceResponse) decode(pd packetDecoder) (err error) { } block := new(ProduceResponseBlock) - err = block.decode(pd) + err = block.decode(pd, version) if err != nil { return err } - pr.Blocks[name][id] = block + r.Blocks[name][id] = block + } + } + + if r.Version >= 1 { + if millis, err := pd.getInt32(); err != nil { + return err + } else { + r.ThrottleTime = time.Duration(millis) * time.Millisecond } } return nil } -func (pr *ProduceResponse) encode(pe packetEncoder) error { - err := pe.putArrayLength(len(pr.Blocks)) +func (r *ProduceResponse) encode(pe packetEncoder) error { + err := pe.putArrayLength(len(r.Blocks)) if err != nil { return err } - for topic, partitions := range pr.Blocks { + for topic, partitions := range r.Blocks { err = pe.putString(topic) if err != nil { return err @@ -82,31 +106,53 @@ func (pr *ProduceResponse) encode(pe packetEncoder) error { pe.putInt64(prb.Offset) } } + if r.Version >= 1 { + pe.putInt32(int32(r.ThrottleTime / time.Millisecond)) + } return nil } -func (pr *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock { - if pr.Blocks == nil { +func (r *ProduceResponse) key() int16 { + return 0 +} + +func (r *ProduceResponse) version() int16 { + return r.Version +} + +func (r *ProduceResponse) requiredVersion() KafkaVersion { + switch r.Version { + case 1: + return V0_9_0_0 + case 2: + return V0_10_0_0 + default: + return minVersion + } +} + +func (r *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock { + if r.Blocks == nil { return nil } - if pr.Blocks[topic] == nil { + if r.Blocks[topic] == nil { return nil } - return pr.Blocks[topic][partition] + return r.Blocks[topic][partition] } // Testing API -func (pr *ProduceResponse) AddTopicPartition(topic string, partition int32, err KError) { - if pr.Blocks == nil { - pr.Blocks = make(map[string]map[int32]*ProduceResponseBlock) +func (r *ProduceResponse) AddTopicPartition(topic string, partition int32, err 
KError) { + if r.Blocks == nil { + r.Blocks = make(map[string]map[int32]*ProduceResponseBlock) } - byTopic, ok := pr.Blocks[topic] + byTopic, ok := r.Blocks[topic] if !ok { byTopic = make(map[int32]*ProduceResponseBlock) - pr.Blocks[topic] = byTopic + r.Blocks[topic] = byTopic } byTopic[partition] = &ProduceResponseBlock{Err: err} } diff --git a/vendor/github.com/Shopify/sarama/produce_set.go b/vendor/github.com/Shopify/sarama/produce_set.go index 9fe5f79d4c50..992f1f141ce2 100644 --- a/vendor/github.com/Shopify/sarama/produce_set.go +++ b/vendor/github.com/Shopify/sarama/produce_set.go @@ -52,7 +52,12 @@ func (ps *produceSet) add(msg *ProducerMessage) error { } set.msgs = append(set.msgs, msg) - set.setToSend.addMessage(&Message{Codec: CompressionNone, Key: key, Value: val}) + msgToSend := &Message{Codec: CompressionNone, Key: key, Value: val} + if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) && !msg.Timestamp.IsZero() { + msgToSend.Timestamp = msg.Timestamp + msgToSend.Version = 1 + } + set.setToSend.addMessage(msgToSend) size := producerMessageOverhead + len(key) + len(val) set.bufferBytes += size @@ -67,6 +72,9 @@ func (ps *produceSet) buildRequest() *ProduceRequest { RequiredAcks: ps.parent.conf.Producer.RequiredAcks, Timeout: int32(ps.parent.conf.Producer.Timeout / time.Millisecond), } + if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) { + req.Version = 2 + } for topic, partitionSet := range ps.msgs { for partition, set := range partitionSet { diff --git a/vendor/github.com/Shopify/sarama/real_decoder.go b/vendor/github.com/Shopify/sarama/real_decoder.go index e3ea33104839..a0141af079c3 100644 --- a/vendor/github.com/Shopify/sarama/real_decoder.go +++ b/vendor/github.com/Shopify/sarama/real_decoder.go @@ -5,6 +5,11 @@ import ( "math" ) +var errInvalidArrayLength = PacketDecodingError{"invalid array length"} +var errInvalidByteSliceLength = PacketDecodingError{"invalid byteslice length"} +var errInvalidStringLength = PacketDecodingError{"invalid string length"} +var errInvalidSubsetSize = PacketDecodingError{"invalid subset size"} + type realDecoder struct { raw []byte off int @@ -64,7 +69,7 @@ func (rd *realDecoder) getArrayLength() (int, error) { rd.off = len(rd.raw) return -1, ErrInsufficientData } else if tmp > 2*math.MaxUint16 { - return -1, PacketDecodingError{"invalid array length"} + return -1, errInvalidArrayLength } return tmp, nil } @@ -82,7 +87,7 @@ func (rd *realDecoder) getBytes() ([]byte, error) { switch { case n < -1: - return nil, PacketDecodingError{"invalid byteslice length"} + return nil, errInvalidByteSliceLength case n == -1: return nil, nil case n == 0: @@ -108,7 +113,7 @@ func (rd *realDecoder) getString() (string, error) { switch { case n < -1: - return "", PacketDecodingError{"invalid string length"} + return "", errInvalidStringLength case n == -1: return "", nil case n == 0: @@ -141,7 +146,7 @@ func (rd *realDecoder) getInt32Array() ([]int32, error) { } if n < 0 { - return nil, PacketDecodingError{"invalid array length"} + return nil, errInvalidArrayLength } ret := make([]int32, n) @@ -170,7 +175,7 @@ func (rd *realDecoder) getInt64Array() ([]int64, error) { } if n < 0 { - return nil, PacketDecodingError{"invalid array length"} + return nil, errInvalidArrayLength } ret := make([]int64, n) @@ -194,7 +199,7 @@ func (rd *realDecoder) getStringArray() ([]string, error) { } if n < 0 { - return nil, PacketDecodingError{"invalid array length"} + return nil, errInvalidArrayLength } ret := make([]string, n) @@ -216,7 +221,7 @@ func (rd 
*realDecoder) remaining() int { func (rd *realDecoder) getSubset(length int) (packetDecoder, error) { if length < 0 { - return nil, PacketDecodingError{"invalid subset size"} + return nil, errInvalidSubsetSize } else if length > rd.remaining() { rd.off = len(rd.raw) return nil, ErrInsufficientData diff --git a/vendor/github.com/Shopify/sarama/request.go b/vendor/github.com/Shopify/sarama/request.go index b9f654ba21e7..5dd337b0db68 100644 --- a/vendor/github.com/Shopify/sarama/request.go +++ b/vendor/github.com/Shopify/sarama/request.go @@ -6,17 +6,18 @@ import ( "io" ) -type requestBody interface { +type protocolBody interface { encoder - decoder + versionedDecoder key() int16 version() int16 + requiredVersion() KafkaVersion } type request struct { correlationID int32 clientID string - body requestBody + body protocolBody } func (r *request) encode(pe packetEncoder) (err error) { @@ -53,7 +54,7 @@ func (r *request) decode(pd packetDecoder) (err error) { if r.body == nil { return PacketDecodingError{fmt.Sprintf("unknown request key (%d)", key)} } - return r.body.decode(pd) + return r.body.decode(pd, version) } func decodeRequest(r io.Reader) (req *request, err error) { @@ -79,7 +80,7 @@ func decodeRequest(r io.Reader) (req *request, err error) { return req, nil } -func allocateBody(key, version int16) requestBody { +func allocateBody(key, version int16) protocolBody { switch key { case 0: return &ProduceRequest{} @@ -107,6 +108,10 @@ func allocateBody(key, version int16) requestBody { return &DescribeGroupsRequest{} case 16: return &ListGroupsRequest{} + case 17: + return &SaslHandshakeRequest{} + case 18: + return &ApiVersionsRequest{} } return nil } diff --git a/vendor/github.com/Shopify/sarama/sasl_handshake_request.go b/vendor/github.com/Shopify/sarama/sasl_handshake_request.go new file mode 100644 index 000000000000..fbbc8947b2ef --- /dev/null +++ b/vendor/github.com/Shopify/sarama/sasl_handshake_request.go @@ -0,0 +1,33 @@ +package sarama + +type SaslHandshakeRequest struct { + Mechanism string +} + +func (r *SaslHandshakeRequest) encode(pe packetEncoder) error { + if err := pe.putString(r.Mechanism); err != nil { + return err + } + + return nil +} + +func (r *SaslHandshakeRequest) decode(pd packetDecoder, version int16) (err error) { + if r.Mechanism, err = pd.getString(); err != nil { + return err + } + + return nil +} + +func (r *SaslHandshakeRequest) key() int16 { + return 17 +} + +func (r *SaslHandshakeRequest) version() int16 { + return 0 +} + +func (r *SaslHandshakeRequest) requiredVersion() KafkaVersion { + return V0_10_0_0 +} diff --git a/vendor/github.com/Shopify/sarama/sasl_handshake_response.go b/vendor/github.com/Shopify/sarama/sasl_handshake_response.go new file mode 100644 index 000000000000..8379bbb269b1 --- /dev/null +++ b/vendor/github.com/Shopify/sarama/sasl_handshake_response.go @@ -0,0 +1,38 @@ +package sarama + +type SaslHandshakeResponse struct { + Err KError + EnabledMechanisms []string +} + +func (r *SaslHandshakeResponse) encode(pe packetEncoder) error { + pe.putInt16(int16(r.Err)) + return pe.putStringArray(r.EnabledMechanisms) +} + +func (r *SaslHandshakeResponse) decode(pd packetDecoder, version int16) error { + if kerr, err := pd.getInt16(); err != nil { + return err + } else { + r.Err = KError(kerr) + } + + var err error + if r.EnabledMechanisms, err = pd.getStringArray(); err != nil { + return err + } + + return nil +} + +func (r *SaslHandshakeResponse) key() int16 { + return 17 +} + +func (r *SaslHandshakeResponse) 
version() int16 { + return 0 +} + +func (r *SaslHandshakeResponse) requiredVersion() KafkaVersion { + return V0_10_0_0 +} diff --git a/vendor/github.com/Shopify/sarama/snappy.go b/vendor/github.com/Shopify/sarama/snappy.go deleted file mode 100644 index e86cb7039715..000000000000 --- a/vendor/github.com/Shopify/sarama/snappy.go +++ /dev/null @@ -1,41 +0,0 @@ -package sarama - -import ( - "bytes" - "encoding/binary" - - "github.com/golang/snappy" -) - -var snappyMagic = []byte{130, 83, 78, 65, 80, 80, 89, 0} - -// SnappyEncode encodes binary data -func snappyEncode(src []byte) []byte { - return snappy.Encode(nil, src) -} - -// SnappyDecode decodes snappy data -func snappyDecode(src []byte) ([]byte, error) { - if bytes.Equal(src[:8], snappyMagic) { - var ( - pos = uint32(16) - max = uint32(len(src)) - dst = make([]byte, 0, len(src)) - chunk []byte - err error - ) - for pos < max { - size := binary.BigEndian.Uint32(src[pos : pos+4]) - pos += 4 - - chunk, err = snappy.Decode(chunk, src[pos:pos+size]) - if err != nil { - return nil, err - } - pos += size - dst = append(dst, chunk...) - } - return dst, nil - } - return snappy.Decode(nil, src) -} diff --git a/vendor/github.com/Shopify/sarama/sync_group_request.go b/vendor/github.com/Shopify/sarama/sync_group_request.go index 031cf0f2f67a..7fbe47b20f4f 100644 --- a/vendor/github.com/Shopify/sarama/sync_group_request.go +++ b/vendor/github.com/Shopify/sarama/sync_group_request.go @@ -33,7 +33,7 @@ func (r *SyncGroupRequest) encode(pe packetEncoder) error { return nil } -func (r *SyncGroupRequest) decode(pd packetDecoder) (err error) { +func (r *SyncGroupRequest) decode(pd packetDecoder, version int16) (err error) { if r.GroupId, err = pd.getString(); err != nil { return } @@ -77,6 +77,10 @@ func (r *SyncGroupRequest) version() int16 { return 0 } +func (r *SyncGroupRequest) requiredVersion() KafkaVersion { + return V0_9_0_0 +} + func (r *SyncGroupRequest) AddGroupAssignment(memberId string, memberAssignment []byte) { if r.GroupAssignments == nil { r.GroupAssignments = make(map[string][]byte) diff --git a/vendor/github.com/Shopify/sarama/sync_group_response.go b/vendor/github.com/Shopify/sarama/sync_group_response.go index 49c86922dc59..12aef6730347 100644 --- a/vendor/github.com/Shopify/sarama/sync_group_response.go +++ b/vendor/github.com/Shopify/sarama/sync_group_response.go @@ -16,7 +16,7 @@ func (r *SyncGroupResponse) encode(pe packetEncoder) error { return pe.putBytes(r.MemberAssignment) } -func (r *SyncGroupResponse) decode(pd packetDecoder) (err error) { +func (r *SyncGroupResponse) decode(pd packetDecoder, version int16) (err error) { if kerr, err := pd.getInt16(); err != nil { return err } else { @@ -26,3 +26,15 @@ func (r *SyncGroupResponse) decode(pd packetDecoder) (err error) { r.MemberAssignment, err = pd.getBytes() return } + +func (r *SyncGroupResponse) key() int16 { + return 14 +} + +func (r *SyncGroupResponse) version() int16 { + return 0 +} + +func (r *SyncGroupResponse) requiredVersion() KafkaVersion { + return V0_9_0_0 +} diff --git a/vendor/github.com/Shopify/sarama/sync_producer.go b/vendor/github.com/Shopify/sarama/sync_producer.go index 2e6f87b8e426..b181527f0208 100644 --- a/vendor/github.com/Shopify/sarama/sync_producer.go +++ b/vendor/github.com/Shopify/sarama/sync_producer.go @@ -16,6 +16,12 @@ type SyncProducer interface { // of the produced message, or an error if the message failed to produce. 
SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error) + // SendMessages produces a given set of messages, and returns only when all + // messages in the set have either succeeded or failed. Note that messages + // can succeed and fail individually; if some succeed and some fail, + // SendMessages will return an error. + SendMessages(msgs []*ProducerMessage) error + // Close shuts down the producer and flushes any messages it may have buffered. // You must call this function before a producer object passes out of scope, as // it may otherwise leak memory. You must call this before calling Close on the @@ -65,21 +71,56 @@ func (sp *syncProducer) SendMessage(msg *ProducerMessage) (partition int32, offs msg.Metadata = oldMetadata }() - expectation := make(chan error, 1) + expectation := make(chan *ProducerError, 1) msg.Metadata = expectation sp.producer.Input() <- msg if err := <-expectation; err != nil { - return -1, -1, err + return -1, -1, err.Err } return msg.Partition, msg.Offset, nil } +func (sp *syncProducer) SendMessages(msgs []*ProducerMessage) error { + savedMetadata := make([]interface{}, len(msgs)) + for i := range msgs { + savedMetadata[i] = msgs[i].Metadata + } + defer func() { + for i := range msgs { + msgs[i].Metadata = savedMetadata[i] + } + }() + + expectations := make(chan chan *ProducerError, len(msgs)) + go func() { + for _, msg := range msgs { + expectation := make(chan *ProducerError, 1) + msg.Metadata = expectation + sp.producer.Input() <- msg + expectations <- expectation + } + close(expectations) + }() + + var errors ProducerErrors + for expectation := range expectations { + if err := <-expectation; err != nil { + errors = append(errors, err) + } + } + + if len(errors) > 0 { + return errors + } + return nil +} + func (sp *syncProducer) handleSuccesses() { defer sp.wg.Done() for msg := range sp.producer.Successes() { - expectation := msg.Metadata.(chan error) + expectation := msg.Metadata.(chan *ProducerError) expectation <- nil } } @@ -87,8 +128,8 @@ func (sp *syncProducer) handleSuccesses() { func (sp *syncProducer) handleErrors() { defer sp.wg.Done() for err := range sp.producer.Errors() { - expectation := err.Msg.Metadata.(chan error) - expectation <- err.Err + expectation := err.Msg.Metadata.(chan *ProducerError) + expectation <- err } } diff --git a/vendor/github.com/Shopify/sarama/utils.go b/vendor/github.com/Shopify/sarama/utils.go index 04ca88750de1..b60e53a07cd0 100644 --- a/vendor/github.com/Shopify/sarama/utils.go +++ b/vendor/github.com/Shopify/sarama/utils.go @@ -109,3 +109,42 @@ func newBufConn(conn net.Conn) *bufConn { func (bc *bufConn) Read(b []byte) (n int, err error) { return bc.buf.Read(b) } + +// KafkaVersion instances represent versions of the upstream Kafka broker. 
+type KafkaVersion struct { + // it's a struct rather than just typing the array directly to make it opaque and stop people + // generating their own arbitrary versions + version [4]uint +} + +func newKafkaVersion(major, minor, veryMinor, patch uint) KafkaVersion { + return KafkaVersion{ + version: [4]uint{major, minor, veryMinor, patch}, + } +} + +// IsAtLeast returns true if and only if the version it is called on is +// greater than or equal to the version passed in: +// V1.IsAtLeast(V2) // false +// V2.IsAtLeast(V1) // true +func (v KafkaVersion) IsAtLeast(other KafkaVersion) bool { + for i := range v.version { + if v.version[i] > other.version[i] { + return true + } else if v.version[i] < other.version[i] { + return false + } + } + return true +} + +// Effective constants defining the supported kafka versions. +var ( + V0_8_2_0 = newKafkaVersion(0, 8, 2, 0) + V0_8_2_1 = newKafkaVersion(0, 8, 2, 1) + V0_8_2_2 = newKafkaVersion(0, 8, 2, 2) + V0_9_0_0 = newKafkaVersion(0, 9, 0, 0) + V0_9_0_1 = newKafkaVersion(0, 9, 0, 1) + V0_10_0_0 = newKafkaVersion(0, 10, 0, 0) + minVersion = V0_8_2_0 +) diff --git a/vendor/github.com/eapache/go-xerial-snappy/LICENSE b/vendor/github.com/eapache/go-xerial-snappy/LICENSE new file mode 100644 index 000000000000..5bf3688d9e41 --- /dev/null +++ b/vendor/github.com/eapache/go-xerial-snappy/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2016 Evan Huus + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/vendor/github.com/eapache/go-xerial-snappy/README.md b/vendor/github.com/eapache/go-xerial-snappy/README.md new file mode 100644 index 000000000000..3f2695c72826 --- /dev/null +++ b/vendor/github.com/eapache/go-xerial-snappy/README.md @@ -0,0 +1,13 @@ +# go-xerial-snappy + +[![Build Status](https://travis-ci.org/eapache/go-xerial-snappy.svg?branch=master)](https://travis-ci.org/eapache/go-xerial-snappy) + +Xerial-compatible Snappy framing support for golang. + +Packages using Xerial for snappy encoding use a framing format incompatible with +basically everything else in existence. This package wraps Go's built-in snappy +package to support it. + +Apps that use this format include Apache Kafka (see +https://github.com/dpkp/kafka-python/issues/126#issuecomment-35478921 for +details).
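Reviewer note: the new package exposes exactly the two functions vendored in the file below, `Encode(src []byte) []byte` and `Decode(src []byte) ([]byte, error)`. A minimal round-trip sketch (not part of the patch, import path per this vendor tree):

```go
package main

import (
	"fmt"

	snappy "github.com/eapache/go-xerial-snappy"
)

func main() {
	// Encode emits a plain (unframed) snappy block.
	enc := snappy.Encode([]byte("hello, kafka"))

	// Decode accepts both plain blocks and the xerial framing format
	// used by Kafka's snappy-compressed message sets.
	dec, err := snappy.Decode(enc)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(dec)) // prints: hello, kafka
}
```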
diff --git a/vendor/github.com/eapache/go-xerial-snappy/snappy.go b/vendor/github.com/eapache/go-xerial-snappy/snappy.go new file mode 100644 index 000000000000..b8f8b51fcef3 --- /dev/null +++ b/vendor/github.com/eapache/go-xerial-snappy/snappy.go @@ -0,0 +1,43 @@ +package snappy + +import ( + "bytes" + "encoding/binary" + + master "github.com/golang/snappy" +) + +var xerialHeader = []byte{130, 83, 78, 65, 80, 80, 89, 0} + +// Encode encodes data as snappy with no framing header. +func Encode(src []byte) []byte { + return master.Encode(nil, src) +} + +// Decode decodes snappy data whether it is traditional unframed +// or includes the xerial framing format. +func Decode(src []byte) ([]byte, error) { + if !bytes.Equal(src[:8], xerialHeader) { + return master.Decode(nil, src) + } + + var ( + pos = uint32(16) + max = uint32(len(src)) + dst = make([]byte, 0, len(src)) + chunk []byte + err error + ) + for pos < max { + size := binary.BigEndian.Uint32(src[pos : pos+4]) + pos += 4 + + chunk, err = master.Decode(chunk, src[pos:pos+size]) + if err != nil { + return nil, err + } + pos += size + dst = append(dst, chunk...) + } + return dst, nil +} diff --git a/winlogbeat/winlogbeat.full.yml b/winlogbeat/winlogbeat.full.yml index 4f8707f8cf6f..92829db34309 100644 --- a/winlogbeat/winlogbeat.full.yml +++ b/winlogbeat/winlogbeat.full.yml @@ -296,6 +296,27 @@ output.elasticsearch: # using any event field. To set the topic from document type use `%{[type]}`. #topic: beats + # Authentication details. Password is required if username is set. + #username: '' + #password: '' + + # Kafka version winlogbeat is assumed to run against. Defaults to the oldest + # supported stable version (currently version 0.8.2.0). + #version: 0.8.2 + + # Metadata update configuration. The metadata contains leader information + # used to decide which broker to publish to. + #metadata: + # Max metadata request retry attempts when the cluster is in the middle of + # a leader election. Defaults to 3 retries. + #retry.max: 3 + + # Waiting time between retries during leader elections. Default is 250ms. + #retry.backoff: 250ms + + # Refresh metadata interval. Defaults to every 10 minutes. + #refresh_frequency: 10m + # The number of concurrent load-balanced Kafka output workers. #worker: 1
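Reviewer note: a sketch of how the kafka output options above map onto the upgraded sarama client. `Config.Version`, the `Metadata` settings, and `ProducerMessage.Timestamp` all appear in the vendored code in this patch; the `Net.SASL.*` field names are assumed from sarama v1.10.0, and the broker address and credentials are hypothetical, for illustration only:

```go
package main

import (
	"log"
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	cfg := sarama.NewConfig()

	// Rough equivalents of the YAML options above.
	cfg.Version = sarama.V0_10_0_0                      // version: 0.10.0.0
	cfg.Net.SASL.Enable = true                          // username/password (SASL/PLAIN)
	cfg.Net.SASL.User = "beats"                         // hypothetical credentials
	cfg.Net.SASL.Password = "secret"
	cfg.Metadata.Retry.Max = 3                          // metadata.retry.max
	cfg.Metadata.Retry.Backoff = 250 * time.Millisecond // metadata.retry.backoff
	cfg.Metadata.RefreshFrequency = 10 * time.Minute    // metadata.refresh_frequency

	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	// With Version >= 0.10.0.0 the producer sends v1 messages, so the
	// per-message timestamp set here is carried to the broker.
	msg := &sarama.ProducerMessage{
		Topic:     "beats",
		Value:     sarama.StringEncoder("hello"),
		Timestamp: time.Now(),
	}
	if _, _, err := producer.SendMessage(msg); err != nil {
		log.Fatal(err)
	}
}
```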