diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index 0d625e67965a..38f3fc42f3fc 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -45,6 +45,7 @@ import ( // Configuration describes the configuration properties needed to connect to an ElasticSearch cluster type Configuration struct { Servers []string `mapstructure:"server_urls"` + RemoteReadClusters []string `mapstructure:"remote_read_clusters"` Username string `mapstructure:"username"` Password string `mapstructure:"password" json:"-"` TokenFilePath string `mapstructure:"token_file"` @@ -89,6 +90,7 @@ type TagsAsFields struct { // ClientBuilder creates new es.Client type ClientBuilder interface { NewClient(logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error) + GetRemoteReadClusters() []string GetNumShards() int64 GetNumReplicas() int64 GetMaxSpanAge() time.Duration @@ -193,6 +195,9 @@ func (c *Configuration) NewClient(logger *zap.Logger, metricsFactory metrics.Fac // ApplyDefaults copies settings from source unless its own value is non-zero. func (c *Configuration) ApplyDefaults(source *Configuration) { + if len(c.RemoteReadClusters) == 0 { + c.RemoteReadClusters = source.RemoteReadClusters + } if c.Username == "" { c.Username = source.Username } @@ -246,6 +251,11 @@ func (c *Configuration) ApplyDefaults(source *Configuration) { } } +// GetRemoteReadClusters returns list of remote read clusters +func (c *Configuration) GetRemoteReadClusters() []string { + return c.RemoteReadClusters +} + // GetNumShards returns number of shards from Configuration func (c *Configuration) GetNumShards() int64 { return c.NumShards diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index 4f4bf496d7c3..3877840750cc 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -151,6 +151,7 @@ func createSpanReader( TagDotReplacement: cfg.GetTagDotReplacement(), UseReadWriteAliases: cfg.GetUseReadWriteAliases(), Archive: archive, + RemoteReadClusters: cfg.GetRemoteReadClusters(), }), nil } diff --git a/plugin/storage/es/options.go b/plugin/storage/es/options.go index 68ca71c6cb32..481a67aaaff3 100644 --- a/plugin/storage/es/options.go +++ b/plugin/storage/es/options.go @@ -35,6 +35,7 @@ const ( suffixSnifferTLSEnabled = ".sniffer-tls-enabled" suffixTokenPath = ".token-file" suffixServerURLs = ".server-urls" + suffixRemoteReadClusters = ".remote-read-clusters" suffixMaxSpanAge = ".max-span-age" suffixNumShards = ".num-shards" suffixNumReplicas = ".num-replicas" @@ -59,8 +60,9 @@ const ( suffixLogLevel = ".log-level" // default number of documents to return from a query (elasticsearch allowed limit) // see search.max_buckets and index.max_result_window - defaultMaxDocCount = 10_000 - defaultServerURL = "http://127.0.0.1:9200" + defaultMaxDocCount = 10_000 + defaultServerURL = "http://127.0.0.1:9200" + defaultRemoteReadClusters = "" // default separator for Elasticsearch index date layout. defaultIndexDateSeparator = "-" ) @@ -102,6 +104,7 @@ func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options { CreateIndexTemplates: true, Version: 0, Servers: []string{defaultServerURL}, + RemoteReadClusters: []string{}, MaxDocCount: defaultMaxDocCount, LogLevel: "error", } @@ -162,6 +165,11 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { nsConfig.namespace+suffixServerURLs, defaultServerURL, "The comma-separated list of Elasticsearch servers, must be full url i.e. http://localhost:9200") + flagSet.String( + nsConfig.namespace+suffixRemoteReadClusters, + defaultRemoteReadClusters, + "Comma-separated list of Elasticsearch remote cluster names for cross-cluster querying."+ + "See Elasticsearch remote clusters and cross-cluster query api.") flagSet.Duration( nsConfig.namespace+suffixTimeout, nsConfig.Timeout, @@ -278,6 +286,7 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) { cfg.Sniffer = v.GetBool(cfg.namespace + suffixSniffer) cfg.SnifferTLSEnabled = v.GetBool(cfg.namespace + suffixSnifferTLSEnabled) cfg.Servers = strings.Split(stripWhiteSpace(v.GetString(cfg.namespace+suffixServerURLs)), ",") + cfg.RemoteReadClusters = strings.Split(stripWhiteSpace(v.GetString(cfg.namespace+suffixRemoteReadClusters)), ",") cfg.MaxSpanAge = v.GetDuration(cfg.namespace + suffixMaxSpanAge) cfg.NumShards = v.GetInt64(cfg.namespace + suffixNumShards) cfg.NumReplicas = v.GetInt64(cfg.namespace + suffixNumReplicas) diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index bbf46789b71e..df81bc5ae550 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -116,6 +116,7 @@ type SpanReaderParams struct { TagDotReplacement string Archive bool UseReadWriteAliases bool + RemoteReadClusters []string } // NewSpanReader returns a new SpanReader with a metrics. @@ -129,7 +130,7 @@ func NewSpanReader(p SpanReaderParams) *SpanReader { serviceIndexPrefix: indexNames(p.IndexPrefix, serviceIndex), indexDateLayout: p.IndexDateLayout, spanConverter: dbmodel.NewToDomain(p.TagDotReplacement), - timeRangeIndices: getTimeRangeIndexFn(p.Archive, p.UseReadWriteAliases), + timeRangeIndices: getTimeRangeIndexFn(p.Archive, p.UseReadWriteAliases, p.RemoteReadClusters), sourceFn: getSourceFn(p.Archive, p.MaxDocCount), maxDocCount: p.MaxDocCount, } @@ -139,7 +140,7 @@ type timeRangeIndexFn func(indexName string, indexDateLayout string, startTime t type sourceFn func(query elastic.Query, nextTime uint64) *elastic.SearchSource -func getTimeRangeIndexFn(archive, useReadWriteAliases bool) timeRangeIndexFn { +func getTimeRangeIndexFn(archive, useReadWriteAliases bool, remoteReadClusters []string) timeRangeIndexFn { if archive { var archiveSuffix string if useReadWriteAliases { @@ -147,16 +148,35 @@ func getTimeRangeIndexFn(archive, useReadWriteAliases bool) timeRangeIndexFn { } else { archiveSuffix = archiveIndexSuffix } - return func(indexName, indexDateLayout string, startTime time.Time, endTime time.Time) []string { + return addRemoteReadClusters(func(indexName, indexDateLayout string, startTime time.Time, endTime time.Time) []string { return []string{archiveIndex(indexName, archiveSuffix)} - } + }, remoteReadClusters) } if useReadWriteAliases { - return func(indices string, indexDateLayout string, startTime time.Time, endTime time.Time) []string { + return addRemoteReadClusters(func(indices string, indexDateLayout string, startTime time.Time, endTime time.Time) []string { return []string{indices + "read"} + }, remoteReadClusters) + } + return addRemoteReadClusters(timeRangeIndices, remoteReadClusters) +} + +// Elasticsearch cross cluster query api example: GET /twitter,cluster_one:twitter,cluster_two:twitter/_search +// Add a remote cluster prefix for each cluster and for each index and add it to the list of original indices +func addRemoteReadClusters(fn timeRangeIndexFn, remoteReadClusters []string) timeRangeIndexFn { + return func(indexName string, indexDateLayout string, startTime time.Time, endTime time.Time) []string { + jaegerIndices := fn(indexName, indexDateLayout, startTime, endTime) + + if len(remoteReadClusters) > 0 { + for _, jaegerIndex := range jaegerIndices { + for _, remoteCluster := range remoteReadClusters { + remoteIndex := remoteCluster + ":" + jaegerIndex + jaegerIndices = append(jaegerIndices, remoteIndex) + } + } } + + return jaegerIndices } - return timeRangeIndices } func getSourceFn(archive bool, maxDocCount int) sourceFn { diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index de8416630169..7f4555eb00e7 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -149,35 +149,40 @@ func TestSpanReaderIndices(t *testing.T) { date := time.Date(2019, 10, 10, 5, 0, 0, 0, time.UTC) dateFormat := date.UTC().Format("2006-01-02") testCases := []struct { - index string - params SpanReaderParams + indices []string + params SpanReaderParams }{ {params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "", Archive: false}, - index: spanIndex + dateFormat}, + indices: []string{spanIndex + dateFormat}}, {params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "", UseReadWriteAliases: true}, - index: spanIndex + "read"}, + indices: []string{spanIndex + "read"}}, {params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "foo:", Archive: false}, - index: "foo:" + indexPrefixSeparator + spanIndex + dateFormat}, + indices: []string{"foo:" + indexPrefixSeparator + spanIndex + dateFormat}}, {params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "foo:", UseReadWriteAliases: true}, - index: "foo:-" + spanIndex + "read"}, + indices: []string{"foo:-" + spanIndex + "read"}}, {params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "", Archive: true}, - index: spanIndex + archiveIndexSuffix}, + indices: []string{spanIndex + archiveIndexSuffix}}, {params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "foo:", Archive: true}, - index: "foo:" + indexPrefixSeparator + spanIndex + archiveIndexSuffix}, + indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveIndexSuffix}}, {params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "foo:", Archive: true, UseReadWriteAliases: true}, - index: "foo:" + indexPrefixSeparator + spanIndex + archiveReadIndexSuffix}, + indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveReadIndexSuffix}}, + {params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, + IndexPrefix: "", Archive: false, RemoteReadClusters: []string{"cluster_one", "cluster_two"}}, + indices: []string{spanIndex + dateFormat, + "cluster_one:" + spanIndex + dateFormat, + "cluster_two:" + spanIndex + dateFormat}}, } for _, testCase := range testCases { r := NewSpanReader(testCase.params) actual := r.timeRangeIndices(r.spanIndexPrefix, "2006-01-02", date, date) - assert.Equal(t, []string{testCase.index}, actual) + assert.Equal(t, testCase.indices, actual) } }