diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index cebb87ce771f..166e2fd9ca65 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -17,7 +17,7 @@ https://github.com/elastic/beats/compare/v6.2.3...master[Check the HEAD diff] - Rename beat.cpu.*.time metrics to beat.cpu.*.time.ms. {pull}6449[6449] - Mark `system.syslog.message` and `system.auth.message` as `text` instead of `keyword`. {pull}6589[6589] - Allow override of dynamic template `match_mapping_type` for fields with object_type. {pull}6691[6691] -- Set default kafka version to 1.0.0 in kafka output. Older versions are still supported by configuring the `version` setting. {pull}7025[7025] +- Set default kafka version to 1.0.0 in kafka output. Older versions are still supported by configuring the `version` setting. Minimally supported version is 0.11 (older versions might work, but are untested). {pull}7025[7025] - Add `host.name` field to all events, to avoid mapping conflicts. This could be breaking Logstash configs if you rely on the `host` field being a string. {pull}7051[7051] *Auditbeat* diff --git a/NOTICE.txt b/NOTICE.txt index e8a9677f0ec9..7a365a8f0322 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -2030,12 +2030,12 @@ THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -------------------------------------------------------------------- Dependency: github.com/Shopify/sarama -Version: v1.16.0/enh/offset-replica-id -Revision: 32b4ad5c9537ed14e471779b76713ff65420db39 +Version: v1.17.0/enh/offset-replica-id +Revision: d1575e4abe04acbbe8ac766320585cdf271dd189 License type (autodetected): MIT ./vendor/github.com/Shopify/sarama/LICENSE: -------------------------------------------------------------------- -Copyright (c) 2013 Evan Huus +Copyright (c) 2013 Shopify Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the diff --git a/auditbeat/auditbeat.reference.yml b/auditbeat/auditbeat.reference.yml index d50360591271..7c15c6201d34 100644 --- a/auditbeat/auditbeat.reference.yml +++ b/auditbeat/auditbeat.reference.yml @@ -616,6 +616,10 @@ output.elasticsearch: # default is gzip. #compression: gzip + # Set the compression level. Currently only gzip provides a compression level + # between 0 and 9. The default value is chosen by the compression algorithm. + #compression_level: 4 + # The maximum permitted size of JSON-encoded messages. Bigger messages will be # dropped. The default value is 1000000 (bytes). This value should be equal to # or less than the broker's message.max.bytes. diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml index 8d787d0f8afc..33d9d8bf1913 100644 --- a/filebeat/filebeat.reference.yml +++ b/filebeat/filebeat.reference.yml @@ -1276,6 +1276,10 @@ output.elasticsearch: # default is gzip. #compression: gzip + # Set the compression level. Currently only gzip provides a compression level + # between 0 and 9. The default value is chosen by the compression algorithm. + #compression_level: 4 + # The maximum permitted size of JSON-encoded messages. Bigger messages will be # dropped. The default value is 1000000 (bytes). This value should be equal to # or less than the broker's message.max.bytes. 
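The new `compression_level` option in these reference configs maps directly onto Go's `compress/gzip` levels (0 = no compression, 1 = best speed, 9 = best compression), which is the library sarama compresses with. A minimal, standalone sketch of the size trade-off the knob controls; the payload here is fabricated purely for illustration:

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"log"
)

func main() {
	// Fabricated, repetitive payload so the effect of the level is visible.
	payload := bytes.Repeat([]byte("beats event payload "), 1000)

	// gzip.NoCompression == 0, gzip.BestSpeed == 1, gzip.BestCompression == 9;
	// 4 is the default the beats kafka output now ships with.
	for _, level := range []int{gzip.NoCompression, gzip.BestSpeed, 4, gzip.BestCompression} {
		var buf bytes.Buffer
		w, err := gzip.NewWriterLevel(&buf, level)
		if err != nil {
			log.Fatal(err)
		}
		if _, err := w.Write(payload); err != nil {
			log.Fatal(err)
		}
		if err := w.Close(); err != nil {
			log.Fatal(err)
		}
		fmt.Printf("level %d: %d -> %d bytes\n", level, len(payload), buf.Len())
	}
}
```

Higher levels shrink the output further at the cost of more CPU per batch, which is exactly the trade-off the reference-config comment describes.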
diff --git a/heartbeat/heartbeat.reference.yml b/heartbeat/heartbeat.reference.yml index df3f104751c8..56010ce9dc7f 100644 --- a/heartbeat/heartbeat.reference.yml +++ b/heartbeat/heartbeat.reference.yml @@ -723,6 +723,10 @@ output.elasticsearch: # default is gzip. #compression: gzip + # Set the compression level. Currently only gzip provides a compression level + # between 0 and 9. The default value is chosen by the compression algorithm. + #compression_level: 4 + # The maximum permitted size of JSON-encoded messages. Bigger messages will be # dropped. The default value is 1000000 (bytes). This value should be equal to # or less than the broker's message.max.bytes. diff --git a/libbeat/_meta/config.reference.yml b/libbeat/_meta/config.reference.yml index fc3bbd254c09..7f0d922a1484 100644 --- a/libbeat/_meta/config.reference.yml +++ b/libbeat/_meta/config.reference.yml @@ -509,6 +509,10 @@ output.elasticsearch: # default is gzip. #compression: gzip + # Set the compression level. Currently only gzip provides a compression level + # between 0 and 9. The default value is chosen by the compression algorithm. + #compression_level: 4 + # The maximum permitted size of JSON-encoded messages. Bigger messages will be # dropped. The default value is 1000000 (bytes). This value should be equal to # or less than the broker's message.max.bytes. diff --git a/libbeat/docs/outputconfig.asciidoc b/libbeat/docs/outputconfig.asciidoc index 90345342bee6..34e8b6b65940 100644 --- a/libbeat/docs/outputconfig.asciidoc +++ b/libbeat/docs/outputconfig.asciidoc @@ -667,7 +667,8 @@ NOTE: Events bigger than <> will be ==== Compatibility -This output works with Kafka 0.8, 0.9, and 0.10. +This output works with all Kafka versions between 0.11 and 1.1.1. Older versions +might work as well, but are not supported. ==== Configuration options @@ -691,7 +692,7 @@ Kafka version ${beatname_lc} is assumed to run against. Defaults to 1.0.0. Event timestamps will be added, if version 0.10.0.0+ is enabled. -Valid values are all kafka releases in between `0.8.2.0` and `1.1.0`. +Valid values are all Kafka releases between `0.8.2.0` and `1.1.1`. ===== `username` @@ -833,6 +834,15 @@ The keep-alive period for an active network connection. If 0s, keep-alives are d Sets the output compression codec. Must be one of `none`, `snappy`, `lz4` and `gzip`. The default is `gzip`. +===== `compression_level` + +Sets the compression level used by gzip. Setting this value to 0 disables compression. +The compression level must be in the range of 1 (best speed) to 9 (best compression). + +Increasing the compression level reduces network usage but increases CPU usage. + +The default value is 4.
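On the sarama side, this option feeds the new `Producer.CompressionLevel` field (wired up in `libbeat/outputs/kafka/config.go` below). A hedged sketch of setting it directly, assuming nothing beyond the fields and the `Validate` behavior shown in this diff:

```go
package main

import (
	"fmt"

	"github.com/Shopify/sarama" // import path as vendored in this tree
)

func main() {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V1_0_0_0 // matches the new beats default of 1.0.0
	cfg.Producer.Compression = sarama.CompressionGZIP
	cfg.Producer.CompressionLevel = 4 // 1 = best speed ... 9 = best compression

	// Validate runs the gzip.NewWriterLevel probe added in this release and
	// rejects levels gzip cannot use.
	fmt.Println(cfg.Validate()) // <nil>

	cfg.Producer.CompressionLevel = 10
	fmt.Println(cfg.Validate()) // gzip compression does not work with level 10: ...
}
```

Beats performs the equivalent range check (0-9, or sarama's `CompressionLevelDefault`) up front in its own `kafkaConfig.Validate`, as the config.go hunk below shows.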
+ [[kafka-max_message_bytes]] ===== `max_message_bytes` diff --git a/libbeat/outputs/kafka/config.go b/libbeat/outputs/kafka/config.go index 2e9a0152d1fc..956a8bd8cb4b 100644 --- a/libbeat/outputs/kafka/config.go +++ b/libbeat/outputs/kafka/config.go @@ -36,25 +36,26 @@ import ( ) type kafkaConfig struct { - Hosts []string `config:"hosts" validate:"required"` - TLS *tlscommon.Config `config:"ssl"` - Timeout time.Duration `config:"timeout" validate:"min=1"` - Metadata metaConfig `config:"metadata"` - Key *fmtstr.EventFormatString `config:"key"` - Partition map[string]*common.Config `config:"partition"` - KeepAlive time.Duration `config:"keep_alive" validate:"min=0"` - MaxMessageBytes *int `config:"max_message_bytes" validate:"min=1"` - RequiredACKs *int `config:"required_acks" validate:"min=-1"` - BrokerTimeout time.Duration `config:"broker_timeout" validate:"min=1"` - Compression string `config:"compression"` - Version string `config:"version"` - BulkMaxSize int `config:"bulk_max_size"` - MaxRetries int `config:"max_retries" validate:"min=-1,nonzero"` - ClientID string `config:"client_id"` - ChanBufferSize int `config:"channel_buffer_size" validate:"min=1"` - Username string `config:"username"` - Password string `config:"password"` - Codec codec.Config `config:"codec"` + Hosts []string `config:"hosts" validate:"required"` + TLS *tlscommon.Config `config:"ssl"` + Timeout time.Duration `config:"timeout" validate:"min=1"` + Metadata metaConfig `config:"metadata"` + Key *fmtstr.EventFormatString `config:"key"` + Partition map[string]*common.Config `config:"partition"` + KeepAlive time.Duration `config:"keep_alive" validate:"min=0"` + MaxMessageBytes *int `config:"max_message_bytes" validate:"min=1"` + RequiredACKs *int `config:"required_acks" validate:"min=-1"` + BrokerTimeout time.Duration `config:"broker_timeout" validate:"min=1"` + Compression string `config:"compression"` + CompressionLevel int `config:"compression_level"` + Version string `config:"version"` + BulkMaxSize int `config:"bulk_max_size"` + MaxRetries int `config:"max_retries" validate:"min=-1,nonzero"` + ClientID string `config:"client_id"` + ChanBufferSize int `config:"channel_buffer_size" validate:"min=1"` + Username string `config:"username"` + Password string `config:"password"` + Codec codec.Config `config:"codec"` } type metaConfig struct { @@ -89,17 +90,18 @@ func defaultConfig() kafkaConfig { }, RefreshFreq: 10 * time.Minute, }, - KeepAlive: 0, - MaxMessageBytes: nil, // use library default - RequiredACKs: nil, // use library default - BrokerTimeout: 10 * time.Second, - Compression: "gzip", - Version: "1.0.0", - MaxRetries: 3, - ClientID: "beats", - ChanBufferSize: 256, - Username: "", - Password: "", + KeepAlive: 0, + MaxMessageBytes: nil, // use library default + RequiredACKs: nil, // use library default + BrokerTimeout: 10 * time.Second, + Compression: "gzip", + CompressionLevel: 4, + Version: "1.0.0", + MaxRetries: 3, + ClientID: "beats", + ChanBufferSize: 256, + Username: "", + Password: "", } } @@ -120,6 +122,13 @@ func (c *kafkaConfig) Validate() error { return fmt.Errorf("password must be set when username is configured") } + if c.Compression == "gzip" { + lvl := c.CompressionLevel + if lvl != sarama.CompressionLevelDefault && !(0 <= lvl && lvl <= 9) { + return fmt.Errorf("compression_level must be between 0 and 9") + } + } + return nil } @@ -138,6 +147,7 @@ func newSaramaConfig(config *kafkaConfig) (*sarama.Config, error) { k.Net.WriteTimeout = timeout k.Net.KeepAlive = config.KeepAlive k.Producer.Timeout = 
config.BrokerTimeout + k.Producer.CompressionLevel = config.CompressionLevel tls, err := outputs.LoadTLSConfig(config.TLS) if err != nil { diff --git a/libbeat/outputs/kafka/version.go b/libbeat/outputs/kafka/version.go index d171520a10c5..884e67616a20 100644 --- a/libbeat/outputs/kafka/version.go +++ b/libbeat/outputs/kafka/version.go @@ -27,7 +27,8 @@ var ( v0_11_0_1 = parseKafkaVersion("0.11.0.1") v0_11_0_2 = parseKafkaVersion("0.11.0.2") v1_0_1 = parseKafkaVersion("1.0.1") - v1_1_0 = parseKafkaVersion("1.1.0") + v1_0_2 = parseKafkaVersion("1.0.2") + v1_1_1 = parseKafkaVersion("1.1.1") kafkaVersions = map[string]sarama.KafkaVersion{ "": sarama.V1_0_0_0, @@ -61,10 +62,12 @@ var ( "1.0.0": sarama.V1_0_0_0, "1.0.1": v1_0_1, - "1.0": v1_0_1, - "1.1.0": v1_1_0, - "1.1": v1_1_0, - "1": v1_1_0, + "1.0.2": v1_0_2, + "1.0": v1_0_2, + "1.1.0": sarama.V1_1_0_0, + "1.1.1": v1_1_1, + "1.1": v1_1_1, + "1": v1_1_1, } ) diff --git a/metricbeat/metricbeat.reference.yml b/metricbeat/metricbeat.reference.yml index 4acd3a0ffb30..9e041845053d 100644 --- a/metricbeat/metricbeat.reference.yml +++ b/metricbeat/metricbeat.reference.yml @@ -1177,6 +1177,10 @@ output.elasticsearch: # default is gzip. #compression: gzip + # Set the compression level. Currently only gzip provides a compression level + # between 0 and 9. The default value is chosen by the compression algorithm. + #compression_level: 4 + # The maximum permitted size of JSON-encoded messages. Bigger messages will be # dropped. The default value is 1000000 (bytes). This value should be equal to # or less than the broker's message.max.bytes. diff --git a/packetbeat/packetbeat.reference.yml b/packetbeat/packetbeat.reference.yml index a63b78007b8c..031ee795441f 100644 --- a/packetbeat/packetbeat.reference.yml +++ b/packetbeat/packetbeat.reference.yml @@ -986,6 +986,10 @@ output.elasticsearch: # default is gzip. #compression: gzip + # Set the compression level. Currently only gzip provides a compression level + # between 0 and 9. The default value is chosen by the compression algorithm. + #compression_level: 4 + # The maximum permitted size of JSON-encoded messages. Bigger messages will be # dropped. The default value is 1000000 (bytes). This value should be equal to # or less than the broker's message.max.bytes. diff --git a/vendor/github.com/Shopify/sarama/CHANGELOG.md b/vendor/github.com/Shopify/sarama/CHANGELOG.md index 836841650c3d..16d5829c9953 100644 --- a/vendor/github.com/Shopify/sarama/CHANGELOG.md +++ b/vendor/github.com/Shopify/sarama/CHANGELOG.md @@ -1,5 +1,43 @@ # Changelog +#### Version 1.17.0 (2018-05-30) + +New Features: + - Add support for gzip compression levels + ([#1044](https://github.com/Shopify/sarama/pull/1044)). + - Add support for Metadata request/response pairs versions v1 to v5 + ([#1047](https://github.com/Shopify/sarama/pull/1047), + [#1069](https://github.com/Shopify/sarama/pull/1069)). + - Add versioning to JoinGroup request/response pairs + ([#1098](https://github.com/Shopify/sarama/pull/1098)) + - Add support for CreatePartitions, DeleteGroups, DeleteRecords request/response pairs + ([#1065](https://github.com/Shopify/sarama/pull/1065), + [#1096](https://github.com/Shopify/sarama/pull/1096), + [#1027](https://github.com/Shopify/sarama/pull/1027)). + - Add `Controller()` method to Client interface + ([#1063](https://github.com/Shopify/sarama/pull/1063)). 
+ +Improvements: + - ConsumerMetadataReq/Resp has been migrated to FindCoordinatorReq/Resp + ([#1010](https://github.com/Shopify/sarama/pull/1010)). + - Expose missing protocol parts: `msgSet` and `recordBatch` + ([#1049](https://github.com/Shopify/sarama/pull/1049)). + - Add support for v1 DeleteTopics Request + ([#1052](https://github.com/Shopify/sarama/pull/1052)). + - Add support for Go 1.10 + ([#1064](https://github.com/Shopify/sarama/pull/1064)). + - Claim support for Kafka 1.1.0 + ([#1073](https://github.com/Shopify/sarama/pull/1073)). + +Bug Fixes: + - Fix FindCoordinatorResponse.encode to allow nil Coordinator + ([#1050](https://github.com/Shopify/sarama/pull/1050), + [#1051](https://github.com/Shopify/sarama/pull/1051)). + - Clear all metadata when we have the latest topic info + ([#1033](https://github.com/Shopify/sarama/pull/1033)). + - Make `PartitionConsumer.Close` idempotent + ([#1092](https://github.com/Shopify/sarama/pull/1092)). + #### Version 1.16.0 (2018-02-12) New Features: diff --git a/vendor/github.com/Shopify/sarama/LICENSE b/vendor/github.com/Shopify/sarama/LICENSE index 8121b63b1c4a..d2bf4352f4c9 100644 --- a/vendor/github.com/Shopify/sarama/LICENSE +++ b/vendor/github.com/Shopify/sarama/LICENSE @@ -1,4 +1,4 @@ -Copyright (c) 2013 Evan Huus +Copyright (c) 2013 Shopify Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the diff --git a/vendor/github.com/Shopify/sarama/Makefile b/vendor/github.com/Shopify/sarama/Makefile index 58a39e4f34dd..b9a453dd29c6 100644 --- a/vendor/github.com/Shopify/sarama/Makefile +++ b/vendor/github.com/Shopify/sarama/Makefile @@ -4,7 +4,7 @@ default: fmt vet errcheck test test: echo "" > coverage.txt for d in `go list ./... | grep -v vendor`; do \ - go test -v -timeout 60s -race -coverprofile=profile.out -covermode=atomic $$d; \ + go test -p 1 -v -timeout 90s -race -coverprofile=profile.out -covermode=atomic $$d || exit 1; \ if [ -f profile.out ]; then \ cat profile.out >> coverage.txt; \ rm profile.out; \ @@ -14,8 +14,9 @@ test: vet: go vet ./... +# See https://github.com/kisielk/errcheck/pull/141 for details on ignorepkg errcheck: - errcheck github.com/Shopify/sarama/... + errcheck -ignorepkg fmt github.com/Shopify/sarama/... fmt: @if [ -n "$$(go fmt ./...)" ]; then echo 'Please run go fmt on your code.' && exit 1; fi diff --git a/vendor/github.com/Shopify/sarama/README.md b/vendor/github.com/Shopify/sarama/README.md index 28431f13eb8f..4fc0cc600f86 100644 --- a/vendor/github.com/Shopify/sarama/README.md +++ b/vendor/github.com/Shopify/sarama/README.md @@ -21,7 +21,7 @@ You might also want to look at the [Frequently Asked Questions](https://github.c Sarama provides a "2 releases + 2 months" compatibility guarantee: we support the two latest stable releases of Kafka and Go, and we provide a two month grace period for older releases. This means we currently officially support -Go 1.9 and 1.8, and Kafka 1.0 through 0.10, although older releases are +Go 1.8 through 1.10, and Kafka 0.11 through 1.1, although older releases are still likely to work. Sarama follows semantic versioning and provides API stability via the gopkg.in service. 
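Among the vendored sarama changes that follow, the most visible new public API is `Client.Controller()` (see the client.go hunk below). A short usage sketch; the broker address is a placeholder, and only the types and errors added in this diff are assumed:

```go
package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	// Controller IDs are only reported by metadata requests v1+, i.e.
	// Kafka >= 0.10; otherwise Controller() fails with
	// ErrControllerNotAvailable.
	cfg.Version = sarama.V1_0_0_0

	client, err := sarama.NewClient([]string{"localhost:9092"}, cfg) // placeholder address
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	controller, err := client.Controller()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("cluster controller:", controller.ID(), controller.Addr())
}
```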
diff --git a/vendor/github.com/Shopify/sarama/broker.go b/vendor/github.com/Shopify/sarama/broker.go index b759f8f7841c..d836bee6d86a 100644 --- a/vendor/github.com/Shopify/sarama/broker.go +++ b/vendor/github.com/Shopify/sarama/broker.go @@ -18,6 +18,7 @@ import ( type Broker struct { id int32 addr string + rack *string conf *Config correlationID int32 @@ -230,6 +231,18 @@ func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*Consume return response, nil } +func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error) { + response := new(FindCoordinatorResponse) + + err := b.sendAndReceive(request, response) + + if err != nil { + return nil, err + } + + return response, nil +} + func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) { response := new(OffsetResponse) @@ -373,6 +386,17 @@ func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, return response, nil } +func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) { + response := new(CreatePartitionsResponse) + + err := b.sendAndReceive(request, response) + if err != nil { + return nil, err + } + + return response, nil +} + func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) { response := new(CreateTopicsResponse) @@ -395,6 +419,17 @@ func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsRespon return response, nil } +func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) { + response := new(DeleteRecordsResponse) + + err := b.sendAndReceive(request, response) + if err != nil { + return nil, err + } + + return response, nil +} + func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error) { response := new(DescribeAclsResponse) @@ -504,6 +539,17 @@ func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsRespon return response, nil } + +func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) { + response := new(DeleteGroupsResponse) + + if err := b.sendAndReceive(request, response); err != nil { + return nil, err + } + + return response, nil +} + func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) { b.lock.Lock() defer b.lock.Unlock() @@ -569,7 +615,7 @@ func (b *Broker) sendAndReceive(req protocolBody, res versionedDecoder) error { } } -func (b *Broker) decode(pd packetDecoder) (err error) { +func (b *Broker) decode(pd packetDecoder, version int16) (err error) { b.id, err = pd.getInt32() if err != nil { return err @@ -585,6 +631,13 @@ func (b *Broker) decode(pd packetDecoder) (err error) { return err } + if version >= 1 { + b.rack, err = pd.getNullableString() + if err != nil { + return err + } + } + b.addr = net.JoinHostPort(host, fmt.Sprint(port)) if _, _, err := net.SplitHostPort(b.addr); err != nil { return err @@ -593,7 +646,7 @@ func (b *Broker) decode(pd packetDecoder) (err error) { return nil } -func (b *Broker) encode(pe packetEncoder) (err error) { +func (b *Broker) encode(pe packetEncoder, version int16) (err error) { host, portstr, err := net.SplitHostPort(b.addr) if err != nil { @@ -613,6 +666,13 @@ func (b *Broker) encode(pe packetEncoder) (err error) { pe.putInt32(int32(port)) + if version >= 1 { + err = pe.putNullableString(b.rack) + if err != nil { + return err + } + } + return nil } diff --git 
a/vendor/github.com/Shopify/sarama/client.go b/vendor/github.com/Shopify/sarama/client.go index 3dbfc4b06ffb..019cb43735ae 100644 --- a/vendor/github.com/Shopify/sarama/client.go +++ b/vendor/github.com/Shopify/sarama/client.go @@ -17,6 +17,9 @@ type Client interface { // altered after it has been created. Config() *Config + // Controller returns the cluster controller broker. + Controller() (*Broker, error) + // Brokers returns the current set of active brokers as retrieved from cluster metadata. Brokers() []*Broker @@ -97,6 +100,7 @@ type client struct { seedBrokers []*Broker deadSeeds []*Broker + controllerID int32 // cluster controller broker id brokers map[int32]*Broker // maps broker ids to brokers metadata map[string]map[int32]*PartitionMetadata // maps topics to partition ids to metadata coordinators map[string]int32 // Maps consumer group names to coordinating broker IDs @@ -379,6 +383,27 @@ func (client *client) GetOffset(topic string, partitionID int32, time int64) (in return offset, err } +func (client *client) Controller() (*Broker, error) { + if client.Closed() { + return nil, ErrClosedClient + } + + controller := client.cachedController() + if controller == nil { + if err := client.refreshMetadata(); err != nil { + return nil, err + } + controller = client.cachedController() + } + + if controller == nil { + return nil, ErrControllerNotAvailable + } + + _ = controller.Open(client.conf) + return controller, nil +} + func (client *client) Coordinator(consumerGroup string) (*Broker, error) { if client.Closed() { return nil, ErrClosedClient @@ -607,20 +632,7 @@ func (client *client) backgroundMetadataUpdater() { for { select { case <-ticker.C: - topics := []string{} - if !client.conf.Metadata.Full { - if specificTopics, err := client.Topics(); err != nil { - Logger.Println("Client background metadata topic load:", err) - break - } else if len(specificTopics) == 0 { - Logger.Println("Client background metadata update: no specific topics to update") - break - } else { - topics = specificTopics - } - } - - if err := client.RefreshMetadata(topics...); err != nil { + if err := client.refreshMetadata(); err != nil { Logger.Println("Client background metadata update:", err) } case <-client.closer: @@ -629,6 +641,26 @@ func (client *client) backgroundMetadataUpdater() { } } +func (client *client) refreshMetadata() error { + topics := []string{} + + if !client.conf.Metadata.Full { + if specificTopics, err := client.Topics(); err != nil { + return err + } else if len(specificTopics) == 0 { + return ErrNoTopicsToUpdateMetadata + } else { + topics = specificTopics + } + } + + if err := client.RefreshMetadata(topics...); err != nil { + return err + } + + return nil +} + func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int) error { retry := func(err error) error { if attemptsRemaining > 0 { @@ -645,12 +677,18 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int) } else { Logger.Printf("client/metadata fetching metadata for all topics from broker %s\n", broker.addr) } - response, err := broker.GetMetadata(&MetadataRequest{Topics: topics}) + + req := &MetadataRequest{Topics: topics} + if client.conf.Version.IsAtLeast(V0_10_0_0) { + req.Version = 1 + } + response, err := broker.GetMetadata(req) switch err.(type) { case nil: + allKnownMetaData := len(topics) == 0 // valid response, use it - shouldRetry, err := client.updateMetadata(response) + shouldRetry, err := client.updateMetadata(response, allKnownMetaData) if shouldRetry { 
Logger.Println("client/metadata found some partitions to be leaderless") return retry(err) // note: err can be nil @@ -674,7 +712,7 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int) } // if no fatal error, returns a list of topics that need retrying due to ErrLeaderNotAvailable -func (client *client) updateMetadata(data *MetadataResponse) (retry bool, err error) { +func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bool) (retry bool, err error) { client.lock.Lock() defer client.lock.Unlock() @@ -686,6 +724,12 @@ func (client *client) updateMetadata(data *MetadataResponse) (retry bool, err er client.registerBroker(broker) } + client.controllerID = data.ControllerID + + if allKnownMetaData { + client.metadata = make(map[string]map[int32]*PartitionMetadata) + client.cachedPartitionsResults = make(map[string][maxPartitionIndex][]int32) + } for _, topic := range data.Topics { delete(client.metadata, topic.Name) delete(client.cachedPartitionsResults, topic.Name) @@ -735,8 +779,15 @@ func (client *client) cachedCoordinator(consumerGroup string) *Broker { return nil } -func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemaining int) (*ConsumerMetadataResponse, error) { - retry := func(err error) (*ConsumerMetadataResponse, error) { +func (client *client) cachedController() *Broker { + client.lock.RLock() + defer client.lock.RUnlock() + + return client.brokers[client.controllerID] +} + +func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemaining int) (*FindCoordinatorResponse, error) { + retry := func(err error) (*FindCoordinatorResponse, error) { if attemptsRemaining > 0 { Logger.Printf("client/coordinator retrying after %dms... (%d attempts remaining)\n", client.conf.Metadata.Retry.Backoff/time.Millisecond, attemptsRemaining) time.Sleep(client.conf.Metadata.Retry.Backoff) @@ -748,10 +799,11 @@ func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemainin for broker := client.any(); broker != nil; broker = client.any() { Logger.Printf("client/coordinator requesting coordinator for consumergroup %s from %s\n", consumerGroup, broker.Addr()) - request := new(ConsumerMetadataRequest) - request.ConsumerGroup = consumerGroup + request := new(FindCoordinatorRequest) + request.CoordinatorKey = consumerGroup + request.CoordinatorType = CoordinatorGroup - response, err := broker.GetConsumerMetadata(request) + response, err := broker.FindCoordinator(request) if err != nil { Logger.Printf("client/coordinator request to broker %s failed: %s\n", broker.Addr(), err) diff --git a/vendor/github.com/Shopify/sarama/config.go b/vendor/github.com/Shopify/sarama/config.go index 29ea5c2b36e0..a564b5c23e4b 100644 --- a/vendor/github.com/Shopify/sarama/config.go +++ b/vendor/github.com/Shopify/sarama/config.go @@ -1,7 +1,10 @@ package sarama import ( + "compress/gzip" "crypto/tls" + "fmt" + "io/ioutil" "regexp" "time" @@ -99,6 +102,10 @@ type Config struct { // The type of compression to use on messages (defaults to no compression). // Similar to `compression.codec` setting of the JVM producer. Compression CompressionCodec + // The level of compression to use on messages. The meaning depends + // on the actual compression type used and defaults to default compression + // level for the codec. + CompressionLevel int // Generates partitioners for choosing the partition to send messages to // (defaults to hashing the message key). 
Similar to the `partitioner.class` // setting for the JVM producer. @@ -290,6 +297,7 @@ func NewConfig() *Config { c.Producer.Retry.Max = 3 c.Producer.Retry.Backoff = 100 * time.Millisecond c.Producer.Return.Errors = true + c.Producer.CompressionLevel = CompressionLevelDefault c.Consumer.Fetch.Min = 1 c.Consumer.Fetch.Default = 1024 * 1024 @@ -302,7 +310,7 @@ func NewConfig() *Config { c.ClientID = defaultClientID c.ChannelBufferSize = 256 - c.Version = minVersion + c.Version = MinVersion c.MetricRegistry = metrics.NewRegistry() return c @@ -409,6 +417,14 @@ func (c *Config) Validate() error { return ConfigurationError("lz4 compression requires Version >= V0_10_0_0") } + if c.Producer.Compression == CompressionGZIP { + if c.Producer.CompressionLevel != CompressionLevelDefault { + if _, err := gzip.NewWriterLevel(ioutil.Discard, c.Producer.CompressionLevel); err != nil { + return ConfigurationError(fmt.Sprintf("gzip compression does not work with level %d: %v", c.Producer.CompressionLevel, err)) + } + } + } + // validate the Consumer values switch { case c.Consumer.Fetch.Min <= 0: diff --git a/vendor/github.com/Shopify/sarama/consumer.go b/vendor/github.com/Shopify/sarama/consumer.go index 48d231cf9844..96226ac5bf52 100644 --- a/vendor/github.com/Shopify/sarama/consumer.go +++ b/vendor/github.com/Shopify/sarama/consumer.go @@ -310,6 +310,7 @@ type partitionConsumer struct { trigger, dying chan none responseResult error + closeOnce sync.Once fetchSize int32 offset int64 @@ -412,7 +413,9 @@ func (child *partitionConsumer) AsyncClose() { // the dispatcher to exit its loop, which removes it from the consumer then closes its 'messages' and // 'errors' channel (alternatively, if the child is already at the dispatcher for some reason, that will // also just close itself) - close(child.dying) + child.closeOnce.Do(func() { + close(child.dying) + }) } func (child *partitionConsumer) Close() error { @@ -461,7 +464,6 @@ feederLoop: child.messages <- msg } child.broker.input <- child - expiryTicker.Stop() continue feederLoop } else { // current message has not been sent, return to select @@ -482,9 +484,6 @@ feederLoop: func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMessage, error) { var messages []*ConsumerMessage - var incomplete bool - prelude := true - for _, msgBlock := range msgSet.Messages { for _, msg := range msgBlock.Messages() { offset := msg.Offset @@ -492,29 +491,22 @@ func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMe baseOffset := msgBlock.Offset - msgBlock.Messages()[len(msgBlock.Messages())-1].Offset offset += baseOffset } - if prelude && offset < child.offset { + if offset < child.offset { continue } - prelude = false - - if offset >= child.offset { - messages = append(messages, &ConsumerMessage{ - Topic: child.topic, - Partition: child.partition, - Key: msg.Msg.Key, - Value: msg.Msg.Value, - Offset: offset, - Timestamp: msg.Msg.Timestamp, - BlockTimestamp: msgBlock.Msg.Timestamp, - }) - child.offset = offset + 1 - } else { - incomplete = true - } + messages = append(messages, &ConsumerMessage{ + Topic: child.topic, + Partition: child.partition, + Key: msg.Msg.Key, + Value: msg.Msg.Value, + Offset: offset, + Timestamp: msg.Msg.Timestamp, + BlockTimestamp: msgBlock.Msg.Timestamp, + }) + child.offset = offset + 1 } } - - if incomplete || len(messages) == 0 { + if len(messages) == 0 { return nil, ErrIncompleteResponse } return messages, nil @@ -522,42 +514,25 @@ func (child *partitionConsumer) parseMessages(msgSet 
*MessageSet) ([]*ConsumerMe func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMessage, error) { var messages []*ConsumerMessage - var incomplete bool - prelude := true - originalOffset := child.offset - for _, rec := range batch.Records { offset := batch.FirstOffset + rec.OffsetDelta - if prelude && offset < child.offset { + if offset < child.offset { continue } - prelude = false - - if offset >= child.offset { - messages = append(messages, &ConsumerMessage{ - Topic: child.topic, - Partition: child.partition, - Key: rec.Key, - Value: rec.Value, - Offset: offset, - Timestamp: batch.FirstTimestamp.Add(rec.TimestampDelta), - Headers: rec.Headers, - }) - child.offset = offset + 1 - } else { - incomplete = true - } - } - - if incomplete { + messages = append(messages, &ConsumerMessage{ + Topic: child.topic, + Partition: child.partition, + Key: rec.Key, + Value: rec.Value, + Offset: offset, + Timestamp: batch.FirstTimestamp.Add(rec.TimestampDelta), + Headers: rec.Headers, + }) + child.offset = offset + 1 + } + if len(messages) == 0 { return nil, ErrIncompleteResponse } - - child.offset = batch.FirstOffset + int64(batch.LastOffsetDelta) + 1 - if child.offset <= originalOffset { - return nil, ErrConsumerOffsetNotAdvanced - } - return messages, nil } @@ -610,14 +585,14 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu switch records.recordsType { case legacyRecords: - messageSetMessages, err := child.parseMessages(records.msgSet) + messageSetMessages, err := child.parseMessages(records.MsgSet) if err != nil { return nil, err } messages = append(messages, messageSetMessages...) case defaultRecords: - recordBatchMessages, err := child.parseRecords(records.recordBatch) + recordBatchMessages, err := child.parseRecords(records.RecordBatch) if err != nil { return nil, err } diff --git a/vendor/github.com/Shopify/sarama/consumer_metadata_request.go b/vendor/github.com/Shopify/sarama/consumer_metadata_request.go index 483be3354df5..4de45e7bf50b 100644 --- a/vendor/github.com/Shopify/sarama/consumer_metadata_request.go +++ b/vendor/github.com/Shopify/sarama/consumer_metadata_request.go @@ -5,12 +5,19 @@ type ConsumerMetadataRequest struct { } func (r *ConsumerMetadataRequest) encode(pe packetEncoder) error { - return pe.putString(r.ConsumerGroup) + tmp := new(FindCoordinatorRequest) + tmp.CoordinatorKey = r.ConsumerGroup + tmp.CoordinatorType = CoordinatorGroup + return tmp.encode(pe) } func (r *ConsumerMetadataRequest) decode(pd packetDecoder, version int16) (err error) { - r.ConsumerGroup, err = pd.getString() - return err + tmp := new(FindCoordinatorRequest) + if err := tmp.decode(pd, version); err != nil { + return err + } + r.ConsumerGroup = tmp.CoordinatorKey + return nil } func (r *ConsumerMetadataRequest) key() int16 { diff --git a/vendor/github.com/Shopify/sarama/consumer_metadata_response.go b/vendor/github.com/Shopify/sarama/consumer_metadata_response.go index 6b9632bbafe6..442cbde7ac00 100644 --- a/vendor/github.com/Shopify/sarama/consumer_metadata_response.go +++ b/vendor/github.com/Shopify/sarama/consumer_metadata_response.go @@ -14,20 +14,18 @@ type ConsumerMetadataResponse struct { } func (r *ConsumerMetadataResponse) decode(pd packetDecoder, version int16) (err error) { - tmp, err := pd.getInt16() - if err != nil { - return err - } - r.Err = KError(tmp) + tmp := new(FindCoordinatorResponse) - coordinator := new(Broker) - if err := coordinator.decode(pd); err != nil { + if err := tmp.decode(pd, version); err 
!= nil { return err } - if coordinator.addr == ":0" { + + r.Err = tmp.Err + + r.Coordinator = tmp.Coordinator + if tmp.Coordinator == nil { return nil } - r.Coordinator = coordinator // this can all go away in 2.0, but we have to fill in deprecated fields to maintain // backwards compatibility @@ -47,28 +45,22 @@ func (r *ConsumerMetadataResponse) decode(pd packetDecoder, version int16) (err } func (r *ConsumerMetadataResponse) encode(pe packetEncoder) error { - pe.putInt16(int16(r.Err)) - if r.Coordinator != nil { - host, portstr, err := net.SplitHostPort(r.Coordinator.Addr()) - if err != nil { - return err - } - port, err := strconv.ParseInt(portstr, 10, 32) - if err != nil { - return err - } - pe.putInt32(r.Coordinator.ID()) - if err := pe.putString(host); err != nil { - return err - } - pe.putInt32(int32(port)) - return nil + if r.Coordinator == nil { + r.Coordinator = new(Broker) + r.Coordinator.id = r.CoordinatorID + r.Coordinator.addr = net.JoinHostPort(r.CoordinatorHost, strconv.Itoa(int(r.CoordinatorPort))) + } + + tmp := &FindCoordinatorResponse{ + Version: 0, + Err: r.Err, + Coordinator: r.Coordinator, } - pe.putInt32(r.CoordinatorID) - if err := pe.putString(r.CoordinatorHost); err != nil { + + if err := tmp.encode(pe); err != nil { return err } - pe.putInt32(r.CoordinatorPort) + return nil } diff --git a/vendor/github.com/Shopify/sarama/delete_groups_request.go b/vendor/github.com/Shopify/sarama/delete_groups_request.go new file mode 100644 index 000000000000..305a324ac2d5 --- /dev/null +++ b/vendor/github.com/Shopify/sarama/delete_groups_request.go @@ -0,0 +1,30 @@ +package sarama + +type DeleteGroupsRequest struct { + Groups []string +} + +func (r *DeleteGroupsRequest) encode(pe packetEncoder) error { + return pe.putStringArray(r.Groups) +} + +func (r *DeleteGroupsRequest) decode(pd packetDecoder, version int16) (err error) { + r.Groups, err = pd.getStringArray() + return +} + +func (r *DeleteGroupsRequest) key() int16 { + return 42 +} + +func (r *DeleteGroupsRequest) version() int16 { + return 0 +} + +func (r *DeleteGroupsRequest) requiredVersion() KafkaVersion { + return V1_1_0_0 +} + +func (r *DeleteGroupsRequest) AddGroup(group string) { + r.Groups = append(r.Groups, group) +} diff --git a/vendor/github.com/Shopify/sarama/delete_groups_response.go b/vendor/github.com/Shopify/sarama/delete_groups_response.go new file mode 100644 index 000000000000..c067ebb42b03 --- /dev/null +++ b/vendor/github.com/Shopify/sarama/delete_groups_response.go @@ -0,0 +1,70 @@ +package sarama + +import ( + "time" +) + +type DeleteGroupsResponse struct { + ThrottleTime time.Duration + GroupErrorCodes map[string]KError +} + +func (r *DeleteGroupsResponse) encode(pe packetEncoder) error { + pe.putInt32(int32(r.ThrottleTime / time.Millisecond)) + + if err := pe.putArrayLength(len(r.GroupErrorCodes)); err != nil { + return err + } + for groupID, errorCode := range r.GroupErrorCodes { + if err := pe.putString(groupID); err != nil { + return err + } + pe.putInt16(int16(errorCode)) + } + + return nil +} + +func (r *DeleteGroupsResponse) decode(pd packetDecoder, version int16) error { + throttleTime, err := pd.getInt32() + if err != nil { + return err + } + r.ThrottleTime = time.Duration(throttleTime) * time.Millisecond + + n, err := pd.getArrayLength() + if err != nil { + return err + } + if n == 0 { + return nil + } + + r.GroupErrorCodes = make(map[string]KError, n) + for i := 0; i < n; i++ { + groupID, err := pd.getString() + if err != nil { + return err + } + errorCode, err := 
pd.getInt16() + if err != nil { + return err + } + + r.GroupErrorCodes[groupID] = KError(errorCode) + } + + return nil +} + +func (r *DeleteGroupsResponse) key() int16 { + return 42 +} + +func (r *DeleteGroupsResponse) version() int16 { + return 0 +} + +func (r *DeleteGroupsResponse) requiredVersion() KafkaVersion { + return V1_1_0_0 +} diff --git a/vendor/github.com/Shopify/sarama/delete_records_request.go b/vendor/github.com/Shopify/sarama/delete_records_request.go new file mode 100644 index 000000000000..93efafd4d0b8 --- /dev/null +++ b/vendor/github.com/Shopify/sarama/delete_records_request.go @@ -0,0 +1,126 @@ +package sarama + +import ( + "sort" + "time" +) + +// request message format is: +// [topic] timeout(int32) +// where topic is: +// name(string) [partition] +// where partition is: +// id(int32) offset(int64) + +type DeleteRecordsRequest struct { + Topics map[string]*DeleteRecordsRequestTopic + Timeout time.Duration +} + +func (d *DeleteRecordsRequest) encode(pe packetEncoder) error { + if err := pe.putArrayLength(len(d.Topics)); err != nil { + return err + } + keys := make([]string, 0, len(d.Topics)) + for topic := range d.Topics { + keys = append(keys, topic) + } + sort.Strings(keys) + for _, topic := range keys { + if err := pe.putString(topic); err != nil { + return err + } + if err := d.Topics[topic].encode(pe); err != nil { + return err + } + } + pe.putInt32(int32(d.Timeout / time.Millisecond)) + + return nil +} + +func (d *DeleteRecordsRequest) decode(pd packetDecoder, version int16) error { + n, err := pd.getArrayLength() + if err != nil { + return err + } + + if n > 0 { + d.Topics = make(map[string]*DeleteRecordsRequestTopic, n) + for i := 0; i < n; i++ { + topic, err := pd.getString() + if err != nil { + return err + } + details := new(DeleteRecordsRequestTopic) + if err = details.decode(pd, version); err != nil { + return err + } + d.Topics[topic] = details + } + } + + timeout, err := pd.getInt32() + if err != nil { + return err + } + d.Timeout = time.Duration(timeout) * time.Millisecond + + return nil +} + +func (d *DeleteRecordsRequest) key() int16 { + return 21 +} + +func (d *DeleteRecordsRequest) version() int16 { + return 0 +} + +func (d *DeleteRecordsRequest) requiredVersion() KafkaVersion { + return V0_11_0_0 +} + +type DeleteRecordsRequestTopic struct { + PartitionOffsets map[int32]int64 // partition => offset +} + +func (t *DeleteRecordsRequestTopic) encode(pe packetEncoder) error { + if err := pe.putArrayLength(len(t.PartitionOffsets)); err != nil { + return err + } + keys := make([]int32, 0, len(t.PartitionOffsets)) + for partition := range t.PartitionOffsets { + keys = append(keys, partition) + } + sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] }) + for _, partition := range keys { + pe.putInt32(partition) + pe.putInt64(t.PartitionOffsets[partition]) + } + return nil +} + +func (t *DeleteRecordsRequestTopic) decode(pd packetDecoder, version int16) error { + n, err := pd.getArrayLength() + if err != nil { + return err + } + + if n > 0 { + t.PartitionOffsets = make(map[int32]int64, n) + for i := 0; i < n; i++ { + partition, err := pd.getInt32() + if err != nil { + return err + } + offset, err := pd.getInt64() + if err != nil { + return err + } + t.PartitionOffsets[partition] = offset + } + } + + return nil +} diff --git a/vendor/github.com/Shopify/sarama/delete_records_response.go b/vendor/github.com/Shopify/sarama/delete_records_response.go new file mode 100644 index 000000000000..733a58b6bc35 --- /dev/null +++ 
b/vendor/github.com/Shopify/sarama/delete_records_response.go @@ -0,0 +1,158 @@ +package sarama + +import ( + "sort" + "time" +) + +// response message format is: +// throttleMs(int32) [topic] +// where topic is: +// name(string) [partition] +// where partition is: +// id(int32) low_watermark(int64) error_code(int16) + +type DeleteRecordsResponse struct { + Version int16 + ThrottleTime time.Duration + Topics map[string]*DeleteRecordsResponseTopic +} + +func (d *DeleteRecordsResponse) encode(pe packetEncoder) error { + pe.putInt32(int32(d.ThrottleTime / time.Millisecond)) + + if err := pe.putArrayLength(len(d.Topics)); err != nil { + return err + } + keys := make([]string, 0, len(d.Topics)) + for topic := range d.Topics { + keys = append(keys, topic) + } + sort.Strings(keys) + for _, topic := range keys { + if err := pe.putString(topic); err != nil { + return err + } + if err := d.Topics[topic].encode(pe); err != nil { + return err + } + } + return nil +} + +func (d *DeleteRecordsResponse) decode(pd packetDecoder, version int16) error { + d.Version = version + + throttleTime, err := pd.getInt32() + if err != nil { + return err + } + d.ThrottleTime = time.Duration(throttleTime) * time.Millisecond + + n, err := pd.getArrayLength() + if err != nil { + return err + } + + if n > 0 { + d.Topics = make(map[string]*DeleteRecordsResponseTopic, n) + for i := 0; i < n; i++ { + topic, err := pd.getString() + if err != nil { + return err + } + details := new(DeleteRecordsResponseTopic) + if err = details.decode(pd, version); err != nil { + return err + } + d.Topics[topic] = details + } + } + + return nil +} + +func (d *DeleteRecordsResponse) key() int16 { + return 21 +} + +func (d *DeleteRecordsResponse) version() int16 { + return 0 +} + +func (d *DeleteRecordsResponse) requiredVersion() KafkaVersion { + return V0_11_0_0 +} + +type DeleteRecordsResponseTopic struct { + Partitions map[int32]*DeleteRecordsResponsePartition +} + +func (t *DeleteRecordsResponseTopic) encode(pe packetEncoder) error { + if err := pe.putArrayLength(len(t.Partitions)); err != nil { + return err + } + keys := make([]int32, 0, len(t.Partitions)) + for partition := range t.Partitions { + keys = append(keys, partition) + } + sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] }) + for _, partition := range keys { + pe.putInt32(partition) + if err := t.Partitions[partition].encode(pe); err != nil { + return err + } + } + return nil +} + +func (t *DeleteRecordsResponseTopic) decode(pd packetDecoder, version int16) error { + n, err := pd.getArrayLength() + if err != nil { + return err + } + + if n > 0 { + t.Partitions = make(map[int32]*DeleteRecordsResponsePartition, n) + for i := 0; i < n; i++ { + partition, err := pd.getInt32() + if err != nil { + return err + } + details := new(DeleteRecordsResponsePartition) + if err = details.decode(pd, version); err != nil { + return err + } + t.Partitions[partition] = details + } + } + + return nil +} + +type DeleteRecordsResponsePartition struct { + LowWatermark int64 + Err KError +} + +func (t *DeleteRecordsResponsePartition) encode(pe packetEncoder) error { + pe.putInt64(t.LowWatermark) + pe.putInt16(int16(t.Err)) + return nil +} + +func (t *DeleteRecordsResponsePartition) decode(pd packetDecoder, version int16) error { + lowWatermark, err := pd.getInt64() + if err != nil { + return err + } + t.LowWatermark = lowWatermark + + kErr, err := pd.getInt16() + if err != nil { + return err + } + t.Err = KError(kErr) + + return nil +} diff --git 
a/vendor/github.com/Shopify/sarama/delete_topics_request.go b/vendor/github.com/Shopify/sarama/delete_topics_request.go index ed9089ea4789..911f67d31bab 100644 --- a/vendor/github.com/Shopify/sarama/delete_topics_request.go +++ b/vendor/github.com/Shopify/sarama/delete_topics_request.go @@ -3,6 +3,7 @@ package sarama import "time" type DeleteTopicsRequest struct { + Version int16 Topics []string Timeout time.Duration } @@ -25,6 +26,7 @@ func (d *DeleteTopicsRequest) decode(pd packetDecoder, version int16) (err error return err } d.Timeout = time.Duration(timeout) * time.Millisecond + d.Version = version return nil } @@ -33,9 +35,14 @@ func (d *DeleteTopicsRequest) key() int16 { } func (d *DeleteTopicsRequest) version() int16 { - return 0 + return d.Version } func (d *DeleteTopicsRequest) requiredVersion() KafkaVersion { + switch d.Version { + case 1: + return V0_11_0_0 + default: + return V0_10_1_0 + } - return V0_10_1_0 } diff --git a/vendor/github.com/Shopify/sarama/errors.go b/vendor/github.com/Shopify/sarama/errors.go index 54f431a4a91e..c578ef5fb437 100644 --- a/vendor/github.com/Shopify/sarama/errors.go +++ b/vendor/github.com/Shopify/sarama/errors.go @@ -41,6 +41,14 @@ var ErrMessageTooLarge = errors.New("kafka: message is larger than Consumer.Fetc // a RecordBatch. var ErrConsumerOffsetNotAdvanced = errors.New("kafka: consumer offset was not advanced after a RecordBatch") +// ErrControllerNotAvailable is returned when the server did not return a valid controller ID; the Kafka server's +// version may be lower than 0.10.0.0. +var ErrControllerNotAvailable = errors.New("kafka: controller is not available") + +// ErrNoTopicsToUpdateMetadata is returned when Metadata.Full is set to false but no specific topics were found to +// update the metadata. +var ErrNoTopicsToUpdateMetadata = errors.New("kafka: no specific topics to update metadata") + // PacketEncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example, // if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.
type PacketEncodingError struct { diff --git a/vendor/github.com/Shopify/sarama/fetch_request.go b/vendor/github.com/Shopify/sarama/fetch_request.go index 8c8e3a5afc83..462ab8afbb8e 100644 --- a/vendor/github.com/Shopify/sarama/fetch_request.go +++ b/vendor/github.com/Shopify/sarama/fetch_request.go @@ -149,7 +149,7 @@ func (r *FetchRequest) requiredVersion() KafkaVersion { case 4: return V0_11_0_0 default: - return minVersion + return MinVersion } } diff --git a/vendor/github.com/Shopify/sarama/fetch_response.go b/vendor/github.com/Shopify/sarama/fetch_response.go index 0e81ad89f434..ae91bb9eb097 100644 --- a/vendor/github.com/Shopify/sarama/fetch_response.go +++ b/vendor/github.com/Shopify/sarama/fetch_response.go @@ -280,7 +280,7 @@ func (r *FetchResponse) requiredVersion() KafkaVersion { case 4: return V0_11_0_0 default: - return minVersion + return MinVersion } } @@ -353,7 +353,7 @@ func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Enc records := newLegacyRecords(&MessageSet{}) frb.RecordsSet = []*Records{&records} } - set := frb.RecordsSet[0].msgSet + set := frb.RecordsSet[0].MsgSet set.Messages = append(set.Messages, msgBlock) } @@ -365,7 +365,7 @@ func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Enco records := newDefaultRecords(&RecordBatch{Version: 2}) frb.RecordsSet = []*Records{&records} } - batch := frb.RecordsSet[0].recordBatch + batch := frb.RecordsSet[0].RecordBatch batch.addRecord(rec) } @@ -375,7 +375,7 @@ func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset records := newDefaultRecords(&RecordBatch{Version: 2}) frb.RecordsSet = []*Records{&records} } - batch := frb.RecordsSet[0].recordBatch + batch := frb.RecordsSet[0].RecordBatch batch.LastOffsetDelta = offset } diff --git a/vendor/github.com/Shopify/sarama/find_coordinator_request.go b/vendor/github.com/Shopify/sarama/find_coordinator_request.go new file mode 100644 index 000000000000..0ab5cb5ff575 --- /dev/null +++ b/vendor/github.com/Shopify/sarama/find_coordinator_request.go @@ -0,0 +1,61 @@ +package sarama + +type CoordinatorType int8 + +const ( + CoordinatorGroup CoordinatorType = 0 + CoordinatorTransaction CoordinatorType = 1 +) + +type FindCoordinatorRequest struct { + Version int16 + CoordinatorKey string + CoordinatorType CoordinatorType +} + +func (f *FindCoordinatorRequest) encode(pe packetEncoder) error { + if err := pe.putString(f.CoordinatorKey); err != nil { + return err + } + + if f.Version >= 1 { + pe.putInt8(int8(f.CoordinatorType)) + } + + return nil +} + +func (f *FindCoordinatorRequest) decode(pd packetDecoder, version int16) (err error) { + if f.CoordinatorKey, err = pd.getString(); err != nil { + return err + } + + if version >= 1 { + f.Version = version + coordinatorType, err := pd.getInt8() + if err != nil { + return err + } + + f.CoordinatorType = CoordinatorType(coordinatorType) + } + + return nil +} + +func (f *FindCoordinatorRequest) key() int16 { + return 10 +} + +func (f *FindCoordinatorRequest) version() int16 { + return f.Version +} + +func (f *FindCoordinatorRequest) requiredVersion() KafkaVersion { + switch f.Version { + case 1: + return V0_11_0_0 + default: + return V0_8_2_0 + } +} diff --git a/vendor/github.com/Shopify/sarama/find_coordinator_response.go b/vendor/github.com/Shopify/sarama/find_coordinator_response.go new file mode 100644 index 000000000000..9c900e8b7742 --- /dev/null +++ b/vendor/github.com/Shopify/sarama/find_coordinator_response.go @@ -0,0 
+1,92 @@ +package sarama + +import ( + "time" +) + +var NoNode = &Broker{id: -1, addr: ":-1"} + +type FindCoordinatorResponse struct { + Version int16 + ThrottleTime time.Duration + Err KError + ErrMsg *string + Coordinator *Broker +} + +func (f *FindCoordinatorResponse) decode(pd packetDecoder, version int16) (err error) { + if version >= 1 { + f.Version = version + + throttleTime, err := pd.getInt32() + if err != nil { + return err + } + f.ThrottleTime = time.Duration(throttleTime) * time.Millisecond + } + + tmp, err := pd.getInt16() + if err != nil { + return err + } + f.Err = KError(tmp) + + if version >= 1 { + if f.ErrMsg, err = pd.getNullableString(); err != nil { + return err + } + } + + coordinator := new(Broker) + // The version is hardcoded to 0, as version 1 of the Broker-decode + // contains the rack-field which is not present in the FindCoordinatorResponse. + if err := coordinator.decode(pd, 0); err != nil { + return err + } + if coordinator.addr == ":0" { + return nil + } + f.Coordinator = coordinator + + return nil +} + +func (f *FindCoordinatorResponse) encode(pe packetEncoder) error { + if f.Version >= 1 { + pe.putInt32(int32(f.ThrottleTime / time.Millisecond)) + } + + pe.putInt16(int16(f.Err)) + + if f.Version >= 1 { + if err := pe.putNullableString(f.ErrMsg); err != nil { + return err + } + } + + coordinator := f.Coordinator + if coordinator == nil { + coordinator = NoNode + } + if err := coordinator.encode(pe, 0); err != nil { + return err + } + return nil +} + +func (f *FindCoordinatorResponse) key() int16 { + return 10 +} + +func (f *FindCoordinatorResponse) version() int16 { + return f.Version +} + +func (f *FindCoordinatorResponse) requiredVersion() KafkaVersion { + switch f.Version { + case 1: + return V0_11_0_0 + default: + return V0_8_2_0 + } +} diff --git a/vendor/github.com/Shopify/sarama/join_group_request.go b/vendor/github.com/Shopify/sarama/join_group_request.go index 3a7ba17122d3..97e9299ea1a3 100644 --- a/vendor/github.com/Shopify/sarama/join_group_request.go +++ b/vendor/github.com/Shopify/sarama/join_group_request.go @@ -25,8 +25,10 @@ func (p *GroupProtocol) encode(pe packetEncoder) (err error) { } type JoinGroupRequest struct { + Version int16 GroupId string SessionTimeout int32 + RebalanceTimeout int32 MemberId string ProtocolType string GroupProtocols map[string][]byte // deprecated; use OrderedGroupProtocols @@ -38,6 +40,9 @@ func (r *JoinGroupRequest) encode(pe packetEncoder) error { return err } pe.putInt32(r.SessionTimeout) + if r.Version >= 1 { + pe.putInt32(r.RebalanceTimeout) + } if err := pe.putString(r.MemberId); err != nil { return err } @@ -76,6 +81,8 @@ func (r *JoinGroupRequest) encode(pe packetEncoder) error { } func (r *JoinGroupRequest) decode(pd packetDecoder, version int16) (err error) { + r.Version = version + if r.GroupId, err = pd.getString(); err != nil { return } @@ -84,6 +91,12 @@ func (r *JoinGroupRequest) decode(pd packetDecoder, version int16) (err error) { return } + if version >= 1 { + if r.RebalanceTimeout, err = pd.getInt32(); err != nil { + return err + } + } + if r.MemberId, err = pd.getString(); err != nil { return } @@ -118,11 +131,18 @@ func (r *JoinGroupRequest) key() int16 { } func (r *JoinGroupRequest) version() int16 { - return 0 + return r.Version } func (r *JoinGroupRequest) requiredVersion() KafkaVersion { - return V0_9_0_0 + switch r.Version { + case 2: + return V0_11_0_0 + case 1: + return V0_10_1_0 + default: + return V0_9_0_0 + } } func (r *JoinGroupRequest) AddGroupProtocol(name string, 
metadata []byte) { diff --git a/vendor/github.com/Shopify/sarama/join_group_response.go b/vendor/github.com/Shopify/sarama/join_group_response.go index 6d35fe36494e..5752acc8aeb7 100644 --- a/vendor/github.com/Shopify/sarama/join_group_response.go +++ b/vendor/github.com/Shopify/sarama/join_group_response.go @@ -1,6 +1,8 @@ package sarama type JoinGroupResponse struct { + Version int16 + ThrottleTime int32 Err KError GenerationId int32 GroupProtocol string @@ -22,6 +24,9 @@ func (r *JoinGroupResponse) GetMembers() (map[string]ConsumerGroupMemberMetadata } func (r *JoinGroupResponse) encode(pe packetEncoder) error { + if r.Version >= 2 { + pe.putInt32(r.ThrottleTime) + } pe.putInt16(int16(r.Err)) pe.putInt32(r.GenerationId) @@ -53,6 +58,14 @@ func (r *JoinGroupResponse) encode(pe packetEncoder) error { } func (r *JoinGroupResponse) decode(pd packetDecoder, version int16) (err error) { + r.Version = version + + if version >= 2 { + if r.ThrottleTime, err = pd.getInt32(); err != nil { + return + } + } + kerr, err := pd.getInt16() if err != nil { return err @@ -107,9 +120,16 @@ func (r *JoinGroupResponse) key() int16 { } func (r *JoinGroupResponse) version() int16 { - return 0 + return r.Version } func (r *JoinGroupResponse) requiredVersion() KafkaVersion { - return V0_9_0_0 + switch r.Version { + case 2: + return V0_11_0_0 + case 1: + return V0_10_1_0 + default: + return V0_9_0_0 + } } diff --git a/vendor/github.com/Shopify/sarama/message.go b/vendor/github.com/Shopify/sarama/message.go index bd5650bbc07f..fecdbfdef758 100644 --- a/vendor/github.com/Shopify/sarama/message.go +++ b/vendor/github.com/Shopify/sarama/message.go @@ -24,13 +24,28 @@ const ( CompressionLZ4 CompressionCodec = 3 ) +func (cc CompressionCodec) String() string { + return []string{ + "none", + "gzip", + "snappy", + "lz4", + }[int(cc)] +} + +// CompressionLevelDefault is the constant to use in CompressionLevel +// to have the default compression level for any codec. The value is picked +// so that it does not collide with any existing compression levels.
+const CompressionLevelDefault = -1000 + type Message struct { - Codec CompressionCodec // codec used to compress the message contents - Key []byte // the message key, may be nil - Value []byte // the message contents - Set *MessageSet // the message set a message might wrap - Version int8 // v1 requires Kafka 0.10 - Timestamp time.Time // the timestamp of the message (version 1+ only) + Codec CompressionCodec // codec used to compress the message contents + CompressionLevel int // compression level + Key []byte // the message key, may be nil + Value []byte // the message contents + Set *MessageSet // the message set a message might wrap + Version int8 // v1 requires Kafka 0.10 + Timestamp time.Time // the timestamp of the message (version 1+ only) compressedCache []byte compressedSize int // used for computing the compression ratio metrics @@ -66,7 +81,15 @@ func (m *Message) encode(pe packetEncoder) error { payload = m.Value case CompressionGZIP: var buf bytes.Buffer - writer := gzip.NewWriter(&buf) + var writer *gzip.Writer + if m.CompressionLevel != CompressionLevelDefault { + writer, err = gzip.NewWriterLevel(&buf, m.CompressionLevel) + if err != nil { + return err + } + } else { + writer = gzip.NewWriter(&buf) + } if _, err = writer.Write(m.Value); err != nil { return err } diff --git a/vendor/github.com/Shopify/sarama/metadata_request.go b/vendor/github.com/Shopify/sarama/metadata_request.go index 9a26b55fd032..48adfa28cb93 100644 --- a/vendor/github.com/Shopify/sarama/metadata_request.go +++ b/vendor/github.com/Shopify/sarama/metadata_request.go @@ -1,40 +1,65 @@ package sarama type MetadataRequest struct { - Topics []string + Version int16 + Topics []string + AllowAutoTopicCreation bool } func (r *MetadataRequest) encode(pe packetEncoder) error { - err := pe.putArrayLength(len(r.Topics)) - if err != nil { - return err + if r.Version < 0 || r.Version > 5 { + return PacketEncodingError{"invalid or unsupported MetadataRequest version field"} } - - for i := range r.Topics { - err = pe.putString(r.Topics[i]) + if r.Version == 0 || r.Topics != nil || len(r.Topics) > 0 { + err := pe.putArrayLength(len(r.Topics)) if err != nil { return err } + + for i := range r.Topics { + err = pe.putString(r.Topics[i]) + if err != nil { + return err + } + } + } else { + pe.putInt32(-1) + } + if r.Version > 3 { + pe.putBool(r.AllowAutoTopicCreation) } return nil } func (r *MetadataRequest) decode(pd packetDecoder, version int16) error { - topicCount, err := pd.getArrayLength() + r.Version = version + size, err := pd.getInt32() if err != nil { return err } - if topicCount == 0 { + if size < 0 { return nil - } + } else { + topicCount := size + if topicCount == 0 { + return nil + } - r.Topics = make([]string, topicCount) - for i := range r.Topics { - topic, err := pd.getString() + r.Topics = make([]string, topicCount) + for i := range r.Topics { + topic, err := pd.getString() + if err != nil { + return err + } + r.Topics[i] = topic + } + } + if r.Version > 3 { + autoCreation, err := pd.getBool() if err != nil { return err } - r.Topics[i] = topic + r.AllowAutoTopicCreation = autoCreation } return nil } @@ -44,9 +69,20 @@ func (r *MetadataRequest) key() int16 { } func (r *MetadataRequest) version() int16 { - return 0 + return r.Version } func (r *MetadataRequest) requiredVersion() KafkaVersion { - return minVersion + switch r.Version { + case 1: + return V0_10_0_0 + case 2: + return V0_10_1_0 + case 3, 4: + return V0_11_0_0 + case 5: + return V1_0_0_0 + default: + return MinVersion + } } diff 
--git a/vendor/github.com/Shopify/sarama/metadata_response.go b/vendor/github.com/Shopify/sarama/metadata_response.go index f9d6a4271edc..bf8a67bbc524 100644 --- a/vendor/github.com/Shopify/sarama/metadata_response.go +++ b/vendor/github.com/Shopify/sarama/metadata_response.go @@ -1,14 +1,15 @@ package sarama type PartitionMetadata struct { - Err KError - ID int32 - Leader int32 - Replicas []int32 - Isr []int32 + Err KError + ID int32 + Leader int32 + Replicas []int32 + Isr []int32 + OfflineReplicas []int32 } -func (pm *PartitionMetadata) decode(pd packetDecoder) (err error) { +func (pm *PartitionMetadata) decode(pd packetDecoder, version int16) (err error) { tmp, err := pd.getInt16() if err != nil { return err @@ -35,10 +36,17 @@ func (pm *PartitionMetadata) decode(pd packetDecoder) (err error) { return err } + if version >= 5 { + pm.OfflineReplicas, err = pd.getInt32Array() + if err != nil { + return err + } + } + return nil } -func (pm *PartitionMetadata) encode(pe packetEncoder) (err error) { +func (pm *PartitionMetadata) encode(pe packetEncoder, version int16) (err error) { pe.putInt16(int16(pm.Err)) pe.putInt32(pm.ID) pe.putInt32(pm.Leader) @@ -53,16 +61,24 @@ func (pm *PartitionMetadata) encode(pe packetEncoder) (err error) { return err } + if version >= 5 { + err = pe.putInt32Array(pm.OfflineReplicas) + if err != nil { + return err + } + } + return nil } type TopicMetadata struct { Err KError Name string + IsInternal bool // Only valid for Version >= 1 Partitions []*PartitionMetadata } -func (tm *TopicMetadata) decode(pd packetDecoder) (err error) { +func (tm *TopicMetadata) decode(pd packetDecoder, version int16) (err error) { tmp, err := pd.getInt16() if err != nil { return err @@ -74,6 +90,13 @@ func (tm *TopicMetadata) decode(pd packetDecoder) (err error) { return err } + if version >= 1 { + tm.IsInternal, err = pd.getBool() + if err != nil { + return err + } + } + n, err := pd.getArrayLength() if err != nil { return err @@ -81,7 +104,7 @@ func (tm *TopicMetadata) decode(pd packetDecoder) (err error) { tm.Partitions = make([]*PartitionMetadata, n) for i := 0; i < n; i++ { tm.Partitions[i] = new(PartitionMetadata) - err = tm.Partitions[i].decode(pd) + err = tm.Partitions[i].decode(pd, version) if err != nil { return err } @@ -90,7 +113,7 @@ func (tm *TopicMetadata) decode(pd packetDecoder) (err error) { return nil } -func (tm *TopicMetadata) encode(pe packetEncoder) (err error) { +func (tm *TopicMetadata) encode(pe packetEncoder, version int16) (err error) { pe.putInt16(int16(tm.Err)) err = pe.putString(tm.Name) @@ -98,13 +121,17 @@ func (tm *TopicMetadata) encode(pe packetEncoder) (err error) { return err } + if version >= 1 { + pe.putBool(tm.IsInternal) + } + err = pe.putArrayLength(len(tm.Partitions)) if err != nil { return err } for _, pm := range tm.Partitions { - err = pm.encode(pe) + err = pm.encode(pe, version) if err != nil { return err } @@ -114,11 +141,24 @@ func (tm *TopicMetadata) encode(pe packetEncoder) (err error) { } type MetadataResponse struct { - Brokers []*Broker - Topics []*TopicMetadata + Version int16 + ThrottleTimeMs int32 + Brokers []*Broker + ClusterID *string + ControllerID int32 + Topics []*TopicMetadata } func (r *MetadataResponse) decode(pd packetDecoder, version int16) (err error) { + r.Version = version + + if version >= 3 { + r.ThrottleTimeMs, err = pd.getInt32() + if err != nil { + return err + } + } + n, err := pd.getArrayLength() if err != nil { return err @@ -127,12 +167,28 @@ func (r *MetadataResponse) decode(pd packetDecoder, 
version int16) (err error) { r.Brokers = make([]*Broker, n) for i := 0; i < n; i++ { r.Brokers[i] = new(Broker) - err = r.Brokers[i].decode(pd) + err = r.Brokers[i].decode(pd, version) + if err != nil { + return err + } + } + + if version >= 2 { + r.ClusterID, err = pd.getNullableString() if err != nil { return err } } + if version >= 1 { + r.ControllerID, err = pd.getInt32() + if err != nil { + return err + } + } else { + r.ControllerID = -1 + } + n, err = pd.getArrayLength() if err != nil { return err @@ -141,7 +197,7 @@ func (r *MetadataResponse) decode(pd packetDecoder, version int16) (err error) { r.Topics = make([]*TopicMetadata, n) for i := 0; i < n; i++ { r.Topics[i] = new(TopicMetadata) - err = r.Topics[i].decode(pd) + err = r.Topics[i].decode(pd, version) if err != nil { return err } @@ -156,18 +212,22 @@ func (r *MetadataResponse) encode(pe packetEncoder) error { return err } for _, broker := range r.Brokers { - err = broker.encode(pe) + err = broker.encode(pe, r.Version) if err != nil { return err } } + if r.Version >= 1 { + pe.putInt32(r.ControllerID) + } + err = pe.putArrayLength(len(r.Topics)) if err != nil { return err } for _, tm := range r.Topics { - err = tm.encode(pe) + err = tm.encode(pe, r.Version) if err != nil { return err } @@ -181,11 +241,22 @@ func (r *MetadataResponse) key() int16 { } func (r *MetadataResponse) version() int16 { - return 0 + return r.Version } func (r *MetadataResponse) requiredVersion() KafkaVersion { - return minVersion + switch r.Version { + case 1: + return V0_10_0_0 + case 2: + return V0_10_1_0 + case 3, 4: + return V0_11_0_0 + case 5: + return V1_0_0_0 + default: + return MinVersion + } } // testing API diff --git a/vendor/github.com/Shopify/sarama/mockresponses.go b/vendor/github.com/Shopify/sarama/mockresponses.go index f79a9d5e9b42..5541d32ec69c 100644 --- a/vendor/github.com/Shopify/sarama/mockresponses.go +++ b/vendor/github.com/Shopify/sarama/mockresponses.go @@ -68,9 +68,10 @@ func (mc *MockSequence) For(reqBody versionedDecoder) (res encoder) { // MockMetadataResponse is a `MetadataResponse` builder. type MockMetadataResponse struct { - leaders map[string]map[int32]int32 - brokers map[string]int32 - t TestReporter + controllerID int32 + leaders map[string]map[int32]int32 + brokers map[string]int32 + t TestReporter } func NewMockMetadataResponse(t TestReporter) *MockMetadataResponse { @@ -96,9 +97,17 @@ func (mmr *MockMetadataResponse) SetBroker(addr string, brokerID int32) *MockMet return mmr } +func (mmr *MockMetadataResponse) SetController(brokerID int32) *MockMetadataResponse { + mmr.controllerID = brokerID + return mmr +} + func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoder { metadataRequest := reqBody.(*MetadataRequest) - metadataResponse := &MetadataResponse{} + metadataResponse := &MetadataResponse{ + Version: metadataRequest.version(), + ControllerID: mmr.controllerID, + } for addr, brokerID := range mmr.brokers { metadataResponse.AddBroker(addr, brokerID) } @@ -326,6 +335,60 @@ func (mr *MockConsumerMetadataResponse) For(reqBody versionedDecoder) encoder { return res } +// MockFindCoordinatorResponse is a `FindCoordinatorResponse` builder. 
+type MockFindCoordinatorResponse struct { + groupCoordinators map[string]interface{} + transCoordinators map[string]interface{} + t TestReporter +} + +func NewMockFindCoordinatorResponse(t TestReporter) *MockFindCoordinatorResponse { + return &MockFindCoordinatorResponse{ + groupCoordinators: make(map[string]interface{}), + transCoordinators: make(map[string]interface{}), + t: t, + } +} + +func (mr *MockFindCoordinatorResponse) SetCoordinator(coordinatorType CoordinatorType, group string, broker *MockBroker) *MockFindCoordinatorResponse { + switch coordinatorType { + case CoordinatorGroup: + mr.groupCoordinators[group] = broker + case CoordinatorTransaction: + mr.transCoordinators[group] = broker + } + return mr +} + +func (mr *MockFindCoordinatorResponse) SetError(coordinatorType CoordinatorType, group string, kerror KError) *MockFindCoordinatorResponse { + switch coordinatorType { + case CoordinatorGroup: + mr.groupCoordinators[group] = kerror + case CoordinatorTransaction: + mr.transCoordinators[group] = kerror + } + return mr +} + +func (mr *MockFindCoordinatorResponse) For(reqBody versionedDecoder) encoder { + req := reqBody.(*FindCoordinatorRequest) + res := &FindCoordinatorResponse{} + var v interface{} + switch req.CoordinatorType { + case CoordinatorGroup: + v = mr.groupCoordinators[req.CoordinatorKey] + case CoordinatorTransaction: + v = mr.transCoordinators[req.CoordinatorKey] + } + switch v := v.(type) { + case *MockBroker: + res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()} + case KError: + res.Err = v + } + return res +} + // MockOffsetCommitResponse is a `OffsetCommitResponse` builder. type MockOffsetCommitResponse struct { errors map[string]map[string]map[int32]KError diff --git a/vendor/github.com/Shopify/sarama/offset_commit_request.go b/vendor/github.com/Shopify/sarama/offset_commit_request.go index b21ea634b024..37e99fbf5b86 100644 --- a/vendor/github.com/Shopify/sarama/offset_commit_request.go +++ b/vendor/github.com/Shopify/sarama/offset_commit_request.go @@ -1,5 +1,7 @@ package sarama +import "errors" + // ReceiveTime is a special value for the timestamp field of Offset Commit Requests which // tells the broker to set the timestamp to the time at which the request was received. // The timestamp is only used if message version 1 is used, which requires kafka 0.8.2. 
@@ -173,7 +175,7 @@ func (r *OffsetCommitRequest) requiredVersion() KafkaVersion { case 2: return V0_9_0_0 default: - return minVersion + return MinVersion } } @@ -188,3 +190,15 @@ func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset i r.blocks[topic][partitionID] = &offsetCommitRequestBlock{offset, timestamp, metadata} } + +func (r *OffsetCommitRequest) Offset(topic string, partitionID int32) (int64, string, error) { + partitions := r.blocks[topic] + if partitions == nil { + return 0, "", errors.New("No such offset") + } + block := partitions[partitionID] + if block == nil { + return 0, "", errors.New("No such offset") + } + return block.offset, block.metadata, nil +} diff --git a/vendor/github.com/Shopify/sarama/offset_commit_response.go b/vendor/github.com/Shopify/sarama/offset_commit_response.go index 7f277e7753a1..a4b18acdff29 100644 --- a/vendor/github.com/Shopify/sarama/offset_commit_response.go +++ b/vendor/github.com/Shopify/sarama/offset_commit_response.go @@ -81,5 +81,5 @@ func (r *OffsetCommitResponse) version() int16 { } func (r *OffsetCommitResponse) requiredVersion() KafkaVersion { - return minVersion + return MinVersion } diff --git a/vendor/github.com/Shopify/sarama/offset_fetch_request.go b/vendor/github.com/Shopify/sarama/offset_fetch_request.go index b19fe79ba7aa..5a05014b481f 100644 --- a/vendor/github.com/Shopify/sarama/offset_fetch_request.go +++ b/vendor/github.com/Shopify/sarama/offset_fetch_request.go @@ -68,7 +68,7 @@ func (r *OffsetFetchRequest) requiredVersion() KafkaVersion { case 1: return V0_8_2_0 default: - return minVersion + return MinVersion } } diff --git a/vendor/github.com/Shopify/sarama/offset_fetch_response.go b/vendor/github.com/Shopify/sarama/offset_fetch_response.go index 323220eac976..11e4b1f3fdfe 100644 --- a/vendor/github.com/Shopify/sarama/offset_fetch_response.go +++ b/vendor/github.com/Shopify/sarama/offset_fetch_response.go @@ -115,7 +115,7 @@ func (r *OffsetFetchResponse) version() int16 { } func (r *OffsetFetchResponse) requiredVersion() KafkaVersion { - return minVersion + return MinVersion } func (r *OffsetFetchResponse) GetBlock(topic string, partition int32) *OffsetFetchResponseBlock { diff --git a/vendor/github.com/Shopify/sarama/offset_request.go b/vendor/github.com/Shopify/sarama/offset_request.go index a14f71828dfb..687fef284a2a 100644 --- a/vendor/github.com/Shopify/sarama/offset_request.go +++ b/vendor/github.com/Shopify/sarama/offset_request.go @@ -117,7 +117,7 @@ func (r *OffsetRequest) requiredVersion() KafkaVersion { case 1: return V0_10_1_0 default: - return minVersion + return MinVersion } } diff --git a/vendor/github.com/Shopify/sarama/offset_response.go b/vendor/github.com/Shopify/sarama/offset_response.go index 9a9cfe96f3ba..8b2193f9a0bf 100644 --- a/vendor/github.com/Shopify/sarama/offset_response.go +++ b/vendor/github.com/Shopify/sarama/offset_response.go @@ -155,7 +155,7 @@ func (r *OffsetResponse) requiredVersion() KafkaVersion { case 1: return V0_10_1_0 default: - return minVersion + return MinVersion } } diff --git a/vendor/github.com/Shopify/sarama/produce_request.go b/vendor/github.com/Shopify/sarama/produce_request.go index 0ec4d8d53f97..0c755d02b646 100644 --- a/vendor/github.com/Shopify/sarama/produce_request.go +++ b/vendor/github.com/Shopify/sarama/produce_request.go @@ -113,9 +113,9 @@ func (r *ProduceRequest) encode(pe packetEncoder) error { } if metricRegistry != nil { if r.Version >= 3 { - 
topicRecordCount += updateBatchMetrics(records.recordBatch, compressionRatioMetric, topicCompressionRatioMetric) + topicRecordCount += updateBatchMetrics(records.RecordBatch, compressionRatioMetric, topicCompressionRatioMetric) } else { - topicRecordCount += updateMsgSetMetrics(records.msgSet, compressionRatioMetric, topicCompressionRatioMetric) + topicRecordCount += updateMsgSetMetrics(records.MsgSet, compressionRatioMetric, topicCompressionRatioMetric) } batchSize := int64(pe.offset() - startOffset) batchSizeMetric.Update(batchSize) @@ -215,7 +215,7 @@ func (r *ProduceRequest) requiredVersion() KafkaVersion { case 3: return V0_11_0_0 default: - return minVersion + return MinVersion } } @@ -231,7 +231,7 @@ func (r *ProduceRequest) ensureRecords(topic string, partition int32) { func (r *ProduceRequest) AddMessage(topic string, partition int32, msg *Message) { r.ensureRecords(topic, partition) - set := r.records[topic][partition].msgSet + set := r.records[topic][partition].MsgSet if set == nil { set = new(MessageSet) diff --git a/vendor/github.com/Shopify/sarama/produce_response.go b/vendor/github.com/Shopify/sarama/produce_response.go index 043c40f87723..667e34c661b5 100644 --- a/vendor/github.com/Shopify/sarama/produce_response.go +++ b/vendor/github.com/Shopify/sarama/produce_response.go @@ -152,7 +152,7 @@ func (r *ProduceResponse) requiredVersion() KafkaVersion { case 3: return V0_11_0_0 default: - return minVersion + return MinVersion } } diff --git a/vendor/github.com/Shopify/sarama/produce_set.go b/vendor/github.com/Shopify/sarama/produce_set.go index 3cbaeb5f90ea..13be2b3c92b1 100644 --- a/vendor/github.com/Shopify/sarama/produce_set.go +++ b/vendor/github.com/Shopify/sarama/produce_set.go @@ -59,10 +59,11 @@ func (ps *produceSet) add(msg *ProducerMessage) error { if set == nil { if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) { batch := &RecordBatch{ - FirstTimestamp: timestamp, - Version: 2, - ProducerID: -1, /* No producer id */ - Codec: ps.parent.conf.Producer.Compression, + FirstTimestamp: timestamp, + Version: 2, + ProducerID: -1, /* No producer id */ + Codec: ps.parent.conf.Producer.Compression, + CompressionLevel: ps.parent.conf.Producer.CompressionLevel, } set = &partitionSet{recordsToSend: newDefaultRecords(batch)} size = recordBatchOverhead @@ -79,7 +80,7 @@ func (ps *produceSet) add(msg *ProducerMessage) error { rec := &Record{ Key: key, Value: val, - TimestampDelta: timestamp.Sub(set.recordsToSend.recordBatch.FirstTimestamp), + TimestampDelta: timestamp.Sub(set.recordsToSend.RecordBatch.FirstTimestamp), } size += len(key) + len(val) if len(msg.Headers) > 0 { @@ -89,14 +90,14 @@ func (ps *produceSet) add(msg *ProducerMessage) error { size += len(rec.Headers[i].Key) + len(rec.Headers[i].Value) + 2*binary.MaxVarintLen32 } } - set.recordsToSend.recordBatch.addRecord(rec) + set.recordsToSend.RecordBatch.addRecord(rec) } else { msgToSend := &Message{Codec: CompressionNone, Key: key, Value: val} if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) { msgToSend.Timestamp = timestamp msgToSend.Version = 1 } - set.recordsToSend.msgSet.addMessage(msgToSend) + set.recordsToSend.MsgSet.addMessage(msgToSend) size = producerMessageOverhead + len(key) + len(val) } @@ -122,7 +123,14 @@ func (ps *produceSet) buildRequest() *ProduceRequest { for topic, partitionSet := range ps.msgs { for partition, set := range partitionSet { if req.Version >= 3 { - rb := set.recordsToSend.recordBatch + // If the API version we're hitting is 3 or greater, we need to calculate + // offsets 
for each record in the batch relative to FirstOffset. + // Additionally, we must set LastOffsetDelta to the value of the last offset + // in the batch. Since the OffsetDelta of the first record is 0, we know that the + // final record of any batch will have an offset of (# of records in batch) - 1. + // (See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets + // under the RecordBatch section for details.) + rb := set.recordsToSend.RecordBatch if len(rb.Records) > 0 { rb.LastOffsetDelta = int32(len(rb.Records) - 1) for i, record := range rb.Records { @@ -134,7 +142,7 @@ func (ps *produceSet) buildRequest() *ProduceRequest { continue } if ps.parent.conf.Producer.Compression == CompressionNone { - req.AddSet(topic, partition, set.recordsToSend.msgSet) + req.AddSet(topic, partition, set.recordsToSend.MsgSet) } else { // When compression is enabled, the entire set for each partition is compressed // and sent as the payload of a single fake "message" with the appropriate codec @@ -147,24 +155,25 @@ func (ps *produceSet) buildRequest() *ProduceRequest { // recompressing the message set. // (See https://cwiki.apache.org/confluence/display/KAFKA/KIP-31+-+Move+to+relative+offsets+in+compressed+message+sets // for details on relative offsets.) - for i, msg := range set.recordsToSend.msgSet.Messages { + for i, msg := range set.recordsToSend.MsgSet.Messages { msg.Offset = int64(i) } } - payload, err := encode(set.recordsToSend.msgSet, ps.parent.conf.MetricRegistry) + payload, err := encode(set.recordsToSend.MsgSet, ps.parent.conf.MetricRegistry) if err != nil { Logger.Println(err) // if this happens, it's basically our fault. panic(err) } compMsg := &Message{ - Codec: ps.parent.conf.Producer.Compression, - Key: nil, - Value: payload, - Set: set.recordsToSend.msgSet, // Provide the underlying message set for accurate metrics + Codec: ps.parent.conf.Producer.Compression, + CompressionLevel: ps.parent.conf.Producer.CompressionLevel, + Key: nil, + Value: payload, + Set: set.recordsToSend.MsgSet, // Provide the underlying message set for accurate metrics } if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) { compMsg.Version = 1 - compMsg.Timestamp = set.recordsToSend.msgSet.Messages[0].Msg.Timestamp + compMsg.Timestamp = set.recordsToSend.MsgSet.Messages[0].Msg.Timestamp } req.AddMessage(topic, partition, compMsg) } diff --git a/vendor/github.com/Shopify/sarama/record_batch.go b/vendor/github.com/Shopify/sarama/record_batch.go index 321de485b0db..845318aa3417 100644 --- a/vendor/github.com/Shopify/sarama/record_batch.go +++ b/vendor/github.com/Shopify/sarama/record_batch.go @@ -40,6 +40,7 @@ type RecordBatch struct { PartitionLeaderEpoch int32 Version int8 Codec CompressionCodec + CompressionLevel int Control bool LastOffsetDelta int32 FirstTimestamp time.Time @@ -219,7 +220,15 @@ func (b *RecordBatch) encodeRecords(pe packetEncoder) error { b.compressedRecords = raw case CompressionGZIP: var buf bytes.Buffer - writer := gzip.NewWriter(&buf) + var writer *gzip.Writer + if b.CompressionLevel != CompressionLevelDefault { + writer, err = gzip.NewWriterLevel(&buf, b.CompressionLevel) + if err != nil { + return err + } + } else { + writer = gzip.NewWriter(&buf) + } if _, err := writer.Write(raw); err != nil { return err } diff --git a/vendor/github.com/Shopify/sarama/records.go b/vendor/github.com/Shopify/sarama/records.go index 258dcbac880a..301055bb070c 100644 --- a/vendor/github.com/Shopify/sarama/records.go +++ 
b/vendor/github.com/Shopify/sarama/records.go @@ -14,30 +14,30 @@ const ( // Records implements a union type containing either a RecordBatch or a legacy MessageSet. type Records struct { recordsType int - msgSet *MessageSet - recordBatch *RecordBatch + MsgSet *MessageSet + RecordBatch *RecordBatch } func newLegacyRecords(msgSet *MessageSet) Records { - return Records{recordsType: legacyRecords, msgSet: msgSet} + return Records{recordsType: legacyRecords, MsgSet: msgSet} } func newDefaultRecords(batch *RecordBatch) Records { - return Records{recordsType: defaultRecords, recordBatch: batch} + return Records{recordsType: defaultRecords, RecordBatch: batch} } -// setTypeFromFields sets type of Records depending on which of msgSet or recordBatch is not nil. +// setTypeFromFields sets type of Records depending on which of MsgSet or RecordBatch is not nil. // The first return value indicates whether both fields are nil (and the type is not set). // If both fields are not nil, it returns an error. func (r *Records) setTypeFromFields() (bool, error) { - if r.msgSet == nil && r.recordBatch == nil { + if r.MsgSet == nil && r.RecordBatch == nil { return true, nil } - if r.msgSet != nil && r.recordBatch != nil { - return false, fmt.Errorf("both msgSet and recordBatch are set, but record type is unknown") + if r.MsgSet != nil && r.RecordBatch != nil { + return false, fmt.Errorf("both MsgSet and RecordBatch are set, but record type is unknown") } r.recordsType = defaultRecords - if r.msgSet != nil { + if r.MsgSet != nil { r.recordsType = legacyRecords } return false, nil @@ -52,15 +52,15 @@ func (r *Records) encode(pe packetEncoder) error { switch r.recordsType { case legacyRecords: - if r.msgSet == nil { + if r.MsgSet == nil { return nil } - return r.msgSet.encode(pe) + return r.MsgSet.encode(pe) case defaultRecords: - if r.recordBatch == nil { + if r.RecordBatch == nil { return nil } - return r.recordBatch.encode(pe) + return r.RecordBatch.encode(pe) } return fmt.Errorf("unknown records type: %v", r.recordsType) @@ -89,11 +89,11 @@ func (r *Records) decode(pd packetDecoder) error { switch r.recordsType { case legacyRecords: - r.msgSet = &MessageSet{} - return r.msgSet.decode(pd) + r.MsgSet = &MessageSet{} + return r.MsgSet.decode(pd) case defaultRecords: - r.recordBatch = &RecordBatch{} - return r.recordBatch.decode(pd) + r.RecordBatch = &RecordBatch{} + return r.RecordBatch.decode(pd) } return fmt.Errorf("unknown records type: %v", r.recordsType) } @@ -107,15 +107,15 @@ func (r *Records) numRecords() (int, error) { switch r.recordsType { case legacyRecords: - if r.msgSet == nil { + if r.MsgSet == nil { return 0, nil } - return len(r.msgSet.Messages), nil + return len(r.MsgSet.Messages), nil case defaultRecords: - if r.recordBatch == nil { + if r.RecordBatch == nil { return 0, nil } - return len(r.recordBatch.Records), nil + return len(r.RecordBatch.Records), nil } return 0, fmt.Errorf("unknown records type: %v", r.recordsType) } @@ -131,15 +131,15 @@ func (r *Records) isPartial() (bool, error) { case unknownRecords: return false, nil case legacyRecords: - if r.msgSet == nil { + if r.MsgSet == nil { return false, nil } - return r.msgSet.PartialTrailingMessage, nil + return r.MsgSet.PartialTrailingMessage, nil case defaultRecords: - if r.recordBatch == nil { + if r.RecordBatch == nil { return false, nil } - return r.recordBatch.PartialTrailingRecord, nil + return r.RecordBatch.PartialTrailingRecord, nil } return false, fmt.Errorf("unknown records type: %v", r.recordsType) } @@ -155,10 +155,10 @@ func 
(r *Records) isControl() (bool, error) { case legacyRecords: return false, nil case defaultRecords: - if r.recordBatch == nil { + if r.RecordBatch == nil { return false, nil } - return r.recordBatch.Control, nil + return r.RecordBatch.Control, nil } return false, fmt.Errorf("unknown records type: %v", r.recordsType) } diff --git a/vendor/github.com/Shopify/sarama/request.go b/vendor/github.com/Shopify/sarama/request.go index 5f7cb76e95b4..4d211a14f173 100644 --- a/vendor/github.com/Shopify/sarama/request.go +++ b/vendor/github.com/Shopify/sarama/request.go @@ -97,7 +97,7 @@ func allocateBody(key, version int16) protocolBody { case 9: return &OffsetFetchRequest{} case 10: - return &ConsumerMetadataRequest{} + return &FindCoordinatorRequest{} case 11: return &JoinGroupRequest{} case 12: @@ -118,6 +118,8 @@ func allocateBody(key, version int16) protocolBody { return &CreateTopicsRequest{} case 20: return &DeleteTopicsRequest{} + case 21: + return &DeleteRecordsRequest{} case 22: return &InitProducerIDRequest{} case 24: @@ -140,6 +142,8 @@ func allocateBody(key, version int16) protocolBody { return &AlterConfigsRequest{} case 37: return &CreatePartitionsRequest{} + case 42: + return &DeleteGroupsRequest{} } return nil } diff --git a/vendor/github.com/Shopify/sarama/utils.go b/vendor/github.com/Shopify/sarama/utils.go index 9d7b60f16148..702e22627015 100644 --- a/vendor/github.com/Shopify/sarama/utils.go +++ b/vendor/github.com/Shopify/sarama/utils.go @@ -139,21 +139,49 @@ func (v KafkaVersion) IsAtLeast(other KafkaVersion) bool { // Effective constants defining the supported kafka versions. var ( - V0_8_2_0 = newKafkaVersion(0, 8, 2, 0) - V0_8_2_1 = newKafkaVersion(0, 8, 2, 1) - V0_8_2_2 = newKafkaVersion(0, 8, 2, 2) - V0_9_0_0 = newKafkaVersion(0, 9, 0, 0) - V0_9_0_1 = newKafkaVersion(0, 9, 0, 1) - V0_10_0_0 = newKafkaVersion(0, 10, 0, 0) - V0_10_0_1 = newKafkaVersion(0, 10, 0, 1) - V0_10_1_0 = newKafkaVersion(0, 10, 1, 0) - V0_10_2_0 = newKafkaVersion(0, 10, 2, 0) - V0_11_0_0 = newKafkaVersion(0, 11, 0, 0) - V1_0_0_0 = newKafkaVersion(1, 0, 0, 0) - minVersion = V0_8_2_0 + V0_8_2_0 = newKafkaVersion(0, 8, 2, 0) + V0_8_2_1 = newKafkaVersion(0, 8, 2, 1) + V0_8_2_2 = newKafkaVersion(0, 8, 2, 2) + V0_9_0_0 = newKafkaVersion(0, 9, 0, 0) + V0_9_0_1 = newKafkaVersion(0, 9, 0, 1) + V0_10_0_0 = newKafkaVersion(0, 10, 0, 0) + V0_10_0_1 = newKafkaVersion(0, 10, 0, 1) + V0_10_1_0 = newKafkaVersion(0, 10, 1, 0) + V0_10_1_1 = newKafkaVersion(0, 10, 1, 1) + V0_10_2_0 = newKafkaVersion(0, 10, 2, 0) + V0_10_2_1 = newKafkaVersion(0, 10, 2, 1) + V0_11_0_0 = newKafkaVersion(0, 11, 0, 0) + V0_11_0_1 = newKafkaVersion(0, 11, 0, 1) + V0_11_0_2 = newKafkaVersion(0, 11, 0, 2) + V1_0_0_0 = newKafkaVersion(1, 0, 0, 0) + V1_1_0_0 = newKafkaVersion(1, 1, 0, 0) + + SupportedVersions = []KafkaVersion{ + V0_8_2_0, + V0_8_2_1, + V0_8_2_2, + V0_9_0_0, + V0_9_0_1, + V0_10_0_0, + V0_10_0_1, + V0_10_1_0, + V0_10_1_1, + V0_10_2_0, + V0_10_2_1, + V0_11_0_0, + V0_11_0_1, + V0_11_0_2, + V1_0_0_0, + V1_1_0_0, + } + MinVersion = V0_8_2_0 + MaxVersion = V1_1_0_0 ) func ParseKafkaVersion(s string) (KafkaVersion, error) { + if len(s) < 5 { + return MinVersion, fmt.Errorf("invalid version `%s`", s) + } var major, minor, veryMinor, patch uint var err error if s[0] == '0' { @@ -162,7 +190,7 @@ func ParseKafkaVersion(s string) (KafkaVersion, error) { err = scanKafkaVersion(s, `^\d+\.\d+\.\d+$`, "%d.%d.%d", [3]*uint{&major, &minor, &veryMinor}) } if err != nil { - return minVersion, err + return MinVersion, err } 
return newKafkaVersion(major, minor, veryMinor, patch), nil } diff --git a/vendor/vendor.json b/vendor/vendor.json index 5221b45453f2..ada0662cb4c6 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -17,14 +17,14 @@ "versionExact": "v1.2.2" }, { - "checksumSHA1": "pH1jOw5Kfigc2tteo6KlaU9+JE8=", + "checksumSHA1": "xSwVjXDGIMoADDte4hBjra6ldGk=", "origin": "github.com/urso/sarama", "path": "github.com/Shopify/sarama", - "revision": "32b4ad5c9537ed14e471779b76713ff65420db39", + "revision": "d1575e4abe04acbbe8ac766320585cdf271dd189", "revisionTime": "2016-11-23T00:27:23Z", "tree": true, - "version": "v1.16.0/enh/offset-replica-id", - "versionExact": "v1.16.0/enh/offset-replica-id" + "version": "v1.17.0/enh/offset-replica-id", + "versionExact": "v1.17.0/enh/offset-replica-id" }, { "checksumSHA1": "DYv6Q1+VfnUVxMwvk5IshAClLvw=", diff --git a/winlogbeat/winlogbeat.reference.yml b/winlogbeat/winlogbeat.reference.yml index 563748360c26..c4ad07387e86 100644 --- a/winlogbeat/winlogbeat.reference.yml +++ b/winlogbeat/winlogbeat.reference.yml @@ -538,6 +538,10 @@ output.elasticsearch: # default is gzip. #compression: gzip + # Set the compression level. Currently only gzip provides a compression level + # between 0 and 9. The default value is chosen by the compression algorithm. + #compression_level: 4 + # The maximum permitted size of JSON-encoded messages. Bigger messages will be # dropped. The default value is 1000000 (bytes). This value should be equal to # or less than the broker's message.max.bytes.
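
A few notes on the vendored sarama changes above, with short sketches. First, the gzip handling added in message.go and record_batch.go: both encoders call gzip.NewWriterLevel only when the configured level differs from the CompressionLevelDefault sentinel (-1000, deliberately outside gzip's valid range) and otherwise fall back to gzip.NewWriter. A self-contained sketch of that selection using only the standard library (the function name is illustrative):

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
)

// Sentinel mirroring sarama's CompressionLevelDefault; it lies outside
// gzip's valid range (gzip.HuffmanOnly = -2 up to gzip.BestCompression = 9).
const compressionLevelDefault = -1000

func gzipCompress(data []byte, level int) ([]byte, error) {
	var buf bytes.Buffer
	var writer *gzip.Writer
	var err error
	if level != compressionLevelDefault {
		// NewWriterLevel validates the level and returns an error for
		// out-of-range values, so a bad config fails at encode time.
		if writer, err = gzip.NewWriterLevel(&buf, level); err != nil {
			return nil, err
		}
	} else {
		writer = gzip.NewWriter(&buf) // library default (gzip.DefaultCompression)
	}
	if _, err = writer.Write(data); err != nil {
		return nil, err
	}
	if err = writer.Close(); err != nil { // Close flushes the trailing gzip footer
		return nil, err
	}
	return buf.Bytes(), nil
}

func main() {
	out, err := gzipCompress([]byte("hello kafka"), 4)
	fmt.Println(len(out), err)
}

On the producer side this is driven by the Producer.CompressionLevel config field that produce_set.go copies into each Message and RecordBatch, which is what the `compression_level` setting in the reference configs maps onto.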
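
Second, the protocol structs touched here (join_group_response.go, metadata_request.go, metadata_response.go) all follow one version-gating pattern: the struct carries its wire Version, newer fields are encoded and decoded only when that version permits, and requiredVersion() maps the struct version back to the minimum broker release. A minimal sketch of the pattern for a hypothetical response type — fooResponse and the trimmed-down encoder/decoder interfaces below are illustrative, not sarama's real internals:

package example

type packetEncoder interface {
	putInt16(in int16)
	putInt32(in int32)
}

type packetDecoder interface {
	getInt16() (int16, error)
	getInt32() (int32, error)
}

type fooResponse struct {
	Version      int16
	ThrottleTime int32 // only on the wire for Version >= 2
	Err          int16
}

func (r *fooResponse) encode(pe packetEncoder) error {
	if r.Version >= 2 {
		pe.putInt32(r.ThrottleTime) // version-gated fields are strictly additive
	}
	pe.putInt16(r.Err)
	return nil
}

func (r *fooResponse) decode(pd packetDecoder, version int16) (err error) {
	r.Version = version // remembered so a later encode() round-trips correctly
	if version >= 2 {
		if r.ThrottleTime, err = pd.getInt32(); err != nil {
			return err
		}
	}
	r.Err, err = pd.getInt16()
	return err
}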
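
One subtlety in the new MetadataRequest encoder: from v1 on, "all topics" and "no topics" are distinct requests, so a nil Topics slice is written as a null array (length -1) meaning "all", while v0 keeps the old behaviour where an empty list means "all". Construction in client code, as a sketch against the vendored API (the upstream import path is used here since the vendor tree mirrors it; the topic name is illustrative):

package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	// nil Topics on v1+ encodes as a null array: fetch metadata for all topics.
	all := &sarama.MetadataRequest{Version: 5}

	// An explicit list encodes as a normal array; v4+ can additionally tell
	// the broker whether it may auto-create topics that are missing.
	one := &sarama.MetadataRequest{
		Version:                5,
		Topics:                 []string{"logs"},
		AllowAutoTopicCreation: false,
	}
	fmt.Println(all.Topics == nil, len(one.Topics))
}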
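
The long comment added in produce_set.go amounts to this: inside a v2 RecordBatch, record offsets are stored as deltas from FirstOffset and assigned 0..n-1 in order, so LastOffsetDelta is always n-1. In sketch form (this mirrors what buildRequest does for produce API version >= 3; the helper name is made up, and the OffsetDelta assignment assumes sarama's exported Record field):

package example

import "github.com/Shopify/sarama"

// assignRelativeOffsets numbers a batch's records relative to FirstOffset.
func assignRelativeOffsets(rb *sarama.RecordBatch) {
	if len(rb.Records) == 0 {
		return
	}
	for i, rec := range rb.Records {
		rec.OffsetDelta = int64(i) // the first record is delta 0
	}
	// With deltas 0..n-1, the final record's delta is necessarily n-1.
	rb.LastOffsetDelta = int32(len(rb.Records) - 1)
}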
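
The new MockFindCoordinatorResponse follows the existing builder convention in mockresponses.go: configure a coordinator or an error per group key, then hand the builder to a mock broker. A sketch of a test wiring, assuming sarama's existing MockBroker / SetHandlerByMap test helpers and an illustrative group name:

package example

import (
	"testing"

	"github.com/Shopify/sarama"
)

func TestCoordinatorLookup(t *testing.T) {
	broker := sarama.NewMockBroker(t, 1)
	defer broker.Close()

	broker.SetHandlerByMap(map[string]sarama.MockResponse{
		// Answer metadata probes with this broker as the only node and controller.
		"MetadataRequest": sarama.NewMockMetadataResponse(t).
			SetBroker(broker.Addr(), broker.BrokerID()).
			SetController(broker.BrokerID()),
		// Route group-coordinator lookups for "my-group" back to the same broker.
		"FindCoordinatorRequest": sarama.NewMockFindCoordinatorResponse(t).
			SetCoordinator(sarama.CoordinatorGroup, "my-group", broker),
	})

	// ... exercise client code that resolves the coordinator for "my-group" ...
}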
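
Finally, utils.go now exports MinVersion, MaxVersion, and a SupportedVersions list, and ParseKafkaVersion rejects strings shorter than a minimal x.y.z form. Typical client-side use, which is essentially what sits behind the kafka output's `version` setting (a sketch):

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	v, err := sarama.ParseKafkaVersion("0.11.0.2")
	if err != nil {
		log.Fatalf("invalid kafka version: %v", err)
	}

	cfg := sarama.NewConfig()
	cfg.Version = v
	if cfg.Version.IsAtLeast(sarama.V0_10_0_0) {
		// v1 messages (with timestamps) are available from 0.10.0.0 on.
		log.Println("message timestamps supported")
	}
}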