Skip to content

Commit

Permalink
fixup! fixup! feat: add basic OT tracing for incoming requests
Browse files Browse the repository at this point in the history
  • Loading branch information
rvagg committed Nov 26, 2021
1 parent 5f9fd7d commit ae60cab
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 10 deletions.
54 changes: 45 additions & 9 deletions impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"math/rand"
"os"
"path/filepath"
"reflect"
"testing"
"time"

Expand Down Expand Up @@ -45,6 +44,7 @@ import (
gsmsg "github.com/ipfs/go-graphsync/message"
gsnet "github.com/ipfs/go-graphsync/network"
"github.com/ipfs/go-graphsync/storeutil"
"github.com/ipfs/go-graphsync/taskqueue"
"github.com/ipfs/go-graphsync/testutil"
)

Expand Down Expand Up @@ -88,10 +88,11 @@ func TestMakeRequestToNetwork(t *testing.T) {
require.True(t, found)
require.Equal(t, td.extensionData, returnedData, "Failed to encode extension")

graphSync.(*GraphSync).requestQueue.(*taskqueue.WorkerTaskQueue).WaitForNoActiveTasks()

tracing := collectTracing(t)
// single trace expected, likely incomplete but but the executor may slip in and begin executing before end of test
traces := tracing.TracesToStrings()
require.True(t, reflect.DeepEqual([]string{"request->newRequest"}, traces) || reflect.DeepEqual([]string{"request->newRequest->executeTask"}, traces))
// single trace expected
require.ElementsMatch(t, []string{"request->newRequest->executeTask"}, tracing.TracesToStrings())

// make sure the attributes are what we expect
requestSpans := tracing.FindSpans("request")
Expand Down Expand Up @@ -194,6 +195,8 @@ func TestRejectRequestsByDefault(t *testing.T) {
testutil.VerifyEmptyResponse(ctx, t, progressChan)
testutil.VerifySingleTerminalError(ctx, t, errChan)

requestor.(*GraphSync).requestQueue.(*taskqueue.WorkerTaskQueue).WaitForNoActiveTasks()

tracing := collectTracing(t)
// single trace expected
require.ElementsMatch(t, []string{"request->newRequest->executeTask"}, tracing.TracesToStrings())
Expand Down Expand Up @@ -226,6 +229,8 @@ func TestGraphsyncRoundTripRequestBudgetRequestor(t *testing.T) {
testutil.VerifySingleTerminalError(ctx, t, errChan)
require.Len(t, td.blockStore1, int(linksToTraverse), "did not store all blocks")

requestor.(*GraphSync).requestQueue.(*taskqueue.WorkerTaskQueue).WaitForNoActiveTasks()

tracing := collectTracing(t)
// single trace expected
require.ElementsMatch(t, []string{"request->newRequest->executeTask"}, tracing.TracesToStrings())
Expand Down Expand Up @@ -258,6 +263,8 @@ func TestGraphsyncRoundTripRequestBudgetResponder(t *testing.T) {
testutil.VerifySingleTerminalError(ctx, t, errChan)
require.Len(t, td.blockStore1, int(linksToTraverse), "did not store all blocks")

requestor.(*GraphSync).requestQueue.(*taskqueue.WorkerTaskQueue).WaitForNoActiveTasks()

tracing := collectTracing(t)
// single trace expected
require.ElementsMatch(t, []string{"request->newRequest->executeTask"}, tracing.TracesToStrings())
Expand Down Expand Up @@ -325,6 +332,8 @@ func TestGraphsyncRoundTrip(t *testing.T) {
testutil.AssertReceive(ctx, t, finalResponseStatusChan, &finalResponseStatus, "should receive status")
require.Equal(t, graphsync.RequestCompletedFull, finalResponseStatus)

requestor.(*GraphSync).requestQueue.(*taskqueue.WorkerTaskQueue).WaitForNoActiveTasks()

tracing := collectTracing(t)
// single trace expected
require.ElementsMatch(t, []string{"request->newRequest->executeTask"}, tracing.TracesToStrings())
Expand Down Expand Up @@ -380,6 +389,8 @@ func TestGraphsyncRoundTripPartial(t *testing.T) {
testutil.AssertReceive(ctx, t, finalResponseStatusChan, &finalResponseStatus, "should receive status")
require.Equal(t, graphsync.RequestCompletedPartial, finalResponseStatus)

requestor.(*GraphSync).requestQueue.(*taskqueue.WorkerTaskQueue).WaitForNoActiveTasks()

tracing := collectTracing(t)
// single trace expected
require.ElementsMatch(t, []string{"request->newRequest->executeTask"}, tracing.TracesToStrings())
Expand Down Expand Up @@ -435,6 +446,8 @@ func TestGraphsyncRoundTripIgnoreCids(t *testing.T) {
require.Equal(t, blockChainLength, totalSent)
require.Equal(t, blockChainLength-set.Len(), totalSentOnWire)

requestor.(*GraphSync).requestQueue.(*taskqueue.WorkerTaskQueue).WaitForNoActiveTasks()

tracing := collectTracing(t)
// single trace expected
require.ElementsMatch(t, []string{"request->newRequest->executeTask"}, tracing.TracesToStrings())
Expand Down Expand Up @@ -492,6 +505,8 @@ func TestGraphsyncRoundTripIgnoreNBlocks(t *testing.T) {
require.Equal(t, blockChainLength, totalSent)
require.Equal(t, blockChainLength-50, totalSentOnWire)

requestor.(*GraphSync).requestQueue.(*taskqueue.WorkerTaskQueue).WaitForNoActiveTasks()

tracing := collectTracing(t)
// single trace expected
require.ElementsMatch(t, []string{"request->newRequest->executeTask"}, tracing.TracesToStrings())
Expand Down Expand Up @@ -549,6 +564,8 @@ func TestPauseResume(t *testing.T) {
testutil.VerifyEmptyErrors(ctx, t, errChan)
require.Len(t, td.blockStore1, blockChainLength, "did not store all blocks")

requestor.(*GraphSync).requestQueue.(*taskqueue.WorkerTaskQueue).WaitForNoActiveTasks()

tracing := collectTracing(t)
// single trace expected
require.ElementsMatch(t, []string{"request->newRequest->executeTask"}, tracing.TracesToStrings())
Expand Down Expand Up @@ -602,6 +619,8 @@ func TestPauseResumeRequest(t *testing.T) {
testutil.VerifyEmptyErrors(ctx, t, errChan)
require.Len(t, td.blockStore1, blockChainLength, "did not store all blocks")

requestor.(*GraphSync).requestQueue.(*taskqueue.WorkerTaskQueue).WaitForNoActiveTasks()

tracing := collectTracing(t)
// single trace expected
require.ElementsMatch(t, []string{"request->newRequest->executeTask"}, tracing.TracesToStrings())
Expand Down Expand Up @@ -667,6 +686,8 @@ func TestPauseResumeViaUpdate(t *testing.T) {
require.Equal(t, td.extensionResponseData, receivedReponseData, "did not receive correct extension response data")
require.Equal(t, td.extensionUpdateData, receivedUpdateData, "did not receive correct extension update data")

requestor.(*GraphSync).requestQueue.(*taskqueue.WorkerTaskQueue).WaitForNoActiveTasks()

tracing := collectTracing(t)
// single trace expected
require.ElementsMatch(t, []string{"request->newRequest->executeTask"}, tracing.TracesToStrings())
Expand Down Expand Up @@ -734,6 +755,8 @@ func TestPauseResumeViaUpdateOnBlockHook(t *testing.T) {
require.Equal(t, td.extensionResponseData, receivedReponseData, "did not receive correct extension response data")
require.Equal(t, td.extensionUpdateData, receivedUpdateData, "did not receive correct extension update data")

requestor.(*GraphSync).requestQueue.(*taskqueue.WorkerTaskQueue).WaitForNoActiveTasks()

tracing := collectTracing(t)
// single trace expected
require.ElementsMatch(t, []string{"request->newRequest->executeTask"}, tracing.TracesToStrings())
Expand Down Expand Up @@ -811,10 +834,11 @@ func TestNetworkDisconnect(t *testing.T) {
require.EqualError(t, err, graphsync.RequestClientCancelledErr{}.Error())
testutil.AssertReceive(ctx, t, receiverError, &err, "should receive an error on receiver side")

requestor.(*GraphSync).requestQueue.(*taskqueue.WorkerTaskQueue).WaitForNoActiveTasks()

tracing := collectTracing(t)
// single trace expected, likely incomplete but but the executor may slip in and begin executing before disconnect
traces := tracing.TracesToStrings()
require.True(t, reflect.DeepEqual([]string{"request->newRequest"}, traces) || reflect.DeepEqual([]string{"request->newRequest->executeTask"}, traces))
// single trace expected
require.ElementsMatch(t, []string{"request->newRequest->executeTask"}, tracing.TracesToStrings())
}

func TestConnectFail(t *testing.T) {
Expand Down Expand Up @@ -853,9 +877,11 @@ func TestConnectFail(t *testing.T) {
testutil.AssertReceive(ctx, t, errChan, &err, "should receive an error")
require.EqualError(t, err, graphsync.RequestClientCancelledErr{}.Error())

requestor.(*GraphSync).requestQueue.(*taskqueue.WorkerTaskQueue).WaitForNoActiveTasks()

tracing := collectTracing(t)
// single, incomplete trace expected
require.ElementsMatch(t, []string{"request->newRequest"}, tracing.TracesToStrings())
// single trace expected
require.ElementsMatch(t, []string{"request->newRequest->executeTask"}, tracing.TracesToStrings())
}

func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) {
Expand Down Expand Up @@ -924,6 +950,8 @@ func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) {
require.Len(t, td.blockStore1, 0, "should store no blocks in normal store")
require.Len(t, altStore1, blockChainLength, "did not store all blocks in alternate store")

requestor.(*GraphSync).requestQueue.(*taskqueue.WorkerTaskQueue).WaitForNoActiveTasks()

tracing := collectTracing(t)
// two request traces expected
require.ElementsMatch(t, []string{"request->newRequest->executeTask", "request->newRequest->executeTask"}, tracing.TracesToStrings())
Expand Down Expand Up @@ -994,6 +1022,8 @@ func TestGraphsyncRoundTripMultipleAlternatePersistence(t *testing.T) {
testutil.VerifyEmptyErrors(ctx, t, errChan2)
require.Len(t, altStore1, blockChainLength, "did not store all blocks in alternate store 2")

requestor.(*GraphSync).requestQueue.(*taskqueue.WorkerTaskQueue).WaitForNoActiveTasks()

tracing := collectTracing(t)
// two request traces expected
require.ElementsMatch(t, []string{"request->newRequest->executeTask", "request->newRequest->executeTask"}, tracing.TracesToStrings())
Expand Down Expand Up @@ -1037,6 +1067,8 @@ func TestRoundTripLargeBlocksSlowNetwork(t *testing.T) {
blockChain.VerifyWholeChain(ctx, progressChan)
testutil.VerifyEmptyErrors(ctx, t, errChan)

requestor.(*GraphSync).requestQueue.(*taskqueue.WorkerTaskQueue).WaitForNoActiveTasks()

tracing := collectTracing(t)
// single trace expected
require.ElementsMatch(t, []string{"request->newRequest->executeTask"}, tracing.TracesToStrings())
Expand Down Expand Up @@ -1162,6 +1194,8 @@ func TestUnixFSFetch(t *testing.T) {
// verify original bytes match final bytes!
require.Equal(t, origBytes, finalBytes, "should have gotten same bytes written as read but didn't")

requestor.(*GraphSync).requestQueue.(*taskqueue.WorkerTaskQueue).WaitForNoActiveTasks()

tracing := collectTracing(t)
// single trace expected
require.ElementsMatch(t, []string{"request->newRequest->executeTask"}, tracing.TracesToStrings())
Expand Down Expand Up @@ -1248,6 +1282,8 @@ func TestGraphsyncBlockListeners(t *testing.T) {
require.Equal(t, blockChainLength, blocksIncoming)
require.Equal(t, blockChainLength, blocksSent)

requestor.(*GraphSync).requestQueue.(*taskqueue.WorkerTaskQueue).WaitForNoActiveTasks()

tracing := collectTracing(t)
// single trace expected
require.ElementsMatch(t, []string{"request->newRequest->executeTask"}, tracing.TracesToStrings())
Expand Down
5 changes: 4 additions & 1 deletion requestmanager/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/ipld/go-ipld-prime/traversal"
"github.com/libp2p/go-libp2p-core/peer"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"

"github.com/ipfs/go-graphsync"
Expand Down Expand Up @@ -81,9 +82,11 @@ func (e *Executor) ExecuteTask(ctx context.Context, pid peer.ID, task *peertask.

log.Debugw("beginning request execution", "id", requestTask.Request.ID(), "peer", pid.String(), "root_cid", requestTask.Request.Root().String())
err := e.traverse(requestTask)
span.RecordError(err)
if err != nil && !ipldutil.IsContextCancelErr(err) {
e.manager.SendRequest(requestTask.P, gsmsg.CancelRequest(requestTask.Request.ID()))
if !isPausedErr(err) {
span.SetStatus(codes.Error, err.Error())
select {
case <-requestTask.Ctx.Done():
case requestTask.InProgressErr <- err:
Expand Down Expand Up @@ -126,7 +129,7 @@ func (e *Executor) traverse(rt RequestTask) error {
}
// get current link request
lnk, linkContext := rt.Traverser.CurrentRequest()
// attempt to load
// attempt to load``
log.Debugf("will load link=%s", lnk)
resultChan := e.loader(rt.P, rt.Request.ID(), lnk, linkContext)
var result types.AsyncLoadResult
Expand Down
5 changes: 5 additions & 0 deletions requestmanager/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"

"github.com/ipfs/go-graphsync"
Expand Down Expand Up @@ -67,6 +68,8 @@ func (rm *RequestManager) newRequest(parentSpan trace.Span, p peer.ID, root ipld

request, hooksResult, err := rm.validateRequest(requestID, p, root, selector, extensions)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
rp, err := rm.singleErrorResponse(err)
return request, rp, err
}
Expand All @@ -75,6 +78,8 @@ func (rm *RequestManager) newRequest(parentSpan trace.Span, p peer.ID, root ipld
if has {
doNotSendCids, err = cidset.DecodeCidSet(doNotSendCidsData)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
rp, err := rm.singleErrorResponse(err)
return request, rp, err
}
Expand Down

0 comments on commit ae60cab

Please sign in to comment.