Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SearchTagValuesV2 perf improvements #3650

Merged
merged 8 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
* [ENHANCEMENT] Add support for sharded ingester queries [#3574](https://github.com/grafana/tempo/pull/3574) (@zalegrala)
* [ENHANCEMENT] TraceQL - Add support for scoped intrinsics using `:` [#3629](https://github.com/grafana/tempo/pull/3629) (@ie-pham)
available scoped intrinsics: trace:duration, trace:rootName, trace:rootService, span:duration, span:kind, span:name, span:status, span:statusMessage
* [ENHANCEMENT] Performance improvements on SearchTagValuesV2. [#3650](https://github.com/grafana/tempo/pull/3650) (@joe-elliott)
* [BUGFIX] Fix metrics query results when filtering and rating on the same attribute [#3428](https://github.com/grafana/tempo/issues/3428) (@mdisibio)
* [BUGFIX] Fix metrics query results when series contain empty strings or nil values [#3429](https://github.com/grafana/tempo/issues/3429) (@mdisibio)
* [BUGFIX] Fix metrics query duration check, add per-tenant override for max metrics query duration [#3479](https://github.com/grafana/tempo/issues/3479) (@mdisibio)
Expand Down
175 changes: 70 additions & 105 deletions tempodb/encoding/vparquet3/block_autocomplete.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,9 @@ func (b *backendBlock) FetchTagValues(ctx context.Context, req traceql.Autocompl
break
}
for _, oe := range res.OtherEntries {
if oe.Key == req.TagName.String() {
v := oe.Value.(traceql.Static)
if cb(v) {
return nil // We have enough values
}
v := oe.Value.(traceql.Static)
if cb(v) {
return nil // We have enough values
}
}
}
Expand Down Expand Up @@ -90,27 +88,24 @@ func createDistinctIterator(
rgs := rowGroupsFromFile(pf, opts)
makeIter := makeIterFunc(ctx, rgs, pf)

// Safeguard. Shouldn't be needed since we only collect the tag we want.
keep := func(attr traceql.Attribute) bool { return tag == attr }

var currentIter parquetquery.Iterator

if len(spanConditions) > 0 {
currentIter, err = createDistinctSpanIterator(makeIter, keep, tag, currentIter, spanConditions, dc)
currentIter, err = createDistinctSpanIterator(makeIter, tag, currentIter, spanConditions, dc)
if err != nil {
return nil, errors.Wrap(err, "creating span iterator")
}
}

if len(resourceConditions) > 0 {
currentIter, err = createDistinctResourceIterator(makeIter, keep, tag, currentIter, resourceConditions, dc)
currentIter, err = createDistinctResourceIterator(makeIter, tag, currentIter, resourceConditions, dc)
if err != nil {
return nil, errors.Wrap(err, "creating resource iterator")
}
}

if len(traceConditions) > 0 {
currentIter, err = createDistinctTraceIterator(makeIter, keep, currentIter, traceConditions)
currentIter, err = createDistinctTraceIterator(makeIter, currentIter, traceConditions)
if err != nil {
return nil, errors.Wrap(err, "creating trace iterator")
}
Expand All @@ -123,7 +118,6 @@ func createDistinctIterator(
// one span each. Spans are returned that match any of the given conditions.
func createDistinctSpanIterator(
makeIter makeIterFn,
keep keepFn,
tag traceql.Attribute,
primaryIter parquetquery.Iterator,
conditions []traceql.Condition,
Expand Down Expand Up @@ -266,7 +260,7 @@ func createDistinctSpanIterator(
iters = append(iters, makeIter(columnPath, orIfNeeded(predicates), columnSelectAs[columnPath]))
}

attrIter, err := createDistinctAttributeIterator(makeIter, keep, tag, genericConditions, DefinitionLevelResourceSpansILSSpanAttrs,
attrIter, err := createDistinctAttributeIterator(makeIter, tag, genericConditions, DefinitionLevelResourceSpansILSSpanAttrs,
columnPathSpanAttrKey, columnPathSpanAttrString, columnPathSpanAttrInt, columnPathSpanAttrDouble, columnPathSpanAttrBool)
if err != nil {
return nil, errors.Wrap(err, "creating span attribute iterator")
Expand All @@ -285,7 +279,7 @@ func createDistinctSpanIterator(
return attrIter, nil
}

spanCol := &distinctSpanCollector{keep: keep}
spanCol := newDistinctValueCollector(mapSpanAttr)

// Left join here means the span id/start/end iterators + 1 are required,
// and all other conditions are optional. Whatever matches is returned.
Expand All @@ -294,7 +288,6 @@ func createDistinctSpanIterator(

func createDistinctAttributeIterator(
makeIter makeIterFn,
keep keepFn,
tag traceql.Attribute,
conditions []traceql.Condition,
definitionLevel int,
Expand Down Expand Up @@ -393,10 +386,7 @@ func createDistinctAttributeIterator(
definitionLevel,
[]parquetquery.Iterator{makeIter(keyPath, parquetquery.NewStringInPredicate(attrKeys), "key")},
valueIters,
&distinctAttrCollector{
scope: scopeFromDefinitionLevel(definitionLevel),
keep: keep,
},
newDistinctAttrCollector(scopeFromDefinitionLevel(definitionLevel)),
)
if err != nil {
return nil, fmt.Errorf("creating left join iterator: %w", err)
Expand Down Expand Up @@ -425,7 +415,6 @@ func oneLevelUp(definitionLevel int) int {

func createDistinctResourceIterator(
makeIter makeIterFn,
keep keepFn,
tag traceql.Attribute,
spanIterator parquetquery.Iterator,
conditions []traceql.Condition,
Expand Down Expand Up @@ -504,7 +493,7 @@ func createDistinctResourceIterator(
iters = append(iters, makeIter(columnPath, orIfNeeded(predicates), columnSelectAs[columnPath]))
}

attrIter, err := createDistinctAttributeIterator(makeIter, keep, tag, genericConditions, DefinitionLevelResourceAttrs,
attrIter, err := createDistinctAttributeIterator(makeIter, tag, genericConditions, DefinitionLevelResourceAttrs,
columnPathResourceAttrKey, columnPathResourceAttrString, columnPathResourceAttrInt, columnPathResourceAttrDouble, columnPathResourceAttrBool)
if err != nil {
return nil, errors.Wrap(err, "creating span attribute iterator")
Expand All @@ -513,7 +502,7 @@ func createDistinctResourceIterator(
iters = append(iters, attrIter)
}

batchCol := &distinctBatchCollector{keep: keep}
batchCol := newDistinctValueCollector(mapResourceAttr)

// Put span iterator last, so it is only read when
// the resource conditions are met.
Expand All @@ -526,7 +515,6 @@ func createDistinctResourceIterator(

func createDistinctTraceIterator(
makeIter makeIterFn,
keep keepFn,
resourceIter parquetquery.Iterator,
conds []traceql.Condition,
) (parquetquery.Iterator, error) {
Expand Down Expand Up @@ -576,26 +564,29 @@ func createDistinctTraceIterator(
// Final trace iterator
// Join iterator means it requires matching resources to have been found
// TraceCollector adds trace-level data to the spansets
return parquetquery.NewJoinIterator(DefinitionLevelTrace, traceIters, &distinctTraceCollector{
keep: keep,
}), nil
return parquetquery.NewJoinIterator(DefinitionLevelTrace, traceIters, newDistinctValueCollector(mapTraceAttr)), nil
}

type keepFn func(attr traceql.Attribute) bool

var _ parquetquery.GroupPredicate = (*distinctAttrCollector)(nil)

type distinctAttrCollector struct {
scope traceql.AttributeScope
keep keepFn

sentVals map[traceql.Static]struct{}
}

// newDistinctAttrCollector returns a distinctAttrCollector bound to the given
// attribute scope. The sentVals set is allocated up front so it can be
// written to without a nil-map check.
func newDistinctAttrCollector(scope traceql.AttributeScope) *distinctAttrCollector {
	collector := new(distinctAttrCollector)
	collector.scope = scope
	collector.sentVals = map[traceql.Static]struct{}{}
	return collector
}

// String returns the fixed name of this collector type.
func (d *distinctAttrCollector) String() string { return "distinctAttrCollector" }

func (d *distinctAttrCollector) KeepGroup(result *parquetquery.IteratorResult) bool {
var key string
var val traceql.Static

for _, e := range result.Entries {
Expand All @@ -606,8 +597,6 @@ func (d *distinctAttrCollector) KeepGroup(result *parquetquery.IteratorResult) b
}

switch e.Key {
case "key":
key = e.Value.String()
case "string":
val = traceql.NewStaticString(e.Value.String())
case "int":
Expand All @@ -619,9 +608,12 @@ func (d *distinctAttrCollector) KeepGroup(result *parquetquery.IteratorResult) b
}
}

attr := traceql.NewScopedAttribute(d.scope, false, key)
if d.keep(attr) {
result.AppendOtherValue(attr.String(), val)
var empty traceql.Static
if val != empty {
if _, ok := d.sentVals[val]; !ok {
result.AppendOtherValue("", val)
d.sentVals[val] = struct{}{}
}
}

result.Entries = result.Entries[:0]
Expand All @@ -634,35 +626,49 @@ type entry struct {
Value parquet.Value
}

var _ parquetquery.GroupPredicate = (*distinctSpanCollector)(nil)
var _ parquetquery.GroupPredicate = (*distinctValueCollector)(nil)

type distinctSpanCollector struct {
keep keepFn
type distinctValueCollector struct {
mapToStatic func(entry) traceql.Static
sentVals map[traceql.Static]struct{}
}

func (d distinctSpanCollector) String() string { return "distinctSpanCollector" }
// newDistinctValueCollector returns a distinctValueCollector that uses
// mapToStatic to turn an entry into a traceql.Static. The sentVals set is
// allocated up front so it can be written to without a nil-map check.
func newDistinctValueCollector(mapToStatic func(entry) traceql.Static) *distinctValueCollector {
	collector := new(distinctValueCollector)
	collector.mapToStatic = mapToStatic
	collector.sentVals = map[traceql.Static]struct{}{}
	return collector
}

func (d distinctSpanCollector) KeepGroup(result *parquetquery.IteratorResult) bool {
// String returns the fixed name of this collector type.
func (d distinctValueCollector) String() string {
	const name = "distinctValueCollector"
	return name
}

func (d distinctValueCollector) KeepGroup(result *parquetquery.IteratorResult) bool {
for _, e := range result.Entries {
if attr, static := mapSpanAttr(e); d.keep(attr) {
result.AppendOtherValue(attr.String(), static)
if e.Value.IsNull() {
continue
}
static := d.mapToStatic(e)

if _, ok := d.sentVals[static]; !ok {
result.AppendOtherValue("", static)
d.sentVals[static] = struct{}{}
}
}
result.Entries = result.Entries[:0]
return true
}

func mapSpanAttr(e entry) (traceql.Attribute, traceql.Static) {
func mapSpanAttr(e entry) traceql.Static {
switch e.Key {
case columnPathSpanID,
columnPathSpanParentID,
columnPathSpanNestedSetLeft,
columnPathSpanNestedSetRight,
columnPathSpanStartTime:
case columnPathSpanDuration:
return traceql.IntrinsicDurationAttribute, traceql.NewStaticDuration(time.Duration(e.Value.Int64()))
return traceql.NewStaticDuration(time.Duration(e.Value.Int64()))
case columnPathSpanName:
return traceql.IntrinsicNameAttribute, traceql.NewStaticString(e.Value.String())
return traceql.NewStaticString(e.Value.String())
case columnPathSpanStatusCode:
// Map OTLP status code back to TraceQL enum.
// For other values, use the raw integer.
Expand All @@ -677,9 +683,9 @@ func mapSpanAttr(e entry) (traceql.Attribute, traceql.Static) {
default:
status = traceql.Status(e.Value.Uint64())
}
return traceql.IntrinsicStatusAttribute, traceql.NewStaticStatus(status)
return traceql.NewStaticStatus(status)
case columnPathSpanStatusMessage:
return traceql.IntrinsicStatusMessageAttribute, traceql.NewStaticString(e.Value.String())
return traceql.NewStaticString(e.Value.String())
case columnPathSpanKind:
var kind traceql.Kind
switch e.Value.Uint64() {
Expand All @@ -698,90 +704,49 @@ func mapSpanAttr(e entry) (traceql.Attribute, traceql.Static) {
default:
kind = traceql.Kind(e.Value.Uint64())
}
return traceql.IntrinsicKindAttribute, traceql.NewStaticKind(kind)
return traceql.NewStaticKind(kind)
default:
// This exists for span-level dedicated columns like http.status_code
switch e.Value.Kind() {
case parquet.Boolean:
return newSpanAttr(e.Key), traceql.NewStaticBool(e.Value.Boolean())
return traceql.NewStaticBool(e.Value.Boolean())
case parquet.Int32, parquet.Int64:
return newSpanAttr(e.Key), traceql.NewStaticInt(int(e.Value.Int64()))
return traceql.NewStaticInt(int(e.Value.Int64()))
case parquet.Float:
return newSpanAttr(e.Key), traceql.NewStaticFloat(e.Value.Double())
return traceql.NewStaticFloat(e.Value.Double())
case parquet.ByteArray, parquet.FixedLenByteArray:
return newSpanAttr(e.Key), traceql.NewStaticString(e.Value.String())
}
}
return traceql.Attribute{}, traceql.Static{}
}

var _ parquetquery.GroupPredicate = (*distinctBatchCollector)(nil)

type distinctBatchCollector struct {
keep keepFn
}

func (d *distinctBatchCollector) String() string {
return "distinctBatchCollector"
}

func (d *distinctBatchCollector) KeepGroup(result *parquetquery.IteratorResult) bool {
// Gather Attributes from dedicated resource-level columns
for _, e := range result.Entries {
if attr, static := mapResourceAttr(e); d.keep(attr) {
result.AppendOtherValue(attr.String(), static)
return traceql.NewStaticString(e.Value.String())
}
}
result.Entries = result.Entries[:0]
return true
return traceql.Static{}
}

func mapResourceAttr(e entry) (traceql.Attribute, traceql.Static) {
func mapResourceAttr(e entry) traceql.Static {
switch e.Value.Kind() {
case parquet.Boolean:
return newResAttr(e.Key), traceql.NewStaticBool(e.Value.Boolean())
return traceql.NewStaticBool(e.Value.Boolean())
case parquet.Int32, parquet.Int64:
return newResAttr(e.Key), traceql.NewStaticInt(int(e.Value.Int64()))
return traceql.NewStaticInt(int(e.Value.Int64()))
case parquet.Float:
return newResAttr(e.Key), traceql.NewStaticFloat(e.Value.Double())
return traceql.NewStaticFloat(e.Value.Double())
case parquet.ByteArray, parquet.FixedLenByteArray:
return newResAttr(e.Key), traceql.NewStaticString(e.Value.String())
return traceql.NewStaticString(e.Value.String())
default:
return traceql.Attribute{}, traceql.Static{}
return traceql.Static{}
}
}

var _ parquetquery.GroupPredicate = (*distinctTraceCollector)(nil)

type distinctTraceCollector struct {
keep keepFn
}

func (d *distinctTraceCollector) String() string {
return "distinctTraceCollector"
}

func (d *distinctTraceCollector) KeepGroup(result *parquetquery.IteratorResult) bool {
for _, e := range result.Entries {
if attr, static := mapTraceAttr(e); d.keep(attr) {
result.AppendOtherValue(attr.String(), static)
}
}
result.Entries = result.Entries[:0]
return true
}

func mapTraceAttr(e entry) (traceql.Attribute, traceql.Static) {
func mapTraceAttr(e entry) traceql.Static {
switch e.Key {
case columnPathTraceID, columnPathEndTimeUnixNano, columnPathStartTimeUnixNano: // No TraceQL intrinsics for these
case columnPathDurationNanos:
return traceql.IntrinsicTraceDurationAttribute, traceql.NewStaticDuration(time.Duration(e.Value.Int64()))
return traceql.NewStaticDuration(time.Duration(e.Value.Int64()))
case columnPathRootSpanName:
return traceql.IntrinsicTraceRootSpanAttribute, traceql.NewStaticString(e.Value.String())
return traceql.NewStaticString(e.Value.String())
case columnPathRootServiceName:
return traceql.IntrinsicTraceRootServiceAttribute, traceql.NewStaticString(e.Value.String())
return traceql.NewStaticString(e.Value.String())
}
return traceql.Attribute{}, traceql.Static{}
return traceql.Static{}
}

func scopeFromDefinitionLevel(lvl int) traceql.AttributeScope {
Expand Down
Loading
Loading