From 210ce7eee7d962c3db5fc23787daaddc65a292a1 Mon Sep 17 00:00:00 2001 From: simitt Date: Wed, 20 Feb 2019 11:20:02 +0100 Subject: [PATCH 1/3] Use IndexPrefix for kafka and logstash output. Output index differed to Elasticsearch output. fixes #10839 --- libbeat/outputs/kafka/kafka.go | 2 +- libbeat/outputs/logstash/logstash.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/libbeat/outputs/kafka/kafka.go b/libbeat/outputs/kafka/kafka.go index dc34e77c558..6b27013ff41 100644 --- a/libbeat/outputs/kafka/kafka.go +++ b/libbeat/outputs/kafka/kafka.go @@ -107,7 +107,7 @@ func makeKafka( return outputs.Fail(err) } - client, err := newKafkaClient(observer, hosts, beat.Beat, config.Key, topic, codec, libCfg) + client, err := newKafkaClient(observer, hosts, beat.IndexPrefix, config.Key, topic, codec, libCfg) if err != nil { return outputs.Fail(err) } diff --git a/libbeat/outputs/logstash/logstash.go b/libbeat/outputs/logstash/logstash.go index 0c14bf5882b..b62c75e8b62 100644 --- a/libbeat/outputs/logstash/logstash.go +++ b/libbeat/outputs/logstash/logstash.go @@ -46,7 +46,7 @@ func makeLogstash( cfg *common.Config, ) (outputs.Group, error) { if !cfg.HasField("index") { - cfg.SetString("index", -1, beat.Beat) + cfg.SetString("index", -1, beat.IndexPrefix) } err := cfgwarn.CheckRemoved6xSettings(cfg, "port") From a4507a54733c2e78787d700475bfdfc0d035102e Mon Sep 17 00:00:00 2001 From: simitt Date: Thu, 21 Feb 2019 16:21:40 +0100 Subject: [PATCH 2/3] minor refactor and adding tests --- libbeat/outputs/kafka/config.go | 8 ++ libbeat/outputs/kafka/config_test.go | 13 +-- libbeat/outputs/kafka/kafka.go | 6 +- .../outputs/kafka/kafka_integration_test.go | 3 +- libbeat/outputs/logstash/async_test.go | 2 +- libbeat/outputs/logstash/config.go | 54 ++++++++---- libbeat/outputs/logstash/config_test.go | 83 +++++++++++++++++++ libbeat/outputs/logstash/logstash.go | 12 +-- libbeat/outputs/logstash/sync_test.go | 2 +- libbeat/outputs/logstash/window_test.go | 2 +- 10 files changed, 141 insertions(+), 44 deletions(-) create mode 100644 libbeat/outputs/logstash/config_test.go diff --git a/libbeat/outputs/kafka/config.go b/libbeat/outputs/kafka/config.go index 9cf81d34261..48a46491f91 100644 --- a/libbeat/outputs/kafka/config.go +++ b/libbeat/outputs/kafka/config.go @@ -106,6 +106,14 @@ func defaultConfig() kafkaConfig { } } +func readConfig(cfg *common.Config) (*kafkaConfig, error) { + c := defaultConfig() + if err := cfg.Unpack(&c); err != nil { + return nil, err + } + return &c, nil +} + func (c *kafkaConfig) Validate() error { if len(c.Hosts) == 0 { return errors.New("no hosts configured") diff --git a/libbeat/outputs/kafka/config_test.go b/libbeat/outputs/kafka/config_test.go index 262da4f9c05..dd74b50ed3d 100644 --- a/libbeat/outputs/kafka/config_test.go +++ b/libbeat/outputs/kafka/config_test.go @@ -39,18 +39,13 @@ func TestConfigAcceptValid(t *testing.T) { for name, test := range tests { test := test t.Run(name, func(t *testing.T) { - c, err := common.NewConfigFrom(test) + c := common.MustNewConfigFrom(test) + c.SetString("hosts", 0, "localhost") + cfg, err := readConfig(c) if err != nil { t.Fatalf("Can not create test configuration: %v", err) } - c.SetString("hosts", 0, "localhost") - - cfg := defaultConfig() - if err := c.Unpack(&cfg); err != nil { - t.Fatalf("Unpacking configuration failed: %v", err) - } - - if _, err := newSaramaConfig(&cfg); err != nil { + if _, err := newSaramaConfig(cfg); err != nil { t.Fatalf("Failure creating sarama config: %v", err) } }) diff --git a/libbeat/outputs/kafka/kafka.go b/libbeat/outputs/kafka/kafka.go index 6b27013ff41..f96b372698f 100644 --- a/libbeat/outputs/kafka/kafka.go +++ b/libbeat/outputs/kafka/kafka.go @@ -77,8 +77,8 @@ func makeKafka( ) (outputs.Group, error) { debugf("initialize kafka output") - config := defaultConfig() - if err := cfg.Unpack(&config); err != nil { + config, err := readConfig(cfg) + if err != nil { return outputs.Fail(err) } @@ -92,7 +92,7 @@ func makeKafka( return outputs.Fail(err) } - libCfg, err := newSaramaConfig(&config) + libCfg, err := newSaramaConfig(config) if err != nil { return outputs.Fail(err) } diff --git a/libbeat/outputs/kafka/kafka_integration_test.go b/libbeat/outputs/kafka/kafka_integration_test.go index 4d85003a719..7230335612d 100644 --- a/libbeat/outputs/kafka/kafka_integration_test.go +++ b/libbeat/outputs/kafka/kafka_integration_test.go @@ -199,7 +199,7 @@ func TestKafkaPublish(t *testing.T) { } t.Run(name, func(t *testing.T) { - grp, err := makeKafka(nil, beat.Info{Beat: "libbeat"}, outputs.NewNilObserver(), cfg) + grp, err := makeKafka(nil, beat.Info{Beat: "libbeat", IndexPrefix: "testbeat"}, outputs.NewNilObserver(), cfg) if err != nil { t.Fatal(err) } @@ -208,6 +208,7 @@ func TestKafkaPublish(t *testing.T) { if err := output.Connect(); err != nil { t.Fatal(err) } + assert.Equal(t, output.index, "testbeat") defer output.Close() // publish test events diff --git a/libbeat/outputs/logstash/async_test.go b/libbeat/outputs/logstash/async_test.go index d02c3d348e8..b99fb9a5749 100644 --- a/libbeat/outputs/logstash/async_test.go +++ b/libbeat/outputs/logstash/async_test.go @@ -50,7 +50,7 @@ func TestAsyncStructuredEvent(t *testing.T) { } func makeAsyncTestClient(conn *transport.Client) testClientDriver { - config := defaultConfig + config := defaultConfig() config.Timeout = 1 * time.Second config.Pipelining = 3 client, err := newAsyncClient(beat.Info{}, conn, outputs.NewNilObserver(), &config) diff --git a/libbeat/outputs/logstash/config.go b/libbeat/outputs/logstash/config.go index 2b8d7687e55..598413f5814 100644 --- a/libbeat/outputs/logstash/config.go +++ b/libbeat/outputs/logstash/config.go @@ -20,6 +20,10 @@ package logstash import ( "time" + "github.com/elastic/beats/libbeat/beat" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/cfgwarn" "github.com/elastic/beats/libbeat/common/transport/tlscommon" "github.com/elastic/beats/libbeat/outputs/transport" ) @@ -45,23 +49,39 @@ type Backoff struct { Max time.Duration } -var defaultConfig = Config{ - LoadBalance: false, - Pipelining: 2, - BulkMaxSize: 2048, - SlowStart: false, - CompressionLevel: 3, - Timeout: 30 * time.Second, - MaxRetries: 3, - TTL: 0 * time.Second, - Backoff: Backoff{ - Init: 1 * time.Second, - Max: 60 * time.Second, - }, - EscapeHTML: false, +func defaultConfig() Config { + return Config{ + LoadBalance: false, + Pipelining: 2, + BulkMaxSize: 2048, + SlowStart: false, + CompressionLevel: 3, + Timeout: 30 * time.Second, + MaxRetries: 3, + TTL: 0 * time.Second, + Backoff: Backoff{ + Init: 1 * time.Second, + Max: 60 * time.Second, + }, + EscapeHTML: false, + } } -func newConfig() *Config { - c := defaultConfig - return &c +func readConfig(cfg *common.Config, info beat.Info) (*Config, error) { + c := defaultConfig() + + err := cfgwarn.CheckRemoved6xSettings(cfg, "port") + if err != nil { + return nil, err + } + + if err := cfg.Unpack(&c); err != nil { + return nil, err + } + + if c.Index == "" { + c.Index = info.IndexPrefix + } + + return &c, nil } diff --git a/libbeat/outputs/logstash/config_test.go b/libbeat/outputs/logstash/config_test.go new file mode 100644 index 00000000000..d820d9971b4 --- /dev/null +++ b/libbeat/outputs/logstash/config_test.go @@ -0,0 +1,83 @@ +package logstash + +import ( + "testing" + "time" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + + "github.com/stretchr/testify/assert" +) + +func TestConfig(t *testing.T) { + + info := beat.Info{Beat: "testbeat", Name: "foo", IndexPrefix: "bar"} + for name, test := range map[string]struct { + config *common.Config + expectedConfig *Config + err bool + }{ + "default config": { + config: common.MustNewConfigFrom([]byte(`{ }`)), + expectedConfig: &Config{ + LoadBalance: false, + Pipelining: 2, + BulkMaxSize: 2048, + SlowStart: false, + CompressionLevel: 3, + Timeout: 30 * time.Second, + MaxRetries: 3, + TTL: 0 * time.Second, + Backoff: Backoff{ + Init: 1 * time.Second, + Max: 60 * time.Second, + }, + EscapeHTML: false, + Index: "bar", + }, + }, + "config given": { + config: common.MustNewConfigFrom(common.MapStr{ + "index": "beat-index", + "loadbalance": true, + "bulk_max_size": 1024, + "slow_start": false, + }), + expectedConfig: &Config{ + LoadBalance: true, + BulkMaxSize: 1024, + Pipelining: 2, + SlowStart: false, + CompressionLevel: 3, + Timeout: 30 * time.Second, + MaxRetries: 3, + TTL: 0 * time.Second, + Backoff: Backoff{ + Init: 1 * time.Second, + Max: 60 * time.Second, + }, + EscapeHTML: false, + Index: "beat-index", + }, + }, + "removed config setting": { + config: common.MustNewConfigFrom(common.MapStr{ + "port": "8080", + }), + expectedConfig: nil, + err: true, + }, + } { + t.Run(name, func(t *testing.T) { + cfg, err := readConfig(test.config, info) + if test.err { + assert.Error(t, err) + assert.Nil(t, cfg) + } else { + assert.NoError(t, err) + assert.Equal(t, test.expectedConfig, cfg) + } + }) + } +} diff --git a/libbeat/outputs/logstash/logstash.go b/libbeat/outputs/logstash/logstash.go index b62c75e8b62..d1a64a47b26 100644 --- a/libbeat/outputs/logstash/logstash.go +++ b/libbeat/outputs/logstash/logstash.go @@ -20,7 +20,6 @@ package logstash import ( "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/common/cfgwarn" "github.com/elastic/beats/libbeat/common/transport/tlscommon" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/outputs" @@ -45,20 +44,11 @@ func makeLogstash( observer outputs.Observer, cfg *common.Config, ) (outputs.Group, error) { - if !cfg.HasField("index") { - cfg.SetString("index", -1, beat.IndexPrefix) - } - - err := cfgwarn.CheckRemoved6xSettings(cfg, "port") + config, err := readConfig(cfg, beat) if err != nil { return outputs.Fail(err) } - config := newConfig() - if err := cfg.Unpack(config); err != nil { - return outputs.Fail(err) - } - hosts, err := outputs.ReadHostList(cfg) if err != nil { return outputs.Fail(err) diff --git a/libbeat/outputs/logstash/sync_test.go b/libbeat/outputs/logstash/sync_test.go index dc48309211d..f9d74cb69cb 100644 --- a/libbeat/outputs/logstash/sync_test.go +++ b/libbeat/outputs/logstash/sync_test.go @@ -63,7 +63,7 @@ func newClientServerTCP(t *testing.T, to time.Duration) *clientServer { } func makeTestClient(conn *transport.Client) testClientDriver { - config := defaultConfig + config := defaultConfig() config.Timeout = 1 * time.Second config.TTL = 5 * time.Second client, err := newSyncClient(beat.Info{}, conn, outputs.NewNilObserver(), &config) diff --git a/libbeat/outputs/logstash/window_test.go b/libbeat/outputs/logstash/window_test.go index 773f9c374df..ab5e64f8508 100644 --- a/libbeat/outputs/logstash/window_test.go +++ b/libbeat/outputs/logstash/window_test.go @@ -30,7 +30,7 @@ func TestShrinkWindowSizeNeverZero(t *testing.T) { windowSize := 124 var w window - w.init(windowSize, defaultConfig.BulkMaxSize) + w.init(windowSize, defaultConfig().BulkMaxSize) w.windowSize = int32(windowSize) for i := 0; i < 100; i++ { From c7097109e2832f2f7dba7b8f4f093a701956c2ad Mon Sep 17 00:00:00 2001 From: simitt Date: Mon, 25 Feb 2019 10:02:40 +0100 Subject: [PATCH 3/3] run make fmt --- libbeat/outputs/logstash/config_test.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/libbeat/outputs/logstash/config_test.go b/libbeat/outputs/logstash/config_test.go index d820d9971b4..ee3ffe17978 100644 --- a/libbeat/outputs/logstash/config_test.go +++ b/libbeat/outputs/logstash/config_test.go @@ -1,3 +1,20 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + package logstash import (