Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(graphsync): unify req & resp Pause, Unpause & Cancel by RequestID #355

Merged
merged 5 commits into from
Feb 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 6 additions & 16 deletions graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,25 +486,15 @@ type GraphExchange interface {
// RegisterReceiverNetworkErrorListener adds a listener for when errors occur receiving data over the wire
RegisterReceiverNetworkErrorListener(listener OnReceiverNetworkErrorListener) UnregisterHookFunc

// UnpauseRequest unpauses a request that was paused in a block hook based request ID
// Can also send extensions with unpause
UnpauseRequest(RequestID, ...ExtensionData) error

// PauseRequest pauses an in progress request (may take 1 or more blocks to process)
PauseRequest(RequestID) error
// Pause pauses an in progress request or response (may take 1 or more blocks to process)
Pause(context.Context, RequestID) error

// UnpauseResponse unpauses a response that was paused in a block hook based on peer ID and request ID
// Unpause unpauses a request or response that was paused
// Can also send extensions with unpause
UnpauseResponse(peer.ID, RequestID, ...ExtensionData) error

// PauseResponse pauses an in progress response (may take 1 or more blocks to process)
PauseResponse(peer.ID, RequestID) error

// CancelResponse cancels an in progress response
CancelResponse(peer.ID, RequestID) error
Unpause(context.Context, RequestID, ...ExtensionData) error

// CancelRequest cancels an in progress request
CancelRequest(context.Context, RequestID) error
// Cancel cancels an in progress request or response
Cancel(context.Context, RequestID) error

// Stats produces insight on the current state of a graphsync exchange
Stats() Stats
Expand Down
49 changes: 24 additions & 25 deletions impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package graphsync

import (
"context"
"errors"
"time"

logging "github.com/ipfs/go-log/v2"
Expand Down Expand Up @@ -296,6 +297,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
responseManager.Startup()
responseQueue.Startup(gsConfig.maxInProgressIncomingRequests, queryExecutor)
network.SetDelegate((*graphSyncReceiver)(graphSync))

return graphSync
}

Expand Down Expand Up @@ -402,35 +404,32 @@ func (gs *GraphSync) RegisterReceiverNetworkErrorListener(listener graphsync.OnR
return gs.receiverErrorListeners.Register(listener)
}

// UnpauseRequest unpauses a request that was paused in a block hook based request ID
// Can also send extensions with unpause
func (gs *GraphSync) UnpauseRequest(requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
return gs.requestManager.UnpauseRequest(requestID, extensions...)
}

// PauseRequest pauses an in progress request (may take 1 or more blocks to process)
func (gs *GraphSync) PauseRequest(requestID graphsync.RequestID) error {
return gs.requestManager.PauseRequest(requestID)
}

// UnpauseResponse unpauses a response that was paused in a block hook based on peer ID and request ID
func (gs *GraphSync) UnpauseResponse(p peer.ID, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
return gs.responseManager.UnpauseResponse(p, requestID, extensions...)
}

// PauseResponse pauses an in progress response (may take 1 or more blocks to process)
func (gs *GraphSync) PauseResponse(p peer.ID, requestID graphsync.RequestID) error {
return gs.responseManager.PauseResponse(p, requestID)
// Pause pauses an in progress request or response
func (gs *GraphSync) Pause(ctx context.Context, requestID graphsync.RequestID) error {
var reqNotFound graphsync.RequestNotFoundErr
if err := gs.requestManager.PauseRequest(ctx, requestID); !errors.As(err, &reqNotFound) {
return err
}
return gs.responseManager.PauseResponse(ctx, requestID)
}

// CancelResponse cancels an in progress response
func (gs *GraphSync) CancelResponse(p peer.ID, requestID graphsync.RequestID) error {
return gs.responseManager.CancelResponse(p, requestID)
// Unpause unpauses a request or response that was paused
// Can also send extensions with unpause
func (gs *GraphSync) Unpause(ctx context.Context, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
var reqNotFound graphsync.RequestNotFoundErr
if err := gs.requestManager.UnpauseRequest(ctx, requestID, extensions...); !errors.As(err, &reqNotFound) {
return err
}
return gs.responseManager.UnpauseResponse(ctx, requestID, extensions...)
}

// CancelRequest cancels an in progress request
func (gs *GraphSync) CancelRequest(ctx context.Context, requestID graphsync.RequestID) error {
return gs.requestManager.CancelRequest(ctx, requestID)
// Cancel cancels an in progress request or response
func (gs *GraphSync) Cancel(ctx context.Context, requestID graphsync.RequestID) error {
var reqNotFound graphsync.RequestNotFoundErr
if err := gs.requestManager.CancelRequest(ctx, requestID); !errors.As(err, &reqNotFound) {
return err
}
return gs.responseManager.CancelResponse(ctx, requestID)
}

// Stats produces insight on the current state of a graphsync exchange
Expand Down
6 changes: 3 additions & 3 deletions impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,7 +723,7 @@ func TestPauseResume(t *testing.T) {
require.Len(t, responderPeerState.IncomingState.Diagnostics(), 0)

requestID := <-requestIDChan
err := responder.UnpauseResponse(td.host1.ID(), requestID)
err := responder.Unpause(ctx, requestID)
require.NoError(t, err)

blockChain.VerifyRemainder(ctx, progressChan, stopPoint)
Expand Down Expand Up @@ -793,7 +793,7 @@ func TestPauseResumeRequest(t *testing.T) {
testutil.AssertDoesReceiveFirst(t, timer.C, "should pause request", progressChan)

requestID := <-requestIDChan
err := requestor.UnpauseRequest(requestID, td.extensionUpdate)
err := requestor.Unpause(ctx, requestID, td.extensionUpdate)
require.NoError(t, err)

blockChain.VerifyRemainder(ctx, progressChan, stopPoint)
Expand Down Expand Up @@ -1092,7 +1092,7 @@ func TestNetworkDisconnect(t *testing.T) {
require.NoError(t, td.mn.DisconnectPeers(td.host1.ID(), td.host2.ID()))
require.NoError(t, td.mn.UnlinkPeers(td.host1.ID(), td.host2.ID()))
requestID := <-requestIDChan
err := responder.UnpauseResponse(td.host1.ID(), requestID)
err := responder.Unpause(ctx, requestID)
require.NoError(t, err)

testutil.AssertReceive(ctx, t, networkError, &err, "should receive network error")
Expand Down
8 changes: 4 additions & 4 deletions requestmanager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,9 +292,9 @@ func (rm *RequestManager) ProcessResponses(p peer.ID,

// UnpauseRequest unpauses a request that was paused in a block hook based request ID
// Can also send extensions with unpause
func (rm *RequestManager) UnpauseRequest(requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
func (rm *RequestManager) UnpauseRequest(ctx context.Context, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
response := make(chan error, 1)
rm.send(&unpauseRequestMessage{requestID, extensions, response}, nil)
rm.send(&unpauseRequestMessage{requestID, extensions, response}, ctx.Done())
select {
case <-rm.ctx.Done():
return errors.New("context cancelled")
Expand All @@ -304,9 +304,9 @@ func (rm *RequestManager) UnpauseRequest(requestID graphsync.RequestID, extensio
}

// PauseRequest pauses an in progress request (may take 1 or more blocks to process)
func (rm *RequestManager) PauseRequest(requestID graphsync.RequestID) error {
func (rm *RequestManager) PauseRequest(ctx context.Context, requestID graphsync.RequestID) error {
response := make(chan error, 1)
rm.send(&pauseRequestMessage{requestID, response}, nil)
rm.send(&pauseRequestMessage{requestID, response}, ctx.Done())
select {
case <-rm.ctx.Done():
return errors.New("context cancelled")
Expand Down
8 changes: 4 additions & 4 deletions requestmanager/requestmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -816,7 +816,7 @@ func TestPauseResume(t *testing.T) {

// attempt to unpause while request is not paused (note: hook on second block will keep it from
// reaching pause point)
err := td.requestManager.UnpauseRequest(rr.gsr.ID())
err := td.requestManager.UnpauseRequest(ctx, rr.gsr.ID())
require.EqualError(t, err, "request is not paused")
close(holdForResumeAttempt)
// verify responses sent read ONLY for blocks BEFORE the pause
Expand All @@ -834,7 +834,7 @@ func TestPauseResume(t *testing.T) {
td.fal.CleanupRequest(peers[0], rr.gsr.ID())

// unpause
err = td.requestManager.UnpauseRequest(rr.gsr.ID(), td.extension1, td.extension2)
err = td.requestManager.UnpauseRequest(ctx, rr.gsr.ID(), td.extension1, td.extension2)
require.NoError(t, err)

// verify the correct new request with Do-no-send-cids & other extensions
Expand Down Expand Up @@ -875,7 +875,7 @@ func TestPauseResumeExternal(t *testing.T) {
hook := func(p peer.ID, responseData graphsync.ResponseData, blockData graphsync.BlockData, hookActions graphsync.IncomingBlockHookActions) {
blocksReceived++
if blocksReceived == pauseAt {
err := td.requestManager.PauseRequest(responseData.RequestID())
err := td.requestManager.PauseRequest(ctx, responseData.RequestID())
require.NoError(t, err)
close(holdForPause)
}
Expand Down Expand Up @@ -909,7 +909,7 @@ func TestPauseResumeExternal(t *testing.T) {
td.fal.CleanupRequest(peers[0], rr.gsr.ID())

// unpause
err := td.requestManager.UnpauseRequest(rr.gsr.ID(), td.extension1, td.extension2)
err := td.requestManager.UnpauseRequest(ctx, rr.gsr.ID(), td.extension1, td.extension2)
require.NoError(t, err)

// verify the correct new request with Do-no-send-cids & other extensions
Expand Down
2 changes: 1 addition & 1 deletion requestmanager/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (rm *RequestManager) cancelRequest(requestID graphsync.RequestID, onTermina
if !ok {
if onTerminated != nil {
select {
case onTerminated <- graphsync.RequestNotFoundErr{}:
case onTerminated <- &graphsync.RequestNotFoundErr{}:
rvagg marked this conversation as resolved.
Show resolved Hide resolved
case <-rm.ctx.Done():
}
}
Expand Down
42 changes: 19 additions & 23 deletions responsemanager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type inProgressResponseStatus struct {
ctx context.Context
span trace.Span
cancelFn func()
peer peer.ID
request gsmsg.GraphSyncRequest
loader ipld.BlockReadOpener
traverser ipldutil.Traverser
Expand All @@ -43,11 +44,6 @@ type inProgressResponseStatus struct {
responseStream responseassembler.ResponseStream
}

type responseKey struct {
rvagg marked this conversation as resolved.
Show resolved Hide resolved
p peer.ID
requestID graphsync.RequestID
}

// RequestHooks is an interface for processing request hooks
type RequestHooks interface {
ProcessRequestHooks(p peer.ID, request graphsync.RequestData) hooks.RequestResult
Expand Down Expand Up @@ -107,7 +103,7 @@ type ResponseManager struct {
blockSentListeners BlockSentListeners
networkErrorListeners NetworkErrorListeners
messages chan responseManagerMessage
inProgressResponses map[responseKey]*inProgressResponseStatus
inProgressResponses map[graphsync.RequestID]*inProgressResponseStatus
connManager network.ConnManager
// maximum number of links to traverse per request. A value of zero = infinity, or no limit
maxLinksPerRequest uint64
Expand Down Expand Up @@ -144,7 +140,7 @@ func New(ctx context.Context,
blockSentListeners: blockSentListeners,
networkErrorListeners: networkErrorListeners,
messages: messages,
inProgressResponses: make(map[responseKey]*inProgressResponseStatus),
inProgressResponses: make(map[graphsync.RequestID]*inProgressResponseStatus),
connManager: connManager,
maxLinksPerRequest: maxLinksPerRequest,
responseQueue: responseQueue,
Expand All @@ -158,9 +154,9 @@ func (rm *ResponseManager) ProcessRequests(ctx context.Context, p peer.ID, reque
}

// UnpauseResponse unpauses a response that was previously paused
func (rm *ResponseManager) UnpauseResponse(p peer.ID, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
func (rm *ResponseManager) UnpauseResponse(ctx context.Context, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
response := make(chan error, 1)
rm.send(&unpauseRequestMessage{p, requestID, response, extensions}, nil)
rm.send(&unpauseRequestMessage{requestID, response, extensions}, ctx.Done())
select {
case <-rm.ctx.Done():
return errors.New("context cancelled")
Expand All @@ -170,9 +166,9 @@ func (rm *ResponseManager) UnpauseResponse(p peer.ID, requestID graphsync.Reques
}

// PauseResponse pauses an in progress response (may take 1 or more blocks to process)
func (rm *ResponseManager) PauseResponse(p peer.ID, requestID graphsync.RequestID) error {
func (rm *ResponseManager) PauseResponse(ctx context.Context, requestID graphsync.RequestID) error {
response := make(chan error, 1)
rm.send(&pauseRequestMessage{p, requestID, response}, nil)
rm.send(&pauseRequestMessage{requestID, response}, ctx.Done())
select {
case <-rm.ctx.Done():
return errors.New("context cancelled")
Expand All @@ -182,9 +178,9 @@ func (rm *ResponseManager) PauseResponse(p peer.ID, requestID graphsync.RequestI
}

// CancelResponse cancels an in progress response
func (rm *ResponseManager) CancelResponse(p peer.ID, requestID graphsync.RequestID) error {
func (rm *ResponseManager) CancelResponse(ctx context.Context, requestID graphsync.RequestID) error {
response := make(chan error, 1)
rm.send(&errorRequestMessage{p, requestID, queryexecutor.ErrCancelledByCommand, response}, nil)
rm.send(&errorRequestMessage{requestID, queryexecutor.ErrCancelledByCommand, response}, ctx.Done())
select {
case <-rm.ctx.Done():
return errors.New("context cancelled")
Expand All @@ -204,39 +200,39 @@ func (rm *ResponseManager) synchronize() {
}

// StartTask starts the given task from the peer task queue
func (rm *ResponseManager) StartTask(task *peertask.Task, responseTaskChan chan<- queryexecutor.ResponseTask) {
rm.send(&startTaskRequest{task, responseTaskChan}, nil)
func (rm *ResponseManager) StartTask(task *peertask.Task, p peer.ID, responseTaskChan chan<- queryexecutor.ResponseTask) {
rm.send(&startTaskRequest{task, p, responseTaskChan}, nil)
}

// GetUpdates is called to read pending updates for a task and clear them
func (rm *ResponseManager) GetUpdates(p peer.ID, requestID graphsync.RequestID, updatesChan chan<- []gsmsg.GraphSyncRequest) {
rm.send(&responseUpdateRequest{responseKey{p, requestID}, updatesChan}, nil)
func (rm *ResponseManager) GetUpdates(requestID graphsync.RequestID, updatesChan chan<- []gsmsg.GraphSyncRequest) {
rm.send(&responseUpdateRequest{requestID, updatesChan}, nil)
}

// FinishTask marks a task from the task queue as done
func (rm *ResponseManager) FinishTask(task *peertask.Task, err error) {
func (rm *ResponseManager) FinishTask(task *peertask.Task, p peer.ID, err error) {
done := make(chan struct{}, 1)
rm.send(&finishTaskRequest{task, err, done}, nil)
rm.send(&finishTaskRequest{task, p, err, done}, nil)
select {
case <-rm.ctx.Done():
case <-done:
}
}

// CloseWithNetworkError closes a request due to a network error
func (rm *ResponseManager) CloseWithNetworkError(p peer.ID, requestID graphsync.RequestID) {
func (rm *ResponseManager) CloseWithNetworkError(requestID graphsync.RequestID) {
done := make(chan error, 1)
rm.send(&errorRequestMessage{p, requestID, queryexecutor.ErrNetworkError, done}, nil)
rm.send(&errorRequestMessage{requestID, queryexecutor.ErrNetworkError, done}, nil)
select {
case <-rm.ctx.Done():
case <-done:
}
}

// TerminateRequest indicates a request has finished sending data and should no longer be tracked
func (rm *ResponseManager) TerminateRequest(p peer.ID, requestID graphsync.RequestID) {
func (rm *ResponseManager) TerminateRequest(requestID graphsync.RequestID) {
done := make(chan struct{}, 1)
rm.send(&terminateRequestMessage{p, requestID, done}, nil)
rm.send(&terminateRequestMessage{requestID, done}, nil)
select {
case <-rm.ctx.Done():
case <-done:
Expand Down
Loading