Skip to content

Commit

Permalink
More fine grained response controls (#71)
Browse files Browse the repository at this point in the history
* feat(responsemanager): add ability to pause response outside of a hook

Add the ability for anyone who knows a requestID & peer to pause a response at any time

* feat(responsemanager): add direct cancellations

add function to directly cancel responses from requestID. also, move query execution code to
seperate struct, internal for now

* fix(responsemanager): minor heap allocation optimization

* feat(responsemanager): support extensions on resume

Support sending extensions when resuming a request

* fix(lint): fix lint errors
  • Loading branch information
hannahhoward authored Jul 2, 2020
1 parent a878543 commit 0e23085
Show file tree
Hide file tree
Showing 5 changed files with 482 additions and 270 deletions.
9 changes: 8 additions & 1 deletion graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,5 +269,12 @@ type GraphExchange interface {
RegisterCompletedResponseListener(listener OnResponseCompletedListener) UnregisterHookFunc

// UnpauseResponse unpauses a response that was paused in a block hook based on peer ID and request ID
UnpauseResponse(peer.ID, RequestID) error
// Can also send extensions with unpause
UnpauseResponse(peer.ID, RequestID, ...ExtensionData) error

// PauseResponse pauses an in progress response (may take 1 or more blocks to process)
PauseResponse(peer.ID, RequestID) error

// CancelResponse cancels an in progress response
CancelResponse(peer.ID, RequestID) error
}
14 changes: 12 additions & 2 deletions impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,18 @@ func (gs *GraphSync) RegisterIncomingBlockHook(hook graphsync.OnIncomingBlockHoo
}

// UnpauseResponse unpauses a response that was paused in a block hook based on peer ID and request ID
func (gs *GraphSync) UnpauseResponse(p peer.ID, requestID graphsync.RequestID) error {
return gs.responseManager.UnpauseResponse(p, requestID)
func (gs *GraphSync) UnpauseResponse(p peer.ID, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
return gs.responseManager.UnpauseResponse(p, requestID, extensions...)
}

// PauseResponse pauses an in progress response (may take 1 or more blocks to process)
func (gs *GraphSync) PauseResponse(p peer.ID, requestID graphsync.RequestID) error {
return gs.responseManager.PauseResponse(p, requestID)
}

// CancelResponse cancels an in progress response
func (gs *GraphSync) CancelResponse(p peer.ID, requestID graphsync.RequestID) error {
return gs.responseManager.CancelResponse(p, requestID)
}

type graphSyncReceiver GraphSync
Expand Down
235 changes: 235 additions & 0 deletions responsemanager/queryexecutor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
package responsemanager

import (
"context"
"errors"
"time"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/cidset"
"github.com/ipfs/go-graphsync/ipldutil"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/responsemanager/hooks"
"github.com/ipfs/go-graphsync/responsemanager/peerresponsemanager"
"github.com/ipfs/go-graphsync/responsemanager/runtraversal"
ipld "github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/libp2p/go-libp2p-core/peer"
)

// TODO: Move this into a seperate module and fully seperate from the ResponseManager
type queryExecutor struct {
requestHooks RequestHooks
blockHooks BlockHooks
updateHooks UpdateHooks
peerManager PeerManager
loader ipld.Loader
queryQueue QueryQueue
messages chan responseManagerMessage
ctx context.Context
workSignal chan struct{}
ticker *time.Ticker
}

func (qe *queryExecutor) processQueriesWorker() {
const targetWork = 1
taskDataChan := make(chan responseTaskData)
var taskData responseTaskData
for {
pid, tasks, _ := qe.queryQueue.PopTasks(targetWork)
for len(tasks) == 0 {
select {
case <-qe.ctx.Done():
return
case <-qe.workSignal:
pid, tasks, _ = qe.queryQueue.PopTasks(targetWork)
case <-qe.ticker.C:
qe.queryQueue.ThawRound()
pid, tasks, _ = qe.queryQueue.PopTasks(targetWork)
}
}
for _, task := range tasks {
key := task.Topic.(responseKey)
select {
case qe.messages <- &responseDataRequest{key, taskDataChan}:
case <-qe.ctx.Done():
return
}
select {
case taskData = <-taskDataChan:
case <-qe.ctx.Done():
return
}
if taskData.empty {
log.Info("Empty task on peer request stack")
continue
}
status, err := qe.executeTask(key, taskData)
select {
case qe.messages <- &finishTaskRequest{key, status, err}:
case <-qe.ctx.Done():
}
}
qe.queryQueue.TasksDone(pid, tasks...)

}

}

func (qe *queryExecutor) executeTask(key responseKey, taskData responseTaskData) (graphsync.ResponseStatusCode, error) {
var err error
loader := taskData.loader
traverser := taskData.traverser
if loader == nil || traverser == nil {
loader, traverser, err = qe.prepareQuery(taskData.ctx, key.p, taskData.request)
if err != nil {
return graphsync.RequestFailedUnknown, err
}
select {
case <-qe.ctx.Done():
return graphsync.RequestFailedUnknown, errors.New("context cancelled")
case qe.messages <- &setResponseDataRequest{key, loader, traverser}:
}
}
return qe.executeQuery(key.p, taskData.request, loader, traverser, taskData.pauseSignal, taskData.updateSignal)
}

func (qe *queryExecutor) prepareQuery(ctx context.Context,
p peer.ID,
request gsmsg.GraphSyncRequest) (ipld.Loader, ipldutil.Traverser, error) {
result := qe.requestHooks.ProcessRequestHooks(p, request)
peerResponseSender := qe.peerManager.SenderForPeer(p)
var validationErr error
err := peerResponseSender.Transaction(request.ID(), func(transaction peerresponsemanager.PeerResponseTransactionSender) error {
for _, extension := range result.Extensions {
transaction.SendExtensionData(extension)
}
if result.Err != nil || !result.IsValidated {
transaction.FinishWithError(graphsync.RequestFailedUnknown)
validationErr = errors.New("request not valid")
}
return nil
})
if err != nil {
return nil, nil, err
}
if validationErr != nil {
return nil, nil, validationErr
}
if err := qe.processDoNoSendCids(request, peerResponseSender); err != nil {
return nil, nil, err
}
rootLink := cidlink.Link{Cid: request.Root()}
traverser := ipldutil.TraversalBuilder{
Root: rootLink,
Selector: request.Selector(),
Chooser: result.CustomChooser,
}.Start(ctx)
loader := result.CustomLoader
if loader == nil {
loader = qe.loader
}
return loader, traverser, nil
}

func (qe *queryExecutor) processDoNoSendCids(request gsmsg.GraphSyncRequest, peerResponseSender peerresponsemanager.PeerResponseSender) error {
doNotSendCidsData, has := request.Extension(graphsync.ExtensionDoNotSendCIDs)
if !has {
return nil
}
cidSet, err := cidset.DecodeCidSet(doNotSendCidsData)
if err != nil {
peerResponseSender.FinishWithError(request.ID(), graphsync.RequestFailedUnknown)
return err
}
links := make([]ipld.Link, 0, cidSet.Len())
err = cidSet.ForEach(func(c cid.Cid) error {
links = append(links, cidlink.Link{Cid: c})
return nil
})
if err != nil {
return err
}
peerResponseSender.IgnoreBlocks(request.ID(), links)
return nil
}

func (qe *queryExecutor) executeQuery(
p peer.ID,
request gsmsg.GraphSyncRequest,
loader ipld.Loader,
traverser ipldutil.Traverser,
pauseSignal chan struct{},
updateSignal chan struct{}) (graphsync.ResponseStatusCode, error) {
updateChan := make(chan []gsmsg.GraphSyncRequest)
peerResponseSender := qe.peerManager.SenderForPeer(p)
err := runtraversal.RunTraversal(loader, traverser, func(link ipld.Link, data []byte) error {
var err error
_ = peerResponseSender.Transaction(request.ID(), func(transaction peerresponsemanager.PeerResponseTransactionSender) error {
err = qe.checkForUpdates(p, request, pauseSignal, updateSignal, updateChan, transaction)
if err != nil {
if err == hooks.ErrPaused {
transaction.PauseRequest()
}
return nil
}
blockData := transaction.SendResponse(link, data)
if blockData.BlockSize() > 0 {
result := qe.blockHooks.ProcessBlockHooks(p, request, blockData)
for _, extension := range result.Extensions {
transaction.SendExtensionData(extension)
}
if result.Err == hooks.ErrPaused {
transaction.PauseRequest()
}
err = result.Err
}
return nil
})
return err
})
if err != nil {
if err != hooks.ErrPaused {
peerResponseSender.FinishWithError(request.ID(), graphsync.RequestFailedUnknown)
return graphsync.RequestFailedUnknown, err
}
return graphsync.RequestPaused, err
}
return peerResponseSender.FinishRequest(request.ID()), nil
}

func (qe *queryExecutor) checkForUpdates(
p peer.ID,
request gsmsg.GraphSyncRequest,
pauseSignal chan struct{},
updateSignal chan struct{},
updateChan chan []gsmsg.GraphSyncRequest,
peerResponseSender peerresponsemanager.PeerResponseTransactionSender) error {
for {
select {
case <-pauseSignal:
return hooks.ErrPaused
case <-updateSignal:
select {
case qe.messages <- &responseUpdateRequest{responseKey{p, request.ID()}, updateChan}:
case <-qe.ctx.Done():
}
select {
case updates := <-updateChan:
for _, update := range updates {
result := qe.updateHooks.ProcessUpdateHooks(p, request, update)
for _, extension := range result.Extensions {
peerResponseSender.SendExtensionData(extension)
}
if result.Err != nil {
return result.Err
}
}
case <-qe.ctx.Done():
}
default:
return nil
}
}
}
Loading

0 comments on commit 0e23085

Please sign in to comment.