Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Renamed redis.index option to redis.key
Browse files Browse the repository at this point in the history
The "index" setting name has historical reasons, from the times that
all outputs shared the same configuration options. Since the meaning and
contents of the "index" setting are starting to vary per output, it's time
to clean this up. The Redis "index" is now called "key".

Before this change, the index was still somehow special, since it was set to
the beatName outside of the output. This removes this hack, and sets index to
beatName only in the Elasticsearch and Logstash modules.

This also removes the `index` setting for the file output, but that was never
documented.

Part of #2074.
Tudor Golubenco committed Jul 21, 2016
1 parent 2e761cd commit 245dd0a
Showing 10 changed files with 88 additions and 40 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
@@ -13,9 +13,12 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha4...master[Check the HEAD d
==== Breaking changes

*Affecting all Beats*

- Rename the `filters` section to `processors`. {pull}1944[1944]
- Introduce the condition with `when` in the processor configuration. {pull}1949[1949]
- The Elasticsearch template is now loaded by default. {pull}1993[1993]
- The Redis output `index` setting is renamed to `key`. `index` still works but it's deprecated. {pull}2077[2077]
- The undocumented file output `index` setting was removed. Use `filename` instead. {pull}2077[2077]

*Metricbeat*

11 changes: 9 additions & 2 deletions libbeat/docs/outputconfig.asciidoc
Original file line number Diff line number Diff line change
@@ -631,8 +631,8 @@ output.redis:
# password is set.
password: "my_password"
# Optional index name. The default is {beatname_lc} and generates {beatname_lc} keys.
index: "{beatname_lc}"
# Optional key name. The default is {beatname_lc}
key: "{beatname_lc}"
# Optional Redis database number where the events are stored
# The default is 0.
@@ -668,6 +668,13 @@ The Redis port to use if `hosts` does not contain a port number. The default is

===== index

deprecated[5.0.0-alpha5,The `index` setting is renamed to `key.]

The name of the Redis list or channel the events are published to. The default is
"{beatname_lc}".

===== key

The name of the Redis list or channel the events are published to. The default is
"{beatname_lc}".

4 changes: 4 additions & 0 deletions libbeat/outputs/elasticsearch/output.go
Original file line number Diff line number Diff line change
@@ -56,6 +56,10 @@ func New(beatName string, cfg *common.Config, topologyExpire int) (outputs.Outpu
cfg.SetInt("bulk_max_size", -1, defaultBulkSize)
}

if !cfg.HasField("index") {
cfg.SetString("index", -1, beatName)
}

output := &elasticsearchOutput{beatName: beatName}
err := output.init(cfg, topologyExpire)
if err != nil {
5 changes: 0 additions & 5 deletions libbeat/outputs/fileout/config.go
Original file line number Diff line number Diff line change
@@ -7,7 +7,6 @@ import (
)

type config struct {
Index string `config:"index"`
Path string `config:"path"`
Filename string `config:"filename"`
RotateEveryKb int `config:"rotate_every_kb" validate:"min=1"`
@@ -22,10 +21,6 @@ var (
)

func (c *config) Validate() error {
if c.Filename == "" && c.Index == "" {
return fmt.Errorf("File logging requires filename or index being set.")
}

if c.NumberOfFiles < 2 || c.NumberOfFiles > logp.RotatorMaxFiles {
return fmt.Errorf("The number_of_files to keep should be between 2 and %v",
logp.RotatorMaxFiles)
5 changes: 5 additions & 0 deletions libbeat/outputs/logstash/logstash.go
Original file line number Diff line number Diff line change
@@ -47,6 +47,11 @@ func init() {
}

func new(beatName string, cfg *common.Config, _ int) (outputs.Outputer, error) {

if !cfg.HasField("index") {
cfg.SetString("index", -1, beatName)
}

output := &logstash{}
if err := output.init(cfg); err != nil {
return nil, err
4 changes: 0 additions & 4 deletions libbeat/outputs/outputs.go
Original file line number Diff line number Diff line change
@@ -76,10 +76,6 @@ func InitOutputs(
continue
}

if !config.HasField("index") {
config.SetString("index", -1, beatName)
}

output, err := plugin(beatName, config, topologyExpire)
if err != nil {
logp.Err("failed to initialize %s plugin as output: %s", name, err)
12 changes: 10 additions & 2 deletions libbeat/outputs/redis/config.go
Original file line number Diff line number Diff line change
@@ -5,13 +5,15 @@ import (
"fmt"
"time"

"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/outputs/transport"
)

type redisConfig struct {
Password string `config:"password"`
Index string `config:"index"`
Key string `config:"key"`
Port int `config:"port"`
LoadBalance bool `config:"loadbalance"`
Timeout time.Duration `config:"timeout"`
@@ -49,8 +51,14 @@ func (c *redisConfig) Validate() error {
return fmt.Errorf("redis data type %v not supported", c.DataType)
}

if c.Index == "" {
return errors.New("index required")
if c.Key != "" && c.Index != "" {
return errors.New("Cannot use both `output.redis.key` and `output.redis.index` configuration options." +
" Set only `output.redis.key`")
}

if c.Key == "" && c.Index != "" {
c.Key = c.Index
logp.Warn("The `output.redis.index` configuration setting is deprecated. Use `output.redis.key` instead.")
}

return nil
30 changes: 30 additions & 0 deletions libbeat/outputs/redis/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package redis

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestValidate(t *testing.T) {
type io struct {
Name string
Input redisConfig
Valid bool
}

tests := []io{
io{"No config", redisConfig{Key: "", Index: ""}, true},
io{"Only key", redisConfig{Key: "test", Index: ""}, true},
io{"Only index", redisConfig{Key: "", Index: "test"}, true},
io{"Both", redisConfig{Key: "test", Index: "test"}, false},

io{"Invalid Datatype", redisConfig{Key: "test", DataType: "something"}, false},
io{"List Datatype", redisConfig{Key: "test", DataType: "list"}, true},
io{"Channel Datatype", redisConfig{Key: "test", DataType: "channel"}, true},
}

for _, test := range tests {
assert.Equal(t, test.Input.Validate() == nil, test.Valid, test.Name)
}
}
12 changes: 6 additions & 6 deletions libbeat/outputs/redis/redis.go
Original file line number Diff line number Diff line change
@@ -3,7 +3,6 @@ package redis
import (
"errors"
"expvar"
"fmt"
"time"

"github.com/elastic/beats/libbeat/common"
@@ -18,6 +17,7 @@ import (
type redisOut struct {
mode mode.ConnectionMode
topology
beatName string
}

var debugf = logp.MakeDebug("redis")
@@ -40,7 +40,7 @@ func init() {
}

func new(beatName string, cfg *common.Config, expireTopo int) (outputs.Outputer, error) {
r := &redisOut{}
r := &redisOut{beatName: beatName}
if err := r.init(cfg, expireTopo); err != nil {
return nil, err
}
@@ -69,9 +69,9 @@ func (r *redisOut) init(cfg *common.Config, expireTopo int) error {
return errors.New("Bad Redis data type")
}

index := []byte(config.Index)
if len(index) == 0 {
return fmt.Errorf("missing %v", cfg.PathOf("index"))
key := []byte(config.Key)
if len(key) == 0 {
key = []byte(r.beatName)
}

tls, err := outputs.LoadTLSConfig(config.TLS)
@@ -105,7 +105,7 @@ func (r *redisOut) init(cfg *common.Config, expireTopo int) error {
if err != nil {
return nil, err
}
return newClient(t, config.Password, config.Db, index, dataType), nil
return newClient(t, config.Password, config.Db, key, dataType), nil
})
if err != nil {
return err
42 changes: 21 additions & 21 deletions libbeat/outputs/redis/redis_integration_test.go
Original file line number Diff line number Diff line change
@@ -25,11 +25,11 @@ const (

func TestTopologyInRedisTCP(t *testing.T) {
db := 1
index := "test_topo_tcp"
key := "test_topo_tcp"
redisHosts := []string{getRedisAddr()}
redisConfig := map[string]interface{}{
"hosts": redisHosts,
"index": index,
"key": key,
"host_topology": redisHosts[0],
"db_topology": db,
"timeout": "5s",
@@ -40,11 +40,11 @@ func TestTopologyInRedisTCP(t *testing.T) {

func TestTopologyInRedisTLS(t *testing.T) {
db := 1
index := "test_topo_tls"
key := "test_topo_tls"
redisHosts := []string{getSRedisAddr()}
redisConfig := map[string]interface{}{
"hosts": redisHosts,
"index": index,
"key": key,
"host_topology": redisHosts[0],
"db_topology": db,
"timeout": "5s",
@@ -70,7 +70,7 @@ func testTopologyInRedis(t *testing.T, cfg map[string]interface{}) {
}

db := 0
index := cfg["index"].(string)
key := cfg["key"].(string)
if v, ok := cfg["db_topology"]; ok {
db = v.(int)
}
@@ -83,7 +83,7 @@ func testTopologyInRedis(t *testing.T, cfg map[string]interface{}) {
}
// delete old key if present
defer conn.Close()
conn.Do("DEL", index)
conn.Do("DEL", key)
}

// 1. connect
@@ -116,11 +116,11 @@ func testTopologyInRedis(t *testing.T, cfg map[string]interface{}) {
}

func TestPublishListTCP(t *testing.T) {
index := "test_publist_tcp"
key := "test_publist_tcp"
db := 0
redisConfig := map[string]interface{}{
"hosts": []string{getRedisAddr()},
"index": index,
"key": key,
"db": db,
"datatype": "list",
"timeout": "5s",
@@ -130,11 +130,11 @@ func TestPublishListTCP(t *testing.T) {
}

func TestPublishListTLS(t *testing.T) {
index := "test_publist_tls"
key := "test_publist_tls"
db := 0
redisConfig := map[string]interface{}{
"hosts": []string{getSRedisAddr()},
"index": index,
"key": key,
"db": db,
"datatype": "list",
"timeout": "5s",
@@ -154,7 +154,7 @@ func testPublishList(t *testing.T, cfg map[string]interface{}) {
total := batches & batchSize

db := 0
index := cfg["index"].(string)
key := cfg["key"].(string)
if v, ok := cfg["db"]; ok {
db = v.(int)
}
@@ -166,15 +166,15 @@ func testPublishList(t *testing.T, cfg map[string]interface{}) {

// delete old key if present
defer conn.Close()
conn.Do("DEL", index)
conn.Do("DEL", key)

out := newRedisTestingOutput(t, cfg)
err = sendTestEvents(out, batches, batchSize)
assert.NoError(t, err)

results := make([][]byte, total)
for i := range results {
results[i], err = redis.Bytes(conn.Do("LPOP", index))
results[i], err = redis.Bytes(conn.Do("LPOP", key))
assert.NoError(t, err)
}

@@ -188,10 +188,10 @@ func testPublishList(t *testing.T, cfg map[string]interface{}) {

func TestPublishChannelTCP(t *testing.T) {
db := 0
index := "test_pubchan_tcp"
key := "test_pubchan_tcp"
redisConfig := map[string]interface{}{
"hosts": []string{getRedisAddr()},
"index": index,
"key": key,
"db": db,
"datatype": "channel",
"timeout": "5s",
@@ -202,10 +202,10 @@ func TestPublishChannelTCP(t *testing.T) {

func TestPublishChannelTLS(t *testing.T) {
db := 0
index := "test_pubchan_tls"
key := "test_pubchan_tls"
redisConfig := map[string]interface{}{
"hosts": []string{getSRedisAddr()},
"index": index,
"key": key,
"db": db,
"datatype": "channel",
"timeout": "5s",
@@ -225,7 +225,7 @@ func testPublishChannel(t *testing.T, cfg map[string]interface{}) {
total := batches & batchSize

db := 0
index := cfg["index"].(string)
key := cfg["key"].(string)
if v, ok := cfg["db"]; ok {
db = v.(int)
}
@@ -237,14 +237,14 @@ func testPublishChannel(t *testing.T, cfg map[string]interface{}) {

// delete old key if present
defer conn.Close()
conn.Do("DEL", index)
conn.Do("DEL", key)

// subscribe to packetbeat channel
psc := redis.PubSubConn{conn}
if err := psc.Subscribe(index); err != nil {
if err := psc.Subscribe(key); err != nil {
t.Fatal(err)
}
defer psc.Unsubscribe(index)
defer psc.Unsubscribe(key)

// connect and publish events
var wg sync.WaitGroup

0 comments on commit 245dd0a

Please sign in to comment.