From 250cf3e3ae7397f8413d1981522a04d4f2453447 Mon Sep 17 00:00:00 2001
From: Silvia Mitter <silvia.mitter@elastic.co>
Date: Mon, 25 Feb 2019 14:31:46 +0100
Subject: [PATCH 1/2] Use IndexPrefix for kafka and logstash output. (#10841)

Output index differed to Elasticsearch output.

fixes #10839
---
 libbeat/outputs/kafka/config.go               |   8 ++
 libbeat/outputs/kafka/config_test.go          |  13 +--
 libbeat/outputs/kafka/kafka.go                |   8 +-
 .../outputs/kafka/kafka_integration_test.go   |   3 +-
 libbeat/outputs/logstash/async_test.go        |   2 +-
 libbeat/outputs/logstash/config.go            |  54 +++++++---
 libbeat/outputs/logstash/config_test.go       | 100 ++++++++++++++++++
 libbeat/outputs/logstash/logstash.go          |  12 +--
 libbeat/outputs/logstash/sync_test.go         |   2 +-
 libbeat/outputs/logstash/window_test.go       |   2 +-
 10 files changed, 159 insertions(+), 45 deletions(-)
 create mode 100644 libbeat/outputs/logstash/config_test.go

diff --git a/libbeat/outputs/kafka/config.go b/libbeat/outputs/kafka/config.go
index 9cf81d342611..48a46491f918 100644
--- a/libbeat/outputs/kafka/config.go
+++ b/libbeat/outputs/kafka/config.go
@@ -106,6 +106,14 @@ func defaultConfig() kafkaConfig {
 	}
 }
 
+func readConfig(cfg *common.Config) (*kafkaConfig, error) {
+	c := defaultConfig()
+	if err := cfg.Unpack(&c); err != nil {
+		return nil, err
+	}
+	return &c, nil
+}
+
 func (c *kafkaConfig) Validate() error {
 	if len(c.Hosts) == 0 {
 		return errors.New("no hosts configured")
diff --git a/libbeat/outputs/kafka/config_test.go b/libbeat/outputs/kafka/config_test.go
index 262da4f9c056..dd74b50ed3d7 100644
--- a/libbeat/outputs/kafka/config_test.go
+++ b/libbeat/outputs/kafka/config_test.go
@@ -39,18 +39,13 @@ func TestConfigAcceptValid(t *testing.T) {
 	for name, test := range tests {
 		test := test
 		t.Run(name, func(t *testing.T) {
-			c, err := common.NewConfigFrom(test)
+			c := common.MustNewConfigFrom(test)
+			c.SetString("hosts", 0, "localhost")
+			cfg, err := readConfig(c)
 			if err != nil {
 				t.Fatalf("Can not create test configuration: %v", err)
 			}
-			c.SetString("hosts", 0, "localhost")
-
-			cfg := defaultConfig()
-			if err := c.Unpack(&cfg); err != nil {
-				t.Fatalf("Unpacking configuration failed: %v", err)
-			}
-
-			if _, err := newSaramaConfig(&cfg); err != nil {
+			if _, err := newSaramaConfig(cfg); err != nil {
 				t.Fatalf("Failure creating sarama config: %v", err)
 			}
 		})
diff --git a/libbeat/outputs/kafka/kafka.go b/libbeat/outputs/kafka/kafka.go
index dc34e77c5581..f96b372698f8 100644
--- a/libbeat/outputs/kafka/kafka.go
+++ b/libbeat/outputs/kafka/kafka.go
@@ -77,8 +77,8 @@ func makeKafka(
 ) (outputs.Group, error) {
 	debugf("initialize kafka output")
 
-	config := defaultConfig()
-	if err := cfg.Unpack(&config); err != nil {
+	config, err := readConfig(cfg)
+	if err != nil {
 		return outputs.Fail(err)
 	}
 
@@ -92,7 +92,7 @@ func makeKafka(
 		return outputs.Fail(err)
 	}
 
-	libCfg, err := newSaramaConfig(&config)
+	libCfg, err := newSaramaConfig(config)
 	if err != nil {
 		return outputs.Fail(err)
 	}
@@ -107,7 +107,7 @@ func makeKafka(
 		return outputs.Fail(err)
 	}
 
-	client, err := newKafkaClient(observer, hosts, beat.Beat, config.Key, topic, codec, libCfg)
+	client, err := newKafkaClient(observer, hosts, beat.IndexPrefix, config.Key, topic, codec, libCfg)
 	if err != nil {
 		return outputs.Fail(err)
 	}
diff --git a/libbeat/outputs/kafka/kafka_integration_test.go b/libbeat/outputs/kafka/kafka_integration_test.go
index 4d85003a719a..7230335612de 100644
--- a/libbeat/outputs/kafka/kafka_integration_test.go
+++ b/libbeat/outputs/kafka/kafka_integration_test.go
@@ -199,7 +199,7 @@ func TestKafkaPublish(t *testing.T) {
 		}
 
 		t.Run(name, func(t *testing.T) {
-			grp, err := makeKafka(nil, beat.Info{Beat: "libbeat"}, outputs.NewNilObserver(), cfg)
+			grp, err := makeKafka(nil, beat.Info{Beat: "libbeat", IndexPrefix: "testbeat"}, outputs.NewNilObserver(), cfg)
 			if err != nil {
 				t.Fatal(err)
 			}
@@ -208,6 +208,7 @@ func TestKafkaPublish(t *testing.T) {
 			if err := output.Connect(); err != nil {
 				t.Fatal(err)
 			}
+			assert.Equal(t, output.index, "testbeat")
 			defer output.Close()
 
 			// publish test events
diff --git a/libbeat/outputs/logstash/async_test.go b/libbeat/outputs/logstash/async_test.go
index d02c3d348e88..b99fb9a57491 100644
--- a/libbeat/outputs/logstash/async_test.go
+++ b/libbeat/outputs/logstash/async_test.go
@@ -50,7 +50,7 @@ func TestAsyncStructuredEvent(t *testing.T) {
 }
 
 func makeAsyncTestClient(conn *transport.Client) testClientDriver {
-	config := defaultConfig
+	config := defaultConfig()
 	config.Timeout = 1 * time.Second
 	config.Pipelining = 3
 	client, err := newAsyncClient(beat.Info{}, conn, outputs.NewNilObserver(), &config)
diff --git a/libbeat/outputs/logstash/config.go b/libbeat/outputs/logstash/config.go
index 2b8d7687e555..598413f5814f 100644
--- a/libbeat/outputs/logstash/config.go
+++ b/libbeat/outputs/logstash/config.go
@@ -20,6 +20,10 @@ package logstash
 import (
 	"time"
 
+	"github.com/elastic/beats/libbeat/beat"
+
+	"github.com/elastic/beats/libbeat/common"
+	"github.com/elastic/beats/libbeat/common/cfgwarn"
 	"github.com/elastic/beats/libbeat/common/transport/tlscommon"
 	"github.com/elastic/beats/libbeat/outputs/transport"
 )
@@ -45,23 +49,39 @@ type Backoff struct {
 	Max  time.Duration
 }
 
-var defaultConfig = Config{
-	LoadBalance:      false,
-	Pipelining:       2,
-	BulkMaxSize:      2048,
-	SlowStart:        false,
-	CompressionLevel: 3,
-	Timeout:          30 * time.Second,
-	MaxRetries:       3,
-	TTL:              0 * time.Second,
-	Backoff: Backoff{
-		Init: 1 * time.Second,
-		Max:  60 * time.Second,
-	},
-	EscapeHTML: false,
+func defaultConfig() Config {
+	return Config{
+		LoadBalance:      false,
+		Pipelining:       2,
+		BulkMaxSize:      2048,
+		SlowStart:        false,
+		CompressionLevel: 3,
+		Timeout:          30 * time.Second,
+		MaxRetries:       3,
+		TTL:              0 * time.Second,
+		Backoff: Backoff{
+			Init: 1 * time.Second,
+			Max:  60 * time.Second,
+		},
+		EscapeHTML: false,
+	}
 }
 
-func newConfig() *Config {
-	c := defaultConfig
-	return &c
+func readConfig(cfg *common.Config, info beat.Info) (*Config, error) {
+	c := defaultConfig()
+
+	err := cfgwarn.CheckRemoved6xSettings(cfg, "port")
+	if err != nil {
+		return nil, err
+	}
+
+	if err := cfg.Unpack(&c); err != nil {
+		return nil, err
+	}
+
+	if c.Index == "" {
+		c.Index = info.IndexPrefix
+	}
+
+	return &c, nil
 }
diff --git a/libbeat/outputs/logstash/config_test.go b/libbeat/outputs/logstash/config_test.go
new file mode 100644
index 000000000000..ee3ffe179786
--- /dev/null
+++ b/libbeat/outputs/logstash/config_test.go
@@ -0,0 +1,100 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package logstash
+
+import (
+	"testing"
+	"time"
+
+	"github.com/elastic/beats/libbeat/beat"
+	"github.com/elastic/beats/libbeat/common"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestConfig(t *testing.T) {
+
+	info := beat.Info{Beat: "testbeat", Name: "foo", IndexPrefix: "bar"}
+	for name, test := range map[string]struct {
+		config         *common.Config
+		expectedConfig *Config
+		err            bool
+	}{
+		"default config": {
+			config: common.MustNewConfigFrom([]byte(`{ }`)),
+			expectedConfig: &Config{
+				LoadBalance:      false,
+				Pipelining:       2,
+				BulkMaxSize:      2048,
+				SlowStart:        false,
+				CompressionLevel: 3,
+				Timeout:          30 * time.Second,
+				MaxRetries:       3,
+				TTL:              0 * time.Second,
+				Backoff: Backoff{
+					Init: 1 * time.Second,
+					Max:  60 * time.Second,
+				},
+				EscapeHTML: false,
+				Index:      "bar",
+			},
+		},
+		"config given": {
+			config: common.MustNewConfigFrom(common.MapStr{
+				"index":         "beat-index",
+				"loadbalance":   true,
+				"bulk_max_size": 1024,
+				"slow_start":    false,
+			}),
+			expectedConfig: &Config{
+				LoadBalance:      true,
+				BulkMaxSize:      1024,
+				Pipelining:       2,
+				SlowStart:        false,
+				CompressionLevel: 3,
+				Timeout:          30 * time.Second,
+				MaxRetries:       3,
+				TTL:              0 * time.Second,
+				Backoff: Backoff{
+					Init: 1 * time.Second,
+					Max:  60 * time.Second,
+				},
+				EscapeHTML: false,
+				Index:      "beat-index",
+			},
+		},
+		"removed config setting": {
+			config: common.MustNewConfigFrom(common.MapStr{
+				"port": "8080",
+			}),
+			expectedConfig: nil,
+			err:            true,
+		},
+	} {
+		t.Run(name, func(t *testing.T) {
+			cfg, err := readConfig(test.config, info)
+			if test.err {
+				assert.Error(t, err)
+				assert.Nil(t, cfg)
+			} else {
+				assert.NoError(t, err)
+				assert.Equal(t, test.expectedConfig, cfg)
+			}
+		})
+	}
+}
diff --git a/libbeat/outputs/logstash/logstash.go b/libbeat/outputs/logstash/logstash.go
index 0c14bf5882bb..d1a64a47b269 100644
--- a/libbeat/outputs/logstash/logstash.go
+++ b/libbeat/outputs/logstash/logstash.go
@@ -20,7 +20,6 @@ package logstash
 import (
 	"github.com/elastic/beats/libbeat/beat"
 	"github.com/elastic/beats/libbeat/common"
-	"github.com/elastic/beats/libbeat/common/cfgwarn"
 	"github.com/elastic/beats/libbeat/common/transport/tlscommon"
 	"github.com/elastic/beats/libbeat/logp"
 	"github.com/elastic/beats/libbeat/outputs"
@@ -45,20 +44,11 @@ func makeLogstash(
 	observer outputs.Observer,
 	cfg *common.Config,
 ) (outputs.Group, error) {
-	if !cfg.HasField("index") {
-		cfg.SetString("index", -1, beat.Beat)
-	}
-
-	err := cfgwarn.CheckRemoved6xSettings(cfg, "port")
+	config, err := readConfig(cfg, beat)
 	if err != nil {
 		return outputs.Fail(err)
 	}
 
-	config := newConfig()
-	if err := cfg.Unpack(config); err != nil {
-		return outputs.Fail(err)
-	}
-
 	hosts, err := outputs.ReadHostList(cfg)
 	if err != nil {
 		return outputs.Fail(err)
diff --git a/libbeat/outputs/logstash/sync_test.go b/libbeat/outputs/logstash/sync_test.go
index dc48309211dd..f9d74cb69cba 100644
--- a/libbeat/outputs/logstash/sync_test.go
+++ b/libbeat/outputs/logstash/sync_test.go
@@ -63,7 +63,7 @@ func newClientServerTCP(t *testing.T, to time.Duration) *clientServer {
 }
 
 func makeTestClient(conn *transport.Client) testClientDriver {
-	config := defaultConfig
+	config := defaultConfig()
 	config.Timeout = 1 * time.Second
 	config.TTL = 5 * time.Second
 	client, err := newSyncClient(beat.Info{}, conn, outputs.NewNilObserver(), &config)
diff --git a/libbeat/outputs/logstash/window_test.go b/libbeat/outputs/logstash/window_test.go
index 773f9c374df9..ab5e64f85087 100644
--- a/libbeat/outputs/logstash/window_test.go
+++ b/libbeat/outputs/logstash/window_test.go
@@ -30,7 +30,7 @@ func TestShrinkWindowSizeNeverZero(t *testing.T) {
 
 	windowSize := 124
 	var w window
-	w.init(windowSize, defaultConfig.BulkMaxSize)
+	w.init(windowSize, defaultConfig().BulkMaxSize)
 
 	w.windowSize = int32(windowSize)
 	for i := 0; i < 100; i++ {

From ea868d7bf8eb3249673ea0c8b43b3ee6339b8b87 Mon Sep 17 00:00:00 2001
From: simitt <silvia.mitter@elastic.co>
Date: Thu, 28 Feb 2019 10:21:50 +0100
Subject: [PATCH 2/2] Add changelog entry for aligning default index #10841.

---
 CHANGELOG-developer.next.asciidoc | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc
index 1637fc63b24f..d441555a460f 100644
--- a/CHANGELOG-developer.next.asciidoc
+++ b/CHANGELOG-developer.next.asciidoc
@@ -25,6 +25,7 @@ The list below covers the major changes between 7.0.0-beta1 and master only.
 - Remove support for deprecated `GenRootCmd` methods. {pull}10721[10721]
 
 ==== Bugfixes
+- Align default index between elasticsearch and logstash and kafka output. {pull}10841[10841]
 
 ==== Added