Skip to content

Commit

Permalink
[es] Add remote read clusters option for cross-cluster querying (#2874)
Browse files Browse the repository at this point in the history
* Add remote read clusters option to elasticsearch to enable cross-cluster querying

Co-authored-by: allen13 <[email protected]>
Signed-off-by: David Grizzanti <[email protected]>

* Increase test timeout for TestKafkaStorage (#2873)

* Increasing test timeout to allow for trace storage settle

Signed-off-by: Jacob Goldsworthy <[email protected]>

* Updating max storage test wait time to 10 seconds

Signed-off-by: Jacob Goldsworthy <[email protected]>
Signed-off-by: David Grizzanti <[email protected]>

* Rename indexName to indexPrefix; short circuit addRemoteReadClusters when remoteReadClusters is emtpy

Signed-off-by: David Grizzanti <[email protected]>

* Add more test cases for remoteReadClusters with Archive: true and UseReadWriteAliases: true

Signed-off-by: David Grizzanti <[email protected]>

* Removing unnecessary changes from this PR

Signed-off-by: David Grizzanti <[email protected]>

* Increase test timeout for TestKafkaStorage (#2873)

* Increasing test timeout to allow for trace storage settle

Signed-off-by: Jacob Goldsworthy <[email protected]>

* Updating max storage test wait time to 10 seconds

Signed-off-by: Jacob Goldsworthy <[email protected]>
Signed-off-by: David Grizzanti <[email protected]>

* Update variable name for consistency

Signed-off-by: David Grizzanti <[email protected]>

* Fix index prefix name

Signed-off-by: David Grizzanti <[email protected]>

* Prevent remoteReadClusters from being []string{""} when no remote clusters are provided

Signed-off-by: David Grizzanti <[email protected]>

Co-authored-by: allen13 <[email protected]>
Co-authored-by: Jacob G <[email protected]>
  • Loading branch information
3 people authored Mar 16, 2021
1 parent bfd3e1b commit 915d3af
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 20 deletions.
10 changes: 10 additions & 0 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
// Configuration describes the configuration properties needed to connect to an ElasticSearch cluster
type Configuration struct {
Servers []string `mapstructure:"server_urls"`
RemoteReadClusters []string `mapstructure:"remote_read_clusters"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password" json:"-"`
TokenFilePath string `mapstructure:"token_file"`
Expand Down Expand Up @@ -89,6 +90,7 @@ type TagsAsFields struct {
// ClientBuilder creates new es.Client
type ClientBuilder interface {
NewClient(logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error)
GetRemoteReadClusters() []string
GetNumShards() int64
GetNumReplicas() int64
GetMaxSpanAge() time.Duration
Expand Down Expand Up @@ -193,6 +195,9 @@ func (c *Configuration) NewClient(logger *zap.Logger, metricsFactory metrics.Fac

// ApplyDefaults copies settings from source unless its own value is non-zero.
func (c *Configuration) ApplyDefaults(source *Configuration) {
if len(c.RemoteReadClusters) == 0 {
c.RemoteReadClusters = source.RemoteReadClusters
}
if c.Username == "" {
c.Username = source.Username
}
Expand Down Expand Up @@ -246,6 +251,11 @@ func (c *Configuration) ApplyDefaults(source *Configuration) {
}
}

// GetRemoteReadClusters returns list of remote read clusters
func (c *Configuration) GetRemoteReadClusters() []string {
return c.RemoteReadClusters
}

// GetNumShards returns number of shards from Configuration
func (c *Configuration) GetNumShards() int64 {
return c.NumShards
Expand Down
1 change: 1 addition & 0 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ func createSpanReader(
TagDotReplacement: cfg.GetTagDotReplacement(),
UseReadWriteAliases: cfg.GetUseReadWriteAliases(),
Archive: archive,
RemoteReadClusters: cfg.GetRemoteReadClusters(),
}), nil
}

Expand Down
17 changes: 15 additions & 2 deletions plugin/storage/es/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const (
suffixSnifferTLSEnabled = ".sniffer-tls-enabled"
suffixTokenPath = ".token-file"
suffixServerURLs = ".server-urls"
suffixRemoteReadClusters = ".remote-read-clusters"
suffixMaxSpanAge = ".max-span-age"
suffixNumShards = ".num-shards"
suffixNumReplicas = ".num-replicas"
Expand All @@ -59,8 +60,9 @@ const (
suffixLogLevel = ".log-level"
// default number of documents to return from a query (elasticsearch allowed limit)
// see search.max_buckets and index.max_result_window
defaultMaxDocCount = 10_000
defaultServerURL = "http://127.0.0.1:9200"
defaultMaxDocCount = 10_000
defaultServerURL = "http://127.0.0.1:9200"
defaultRemoteReadClusters = ""
// default separator for Elasticsearch index date layout.
defaultIndexDateSeparator = "-"
)
Expand Down Expand Up @@ -102,6 +104,7 @@ func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options {
CreateIndexTemplates: true,
Version: 0,
Servers: []string{defaultServerURL},
RemoteReadClusters: []string{},
MaxDocCount: defaultMaxDocCount,
LogLevel: "error",
}
Expand Down Expand Up @@ -162,6 +165,11 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
nsConfig.namespace+suffixServerURLs,
defaultServerURL,
"The comma-separated list of Elasticsearch servers, must be full url i.e. http://localhost:9200")
flagSet.String(
nsConfig.namespace+suffixRemoteReadClusters,
defaultRemoteReadClusters,
"Comma-separated list of Elasticsearch remote cluster names for cross-cluster querying."+
"See Elasticsearch remote clusters and cross-cluster query api.")
flagSet.Duration(
nsConfig.namespace+suffixTimeout,
nsConfig.Timeout,
Expand Down Expand Up @@ -304,6 +312,11 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) {
// TODO: Need to figure out a better way for do this.
cfg.AllowTokenFromContext = v.GetBool(spanstore.StoragePropagationKey)
cfg.TLS = cfg.getTLSFlagsConfig().InitFromViper(v)

remoteReadClusters := stripWhiteSpace(v.GetString(cfg.namespace + suffixRemoteReadClusters))
if len(remoteReadClusters) > 0 {
cfg.RemoteReadClusters = strings.Split(remoteReadClusters, ",")
}
}

// GetPrimary returns primary configuration.
Expand Down
16 changes: 16 additions & 0 deletions plugin/storage/es/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func TestOptions(t *testing.T) {
assert.Empty(t, primary.Username)
assert.Empty(t, primary.Password)
assert.NotEmpty(t, primary.Servers)
assert.Empty(t, primary.RemoteReadClusters)
assert.Equal(t, int64(5), primary.NumShards)
assert.Equal(t, int64(1), primary.NumReplicas)
assert.Equal(t, 72*time.Hour, primary.MaxSpanAge)
Expand Down Expand Up @@ -58,6 +59,7 @@ func TestOptionsWithFlags(t *testing.T) {
"--es.num-replicas=10",
"--es.index-date-separator=",
// a couple overrides
"--es.remote-read-clusters=cluster_one,cluster_two",
"--es.aux.server-urls=3.3.3.3, 4.4.4.4",
"--es.aux.max-span-age=24h",
"--es.aux.num-replicas=10",
Expand All @@ -77,6 +79,7 @@ func TestOptionsWithFlags(t *testing.T) {
assert.Equal(t, "hello", primary.Username)
assert.Equal(t, "/foo/bar", primary.TokenFilePath)
assert.Equal(t, []string{"1.1.1.1", "2.2.2.2"}, primary.Servers)
assert.Equal(t, []string{"cluster_one", "cluster_two"}, primary.RemoteReadClusters)
assert.Equal(t, 48*time.Hour, primary.MaxSpanAge)
assert.True(t, primary.Sniffer)
assert.True(t, primary.SnifferTLSEnabled)
Expand All @@ -103,6 +106,19 @@ func TestOptionsWithFlags(t *testing.T) {
assert.True(t, primary.UseILM)
}

func TestEmptyRemoteReadClusters(t *testing.T) {
opts := NewOptions("es", "es.aux")
v, command := config.Viperize(opts.AddFlags)
err := command.ParseFlags([]string{
"--es.remote-read-clusters=",
})
require.NoError(t, err)
opts.InitFromViper(v)

primary := opts.GetPrimary()
assert.Equal(t, []string{}, primary.RemoteReadClusters)
}

func TestMaxSpanAgeSetErrorInArchiveMode(t *testing.T) {
opts := NewOptions("es", archiveNamespace)
_, command := config.Viperize(opts.AddFlags)
Expand Down
37 changes: 29 additions & 8 deletions plugin/storage/es/spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ type SpanReaderParams struct {
TagDotReplacement string
Archive bool
UseReadWriteAliases bool
RemoteReadClusters []string
}

// NewSpanReader returns a new SpanReader with a metrics.
Expand All @@ -129,7 +130,7 @@ func NewSpanReader(p SpanReaderParams) *SpanReader {
serviceIndexPrefix: indexNames(p.IndexPrefix, serviceIndex),
indexDateLayout: p.IndexDateLayout,
spanConverter: dbmodel.NewToDomain(p.TagDotReplacement),
timeRangeIndices: getTimeRangeIndexFn(p.Archive, p.UseReadWriteAliases),
timeRangeIndices: getTimeRangeIndexFn(p.Archive, p.UseReadWriteAliases, p.RemoteReadClusters),
sourceFn: getSourceFn(p.Archive, p.MaxDocCount),
maxDocCount: p.MaxDocCount,
}
Expand All @@ -139,24 +140,44 @@ type timeRangeIndexFn func(indexName string, indexDateLayout string, startTime t

type sourceFn func(query elastic.Query, nextTime uint64) *elastic.SearchSource

func getTimeRangeIndexFn(archive, useReadWriteAliases bool) timeRangeIndexFn {
func getTimeRangeIndexFn(archive, useReadWriteAliases bool, remoteReadClusters []string) timeRangeIndexFn {
if archive {
var archiveSuffix string
if useReadWriteAliases {
archiveSuffix = archiveReadIndexSuffix
} else {
archiveSuffix = archiveIndexSuffix
}
return func(indexName, indexDateLayout string, startTime time.Time, endTime time.Time) []string {
return []string{archiveIndex(indexName, archiveSuffix)}
}
return addRemoteReadClusters(func(indexPrefix, indexDateLayout string, startTime time.Time, endTime time.Time) []string {
return []string{archiveIndex(indexPrefix, archiveSuffix)}
}, remoteReadClusters)
}
if useReadWriteAliases {
return func(indices string, indexDateLayout string, startTime time.Time, endTime time.Time) []string {
return []string{indices + "read"}
return addRemoteReadClusters(func(indexPrefix string, indexDateLayout string, startTime time.Time, endTime time.Time) []string {
return []string{indexPrefix + "read"}
}, remoteReadClusters)
}
return addRemoteReadClusters(timeRangeIndices, remoteReadClusters)
}

// Add a remote cluster prefix for each cluster and for each index and add it to the list of original indices.
// Elasticsearch cross cluster api example GET /twitter,cluster_one:twitter,cluster_two:twitter/_search.
func addRemoteReadClusters(fn timeRangeIndexFn, remoteReadClusters []string) timeRangeIndexFn {
return func(indexPrefix string, indexDateLayout string, startTime time.Time, endTime time.Time) []string {
jaegerIndices := fn(indexPrefix, indexDateLayout, startTime, endTime)
if len(remoteReadClusters) == 0 {
return jaegerIndices
}

for _, jaegerIndex := range jaegerIndices {
for _, remoteCluster := range remoteReadClusters {
remoteIndex := remoteCluster + ":" + jaegerIndex
jaegerIndices = append(jaegerIndices, remoteIndex)
}
}

return jaegerIndices
}
return timeRangeIndices
}

func getSourceFn(archive bool, maxDocCount int) sourceFn {
Expand Down
44 changes: 34 additions & 10 deletions plugin/storage/es/spanstore/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,35 +149,59 @@ func TestSpanReaderIndices(t *testing.T) {
date := time.Date(2019, 10, 10, 5, 0, 0, 0, time.UTC)
dateFormat := date.UTC().Format("2006-01-02")
testCases := []struct {
index string
params SpanReaderParams
indices []string
params SpanReaderParams
}{
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "", Archive: false},
index: spanIndex + dateFormat},
indices: []string{spanIndex + dateFormat}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "", UseReadWriteAliases: true},
index: spanIndex + "read"},
indices: []string{spanIndex + "read"}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "foo:", Archive: false},
index: "foo:" + indexPrefixSeparator + spanIndex + dateFormat},
indices: []string{"foo:" + indexPrefixSeparator + spanIndex + dateFormat}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "foo:", UseReadWriteAliases: true},
index: "foo:-" + spanIndex + "read"},
indices: []string{"foo:-" + spanIndex + "read"}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "", Archive: true},
index: spanIndex + archiveIndexSuffix},
indices: []string{spanIndex + archiveIndexSuffix}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "foo:", Archive: true},
index: "foo:" + indexPrefixSeparator + spanIndex + archiveIndexSuffix},
indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveIndexSuffix}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "foo:", Archive: true, UseReadWriteAliases: true},
index: "foo:" + indexPrefixSeparator + spanIndex + archiveReadIndexSuffix},
indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveReadIndexSuffix}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "", Archive: false, RemoteReadClusters: []string{"cluster_one", "cluster_two"}},
indices: []string{
spanIndex + dateFormat,
"cluster_one:" + spanIndex + dateFormat,
"cluster_two:" + spanIndex + dateFormat}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "", Archive: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}},
indices: []string{
spanIndex + archiveIndexSuffix,
"cluster_one:" + spanIndex + archiveIndexSuffix,
"cluster_two:" + spanIndex + archiveIndexSuffix}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "", Archive: false, UseReadWriteAliases: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}},
indices: []string{
spanIndex + "read",
"cluster_one:" + spanIndex + "read",
"cluster_two:" + spanIndex + "read"}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "", Archive: true, UseReadWriteAliases: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}},
indices: []string{
spanIndex + archiveReadIndexSuffix,
"cluster_one:" + spanIndex + archiveReadIndexSuffix,
"cluster_two:" + spanIndex + archiveReadIndexSuffix}},
}
for _, testCase := range testCases {
r := NewSpanReader(testCase.params)
actual := r.timeRangeIndices(r.spanIndexPrefix, "2006-01-02", date, date)
assert.Equal(t, []string{testCase.index}, actual)
assert.Equal(t, testCase.indices, actual)
}
}

Expand Down

0 comments on commit 915d3af

Please sign in to comment.