Skip to content

Commit

Permalink
Elasticsearch index must be lowercase (elastic#16081)
Browse files Browse the repository at this point in the history
* Index names must be lowercase

When indexing into Elasticsearch index names must always be lowercase.
If the index or indices setting are configured to produce non-lowercase
strings (e.g. by extracting part of the index name from the event
contents), we need to normalize them to be lowercase.

This change ensure that index names are always converted to lowercase.
Static strings are converted to lowercase upfront, while dynamic strings
will be post-processed.

* update kafka/redis/LS output to guarantee lowercase index
* add godoc
  • Loading branch information
Steffen Siering authored Feb 6, 2020
1 parent 90f79f9 commit 7ddcb1e
Show file tree
Hide file tree
Showing 8 changed files with 208 additions and 165 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Update replicaset group to apps/v1 {pull}15854[15802]
- Fix issue where default go logger is not discarded when either * or stdout is selected. {issue}10251[10251] {pull}15708[15708]
- Upgrade go-ucfg to latest v0.8.1. {pull}15937{15937}
- Remove superfluous use of number_of_routing_shards setting from the default template. {pull}16038[16038]
- Fix index names for indexing not always guaranteed to be lower case. {pull}16081[16081]

*Auditbeat*

Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func newKafkaClient(
hosts: hosts,
topic: topic,
key: key,
index: index,
index: strings.ToLower(index),
codec: writer,
config: *cfg,
}
Expand Down
3 changes: 2 additions & 1 deletion libbeat/outputs/logstash/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package logstash

import (
"strings"
"time"

"github.com/elastic/beats/libbeat/beat"
Expand Down Expand Up @@ -80,7 +81,7 @@ func readConfig(cfg *common.Config, info beat.Info) (*Config, error) {
}

if c.Index == "" {
c.Index = info.IndexPrefix
c.Index = strings.ToLower(info.IndexPrefix)
}

return &c, nil
Expand Down
3 changes: 3 additions & 0 deletions libbeat/outputs/logstash/enc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package logstash

import (
"strings"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/outputs/codec/json"
)
Expand All @@ -27,6 +29,7 @@ func makeLogstashEventEncoder(info beat.Info, escapeHTML bool, index string) fun
Pretty: false,
EscapeHTML: escapeHTML,
})
index = strings.ToLower(index)
return func(event interface{}) (d []byte, err error) {
d, err = enc.Encode(index, event.(*beat.Event))
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion libbeat/outputs/logstash/logstash_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ func esConnect(t *testing.T, index string) *esConnection {

host := getElasticsearchHost()
indexFmt := fmtstr.MustCompileEvent(fmt.Sprintf("%s-%%{+yyyy.MM.dd}", index))
indexSel := outil.MakeSelector(outil.FmtSelectorExpr(indexFmt, ""))
indexFmtExpr, _ := outil.FmtSelectorExpr(indexFmt, "")
indexSel := outil.MakeSelector(indexFmtExpr)
index, _ = indexSel.Select(&beat.Event{
Timestamp: ts,
})
Expand Down
135 changes: 80 additions & 55 deletions libbeat/outputs/outil/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,22 @@ package outil

import (
"fmt"
"strings"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/fmtstr"
"github.com/elastic/beats/libbeat/conditions"
)

// Selector is used to produce a string based on the contents of a Beats event.
// A selector supports multiple rules that need to be configured.
type Selector struct {
sel SelectorExpr
}

// Settings configures how BuildSelectorFromConfig creates a Selector from
// a given configuration object.
type Settings struct {
// single selector key and default option keyword
Key string
Expand All @@ -44,6 +49,8 @@ type Settings struct {
FailEmpty bool
}

// SelectorExpr represents an expression object that can be composed with other
// expressions in order to build a Selector.
type SelectorExpr interface {
sel(evt *beat.Event) (string, error)
}
Expand Down Expand Up @@ -76,6 +83,7 @@ type mapSelector struct {

var nilSelector SelectorExpr = &emptySelector{}

// MakeSelector creates a selector from a set of selector expressions.
func MakeSelector(es ...SelectorExpr) Selector {
switch len(es) {
case 0:
Expand All @@ -95,10 +103,12 @@ func (s Selector) Select(evt *beat.Event) (string, error) {
return s.sel.sel(evt)
}

// IsEmpty checks if the selector is not configured and will always return an empty string.
func (s Selector) IsEmpty() bool {
return s.sel == nilSelector || s.sel == nil
}

// IsConst checks if the selector will always return the same string.
func (s Selector) IsConst() bool {
if s.sel == nilSelector {
return true
Expand All @@ -108,6 +118,7 @@ func (s Selector) IsConst() bool {
return ok
}

// BuildSelectorFromConfig creates a selector from a configuration object.
func BuildSelectorFromConfig(
cfg *common.Config,
settings Settings,
Expand Down Expand Up @@ -156,17 +167,13 @@ func BuildSelectorFromConfig(
return Selector{}, fmt.Errorf("%v in %v", err, cfg.PathOf(key))
}

if fmtstr.IsConst() {
str, err := fmtstr.Run(nil)
if err != nil {
return Selector{}, err
}
fmtsel, err := FmtSelectorExpr(fmtstr, "")
if err != nil {
return Selector{}, fmt.Errorf("%v in %v", err, cfg.PathOf(key))
}

if str != "" {
sel = append(sel, ConstSelectorExpr(str))
}
} else {
sel = append(sel, FmtSelectorExpr(fmtstr, ""))
if fmtsel != nilSelector {
sel = append(sel, fmtsel)
}
}

Expand All @@ -183,35 +190,84 @@ func BuildSelectorFromConfig(
return MakeSelector(sel...), nil
}

// EmptySelectorExpr create a selector expression that returns an empty string.
func EmptySelectorExpr() SelectorExpr {
return nilSelector
}

// ConstSelectorExpr creates a selector expression that always returns the configured string.
func ConstSelectorExpr(s string) SelectorExpr {
return &constSelector{s}
if s == "" {
return EmptySelectorExpr()
}
return &constSelector{strings.ToLower(s)}
}

func FmtSelectorExpr(fmt *fmtstr.EventFormatString, fallback string) SelectorExpr {
return &fmtSelector{*fmt, fallback}
// FmtSelectorExpr creates a selector expression using a format string. If the
// event can not be applied the default fallback constant string will be returned.
func FmtSelectorExpr(fmt *fmtstr.EventFormatString, fallback string) (SelectorExpr, error) {
if fmt.IsConst() {
str, err := fmt.Run(nil)
if err != nil {
return nil, err
}
if str == "" {
str = fallback
}
return ConstSelectorExpr(str), nil
}

return &fmtSelector{*fmt, strings.ToLower(fallback)}, nil
}

// ConcatSelectorExpr combines multiple expressions that are run one after the other.
// The first expression that returns a string wins.
func ConcatSelectorExpr(s ...SelectorExpr) SelectorExpr {
return &listSelector{s}
}

// ConditionalSelectorExpr executes the given expression only if the event
// matches the given condition.
func ConditionalSelectorExpr(
s SelectorExpr,
cond conditions.Condition,
) SelectorExpr {
return &condSelector{s, cond}
}

// LookupSelectorExpr replaces the produced string with an table entry.
// If there is no entry in the table the default fallback string will be reported.
func LookupSelectorExpr(
s SelectorExpr,
evtfmt *fmtstr.EventFormatString,
table map[string]string,
fallback string,
) SelectorExpr {
return &mapSelector{s, fallback, table}
) (SelectorExpr, error) {
if evtfmt.IsConst() {
str, err := evtfmt.Run(nil)
if err != nil {
return nil, err
}

str = table[strings.ToLower(str)]
if str == "" {
str = fallback
}
return ConstSelectorExpr(str), nil
}

return &mapSelector{
from: &fmtSelector{f: *evtfmt},
to: table,
otherwise: fallback,
}, nil
}

func lowercaseTable(table map[string]string) map[string]string {
tmp := make(map[string]string, len(table))
for k, v := range table {
tmp[strings.ToLower(k)] = strings.ToLower(v)
}
return tmp
}

func buildSingle(cfg *common.Config, key string) (SelectorExpr, error) {
Expand Down Expand Up @@ -239,7 +295,7 @@ func buildSingle(cfg *common.Config, key string) (SelectorExpr, error) {
if err != nil {
return nil, err
}
otherwise = tmp
otherwise = strings.ToLower(tmp)
}

// 3. extract optional `mapping`
Expand Down Expand Up @@ -276,45 +332,14 @@ func buildSingle(cfg *common.Config, key string) (SelectorExpr, error) {
// 5. build selector from available fields
var sel SelectorExpr
if len(mapping.Table) > 0 {
if evtfmt.IsConst() {
str, err := evtfmt.Run(nil)
if err != nil {
return nil, err
}

str = mapping.Table[str]
if str == "" {
str = otherwise
}

if str == "" {
sel = nilSelector
} else {
sel = ConstSelectorExpr(str)
}
} else {
sel = &mapSelector{
from: FmtSelectorExpr(evtfmt, ""),
to: mapping.Table,
otherwise: otherwise,
}
}
sel, err = LookupSelectorExpr(evtfmt, lowercaseTable(mapping.Table), otherwise)
} else {
if evtfmt.IsConst() {
str, err := evtfmt.Run(nil)
if err != nil {
return nil, err
}

if str == "" {
sel = nilSelector
} else {
sel = ConstSelectorExpr(str)
}
} else {
sel = FmtSelectorExpr(evtfmt, otherwise)
}
sel, err = FmtSelectorExpr(evtfmt, otherwise)
}
if err != nil {
return nil, err
}

if cond != nil && sel != nilSelector {
sel = ConditionalSelectorExpr(sel, cond)
}
Expand Down Expand Up @@ -363,7 +388,7 @@ func (s *fmtSelector) sel(evt *beat.Event) (string, error) {
if n == "" {
return s.otherwise, nil
}
return n, nil
return strings.ToLower(n), nil
}

func (s *mapSelector) sel(evt *beat.Event) (string, error) {
Expand Down
Loading

0 comments on commit 7ddcb1e

Please sign in to comment.