Skip to content

Commit

Permalink
add tag search
Browse files Browse the repository at this point in the history
  • Loading branch information
pmenglund committed Feb 28, 2024
1 parent 3af1991 commit 2d99feb
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 20 deletions.
5 changes: 2 additions & 3 deletions integration/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package integration_test
import (
"context"
"fmt"
"log"
"os"
"testing"

Expand Down Expand Up @@ -96,12 +95,12 @@ func deleteEverything(t *testing.T, rc *rockset.RockClient, cfg rss.Config) func

for _, d := range response.GetData() {
if d.GetStatus() != "DELETED" {
log.Printf("failed to delete %s: %s", d.GetId(), d.GetStatus())
t.Logf("failed to delete %s: %s", d.GetId(), d.GetStatus())
}
}
t.Logf("%s: deleted %d documents", coll, len(response.GetData()))

// wait for the delete to complete
// wait for the deletion to have propagated
w := wait.New(rc)
if err = w.UntilQueryable(ctx, cfg.Workspace, coll, []string{response.GetLastOffset()}); err != nil {
return err
Expand Down
21 changes: 5 additions & 16 deletions storage/spanstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,13 +204,8 @@ func (s Store) findTraces(ctx context.Context, ids []model.TraceID) ([]*model.Tr

// TODO: this is very naïve and must be improved to avoid SQL injection
func buildQuery(config Config, query *spanstore.TraceQueryParameters) string {
// TODO implement tag filtering
// {
// Tags:map[sameplacetag1:sameplacevalue sameplacetag2:123 sameplacetag3:72.5 sameplacetag4:true]
// }

var q strings.Builder
q.WriteString("SELECT trace_id, MIN(start_time) AS start_time\n")
q.WriteString("SELECT spans.trace_id AS trace_id, MIN(spans.start_time) AS start_time\n")
q.WriteString(fmt.Sprintf("FROM %s.%s spans\n", config.Workspace, config.Spans))

q.WriteString("WHERE ")
Expand Down Expand Up @@ -239,6 +234,10 @@ func buildQuery(config Config, query *spanstore.TraceQueryParameters) string {
q.WriteString(fmt.Sprintf(" AND spans.duration <= %d", query.DurationMax))
}

for k, v := range query.Tags {
q.WriteString(fmt.Sprintf(" AND spans.kv.%s = '%s'", k, v))
}

q.WriteString("\nGROUP BY trace_id\n")
q.WriteString("ORDER BY start_time DESC, trace_id\n")

Expand Down Expand Up @@ -286,13 +285,3 @@ func traceIDs(ids []model.TraceID) (string, error) {

return strings.Join(tids, ","), nil
}

/*
SELECT trace_id, MIN(start_time) AS start_time
FROM test.spans spans
WHERE spans.process.service_name = 'query11-service'
AND spans.start_time \u003e= '2024-02-11T15:46:31.639875Z' AND
spans.start_time \u003c= '2024-02-11T17:46:31.639875Z'\n
GROUP BY trace_id\nORDER BY start_time DESC, trace_id\n
LIMIT 1000
*/
45 changes: 44 additions & 1 deletion storage/spanstore/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,63 @@ package spanstore

import (
"context"
"strconv"

"github.com/jaegertracing/jaeger/model"
"github.com/rockset/rockset-go-client/writer"
)

const unspecified = "unspecified"

type Span struct {
model.Span
KV map[string]string `json:"kv"`
}

func extractKeyAndValue(tag model.KeyValue) (k, v string) {
var s string

switch tag.VType {
case model.StringType:
s = tag.VStr
case model.BoolType:
s = strconv.FormatBool(tag.VBool)
case model.Int64Type:
s = strconv.FormatInt(tag.VInt64, 10)
case model.Float64Type:
s = strconv.FormatFloat(tag.VFloat64, 'f', -1, 64)
case model.BinaryType:
s = string(tag.VBinary)
}

return tag.Key, s
}

func (s Store) WriteSpan(_ context.Context, span *model.Span) error {
// to speed up queries we convert tags & process tags to a single map of string keys and string values,
// as that is what we get from the web ui when someone is searching for a trace,
// which makes the query much faster as we index the keys and values.
sp := Span{
Span: *span,
KV: make(map[string]string),
}

for _, tag := range span.Tags {
k, v := extractKeyAndValue(tag)
sp.KV[k] = v
}
for _, tag := range span.Process.Tags {
k, v := extractKeyAndValue(tag)
sp.KV[k] = v
}

s.writer.C() <- writer.Request{
Workspace: s.config.Workspace,
Collection: s.config.Spans,
Data: *span,
Data: sp,
}

// TODO we should batch these updates, to reduce the number of writes
kind := unspecified
for _, tag := range span.Tags {
if tag.Key == "span.kind" {
Expand Down

0 comments on commit 2d99feb

Please sign in to comment.