Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[abandoned][es] Add index rollover mode that can choose day and hour #2935

Closed
wants to merge 12 commits into from
Closed
55 changes: 28 additions & 27 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,33 +44,34 @@ import (

// Configuration describes the configuration properties needed to connect to an ElasticSearch cluster
type Configuration struct {
Servers []string `mapstructure:"server_urls"`
RemoteReadClusters []string `mapstructure:"remote_read_clusters"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password" json:"-"`
TokenFilePath string `mapstructure:"token_file"`
AllowTokenFromContext bool `mapstructure:"-"`
Sniffer bool `mapstructure:"sniffer"` // https://github.com/olivere/elastic/wiki/Sniffing
SnifferTLSEnabled bool `mapstructure:"sniffer_tls_enabled"`
MaxDocCount int `mapstructure:"-"` // Defines maximum number of results to fetch from storage per query
MaxSpanAge time.Duration `yaml:"max_span_age" mapstructure:"-"` // configures the maximum lookback on span reads
NumShards int64 `yaml:"shards" mapstructure:"num_shards"`
NumReplicas int64 `yaml:"replicas" mapstructure:"num_replicas"`
Timeout time.Duration `validate:"min=500" mapstructure:"-"`
BulkSize int `mapstructure:"-"`
BulkWorkers int `mapstructure:"-"`
BulkActions int `mapstructure:"-"`
BulkFlushInterval time.Duration `mapstructure:"-"`
IndexPrefix string `mapstructure:"index_prefix"`
IndexDateLayout string `mapstructure:"index_date_layout"`
Tags TagsAsFields `mapstructure:"tags_as_fields"`
Enabled bool `mapstructure:"-"`
TLS tlscfg.Options `mapstructure:"tls"`
UseReadWriteAliases bool `mapstructure:"use_aliases"`
CreateIndexTemplates bool `mapstructure:"create_mappings"`
UseILM bool `mapstructure:"use_ilm"`
Version uint `mapstructure:"version"`
LogLevel string `mapstructure:"log_level"`
Servers []string `mapstructure:"server_urls"`
RemoteReadClusters []string `mapstructure:"remote_read_clusters"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password" json:"-"`
TokenFilePath string `mapstructure:"token_file"`
AllowTokenFromContext bool `mapstructure:"-"`
Sniffer bool `mapstructure:"sniffer"` // https://github.com/olivere/elastic/wiki/Sniffing
SnifferTLSEnabled bool `mapstructure:"sniffer_tls_enabled"`
MaxDocCount int `mapstructure:"-"` // Defines maximum number of results to fetch from storage per query
MaxSpanAge time.Duration `yaml:"max_span_age" mapstructure:"-"` // configures the maximum lookback on span reads
NumShards int64 `yaml:"shards" mapstructure:"num_shards"`
NumReplicas int64 `yaml:"replicas" mapstructure:"num_replicas"`
Timeout time.Duration `validate:"min=500" mapstructure:"-"`
BulkSize int `mapstructure:"-"`
BulkWorkers int `mapstructure:"-"`
BulkActions int `mapstructure:"-"`
BulkFlushInterval time.Duration `mapstructure:"-"`
IndexPrefix string `mapstructure:"index_prefix"`
IndexDateLayout string `mapstructure:"index_date_layout"`
IndexRolloverFrequency string `mapstructure:"-"`
Tags TagsAsFields `mapstructure:"tags_as_fields"`
Enabled bool `mapstructure:"-"`
TLS tlscfg.Options `mapstructure:"tls"`
UseReadWriteAliases bool `mapstructure:"use_aliases"`
CreateIndexTemplates bool `mapstructure:"create_mappings"`
UseILM bool `mapstructure:"use_ilm"`
Version uint `mapstructure:"version"`
LogLevel string `mapstructure:"log_level"`
}

// TagsAsFields holds configuration for tag schema.
Expand Down
83 changes: 49 additions & 34 deletions plugin/storage/es/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package es

import (
"flag"
"fmt"
"strings"
"time"

Expand All @@ -29,42 +28,45 @@ import (
)

const (
suffixUsername = ".username"
suffixPassword = ".password"
suffixSniffer = ".sniffer"
suffixSnifferTLSEnabled = ".sniffer-tls-enabled"
suffixTokenPath = ".token-file"
suffixServerURLs = ".server-urls"
suffixRemoteReadClusters = ".remote-read-clusters"
suffixMaxSpanAge = ".max-span-age"
suffixNumShards = ".num-shards"
suffixNumReplicas = ".num-replicas"
suffixBulkSize = ".bulk.size"
suffixBulkWorkers = ".bulk.workers"
suffixBulkActions = ".bulk.actions"
suffixBulkFlushInterval = ".bulk.flush-interval"
suffixTimeout = ".timeout"
suffixIndexPrefix = ".index-prefix"
suffixIndexDateSeparator = ".index-date-separator"
suffixTagsAsFields = ".tags-as-fields"
suffixTagsAsFieldsAll = suffixTagsAsFields + ".all"
suffixTagsAsFieldsInclude = suffixTagsAsFields + ".include"
suffixTagsFile = suffixTagsAsFields + ".config-file"
suffixTagDeDotChar = suffixTagsAsFields + ".dot-replacement"
suffixReadAlias = ".use-aliases"
suffixUseILM = ".use-ilm"
suffixCreateIndexTemplate = ".create-index-templates"
suffixEnabled = ".enabled"
suffixVersion = ".version"
suffixMaxDocCount = ".max-doc-count"
suffixLogLevel = ".log-level"
suffixUsername = ".username"
suffixPassword = ".password"
suffixSniffer = ".sniffer"
suffixSnifferTLSEnabled = ".sniffer-tls-enabled"
suffixTokenPath = ".token-file"
suffixServerURLs = ".server-urls"
suffixRemoteReadClusters = ".remote-read-clusters"
suffixMaxSpanAge = ".max-span-age"
suffixNumShards = ".num-shards"
suffixNumReplicas = ".num-replicas"
suffixBulkSize = ".bulk.size"
suffixBulkWorkers = ".bulk.workers"
suffixBulkActions = ".bulk.actions"
suffixBulkFlushInterval = ".bulk.flush-interval"
suffixTimeout = ".timeout"
suffixIndexPrefix = ".index-prefix"
suffixIndexDateSeparator = ".index-date-separator"
suffixIndexRolloverFrequency = ".index-rollover-frequency"
suffixTagsAsFields = ".tags-as-fields"
suffixTagsAsFieldsAll = suffixTagsAsFields + ".all"
suffixTagsAsFieldsInclude = suffixTagsAsFields + ".include"
suffixTagsFile = suffixTagsAsFields + ".config-file"
suffixTagDeDotChar = suffixTagsAsFields + ".dot-replacement"
suffixReadAlias = ".use-aliases"
suffixUseILM = ".use-ilm"
suffixCreateIndexTemplate = ".create-index-templates"
suffixEnabled = ".enabled"
suffixVersion = ".version"
suffixMaxDocCount = ".max-doc-count"
suffixLogLevel = ".log-level"
// default number of documents to return from a query (elasticsearch allowed limit)
// see search.max_buckets and index.max_result_window
defaultMaxDocCount = 10_000
defaultServerURL = "http://127.0.0.1:9200"
defaultRemoteReadClusters = ""
// default separator for Elasticsearch index date layout.
defaultIndexDateSeparator = "-"

defaultIndexRolloverFrequency = "day"
)

// TODO this should be moved next to config.Configuration struct (maybe ./flags package)
Expand Down Expand Up @@ -205,7 +207,12 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
flagSet.String(
nsConfig.namespace+suffixIndexDateSeparator,
defaultIndexDateSeparator,
"Optional date separator of Jaeger indices. For example \".\" creates \"jaeger-span-2020.11.20 \".")
"Optional date separator of Jaeger indices. For example \".\" creates \"jaeger-span-2020.11.20\".")
flagSet.String(
nsConfig.namespace+suffixIndexRolloverFrequency,
defaultIndexRolloverFrequency,
"Rotates Jaeger indices over the given period. For example \"day\" creates \"jaeger-span-yyyy-MM-dd\" every day after UTC 12AM. Valid options: [hour, day]. "+
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest adding more detailed explanation to the website (for next release) and linking there.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

"Jaeger additionally supports manual and automated (via ILM) index management. Reference: https://www.jaegertracing.io/docs/deployment/#elasticsearch-rollover.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Jaeger additionally supports manual and automated (via ILM) index management.

This is confusing. Doesn't this day/hour enhancement classifies as "manual"? If not, what other "manual" rollover are we referring to?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"manual" here refers to the esRollover/Cleaner python scripts that are used to manually rollover/delete an index (automated via cronjobs).

This day/hour enhancement is a simple and convenient way to rollover an index by duration only and lacks the ability to:

  • Rollover on other dimensions such as by size or number of ES documents.
  • Clean older indices.

Both the esRollover/Cleaner python scripts and ILM (supported in Jaeger) provide the above, which is why I suggested the term "index management" here because I consider these as complete index management solutions and want users to be aware of these more comprehensive but "higher-effort" alternatives.

Agree this is confusing, perhaps we could rephrase to: "This does not delete old indices. For details on complete index management solutions supported by Jaeger, refer to: https://www.jaegertracing.io/docs/deployment/#elasticsearch-rollover".

Open to suggestions.

flagSet.Bool(
nsConfig.namespace+suffixTagsAsFieldsAll,
nsConfig.Tags.AllAsFields,
Expand Down Expand Up @@ -295,7 +302,6 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) {
cfg.BulkFlushInterval = v.GetDuration(cfg.namespace + suffixBulkFlushInterval)
cfg.Timeout = v.GetDuration(cfg.namespace + suffixTimeout)
cfg.IndexPrefix = v.GetString(cfg.namespace + suffixIndexPrefix)
cfg.IndexDateLayout = initDateLayout(v.GetString(cfg.namespace + suffixIndexDateSeparator))
cfg.Tags.AllAsFields = v.GetBool(cfg.namespace + suffixTagsAsFieldsAll)
cfg.Tags.Include = v.GetString(cfg.namespace + suffixTagsAsFieldsInclude)
cfg.Tags.File = v.GetString(cfg.namespace + suffixTagsFile)
Expand All @@ -317,6 +323,10 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) {
if len(remoteReadClusters) > 0 {
cfg.RemoteReadClusters = strings.Split(remoteReadClusters, ",")
}

rolloverFreq := strings.ToLower(v.GetString(cfg.namespace + suffixIndexRolloverFrequency))
separator := v.GetString(cfg.namespace + suffixIndexDateSeparator)
cfg.IndexDateLayout = initDateLayout(rolloverFreq, separator)
}

// GetPrimary returns primary configuration.
Expand All @@ -343,6 +353,11 @@ func stripWhiteSpace(str string) string {
return strings.Replace(str, " ", "", -1)
}

func initDateLayout(separator string) string {
return fmt.Sprintf("2006%s01%s02", separator, separator)
func initDateLayout(rolloverFreq, sep string) string {
// default to daily format
indexLayout := "2006" + sep + "01" + sep + "02"
if rolloverFreq == "hour" {
indexLayout = indexLayout + sep + "15"
}
return indexLayout
}
24 changes: 24 additions & 0 deletions plugin/storage/es/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,3 +174,27 @@ func TestIndexDateSeparator(t *testing.T) {
})
}
}

func TestIndexRollover(t *testing.T) {
testCases := []struct {
name string
flags []string
wantDateLayout string
}{
{"not defined (default)", []string{}, "2006-01-02"},
{"day rotation", []string{"--es.index-rollover-frequency=day"}, "2006-01-02"},
{"hour rotation", []string{"--es.index-rollover-frequency=hour"}, "2006-01-02-15"},
{"error rotation change default", []string{"--es.index-rollover=hours"}, "2006-01-02"},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
opts := NewOptions("es")
v, command := config.Viperize(opts.AddFlags)
command.ParseFlags(tc.flags)
opts.InitFromViper(v)
primary := opts.GetPrimary()
assert.Equal(t, tc.wantDateLayout, primary.IndexDateLayout)

})
}
}
12 changes: 10 additions & 2 deletions plugin/storage/es/spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/json"
"errors"
"fmt"
"strings"
"time"

"github.com/olivere/elastic"
Expand Down Expand Up @@ -199,9 +200,16 @@ func timeRangeIndices(indexName, indexDateLayout string, startTime time.Time, en
var indices []string
firstIndex := indexWithDate(indexName, indexDateLayout, startTime)
currentIndex := indexWithDate(indexName, indexDateLayout, endTime)

reduce := -24 * time.Hour
if strings.HasSuffix(indexDateLayout, "15") {
reduce = -1 * time.Hour
}
for currentIndex != firstIndex {
indices = append(indices, currentIndex)
endTime = endTime.Add(-24 * time.Hour)
if len(indices) == 0 || indices[len(indices)-1] != currentIndex {
indices = append(indices, currentIndex)
}
endTime = endTime.Add(reduce)
currentIndex = indexWithDate(indexName, indexDateLayout, endTime)
}
indices = append(indices, firstIndex)
Expand Down
36 changes: 26 additions & 10 deletions plugin/storage/es/spanstore/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,20 +147,28 @@ func TestSpanReaderIndices(t *testing.T) {
logger, _ := testutils.NewLogger()
metricsFactory := metricstest.NewFactory(0)
date := time.Date(2019, 10, 10, 5, 0, 0, 0, time.UTC)
dateFormat := date.UTC().Format("2006-01-02")
dateFormatDay := "2006-01-02"
dateFormatHour := "2006-01-02-15"

testCases := []struct {
indices []string
params SpanReaderParams
}{
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "", Archive: false},
indices: []string{spanIndex + dateFormat}},
IndexPrefix: "", Archive: false, IndexDateLayout: dateFormatDay},
indices: []string{spanIndex + date.UTC().Format(dateFormatDay)}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "", Archive: false, IndexDateLayout: dateFormatHour},
indices: []string{spanIndex + date.UTC().Format(dateFormatHour)}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "", UseReadWriteAliases: true},
indices: []string{spanIndex + "read"}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "foo:", Archive: false},
indices: []string{"foo:" + indexPrefixSeparator + spanIndex + dateFormat}},
IndexPrefix: "foo:", Archive: false, IndexDateLayout: dateFormatDay},
indices: []string{"foo:" + indexPrefixSeparator + spanIndex + date.UTC().Format(dateFormatDay)}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "foo:", Archive: false, IndexDateLayout: dateFormatHour},
indices: []string{"foo:" + indexPrefixSeparator + spanIndex + date.UTC().Format(dateFormatHour)}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "foo:", UseReadWriteAliases: true},
indices: []string{"foo:-" + spanIndex + "read"}},
Expand All @@ -174,11 +182,19 @@ func TestSpanReaderIndices(t *testing.T) {
IndexPrefix: "foo:", Archive: true, UseReadWriteAliases: true},
indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveReadIndexSuffix}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "", Archive: false, RemoteReadClusters: []string{"cluster_one", "cluster_two"}},
IndexPrefix: "", Archive: false, RemoteReadClusters: []string{"cluster_one", "cluster_two"},
IndexDateLayout: dateFormatDay},
indices: []string{
spanIndex + date.UTC().Format(dateFormatDay),
"cluster_one:" + spanIndex + date.UTC().Format(dateFormatDay),
"cluster_two:" + spanIndex + date.UTC().Format(dateFormatDay)}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "", Archive: false, RemoteReadClusters: []string{"cluster_one", "cluster_two"},
IndexDateLayout: dateFormatHour},
indices: []string{
spanIndex + dateFormat,
"cluster_one:" + spanIndex + dateFormat,
"cluster_two:" + spanIndex + dateFormat}},
spanIndex + date.UTC().Format(dateFormatHour),
"cluster_one:" + spanIndex + date.UTC().Format(dateFormatHour),
"cluster_two:" + spanIndex + date.UTC().Format(dateFormatHour)}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "", Archive: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}},
indices: []string{
Expand All @@ -200,7 +216,7 @@ func TestSpanReaderIndices(t *testing.T) {
}
for _, testCase := range testCases {
r := NewSpanReader(testCase.params)
actual := r.timeRangeIndices(r.spanIndexPrefix, "2006-01-02", date, date)
actual := r.timeRangeIndices(r.spanIndexPrefix, r.indexDateLayout, date, date)
assert.Equal(t, testCase.indices, actual)
}
}
Expand Down