Skip to content

Commit

Permalink
tracing: fix DHT keys as string attribute not being valid utf-8
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelMure committed Jul 27, 2023
1 parent ee95d1a commit b2c3c97
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 8 deletions.
4 changes: 2 additions & 2 deletions fullrt/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ func workers(numWorkers int, fn func(interface{}), inputs <-chan interface{}) {
}

func (dht *FullRT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID, error) {
_, span := internal.StartSpan(ctx, "FullRT.GetClosestPeers", trace.WithAttributes(attribute.String("Key", key)))
_, span := internal.StartSpan(ctx, "FullRT.GetClosestPeers", trace.WithAttributes(internal.KeyAsAttribute("Key", key)))
defer span.End()

kbID := kb.ConvertKey(key)
Expand Down Expand Up @@ -554,7 +554,7 @@ func (dht *FullRT) GetValue(ctx context.Context, key string, opts ...routing.Opt

// SearchValue searches for the value corresponding to given Key and streams the results.
func (dht *FullRT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) {
ctx, span := internal.StartSpan(ctx, "FullRT.SearchValue", trace.WithAttributes(attribute.String("Key", key)))
ctx, span := internal.StartSpan(ctx, "FullRT.SearchValue", trace.WithAttributes(internal.KeyAsAttribute("Key", key)))
var good bool
defer func() {
if !good {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require (
github.com/libp2p/go-libp2p-xor v0.1.0
github.com/libp2p/go-msgio v0.3.0
github.com/libp2p/go-netroute v0.2.1
github.com/mr-tron/base58 v1.2.0
github.com/multiformats/go-base32 v0.1.0
github.com/multiformats/go-multiaddr v0.9.0
github.com/multiformats/go-multibase v0.2.0
Expand Down Expand Up @@ -81,7 +82,6 @@ require (
github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b // indirect
github.com/mikioh/tcpopt v0.0.0-20190314235656-172688c1accc // indirect
github.com/minio/sha256-simd v1.0.1 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/multiformats/go-base36 v0.2.0 // indirect
github.com/multiformats/go-multiaddr-dns v0.3.1 // indirect
github.com/multiformats/go-multiaddr-fmt v0.1.0 // indirect
Expand Down
14 changes: 14 additions & 0 deletions internal/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,25 @@ package internal
import (
"context"
"fmt"
"unicode/utf8"

b58 "github.com/mr-tron/base58/base58"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

func StartSpan(ctx context.Context, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
return otel.Tracer("go-libp2p-kad-dht").Start(ctx, fmt.Sprintf("KademliaDHT.%s", name), opts...)
}

// KeyAsAttribute format a DHT key into a suitable tracing attribute.
// DHT keys can be either valid utf-8 or binary, when they are derived from, for example, a multihash.
// Tracing (and notably OpenTelemetry+grpc exporter) requires valid utf-8 for string attributes.
func KeyAsAttribute(name string, key string) attribute.KeyValue {
b := []byte(key)
if utf8.Valid(b) {
return attribute.String(name, key)
}
return attribute.String(name, b58.Encode(b))
}
2 changes: 1 addition & 1 deletion lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
// If the context is canceled, this function will return the context error along
// with the closest K peers it has found so far.
func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID, error) {
ctx, span := internal.StartSpan(ctx, "IpfsDHT.GetClosestPeers", trace.WithAttributes(attribute.String("Key", key)))
ctx, span := internal.StartSpan(ctx, "IpfsDHT.GetClosestPeers", trace.WithAttributes(internal.KeyAsAttribute("Key", key)))
defer span.End()

if key == "" {
Expand Down
2 changes: 1 addition & 1 deletion query.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type lookupWithFollowupResult struct {
// After the lookup is complete the query function is run (unless stopped) against all of the top K peers from the
// lookup that have not already been successfully queried.
func (dht *IpfsDHT) runLookupWithFollowup(ctx context.Context, target string, queryFn queryFn, stopFn stopFn) (*lookupWithFollowupResult, error) {
ctx, span := internal.StartSpan(ctx, "IpfsDHT.RunLookupWithFollowup", trace.WithAttributes(attribute.String("Target", target)))
ctx, span := internal.StartSpan(ctx, "IpfsDHT.RunLookupWithFollowup", trace.WithAttributes(internal.KeyAsAttribute("Target", target)))
defer span.End()

// run the query
Expand Down
6 changes: 3 additions & 3 deletions routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
// PutValue adds value corresponding to given Key.
// This is the top level "Store" operation of the DHT
func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...routing.Option) (err error) {
ctx, span := internal.StartSpan(ctx, "IpfsDHT.PutValue", trace.WithAttributes(attribute.String("Key", key)))
ctx, span := internal.StartSpan(ctx, "IpfsDHT.PutValue", trace.WithAttributes(internal.KeyAsAttribute("Key", key)))
defer span.End()

if !dht.enableValues {
Expand Down Expand Up @@ -109,7 +109,7 @@ type recvdVal struct {

// GetValue searches for the value corresponding to given Key.
func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Option) (_ []byte, err error) {
ctx, span := internal.StartSpan(ctx, "IpfsDHT.GetValue", trace.WithAttributes(attribute.String("Key", key)))
ctx, span := internal.StartSpan(ctx, "IpfsDHT.GetValue", trace.WithAttributes(internal.KeyAsAttribute("Key", key)))
defer span.End()

if !dht.enableValues {
Expand Down Expand Up @@ -146,7 +146,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Op

// SearchValue searches for the value corresponding to given Key and streams the results.
func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) {
ctx, span := internal.StartSpan(ctx, "IpfsDHT.SearchValue", trace.WithAttributes(attribute.String("Key", key)))
ctx, span := internal.StartSpan(ctx, "IpfsDHT.SearchValue", trace.WithAttributes(internal.KeyAsAttribute("Key", key)))
var good bool
defer func() {
if !good {
Expand Down

0 comments on commit b2c3c97

Please sign in to comment.