From 9d1d999986d624d109bb0d2331b4c02e8fb508c2 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Tue, 7 Jan 2025 19:39:04 -0500 Subject: [PATCH 01/27] Use ReadWriteAliases Instead of Archive Flag Signed-off-by: Mahad Zaryab --- plugin/storage/es/factory.go | 7 +++---- plugin/storage/es/spanstore/reader.go | 22 +++++----------------- 2 files changed, 8 insertions(+), 21 deletions(-) diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index ee00688535e..f64b027fed2 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -196,7 +196,7 @@ func (f *Factory) getArchiveClient() es.Client { // CreateSpanReader implements storage.Factory func (f *Factory) CreateSpanReader() (spanstore.Reader, error) { - sr, err := createSpanReader(f.getPrimaryClient, f.primaryConfig, false, f.logger, f.tracer) + sr, err := createSpanReader(f.getPrimaryClient, f.primaryConfig, f.logger, f.tracer) if err != nil { return nil, err } @@ -218,7 +218,8 @@ func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) { if !f.archiveConfig.Enabled { return nil, nil } - sr, err := createSpanReader(f.getArchiveClient, f.archiveConfig, true, f.logger, f.tracer) + // TODO: should use_aliases be always set to true here? + sr, err := createSpanReader(f.getArchiveClient, f.archiveConfig, f.logger, f.tracer) if err != nil { return nil, err } @@ -236,7 +237,6 @@ func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) { func createSpanReader( clientFn func() es.Client, cfg *config.Configuration, - archive bool, logger *zap.Logger, tp trace.TracerProvider, ) (spanstore.Reader, error) { @@ -252,7 +252,6 @@ func createSpanReader( ServiceIndex: cfg.Indices.Services, TagDotReplacement: cfg.Tags.DotReplacement, UseReadWriteAliases: cfg.UseReadWriteAliases, - Archive: archive, RemoteReadClusters: cfg.RemoteReadClusters, Logger: logger, Tracer: tp.Tracer("esSpanStore.SpanReader"), diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index d6876148faf..6d3ce1a0c39 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -109,7 +109,6 @@ type SpanReaderParams struct { SpanIndex cfg.IndexOptions ServiceIndex cfg.IndexOptions TagDotReplacement string - Archive bool UseReadWriteAliases bool RemoteReadClusters []string Logger *zap.Logger @@ -136,9 +135,9 @@ func NewSpanReader(p SpanReaderParams) *SpanReader { spanConverter: dbmodel.NewToDomain(p.TagDotReplacement), timeRangeIndices: getLoggingTimeRangeIndexFn( p.Logger, - getTimeRangeIndexFn(p.Archive, p.UseReadWriteAliases, p.RemoteReadClusters), + getTimeRangeIndexFn(p.UseReadWriteAliases, p.RemoteReadClusters), ), - sourceFn: getSourceFn(p.Archive, p.MaxDocCount), + sourceFn: getSourceFn(p.UseReadWriteAliases, p.MaxDocCount), maxDocCount: p.MaxDocCount, useReadWriteAliases: p.UseReadWriteAliases, logger: p.Logger, @@ -161,18 +160,7 @@ func getLoggingTimeRangeIndexFn(logger *zap.Logger, fn timeRangeIndexFn) timeRan } } -func getTimeRangeIndexFn(archive, useReadWriteAliases bool, remoteReadClusters []string) timeRangeIndexFn { - if archive { - var archiveSuffix string - if useReadWriteAliases { - archiveSuffix = archiveReadIndexSuffix - } else { - archiveSuffix = archiveIndexSuffix - } - return addRemoteReadClusters(func(indexPrefix, _ /* indexDateLayout */ string, _ /* startTime */ time.Time, _ /* endTime */ time.Time, _ /* reduceDuration */ time.Duration) []string { - return []string{archiveIndex(indexPrefix, archiveSuffix)} - }, remoteReadClusters) - } +func getTimeRangeIndexFn(useReadWriteAliases bool, remoteReadClusters []string) timeRangeIndexFn { if useReadWriteAliases { return addRemoteReadClusters(func(indexPrefix string, _ /* indexDateLayout */ string, _ /* startTime */ time.Time, _ /* endTime */ time.Time, _ /* reduceDuration */ time.Duration) []string { return []string{indexPrefix + "read"} @@ -201,12 +189,12 @@ func addRemoteReadClusters(fn timeRangeIndexFn, remoteReadClusters []string) tim } } -func getSourceFn(archive bool, maxDocCount int) sourceFn { +func getSourceFn(useReadWriteAliases bool, maxDocCount int) sourceFn { return func(query elastic.Query, nextTime uint64) *elastic.SearchSource { s := elastic.NewSearchSource(). Query(query). Size(maxDocCount) - if !archive { + if !useReadWriteAliases { s.Sort("startTime", true). SearchAfter(nextTime) } From 4f5cad5d1e08cbbe4c43840ef256aa0d40b401c7 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Tue, 7 Jan 2025 19:49:00 -0500 Subject: [PATCH 02/27] Remove Archive Flag From Writer Signed-off-by: Mahad Zaryab --- plugin/storage/es/factory.go | 6 ++---- plugin/storage/es/spanstore/index_utils.go | 5 ----- plugin/storage/es/spanstore/reader.go | 11 ++++------- plugin/storage/es/spanstore/writer.go | 9 --------- 4 files changed, 6 insertions(+), 25 deletions(-) diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index f64b027fed2..852dfa3453d 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -205,7 +205,7 @@ func (f *Factory) CreateSpanReader() (spanstore.Reader, error) { // CreateSpanWriter implements storage.Factory func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { - return createSpanWriter(f.getPrimaryClient, f.primaryConfig, false, f.primaryMetricsFactory, f.logger) + return createSpanWriter(f.getPrimaryClient, f.primaryConfig, f.primaryMetricsFactory, f.logger) } // CreateDependencyReader implements storage.Factory @@ -231,7 +231,7 @@ func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) { if !f.archiveConfig.Enabled { return nil, nil } - return createSpanWriter(f.getArchiveClient, f.archiveConfig, true, f.archiveMetricsFactory, f.logger) + return createSpanWriter(f.getArchiveClient, f.archiveConfig, f.archiveMetricsFactory, f.logger) } func createSpanReader( @@ -261,7 +261,6 @@ func createSpanReader( func createSpanWriter( clientFn func() es.Client, cfg *config.Configuration, - archive bool, mFactory metrics.Factory, logger *zap.Logger, ) (spanstore.Writer, error) { @@ -283,7 +282,6 @@ func createSpanWriter( AllTagsAsFields: cfg.Tags.AllAsFields, TagKeysAsFields: tags, TagDotReplacement: cfg.Tags.DotReplacement, - Archive: archive, UseReadWriteAliases: cfg.UseReadWriteAliases, Logger: logger, MetricsFactory: mFactory, diff --git a/plugin/storage/es/spanstore/index_utils.go b/plugin/storage/es/spanstore/index_utils.go index 685df4b6e98..40a3e771545 100644 --- a/plugin/storage/es/spanstore/index_utils.go +++ b/plugin/storage/es/spanstore/index_utils.go @@ -12,8 +12,3 @@ func indexWithDate(indexPrefix, indexDateLayout string, date time.Time) string { spanDate := date.UTC().Format(indexDateLayout) return indexPrefix + spanDate } - -// returns archive index name -func archiveIndex(indexPrefix, archiveSuffix string) string { - return indexPrefix + archiveSuffix -} diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index 6d3ce1a0c39..8836b20c902 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -27,13 +27,10 @@ import ( ) const ( - spanIndexBaseName = "jaeger-span-" - serviceIndexBaseName = "jaeger-service-" - archiveIndexSuffix = "archive" - archiveReadIndexSuffix = archiveIndexSuffix + "-read" - archiveWriteIndexSuffix = archiveIndexSuffix + "-write" - traceIDAggregation = "traceIDs" - indexPrefixSeparator = "-" + spanIndexBaseName = "jaeger-span-" + serviceIndexBaseName = "jaeger-service-" + traceIDAggregation = "traceIDs" + indexPrefixSeparator = "-" traceIDField = "traceID" durationField = "duration" diff --git a/plugin/storage/es/spanstore/writer.go b/plugin/storage/es/spanstore/writer.go index 465279cab25..5518dcde428 100644 --- a/plugin/storage/es/spanstore/writer.go +++ b/plugin/storage/es/spanstore/writer.go @@ -55,7 +55,6 @@ type SpanWriterParams struct { AllTagsAsFields bool TagKeysAsFields []string TagDotReplacement string - Archive bool UseReadWriteAliases bool ServiceCacheTTL time.Duration } @@ -101,14 +100,6 @@ type spanAndServiceIndexFn func(spanTime time.Time) (string, string) func getSpanAndServiceIndexFn(p SpanWriterParams) spanAndServiceIndexFn { spanIndexPrefix := p.IndexPrefix.Apply(spanIndexBaseName) serviceIndexPrefix := p.IndexPrefix.Apply(serviceIndexBaseName) - if p.Archive { - return func(_ time.Time) (string, string) { - if p.UseReadWriteAliases { - return archiveIndex(spanIndexPrefix, archiveWriteIndexSuffix), "" - } - return archiveIndex(spanIndexPrefix, archiveIndexSuffix), "" - } - } if p.UseReadWriteAliases { return func(_ /* spanTime */ time.Time) (string, string) { From af2e9ecc03cebe123994ca36c26cf39c2ef7c5bd Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Tue, 7 Jan 2025 22:52:35 -0500 Subject: [PATCH 03/27] Introduce New Arguments To Remove IsArchive Flag Signed-off-by: Mahad Zaryab --- plugin/storage/es/spanstore/reader.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index 8836b20c902..84ede179061 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -103,6 +103,7 @@ type SpanReaderParams struct { MaxSpanAge time.Duration MaxDocCount int IndexPrefix cfg.IndexPrefix + IndexSuffix string SpanIndex cfg.IndexOptions ServiceIndex cfg.IndexOptions TagDotReplacement string @@ -132,7 +133,7 @@ func NewSpanReader(p SpanReaderParams) *SpanReader { spanConverter: dbmodel.NewToDomain(p.TagDotReplacement), timeRangeIndices: getLoggingTimeRangeIndexFn( p.Logger, - getTimeRangeIndexFn(p.UseReadWriteAliases, p.RemoteReadClusters), + getTimeRangeIndexFn(p.IndexSuffix, p.UseReadWriteAliases, p.RemoteReadClusters), ), sourceFn: getSourceFn(p.UseReadWriteAliases, p.MaxDocCount), maxDocCount: p.MaxDocCount, @@ -157,7 +158,16 @@ func getLoggingTimeRangeIndexFn(logger *zap.Logger, fn timeRangeIndexFn) timeRan } } -func getTimeRangeIndexFn(useReadWriteAliases bool, remoteReadClusters []string) timeRangeIndexFn { +func getTimeRangeIndexFn(indexSuffix string, useReadWriteAliases bool, remoteReadClusters []string) timeRangeIndexFn { + if indexSuffix != "" { + suffix := indexSuffix + if useReadWriteAliases { + suffix += "-read" + } + return addRemoteReadClusters(func(indexPrefix, _ /* indexDateLayout */ string, _ /* startTime */ time.Time, _ /* endTime */ time.Time, _ /* reduceDuration */ time.Duration) []string { + return []string{indexPrefix + suffix} + }, remoteReadClusters) + } if useReadWriteAliases { return addRemoteReadClusters(func(indexPrefix string, _ /* indexDateLayout */ string, _ /* startTime */ time.Time, _ /* endTime */ time.Time, _ /* reduceDuration */ time.Duration) []string { return []string{indexPrefix + "read"} From 904d3fbcb8eef38b59b7a1d77346cad2cf5f7b60 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Tue, 7 Jan 2025 22:52:51 -0500 Subject: [PATCH 04/27] Introduce New Arguments To Remove IsArchive Flag In Writer Signed-off-by: Mahad Zaryab --- plugin/storage/es/spanstore/writer.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/plugin/storage/es/spanstore/writer.go b/plugin/storage/es/spanstore/writer.go index 5518dcde428..a46fde3975d 100644 --- a/plugin/storage/es/spanstore/writer.go +++ b/plugin/storage/es/spanstore/writer.go @@ -52,6 +52,7 @@ type SpanWriterParams struct { SpanIndex cfg.IndexOptions ServiceIndex cfg.IndexOptions IndexPrefix cfg.IndexPrefix + IndexSuffix string AllTagsAsFields bool TagKeysAsFields []string TagDotReplacement string @@ -100,6 +101,14 @@ type spanAndServiceIndexFn func(spanTime time.Time) (string, string) func getSpanAndServiceIndexFn(p SpanWriterParams) spanAndServiceIndexFn { spanIndexPrefix := p.IndexPrefix.Apply(spanIndexBaseName) serviceIndexPrefix := p.IndexPrefix.Apply(serviceIndexBaseName) + if p.IndexSuffix != "" { + return func(_ time.Time) (string, string) { + if p.UseReadWriteAliases { + return spanIndexPrefix + p.IndexSuffix + "-write", "" + } + return spanIndexPrefix + p.IndexSuffix, "" + } + } if p.UseReadWriteAliases { return func(_ /* spanTime */ time.Time) (string, string) { From 6bc1ebfa465c12ed9941ade1a8a58ba4e9c6df98 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Tue, 7 Jan 2025 22:53:08 -0500 Subject: [PATCH 05/27] Fix Tests Signed-off-by: Mahad Zaryab --- plugin/storage/es/spanstore/reader_test.go | 47 ++++++++++------------ plugin/storage/es/spanstore/writer_test.go | 16 ++++---- 2 files changed, 30 insertions(+), 33 deletions(-) diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index 617d11194bf..3179be75d67 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -134,7 +134,7 @@ func withArchiveSpanReader(t *testing.T, readAlias bool, fn func(r *spanReaderTe Tracer: tracer.Tracer("test"), MaxSpanAge: 0, TagDotReplacement: "@", - Archive: true, + IndexSuffix: "archive", UseReadWriteAliases: readAlias, }), } @@ -199,7 +199,6 @@ func TestSpanReaderIndices(t *testing.T) { }{ { params: SpanReaderParams{ - Archive: false, SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, }, @@ -213,7 +212,6 @@ func TestSpanReaderIndices(t *testing.T) { }, { params: SpanReaderParams{ - Archive: false, SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, IndexPrefix: "foo:", @@ -228,27 +226,26 @@ func TestSpanReaderIndices(t *testing.T) { }, { params: SpanReaderParams{ - Archive: true, + IndexSuffix: "archive", }, - indices: []string{spanIndexBaseName + archiveIndexSuffix, serviceIndexBaseName + archiveIndexSuffix}, + indices: []string{spanIndexBaseName + "archive", serviceIndexBaseName + "archive"}, }, { params: SpanReaderParams{ - SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, IndexPrefix: "foo:", Archive: true, + SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, IndexPrefix: "foo:", IndexSuffix: "archive", }, - indices: []string{"foo:" + config.IndexPrefixSeparator + spanIndexBaseName + archiveIndexSuffix, "foo:" + config.IndexPrefixSeparator + serviceIndexBaseName + archiveIndexSuffix}, + indices: []string{"foo:" + config.IndexPrefixSeparator + spanIndexBaseName + "archive", "foo:" + config.IndexPrefixSeparator + serviceIndexBaseName + "archive"}, }, { params: SpanReaderParams{ - SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, IndexPrefix: "foo:", Archive: true, UseReadWriteAliases: true, + SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, IndexPrefix: "foo:", IndexSuffix: "archive", UseReadWriteAliases: true, }, - indices: []string{"foo:" + config.IndexPrefixSeparator + spanIndexBaseName + archiveReadIndexSuffix, "foo:" + config.IndexPrefixSeparator + serviceIndexBaseName + archiveReadIndexSuffix}, + indices: []string{"foo:" + config.IndexPrefixSeparator + spanIndexBaseName + "archive-read", "foo:" + config.IndexPrefixSeparator + serviceIndexBaseName + "archive-read"}, }, { params: SpanReaderParams{ SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, - Archive: false, RemoteReadClusters: []string{"cluster_one", "cluster_two"}, }, indices: []string{ @@ -262,20 +259,20 @@ func TestSpanReaderIndices(t *testing.T) { }, { params: SpanReaderParams{ - Archive: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}, + IndexSuffix: "archive", RemoteReadClusters: []string{"cluster_one", "cluster_two"}, }, indices: []string{ - spanIndexBaseName + archiveIndexSuffix, - "cluster_one:" + spanIndexBaseName + archiveIndexSuffix, - "cluster_two:" + spanIndexBaseName + archiveIndexSuffix, - serviceIndexBaseName + archiveIndexSuffix, - "cluster_one:" + serviceIndexBaseName + archiveIndexSuffix, - "cluster_two:" + serviceIndexBaseName + archiveIndexSuffix, + spanIndexBaseName + "archive", + "cluster_one:" + spanIndexBaseName + "archive", + "cluster_two:" + spanIndexBaseName + "archive", + serviceIndexBaseName + "archive", + "cluster_one:" + serviceIndexBaseName + "archive", + "cluster_two:" + serviceIndexBaseName + "archive", }, }, { params: SpanReaderParams{ - Archive: false, UseReadWriteAliases: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}, + UseReadWriteAliases: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}, }, indices: []string{ spanIndexBaseName + "read", @@ -288,15 +285,15 @@ func TestSpanReaderIndices(t *testing.T) { }, { params: SpanReaderParams{ - Archive: true, UseReadWriteAliases: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}, + IndexSuffix: "archive", UseReadWriteAliases: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}, }, indices: []string{ - spanIndexBaseName + archiveReadIndexSuffix, - "cluster_one:" + spanIndexBaseName + archiveReadIndexSuffix, - "cluster_two:" + spanIndexBaseName + archiveReadIndexSuffix, - serviceIndexBaseName + archiveReadIndexSuffix, - "cluster_one:" + serviceIndexBaseName + archiveReadIndexSuffix, - "cluster_two:" + serviceIndexBaseName + archiveReadIndexSuffix, + spanIndexBaseName + "archive-read", + "cluster_one:" + spanIndexBaseName + "archive-read", + "cluster_two:" + spanIndexBaseName + "archive-read", + serviceIndexBaseName + "archive-read", + "cluster_one:" + serviceIndexBaseName + "archive-read", + "cluster_two:" + serviceIndexBaseName + "archive-read", }, }, } diff --git a/plugin/storage/es/spanstore/writer_test.go b/plugin/storage/es/spanstore/writer_test.go index 6c4c4b388fd..3f2385b1506 100644 --- a/plugin/storage/es/spanstore/writer_test.go +++ b/plugin/storage/es/spanstore/writer_test.go @@ -74,7 +74,7 @@ func TestSpanWriterIndices(t *testing.T) { { params: SpanWriterParams{ Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, - SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, Archive: false, + SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, }, indices: []string{spanIndexBaseName + spanDataLayoutFormat, serviceIndexBaseName + serviceDataLayoutFormat}, }, @@ -88,7 +88,7 @@ func TestSpanWriterIndices(t *testing.T) { { params: SpanWriterParams{ Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, - SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, IndexPrefix: "foo:", Archive: false, + SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, IndexPrefix: "foo:", }, indices: []string{"foo:" + config.IndexPrefixSeparator + spanIndexBaseName + spanDataLayoutFormat, "foo:" + config.IndexPrefixSeparator + serviceIndexBaseName + serviceDataLayoutFormat}, }, @@ -102,23 +102,23 @@ func TestSpanWriterIndices(t *testing.T) { { params: SpanWriterParams{ Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, - SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, Archive: true, + SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, IndexSuffix: "archive", }, - indices: []string{spanIndexBaseName + archiveIndexSuffix, ""}, + indices: []string{spanIndexBaseName + "archive", ""}, }, { params: SpanWriterParams{ Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, - SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, IndexPrefix: "foo:", Archive: true, + SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, IndexPrefix: "foo:", IndexSuffix: "archive", }, - indices: []string{"foo:" + config.IndexPrefixSeparator + spanIndexBaseName + archiveIndexSuffix, ""}, + indices: []string{"foo:" + config.IndexPrefixSeparator + spanIndexBaseName + "archive", ""}, }, { params: SpanWriterParams{ Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, - SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, IndexPrefix: "foo:", Archive: true, UseReadWriteAliases: true, + SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, IndexPrefix: "foo:", IndexSuffix: "archive", UseReadWriteAliases: true, }, - indices: []string{"foo:" + config.IndexPrefixSeparator + spanIndexBaseName + archiveWriteIndexSuffix, ""}, + indices: []string{"foo:" + config.IndexPrefixSeparator + spanIndexBaseName + "archive-write", ""}, }, } for _, testCase := range testCases { From 0ae679d271fc1fbdf8202a4377d006592ce0173a Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Tue, 7 Jan 2025 22:53:23 -0500 Subject: [PATCH 06/27] Update Callsites Signed-off-by: Mahad Zaryab --- plugin/storage/es/factory.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index 852dfa3453d..ee4a356cea1 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -196,7 +196,7 @@ func (f *Factory) getArchiveClient() es.Client { // CreateSpanReader implements storage.Factory func (f *Factory) CreateSpanReader() (spanstore.Reader, error) { - sr, err := createSpanReader(f.getPrimaryClient, f.primaryConfig, f.logger, f.tracer) + sr, err := createSpanReader(f.getPrimaryClient, f.primaryConfig, f.logger, f.tracer, "", f.primaryConfig.UseReadWriteAliases) if err != nil { return nil, err } @@ -218,8 +218,7 @@ func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) { if !f.archiveConfig.Enabled { return nil, nil } - // TODO: should use_aliases be always set to true here? - sr, err := createSpanReader(f.getArchiveClient, f.archiveConfig, f.logger, f.tracer) + sr, err := createSpanReader(f.getArchiveClient, f.archiveConfig, f.logger, f.tracer, "archive", true) if err != nil { return nil, err } @@ -239,6 +238,8 @@ func createSpanReader( cfg *config.Configuration, logger *zap.Logger, tp trace.TracerProvider, + indexSuffix string, + useReadWriteAliases bool, ) (spanstore.Reader, error) { if cfg.UseILM && !cfg.UseReadWriteAliases { return nil, errors.New("--es.use-ilm must always be used in conjunction with --es.use-aliases to ensure ES writers and readers refer to the single index mapping") @@ -251,7 +252,8 @@ func createSpanReader( SpanIndex: cfg.Indices.Spans, ServiceIndex: cfg.Indices.Services, TagDotReplacement: cfg.Tags.DotReplacement, - UseReadWriteAliases: cfg.UseReadWriteAliases, + UseReadWriteAliases: useReadWriteAliases, + IndexSuffix: indexSuffix, RemoteReadClusters: cfg.RemoteReadClusters, Logger: logger, Tracer: tp.Tracer("esSpanStore.SpanReader"), From c0ea9a2efb5d195be99e59e9cc2958db23d51643 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Tue, 7 Jan 2025 23:13:57 -0500 Subject: [PATCH 07/27] Propagate Arguments To Archive Constructor Signed-off-by: Mahad Zaryab --- plugin/storage/es/factory.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index ee4a356cea1..808c6112864 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -205,7 +205,7 @@ func (f *Factory) CreateSpanReader() (spanstore.Reader, error) { // CreateSpanWriter implements storage.Factory func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { - return createSpanWriter(f.getPrimaryClient, f.primaryConfig, f.primaryMetricsFactory, f.logger) + return createSpanWriter(f.getPrimaryClient, f.primaryConfig, f.primaryMetricsFactory, f.logger, "", f.primaryConfig.UseReadWriteAliases) } // CreateDependencyReader implements storage.Factory @@ -230,7 +230,7 @@ func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) { if !f.archiveConfig.Enabled { return nil, nil } - return createSpanWriter(f.getArchiveClient, f.archiveConfig, f.archiveMetricsFactory, f.logger) + return createSpanWriter(f.getArchiveClient, f.archiveConfig, f.archiveMetricsFactory, f.logger, "archive", true) } func createSpanReader( @@ -265,6 +265,8 @@ func createSpanWriter( cfg *config.Configuration, mFactory metrics.Factory, logger *zap.Logger, + indexSuffix string, + useReadWriteAliases bool, ) (spanstore.Writer, error) { var tags []string var err error @@ -279,12 +281,13 @@ func createSpanWriter( writer := esSpanStore.NewSpanWriter(esSpanStore.SpanWriterParams{ Client: clientFn, IndexPrefix: cfg.Indices.IndexPrefix, + IndexSuffix: indexSuffix, SpanIndex: cfg.Indices.Spans, ServiceIndex: cfg.Indices.Services, AllTagsAsFields: cfg.Tags.AllAsFields, TagKeysAsFields: tags, TagDotReplacement: cfg.Tags.DotReplacement, - UseReadWriteAliases: cfg.UseReadWriteAliases, + UseReadWriteAliases: useReadWriteAliases, Logger: logger, MetricsFactory: mFactory, ServiceCacheTTL: cfg.ServiceCacheTTL, From 5659221c7d97d23301934f8ab809d04685652c9a Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Wed, 8 Jan 2025 18:11:11 -0500 Subject: [PATCH 08/27] Handle Archive And Use Aliases Both Set Signed-off-by: Mahad Zaryab --- plugin/storage/es/factory.go | 12 ++++++++++-- plugin/storage/es/spanstore/reader.go | 9 ++------- plugin/storage/es/spanstore/writer.go | 7 +------ 3 files changed, 13 insertions(+), 15 deletions(-) diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index 808c6112864..4022b64ffa9 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -218,7 +218,11 @@ func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) { if !f.archiveConfig.Enabled { return nil, nil } - sr, err := createSpanReader(f.getArchiveClient, f.archiveConfig, f.logger, f.tracer, "archive", true) + suffix := "archive" + if f.archiveConfig.UseReadWriteAliases { + suffix += "-read" + } + sr, err := createSpanReader(f.getArchiveClient, f.archiveConfig, f.logger, f.tracer, suffix, true) if err != nil { return nil, err } @@ -230,7 +234,11 @@ func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) { if !f.archiveConfig.Enabled { return nil, nil } - return createSpanWriter(f.getArchiveClient, f.archiveConfig, f.archiveMetricsFactory, f.logger, "archive", true) + suffix := "archive" + if f.archiveConfig.UseReadWriteAliases { + suffix += "-write" + } + return createSpanWriter(f.getArchiveClient, f.archiveConfig, f.archiveMetricsFactory, f.logger, suffix, true) } func createSpanReader( diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index 84ede179061..9236bf706fd 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -160,15 +160,10 @@ func getLoggingTimeRangeIndexFn(logger *zap.Logger, fn timeRangeIndexFn) timeRan func getTimeRangeIndexFn(indexSuffix string, useReadWriteAliases bool, remoteReadClusters []string) timeRangeIndexFn { if indexSuffix != "" { - suffix := indexSuffix - if useReadWriteAliases { - suffix += "-read" - } return addRemoteReadClusters(func(indexPrefix, _ /* indexDateLayout */ string, _ /* startTime */ time.Time, _ /* endTime */ time.Time, _ /* reduceDuration */ time.Duration) []string { - return []string{indexPrefix + suffix} + return []string{indexPrefix + indexSuffix} }, remoteReadClusters) - } - if useReadWriteAliases { + } else if useReadWriteAliases { return addRemoteReadClusters(func(indexPrefix string, _ /* indexDateLayout */ string, _ /* startTime */ time.Time, _ /* endTime */ time.Time, _ /* reduceDuration */ time.Duration) []string { return []string{indexPrefix + "read"} }, remoteReadClusters) diff --git a/plugin/storage/es/spanstore/writer.go b/plugin/storage/es/spanstore/writer.go index a46fde3975d..ad9caa898f2 100644 --- a/plugin/storage/es/spanstore/writer.go +++ b/plugin/storage/es/spanstore/writer.go @@ -103,14 +103,9 @@ func getSpanAndServiceIndexFn(p SpanWriterParams) spanAndServiceIndexFn { serviceIndexPrefix := p.IndexPrefix.Apply(serviceIndexBaseName) if p.IndexSuffix != "" { return func(_ time.Time) (string, string) { - if p.UseReadWriteAliases { - return spanIndexPrefix + p.IndexSuffix + "-write", "" - } return spanIndexPrefix + p.IndexSuffix, "" } - } - - if p.UseReadWriteAliases { + } else if p.UseReadWriteAliases { return func(_ /* spanTime */ time.Time) (string, string) { return spanIndexPrefix + "write", serviceIndexPrefix + "write" } From 52f63fd08725125fb10fbbff72a00be93d515f6d Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Thu, 9 Jan 2025 18:19:22 -0500 Subject: [PATCH 09/27] Fix Branching In Reader And Change Variable Name Signed-off-by: Mahad Zaryab --- plugin/storage/es/factory.go | 10 ++-- plugin/storage/es/spanstore/reader.go | 17 +++---- plugin/storage/es/spanstore/reader_test.go | 53 +++++----------------- 3 files changed, 26 insertions(+), 54 deletions(-) diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index 4022b64ffa9..83c875f8c41 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -234,11 +234,11 @@ func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) { if !f.archiveConfig.Enabled { return nil, nil } - suffix := "archive" + readAlias := "archive" if f.archiveConfig.UseReadWriteAliases { - suffix += "-write" + readAlias += "-write" } - return createSpanWriter(f.getArchiveClient, f.archiveConfig, f.archiveMetricsFactory, f.logger, suffix, true) + return createSpanWriter(f.getArchiveClient, f.archiveConfig, f.archiveMetricsFactory, f.logger, readAlias, true) } func createSpanReader( @@ -246,7 +246,7 @@ func createSpanReader( cfg *config.Configuration, logger *zap.Logger, tp trace.TracerProvider, - indexSuffix string, + readAlias string, useReadWriteAliases bool, ) (spanstore.Reader, error) { if cfg.UseILM && !cfg.UseReadWriteAliases { @@ -261,7 +261,7 @@ func createSpanReader( ServiceIndex: cfg.Indices.Services, TagDotReplacement: cfg.Tags.DotReplacement, UseReadWriteAliases: useReadWriteAliases, - IndexSuffix: indexSuffix, + ReadAlias: readAlias, RemoteReadClusters: cfg.RemoteReadClusters, Logger: logger, Tracer: tp.Tracer("esSpanStore.SpanReader"), diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index 9236bf706fd..a57495aaa47 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -103,10 +103,10 @@ type SpanReaderParams struct { MaxSpanAge time.Duration MaxDocCount int IndexPrefix cfg.IndexPrefix - IndexSuffix string SpanIndex cfg.IndexOptions ServiceIndex cfg.IndexOptions TagDotReplacement string + ReadAlias string UseReadWriteAliases bool RemoteReadClusters []string Logger *zap.Logger @@ -133,7 +133,7 @@ func NewSpanReader(p SpanReaderParams) *SpanReader { spanConverter: dbmodel.NewToDomain(p.TagDotReplacement), timeRangeIndices: getLoggingTimeRangeIndexFn( p.Logger, - getTimeRangeIndexFn(p.IndexSuffix, p.UseReadWriteAliases, p.RemoteReadClusters), + getTimeRangeIndexFn(p.UseReadWriteAliases, p.ReadAlias, p.RemoteReadClusters), ), sourceFn: getSourceFn(p.UseReadWriteAliases, p.MaxDocCount), maxDocCount: p.MaxDocCount, @@ -158,12 +158,13 @@ func getLoggingTimeRangeIndexFn(logger *zap.Logger, fn timeRangeIndexFn) timeRan } } -func getTimeRangeIndexFn(indexSuffix string, useReadWriteAliases bool, remoteReadClusters []string) timeRangeIndexFn { - if indexSuffix != "" { - return addRemoteReadClusters(func(indexPrefix, _ /* indexDateLayout */ string, _ /* startTime */ time.Time, _ /* endTime */ time.Time, _ /* reduceDuration */ time.Duration) []string { - return []string{indexPrefix + indexSuffix} - }, remoteReadClusters) - } else if useReadWriteAliases { +func getTimeRangeIndexFn(useReadWriteAliases bool, readAlias string, remoteReadClusters []string) timeRangeIndexFn { + if useReadWriteAliases { + if readAlias != "" { + return addRemoteReadClusters(func(indexPrefix, _ /* indexDateLayout */ string, _ /* startTime */ time.Time, _ /* endTime */ time.Time, _ /* reduceDuration */ time.Duration) []string { + return []string{indexPrefix + readAlias} + }, remoteReadClusters) + } return addRemoteReadClusters(func(indexPrefix string, _ /* indexDateLayout */ string, _ /* startTime */ time.Time, _ /* endTime */ time.Time, _ /* reduceDuration */ time.Duration) []string { return []string{indexPrefix + "read"} }, remoteReadClusters) diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index 3179be75d67..b46ecf9b926 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -134,7 +134,7 @@ func withArchiveSpanReader(t *testing.T, readAlias bool, fn func(r *spanReaderTe Tracer: tracer.Tracer("test"), MaxSpanAge: 0, TagDotReplacement: "@", - IndexSuffix: "archive", + ReadAlias: "archive", UseReadWriteAliases: readAlias, }), } @@ -210,6 +210,12 @@ func TestSpanReaderIndices(t *testing.T) { }, indices: []string{spanIndexBaseName + "read", serviceIndexBaseName + "read"}, }, + { + params: SpanReaderParams{ + ReadAlias: "archive", // ignored because ReadWriteAliases is false + }, + indices: []string{spanIndexBaseName, serviceIndexBaseName}, + }, { params: SpanReaderParams{ SpanIndex: spanIndexOpts, @@ -226,22 +232,17 @@ func TestSpanReaderIndices(t *testing.T) { }, { params: SpanReaderParams{ - IndexSuffix: "archive", + ReadAlias: "archive", + UseReadWriteAliases: true, }, indices: []string{spanIndexBaseName + "archive", serviceIndexBaseName + "archive"}, }, { params: SpanReaderParams{ - SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, IndexPrefix: "foo:", IndexSuffix: "archive", + SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, IndexPrefix: "foo:", UseReadWriteAliases: true, ReadAlias: "archive", }, indices: []string{"foo:" + config.IndexPrefixSeparator + spanIndexBaseName + "archive", "foo:" + config.IndexPrefixSeparator + serviceIndexBaseName + "archive"}, }, - { - params: SpanReaderParams{ - SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, IndexPrefix: "foo:", IndexSuffix: "archive", UseReadWriteAliases: true, - }, - indices: []string{"foo:" + config.IndexPrefixSeparator + spanIndexBaseName + "archive-read", "foo:" + config.IndexPrefixSeparator + serviceIndexBaseName + "archive-read"}, - }, { params: SpanReaderParams{ SpanIndex: spanIndexOpts, @@ -259,7 +260,7 @@ func TestSpanReaderIndices(t *testing.T) { }, { params: SpanReaderParams{ - IndexSuffix: "archive", RemoteReadClusters: []string{"cluster_one", "cluster_two"}, + UseReadWriteAliases: true, ReadAlias: "archive", RemoteReadClusters: []string{"cluster_one", "cluster_two"}, }, indices: []string{ spanIndexBaseName + "archive", @@ -283,19 +284,6 @@ func TestSpanReaderIndices(t *testing.T) { "cluster_two:" + serviceIndexBaseName + "read", }, }, - { - params: SpanReaderParams{ - IndexSuffix: "archive", UseReadWriteAliases: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}, - }, - indices: []string{ - spanIndexBaseName + "archive-read", - "cluster_one:" + spanIndexBaseName + "archive-read", - "cluster_two:" + spanIndexBaseName + "archive-read", - serviceIndexBaseName + "archive-read", - "cluster_one:" + serviceIndexBaseName + "archive-read", - "cluster_two:" + serviceIndexBaseName + "archive-read", - }, - }, } for _, testCase := range testCases { testCase.params.Client = clientFn @@ -1264,30 +1252,13 @@ func TestSpanReader_GetEmptyIndex(t *testing.T) { } func TestSpanReader_ArchiveTraces(t *testing.T) { - withArchiveSpanReader(t, false, func(r *spanReaderTest) { - mockSearchService(r). - Return(&elastic.SearchResult{}, nil) - mockArchiveMultiSearchService(r, "jaeger-span-archive"). - Return(&elastic.MultiSearchResult{ - Responses: []*elastic.SearchResult{}, - }, nil) - query := spanstore.GetTraceParameters{} - trace, err := r.reader.GetTrace(context.Background(), query) - require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") - require.Nil(t, trace) - require.EqualError(t, err, "trace not found") - }) -} - -func TestSpanReader_ArchiveTraces_ReadAlias(t *testing.T) { withArchiveSpanReader(t, true, func(r *spanReaderTest) { mockSearchService(r). Return(&elastic.SearchResult{}, nil) - mockArchiveMultiSearchService(r, "jaeger-span-archive-read"). + mockArchiveMultiSearchService(r, "jaeger-span-archive"). Return(&elastic.MultiSearchResult{ Responses: []*elastic.SearchResult{}, }, nil) - query := spanstore.GetTraceParameters{} trace, err := r.reader.GetTrace(context.Background(), query) require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") From c573a6698b88791d6d562cda5bddc2aa13b33b29 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Thu, 9 Jan 2025 18:27:56 -0500 Subject: [PATCH 10/27] Fix Branching In Writer And Change Variable Name Signed-off-by: Mahad Zaryab --- plugin/storage/es/factory.go | 4 ++-- plugin/storage/es/spanstore/writer.go | 11 ++++++----- plugin/storage/es/spanstore/writer_test.go | 19 ++++++++++--------- 3 files changed, 18 insertions(+), 16 deletions(-) diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index 83c875f8c41..56b8c6eb189 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -273,7 +273,7 @@ func createSpanWriter( cfg *config.Configuration, mFactory metrics.Factory, logger *zap.Logger, - indexSuffix string, + writeAlias string, useReadWriteAliases bool, ) (spanstore.Writer, error) { var tags []string @@ -289,13 +289,13 @@ func createSpanWriter( writer := esSpanStore.NewSpanWriter(esSpanStore.SpanWriterParams{ Client: clientFn, IndexPrefix: cfg.Indices.IndexPrefix, - IndexSuffix: indexSuffix, SpanIndex: cfg.Indices.Spans, ServiceIndex: cfg.Indices.Services, AllTagsAsFields: cfg.Tags.AllAsFields, TagKeysAsFields: tags, TagDotReplacement: cfg.Tags.DotReplacement, UseReadWriteAliases: useReadWriteAliases, + WriteAlias: writeAlias, Logger: logger, MetricsFactory: mFactory, ServiceCacheTTL: cfg.ServiceCacheTTL, diff --git a/plugin/storage/es/spanstore/writer.go b/plugin/storage/es/spanstore/writer.go index ad9caa898f2..0202a8da391 100644 --- a/plugin/storage/es/spanstore/writer.go +++ b/plugin/storage/es/spanstore/writer.go @@ -52,11 +52,11 @@ type SpanWriterParams struct { SpanIndex cfg.IndexOptions ServiceIndex cfg.IndexOptions IndexPrefix cfg.IndexPrefix - IndexSuffix string AllTagsAsFields bool TagKeysAsFields []string TagDotReplacement string UseReadWriteAliases bool + WriteAlias string ServiceCacheTTL time.Duration } @@ -101,11 +101,12 @@ type spanAndServiceIndexFn func(spanTime time.Time) (string, string) func getSpanAndServiceIndexFn(p SpanWriterParams) spanAndServiceIndexFn { spanIndexPrefix := p.IndexPrefix.Apply(spanIndexBaseName) serviceIndexPrefix := p.IndexPrefix.Apply(serviceIndexBaseName) - if p.IndexSuffix != "" { - return func(_ time.Time) (string, string) { - return spanIndexPrefix + p.IndexSuffix, "" + if p.UseReadWriteAliases { + if p.WriteAlias != "" { + return func(_ time.Time) (string, string) { + return spanIndexPrefix + p.WriteAlias, "" + } } - } else if p.UseReadWriteAliases { return func(_ /* spanTime */ time.Time) (string, string) { return spanIndexPrefix + "write", serviceIndexPrefix + "write" } diff --git a/plugin/storage/es/spanstore/writer_test.go b/plugin/storage/es/spanstore/writer_test.go index 3f2385b1506..e1965396534 100644 --- a/plugin/storage/es/spanstore/writer_test.go +++ b/plugin/storage/es/spanstore/writer_test.go @@ -85,6 +85,14 @@ func TestSpanWriterIndices(t *testing.T) { }, indices: []string{spanIndexBaseName + "write", serviceIndexBaseName + "write"}, }, + { + params: SpanWriterParams{ + Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, + SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, + WriteAlias: "archive", // ignored because UseReadWriteAliases is false + }, + indices: []string{spanIndexBaseName + spanDataLayoutFormat, serviceIndexBaseName + serviceDataLayoutFormat}, + }, { params: SpanWriterParams{ Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, @@ -102,24 +110,17 @@ func TestSpanWriterIndices(t *testing.T) { { params: SpanWriterParams{ Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, - SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, IndexSuffix: "archive", + SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, WriteAlias: "archive", UseReadWriteAliases: true, }, indices: []string{spanIndexBaseName + "archive", ""}, }, { params: SpanWriterParams{ Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, - SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, IndexPrefix: "foo:", IndexSuffix: "archive", + SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, IndexPrefix: "foo:", WriteAlias: "archive", UseReadWriteAliases: true, }, indices: []string{"foo:" + config.IndexPrefixSeparator + spanIndexBaseName + "archive", ""}, }, - { - params: SpanWriterParams{ - Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, - SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, IndexPrefix: "foo:", IndexSuffix: "archive", UseReadWriteAliases: true, - }, - indices: []string{"foo:" + config.IndexPrefixSeparator + spanIndexBaseName + "archive-write", ""}, - }, } for _, testCase := range testCases { w := NewSpanWriter(testCase.params) From 4976f7bbc6b51fc6851a850621aca04232c95ebd Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Thu, 9 Jan 2025 18:28:59 -0500 Subject: [PATCH 11/27] Fix Variable Naming Signed-off-by: Mahad Zaryab --- plugin/storage/es/factory.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index 56b8c6eb189..8bfa7f79054 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -218,11 +218,11 @@ func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) { if !f.archiveConfig.Enabled { return nil, nil } - suffix := "archive" + readAlias := "archive" if f.archiveConfig.UseReadWriteAliases { - suffix += "-read" + readAlias += "-read" } - sr, err := createSpanReader(f.getArchiveClient, f.archiveConfig, f.logger, f.tracer, suffix, true) + sr, err := createSpanReader(f.getArchiveClient, f.archiveConfig, f.logger, f.tracer, readAlias, true) if err != nil { return nil, err } @@ -234,11 +234,11 @@ func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) { if !f.archiveConfig.Enabled { return nil, nil } - readAlias := "archive" + writeAlias := "archive" if f.archiveConfig.UseReadWriteAliases { - readAlias += "-write" + writeAlias += "-write" } - return createSpanWriter(f.getArchiveClient, f.archiveConfig, f.archiveMetricsFactory, f.logger, readAlias, true) + return createSpanWriter(f.getArchiveClient, f.archiveConfig, f.archiveMetricsFactory, f.logger, writeAlias, true) } func createSpanReader( From 77d47c06514d519300da5eaa6bf07fdea06bf134 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Thu, 9 Jan 2025 18:39:06 -0500 Subject: [PATCH 12/27] Move Check To Constructor Signed-off-by: Mahad Zaryab --- plugin/storage/es/spanstore/reader.go | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index a57495aaa47..6abb5cbf3f4 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -116,10 +116,16 @@ type SpanReaderParams struct { // NewSpanReader returns a new SpanReader with a metrics. func NewSpanReader(p SpanReaderParams) *SpanReader { maxSpanAge := p.MaxSpanAge + readAlias := "" // Setting the maxSpanAge to a large duration will ensure all spans in the "read" alias are accessible by queries (query window = [now - maxSpanAge, now]). // When read/write aliases are enabled, which are required for index rollovers, only the "read" alias is queried and therefore should not affect performance. if p.UseReadWriteAliases { maxSpanAge = rolloverMaxSpanAge + if p.ReadAlias != "" { + readAlias = p.ReadAlias + } else { + readAlias = "read" + } } return &SpanReader{ @@ -133,7 +139,7 @@ func NewSpanReader(p SpanReaderParams) *SpanReader { spanConverter: dbmodel.NewToDomain(p.TagDotReplacement), timeRangeIndices: getLoggingTimeRangeIndexFn( p.Logger, - getTimeRangeIndexFn(p.UseReadWriteAliases, p.ReadAlias, p.RemoteReadClusters), + getTimeRangeIndexFn(readAlias, p.RemoteReadClusters), ), sourceFn: getSourceFn(p.UseReadWriteAliases, p.MaxDocCount), maxDocCount: p.MaxDocCount, @@ -158,15 +164,10 @@ func getLoggingTimeRangeIndexFn(logger *zap.Logger, fn timeRangeIndexFn) timeRan } } -func getTimeRangeIndexFn(useReadWriteAliases bool, readAlias string, remoteReadClusters []string) timeRangeIndexFn { - if useReadWriteAliases { - if readAlias != "" { - return addRemoteReadClusters(func(indexPrefix, _ /* indexDateLayout */ string, _ /* startTime */ time.Time, _ /* endTime */ time.Time, _ /* reduceDuration */ time.Duration) []string { - return []string{indexPrefix + readAlias} - }, remoteReadClusters) - } - return addRemoteReadClusters(func(indexPrefix string, _ /* indexDateLayout */ string, _ /* startTime */ time.Time, _ /* endTime */ time.Time, _ /* reduceDuration */ time.Duration) []string { - return []string{indexPrefix + "read"} +func getTimeRangeIndexFn(readAlias string, remoteReadClusters []string) timeRangeIndexFn { + if readAlias != "" { + return addRemoteReadClusters(func(indexPrefix, _ /* indexDateLayout */ string, _ /* startTime */ time.Time, _ /* endTime */ time.Time, _ /* reduceDuration */ time.Duration) []string { + return []string{indexPrefix + readAlias} }, remoteReadClusters) } return addRemoteReadClusters(timeRangeIndices, remoteReadClusters) From 6b97e7f8749d610c12921b80d388e56adb7c592c Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Thu, 9 Jan 2025 18:50:01 -0500 Subject: [PATCH 13/27] Move Check To Constructor Signed-off-by: Mahad Zaryab --- plugin/storage/es/spanstore/writer.go | 24 +++++++++++++--------- plugin/storage/es/spanstore/writer_test.go | 4 ++-- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/plugin/storage/es/spanstore/writer.go b/plugin/storage/es/spanstore/writer.go index 0202a8da391..40455c2ea15 100644 --- a/plugin/storage/es/spanstore/writer.go +++ b/plugin/storage/es/spanstore/writer.go @@ -67,6 +67,15 @@ func NewSpanWriter(p SpanWriterParams) *SpanWriter { serviceCacheTTL = serviceCacheTTLDefault } + writeAlias := "" + if p.UseReadWriteAliases { + if p.WriteAlias != "" { + writeAlias = p.WriteAlias + } else { + writeAlias = "write" + } + } + serviceOperationStorage := NewServiceOperationStorage(p.Client, p.Logger, serviceCacheTTL) return &SpanWriter{ client: p.Client, @@ -76,7 +85,7 @@ func NewSpanWriter(p SpanWriterParams) *SpanWriter { }, serviceWriter: serviceOperationStorage.Write, spanConverter: dbmodel.NewFromDomain(p.AllTagsAsFields, p.TagKeysAsFields, p.TagDotReplacement), - spanServiceIndex: getSpanAndServiceIndexFn(p), + spanServiceIndex: getSpanAndServiceIndexFn(p, writeAlias), } } @@ -98,17 +107,12 @@ func (s *SpanWriter) CreateTemplates(spanTemplate, serviceTemplate string, index // spanAndServiceIndexFn returns names of span and service indices type spanAndServiceIndexFn func(spanTime time.Time) (string, string) -func getSpanAndServiceIndexFn(p SpanWriterParams) spanAndServiceIndexFn { +func getSpanAndServiceIndexFn(p SpanWriterParams, writeAlias string) spanAndServiceIndexFn { spanIndexPrefix := p.IndexPrefix.Apply(spanIndexBaseName) serviceIndexPrefix := p.IndexPrefix.Apply(serviceIndexBaseName) - if p.UseReadWriteAliases { - if p.WriteAlias != "" { - return func(_ time.Time) (string, string) { - return spanIndexPrefix + p.WriteAlias, "" - } - } - return func(_ /* spanTime */ time.Time) (string, string) { - return spanIndexPrefix + "write", serviceIndexPrefix + "write" + if writeAlias != "" { + return func(_ time.Time) (string, string) { + return spanIndexPrefix + writeAlias, serviceIndexPrefix + writeAlias } } return func(date time.Time) (string, string) { diff --git a/plugin/storage/es/spanstore/writer_test.go b/plugin/storage/es/spanstore/writer_test.go index e1965396534..c255ac1cd86 100644 --- a/plugin/storage/es/spanstore/writer_test.go +++ b/plugin/storage/es/spanstore/writer_test.go @@ -112,14 +112,14 @@ func TestSpanWriterIndices(t *testing.T) { Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, WriteAlias: "archive", UseReadWriteAliases: true, }, - indices: []string{spanIndexBaseName + "archive", ""}, + indices: []string{spanIndexBaseName + "archive", serviceIndexBaseName + "archive"}, }, { params: SpanWriterParams{ Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, IndexPrefix: "foo:", WriteAlias: "archive", UseReadWriteAliases: true, }, - indices: []string{"foo:" + config.IndexPrefixSeparator + spanIndexBaseName + "archive", ""}, + indices: []string{"foo:" + config.IndexPrefixSeparator + spanIndexBaseName + "archive", "foo:" + config.IndexPrefixSeparator + serviceIndexBaseName + "archive"}, }, } for _, testCase := range testCases { From bdae29b1cd6a680ba211e68c6117cfebe41656fd Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Thu, 9 Jan 2025 19:01:16 -0500 Subject: [PATCH 14/27] Update Test For New Codepath Signed-off-by: Mahad Zaryab --- plugin/storage/es/factory_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin/storage/es/factory_test.go b/plugin/storage/es/factory_test.go index 7a3412f8e56..3121748b7c6 100644 --- a/plugin/storage/es/factory_test.go +++ b/plugin/storage/es/factory_test.go @@ -245,7 +245,7 @@ func TestArchiveDisabled(t *testing.T) { func TestArchiveEnabled(t *testing.T) { f := NewFactory() f.primaryConfig = &escfg.Configuration{} - f.archiveConfig = &escfg.Configuration{Enabled: true} + f.archiveConfig = &escfg.Configuration{Enabled: true, UseReadWriteAliases: true} f.newClientFn = (&mockClientBuilder{}).NewClient err := f.Initialize(metrics.NullFactory, zap.NewNop()) require.NoError(t, err) From 2bf4a61be428a6f369a148d63ed4b98dc4821b03 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Thu, 9 Jan 2025 22:08:31 -0500 Subject: [PATCH 15/27] Remove Indirection In getSourceFn Signed-off-by: Mahad Zaryab --- plugin/storage/es/spanstore/reader.go | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index 6abb5cbf3f4..df2e3292413 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -195,14 +195,11 @@ func addRemoteReadClusters(fn timeRangeIndexFn, remoteReadClusters []string) tim func getSourceFn(useReadWriteAliases bool, maxDocCount int) sourceFn { return func(query elastic.Query, nextTime uint64) *elastic.SearchSource { - s := elastic.NewSearchSource(). + return elastic.NewSearchSource(). Query(query). - Size(maxDocCount) - if !useReadWriteAliases { - s.Sort("startTime", true). - SearchAfter(nextTime) - } - return s + Size(maxDocCount). + Sort("startTime", true). + SearchAfter(nextTime) } } From 6cf08e8246e99ef38e5d2b87dd6686e7a3e97139 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Thu, 9 Jan 2025 22:19:54 -0500 Subject: [PATCH 16/27] Fix Linting Signed-off-by: Mahad Zaryab --- plugin/storage/es/spanstore/reader.go | 4 ++-- plugin/storage/es/spanstore/reader_test.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index df2e3292413..6bdc670c55d 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -141,7 +141,7 @@ func NewSpanReader(p SpanReaderParams) *SpanReader { p.Logger, getTimeRangeIndexFn(readAlias, p.RemoteReadClusters), ), - sourceFn: getSourceFn(p.UseReadWriteAliases, p.MaxDocCount), + sourceFn: getSourceFn(p.MaxDocCount), maxDocCount: p.MaxDocCount, useReadWriteAliases: p.UseReadWriteAliases, logger: p.Logger, @@ -193,7 +193,7 @@ func addRemoteReadClusters(fn timeRangeIndexFn, remoteReadClusters []string) tim } } -func getSourceFn(useReadWriteAliases bool, maxDocCount int) sourceFn { +func getSourceFn(maxDocCount int) sourceFn { return func(query elastic.Query, nextTime uint64) *elastic.SearchSource { return elastic.NewSearchSource(). Query(query). diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index b46ecf9b926..3553a8fa830 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -1310,7 +1310,7 @@ func TestBuildTraceByIDQuery(t *testing.T) { } func TestTerminateAfterNotSet(t *testing.T) { - srcFn := getSourceFn(false, 99) + srcFn := getSourceFn(99) searchSource := srcFn(elastic.NewMatchAllQuery(), 1) sp, err := searchSource.Source() require.NoError(t, err) From 686cc8da6a4289a41747cb8aa194aee924910bb2 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Sat, 11 Jan 2025 18:13:07 -0500 Subject: [PATCH 17/27] Fix Lint Signed-off-by: Mahad Zaryab --- plugin/storage/es/factory.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index 3d828aaa9e6..78da9b80508 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -231,7 +231,7 @@ func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) { if f.archiveConfig.UseReadWriteAliases { writeAlias += "-write" } - archiveMetricsFactory := f.metricsFactory.Namespace( + archiveMetricsFactory := f.metricsFactory.Namespace( metrics.NSOptions{ Tags: map[string]string{ "role": "archive", From b785631a441c1da316821c6ce1c9c74c59bc5c71 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Sat, 11 Jan 2025 22:39:44 -0500 Subject: [PATCH 18/27] Address Feedback From PR Review Signed-off-by: Mahad Zaryab --- plugin/storage/es/factory.go | 2 +- plugin/storage/es/spanstore/reader.go | 25 ++++++++++++---------- plugin/storage/es/spanstore/reader_test.go | 10 ++++----- plugin/storage/es/spanstore/writer.go | 2 +- 4 files changed, 21 insertions(+), 18 deletions(-) diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index 78da9b80508..4f2954bbd27 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -261,7 +261,7 @@ func createSpanReader( ServiceIndex: cfg.Indices.Services, TagDotReplacement: cfg.Tags.DotReplacement, UseReadWriteAliases: useReadWriteAliases, - ReadAlias: readAlias, + ReadAliasSuffix: readAlias, RemoteReadClusters: cfg.RemoteReadClusters, Logger: logger, Tracer: tp.Tracer("esSpanStore.SpanReader"), diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index 6bdc670c55d..9f40d0674fb 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -48,7 +48,7 @@ const ( defaultNumTraces = 100 - rolloverMaxSpanAge = time.Hour * 24 * 365 * 50 + dawnOfTimeSpanAge = time.Hour * 24 * 365 * 50 ) var ( @@ -106,7 +106,7 @@ type SpanReaderParams struct { SpanIndex cfg.IndexOptions ServiceIndex cfg.IndexOptions TagDotReplacement string - ReadAlias string + ReadAliasSuffix string UseReadWriteAliases bool RemoteReadClusters []string Logger *zap.Logger @@ -120,9 +120,9 @@ func NewSpanReader(p SpanReaderParams) *SpanReader { // Setting the maxSpanAge to a large duration will ensure all spans in the "read" alias are accessible by queries (query window = [now - maxSpanAge, now]). // When read/write aliases are enabled, which are required for index rollovers, only the "read" alias is queried and therefore should not affect performance. if p.UseReadWriteAliases { - maxSpanAge = rolloverMaxSpanAge - if p.ReadAlias != "" { - readAlias = p.ReadAlias + maxSpanAge = dawnOfTimeSpanAge + if p.ReadAliasSuffix != "" { + readAlias = p.ReadAliasSuffix } else { readAlias = "read" } @@ -139,7 +139,10 @@ func NewSpanReader(p SpanReaderParams) *SpanReader { spanConverter: dbmodel.NewToDomain(p.TagDotReplacement), timeRangeIndices: getLoggingTimeRangeIndexFn( p.Logger, - getTimeRangeIndexFn(readAlias, p.RemoteReadClusters), + addRemoteReadClusters( + getTimeRangeIndexFn(p.UseReadWriteAliases, readAlias), + p.RemoteReadClusters, + ), ), sourceFn: getSourceFn(p.MaxDocCount), maxDocCount: p.MaxDocCount, @@ -164,13 +167,13 @@ func getLoggingTimeRangeIndexFn(logger *zap.Logger, fn timeRangeIndexFn) timeRan } } -func getTimeRangeIndexFn(readAlias string, remoteReadClusters []string) timeRangeIndexFn { - if readAlias != "" { - return addRemoteReadClusters(func(indexPrefix, _ /* indexDateLayout */ string, _ /* startTime */ time.Time, _ /* endTime */ time.Time, _ /* reduceDuration */ time.Duration) []string { +func getTimeRangeIndexFn(useReadWriteAliases bool, readAlias string) timeRangeIndexFn { + if useReadWriteAliases { + return func(indexPrefix, _ /* indexDateLayout */ string, _ /* startTime */ time.Time, _ /* endTime */ time.Time, _ /* reduceDuration */ time.Duration) []string { return []string{indexPrefix + readAlias} - }, remoteReadClusters) + } } - return addRemoteReadClusters(timeRangeIndices, remoteReadClusters) + return timeRangeIndices } // Add a remote cluster prefix for each cluster and for each index and add it to the list of original indices. diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index 3553a8fa830..545a8d1efcc 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -134,7 +134,7 @@ func withArchiveSpanReader(t *testing.T, readAlias bool, fn func(r *spanReaderTe Tracer: tracer.Tracer("test"), MaxSpanAge: 0, TagDotReplacement: "@", - ReadAlias: "archive", + ReadAliasSuffix: "archive", UseReadWriteAliases: readAlias, }), } @@ -212,7 +212,7 @@ func TestSpanReaderIndices(t *testing.T) { }, { params: SpanReaderParams{ - ReadAlias: "archive", // ignored because ReadWriteAliases is false + ReadAliasSuffix: "archive", // ignored because ReadWriteAliases is false }, indices: []string{spanIndexBaseName, serviceIndexBaseName}, }, @@ -232,14 +232,14 @@ func TestSpanReaderIndices(t *testing.T) { }, { params: SpanReaderParams{ - ReadAlias: "archive", + ReadAliasSuffix: "archive", UseReadWriteAliases: true, }, indices: []string{spanIndexBaseName + "archive", serviceIndexBaseName + "archive"}, }, { params: SpanReaderParams{ - SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, IndexPrefix: "foo:", UseReadWriteAliases: true, ReadAlias: "archive", + SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, IndexPrefix: "foo:", UseReadWriteAliases: true, ReadAliasSuffix: "archive", }, indices: []string{"foo:" + config.IndexPrefixSeparator + spanIndexBaseName + "archive", "foo:" + config.IndexPrefixSeparator + serviceIndexBaseName + "archive"}, }, @@ -260,7 +260,7 @@ func TestSpanReaderIndices(t *testing.T) { }, { params: SpanReaderParams{ - UseReadWriteAliases: true, ReadAlias: "archive", RemoteReadClusters: []string{"cluster_one", "cluster_two"}, + UseReadWriteAliases: true, ReadAliasSuffix: "archive", RemoteReadClusters: []string{"cluster_one", "cluster_two"}, }, indices: []string{ spanIndexBaseName + "archive", diff --git a/plugin/storage/es/spanstore/writer.go b/plugin/storage/es/spanstore/writer.go index 40455c2ea15..664a0d33102 100644 --- a/plugin/storage/es/spanstore/writer.go +++ b/plugin/storage/es/spanstore/writer.go @@ -110,7 +110,7 @@ type spanAndServiceIndexFn func(spanTime time.Time) (string, string) func getSpanAndServiceIndexFn(p SpanWriterParams, writeAlias string) spanAndServiceIndexFn { spanIndexPrefix := p.IndexPrefix.Apply(spanIndexBaseName) serviceIndexPrefix := p.IndexPrefix.Apply(serviceIndexBaseName) - if writeAlias != "" { + if p.UseReadWriteAliases { return func(_ time.Time) (string, string) { return spanIndexPrefix + writeAlias, serviceIndexPrefix + writeAlias } From 82d049d26d8b306619dd66f04250b4a9dc4d475d Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Sun, 12 Jan 2025 13:32:54 -0500 Subject: [PATCH 19/27] Bring Test Back Signed-off-by: Mahad Zaryab --- plugin/storage/es/spanstore/reader_test.go | 23 +++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index 545a8d1efcc..2a77b4426ad 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -118,7 +118,7 @@ func withSpanReader(t *testing.T, fn func(r *spanReaderTest)) { fn(r) } -func withArchiveSpanReader(t *testing.T, readAlias bool, fn func(r *spanReaderTest)) { +func withArchiveSpanReader(t *testing.T, readAlias bool, readAliasSuffix string, fn func(r *spanReaderTest)) { client := &mocks.Client{} tracer, exp, closer := tracerProvider(t) defer closer() @@ -134,7 +134,7 @@ func withArchiveSpanReader(t *testing.T, readAlias bool, fn func(r *spanReaderTe Tracer: tracer.Tracer("test"), MaxSpanAge: 0, TagDotReplacement: "@", - ReadAliasSuffix: "archive", + ReadAliasSuffix: readAliasSuffix, UseReadWriteAliases: readAlias, }), } @@ -1252,7 +1252,7 @@ func TestSpanReader_GetEmptyIndex(t *testing.T) { } func TestSpanReader_ArchiveTraces(t *testing.T) { - withArchiveSpanReader(t, true, func(r *spanReaderTest) { + withArchiveSpanReader(t, true, "archive", func(r *spanReaderTest) { mockSearchService(r). Return(&elastic.SearchResult{}, nil) mockArchiveMultiSearchService(r, "jaeger-span-archive"). @@ -1267,6 +1267,23 @@ func TestSpanReader_ArchiveTraces(t *testing.T) { }) } +func TestSpanReader_ArchiveTraces_ReadAlias(t *testing.T) { + withArchiveSpanReader(t, true, "archive-read", func(r *spanReaderTest) { + mockSearchService(r). + Return(&elastic.SearchResult{}, nil) + mockArchiveMultiSearchService(r, "jaeger-span-archive-read"). + Return(&elastic.MultiSearchResult{ + Responses: []*elastic.SearchResult{}, + }, nil) + + query := spanstore.GetTraceParameters{} + trace, err := r.reader.GetTrace(context.Background(), query) + require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") + require.Nil(t, trace) + require.EqualError(t, err, "trace not found") + }) +} + func TestConvertTraceIDsStringsToModels(t *testing.T) { ids, err := convertTraceIDsStringsToModels([]string{"1", "2", "01", "02", "001", "002"}) require.NoError(t, err) From cde60af6a86d3687ab1e42b20e4680ba14ee63b3 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Sun, 12 Jan 2025 14:35:34 -0500 Subject: [PATCH 20/27] Address Feedback Signed-off-by: Mahad Zaryab --- plugin/storage/es/factory.go | 10 +++++----- plugin/storage/es/spanstore/reader.go | 8 ++++---- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index 4f2954bbd27..b0f87c3abb3 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -204,11 +204,11 @@ func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) { if !f.archiveConfig.Enabled { return nil, nil } - readAlias := "archive" + readAliasSuffix := "archive" if f.archiveConfig.UseReadWriteAliases { - readAlias += "-read" + readAliasSuffix += "-read" } - sr, err := createSpanReader(f.getArchiveClient, f.archiveConfig, f.logger, f.tracer, readAlias, true) + sr, err := createSpanReader(f.getArchiveClient, f.archiveConfig, f.logger, f.tracer, readAliasSuffix, true) if err != nil { return nil, err } @@ -246,7 +246,7 @@ func createSpanReader( cfg *config.Configuration, logger *zap.Logger, tp trace.TracerProvider, - readAlias string, + readAliasSuffix string, useReadWriteAliases bool, ) (spanstore.Reader, error) { if cfg.UseILM && !cfg.UseReadWriteAliases { @@ -261,7 +261,7 @@ func createSpanReader( ServiceIndex: cfg.Indices.Services, TagDotReplacement: cfg.Tags.DotReplacement, UseReadWriteAliases: useReadWriteAliases, - ReadAliasSuffix: readAlias, + ReadAliasSuffix: readAliasSuffix, RemoteReadClusters: cfg.RemoteReadClusters, Logger: logger, Tracer: tp.Tracer("esSpanStore.SpanReader"), diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index 9f40d0674fb..454726a1039 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -116,15 +116,15 @@ type SpanReaderParams struct { // NewSpanReader returns a new SpanReader with a metrics. func NewSpanReader(p SpanReaderParams) *SpanReader { maxSpanAge := p.MaxSpanAge - readAlias := "" + readAliasSuffix := "" // Setting the maxSpanAge to a large duration will ensure all spans in the "read" alias are accessible by queries (query window = [now - maxSpanAge, now]). // When read/write aliases are enabled, which are required for index rollovers, only the "read" alias is queried and therefore should not affect performance. if p.UseReadWriteAliases { maxSpanAge = dawnOfTimeSpanAge if p.ReadAliasSuffix != "" { - readAlias = p.ReadAliasSuffix + readAliasSuffix = p.ReadAliasSuffix } else { - readAlias = "read" + readAliasSuffix = "read" } } @@ -140,7 +140,7 @@ func NewSpanReader(p SpanReaderParams) *SpanReader { timeRangeIndices: getLoggingTimeRangeIndexFn( p.Logger, addRemoteReadClusters( - getTimeRangeIndexFn(p.UseReadWriteAliases, readAlias), + getTimeRangeIndexFn(p.UseReadWriteAliases, readAliasSuffix), p.RemoteReadClusters, ), ), From 2a5d877834813d8c9b429286941b9a436d254c12 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Sun, 12 Jan 2025 14:44:23 -0500 Subject: [PATCH 21/27] Rename Variable Signed-off-by: Mahad Zaryab --- plugin/storage/es/factory.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index b0f87c3abb3..66e52cdc12d 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -227,9 +227,9 @@ func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) { if !f.archiveConfig.Enabled { return nil, nil } - writeAlias := "archive" + writeAliasSuffix := "archive" if f.archiveConfig.UseReadWriteAliases { - writeAlias += "-write" + writeAliasSuffix += "-write" } archiveMetricsFactory := f.metricsFactory.Namespace( metrics.NSOptions{ @@ -238,7 +238,7 @@ func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) { }, }, ) - return createSpanWriter(f.getArchiveClient, f.archiveConfig, archiveMetricsFactory, f.logger, writeAlias, true) + return createSpanWriter(f.getArchiveClient, f.archiveConfig, archiveMetricsFactory, f.logger, writeAliasSuffix, true) } func createSpanReader( @@ -273,7 +273,7 @@ func createSpanWriter( cfg *config.Configuration, mFactory metrics.Factory, logger *zap.Logger, - writeAlias string, + writeAliasSuffix string, useReadWriteAliases bool, ) (spanstore.Writer, error) { var tags []string @@ -295,7 +295,7 @@ func createSpanWriter( TagKeysAsFields: tags, TagDotReplacement: cfg.Tags.DotReplacement, UseReadWriteAliases: useReadWriteAliases, - WriteAlias: writeAlias, + WriteAlias: writeAliasSuffix, Logger: logger, MetricsFactory: mFactory, ServiceCacheTTL: cfg.ServiceCacheTTL, From bbd6c1f5d7e6056f99f4cde9151cff90a183e8eb Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Sun, 12 Jan 2025 14:59:18 -0500 Subject: [PATCH 22/27] Remove Config From Factory Test Signed-off-by: Mahad Zaryab --- plugin/storage/es/factory_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin/storage/es/factory_test.go b/plugin/storage/es/factory_test.go index 3121748b7c6..7a3412f8e56 100644 --- a/plugin/storage/es/factory_test.go +++ b/plugin/storage/es/factory_test.go @@ -245,7 +245,7 @@ func TestArchiveDisabled(t *testing.T) { func TestArchiveEnabled(t *testing.T) { f := NewFactory() f.primaryConfig = &escfg.Configuration{} - f.archiveConfig = &escfg.Configuration{Enabled: true, UseReadWriteAliases: true} + f.archiveConfig = &escfg.Configuration{Enabled: true} f.newClientFn = (&mockClientBuilder{}).NewClient err := f.Initialize(metrics.NullFactory, zap.NewNop()) require.NoError(t, err) From 7ddb76b7e2d4594e7d87141e2ffd59f4bca19d82 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Sun, 12 Jan 2025 15:01:46 -0500 Subject: [PATCH 23/27] Add Table Test Signed-off-by: Mahad Zaryab --- plugin/storage/es/spanstore/reader_test.go | 41 +++++++++++++++------- 1 file changed, 28 insertions(+), 13 deletions(-) diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index 2a77b4426ad..23205c2ffe6 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -1252,19 +1252,34 @@ func TestSpanReader_GetEmptyIndex(t *testing.T) { } func TestSpanReader_ArchiveTraces(t *testing.T) { - withArchiveSpanReader(t, true, "archive", func(r *spanReaderTest) { - mockSearchService(r). - Return(&elastic.SearchResult{}, nil) - mockArchiveMultiSearchService(r, "jaeger-span-archive"). - Return(&elastic.MultiSearchResult{ - Responses: []*elastic.SearchResult{}, - }, nil) - query := spanstore.GetTraceParameters{} - trace, err := r.reader.GetTrace(context.Background(), query) - require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") - require.Nil(t, trace) - require.EqualError(t, err, "trace not found") - }) + testCases := []struct { + useAliases bool + suffix string + expected string + }{ + {false, "", "jaeger-span-"}, + {true, "", "jaeger-span-read"}, + {false, "archive", "jaeger-span-"}, + {true, "archive", "jaeger-span-archive"}, + } + + for _, tc := range testCases { + t.Run(fmt.Sprintf("useAliases=%v suffix=%s", tc.useAliases, tc.suffix), func(t *testing.T) { + withArchiveSpanReader(t, tc.useAliases, tc.suffix, func(r *spanReaderTest) { + mockSearchService(r). + Return(&elastic.SearchResult{}, nil) + mockArchiveMultiSearchService(r, tc.expected). + Return(&elastic.MultiSearchResult{ + Responses: []*elastic.SearchResult{}, + }, nil) + query := spanstore.GetTraceParameters{} + trace, err := r.reader.GetTrace(context.Background(), query) + require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") + require.Nil(t, trace) + require.EqualError(t, err, "trace not found") + }) + }) + } } func TestSpanReader_ArchiveTraces_ReadAlias(t *testing.T) { From c527bb016d245c64a8a1f91d77ed9c71eb72661c Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Sun, 12 Jan 2025 15:18:26 -0500 Subject: [PATCH 24/27] Remove Redundant Test Signed-off-by: Mahad Zaryab --- plugin/storage/es/spanstore/reader_test.go | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index 23205c2ffe6..864f1905ecf 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -1282,23 +1282,6 @@ func TestSpanReader_ArchiveTraces(t *testing.T) { } } -func TestSpanReader_ArchiveTraces_ReadAlias(t *testing.T) { - withArchiveSpanReader(t, true, "archive-read", func(r *spanReaderTest) { - mockSearchService(r). - Return(&elastic.SearchResult{}, nil) - mockArchiveMultiSearchService(r, "jaeger-span-archive-read"). - Return(&elastic.MultiSearchResult{ - Responses: []*elastic.SearchResult{}, - }, nil) - - query := spanstore.GetTraceParameters{} - trace, err := r.reader.GetTrace(context.Background(), query) - require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") - require.Nil(t, trace) - require.EqualError(t, err, "trace not found") - }) -} - func TestConvertTraceIDsStringsToModels(t *testing.T) { ids, err := convertTraceIDsStringsToModels([]string{"1", "2", "01", "02", "001", "002"}) require.NoError(t, err) From f4bebe9d892ceaf17f6ba0dbb7940b38f54d17b4 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Sun, 12 Jan 2025 15:18:39 -0500 Subject: [PATCH 25/27] Handle Both read_aliases Cases Signed-off-by: Mahad Zaryab --- plugin/storage/es/factory_test.go | 41 +++++++++++++++++++++---------- 1 file changed, 28 insertions(+), 13 deletions(-) diff --git a/plugin/storage/es/factory_test.go b/plugin/storage/es/factory_test.go index 7a3412f8e56..7dcd103ebf1 100644 --- a/plugin/storage/es/factory_test.go +++ b/plugin/storage/es/factory_test.go @@ -8,6 +8,7 @@ import ( "context" "encoding/base64" "errors" + "fmt" "net/http" "net/http/httptest" "os" @@ -243,19 +244,33 @@ func TestArchiveDisabled(t *testing.T) { } func TestArchiveEnabled(t *testing.T) { - f := NewFactory() - f.primaryConfig = &escfg.Configuration{} - f.archiveConfig = &escfg.Configuration{Enabled: true} - f.newClientFn = (&mockClientBuilder{}).NewClient - err := f.Initialize(metrics.NullFactory, zap.NewNop()) - require.NoError(t, err) - defer f.Close() // Ensure resources are cleaned up if initialization is successful - w, err := f.CreateArchiveSpanWriter() - require.NoError(t, err) - assert.NotNil(t, w) - r, err := f.CreateArchiveSpanReader() - require.NoError(t, err) - assert.NotNil(t, r) + tests := []struct { + useReadWriteAliases bool + }{ + { + useReadWriteAliases: false, + }, + { + useReadWriteAliases: true, + }, + } + for _, test := range tests { + t.Run(fmt.Sprintf("useReadWriteAliases=%v", test.useReadWriteAliases), func(t *testing.T) { + f := NewFactory() + f.primaryConfig = &escfg.Configuration{} + f.archiveConfig = &escfg.Configuration{Enabled: true, UseReadWriteAliases: test.useReadWriteAliases} + f.newClientFn = (&mockClientBuilder{}).NewClient + err := f.Initialize(metrics.NullFactory, zap.NewNop()) + require.NoError(t, err) + defer f.Close() // Ensure resources are cleaned up if initialization is successful + w, err := f.CreateArchiveSpanWriter() + require.NoError(t, err) + assert.NotNil(t, w) + r, err := f.CreateArchiveSpanReader() + require.NoError(t, err) + assert.NotNil(t, r) + }) + } } func TestConfigureFromOptions(t *testing.T) { From baa463b2149ef8294385b9a4de131ee00eeddbd9 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Sun, 12 Jan 2025 22:10:04 -0500 Subject: [PATCH 26/27] Fix Variable Names Signed-off-by: Mahad Zaryab --- plugin/storage/es/factory.go | 2 +- plugin/storage/es/spanstore/writer.go | 12 ++++++------ plugin/storage/es/spanstore/writer_test.go | 6 +++--- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index 66e52cdc12d..e1c7ae807a4 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -295,7 +295,7 @@ func createSpanWriter( TagKeysAsFields: tags, TagDotReplacement: cfg.Tags.DotReplacement, UseReadWriteAliases: useReadWriteAliases, - WriteAlias: writeAliasSuffix, + WriteAliasSuffix: writeAliasSuffix, Logger: logger, MetricsFactory: mFactory, ServiceCacheTTL: cfg.ServiceCacheTTL, diff --git a/plugin/storage/es/spanstore/writer.go b/plugin/storage/es/spanstore/writer.go index 664a0d33102..f8647a1db0f 100644 --- a/plugin/storage/es/spanstore/writer.go +++ b/plugin/storage/es/spanstore/writer.go @@ -56,7 +56,7 @@ type SpanWriterParams struct { TagKeysAsFields []string TagDotReplacement string UseReadWriteAliases bool - WriteAlias string + WriteAliasSuffix string ServiceCacheTTL time.Duration } @@ -67,12 +67,12 @@ func NewSpanWriter(p SpanWriterParams) *SpanWriter { serviceCacheTTL = serviceCacheTTLDefault } - writeAlias := "" + writeAliasSuffix := "" if p.UseReadWriteAliases { - if p.WriteAlias != "" { - writeAlias = p.WriteAlias + if p.WriteAliasSuffix != "" { + writeAliasSuffix = p.WriteAliasSuffix } else { - writeAlias = "write" + writeAliasSuffix = "write" } } @@ -85,7 +85,7 @@ func NewSpanWriter(p SpanWriterParams) *SpanWriter { }, serviceWriter: serviceOperationStorage.Write, spanConverter: dbmodel.NewFromDomain(p.AllTagsAsFields, p.TagKeysAsFields, p.TagDotReplacement), - spanServiceIndex: getSpanAndServiceIndexFn(p, writeAlias), + spanServiceIndex: getSpanAndServiceIndexFn(p, writeAliasSuffix), } } diff --git a/plugin/storage/es/spanstore/writer_test.go b/plugin/storage/es/spanstore/writer_test.go index c255ac1cd86..bf1ef68325a 100644 --- a/plugin/storage/es/spanstore/writer_test.go +++ b/plugin/storage/es/spanstore/writer_test.go @@ -89,7 +89,7 @@ func TestSpanWriterIndices(t *testing.T) { params: SpanWriterParams{ Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, - WriteAlias: "archive", // ignored because UseReadWriteAliases is false + WriteAliasSuffix: "archive", // ignored because UseReadWriteAliases is false }, indices: []string{spanIndexBaseName + spanDataLayoutFormat, serviceIndexBaseName + serviceDataLayoutFormat}, }, @@ -110,14 +110,14 @@ func TestSpanWriterIndices(t *testing.T) { { params: SpanWriterParams{ Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, - SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, WriteAlias: "archive", UseReadWriteAliases: true, + SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, WriteAliasSuffix: "archive", UseReadWriteAliases: true, }, indices: []string{spanIndexBaseName + "archive", serviceIndexBaseName + "archive"}, }, { params: SpanWriterParams{ Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, - SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, IndexPrefix: "foo:", WriteAlias: "archive", UseReadWriteAliases: true, + SpanIndex: spanIndexOpts, ServiceIndex: serviceIndexOpts, IndexPrefix: "foo:", WriteAliasSuffix: "archive", UseReadWriteAliases: true, }, indices: []string{"foo:" + config.IndexPrefixSeparator + spanIndexBaseName + "archive", "foo:" + config.IndexPrefixSeparator + serviceIndexBaseName + "archive"}, }, From 3b1e0fbb40d05ae1af0eba49094486978d870113 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Sun, 12 Jan 2025 22:18:55 -0500 Subject: [PATCH 27/27] Change Test Expecation Signed-off-by: Mahad Zaryab --- plugin/storage/es/spanstore/reader_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index 864f1905ecf..9aa184e7aa9 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -1259,8 +1259,8 @@ func TestSpanReader_ArchiveTraces(t *testing.T) { }{ {false, "", "jaeger-span-"}, {true, "", "jaeger-span-read"}, - {false, "archive", "jaeger-span-"}, - {true, "archive", "jaeger-span-archive"}, + {false, "foobar", "jaeger-span-"}, + {true, "foobar", "jaeger-span-foobar"}, } for _, tc := range testCases {