Skip to content

Commit

Permalink
Merge pull request #2119 from urso/enh/es-index-config
Browse files Browse the repository at this point in the history
Configurable Elasticsearch index pattern
  • Loading branch information
tsg authored Aug 3, 2016
2 parents 45aef77 + 373a978 commit 7ee94d2
Showing 13 changed files with 72 additions and 39 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
@@ -13,6 +13,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d
==== Breaking changes

*Affecting all Beats*
- Change Elasticsearch output index configuration to be based on format strings. If index has been configured, no date will be appended anymore to the index name. {pull}2119[2119]

*Metricbeat*

2 changes: 1 addition & 1 deletion filebeat/filebeat.full.yml
Original file line number Diff line number Diff line change
@@ -440,7 +440,7 @@ output.elasticsearch:

# Optional index name. The default index name is set to name of the beat
# in all lowercase.
#index: filebeat
#index: 'filebeat-%{+yyyy.MM.dd}'

# SOCKS5 proxy server URL
#proxy_url: socks5://user:password@socks5-server:2233
2 changes: 1 addition & 1 deletion libbeat/_meta/config.full.yml
Original file line number Diff line number Diff line change
@@ -214,7 +214,7 @@ output.elasticsearch:

# Optional index name. The default index name is set to name of the beat
# in all lowercase.
#index: beatname
#index: 'beatname-%{+yyyy.MM.dd}'

# SOCKS5 proxy server URL
#proxy_url: socks5://user:password@socks5-server:2233
4 changes: 3 additions & 1 deletion libbeat/outputs/elasticsearch/api_test.go
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@ import (
"testing"
"time"

"github.com/elastic/beats/libbeat/outputs/outil"
"github.com/stretchr/testify/assert"
)

@@ -169,7 +170,8 @@ func newTestClient(url string) *Client {
}

func newTestClientAuth(url, user, pass string) *Client {
client, err := NewClient(url, "", nil, nil, user, pass, nil, 60*time.Second, 3, nil)
index := outil.MakeSelector()
client, err := NewClient(url, index, nil, nil, user, pass, nil, 60*time.Second, 3, nil)
if err != nil {
panic(err)
}
31 changes: 16 additions & 15 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
@@ -16,12 +16,13 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs/mode"
"github.com/elastic/beats/libbeat/outputs/outil"
"github.com/elastic/beats/libbeat/outputs/transport"
)

type Client struct {
Connection
index string
index outil.Selector
params map[string]string

// buffered bulk requests
@@ -75,7 +76,10 @@ var (
)

func NewClient(
esURL, index string, proxyURL *url.URL, tls *tls.Config,
esURL string,
index outil.Selector,
proxyURL *url.URL,
tls *tls.Config,
username, password string,
params map[string]string,
timeout time.Duration,
@@ -228,7 +232,7 @@ func (client *Client) PublishEvents(
// successfully added to bulk request.
func bulkEncodePublishRequest(
body bulkWriter,
index string,
index outil.Selector,
events []common.MapStr,
) []common.MapStr {
okEvents := events[:0]
@@ -245,11 +249,10 @@ func bulkEncodePublishRequest(
return okEvents
}

func eventBulkMeta(index string, event common.MapStr) bulkMeta {
index = getIndex(event, index)
func eventBulkMeta(index outil.Selector, event common.MapStr) bulkMeta {
meta := bulkMeta{
Index: bulkMetaIndex{
Index: index,
Index: getIndex(event, index),
DocType: event["type"].(string),
},
}
@@ -259,29 +262,27 @@ func eventBulkMeta(index string, event common.MapStr) bulkMeta {
// getIndex returns the full index name
// Index is either defined in the config as part of the output
// or can be overload by the event through setting index
func getIndex(event common.MapStr, index string) string {
func getIndex(event common.MapStr, index outil.Selector) string {

ts := time.Time(event["@timestamp"].(common.Time)).UTC()

// Check for dynamic index
// XXX: is this used/needed?
if _, ok := event["beat"]; ok {
beatMeta, ok := event["beat"].(common.MapStr)
if ok {
// Check if index is set dynamically
if dynamicIndex, ok := beatMeta["index"]; ok {
dynamicIndexValue, ok := dynamicIndex.(string)
if ok {
index = dynamicIndexValue
if dynamicIndexValue, ok := dynamicIndex.(string); ok {
return fmt.Sprintf("%s-%d.%02d.%02d",
dynamicIndexValue, ts.Year(), ts.Month(), ts.Day())
}
}
}
}

// Append timestamp to index
index = fmt.Sprintf("%s-%d.%02d.%02d", index,
ts.Year(), ts.Month(), ts.Day())

return index
str, _ := index.Select(event)
return str
}

// bulkCollectPublishFails checks per item errors returning all events
15 changes: 13 additions & 2 deletions libbeat/outputs/elasticsearch/client_test.go
Original file line number Diff line number Diff line change
@@ -9,6 +9,8 @@ import (
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/fmtstr"
"github.com/elastic/beats/libbeat/outputs/outil"
"github.com/stretchr/testify/assert"
)

@@ -128,7 +130,11 @@ func TestGetIndexStandard(t *testing.T) {
"field": 1,
}

index := getIndex(event, "beatname")
pattern := "beatname-%{+yyyy.MM.dd}"
fmtstr := fmtstr.MustCompileEvent(pattern)
indexSel := outil.MakeSelector(outil.FmtSelectorExpr(fmtstr, ""))

index := getIndex(event, indexSel)
assert.Equal(t, index, "beatname-"+extension)
}

@@ -145,7 +151,12 @@ func TestGetIndexOverwrite(t *testing.T) {
"index": "dynamicindex",
},
}
index := getIndex(event, "beatname")

pattern := "beatname-%%{+yyyy.MM.dd}"
fmtstr := fmtstr.MustCompileEvent(pattern)
indexSel := outil.MakeSelector(outil.FmtSelectorExpr(fmtstr, ""))

index := getIndex(event, indexSel)
assert.Equal(t, index, "dynamicindex-"+extension)
}

1 change: 0 additions & 1 deletion libbeat/outputs/elasticsearch/config.go
Original file line number Diff line number Diff line change
@@ -13,7 +13,6 @@ type elasticsearchConfig struct {
Username string `config:"username"`
Password string `config:"password"`
ProxyURL string `config:"proxy_url"`
Index string `config:"index"`
LoadBalance bool `config:"loadbalance"`
CompressionLevel int `config:"compression_level" validate:"min=0, max=9"`
TLS *outputs.TLSConfig `config:"tls"`
20 changes: 16 additions & 4 deletions libbeat/outputs/elasticsearch/output.go
Original file line number Diff line number Diff line change
@@ -17,11 +17,12 @@ import (
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/outputs/mode"
"github.com/elastic/beats/libbeat/outputs/mode/modeutil"
"github.com/elastic/beats/libbeat/outputs/outil"
"github.com/elastic/beats/libbeat/paths"
)

type elasticsearchOutput struct {
index string
index outil.Selector
beatName string
mode mode.ConnectionMode
topology
@@ -57,7 +58,8 @@ func New(beatName string, cfg *common.Config, topologyExpire int) (outputs.Outpu
}

if !cfg.HasField("index") {
cfg.SetString("index", -1, beatName)
pattern := fmt.Sprintf("%v-%%{+yyyy.MM.dd}", beatName)
cfg.SetString("index", -1, pattern)
}

output := &elasticsearchOutput{beatName: beatName}
@@ -77,6 +79,16 @@ func (out *elasticsearchOutput) init(
return err
}

index, err := outil.BuildSelectorFromConfig(cfg, outil.Settings{
Key: "index",
MultiKey: "indices",
EnableSingleOnly: true,
FailEmpty: true,
})
if err != nil {
return err
}

tlsConfig, err := outputs.LoadTLSConfig(config.TLS)
if err != nil {
return err
@@ -87,6 +99,7 @@ func (out *elasticsearchOutput) init(
return err
}

out.index = index
clients, err := modeutil.MakeClients(cfg, makeClientFactory(tlsConfig, &config, out))
if err != nil {
return err
@@ -110,7 +123,6 @@ func (out *elasticsearchOutput) init(
}

out.mode = m
out.index = config.Index

return nil
}
@@ -241,7 +253,7 @@ func makeClientFactory(
}

return NewClient(
esURL, config.Index, proxyURL, tls,
esURL, out.index, proxyURL, tls,
config.Username, config.Password,
params, config.Timeout,
config.CompressionLevel,
16 changes: 9 additions & 7 deletions libbeat/outputs/elasticsearch/output_test.go
Original file line number Diff line number Diff line change
@@ -17,7 +17,7 @@ import (
var testOptions = outputs.Options{}

func createElasticsearchConnection(flushInterval int, bulkSize int) elasticsearchOutput {
index := fmt.Sprintf("packetbeat-unittest-%d", os.Getpid())
index := fmt.Sprintf("packetbeat-int-test-%d", os.Getpid())

esPort, err := strconv.Atoi(GetEsPort())

@@ -32,7 +32,7 @@ func createElasticsearchConnection(flushInterval int, bulkSize int) elasticsearc
"username": os.Getenv("ES_USER"),
"password": os.Getenv("ES_PASS"),
"path": "",
"index": index,
"index": fmt.Sprintf("%v-%%{+yyyy.MM.dd}", index),
"protocol": "http",
"flush_interval": flushInterval,
"bulk_max_size": bulkSize,
@@ -95,7 +95,7 @@ func TestOneEvent(t *testing.T) {
output := createElasticsearchConnection(0, 0)

event := common.MapStr{}
event["@timestamp"] = common.Time(time.Now())
event["@timestamp"] = common.Time(ts)
event["type"] = "redis"
event["status"] = "OK"
event["responsetime"] = 34
@@ -108,7 +108,7 @@ func TestOneEvent(t *testing.T) {
r["request"] = "MGET key1"
r["response"] = "value1"

index := fmt.Sprintf("%s-%d.%02d.%02d", output.index, ts.Year(), ts.Month(), ts.Day())
index, _ := output.index.Select(event)
debugf("index = %s", index)

client := output.randomClient()
@@ -167,7 +167,7 @@ func TestEvents(t *testing.T) {
output := createElasticsearchConnection(0, 0)

event := common.MapStr{}
event["@timestamp"] = common.Time(time.Now())
event["@timestamp"] = common.Time(ts)
event["type"] = "redis"
event["status"] = "OK"
event["responsetime"] = 34
@@ -181,7 +181,7 @@ func TestEvents(t *testing.T) {
r["response"] = "value1"
event["redis"] = r

index := fmt.Sprintf("%s-%d.%02d.%02d", output.index, ts.Year(), ts.Month(), ts.Day())
index, _ := output.index.Select(event)
output.randomClient().CreateIndex(index, common.MapStr{
"settings": common.MapStr{
"number_of_shards": 1,
@@ -233,7 +233,9 @@ func TestEvents(t *testing.T) {

func testBulkWithParams(t *testing.T, output elasticsearchOutput) {
ts := time.Now()
index := fmt.Sprintf("%s-%d.%02d.%02d", output.index, ts.Year(), ts.Month(), ts.Day())
index, _ := output.index.Select(common.MapStr{
"@timestamp": common.Time(ts),
})

output.randomClient().CreateIndex(index, common.MapStr{
"settings": common.MapStr{
13 changes: 9 additions & 4 deletions libbeat/outputs/logstash/logstash_integration_test.go
Original file line number Diff line number Diff line change
@@ -10,9 +10,11 @@ import (
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/fmtstr"
"github.com/elastic/beats/libbeat/common/op"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/outputs/elasticsearch"
"github.com/elastic/beats/libbeat/outputs/outil"
"github.com/stretchr/testify/assert"
)

@@ -68,12 +70,15 @@ func esConnect(t *testing.T, index string) *esConnection {
ts := time.Now().UTC()

host := getElasticsearchHost()
index = fmt.Sprintf("%s-%02d.%02d.%02d",
index, ts.Year(), ts.Month(), ts.Day())
indexFmt := fmtstr.MustCompileEvent(fmt.Sprintf("%s-%%{+yyyy.MM.dd}", index))
indexSel := outil.MakeSelector(outil.FmtSelectorExpr(indexFmt, ""))
index, _ = indexSel.Select(common.MapStr{
"@timestamp": common.Time(ts),
})

username := os.Getenv("ES_USER")
password := os.Getenv("ES_PASS")
client, err := elasticsearch.NewClient(host, "", nil, nil, username, password,
client, err := elasticsearch.NewClient(host, indexSel, nil, nil, username, password,
nil, 60*time.Second, 0, nil)
if err != nil {
t.Fatal(err)
@@ -142,7 +147,7 @@ func newTestElasticsearchOutput(t *testing.T, test string) *testOutputer {
bulkSize := 0
config, _ := common.NewConfigFrom(map[string]interface{}{
"hosts": []string{getElasticsearchHost()},
"index": index,
"index": connection.index,
"flush_interval": &flushInterval,
"bulk_max_size": &bulkSize,
"username": os.Getenv("ES_USER"),
2 changes: 1 addition & 1 deletion metricbeat/metricbeat.full.yml
Original file line number Diff line number Diff line change
@@ -362,7 +362,7 @@ output.elasticsearch:

# Optional index name. The default index name is set to name of the beat
# in all lowercase.
#index: metricbeat
#index: 'metricbeat-%{+yyyy.MM.dd}'

# SOCKS5 proxy server URL
#proxy_url: socks5://user:password@socks5-server:2233
2 changes: 1 addition & 1 deletion packetbeat/packetbeat.full.yml
Original file line number Diff line number Diff line change
@@ -632,7 +632,7 @@ output.elasticsearch:

# Optional index name. The default index name is set to name of the beat
# in all lowercase.
#index: packetbeat
#index: 'packetbeat-%{+yyyy.MM.dd}'

# SOCKS5 proxy server URL
#proxy_url: socks5://user:password@socks5-server:2233
2 changes: 1 addition & 1 deletion winlogbeat/winlogbeat.full.yml
Original file line number Diff line number Diff line change
@@ -249,7 +249,7 @@ output.elasticsearch:

# Optional index name. The default index name is set to name of the beat
# in all lowercase.
#index: winlogbeat
#index: 'winlogbeat-%{+yyyy.MM.dd}'

# SOCKS5 proxy server URL
#proxy_url: socks5://user:password@socks5-server:2233

0 comments on commit 7ee94d2

Please sign in to comment.