Skip to content

Commit

Permalink
Add remote read clusters option to elasticsearch to enable cross-clus…
Browse files Browse the repository at this point in the history
…ter querying

Co-authored-by: allen13 <[email protected]>
Signed-off-by: David Grizzanti <[email protected]>
  • Loading branch information
dgrizzanti and allen13 committed Mar 11, 2021
1 parent 305ac76 commit 0e38cfa
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 18 deletions.
10 changes: 10 additions & 0 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
// Configuration describes the configuration properties needed to connect to an ElasticSearch cluster
type Configuration struct {
Servers []string `mapstructure:"server_urls"`
RemoteReadClusters []string `mapstructure:"remote_read_clusters"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password" json:"-"`
TokenFilePath string `mapstructure:"token_file"`
Expand Down Expand Up @@ -89,6 +90,7 @@ type TagsAsFields struct {
// ClientBuilder creates new es.Client
type ClientBuilder interface {
NewClient(logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error)
GetRemoteReadClusters() []string
GetNumShards() int64
GetNumReplicas() int64
GetMaxSpanAge() time.Duration
Expand Down Expand Up @@ -193,6 +195,9 @@ func (c *Configuration) NewClient(logger *zap.Logger, metricsFactory metrics.Fac

// ApplyDefaults copies settings from source unless its own value is non-zero.
func (c *Configuration) ApplyDefaults(source *Configuration) {
if len(c.RemoteReadClusters) == 0 {
c.RemoteReadClusters = source.RemoteReadClusters
}
if c.Username == "" {
c.Username = source.Username
}
Expand Down Expand Up @@ -246,6 +251,11 @@ func (c *Configuration) ApplyDefaults(source *Configuration) {
}
}

// GetRemoteReadClusters returns list of remote read clusters
func (c *Configuration) GetRemoteReadClusters() []string {
return c.RemoteReadClusters
}

// GetNumShards returns number of shards from Configuration
func (c *Configuration) GetNumShards() int64 {
return c.NumShards
Expand Down
1 change: 1 addition & 0 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ func createSpanReader(
TagDotReplacement: cfg.GetTagDotReplacement(),
UseReadWriteAliases: cfg.GetUseReadWriteAliases(),
Archive: archive,
RemoteReadClusters: cfg.GetRemoteReadClusters(),
}), nil
}

Expand Down
13 changes: 11 additions & 2 deletions plugin/storage/es/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const (
suffixSnifferTLSEnabled = ".sniffer-tls-enabled"
suffixTokenPath = ".token-file"
suffixServerURLs = ".server-urls"
suffixRemoteReadClusters = ".remote-read-clusters"
suffixMaxSpanAge = ".max-span-age"
suffixNumShards = ".num-shards"
suffixNumReplicas = ".num-replicas"
Expand All @@ -59,8 +60,9 @@ const (
suffixLogLevel = ".log-level"
// default number of documents to return from a query (elasticsearch allowed limit)
// see search.max_buckets and index.max_result_window
defaultMaxDocCount = 10_000
defaultServerURL = "http://127.0.0.1:9200"
defaultMaxDocCount = 10_000
defaultServerURL = "http://127.0.0.1:9200"
defaultRemoteReadClusters = ""
// default separator for Elasticsearch index date layout.
defaultIndexDateSeparator = "-"
)
Expand Down Expand Up @@ -102,6 +104,7 @@ func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options {
CreateIndexTemplates: true,
Version: 0,
Servers: []string{defaultServerURL},
RemoteReadClusters: []string{},
MaxDocCount: defaultMaxDocCount,
LogLevel: "error",
}
Expand Down Expand Up @@ -162,6 +165,11 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
nsConfig.namespace+suffixServerURLs,
defaultServerURL,
"The comma-separated list of Elasticsearch servers, must be full url i.e. http://localhost:9200")
flagSet.String(
nsConfig.namespace+suffixRemoteReadClusters,
defaultRemoteReadClusters,
"Comma-separated list of Elasticsearch remote cluster names for cross-cluster querying."+
"See Elasticsearch remote clusters and cross-cluster query api.")
flagSet.Duration(
nsConfig.namespace+suffixTimeout,
nsConfig.Timeout,
Expand Down Expand Up @@ -278,6 +286,7 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) {
cfg.Sniffer = v.GetBool(cfg.namespace + suffixSniffer)
cfg.SnifferTLSEnabled = v.GetBool(cfg.namespace + suffixSnifferTLSEnabled)
cfg.Servers = strings.Split(stripWhiteSpace(v.GetString(cfg.namespace+suffixServerURLs)), ",")
cfg.RemoteReadClusters = strings.Split(stripWhiteSpace(v.GetString(cfg.namespace+suffixRemoteReadClusters)), ",")
cfg.MaxSpanAge = v.GetDuration(cfg.namespace + suffixMaxSpanAge)
cfg.NumShards = v.GetInt64(cfg.namespace + suffixNumShards)
cfg.NumReplicas = v.GetInt64(cfg.namespace + suffixNumReplicas)
Expand Down
32 changes: 26 additions & 6 deletions plugin/storage/es/spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ type SpanReaderParams struct {
TagDotReplacement string
Archive bool
UseReadWriteAliases bool
RemoteReadClusters []string
}

// NewSpanReader returns a new SpanReader with a metrics.
Expand All @@ -129,7 +130,7 @@ func NewSpanReader(p SpanReaderParams) *SpanReader {
serviceIndexPrefix: indexNames(p.IndexPrefix, serviceIndex),
indexDateLayout: p.IndexDateLayout,
spanConverter: dbmodel.NewToDomain(p.TagDotReplacement),
timeRangeIndices: getTimeRangeIndexFn(p.Archive, p.UseReadWriteAliases),
timeRangeIndices: getTimeRangeIndexFn(p.Archive, p.UseReadWriteAliases, p.RemoteReadClusters),
sourceFn: getSourceFn(p.Archive, p.MaxDocCount),
maxDocCount: p.MaxDocCount,
}
Expand All @@ -139,24 +140,43 @@ type timeRangeIndexFn func(indexName string, indexDateLayout string, startTime t

type sourceFn func(query elastic.Query, nextTime uint64) *elastic.SearchSource

func getTimeRangeIndexFn(archive, useReadWriteAliases bool) timeRangeIndexFn {
func getTimeRangeIndexFn(archive, useReadWriteAliases bool, remoteReadClusters []string) timeRangeIndexFn {
if archive {
var archiveSuffix string
if useReadWriteAliases {
archiveSuffix = archiveReadIndexSuffix
} else {
archiveSuffix = archiveIndexSuffix
}
return func(indexName, indexDateLayout string, startTime time.Time, endTime time.Time) []string {
return addRemoteReadClusters(func(indexName, indexDateLayout string, startTime time.Time, endTime time.Time) []string {
return []string{archiveIndex(indexName, archiveSuffix)}
}
}, remoteReadClusters)
}
if useReadWriteAliases {
return func(indices string, indexDateLayout string, startTime time.Time, endTime time.Time) []string {
return addRemoteReadClusters(func(indices string, indexDateLayout string, startTime time.Time, endTime time.Time) []string {
return []string{indices + "read"}
}, remoteReadClusters)
}
return addRemoteReadClusters(timeRangeIndices, remoteReadClusters)
}

// Elasticsearch cross cluster query api example: GET /twitter,cluster_one:twitter,cluster_two:twitter/_search
// Add a remote cluster prefix for each cluster and for each index and add it to the list of original indices
func addRemoteReadClusters(fn timeRangeIndexFn, remoteReadClusters []string) timeRangeIndexFn {
return func(indexName string, indexDateLayout string, startTime time.Time, endTime time.Time) []string {
jaegerIndices := fn(indexName, indexDateLayout, startTime, endTime)

if len(remoteReadClusters) > 0 {
for _, jaegerIndex := range jaegerIndices {
for _, remoteCluster := range remoteReadClusters {
remoteIndex := remoteCluster + ":" + jaegerIndex
jaegerIndices = append(jaegerIndices, remoteIndex)
}
}
}

return jaegerIndices
}
return timeRangeIndices
}

func getSourceFn(archive bool, maxDocCount int) sourceFn {
Expand Down
25 changes: 15 additions & 10 deletions plugin/storage/es/spanstore/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,35 +149,40 @@ func TestSpanReaderIndices(t *testing.T) {
date := time.Date(2019, 10, 10, 5, 0, 0, 0, time.UTC)
dateFormat := date.UTC().Format("2006-01-02")
testCases := []struct {
index string
params SpanReaderParams
indices []string
params SpanReaderParams
}{
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "", Archive: false},
index: spanIndex + dateFormat},
indices: []string{spanIndex + dateFormat}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "", UseReadWriteAliases: true},
index: spanIndex + "read"},
indices: []string{spanIndex + "read"}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "foo:", Archive: false},
index: "foo:" + indexPrefixSeparator + spanIndex + dateFormat},
indices: []string{"foo:" + indexPrefixSeparator + spanIndex + dateFormat}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "foo:", UseReadWriteAliases: true},
index: "foo:-" + spanIndex + "read"},
indices: []string{"foo:-" + spanIndex + "read"}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "", Archive: true},
index: spanIndex + archiveIndexSuffix},
indices: []string{spanIndex + archiveIndexSuffix}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "foo:", Archive: true},
index: "foo:" + indexPrefixSeparator + spanIndex + archiveIndexSuffix},
indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveIndexSuffix}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "foo:", Archive: true, UseReadWriteAliases: true},
index: "foo:" + indexPrefixSeparator + spanIndex + archiveReadIndexSuffix},
indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveReadIndexSuffix}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "", Archive: false, RemoteReadClusters: []string{"cluster_one", "cluster_two"}},
indices: []string{spanIndex + dateFormat,
"cluster_one:" + spanIndex + dateFormat,
"cluster_two:" + spanIndex + dateFormat}},
}
for _, testCase := range testCases {
r := NewSpanReader(testCase.params)
actual := r.timeRangeIndices(r.spanIndexPrefix, "2006-01-02", date, date)
assert.Equal(t, []string{testCase.index}, actual)
assert.Equal(t, testCase.indices, actual)
}
}

Expand Down

0 comments on commit 0e38cfa

Please sign in to comment.