From 2e721a380ccc56dc1bec386c032de29cd0ad4266 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Thu, 25 Nov 2021 13:22:21 +1100 Subject: [PATCH] feat: add basic OT tracing for incoming requests Closes: #271 --- go.mod | 5 +- go.sum | 15 +++- impl/graphsync.go | 14 ++++ impl/graphsync_test.go | 126 ++++++++++++++++++++++++++++ requestmanager/client.go | 6 +- requestmanager/executor/executor.go | 7 ++ requestmanager/messages.go | 4 +- requestmanager/server.go | 16 +++- testutil/tracing.go | 106 +++++++++++++++++++++++ 9 files changed, 291 insertions(+), 8 deletions(-) create mode 100644 testutil/tracing.go diff --git a/go.mod b/go.mod index 63947240..bcceb789 100644 --- a/go.mod +++ b/go.mod @@ -35,8 +35,11 @@ require ( github.com/libp2p/go-msgio v0.0.6 github.com/multiformats/go-multiaddr v0.3.1 github.com/multiformats/go-multihash v0.0.15 - github.com/stretchr/testify v1.6.1 + github.com/stretchr/testify v1.7.0 github.com/whyrusleeping/cbor-gen v0.0.0-20210219115102-f37d292932f2 + go.opentelemetry.io/otel v1.2.0 + go.opentelemetry.io/otel/sdk v1.2.0 + go.opentelemetry.io/otel/trace v1.2.0 golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 google.golang.org/protobuf v1.27.1 diff --git a/go.sum b/go.sum index d6ffa038..03a50df0 100644 --- a/go.sum +++ b/go.sum @@ -99,8 +99,9 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gopacket v1.1.17 h1:rMrlX2ZY2UbvT+sdz3+6J+pp2z+msCq9MxTU6ymxbBY= github.com/google/gopacket v1.1.17/go.mod h1:UdDNZ1OO62aGYVnPhxT1U6aI7ukYtA/kB8vaU0diBUM= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= @@ -616,8 +617,9 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= -github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/urfave/cli/v2 v2.0.0 h1:+HU9SCbu8GnEUFtIBfuUNXN39ofWViIEJIp6SURMpCg= @@ -650,6 +652,12 @@ go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4 h1:LYy1Hy3MJdrCdMwwzxA/dRok4ejH+RwNGbuoD9fCjto= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= +go.opentelemetry.io/otel v1.2.0 h1:YOQDvxO1FayUcT9MIhJhgMyNO1WqoduiyvQHzGN0kUQ= +go.opentelemetry.io/otel v1.2.0/go.mod h1:aT17Fk0Z1Nor9e0uisf98LrntPGMnk4frBO9+dkf69I= +go.opentelemetry.io/otel/sdk v1.2.0 h1:wKN260u4DesJYhyjxDa7LRFkuhH7ncEVKU37LWcyNIo= +go.opentelemetry.io/otel/sdk v1.2.0/go.mod h1:jNN8QtpvbsKhgaC6V5lHiejMoKD+V8uadoSafgHPx1U= +go.opentelemetry.io/otel/trace v1.2.0 h1:Ys3iqbqZhcf28hHzrm5WAquMkDHNZTUkw7KHbuNjej0= +go.opentelemetry.io/otel/trace v1.2.0/go.mod h1:N5FLswTubnxKxOJHM7XZC074qpeEdLy3CgAVsdMucK0= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= @@ -732,8 +740,9 @@ golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210309074719-68d13333faf2 h1:46ULzRKLh1CwgRq2dC5SlBzEqqNCi8rreOZnNrbqcIY= golang.org/x/sys v0.0.0-20210309074719-68d13333faf2/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7 h1:iGu644GcxtEcrInvDsQRCwJjtCIOlT2V7IRt6ah2Whw= +golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= diff --git a/impl/graphsync.go b/impl/graphsync.go index fe26fed1..9f139ffc 100644 --- a/impl/graphsync.go +++ b/impl/graphsync.go @@ -8,6 +8,9 @@ import ( "github.com/ipfs/go-peertaskqueue" ipld "github.com/ipld/go-ipld-prime" "github.com/libp2p/go-libp2p-core/peer" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "github.com/ipfs/go-graphsync" "github.com/ipfs/go-graphsync/allocator" @@ -304,6 +307,17 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork, // Request initiates a new GraphSync request to the given peer using the given selector spec. func (gs *GraphSync) Request(ctx context.Context, p peer.ID, root ipld.Link, selector ipld.Node, extensions ...graphsync.ExtensionData) (<-chan graphsync.ResponseProgress, <-chan error) { + var extNames []string + for _, ext := range extensions { + extNames = append(extNames, string(ext.Name)) + } + var span trace.Span + ctx, span = otel.Tracer("graphsync").Start(ctx, "request", trace.WithAttributes( + attribute.String("peerID", p.Pretty()), + attribute.String("root", root.String()), + attribute.StringSlice("extensions", extNames), + )) + defer span.End() return gs.requestManager.NewRequest(ctx, p, root, selector, extensions...) } diff --git a/impl/graphsync_test.go b/impl/graphsync_test.go index 6d0a9ce1..ea101cdf 100644 --- a/impl/graphsync_test.go +++ b/impl/graphsync_test.go @@ -11,6 +11,7 @@ import ( "math/rand" "os" "path/filepath" + "reflect" "testing" "time" @@ -48,6 +49,8 @@ import ( ) func TestMakeRequestToNetwork(t *testing.T) { + collectTracing := testutil.SetupTracing() + // create network ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 10*time.Second) @@ -84,6 +87,23 @@ func TestMakeRequestToNetwork(t *testing.T) { returnedData, found := receivedRequest.Extension(td.extensionName) require.True(t, found) require.Equal(t, td.extensionData, returnedData, "Failed to encode extension") + + tracing := collectTracing(t) + // single, incomplete trace expected, we stop short of executing the request + require.ElementsMatch(t, []string{"request->newRequest"}, tracing.TracesToStrings()) + + // make sure the attributes are what we expect + requestSpans := tracing.FindSpans("request") + peerIdAttr := testutil.AttributeValueInTraceSpan(t, requestSpans[0], "peerID") + require.Equal(t, td.host2.ID().Pretty(), peerIdAttr.AsString()) + rootAttr := testutil.AttributeValueInTraceSpan(t, requestSpans[0], "root") + require.Equal(t, blockChain.TipLink.String(), rootAttr.AsString()) + extensionsAttr := testutil.AttributeValueInTraceSpan(t, requestSpans[0], "extensions") + require.Equal(t, []string{string(td.extensionName)}, extensionsAttr.AsStringSlice()) + + newRequestSpans := tracing.FindSpans("newRequest") + requestIdAttr := testutil.AttributeValueInTraceSpan(t, newRequestSpans[0], "requestID") + require.Equal(t, int64(0), requestIdAttr.AsInt64()) } func TestSendResponseToIncomingRequest(t *testing.T) { @@ -153,6 +173,7 @@ func TestSendResponseToIncomingRequest(t *testing.T) { } func TestRejectRequestsByDefault(t *testing.T) { + collectTracing := testutil.SetupTracing() // create network ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 1*time.Second) @@ -171,9 +192,15 @@ func TestRejectRequestsByDefault(t *testing.T) { testutil.VerifyEmptyResponse(ctx, t, progressChan) testutil.VerifySingleTerminalError(ctx, t, errChan) + + tracing := collectTracing(t) + // single trace expected + require.ElementsMatch(t, []string{"request->newRequest->executeTask"}, tracing.TracesToStrings()) } func TestGraphsyncRoundTripRequestBudgetRequestor(t *testing.T) { + collectTracing := testutil.SetupTracing() + // create network ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 1*time.Second) @@ -197,9 +224,15 @@ func TestGraphsyncRoundTripRequestBudgetRequestor(t *testing.T) { blockChain.VerifyResponseRange(ctx, progressChan, 0, int(linksToTraverse)) testutil.VerifySingleTerminalError(ctx, t, errChan) require.Len(t, td.blockStore1, int(linksToTraverse), "did not store all blocks") + + tracing := collectTracing(t) + // single trace expected + require.ElementsMatch(t, []string{"request->newRequest->executeTask"}, tracing.TracesToStrings()) } func TestGraphsyncRoundTripRequestBudgetResponder(t *testing.T) { + collectTracing := testutil.SetupTracing() + // create network ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 1*time.Second) @@ -223,9 +256,15 @@ func TestGraphsyncRoundTripRequestBudgetResponder(t *testing.T) { blockChain.VerifyResponseRange(ctx, progressChan, 0, int(linksToTraverse)) testutil.VerifySingleTerminalError(ctx, t, errChan) require.Len(t, td.blockStore1, int(linksToTraverse), "did not store all blocks") + + tracing := collectTracing(t) + // single trace expected + require.ElementsMatch(t, []string{"request->newRequest->executeTask"}, tracing.TracesToStrings()) } func TestGraphsyncRoundTrip(t *testing.T) { + collectTracing := testutil.SetupTracing() + // create network ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 1*time.Second) @@ -284,9 +323,15 @@ func TestGraphsyncRoundTrip(t *testing.T) { var finalResponseStatus graphsync.ResponseStatusCode testutil.AssertReceive(ctx, t, finalResponseStatusChan, &finalResponseStatus, "should receive status") require.Equal(t, graphsync.RequestCompletedFull, finalResponseStatus) + + tracing := collectTracing(t) + // single trace expected + require.ElementsMatch(t, []string{"request->newRequest->executeTask"}, tracing.TracesToStrings()) } func TestGraphsyncRoundTripPartial(t *testing.T) { + collectTracing := testutil.SetupTracing() + // create network ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 1*time.Second) @@ -333,9 +378,15 @@ func TestGraphsyncRoundTripPartial(t *testing.T) { var finalResponseStatus graphsync.ResponseStatusCode testutil.AssertReceive(ctx, t, finalResponseStatusChan, &finalResponseStatus, "should receive status") require.Equal(t, graphsync.RequestCompletedPartial, finalResponseStatus) + + tracing := collectTracing(t) + // single trace expected + require.ElementsMatch(t, []string{"request->newRequest->executeTask"}, tracing.TracesToStrings()) } func TestGraphsyncRoundTripIgnoreCids(t *testing.T) { + collectTracing := testutil.SetupTracing() + // create network ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 1*time.Second) @@ -382,9 +433,15 @@ func TestGraphsyncRoundTripIgnoreCids(t *testing.T) { require.Equal(t, blockChainLength, totalSent) require.Equal(t, blockChainLength-set.Len(), totalSentOnWire) + + tracing := collectTracing(t) + // single trace expected + require.ElementsMatch(t, []string{"request->newRequest->executeTask"}, tracing.TracesToStrings()) } func TestGraphsyncRoundTripIgnoreNBlocks(t *testing.T) { + collectTracing := testutil.SetupTracing() + // create network ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() @@ -433,9 +490,15 @@ func TestGraphsyncRoundTripIgnoreNBlocks(t *testing.T) { require.Equal(t, blockChainLength, totalSent) require.Equal(t, blockChainLength-50, totalSentOnWire) + + tracing := collectTracing(t) + // single trace expected + require.ElementsMatch(t, []string{"request->newRequest->executeTask"}, tracing.TracesToStrings()) } func TestPauseResume(t *testing.T) { + collectTracing := testutil.SetupTracing() + // create network ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 2*time.Second) @@ -485,8 +548,14 @@ func TestPauseResume(t *testing.T) { testutil.VerifyEmptyErrors(ctx, t, errChan) require.Len(t, td.blockStore1, blockChainLength, "did not store all blocks") + tracing := collectTracing(t) + // single trace expected + require.ElementsMatch(t, []string{"request->newRequest->executeTask"}, tracing.TracesToStrings()) } + func TestPauseResumeRequest(t *testing.T) { + collectTracing := testutil.SetupTracing() + // create network ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 2*time.Second) @@ -531,9 +600,15 @@ func TestPauseResumeRequest(t *testing.T) { blockChain.VerifyRemainder(ctx, progressChan, stopPoint) testutil.VerifyEmptyErrors(ctx, t, errChan) require.Len(t, td.blockStore1, blockChainLength, "did not store all blocks") + + tracing := collectTracing(t) + // single trace expected + require.ElementsMatch(t, []string{"request->newRequest->executeTask"}, tracing.TracesToStrings()) } func TestPauseResumeViaUpdate(t *testing.T) { + collectTracing := testutil.SetupTracing() + // create network ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 2*time.Second) @@ -590,9 +665,15 @@ func TestPauseResumeViaUpdate(t *testing.T) { require.Equal(t, td.extensionResponseData, receivedReponseData, "did not receive correct extension response data") require.Equal(t, td.extensionUpdateData, receivedUpdateData, "did not receive correct extension update data") + + tracing := collectTracing(t) + // single trace expected + require.ElementsMatch(t, []string{"request->newRequest->executeTask"}, tracing.TracesToStrings()) } func TestPauseResumeViaUpdateOnBlockHook(t *testing.T) { + collectTracing := testutil.SetupTracing() + // create network ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 2*time.Second) @@ -651,9 +732,15 @@ func TestPauseResumeViaUpdateOnBlockHook(t *testing.T) { require.Equal(t, td.extensionResponseData, receivedReponseData, "did not receive correct extension response data") require.Equal(t, td.extensionUpdateData, receivedUpdateData, "did not receive correct extension update data") + + tracing := collectTracing(t) + // single trace expected + require.ElementsMatch(t, []string{"request->newRequest->executeTask"}, tracing.TracesToStrings()) } func TestNetworkDisconnect(t *testing.T) { + collectTracing := testutil.SetupTracing() + // create network ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 2*time.Second) @@ -722,9 +809,16 @@ func TestNetworkDisconnect(t *testing.T) { testutil.AssertReceive(ctx, t, errChan, &err, "should receive an error") require.EqualError(t, err, graphsync.RequestClientCancelledErr{}.Error()) testutil.AssertReceive(ctx, t, receiverError, &err, "should receive an error on receiver side") + + tracing := collectTracing(t) + // single trace expected, likely incomplete but but the executor may slip in and begin executing before disconnect + traces := tracing.TracesToStrings() + require.True(t, reflect.DeepEqual([]string{"request->newRequest"}, traces) || reflect.DeepEqual([]string{"request->newRequest->executeTask"}, traces)) } func TestConnectFail(t *testing.T) { + collectTracing := testutil.SetupTracing() + // create network ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 2*time.Second) @@ -757,9 +851,15 @@ func TestConnectFail(t *testing.T) { testutil.AssertReceive(ctx, t, reqNetworkError, &err, "should receive network error") testutil.AssertReceive(ctx, t, errChan, &err, "should receive an error") require.EqualError(t, err, graphsync.RequestClientCancelledErr{}.Error()) + + tracing := collectTracing(t) + // single, incomplete trace expected + require.ElementsMatch(t, []string{"request->newRequest"}, tracing.TracesToStrings()) } func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) { + collectTracing := testutil.SetupTracing() + // create network ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 1*time.Second) @@ -822,9 +922,15 @@ func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) { testutil.VerifyEmptyErrors(ctx, t, errChan) require.Len(t, td.blockStore1, 0, "should store no blocks in normal store") require.Len(t, altStore1, blockChainLength, "did not store all blocks in alternate store") + + tracing := collectTracing(t) + // two request traces expected + require.ElementsMatch(t, []string{"request->newRequest->executeTask", "request->newRequest->executeTask"}, tracing.TracesToStrings()) } func TestGraphsyncRoundTripMultipleAlternatePersistence(t *testing.T) { + collectTracing := testutil.SetupTracing() + // create network ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 1*time.Second) @@ -887,6 +993,9 @@ func TestGraphsyncRoundTripMultipleAlternatePersistence(t *testing.T) { testutil.VerifyEmptyErrors(ctx, t, errChan2) require.Len(t, altStore1, blockChainLength, "did not store all blocks in alternate store 2") + tracing := collectTracing(t) + // two request traces expected + require.ElementsMatch(t, []string{"request->newRequest->executeTask", "request->newRequest->executeTask"}, tracing.TracesToStrings()) } // TestRoundTripLargeBlocksSlowNetwork test verifies graphsync continues to work @@ -898,6 +1007,8 @@ func TestGraphsyncRoundTripMultipleAlternatePersistence(t *testing.T) { // backlog of blocks and then sending them in one giant network packet that can't // be decoded on the client side func TestRoundTripLargeBlocksSlowNetwork(t *testing.T) { + collectTracing := testutil.SetupTracing() + // create network if testing.Short() { t.Skip() @@ -924,6 +1035,10 @@ func TestRoundTripLargeBlocksSlowNetwork(t *testing.T) { blockChain.VerifyWholeChain(ctx, progressChan) testutil.VerifyEmptyErrors(ctx, t, errChan) + + tracing := collectTracing(t) + // single trace expected + require.ElementsMatch(t, []string{"request->newRequest->executeTask"}, tracing.TracesToStrings()) } // What this test does: @@ -939,6 +1054,7 @@ func TestUnixFSFetch(t *testing.T) { if testing.Short() { t.Skip() } + collectTracing := testutil.SetupTracing() const unixfsChunkSize uint64 = 1 << 10 const unixfsLinksPerLevel = 1024 @@ -1044,9 +1160,15 @@ func TestUnixFSFetch(t *testing.T) { // verify original bytes match final bytes! require.Equal(t, origBytes, finalBytes, "should have gotten same bytes written as read but didn't") + + tracing := collectTracing(t) + // single trace expected + require.ElementsMatch(t, []string{"request->newRequest->executeTask"}, tracing.TracesToStrings()) } func TestGraphsyncBlockListeners(t *testing.T) { + collectTracing := testutil.SetupTracing() + // create network ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 1*time.Second) @@ -1124,6 +1246,10 @@ func TestGraphsyncBlockListeners(t *testing.T) { require.Equal(t, blockChainLength, blocksOutgoing) require.Equal(t, blockChainLength, blocksIncoming) require.Equal(t, blockChainLength, blocksSent) + + tracing := collectTracing(t) + // single trace expected + require.ElementsMatch(t, []string{"request->newRequest->executeTask"}, tracing.TracesToStrings()) } type gsTestData struct { diff --git a/requestmanager/client.go b/requestmanager/client.go index 4e3f2cb0..cdcca26b 100644 --- a/requestmanager/client.go +++ b/requestmanager/client.go @@ -16,6 +16,7 @@ import ( "github.com/ipld/go-ipld-prime/traversal" "github.com/ipld/go-ipld-prime/traversal/selector" "github.com/libp2p/go-libp2p-core/peer" + "go.opentelemetry.io/otel/trace" "github.com/ipfs/go-graphsync" "github.com/ipfs/go-graphsync/ipldutil" @@ -52,6 +53,7 @@ const ( type inProgressRequestStatus struct { ctx context.Context + span trace.Span startTime time.Time cancelFn func() p peer.ID @@ -178,9 +180,11 @@ func (rm *RequestManager) NewRequest(ctx context.Context, return rm.singleErrorResponse(fmt.Errorf("invalid selector spec")) } + span := trace.SpanFromContext(ctx) + inProgressRequestChan := make(chan inProgressRequest) - rm.send(&newRequestMessage{p, root, selectorNode, extensions, inProgressRequestChan}, ctx.Done()) + rm.send(&newRequestMessage{span, p, root, selectorNode, extensions, inProgressRequestChan}, ctx.Done()) var receivedInProgressRequest inProgressRequest select { case <-rm.ctx.Done(): diff --git a/requestmanager/executor/executor.go b/requestmanager/executor/executor.go index 95b7f23e..77a9ac18 100644 --- a/requestmanager/executor/executor.go +++ b/requestmanager/executor/executor.go @@ -12,6 +12,8 @@ import ( cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/ipld/go-ipld-prime/traversal" "github.com/libp2p/go-libp2p-core/peer" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/trace" "github.com/ipfs/go-graphsync" "github.com/ipfs/go-graphsync/cidset" @@ -73,6 +75,10 @@ func (e *Executor) ExecuteTask(ctx context.Context, pid peer.ID, task *peertask. log.Info("Empty task on peer request stack") return false } + + _, span := otel.Tracer("graphsync").Start(trace.ContextWithSpan(ctx, requestTask.Span), "executeTask") + defer span.End() + log.Debugw("beginning request execution", "id", requestTask.Request.ID(), "peer", pid.String(), "root_cid", requestTask.Request.Root().String()) err := e.traverse(requestTask) if err != nil && !ipldutil.IsContextCancelErr(err) { @@ -92,6 +98,7 @@ func (e *Executor) ExecuteTask(ctx context.Context, pid peer.ID, task *peertask. // RequestTask are parameters for a single request execution type RequestTask struct { Ctx context.Context + Span trace.Span Request gsmsg.GraphSyncRequest LastResponse *atomic.Value DoNotSendCids *cid.Set diff --git a/requestmanager/messages.go b/requestmanager/messages.go index e5709ea8..0eb76ad2 100644 --- a/requestmanager/messages.go +++ b/requestmanager/messages.go @@ -5,6 +5,7 @@ import ( "github.com/ipfs/go-peertaskqueue/peertask" "github.com/ipld/go-ipld-prime" "github.com/libp2p/go-libp2p-core/peer" + "go.opentelemetry.io/otel/trace" "github.com/ipfs/go-graphsync" gsmsg "github.com/ipfs/go-graphsync/message" @@ -83,6 +84,7 @@ func (trm *releaseRequestTaskMessage) handle(rm *RequestManager) { } type newRequestMessage struct { + span trace.Span p peer.ID root ipld.Link selector ipld.Node @@ -93,7 +95,7 @@ type newRequestMessage struct { func (nrm *newRequestMessage) handle(rm *RequestManager) { var ipr inProgressRequest - ipr.request, ipr.incoming, ipr.incomingError = rm.newRequest(nrm.p, nrm.root, nrm.selector, nrm.extensions) + ipr.request, ipr.incoming, ipr.incomingError = rm.newRequest(nrm.span, nrm.p, nrm.root, nrm.selector, nrm.extensions) ipr.requestID = ipr.request.ID() select { diff --git a/requestmanager/server.go b/requestmanager/server.go index 20a5be0c..142642f7 100644 --- a/requestmanager/server.go +++ b/requestmanager/server.go @@ -15,6 +15,9 @@ import ( "github.com/ipld/go-ipld-prime/traversal" "github.com/ipld/go-ipld-prime/traversal/selector" "github.com/libp2p/go-libp2p-core/peer" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "github.com/ipfs/go-graphsync" "github.com/ipfs/go-graphsync/cidset" @@ -49,10 +52,17 @@ func (rm *RequestManager) cleanupInProcessRequests() { } } -func (rm *RequestManager) newRequest(p peer.ID, root ipld.Link, selector ipld.Node, extensions []graphsync.ExtensionData) (gsmsg.GraphSyncRequest, chan graphsync.ResponseProgress, chan error) { +func (rm *RequestManager) newRequest(parentSpan trace.Span, p peer.ID, root ipld.Link, selector ipld.Node, extensions []graphsync.ExtensionData) (gsmsg.GraphSyncRequest, chan graphsync.ResponseProgress, chan error) { requestID := rm.nextRequestID rm.nextRequestID++ + ctx := trace.ContextWithSpan(rm.ctx, parentSpan) + var span trace.Span + ctx, span = otel.Tracer("graphsync").Start(ctx, "newRequest", trace.WithAttributes( + attribute.Int("requestID", int(requestID)), + )) + defer span.End() + log.Infow("graphsync request initiated", "request id", requestID, "peer", p, "root", root) request, hooksResult, err := rm.validateRequest(requestID, p, root, selector, extensions) @@ -71,9 +81,10 @@ func (rm *RequestManager) newRequest(p peer.ID, root ipld.Link, selector ipld.No } else { doNotSendCids = cid.NewSet() } - ctx, cancel := context.WithCancel(rm.ctx) + ctx, cancel := context.WithCancel(ctx) requestStatus := &inProgressRequestStatus{ ctx: ctx, + span: span, startTime: time.Now(), cancelFn: cancel, p: p, @@ -141,6 +152,7 @@ func (rm *RequestManager) requestTask(requestID graphsync.RequestID) executor.Re ipr.state = running return executor.RequestTask{ Ctx: ipr.ctx, + Span: ipr.span, Request: ipr.request, LastResponse: &ipr.lastResponse, DoNotSendCids: ipr.doNotSendCids, diff --git a/testutil/tracing.go b/testutil/tracing.go new file mode 100644 index 00000000..3ecfb9d7 --- /dev/null +++ b/testutil/tracing.go @@ -0,0 +1,106 @@ +package testutil + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" +) + +type Collector struct { + Spans tracetest.SpanStubs +} + +func (c *Collector) ExportSpans(ctx context.Context, spans []trace.ReadOnlySpan) error { + c.Spans = tracetest.SpanStubsFromReadOnlySpans(spans) + return nil +} + +func (c *Collector) Shutdown(ctx context.Context) error { + return nil +} + +func (c Collector) FindSpans(name string) tracetest.SpanStubs { + var found = tracetest.SpanStubs{} + for _, s := range c.Spans { + if s.Name == name { + found = append(found, s) + } + } + return found +} + +// TracesToString returns an array of traces represented as strings with each +// span in the trace identified by name separated by a '->' +func (c Collector) TracesToStrings() []string { + parents := c.FindParentSpans() + return c.tracesToString("", parents) +} + +func (c Collector) tracesToString(trace string, spans tracetest.SpanStubs) []string { + var traces []string + for _, span := range spans { + var t string + if trace == "" { + t = span.Name + } else { + t = fmt.Sprintf("%v->%v", trace, span.Name) + } + children := c.FindSpansWithParent(span) + if len(children) == 0 { + return []string{t} + } + traces = append(traces, c.tracesToString(t, children)...) + } + return traces +} + +func (c Collector) FindParentSpans() tracetest.SpanStubs { + var found = tracetest.SpanStubs{} + for _, s := range c.Spans { + if s.Parent.SpanID() == [8]byte{} { + found = append(found, s) + } + } + return found +} + +func (c Collector) FindSpansWithParent(stub tracetest.SpanStub) tracetest.SpanStubs { + var found = tracetest.SpanStubs{} + for _, s := range c.Spans { + if s.Parent.SpanID() == stub.SpanContext.SpanID() { + found = append(found, s) + } + } + return found +} + +var _ trace.SpanExporter = &Collector{} + +func SetupTracing() func(t *testing.T) *Collector { + collector := &Collector{} + tp := trace.NewTracerProvider(trace.WithBatcher(collector)) + otel.SetTracerProvider(tp) + + collect := func(t *testing.T) *Collector { + require.NoError(t, tp.Shutdown(context.Background())) + return collector + } + + return collect +} + +func AttributeValueInTraceSpan(t *testing.T, stub tracetest.SpanStub, attributeName string) attribute.Value { + for _, attr := range stub.Attributes { + if attr.Key == attribute.Key(attributeName) { + return attr.Value + } + } + require.Fail(t, "did not find expected attribute %v on trace span %v", attributeName, stub.Name) + return attribute.Value{} +}