Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix edge case in warpsync which deadlocks syncing #1763

Merged
merged 1 commit into from
Sep 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 18 additions & 17 deletions pkg/protocol/gossip/warpsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,8 @@ func (ws *WarpSync) reset() {
type WarpSyncMilestoneRequester struct {
syncutils.Mutex

// used to cancel the warp sync requester.
ctx context.Context
// used to access the node storage.
storage *storage.Storage
// used to determine the sync status of the node.
Expand All @@ -269,12 +271,14 @@ type WarpSyncMilestoneRequester struct {

// NewWarpSyncMilestoneRequester creates a new WarpSyncMilestoneRequester instance.
func NewWarpSyncMilestoneRequester(
ctx context.Context,
dbStorage *storage.Storage,
syncManager *syncmanager.SyncManager,
requester *Requester,
preventDiscard bool) *WarpSyncMilestoneRequester {

return &WarpSyncMilestoneRequester{
ctx: ctx,
storage: dbStorage,
syncManager: syncManager,
requester: requester,
Expand All @@ -283,13 +287,10 @@ func NewWarpSyncMilestoneRequester(
}
}

// RequestMissingMilestoneParents traverses the parents of a given milestone and requests each missing parent.
// requestMissingMilestoneParents traverses the parents of a given milestone and requests each missing parent.
// Already requested milestones or traversed blocks will be ignored, to circumvent requesting
// the same parents multiple times.
func (w *WarpSyncMilestoneRequester) RequestMissingMilestoneParents(ctx context.Context, msIndex iotago.MilestoneIndex) error {
w.Lock()
defer w.Unlock()

func (w *WarpSyncMilestoneRequester) requestMissingMilestoneParents(msIndex iotago.MilestoneIndex) error {
if msIndex <= w.syncManager.ConfirmedMilestoneIndex() {
return nil
}
Expand All @@ -301,7 +302,7 @@ func (w *WarpSyncMilestoneRequester) RequestMissingMilestoneParents(ctx context.
}

return dag.TraverseParents(
ctx,
w.ctx,
w.storage,
milestoneParents,
// traversal stops if no more blocks pass the given condition
Expand Down Expand Up @@ -345,14 +346,20 @@ func (w *WarpSyncMilestoneRequester) Cleanup() {

// RequestMilestoneRange requests up to N milestones nearest to the current confirmed milestone index.
// Returns the number of milestones requested.
func (w *WarpSyncMilestoneRequester) RequestMilestoneRange(ctx context.Context, rangeToRequest syncmanager.MilestoneIndexDelta, onExistingMilestoneInRange func(ctx context.Context, msIndex iotago.MilestoneIndex) error, from ...iotago.MilestoneIndex) syncmanager.MilestoneIndexDelta {
func (w *WarpSyncMilestoneRequester) RequestMilestoneRange(rangeToRequest syncmanager.MilestoneIndexDelta, from ...iotago.MilestoneIndex) (syncmanager.MilestoneIndexDelta, iotago.MilestoneIndex, iotago.MilestoneIndex) {
w.Lock()
defer w.Unlock()

var requested syncmanager.MilestoneIndexDelta

startingPoint := w.syncManager.ConfirmedMilestoneIndex()
if len(from) > 0 {
startingPoint = from[0]
}

startIndex := startingPoint + 1
endIndex := startingPoint + rangeToRequest

var msIndexes []iotago.MilestoneIndex
for i := syncmanager.MilestoneIndexDelta(1); i <= rangeToRequest; i++ {
msIndexToRequest := startingPoint + i
Expand All @@ -366,22 +373,16 @@ func (w *WarpSyncMilestoneRequester) RequestMilestoneRange(ctx context.Context,
}

// milestone already exists
if onExistingMilestoneInRange != nil {
if err := onExistingMilestoneInRange(ctx, msIndexToRequest); err != nil && errors.Is(err, common.ErrOperationAborted) {
// do not proceed if the node was shut down
return 0
}
if err := w.requestMissingMilestoneParents(msIndexToRequest); err != nil && errors.Is(err, common.ErrOperationAborted) {
// do not proceed if the node was shut down
return 0, 0, 0
}
}

if len(msIndexes) == 0 {
return requested
}

// enqueue every milestone request to the request queue
for _, msIndex := range msIndexes {
w.requester.Request(msIndex, msIndex)
}

return requested
return requested, startIndex, endIndex
}
10 changes: 5 additions & 5 deletions plugins/warpsync/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type dependencies struct {

func configure() error {
warpSync = gossip.NewWarpSync(ParamsWarpSync.AdvancementRange)
warpSyncMilestoneRequester = gossip.NewWarpSyncMilestoneRequester(deps.Storage, deps.SyncManager, deps.Requester, true)
warpSyncMilestoneRequester = gossip.NewWarpSyncMilestoneRequester(Plugin.Daemon().ContextStopped(), deps.Storage, deps.SyncManager, deps.Requester, true)
configureEvents()

return nil
Expand Down Expand Up @@ -101,8 +101,8 @@ func configureEvents() {
onMilestoneSolidificationFailed = events.NewClosure(func(msIndex iotago.MilestoneIndex) {
if warpSync.CurrentCheckpoint != 0 && warpSync.CurrentCheckpoint < msIndex {
// rerequest since milestone requests could have been lost
Plugin.LogInfof("Requesting missing milestones %d - %d", msIndex, msIndex+warpSync.AdvancementRange)
warpSyncMilestoneRequester.RequestMilestoneRange(Plugin.Daemon().ContextStopped(), warpSync.AdvancementRange, nil)
_, msIndexStart, msIndexEnd := warpSyncMilestoneRequester.RequestMilestoneRange(warpSync.AdvancementRange)
Plugin.LogInfof("Requesting missing milestones %d - %d", msIndexStart, msIndexEnd)
}
})

Expand All @@ -112,7 +112,7 @@ func configureEvents() {
deps.RequestQueue.Filter(func(r *gossip.Request) bool {
return r.MilestoneIndex <= nextCheckpoint
})
warpSyncMilestoneRequester.RequestMilestoneRange(Plugin.Daemon().ContextStopped(), advRange, warpSyncMilestoneRequester.RequestMissingMilestoneParents, oldCheckpoint)
_, _, _ = warpSyncMilestoneRequester.RequestMilestoneRange(advRange, oldCheckpoint)
})

onWarpSyncTargetUpdated = events.NewClosure(func(checkpoint iotago.MilestoneIndex, newTarget iotago.MilestoneIndex) {
Expand All @@ -125,7 +125,7 @@ func configureEvents() {
return r.MilestoneIndex <= nextCheckpoint
})

msRequested := warpSyncMilestoneRequester.RequestMilestoneRange(Plugin.Daemon().ContextStopped(), advRange, warpSyncMilestoneRequester.RequestMissingMilestoneParents)
msRequested, _, _ := warpSyncMilestoneRequester.RequestMilestoneRange(advRange)
// if the amount of requested milestones doesn't correspond to the range,
// it means we already had the milestones in the database, which suggests
// that we should manually kick start the milestone solidifier.
Expand Down