From b2c3c97d8a3289f10f41a017901e4c37a9d12242 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Mur=C3=A9?= Date: Thu, 27 Jul 2023 10:37:54 +0200 Subject: [PATCH] tracing: fix DHT keys as string attribute not being valid utf-8 --- fullrt/dht.go | 4 ++-- go.mod | 2 +- internal/tracing.go | 14 ++++++++++++++ lookup.go | 2 +- query.go | 2 +- routing.go | 6 +++--- 6 files changed, 22 insertions(+), 8 deletions(-) diff --git a/fullrt/dht.go b/fullrt/dht.go index 3b0cd3e94..6cdde4cb2 100644 --- a/fullrt/dht.go +++ b/fullrt/dht.go @@ -425,7 +425,7 @@ func workers(numWorkers int, fn func(interface{}), inputs <-chan interface{}) { } func (dht *FullRT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID, error) { - _, span := internal.StartSpan(ctx, "FullRT.GetClosestPeers", trace.WithAttributes(attribute.String("Key", key))) + _, span := internal.StartSpan(ctx, "FullRT.GetClosestPeers", trace.WithAttributes(internal.KeyAsAttribute("Key", key))) defer span.End() kbID := kb.ConvertKey(key) @@ -554,7 +554,7 @@ func (dht *FullRT) GetValue(ctx context.Context, key string, opts ...routing.Opt // SearchValue searches for the value corresponding to given Key and streams the results. func (dht *FullRT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) { - ctx, span := internal.StartSpan(ctx, "FullRT.SearchValue", trace.WithAttributes(attribute.String("Key", key))) + ctx, span := internal.StartSpan(ctx, "FullRT.SearchValue", trace.WithAttributes(internal.KeyAsAttribute("Key", key))) var good bool defer func() { if !good { diff --git a/go.mod b/go.mod index 0032196bc..1b88b20ae 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,7 @@ require ( github.com/libp2p/go-libp2p-xor v0.1.0 github.com/libp2p/go-msgio v0.3.0 github.com/libp2p/go-netroute v0.2.1 + github.com/mr-tron/base58 v1.2.0 github.com/multiformats/go-base32 v0.1.0 github.com/multiformats/go-multiaddr v0.9.0 github.com/multiformats/go-multibase v0.2.0 @@ -81,7 +82,6 @@ require ( github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b // indirect github.com/mikioh/tcpopt v0.0.0-20190314235656-172688c1accc // indirect github.com/minio/sha256-simd v1.0.1 // indirect - github.com/mr-tron/base58 v1.2.0 // indirect github.com/multiformats/go-base36 v0.2.0 // indirect github.com/multiformats/go-multiaddr-dns v0.3.1 // indirect github.com/multiformats/go-multiaddr-fmt v0.1.0 // indirect diff --git a/internal/tracing.go b/internal/tracing.go index 2a2f18647..6a0c36ac4 100644 --- a/internal/tracing.go +++ b/internal/tracing.go @@ -3,11 +3,25 @@ package internal import ( "context" "fmt" + "unicode/utf8" + b58 "github.com/mr-tron/base58/base58" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) func StartSpan(ctx context.Context, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span) { return otel.Tracer("go-libp2p-kad-dht").Start(ctx, fmt.Sprintf("KademliaDHT.%s", name), opts...) } + +// KeyAsAttribute format a DHT key into a suitable tracing attribute. +// DHT keys can be either valid utf-8 or binary, when they are derived from, for example, a multihash. +// Tracing (and notably OpenTelemetry+grpc exporter) requires valid utf-8 for string attributes. +func KeyAsAttribute(name string, key string) attribute.KeyValue { + b := []byte(key) + if utf8.Valid(b) { + return attribute.String(name, key) + } + return attribute.String(name, b58.Encode(b)) +} diff --git a/lookup.go b/lookup.go index 563ed6bee..868777816 100644 --- a/lookup.go +++ b/lookup.go @@ -22,7 +22,7 @@ import ( // If the context is canceled, this function will return the context error along // with the closest K peers it has found so far. func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) ([]peer.ID, error) { - ctx, span := internal.StartSpan(ctx, "IpfsDHT.GetClosestPeers", trace.WithAttributes(attribute.String("Key", key))) + ctx, span := internal.StartSpan(ctx, "IpfsDHT.GetClosestPeers", trace.WithAttributes(internal.KeyAsAttribute("Key", key))) defer span.End() if key == "" { diff --git a/query.go b/query.go index 524269aec..7c01a2af2 100644 --- a/query.go +++ b/query.go @@ -81,7 +81,7 @@ type lookupWithFollowupResult struct { // After the lookup is complete the query function is run (unless stopped) against all of the top K peers from the // lookup that have not already been successfully queried. func (dht *IpfsDHT) runLookupWithFollowup(ctx context.Context, target string, queryFn queryFn, stopFn stopFn) (*lookupWithFollowupResult, error) { - ctx, span := internal.StartSpan(ctx, "IpfsDHT.RunLookupWithFollowup", trace.WithAttributes(attribute.String("Target", target))) + ctx, span := internal.StartSpan(ctx, "IpfsDHT.RunLookupWithFollowup", trace.WithAttributes(internal.KeyAsAttribute("Target", target))) defer span.End() // run the query diff --git a/routing.go b/routing.go index 20490041b..26c079761 100644 --- a/routing.go +++ b/routing.go @@ -34,7 +34,7 @@ import ( // PutValue adds value corresponding to given Key. // This is the top level "Store" operation of the DHT func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...routing.Option) (err error) { - ctx, span := internal.StartSpan(ctx, "IpfsDHT.PutValue", trace.WithAttributes(attribute.String("Key", key))) + ctx, span := internal.StartSpan(ctx, "IpfsDHT.PutValue", trace.WithAttributes(internal.KeyAsAttribute("Key", key))) defer span.End() if !dht.enableValues { @@ -109,7 +109,7 @@ type recvdVal struct { // GetValue searches for the value corresponding to given Key. func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Option) (_ []byte, err error) { - ctx, span := internal.StartSpan(ctx, "IpfsDHT.GetValue", trace.WithAttributes(attribute.String("Key", key))) + ctx, span := internal.StartSpan(ctx, "IpfsDHT.GetValue", trace.WithAttributes(internal.KeyAsAttribute("Key", key))) defer span.End() if !dht.enableValues { @@ -146,7 +146,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Op // SearchValue searches for the value corresponding to given Key and streams the results. func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) { - ctx, span := internal.StartSpan(ctx, "IpfsDHT.SearchValue", trace.WithAttributes(attribute.String("Key", key))) + ctx, span := internal.StartSpan(ctx, "IpfsDHT.SearchValue", trace.WithAttributes(internal.KeyAsAttribute("Key", key))) var good bool defer func() { if !good {