diff --git a/integration/grpc_test.go b/integration/grpc_test.go index 3eb972e..bc0d8bc 100644 --- a/integration/grpc_test.go +++ b/integration/grpc_test.go @@ -3,7 +3,6 @@ package integration_test import ( "context" "fmt" - "log" "os" "testing" @@ -96,12 +95,12 @@ func deleteEverything(t *testing.T, rc *rockset.RockClient, cfg rss.Config) func for _, d := range response.GetData() { if d.GetStatus() != "DELETED" { - log.Printf("failed to delete %s: %s", d.GetId(), d.GetStatus()) + t.Logf("failed to delete %s: %s", d.GetId(), d.GetStatus()) } } t.Logf("%s: deleted %d documents", coll, len(response.GetData())) - // wait for the delete to complete + // wait for the deletion to have propagated w := wait.New(rc) if err = w.UntilQueryable(ctx, cfg.Workspace, coll, []string{response.GetLastOffset()}); err != nil { return err diff --git a/storage/spanstore/store.go b/storage/spanstore/store.go index 97ae19a..0284e09 100644 --- a/storage/spanstore/store.go +++ b/storage/spanstore/store.go @@ -204,13 +204,8 @@ func (s Store) findTraces(ctx context.Context, ids []model.TraceID) ([]*model.Tr // TODO: this is very naïve and must be improved to avoid SQL injection func buildQuery(config Config, query *spanstore.TraceQueryParameters) string { - // TODO implement tag filtering - // { - // Tags:map[sameplacetag1:sameplacevalue sameplacetag2:123 sameplacetag3:72.5 sameplacetag4:true] - // } - var q strings.Builder - q.WriteString("SELECT trace_id, MIN(start_time) AS start_time\n") + q.WriteString("SELECT spans.trace_id AS trace_id, MIN(spans.start_time) AS start_time\n") q.WriteString(fmt.Sprintf("FROM %s.%s spans\n", config.Workspace, config.Spans)) q.WriteString("WHERE ") @@ -239,6 +234,10 @@ func buildQuery(config Config, query *spanstore.TraceQueryParameters) string { q.WriteString(fmt.Sprintf(" AND spans.duration <= %d", query.DurationMax)) } + for k, v := range query.Tags { + q.WriteString(fmt.Sprintf(" AND spans.kv.%s = '%s'", k, v)) + } + q.WriteString("\nGROUP BY trace_id\n") q.WriteString("ORDER BY start_time DESC, trace_id\n") @@ -286,13 +285,3 @@ func traceIDs(ids []model.TraceID) (string, error) { return strings.Join(tids, ","), nil } - -/* -SELECT trace_id, MIN(start_time) AS start_time -FROM test.spans spans -WHERE spans.process.service_name = 'query11-service' -AND spans.start_time \u003e= '2024-02-11T15:46:31.639875Z' AND -spans.start_time \u003c= '2024-02-11T17:46:31.639875Z'\n -GROUP BY trace_id\nORDER BY start_time DESC, trace_id\n -LIMIT 1000 -*/ diff --git a/storage/spanstore/writer.go b/storage/spanstore/writer.go index 953576f..3fc2644 100644 --- a/storage/spanstore/writer.go +++ b/storage/spanstore/writer.go @@ -2,6 +2,7 @@ package spanstore import ( "context" + "strconv" "github.com/jaegertracing/jaeger/model" "github.com/rockset/rockset-go-client/writer" @@ -9,13 +10,55 @@ import ( const unspecified = "unspecified" +type Span struct { + model.Span + KV map[string]string `json:"kv"` +} + +func extractKeyAndValue(tag model.KeyValue) (k, v string) { + var s string + + switch tag.VType { + case model.StringType: + s = tag.VStr + case model.BoolType: + s = strconv.FormatBool(tag.VBool) + case model.Int64Type: + s = strconv.FormatInt(tag.VInt64, 10) + case model.Float64Type: + s = strconv.FormatFloat(tag.VFloat64, 'f', -1, 64) + case model.BinaryType: + s = string(tag.VBinary) + } + + return tag.Key, s +} + func (s Store) WriteSpan(_ context.Context, span *model.Span) error { + // to speed up queries we convert tags & process tags to a single map of string keys and string values, + // as that is what we get from the web ui when someone is searching for a trace, + // which makes the query much faster as we index the keys and values. + sp := Span{ + Span: *span, + KV: make(map[string]string), + } + + for _, tag := range span.Tags { + k, v := extractKeyAndValue(tag) + sp.KV[k] = v + } + for _, tag := range span.Process.Tags { + k, v := extractKeyAndValue(tag) + sp.KV[k] = v + } + s.writer.C() <- writer.Request{ Workspace: s.config.Workspace, Collection: s.config.Spans, - Data: *span, + Data: sp, } + // TODO we should batch these updates, to reduce the number of writes kind := unspecified for _, tag := range span.Tags { if tag.Key == "span.kind" {