Skip to content

Commit

Permalink
[storage] Remove dependency on archive flag in ES reader (#6490)
Browse files Browse the repository at this point in the history
  • Loading branch information
mahadzaryab1 authored Jan 14, 2025
1 parent fcc8936 commit c678a64
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 155 deletions.
30 changes: 20 additions & 10 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (f *Factory) getArchiveClient() es.Client {

// CreateSpanReader implements storage.Factory
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
sr, err := createSpanReader(f.getPrimaryClient, f.primaryConfig, false, f.logger, f.tracer)
sr, err := createSpanReader(f.getPrimaryClient, f.primaryConfig, f.logger, f.tracer, "", f.primaryConfig.UseReadWriteAliases)
if err != nil {
return nil, err
}
Expand All @@ -191,7 +191,7 @@ func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {

// CreateSpanWriter implements storage.Factory
func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
return createSpanWriter(f.getPrimaryClient, f.primaryConfig, false, f.metricsFactory, f.logger)
return createSpanWriter(f.getPrimaryClient, f.primaryConfig, f.metricsFactory, f.logger, "", f.primaryConfig.UseReadWriteAliases)
}

// CreateDependencyReader implements storage.Factory
Expand All @@ -204,7 +204,11 @@ func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) {
if !f.archiveConfig.Enabled {
return nil, nil
}
sr, err := createSpanReader(f.getArchiveClient, f.archiveConfig, true, f.logger, f.tracer)
readAliasSuffix := "archive"
if f.archiveConfig.UseReadWriteAliases {
readAliasSuffix += "-read"
}
sr, err := createSpanReader(f.getArchiveClient, f.archiveConfig, f.logger, f.tracer, readAliasSuffix, true)
if err != nil {
return nil, err
}
Expand All @@ -223,22 +227,27 @@ func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
if !f.archiveConfig.Enabled {
return nil, nil
}
writeAliasSuffix := "archive"
if f.archiveConfig.UseReadWriteAliases {
writeAliasSuffix += "-write"
}
archiveMetricsFactory := f.metricsFactory.Namespace(
metrics.NSOptions{
Tags: map[string]string{
"role": "archive",
},
},
)
return createSpanWriter(f.getArchiveClient, f.archiveConfig, true, archiveMetricsFactory, f.logger)
return createSpanWriter(f.getArchiveClient, f.archiveConfig, archiveMetricsFactory, f.logger, writeAliasSuffix, true)
}

func createSpanReader(
clientFn func() es.Client,
cfg *config.Configuration,
archive bool,
logger *zap.Logger,
tp trace.TracerProvider,
readAliasSuffix string,
useReadWriteAliases bool,
) (spanstore.Reader, error) {
if cfg.UseILM && !cfg.UseReadWriteAliases {
return nil, errors.New("--es.use-ilm must always be used in conjunction with --es.use-aliases to ensure ES writers and readers refer to the single index mapping")
Expand All @@ -251,8 +260,8 @@ func createSpanReader(
SpanIndex: cfg.Indices.Spans,
ServiceIndex: cfg.Indices.Services,
TagDotReplacement: cfg.Tags.DotReplacement,
UseReadWriteAliases: cfg.UseReadWriteAliases,
Archive: archive,
UseReadWriteAliases: useReadWriteAliases,
ReadAliasSuffix: readAliasSuffix,
RemoteReadClusters: cfg.RemoteReadClusters,
Logger: logger,
Tracer: tp.Tracer("esSpanStore.SpanReader"),
Expand All @@ -262,9 +271,10 @@ func createSpanReader(
func createSpanWriter(
clientFn func() es.Client,
cfg *config.Configuration,
archive bool,
mFactory metrics.Factory,
logger *zap.Logger,
writeAliasSuffix string,
useReadWriteAliases bool,
) (spanstore.Writer, error) {
var tags []string
var err error
Expand All @@ -284,8 +294,8 @@ func createSpanWriter(
AllTagsAsFields: cfg.Tags.AllAsFields,
TagKeysAsFields: tags,
TagDotReplacement: cfg.Tags.DotReplacement,
Archive: archive,
UseReadWriteAliases: cfg.UseReadWriteAliases,
UseReadWriteAliases: useReadWriteAliases,
WriteAliasSuffix: writeAliasSuffix,
Logger: logger,
MetricsFactory: mFactory,
ServiceCacheTTL: cfg.ServiceCacheTTL,
Expand Down
41 changes: 28 additions & 13 deletions plugin/storage/es/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"encoding/base64"
"errors"
"fmt"
"net/http"
"net/http/httptest"
"os"
Expand Down Expand Up @@ -243,19 +244,33 @@ func TestArchiveDisabled(t *testing.T) {
}

func TestArchiveEnabled(t *testing.T) {
f := NewFactory()
f.primaryConfig = &escfg.Configuration{}
f.archiveConfig = &escfg.Configuration{Enabled: true}
f.newClientFn = (&mockClientBuilder{}).NewClient
err := f.Initialize(metrics.NullFactory, zap.NewNop())
require.NoError(t, err)
defer f.Close() // Ensure resources are cleaned up if initialization is successful
w, err := f.CreateArchiveSpanWriter()
require.NoError(t, err)
assert.NotNil(t, w)
r, err := f.CreateArchiveSpanReader()
require.NoError(t, err)
assert.NotNil(t, r)
tests := []struct {
useReadWriteAliases bool
}{
{
useReadWriteAliases: false,
},
{
useReadWriteAliases: true,
},
}
for _, test := range tests {
t.Run(fmt.Sprintf("useReadWriteAliases=%v", test.useReadWriteAliases), func(t *testing.T) {
f := NewFactory()
f.primaryConfig = &escfg.Configuration{}
f.archiveConfig = &escfg.Configuration{Enabled: true, UseReadWriteAliases: test.useReadWriteAliases}
f.newClientFn = (&mockClientBuilder{}).NewClient
err := f.Initialize(metrics.NullFactory, zap.NewNop())
require.NoError(t, err)
defer f.Close() // Ensure resources are cleaned up if initialization is successful
w, err := f.CreateArchiveSpanWriter()
require.NoError(t, err)
assert.NotNil(t, w)
r, err := f.CreateArchiveSpanReader()
require.NoError(t, err)
assert.NotNil(t, r)
})
}
}

func TestConfigureFromOptions(t *testing.T) {
Expand Down
5 changes: 0 additions & 5 deletions plugin/storage/es/spanstore/index_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,3 @@ func indexWithDate(indexPrefix, indexDateLayout string, date time.Time) string {
spanDate := date.UTC().Format(indexDateLayout)
return indexPrefix + spanDate
}

// returns archive index name
func archiveIndex(indexPrefix, archiveSuffix string) string {
return indexPrefix + archiveSuffix
}
64 changes: 28 additions & 36 deletions plugin/storage/es/spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,10 @@ import (
)

const (
spanIndexBaseName = "jaeger-span-"
serviceIndexBaseName = "jaeger-service-"
archiveIndexSuffix = "archive"
archiveReadIndexSuffix = archiveIndexSuffix + "-read"
archiveWriteIndexSuffix = archiveIndexSuffix + "-write"
traceIDAggregation = "traceIDs"
indexPrefixSeparator = "-"
spanIndexBaseName = "jaeger-span-"
serviceIndexBaseName = "jaeger-service-"
traceIDAggregation = "traceIDs"
indexPrefixSeparator = "-"

traceIDField = "traceID"
durationField = "duration"
Expand All @@ -51,7 +48,7 @@ const (

defaultNumTraces = 100

rolloverMaxSpanAge = time.Hour * 24 * 365 * 50
dawnOfTimeSpanAge = time.Hour * 24 * 365 * 50
)

var (
Expand Down Expand Up @@ -109,7 +106,7 @@ type SpanReaderParams struct {
SpanIndex cfg.IndexOptions
ServiceIndex cfg.IndexOptions
TagDotReplacement string
Archive bool
ReadAliasSuffix string
UseReadWriteAliases bool
RemoteReadClusters []string
Logger *zap.Logger
Expand All @@ -119,10 +116,16 @@ type SpanReaderParams struct {
// NewSpanReader returns a new SpanReader with a metrics.
func NewSpanReader(p SpanReaderParams) *SpanReader {
maxSpanAge := p.MaxSpanAge
readAliasSuffix := ""
// Setting the maxSpanAge to a large duration will ensure all spans in the "read" alias are accessible by queries (query window = [now - maxSpanAge, now]).
// When read/write aliases are enabled, which are required for index rollovers, only the "read" alias is queried and therefore should not affect performance.
if p.UseReadWriteAliases {
maxSpanAge = rolloverMaxSpanAge
maxSpanAge = dawnOfTimeSpanAge
if p.ReadAliasSuffix != "" {
readAliasSuffix = p.ReadAliasSuffix
} else {
readAliasSuffix = "read"
}
}

return &SpanReader{
Expand All @@ -136,9 +139,12 @@ func NewSpanReader(p SpanReaderParams) *SpanReader {
spanConverter: dbmodel.NewToDomain(p.TagDotReplacement),
timeRangeIndices: getLoggingTimeRangeIndexFn(
p.Logger,
getTimeRangeIndexFn(p.Archive, p.UseReadWriteAliases, p.RemoteReadClusters),
addRemoteReadClusters(
getTimeRangeIndexFn(p.UseReadWriteAliases, readAliasSuffix),
p.RemoteReadClusters,
),
),
sourceFn: getSourceFn(p.Archive, p.MaxDocCount),
sourceFn: getSourceFn(p.MaxDocCount),
maxDocCount: p.MaxDocCount,
useReadWriteAliases: p.UseReadWriteAliases,
logger: p.Logger,
Expand All @@ -161,24 +167,13 @@ func getLoggingTimeRangeIndexFn(logger *zap.Logger, fn timeRangeIndexFn) timeRan
}
}

func getTimeRangeIndexFn(archive, useReadWriteAliases bool, remoteReadClusters []string) timeRangeIndexFn {
if archive {
var archiveSuffix string
if useReadWriteAliases {
archiveSuffix = archiveReadIndexSuffix
} else {
archiveSuffix = archiveIndexSuffix
}
return addRemoteReadClusters(func(indexPrefix, _ /* indexDateLayout */ string, _ /* startTime */ time.Time, _ /* endTime */ time.Time, _ /* reduceDuration */ time.Duration) []string {
return []string{archiveIndex(indexPrefix, archiveSuffix)}
}, remoteReadClusters)
}
func getTimeRangeIndexFn(useReadWriteAliases bool, readAlias string) timeRangeIndexFn {
if useReadWriteAliases {
return addRemoteReadClusters(func(indexPrefix string, _ /* indexDateLayout */ string, _ /* startTime */ time.Time, _ /* endTime */ time.Time, _ /* reduceDuration */ time.Duration) []string {
return []string{indexPrefix + "read"}
}, remoteReadClusters)
return func(indexPrefix, _ /* indexDateLayout */ string, _ /* startTime */ time.Time, _ /* endTime */ time.Time, _ /* reduceDuration */ time.Duration) []string {
return []string{indexPrefix + readAlias}
}
}
return addRemoteReadClusters(timeRangeIndices, remoteReadClusters)
return timeRangeIndices
}

// Add a remote cluster prefix for each cluster and for each index and add it to the list of original indices.
Expand All @@ -201,16 +196,13 @@ func addRemoteReadClusters(fn timeRangeIndexFn, remoteReadClusters []string) tim
}
}

func getSourceFn(archive bool, maxDocCount int) sourceFn {
func getSourceFn(maxDocCount int) sourceFn {
return func(query elastic.Query, nextTime uint64) *elastic.SearchSource {
s := elastic.NewSearchSource().
return elastic.NewSearchSource().
Query(query).
Size(maxDocCount)
if !archive {
s.Sort("startTime", true).
SearchAfter(nextTime)
}
return s
Size(maxDocCount).
Sort("startTime", true).
SearchAfter(nextTime)
}
}

Expand Down
Loading

0 comments on commit c678a64

Please sign in to comment.