diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 84abb68c50c9..047837de8423 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -71,6 +71,8 @@ https://github.com/elastic/beats/compare/v6.0.0-beta1...master[Check the HEAD di - Update init scripts to use the `test config` subcommand instead of the deprecated `-configtest` flag. {issue}4600[4600] - Get by default the credentials for connecting to Kibana from the Elasticsearch output configuration. {pull}4867[4867] - Added `cloud.id` and `cloud.auth` settings, for simplifying using Beats with the Elastic Cloud. {issue}4959[4959] +- Add lz4 compression support to kafka output. {pull}4977[4977] +- Add newer kafka versions to kafka output. {pull}4977[4977] *Auditbeat* diff --git a/NOTICE b/NOTICE index 9f0d7bdee7c0..a8382589a065 100644 --- a/NOTICE +++ b/NOTICE @@ -3034,6 +3034,34 @@ DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +-------------------------------------------------------------------- +Dependency: github.com/Shopify/sarama +Version: v1.12/enh/offset-replica-id +Revision: c292021939f5aba53b3ffc2cb09c7aadb32a42df +License type (autodetected): MIT license +./vendor/github.com/Shopify/sarama/LICENSE: +-------------------------------------------------------------------- +Copyright (c) 2013 Evan Huus + +Permission is hereby granted, free of charge, to any person obtaining +a copy of this software and associated documentation files (the +"Software"), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + -------------------------------------------------------------------- Dependency: github.com/Sirupsen/logrus Revision: 5e5dc898656f695e2a086b8e12559febbfc01562 diff --git a/libbeat/docs/outputconfig.asciidoc b/libbeat/docs/outputconfig.asciidoc index c07e0037a2bd..a2443fc8abea 100644 --- a/libbeat/docs/outputconfig.asciidoc +++ b/libbeat/docs/outputconfig.asciidoc @@ -596,8 +596,7 @@ supported stable version (currently version 0.8.2.0). Event timestamps will be added, if version 0.10.0.0+ is enabled. -Valid values are `0.8.2.0`, `0.8.2.1`, `0.8.2.2`, `0.8.2`, `0.8`, `0.9.0.0`, -`0.9.0.1`, `0.9.0`, `0.9`, `0.10.0.0`, `0.10.0`, and `0.10`. +Valid values are all kafka releases in between `0.8.2.0` and `0.11.0.0`. ===== `username` @@ -720,7 +719,7 @@ The keep-alive period for an active network connection. If 0s, keep-alives are d ===== `compression` -Sets the output compression codec. Must be one of `none`, `snappy` and `gzip`. The default is `gzip`. 
+Sets the output compression codec. Must be one of `none`, `snappy`, `lz4` and `gzip`. The default is `gzip`. [[kafka-max_message_bytes]] ===== `max_message_bytes` diff --git a/libbeat/outputs/kafka/kafka.go b/libbeat/outputs/kafka/kafka.go index f867c47445a9..a5739e13aee8 100644 --- a/libbeat/outputs/kafka/kafka.go +++ b/libbeat/outputs/kafka/kafka.go @@ -44,12 +44,21 @@ var ( errNoHosts = errors.New("No hosts configured") ) +// TODO: remove me. +// Compat version overwrite for missing versions in sarama +// Public API is compatible between these versions. +var ( + v0_10_2_1 = sarama.V0_10_2_0 + v0_11_0_0 = sarama.V0_10_2_0 +) + var ( compressionModes = map[string]sarama.CompressionCodec{ "none": sarama.CompressionNone, "no": sarama.CompressionNone, "off": sarama.CompressionNone, "gzip": sarama.CompressionGZIP, + "lz4": sarama.CompressionLZ4, "snappy": sarama.CompressionSnappy, } @@ -72,7 +81,14 @@ var ( "0.10.0": sarama.V0_10_0_1, "0.10.1.0": sarama.V0_10_1_0, "0.10.1": sarama.V0_10_1_0, - "0.10": sarama.V0_10_1_0, + "0.10.2.0": sarama.V0_10_2_0, + "0.10.2.1": v0_10_2_1, + "0.10.2": v0_10_2_1, + "0.10": v0_10_2_1, + + "0.11.0.0": v0_11_0_0, + "0.11.0": v0_11_0_0, + "0.11": v0_11_0_0, } ) diff --git a/vendor/github.com/Shopify/sarama/CHANGELOG.md b/vendor/github.com/Shopify/sarama/CHANGELOG.md index aad986fef9a4..0a0082df752e 100644 --- a/vendor/github.com/Shopify/sarama/CHANGELOG.md +++ b/vendor/github.com/Shopify/sarama/CHANGELOG.md @@ -1,7 +1,46 @@ # Changelog +#### Version 1.12.0 (2017-05-08) + +New Features: + - Added support for the `ApiVersions` request and response pair, and Kafka + version 0.10.2 ([#867](https://github.com/Shopify/sarama/pull/867)). Note + that you still need to specify the Kafka version in the Sarama configuration + for the time being. + - Added a `Brokers` method to the Client which returns the complete set of + active brokers ([#813](https://github.com/Shopify/sarama/pull/813)). + - Added an `InSyncReplicas` method to the Client which returns the set of all + in-sync broker IDs for the given partition, now that the Kafka versions for + which this was misleading are no longer in our supported set + ([#872](https://github.com/Shopify/sarama/pull/872)). + - Added a `NewCustomHashPartitioner` method which allows constructing a hash + partitioner with a custom hash method in case the default (FNV-1a) is not + suitable + ([#837](https://github.com/Shopify/sarama/pull/837), + [#841](https://github.com/Shopify/sarama/pull/841)). + +Improvements: + - Recognize more Kafka error codes + ([#859](https://github.com/Shopify/sarama/pull/859)). + +Bug Fixes: + - Fix an issue where decoding a malformed FetchRequest would not return the + correct error ([#818](https://github.com/Shopify/sarama/pull/818)). + - Respect ordering of group protocols in JoinGroupRequests. This fix is + transparent if you're using the `AddGroupProtocol` or + `AddGroupProtocolMetadata` helpers; otherwise you will need to switch from + the `GroupProtocols` field (now deprecated) to use `OrderedGroupProtocols` + ([#812](https://github.com/Shopify/sarama/issues/812)). + - Fix an alignment-related issue with atomics on 32-bit architectures + ([#859](https://github.com/Shopify/sarama/pull/859)). + #### Version 1.11.0 (2016-12-20) +_Important:_ As of Sarama 1.11 it is necessary to set the config value of +`Producer.Return.Successes` to true in order to use the SyncProducer. 
Previous +versions would silently override this value when instantiating a SyncProducer +which led to unexpected values and data races. + New Features: - Metrics! Thanks to Sébastien Launay for all his work on this feature ([#701](https://github.com/Shopify/sarama/pull/701), diff --git a/vendor/github.com/Shopify/sarama/MIT-LICENSE b/vendor/github.com/Shopify/sarama/LICENSE similarity index 100% rename from vendor/github.com/Shopify/sarama/MIT-LICENSE rename to vendor/github.com/Shopify/sarama/LICENSE diff --git a/vendor/github.com/Shopify/sarama/README.md b/vendor/github.com/Shopify/sarama/README.md index c2968b92c35a..6e12a07ae087 100644 --- a/vendor/github.com/Shopify/sarama/README.md +++ b/vendor/github.com/Shopify/sarama/README.md @@ -13,12 +13,14 @@ Sarama is an MIT-licensed Go client library for [Apache Kafka](https://kafka.apa - The [examples](./examples) directory contains more elaborate example applications. - The [tools](./tools) directory contains command line tools that can be useful for testing, diagnostics, and instrumentation. +You might also want to look at the [Frequently Asked Questions](https://github.com/Shopify/sarama/wiki/Frequently-Asked-Questions). + ### Compatibility and API stability Sarama provides a "2 releases + 2 months" compatibility guarantee: we support the two latest stable releases of Kafka and Go, and we provide a two month grace period for older releases. This means we currently officially support -Go 1.7 and 1.6, and Kafka 0.10.0 and 0.9.0, although older releases are +Go 1.8 and 1.7, and Kafka 0.10 and 0.9, although older releases are still likely to work. Sarama follows semantic versioning and provides API stability via the gopkg.in service. @@ -27,7 +29,7 @@ A changelog is available [here](CHANGELOG.md). ### Contributing -* Get started by checking our [contribution guidelines](https://github.com/Shopify/sarama/blob/master/CONTRIBUTING.md). +* Get started by checking our [contribution guidelines](https://github.com/Shopify/sarama/blob/master/.github/CONTRIBUTING.md). * Read the [Sarama wiki](https://github.com/Shopify/sarama/wiki) for more technical and design details. * The [Kafka Protocol Specification](https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol) diff --git a/vendor/github.com/Shopify/sarama/api_versions_response.go b/vendor/github.com/Shopify/sarama/api_versions_response.go index 16d62db2d305..23bc326e15f2 100644 --- a/vendor/github.com/Shopify/sarama/api_versions_response.go +++ b/vendor/github.com/Shopify/sarama/api_versions_response.go @@ -50,12 +50,13 @@ func (r *ApiVersionsResponse) encode(pe packetEncoder) error { } func (r *ApiVersionsResponse) decode(pd packetDecoder, version int16) error { - if kerr, err := pd.getInt16(); err != nil { + kerr, err := pd.getInt16() + if err != nil { return err - } else { - r.Err = KError(kerr) } + r.Err = KError(kerr) + numBlocks, err := pd.getArrayLength() if err != nil { return err diff --git a/vendor/github.com/Shopify/sarama/async_producer.go b/vendor/github.com/Shopify/sarama/async_producer.go index 3af47fddd8e0..6d71a6d8feb5 100644 --- a/vendor/github.com/Shopify/sarama/async_producer.go +++ b/vendor/github.com/Shopify/sarama/async_producer.go @@ -17,24 +17,23 @@ import ( // scope. type AsyncProducer interface { - // AsyncClose triggers a shutdown of the producer, flushing any messages it may - // have buffered. 
The shutdown has completed when both the Errors and Successes - // channels have been closed. When calling AsyncClose, you *must* continue to - // read from those channels in order to drain the results of any messages in - // flight. + // AsyncClose triggers a shutdown of the producer. The shutdown has completed + // when both the Errors and Successes channels have been closed. When calling + // AsyncClose, you *must* continue to read from those channels in order to + // drain the results of any messages in flight. AsyncClose() - // Close shuts down the producer and flushes any messages it may have buffered. - // You must call this function before a producer object passes out of scope, as - // it may otherwise leak memory. You must call this before calling Close on the - // underlying client. + // Close shuts down the producer and waits for any buffered messages to be + // flushed. You must call this function before a producer object passes out of + // scope, as it may otherwise leak memory. You must call this before calling + // Close on the underlying client. Close() error // Input is the input channel for the user to write messages to that they // wish to send. Input() chan<- *ProducerMessage - // Successes is the success output channel back to the user when AckSuccesses is + // Successes is the success output channel back to the user when Return.Successes is // enabled. If Return.Successes is true, you MUST read from this channel or the // Producer will deadlock. It is suggested that you send and read messages // together in a single select statement. @@ -200,7 +199,7 @@ func (p *asyncProducer) Close() error { if p.conf.Producer.Return.Successes { go withRecover(func() { - for _ = range p.successes { + for range p.successes { } }) } diff --git a/vendor/github.com/Shopify/sarama/broker.go b/vendor/github.com/Shopify/sarama/broker.go index 89beecc8648a..f57a6909429b 100644 --- a/vendor/github.com/Shopify/sarama/broker.go +++ b/vendor/github.com/Shopify/sarama/broker.go @@ -52,7 +52,7 @@ type responsePromise struct { errors chan error } -// NewBroker creates and returns a Broker targetting the given host:port address. +// NewBroker creates and returns a Broker targeting the given host:port address. // This does not attempt to actually connect, you have to call Open() for that. func NewBroker(addr string) *Broker { return &Broker{id: -1, addr: addr} @@ -355,6 +355,17 @@ func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroups return response, nil } +func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, error) { + response := new(ApiVersionsResponse) + + err := b.sendAndReceive(request, response) + if err != nil { + return nil, err + } + + return response, nil +} + func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) { b.lock.Lock() defer b.lock.Unlock() diff --git a/vendor/github.com/Shopify/sarama/client.go b/vendor/github.com/Shopify/sarama/client.go index f869a1434f74..45de3973d552 100644 --- a/vendor/github.com/Shopify/sarama/client.go +++ b/vendor/github.com/Shopify/sarama/client.go @@ -17,6 +17,9 @@ type Client interface { // altered after it has been created. Config() *Config + // Brokers returns the current set of active brokers as retrieved from cluster metadata. + Brokers() []*Broker + // Topics returns the set of available topics as retrieved from cluster metadata. 
Topics() ([]string, error) @@ -35,6 +38,11 @@ type Client interface { // Replicas returns the set of all replica IDs for the given partition. Replicas(topic string, partitionID int32) ([]int32, error) + // InSyncReplicas returns the set of all in-sync replica IDs for the given + // partition. In-sync replicas are replicas which are fully caught up with + // the partition leader. + InSyncReplicas(topic string, partitionID int32) ([]int32, error) + // RefreshMetadata takes a list of topics and queries the cluster to refresh the // available metadata for those topics. If no topics are provided, it will refresh // metadata for all topics. @@ -133,7 +141,7 @@ func NewClient(addrs []string, conf *Config) (Client, error) { client.seedBrokers = append(client.seedBrokers, NewBroker(addrs[index])) } - // do an initial fetch of all cluster metadata by specifing an empty list of topics + // do an initial fetch of all cluster metadata by specifying an empty list of topics err := client.RefreshMetadata() switch err { case nil: @@ -157,6 +165,16 @@ func (client *client) Config() *Config { return client.conf } +func (client *client) Brokers() []*Broker { + client.lock.RLock() + defer client.lock.RUnlock() + brokers := make([]*Broker, 0) + for _, broker := range client.brokers { + brokers = append(brokers, broker) + } + return brokers +} + func (client *client) Close() error { if client.Closed() { // Chances are this is being called from a defer() and the error will go unobserved @@ -282,6 +300,31 @@ func (client *client) Replicas(topic string, partitionID int32) ([]int32, error) return dupeAndSort(metadata.Replicas), nil } +func (client *client) InSyncReplicas(topic string, partitionID int32) ([]int32, error) { + if client.Closed() { + return nil, ErrClosedClient + } + + metadata := client.cachedMetadata(topic, partitionID) + + if metadata == nil { + err := client.RefreshMetadata(topic) + if err != nil { + return nil, err + } + metadata = client.cachedMetadata(topic, partitionID) + } + + if metadata == nil { + return nil, ErrUnknownTopicOrPartition + } + + if metadata.Err == ErrReplicaNotAvailable { + return nil, metadata.Err + } + return dupeAndSort(metadata.Isr), nil +} + func (client *client) Leader(topic string, partitionID int32) (*Broker, error) { if client.Closed() { return nil, ErrClosedClient @@ -592,12 +635,12 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int) switch err.(type) { case nil: // valid response, use it - if shouldRetry, err := client.updateMetadata(response); shouldRetry { + shouldRetry, err := client.updateMetadata(response) + if shouldRetry { Logger.Println("client/metadata found some partitions to be leaderless") return retry(err) // note: err can be nil - } else { - return err } + return err case PacketEncodingError: // didn't even send, return the error diff --git a/vendor/github.com/Shopify/sarama/config.go b/vendor/github.com/Shopify/sarama/config.go index a417a38b2e6c..5021c57e908e 100644 --- a/vendor/github.com/Shopify/sarama/config.go +++ b/vendor/github.com/Shopify/sarama/config.go @@ -305,10 +305,13 @@ func (c *Config) Validate() error { Logger.Println("Producer.RequiredAcks > 1 is deprecated and will raise an exception with kafka >= 0.8.2.0.") } if c.Producer.MaxMessageBytes >= int(MaxRequestSize) { - Logger.Println("Producer.MaxMessageBytes is larger than MaxRequestSize; it will be ignored.") + Logger.Println("Producer.MaxMessageBytes must be smaller than MaxRequestSize; it will be ignored.") } if c.Producer.Flush.Bytes >= 
int(MaxRequestSize) { - Logger.Println("Producer.Flush.Bytes is larger than MaxRequestSize; it will be ignored.") + Logger.Println("Producer.Flush.Bytes must be smaller than MaxRequestSize; it will be ignored.") + } + if (c.Producer.Flush.Bytes > 0 || c.Producer.Flush.Messages > 0) && c.Producer.Flush.Frequency == 0 { + Logger.Println("Producer.Flush: Bytes or Messages are set, but Frequency is not; messages may not get flushed.") } if c.Producer.Timeout%time.Millisecond != 0 { Logger.Println("Producer.Timeout only supports millisecond resolution; nanoseconds will be truncated.") diff --git a/vendor/github.com/Shopify/sarama/consumer.go b/vendor/github.com/Shopify/sarama/consumer.go index ddac564ff5ab..78d7fa2caa60 100644 --- a/vendor/github.com/Shopify/sarama/consumer.go +++ b/vendor/github.com/Shopify/sarama/consumer.go @@ -289,10 +289,11 @@ type PartitionConsumer interface { } type partitionConsumer struct { - consumer *consumer - conf *Config - topic string - partition int32 + highWaterMarkOffset int64 // must be at the top of the struct because https://golang.org/pkg/sync/atomic/#pkg-note-BUG + consumer *consumer + conf *Config + topic string + partition int32 broker *brokerConsumer messages chan *ConsumerMessage @@ -302,9 +303,8 @@ type partitionConsumer struct { trigger, dying chan none responseResult error - fetchSize int32 - offset int64 - highWaterMarkOffset int64 + fetchSize int32 + offset int64 } var errTimedOut = errors.New("timed out feeding messages to the user") // not user-facing @@ -324,7 +324,7 @@ func (child *partitionConsumer) sendError(err error) { } func (child *partitionConsumer) dispatcher() { - for _ = range child.trigger { + for range child.trigger { select { case <-child.dying: close(child.trigger) @@ -411,7 +411,7 @@ func (child *partitionConsumer) Close() error { child.AsyncClose() go withRecover(func() { - for _ = range child.messages { + for range child.messages { // drain } }) diff --git a/vendor/github.com/Shopify/sarama/crc32_field.go b/vendor/github.com/Shopify/sarama/crc32_field.go index 5c286079056c..f4fde18ad252 100644 --- a/vendor/github.com/Shopify/sarama/crc32_field.go +++ b/vendor/github.com/Shopify/sarama/crc32_field.go @@ -2,8 +2,7 @@ package sarama import ( "encoding/binary" - - "github.com/klauspost/crc32" + "hash/crc32" ) // crc32Field implements the pushEncoder and pushDecoder interfaces for calculating CRC32s. 
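The config.go hunk above adds a validation hint (set `Producer.Flush.Frequency` whenever `Producer.Flush.Bytes` or `Producer.Flush.Messages` is set), and the earlier kafka.go and CHANGELOG hunks add LZ4 compression and newer protocol versions. A minimal, hedged sketch of a sarama producer configuration that exercises those options and satisfies the validation hints follows; the broker address `localhost:9092` and topic `beats` are placeholders, the import path is written as it appears in this vendor tree, and this is only an illustration of the sarama options the patch touches, not how Beats itself wires its output.

```go
package main

import (
	"log"
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	cfg := sarama.NewConfig()

	// Options touched by this patch: LZ4 compression and one of the newer
	// protocol versions added to the kafka output's version table.
	cfg.Version = sarama.V0_10_2_0
	cfg.Producer.Compression = sarama.CompressionLZ4

	// Per the validation hint above: if Flush.Bytes (or Flush.Messages) is
	// set, also set Flush.Frequency so batches are flushed on a timer.
	cfg.Producer.Flush.Bytes = 1 << 20
	cfg.Producer.Flush.Frequency = 500 * time.Millisecond

	// Required since sarama 1.11 when using the SyncProducer (see the
	// CHANGELOG entry earlier in this diff).
	cfg.Producer.Return.Successes = true

	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
		Topic: "beats",
		Value: sarama.StringEncoder("hello, lz4-compressed kafka"),
	})
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("delivered to partition %d at offset %d", partition, offset)
}
```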
diff --git a/vendor/github.com/Shopify/sarama/describe_groups_response.go b/vendor/github.com/Shopify/sarama/describe_groups_response.go index d2c2071e1e67..542b3a971709 100644 --- a/vendor/github.com/Shopify/sarama/describe_groups_response.go +++ b/vendor/github.com/Shopify/sarama/describe_groups_response.go @@ -89,12 +89,13 @@ func (gd *GroupDescription) encode(pe packetEncoder) error { } func (gd *GroupDescription) decode(pd packetDecoder) (err error) { - if kerr, err := pd.getInt16(); err != nil { + kerr, err := pd.getInt16() + if err != nil { return err - } else { - gd.Err = KError(kerr) } + gd.Err = KError(kerr) + if gd.GroupId, err = pd.getString(); err != nil { return } diff --git a/vendor/github.com/Shopify/sarama/errors.go b/vendor/github.com/Shopify/sarama/errors.go index cc3f623d0d73..e6800ed49290 100644 --- a/vendor/github.com/Shopify/sarama/errors.go +++ b/vendor/github.com/Shopify/sarama/errors.go @@ -108,12 +108,20 @@ const ( ErrUnsupportedSASLMechanism KError = 33 ErrIllegalSASLState KError = 34 ErrUnsupportedVersion KError = 35 + ErrTopicAlreadyExists KError = 36 + ErrInvalidPartitions KError = 37 + ErrInvalidReplicationFactor KError = 38 + ErrInvalidReplicaAssignment KError = 39 + ErrInvalidConfig KError = 40 + ErrNotController KError = 41 + ErrInvalidRequest KError = 42 ErrUnsupportedForMessageFormat KError = 43 + ErrPolicyViolation KError = 44 ) func (err KError) Error() string { // Error messages stolen/adapted from - // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol + // https://kafka.apache.org/protocol#protocol_error_codes switch err { case ErrNoError: return "kafka server: Not an error, why are you printing me?" @@ -189,8 +197,24 @@ func (err KError) Error() string { return "kafka server: Request is not valid given the current SASL state." case ErrUnsupportedVersion: return "kafka server: The version of API is not supported." + case ErrTopicAlreadyExists: + return "kafka server: Topic with this name already exists." + case ErrInvalidPartitions: + return "kafka server: Number of partitions is invalid." + case ErrInvalidReplicationFactor: + return "kafka server: Replication-factor is invalid." + case ErrInvalidReplicaAssignment: + return "kafka server: Replica assignment is invalid." + case ErrInvalidConfig: + return "kafka server: Configuration is invalid." + case ErrNotController: + return "kafka server: This is not the correct controller for this cluster." + case ErrInvalidRequest: + return "kafka server: This most likely occurs because of a request being malformed by the client library or the message was sent to an incompatible broker. See the broker logs for more details." case ErrUnsupportedForMessageFormat: return "kafka server: The requested operation is not supported by the message format version." + case ErrPolicyViolation: + return "kafka server: Request parameters do not satisfy the configured policy." } return fmt.Sprintf("Unknown error, how did this happen? 
Error code = %d", err) diff --git a/vendor/github.com/Shopify/sarama/fetch_request.go b/vendor/github.com/Shopify/sarama/fetch_request.go index ae701a3f29a5..ab817a06eeed 100644 --- a/vendor/github.com/Shopify/sarama/fetch_request.go +++ b/vendor/github.com/Shopify/sarama/fetch_request.go @@ -92,7 +92,7 @@ func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) { } fetchBlock := &fetchRequestBlock{} if err = fetchBlock.decode(pd); err != nil { - return nil + return err } r.blocks[topic][partition] = fetchBlock } diff --git a/vendor/github.com/Shopify/sarama/heartbeat_response.go b/vendor/github.com/Shopify/sarama/heartbeat_response.go index 3c51163ad1f2..766f5fdec6f3 100644 --- a/vendor/github.com/Shopify/sarama/heartbeat_response.go +++ b/vendor/github.com/Shopify/sarama/heartbeat_response.go @@ -10,11 +10,11 @@ func (r *HeartbeatResponse) encode(pe packetEncoder) error { } func (r *HeartbeatResponse) decode(pd packetDecoder, version int16) error { - if kerr, err := pd.getInt16(); err != nil { + kerr, err := pd.getInt16() + if err != nil { return err - } else { - r.Err = KError(kerr) } + r.Err = KError(kerr) return nil } diff --git a/vendor/github.com/Shopify/sarama/join_group_request.go b/vendor/github.com/Shopify/sarama/join_group_request.go index 656db4562d3d..3a7ba17122d3 100644 --- a/vendor/github.com/Shopify/sarama/join_group_request.go +++ b/vendor/github.com/Shopify/sarama/join_group_request.go @@ -1,11 +1,36 @@ package sarama +type GroupProtocol struct { + Name string + Metadata []byte +} + +func (p *GroupProtocol) decode(pd packetDecoder) (err error) { + p.Name, err = pd.getString() + if err != nil { + return err + } + p.Metadata, err = pd.getBytes() + return err +} + +func (p *GroupProtocol) encode(pe packetEncoder) (err error) { + if err := pe.putString(p.Name); err != nil { + return err + } + if err := pe.putBytes(p.Metadata); err != nil { + return err + } + return nil +} + type JoinGroupRequest struct { - GroupId string - SessionTimeout int32 - MemberId string - ProtocolType string - GroupProtocols map[string][]byte + GroupId string + SessionTimeout int32 + MemberId string + ProtocolType string + GroupProtocols map[string][]byte // deprecated; use OrderedGroupProtocols + OrderedGroupProtocols []*GroupProtocol } func (r *JoinGroupRequest) encode(pe packetEncoder) error { @@ -20,16 +45,31 @@ func (r *JoinGroupRequest) encode(pe packetEncoder) error { return err } - if err := pe.putArrayLength(len(r.GroupProtocols)); err != nil { - return err - } - for name, metadata := range r.GroupProtocols { - if err := pe.putString(name); err != nil { + if len(r.GroupProtocols) > 0 { + if len(r.OrderedGroupProtocols) > 0 { + return PacketDecodingError{"cannot specify both GroupProtocols and OrderedGroupProtocols on JoinGroupRequest"} + } + + if err := pe.putArrayLength(len(r.GroupProtocols)); err != nil { return err } - if err := pe.putBytes(metadata); err != nil { + for name, metadata := range r.GroupProtocols { + if err := pe.putString(name); err != nil { + return err + } + if err := pe.putBytes(metadata); err != nil { + return err + } + } + } else { + if err := pe.putArrayLength(len(r.OrderedGroupProtocols)); err != nil { return err } + for _, protocol := range r.OrderedGroupProtocols { + if err := protocol.encode(pe); err != nil { + return err + } + } } return nil @@ -62,16 +102,12 @@ func (r *JoinGroupRequest) decode(pd packetDecoder, version int16) (err error) { r.GroupProtocols = make(map[string][]byte) for i := 0; i < n; i++ { - 
name, err := pd.getString() - if err != nil { - return err - } - metadata, err := pd.getBytes() - if err != nil { + protocol := &GroupProtocol{} + if err := protocol.decode(pd); err != nil { return err } - - r.GroupProtocols[name] = metadata + r.GroupProtocols[protocol.Name] = protocol.Metadata + r.OrderedGroupProtocols = append(r.OrderedGroupProtocols, protocol) } return nil @@ -90,11 +126,10 @@ func (r *JoinGroupRequest) requiredVersion() KafkaVersion { } func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte) { - if r.GroupProtocols == nil { - r.GroupProtocols = make(map[string][]byte) - } - - r.GroupProtocols[name] = metadata + r.OrderedGroupProtocols = append(r.OrderedGroupProtocols, &GroupProtocol{ + Name: name, + Metadata: metadata, + }) } func (r *JoinGroupRequest) AddGroupProtocolMetadata(name string, metadata *ConsumerGroupMemberMetadata) error { diff --git a/vendor/github.com/Shopify/sarama/join_group_response.go b/vendor/github.com/Shopify/sarama/join_group_response.go index 94c7a7fde072..6d35fe36494e 100644 --- a/vendor/github.com/Shopify/sarama/join_group_response.go +++ b/vendor/github.com/Shopify/sarama/join_group_response.go @@ -53,12 +53,13 @@ func (r *JoinGroupResponse) encode(pe packetEncoder) error { } func (r *JoinGroupResponse) decode(pd packetDecoder, version int16) (err error) { - if kerr, err := pd.getInt16(); err != nil { + kerr, err := pd.getInt16() + if err != nil { return err - } else { - r.Err = KError(kerr) } + r.Err = KError(kerr) + if r.GenerationId, err = pd.getInt32(); err != nil { return } diff --git a/vendor/github.com/Shopify/sarama/leave_group_response.go b/vendor/github.com/Shopify/sarama/leave_group_response.go index bd4a34f46cec..d60c626da01c 100644 --- a/vendor/github.com/Shopify/sarama/leave_group_response.go +++ b/vendor/github.com/Shopify/sarama/leave_group_response.go @@ -10,11 +10,11 @@ func (r *LeaveGroupResponse) encode(pe packetEncoder) error { } func (r *LeaveGroupResponse) decode(pd packetDecoder, version int16) (err error) { - if kerr, err := pd.getInt16(); err != nil { + kerr, err := pd.getInt16() + if err != nil { return err - } else { - r.Err = KError(kerr) } + r.Err = KError(kerr) return nil } diff --git a/vendor/github.com/Shopify/sarama/list_groups_response.go b/vendor/github.com/Shopify/sarama/list_groups_response.go index 3a84f9b6c18d..56115d4c75ad 100644 --- a/vendor/github.com/Shopify/sarama/list_groups_response.go +++ b/vendor/github.com/Shopify/sarama/list_groups_response.go @@ -24,12 +24,13 @@ func (r *ListGroupsResponse) encode(pe packetEncoder) error { } func (r *ListGroupsResponse) decode(pd packetDecoder, version int16) error { - if kerr, err := pd.getInt16(); err != nil { + kerr, err := pd.getInt16() + if err != nil { return err - } else { - r.Err = KError(kerr) } + r.Err = KError(kerr) + n, err := pd.getArrayLength() if err != nil { return err diff --git a/vendor/github.com/Shopify/sarama/partitioner.go b/vendor/github.com/Shopify/sarama/partitioner.go index d24199da9c2d..972932728a54 100644 --- a/vendor/github.com/Shopify/sarama/partitioner.go +++ b/vendor/github.com/Shopify/sarama/partitioner.go @@ -87,6 +87,18 @@ type hashPartitioner struct { hasher hash.Hash32 } +// NewCustomHashPartitioner is a wrapper around NewHashPartitioner, allowing the use of custom hasher. +// The argument is a function providing the instance, implementing the hash.Hash32 interface. 
This is to ensure that +// each partition dispatcher gets its own hasher, to avoid concurrency issues by sharing an instance. +func NewCustomHashPartitioner(hasher func() hash.Hash32) PartitionerConstructor { + return func(topic string) Partitioner { + p := new(hashPartitioner) + p.random = NewRandomPartitioner(topic) + p.hasher = hasher() + return p + } +} + // NewHashPartitioner returns a Partitioner which behaves as follows. If the message's key is nil then a // random partition is chosen. Otherwise the FNV-1a hash of the encoded bytes of the message key is used, // modulus the number of partitions. This ensures that messages with the same key always end up on the diff --git a/vendor/github.com/Shopify/sarama/produce_response.go b/vendor/github.com/Shopify/sarama/produce_response.go index 195abcb812e8..3f05dd9fbdb6 100644 --- a/vendor/github.com/Shopify/sarama/produce_response.go +++ b/vendor/github.com/Shopify/sarama/produce_response.go @@ -76,11 +76,12 @@ func (r *ProduceResponse) decode(pd packetDecoder, version int16) (err error) { } if r.Version >= 1 { - if millis, err := pd.getInt32(); err != nil { + millis, err := pd.getInt32() + if err != nil { return err - } else { - r.ThrottleTime = time.Duration(millis) * time.Millisecond } + + r.ThrottleTime = time.Duration(millis) * time.Millisecond } return nil diff --git a/vendor/github.com/Shopify/sarama/real_decoder.go b/vendor/github.com/Shopify/sarama/real_decoder.go index a0141af079c3..3cf93533a6ac 100644 --- a/vendor/github.com/Shopify/sarama/real_decoder.go +++ b/vendor/github.com/Shopify/sarama/real_decoder.go @@ -204,11 +204,12 @@ func (rd *realDecoder) getStringArray() ([]string, error) { ret := make([]string, n) for i := range ret { - if str, err := rd.getString(); err != nil { + str, err := rd.getString() + if err != nil { return nil, err - } else { - ret[i] = str } + + ret[i] = str } return ret, nil } diff --git a/vendor/github.com/Shopify/sarama/sasl_handshake_response.go b/vendor/github.com/Shopify/sarama/sasl_handshake_response.go index 8379bbb269b1..ef290d4bc6da 100644 --- a/vendor/github.com/Shopify/sarama/sasl_handshake_response.go +++ b/vendor/github.com/Shopify/sarama/sasl_handshake_response.go @@ -11,13 +11,13 @@ func (r *SaslHandshakeResponse) encode(pe packetEncoder) error { } func (r *SaslHandshakeResponse) decode(pd packetDecoder, version int16) error { - if kerr, err := pd.getInt16(); err != nil { + kerr, err := pd.getInt16() + if err != nil { return err - } else { - r.Err = KError(kerr) } - var err error + r.Err = KError(kerr) + if r.EnabledMechanisms, err = pd.getStringArray(); err != nil { return err } diff --git a/vendor/github.com/Shopify/sarama/sync_group_response.go b/vendor/github.com/Shopify/sarama/sync_group_response.go index 12aef6730347..194b382b4ab6 100644 --- a/vendor/github.com/Shopify/sarama/sync_group_response.go +++ b/vendor/github.com/Shopify/sarama/sync_group_response.go @@ -17,12 +17,13 @@ func (r *SyncGroupResponse) encode(pe packetEncoder) error { } func (r *SyncGroupResponse) decode(pd packetDecoder, version int16) (err error) { - if kerr, err := pd.getInt16(); err != nil { + kerr, err := pd.getInt16() + if err != nil { return err - } else { - r.Err = KError(kerr) } + r.Err = KError(kerr) + r.MemberAssignment, err = pd.getBytes() return } diff --git a/vendor/github.com/Shopify/sarama/sync_producer.go b/vendor/github.com/Shopify/sarama/sync_producer.go index c77ae3140773..dd096b6db671 100644 --- 
a/vendor/github.com/Shopify/sarama/sync_producer.go +++ b/vendor/github.com/Shopify/sarama/sync_producer.go @@ -25,10 +25,10 @@ type SyncProducer interface { // SendMessages will return an error. SendMessages(msgs []*ProducerMessage) error - // Close shuts down the producer and flushes any messages it may have buffered. - // You must call this function before a producer object passes out of scope, as - // it may otherwise leak memory. You must call this before calling Close on the - // underlying client. + // Close shuts down the producer and waits for any buffered messages to be + // flushed. You must call this function before a producer object passes out of + // scope, as it may otherwise leak memory. You must call this before calling + // Close on the underlying client. Close() error } diff --git a/vendor/github.com/Shopify/sarama/utils.go b/vendor/github.com/Shopify/sarama/utils.go index 3cbab2d92b13..d36db9210893 100644 --- a/vendor/github.com/Shopify/sarama/utils.go +++ b/vendor/github.com/Shopify/sarama/utils.go @@ -148,5 +148,6 @@ var ( V0_10_0_0 = newKafkaVersion(0, 10, 0, 0) V0_10_0_1 = newKafkaVersion(0, 10, 0, 1) V0_10_1_0 = newKafkaVersion(0, 10, 1, 0) + V0_10_2_0 = newKafkaVersion(0, 10, 2, 0) minVersion = V0_8_2_0 ) diff --git a/vendor/vendor.json b/vendor/vendor.json index 34c9c2e3d2a7..69708a13963e 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -9,11 +9,14 @@ "revisionTime": "2017-05-24T00:36:31Z" }, { - "checksumSHA1": "WfQhuFTJ3zYmzVjeghW+9e76Yq0=", + "checksumSHA1": "GCpYz281OE/XkAQK23BLr+nK6O0=", "origin": "github.com/urso/sarama", "path": "github.com/Shopify/sarama", - "revision": "f0996189c86dc27338468f2a9b10077c6d572b34", - "revisionTime": "2016-11-23T00:27:23Z" + "revision": "c292021939f5aba53b3ffc2cb09c7aadb32a42df", + "revisionTime": "2016-11-23T00:27:23Z", + "tree": true, + "version": "v1.12/enh/offset-replica-id", + "versionExact": "v1.12/enh/offset-replica-id" }, { "checksumSHA1": "DYv6Q1+VfnUVxMwvk5IshAClLvw=",
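The vendored sarama bump above also introduces new client-side APIs that appear in the hunks earlier in this diff: `Client.Brokers`, `Client.InSyncReplicas`, and `NewCustomHashPartitioner`. A short, hedged sketch of how a caller might use them is shown below; the broker address and topic name are placeholders, error handling is abbreviated, and the import path again follows the form used in this vendor tree.

```go
package main

import (
	"hash/crc32"
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V0_10_2_0

	// NewCustomHashPartitioner (new in sarama 1.12) swaps the default FNV-1a
	// hash for a user-supplied one; the constructor function ensures each
	// partition dispatcher gets its own hasher instance.
	cfg.Producer.Partitioner = sarama.NewCustomHashPartitioner(crc32.NewIEEE)

	client, err := sarama.NewClient([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Brokers returns the current set of active brokers from cluster metadata.
	for _, b := range client.Brokers() {
		log.Printf("broker %d at %s", b.ID(), b.Addr())
	}

	// InSyncReplicas returns the in-sync replica IDs for the given partition.
	isr, err := client.InSyncReplicas("beats", 0)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("ISR for beats/0: %v", isr)
}
```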