Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

metrics-generator: include messaging systems and databases in service graphs #1576

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* [FEATURE] Add tags option for s3 backends. This allows new objects to be written with the configured tags. [#1442](https://github.com/grafana/tempo/pull/1442) (@stevenbrookes)
* [FEATURE] Add anonymous usage reporting, enabled by default. [#1481](https://github.com/grafana/tempo/pull/1481) (@zalegrala)
**BREAKING CHANGE** As part of the usage stats inclusion, the distributor will also require access to the store. This is required so the distirbutor can know which cluster it should be reporting membership of.
* [FEATURE] Include messaging systems and databases in service graphs. [#1576](https://github.com/grafana/tempo/pull/1576) (@kvrhdn)
* [CHANGE] metrics-generator: Changed added metric label `instance` to `__metrics_gen_instance` to reduce collisions with custom dimensions. [#1439](https://github.com/grafana/tempo/pull/1439) (@joe-elliott)
* [CHANGE] Don't enforce `max_bytes_per_tag_values_query` when set to 0. [#1447](https://github.com/grafana/tempo/pull/1447) (@joe-elliott)
* [CHANGE] Add new querier service in deployment jsonnet to serve `/status` endpoint. [#1474](https://github.com/grafana/tempo/pull/1474) (@annanay25)
Expand Down
25 changes: 23 additions & 2 deletions modules/generator/processor/servicegraphs/servicegraphs.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type Processor struct {
}

func New(cfg Config, tenant string, registry registry.Registry, logger log.Logger) gen.Processor {
labels := []string{"client", "server"}
labels := []string{"client", "server", "connection_type"}
for _, d := range cfg.Dimensions {
labels = append(labels, strutil.SanitizeLabelName(d))
}
Expand Down Expand Up @@ -149,20 +149,41 @@ func (p *Processor) consume(resourceSpans []*v1_trace.ResourceSpans) (err error)

for _, ils := range rs.InstrumentationLibrarySpans {
for _, span := range ils.Spans {
connectionType := store.Unknown

switch span.Kind {
case v1_trace.Span_SPAN_KIND_PRODUCER:
// override connection type and continue processing as span kind client
connectionType = store.MessagingSystem
fallthrough
case v1_trace.Span_SPAN_KIND_CLIENT:
key := buildKey(hex.EncodeToString(span.TraceId), hex.EncodeToString(span.SpanId))
isNew, err = p.store.UpsertEdge(key, func(e *store.Edge) {
e.TraceID = tempo_util.TraceIDToHexString(span.TraceId)
e.ConnectionType = connectionType
e.ClientService = svcName
e.ClientLatencySec = spanDurationSec(span)
e.Failed = e.Failed || p.spanFailed(span)
p.upsertDimensions(e.Dimensions, rs.Resource.Attributes, span.Attributes)

// A database request will only have one span, we don't wait for the server
// span but just copy details from the client span
if dbName, ok := processor_util.FindAttributeValue("db.name", rs.Resource.Attributes, span.Attributes); ok {
e.ConnectionType = store.Database
e.ServerService = dbName
e.ServerLatencySec = spanDurationSec(span)
}
})

case v1_trace.Span_SPAN_KIND_CONSUMER:
// override connection type and continue processing as span kind server
connectionType = store.MessagingSystem
fallthrough
case v1_trace.Span_SPAN_KIND_SERVER:
key := buildKey(hex.EncodeToString(span.TraceId), hex.EncodeToString(span.ParentSpanId))
isNew, err = p.store.UpsertEdge(key, func(e *store.Edge) {
e.TraceID = tempo_util.TraceIDToHexString(span.TraceId)
e.ConnectionType = connectionType
e.ServerService = svcName
e.ServerLatencySec = spanDurationSec(span)
e.Failed = e.Failed || p.spanFailed(span)
Expand Down Expand Up @@ -214,7 +235,7 @@ func (p *Processor) Shutdown(_ context.Context) {

func (p *Processor) onComplete(e *store.Edge) {
labelValues := make([]string, 0, 2+len(p.Cfg.Dimensions))
labelValues = append(labelValues, e.ClientService, e.ServerService)
labelValues = append(labelValues, e.ClientService, e.ServerService, string(e.ConnectionType))

for _, dimension := range p.Cfg.Dimensions {
labelValues = append(labelValues, e.Dimensions[dimension])
Expand Down
148 changes: 98 additions & 50 deletions modules/generator/processor/servicegraphs/servicegraphs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,65 +25,113 @@ func TestServiceGraphs(t *testing.T) {
cfg := Config{}
cfg.RegisterFlagsAndApplyDefaults("", nil)

cfg.HistogramBuckets = []float64{2.0, 3.0}
cfg.Dimensions = []string{"component", "does-not-exist"}
cfg.HistogramBuckets = []float64{0.04}
cfg.Dimensions = []string{"beast"}

p := New(cfg, "test", testRegistry, log.NewNopLogger())
defer p.Shutdown(context.Background())

traces, err := loadTestData("testdata/test-sample.json")
request, err := loadTestData("testdata/trace-with-queue-database.json")
require.NoError(t, err)

p.PushSpans(context.Background(), &tempopb.PushSpansRequest{Batches: traces.Batches})
p.PushSpans(context.Background(), request)

// Manually call expire to force removal of completed edges.
sgp := p.(*Processor)
sgp.store.Expire()
requesterToServerLabels := labels.FromMap(map[string]string{
"client": "mythical-requester",
"server": "mythical-server",
"connection_type": "",
"beast": "manticore",
})
serverToDatabaseLabels := labels.FromMap(map[string]string{
"client": "mythical-server",
"server": "postgres",
"connection_type": "database",
"beast": "",
})
requesterToRecorderLabels := labels.FromMap(map[string]string{
"client": "mythical-requester",
"server": "mythical-recorder",
"connection_type": "messaging_system",
"beast": "",
})

fmt.Println(testRegistry)

lbAppLabels := labels.FromMap(map[string]string{
"client": "lb",
"server": "app",
"component": "net/http",
"does_not_exist": "",
// counters
assert.Equal(t, 1.0, testRegistry.Query(`traces_service_graph_request_total`, requesterToServerLabels))
assert.Equal(t, 0.0, testRegistry.Query(`traces_service_graph_request_failed_total`, requesterToServerLabels))

assert.Equal(t, 1.0, testRegistry.Query(`traces_service_graph_request_total`, serverToDatabaseLabels))
assert.Equal(t, 0.0, testRegistry.Query(`traces_service_graph_request_failed_total`, serverToDatabaseLabels))

assert.Equal(t, 1.0, testRegistry.Query(`traces_service_graph_request_total`, requesterToRecorderLabels))
assert.Equal(t, 0.0, testRegistry.Query(`traces_service_graph_request_failed_total`, requesterToRecorderLabels))

// histograms
assert.Equal(t, 0.0, testRegistry.Query(`traces_service_graph_request_client_seconds_bucket`, withLe(requesterToServerLabels, 0.04)))
assert.Equal(t, 1.0, testRegistry.Query(`traces_service_graph_request_client_seconds_bucket`, withLe(requesterToServerLabels, math.Inf(1))))
assert.Equal(t, 1.0, testRegistry.Query(`traces_service_graph_request_client_seconds_count`, requesterToServerLabels))
assert.InDelta(t, 0.045, testRegistry.Query(`traces_service_graph_request_client_seconds_sum`, requesterToServerLabels), 0.001)

assert.Equal(t, 1.0, testRegistry.Query(`traces_service_graph_request_server_seconds_bucket`, withLe(requesterToServerLabels, 0.04)))
assert.Equal(t, 1.0, testRegistry.Query(`traces_service_graph_request_server_seconds_bucket`, withLe(requesterToServerLabels, math.Inf(1))))
assert.Equal(t, 1.0, testRegistry.Query(`traces_service_graph_request_server_seconds_count`, requesterToServerLabels))
assert.InDelta(t, 0.029, testRegistry.Query(`traces_service_graph_request_server_seconds_sum`, requesterToServerLabels), 0.001)

assert.Equal(t, 1.0, testRegistry.Query(`traces_service_graph_request_client_seconds_bucket`, withLe(serverToDatabaseLabels, 0.04)))
assert.Equal(t, 1.0, testRegistry.Query(`traces_service_graph_request_client_seconds_bucket`, withLe(serverToDatabaseLabels, math.Inf(1))))
assert.Equal(t, 1.0, testRegistry.Query(`traces_service_graph_request_client_seconds_count`, serverToDatabaseLabels))
assert.InDelta(t, 0.023, testRegistry.Query(`traces_service_graph_request_client_seconds_sum`, serverToDatabaseLabels), 0.001)

assert.Equal(t, 1.0, testRegistry.Query(`traces_service_graph_request_server_seconds_bucket`, withLe(serverToDatabaseLabels, 0.04)))
assert.Equal(t, 1.0, testRegistry.Query(`traces_service_graph_request_server_seconds_bucket`, withLe(serverToDatabaseLabels, math.Inf(1))))
assert.Equal(t, 1.0, testRegistry.Query(`traces_service_graph_request_server_seconds_count`, serverToDatabaseLabels))
assert.InDelta(t, 0.023, testRegistry.Query(`traces_service_graph_request_server_seconds_sum`, serverToDatabaseLabels), 0.001)

assert.Equal(t, 1.0, testRegistry.Query(`traces_service_graph_request_client_seconds_bucket`, withLe(requesterToRecorderLabels, 0.04)))
assert.Equal(t, 1.0, testRegistry.Query(`traces_service_graph_request_client_seconds_bucket`, withLe(requesterToRecorderLabels, math.Inf(1))))
assert.Equal(t, 1.0, testRegistry.Query(`traces_service_graph_request_client_seconds_count`, requesterToRecorderLabels))
assert.InDelta(t, 0.000068, testRegistry.Query(`traces_service_graph_request_client_seconds_sum`, requesterToRecorderLabels), 0.001)

assert.Equal(t, 1.0, testRegistry.Query(`traces_service_graph_request_server_seconds_bucket`, withLe(requesterToRecorderLabels, 0.04)))
assert.Equal(t, 1.0, testRegistry.Query(`traces_service_graph_request_server_seconds_bucket`, withLe(requesterToRecorderLabels, math.Inf(1))))
assert.Equal(t, 1.0, testRegistry.Query(`traces_service_graph_request_server_seconds_count`, requesterToRecorderLabels))
assert.InDelta(t, 0.000693, testRegistry.Query(`traces_service_graph_request_server_seconds_sum`, requesterToRecorderLabels), 0.001)
}

func TestServiceGraphs_failedRequests(t *testing.T) {
testRegistry := registry.NewTestRegistry()

cfg := Config{}
cfg.RegisterFlagsAndApplyDefaults("", nil)

p := New(cfg, "test", testRegistry, log.NewNopLogger())
defer p.Shutdown(context.Background())

request, err := loadTestData("testdata/trace-with-failed-requests.json")
require.NoError(t, err)

p.PushSpans(context.Background(), request)

requesterToServerLabels := labels.FromMap(map[string]string{
"client": "mythical-requester",
"server": "mythical-server",
"connection_type": "",
})
appDbLabels := labels.FromMap(map[string]string{
"client": "app",
"server": "db",
"component": "net/http",
"does_not_exist": "",
serverToDatabaseLabels := labels.FromMap(map[string]string{
"client": "mythical-server",
"server": "postgres",
"connection_type": "database",
})

fmt.Println(testRegistry)

assert.Equal(t, 3.0, testRegistry.Query(`traces_service_graph_request_total`, appDbLabels))
assert.Equal(t, 0.0, testRegistry.Query(`traces_service_graph_request_failed_total`, appDbLabels))

assert.Equal(t, 2.0, testRegistry.Query(`traces_service_graph_request_client_seconds_bucket`, withLe(appDbLabels, 2.0)))
assert.Equal(t, 3.0, testRegistry.Query(`traces_service_graph_request_client_seconds_bucket`, withLe(appDbLabels, 3.0)))
assert.Equal(t, 3.0, testRegistry.Query(`traces_service_graph_request_client_seconds_bucket`, withLe(appDbLabels, math.Inf(1))))
assert.Equal(t, 3.0, testRegistry.Query(`traces_service_graph_request_client_seconds_count`, appDbLabels))
assert.Equal(t, 4.4, testRegistry.Query(`traces_service_graph_request_client_seconds_sum`, appDbLabels))

assert.Equal(t, 2.0, testRegistry.Query(`traces_service_graph_request_server_seconds_bucket`, withLe(appDbLabels, 2.0)))
assert.Equal(t, 3.0, testRegistry.Query(`traces_service_graph_request_server_seconds_bucket`, withLe(appDbLabels, 3.0)))
assert.Equal(t, 3.0, testRegistry.Query(`traces_service_graph_request_server_seconds_bucket`, withLe(appDbLabels, math.Inf(1))))
assert.Equal(t, 3.0, testRegistry.Query(`traces_service_graph_request_server_seconds_count`, appDbLabels))
assert.Equal(t, 5.0, testRegistry.Query(`traces_service_graph_request_server_seconds_sum`, appDbLabels))

assert.Equal(t, 3.0, testRegistry.Query(`traces_service_graph_request_total`, lbAppLabels))
assert.Equal(t, 1.0, testRegistry.Query(`traces_service_graph_request_failed_total`, lbAppLabels))

assert.Equal(t, 1.0, testRegistry.Query(`traces_service_graph_request_client_seconds_bucket`, withLe(lbAppLabels, 2.0)))
assert.Equal(t, 2.0, testRegistry.Query(`traces_service_graph_request_client_seconds_bucket`, withLe(lbAppLabels, 3.0)))
assert.Equal(t, 3.0, testRegistry.Query(`traces_service_graph_request_client_seconds_bucket`, withLe(lbAppLabels, math.Inf(1))))
assert.Equal(t, 3.0, testRegistry.Query(`traces_service_graph_request_client_seconds_count`, lbAppLabels))
assert.Equal(t, 7.8, testRegistry.Query(`traces_service_graph_request_client_seconds_sum`, lbAppLabels))

assert.Equal(t, 2.0, testRegistry.Query(`traces_service_graph_request_server_seconds_bucket`, withLe(lbAppLabels, 2.0)))
assert.Equal(t, 2.0, testRegistry.Query(`traces_service_graph_request_server_seconds_bucket`, withLe(lbAppLabels, 3.0)))
assert.Equal(t, 3.0, testRegistry.Query(`traces_service_graph_request_server_seconds_bucket`, withLe(lbAppLabels, math.Inf(1))))
assert.Equal(t, 3.0, testRegistry.Query(`traces_service_graph_request_server_seconds_count`, lbAppLabels))
assert.Equal(t, 6.2, testRegistry.Query(`traces_service_graph_request_server_seconds_sum`, lbAppLabels))
// counters
assert.Equal(t, 1.0, testRegistry.Query(`traces_service_graph_request_total`, requesterToServerLabels))
assert.Equal(t, 1.0, testRegistry.Query(`traces_service_graph_request_failed_total`, requesterToServerLabels))

assert.Equal(t, 1.0, testRegistry.Query(`traces_service_graph_request_total`, serverToDatabaseLabels))
assert.Equal(t, 1.0, testRegistry.Query(`traces_service_graph_request_failed_total`, serverToDatabaseLabels))
}

func TestServiceGraphs_tooManySpansErr(t *testing.T) {
Expand All @@ -95,22 +143,22 @@ func TestServiceGraphs_tooManySpansErr(t *testing.T) {
p := New(cfg, "test", &testRegistry, log.NewNopLogger())
defer p.Shutdown(context.Background())

traces, err := loadTestData("testdata/test-sample.json")
request, err := loadTestData("testdata/trace-with-queue-database.json")
require.NoError(t, err)

err = p.(*Processor).consume(traces.Batches)
err = p.(*Processor).consume(request.Batches)
assert.True(t, errors.As(err, &tooManySpansError{}))
}

func loadTestData(path string) (*tempopb.Trace, error) {
func loadTestData(path string) (*tempopb.PushSpansRequest, error) {
f, err := os.Open(path)
if err != nil {
return nil, err
}

trace := &tempopb.Trace{}
err = jsonpb.Unmarshal(f, trace)
return trace, err
return &tempopb.PushSpansRequest{Batches: trace.Batches}, err
}

func withLe(lbls labels.Labels, le float64) labels.Labels {
Expand Down
9 changes: 9 additions & 0 deletions modules/generator/processor/servicegraphs/store/edge.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,20 @@ package store

import "time"

type ConnectionType string

const (
Unknown ConnectionType = ""
MessagingSystem = "messaging_system"
Database = "database"
)

// Edge is an Edge between two nodes in the graph
type Edge struct {
key string

TraceID string
ConnectionType ConnectionType
ServerService, ClientService string
ServerLatencySec, ClientLatencySec float64

Expand Down
12 changes: 9 additions & 3 deletions modules/generator/processor/servicegraphs/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (s *store) tryEvictHead() bool {
// UpsertEdge fetches an Edge from the store and updates it using the given callback. If the Edge
// doesn't exist yet, it creates a new one with the default TTL.
// If the Edge is complete after applying the callback, it's completed and removed.
func (s *store) UpsertEdge(key string, update Callback) (bool, error) {
func (s *store) UpsertEdge(key string, update Callback) (isNew bool, err error) {
s.mtx.Lock()
defer s.mtx.Unlock()

Expand All @@ -93,16 +93,22 @@ func (s *store) UpsertEdge(key string, update Callback) (bool, error) {
return false, nil
}

edge := newEdge(key, s.ttl)
update(edge)

if edge.isComplete() {
s.onComplete(edge)
return true, nil
}

Comment on lines +96 to +103
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice improvement 👍

// Check we can add new edges
if s.l.Len() >= s.maxItems {
// todo: try to evict expired items
return false, ErrTooManyItems
}

edge := newEdge(key, s.ttl)
ele := s.l.PushBack(edge)
s.m[key] = ele
update(edge)

return true, nil
}
Expand Down
Loading