Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[storage] Remove dependency on archive flag in ES reader #6490

Merged
merged 33 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
9d1d999
Use ReadWriteAliases Instead of Archive Flag
mahadzaryab1 Jan 8, 2025
4f5cad5
Remove Archive Flag From Writer
mahadzaryab1 Jan 8, 2025
af2e9ec
Introduce New Arguments To Remove IsArchive Flag
mahadzaryab1 Jan 8, 2025
904d3fb
Introduce New Arguments To Remove IsArchive Flag In Writer
mahadzaryab1 Jan 8, 2025
6bc1ebf
Fix Tests
mahadzaryab1 Jan 8, 2025
0ae679d
Update Callsites
mahadzaryab1 Jan 8, 2025
c0ea9a2
Propagate Arguments To Archive Constructor
mahadzaryab1 Jan 8, 2025
5659221
Handle Archive And Use Aliases Both Set
mahadzaryab1 Jan 8, 2025
52f63fd
Fix Branching In Reader And Change Variable Name
mahadzaryab1 Jan 9, 2025
c573a66
Fix Branching In Writer And Change Variable Name
mahadzaryab1 Jan 9, 2025
4976f7b
Fix Variable Naming
mahadzaryab1 Jan 9, 2025
77d47c0
Move Check To Constructor
mahadzaryab1 Jan 9, 2025
6b97e7f
Move Check To Constructor
mahadzaryab1 Jan 9, 2025
bdae29b
Update Test For New Codepath
mahadzaryab1 Jan 10, 2025
2bf4a61
Remove Indirection In getSourceFn
mahadzaryab1 Jan 10, 2025
6cf08e8
Fix Linting
mahadzaryab1 Jan 10, 2025
77ca3c3
Merge branch 'main' into es-archive-dependency
mahadzaryab1 Jan 11, 2025
686cc8d
Fix Lint
mahadzaryab1 Jan 11, 2025
b6db77a
Merge branch 'main' into es-archive-dependency
mahadzaryab1 Jan 12, 2025
20f5f5d
Merge branch 'main' into es-archive-dependency
mahadzaryab1 Jan 12, 2025
b785631
Address Feedback From PR Review
mahadzaryab1 Jan 12, 2025
82d049d
Bring Test Back
mahadzaryab1 Jan 12, 2025
cde60af
Address Feedback
mahadzaryab1 Jan 12, 2025
2a5d877
Rename Variable
mahadzaryab1 Jan 12, 2025
bbd6c1f
Remove Config From Factory Test
mahadzaryab1 Jan 12, 2025
7ddb76b
Add Table Test
mahadzaryab1 Jan 12, 2025
c527bb0
Remove Redundant Test
mahadzaryab1 Jan 12, 2025
f4bebe9
Handle Both read_aliases Cases
mahadzaryab1 Jan 12, 2025
baa463b
Fix Variable Names
mahadzaryab1 Jan 13, 2025
3b1e0fb
Change Test Expecation
mahadzaryab1 Jan 13, 2025
865b6e3
Merge branch 'main' into es-archive-dependency
mahadzaryab1 Jan 13, 2025
f9bdd02
Merge branch 'main' into es-archive-dependency
mahadzaryab1 Jan 13, 2025
6688faa
Merge branch 'main' into es-archive-dependency
mahadzaryab1 Jan 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 20 additions & 10 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@

// CreateSpanReader implements storage.Factory
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
sr, err := createSpanReader(f.getPrimaryClient, f.primaryConfig, false, f.logger, f.tracer)
sr, err := createSpanReader(f.getPrimaryClient, f.primaryConfig, f.logger, f.tracer, "", f.primaryConfig.UseReadWriteAliases)
if err != nil {
return nil, err
}
Expand All @@ -191,7 +191,7 @@

// CreateSpanWriter implements storage.Factory
func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
return createSpanWriter(f.getPrimaryClient, f.primaryConfig, false, f.metricsFactory, f.logger)
return createSpanWriter(f.getPrimaryClient, f.primaryConfig, f.metricsFactory, f.logger, "", f.primaryConfig.UseReadWriteAliases)
}

// CreateDependencyReader implements storage.Factory
Expand All @@ -204,7 +204,11 @@
if !f.archiveConfig.Enabled {
return nil, nil
}
sr, err := createSpanReader(f.getArchiveClient, f.archiveConfig, true, f.logger, f.tracer)
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
readAliasSuffix := "archive"
if f.archiveConfig.UseReadWriteAliases {
readAliasSuffix += "-read"
}

Check warning on line 210 in plugin/storage/es/factory.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/es/factory.go#L209-L210

Added lines #L209 - L210 were not covered by tests
sr, err := createSpanReader(f.getArchiveClient, f.archiveConfig, f.logger, f.tracer, readAliasSuffix, true)
if err != nil {
return nil, err
}
Expand All @@ -223,22 +227,27 @@
if !f.archiveConfig.Enabled {
return nil, nil
}
writeAliasSuffix := "archive"
if f.archiveConfig.UseReadWriteAliases {
writeAliasSuffix += "-write"
}

Check warning on line 233 in plugin/storage/es/factory.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/es/factory.go#L232-L233

Added lines #L232 - L233 were not covered by tests
archiveMetricsFactory := f.metricsFactory.Namespace(
metrics.NSOptions{
Tags: map[string]string{
"role": "archive",
},
},
)
return createSpanWriter(f.getArchiveClient, f.archiveConfig, true, archiveMetricsFactory, f.logger)
return createSpanWriter(f.getArchiveClient, f.archiveConfig, archiveMetricsFactory, f.logger, writeAliasSuffix, true)
}

func createSpanReader(
clientFn func() es.Client,
cfg *config.Configuration,
archive bool,
logger *zap.Logger,
tp trace.TracerProvider,
readAliasSuffix string,
useReadWriteAliases bool,
) (spanstore.Reader, error) {
if cfg.UseILM && !cfg.UseReadWriteAliases {
return nil, errors.New("--es.use-ilm must always be used in conjunction with --es.use-aliases to ensure ES writers and readers refer to the single index mapping")
Expand All @@ -251,8 +260,8 @@
SpanIndex: cfg.Indices.Spans,
ServiceIndex: cfg.Indices.Services,
TagDotReplacement: cfg.Tags.DotReplacement,
UseReadWriteAliases: cfg.UseReadWriteAliases,
Archive: archive,
UseReadWriteAliases: useReadWriteAliases,
ReadAliasSuffix: readAliasSuffix,
RemoteReadClusters: cfg.RemoteReadClusters,
Logger: logger,
Tracer: tp.Tracer("esSpanStore.SpanReader"),
Expand All @@ -262,9 +271,10 @@
func createSpanWriter(
clientFn func() es.Client,
cfg *config.Configuration,
archive bool,
mFactory metrics.Factory,
logger *zap.Logger,
writeAliasSuffix string,
useReadWriteAliases bool,
) (spanstore.Writer, error) {
var tags []string
var err error
Expand All @@ -284,8 +294,8 @@
AllTagsAsFields: cfg.Tags.AllAsFields,
TagKeysAsFields: tags,
TagDotReplacement: cfg.Tags.DotReplacement,
Archive: archive,
UseReadWriteAliases: cfg.UseReadWriteAliases,
UseReadWriteAliases: useReadWriteAliases,
WriteAliasSuffix: writeAliasSuffix,
Logger: logger,
MetricsFactory: mFactory,
ServiceCacheTTL: cfg.ServiceCacheTTL,
Expand Down
41 changes: 28 additions & 13 deletions plugin/storage/es/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"encoding/base64"
"errors"
"fmt"
"net/http"
"net/http/httptest"
"os"
Expand Down Expand Up @@ -243,19 +244,33 @@ func TestArchiveDisabled(t *testing.T) {
}

func TestArchiveEnabled(t *testing.T) {
f := NewFactory()
f.primaryConfig = &escfg.Configuration{}
f.archiveConfig = &escfg.Configuration{Enabled: true}
f.newClientFn = (&mockClientBuilder{}).NewClient
err := f.Initialize(metrics.NullFactory, zap.NewNop())
require.NoError(t, err)
defer f.Close() // Ensure resources are cleaned up if initialization is successful
w, err := f.CreateArchiveSpanWriter()
require.NoError(t, err)
assert.NotNil(t, w)
r, err := f.CreateArchiveSpanReader()
require.NoError(t, err)
assert.NotNil(t, r)
tests := []struct {
useReadWriteAliases bool
}{
{
useReadWriteAliases: false,
},
{
useReadWriteAliases: true,
},
}
for _, test := range tests {
t.Run(fmt.Sprintf("useReadWriteAliases=%v", test.useReadWriteAliases), func(t *testing.T) {
f := NewFactory()
f.primaryConfig = &escfg.Configuration{}
f.archiveConfig = &escfg.Configuration{Enabled: true, UseReadWriteAliases: test.useReadWriteAliases}
f.newClientFn = (&mockClientBuilder{}).NewClient
err := f.Initialize(metrics.NullFactory, zap.NewNop())
require.NoError(t, err)
defer f.Close() // Ensure resources are cleaned up if initialization is successful
w, err := f.CreateArchiveSpanWriter()
require.NoError(t, err)
assert.NotNil(t, w)
r, err := f.CreateArchiveSpanReader()
require.NoError(t, err)
assert.NotNil(t, r)
})
}
}

func TestConfigureFromOptions(t *testing.T) {
Expand Down
5 changes: 0 additions & 5 deletions plugin/storage/es/spanstore/index_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,3 @@ func indexWithDate(indexPrefix, indexDateLayout string, date time.Time) string {
spanDate := date.UTC().Format(indexDateLayout)
return indexPrefix + spanDate
}

// returns archive index name
func archiveIndex(indexPrefix, archiveSuffix string) string {
return indexPrefix + archiveSuffix
}
64 changes: 28 additions & 36 deletions plugin/storage/es/spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,10 @@
)

const (
spanIndexBaseName = "jaeger-span-"
serviceIndexBaseName = "jaeger-service-"
archiveIndexSuffix = "archive"
archiveReadIndexSuffix = archiveIndexSuffix + "-read"
archiveWriteIndexSuffix = archiveIndexSuffix + "-write"
traceIDAggregation = "traceIDs"
indexPrefixSeparator = "-"
spanIndexBaseName = "jaeger-span-"
serviceIndexBaseName = "jaeger-service-"
traceIDAggregation = "traceIDs"
indexPrefixSeparator = "-"

traceIDField = "traceID"
durationField = "duration"
Expand All @@ -51,7 +48,7 @@

defaultNumTraces = 100

rolloverMaxSpanAge = time.Hour * 24 * 365 * 50
dawnOfTimeSpanAge = time.Hour * 24 * 365 * 50
)

var (
Expand Down Expand Up @@ -109,7 +106,7 @@
SpanIndex cfg.IndexOptions
ServiceIndex cfg.IndexOptions
TagDotReplacement string
Archive bool
ReadAliasSuffix string
UseReadWriteAliases bool
RemoteReadClusters []string
Logger *zap.Logger
Expand All @@ -119,10 +116,16 @@
// NewSpanReader returns a new SpanReader with a metrics.
func NewSpanReader(p SpanReaderParams) *SpanReader {
maxSpanAge := p.MaxSpanAge
readAliasSuffix := ""
// Setting the maxSpanAge to a large duration will ensure all spans in the "read" alias are accessible by queries (query window = [now - maxSpanAge, now]).
// When read/write aliases are enabled, which are required for index rollovers, only the "read" alias is queried and therefore should not affect performance.
if p.UseReadWriteAliases {
maxSpanAge = rolloverMaxSpanAge
maxSpanAge = dawnOfTimeSpanAge
if p.ReadAliasSuffix != "" {
readAliasSuffix = p.ReadAliasSuffix
} else {
readAliasSuffix = "read"
}

Check warning on line 128 in plugin/storage/es/spanstore/reader.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/es/spanstore/reader.go#L127-L128

Added lines #L127 - L128 were not covered by tests
}

return &SpanReader{
Expand All @@ -136,9 +139,12 @@
spanConverter: dbmodel.NewToDomain(p.TagDotReplacement),
timeRangeIndices: getLoggingTimeRangeIndexFn(
mahadzaryab1 marked this conversation as resolved.
Show resolved Hide resolved
p.Logger,
getTimeRangeIndexFn(p.Archive, p.UseReadWriteAliases, p.RemoteReadClusters),
addRemoteReadClusters(
getTimeRangeIndexFn(p.UseReadWriteAliases, readAliasSuffix),
p.RemoteReadClusters,
),
),
sourceFn: getSourceFn(p.Archive, p.MaxDocCount),
sourceFn: getSourceFn(p.MaxDocCount),
maxDocCount: p.MaxDocCount,
useReadWriteAliases: p.UseReadWriteAliases,
logger: p.Logger,
Expand All @@ -161,24 +167,13 @@
}
}

func getTimeRangeIndexFn(archive, useReadWriteAliases bool, remoteReadClusters []string) timeRangeIndexFn {
if archive {
var archiveSuffix string
if useReadWriteAliases {
archiveSuffix = archiveReadIndexSuffix
} else {
archiveSuffix = archiveIndexSuffix
}
return addRemoteReadClusters(func(indexPrefix, _ /* indexDateLayout */ string, _ /* startTime */ time.Time, _ /* endTime */ time.Time, _ /* reduceDuration */ time.Duration) []string {
return []string{archiveIndex(indexPrefix, archiveSuffix)}
}, remoteReadClusters)
}
func getTimeRangeIndexFn(useReadWriteAliases bool, readAlias string) timeRangeIndexFn {
if useReadWriteAliases {
return addRemoteReadClusters(func(indexPrefix string, _ /* indexDateLayout */ string, _ /* startTime */ time.Time, _ /* endTime */ time.Time, _ /* reduceDuration */ time.Duration) []string {
return []string{indexPrefix + "read"}
}, remoteReadClusters)
return func(indexPrefix, _ /* indexDateLayout */ string, _ /* startTime */ time.Time, _ /* endTime */ time.Time, _ /* reduceDuration */ time.Duration) []string {
return []string{indexPrefix + readAlias}
}
}
return addRemoteReadClusters(timeRangeIndices, remoteReadClusters)
return timeRangeIndices
}

// Add a remote cluster prefix for each cluster and for each index and add it to the list of original indices.
Expand All @@ -201,16 +196,13 @@
}
}

func getSourceFn(archive bool, maxDocCount int) sourceFn {
func getSourceFn(maxDocCount int) sourceFn {
return func(query elastic.Query, nextTime uint64) *elastic.SearchSource {
s := elastic.NewSearchSource().
return elastic.NewSearchSource().
Query(query).
Size(maxDocCount)
if !archive {
s.Sort("startTime", true).
SearchAfter(nextTime)
}
return s
Size(maxDocCount).
Sort("startTime", true).
SearchAfter(nextTime)
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
Loading
Loading