Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - sync2: fix Advance race #6588

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions sync2/multipeer/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ import (
type SyncBase interface {
// Count returns the number of items in the set.
Count() (int, error)
// Advance advances the underlying OrderedSet, loading the items since the last Advance call
// or when OrderedSet was first loaded.
Advance() error
// Sync synchronizes the set with the peer.
// It returns a sequence of new keys that were received from the peer and the
// number of received items.
Expand Down
38 changes: 38 additions & 0 deletions sync2/multipeer/mocks_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion sync2/multipeer/setsyncbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ func NewSetSyncBase(

// Count implements SyncBase.
func (ssb *SetSyncBase) Count() (int, error) {
// TODO: don't lock on potentially db-bound operations
// In most cases ssb.os.SetInfo will not access the database, so we're not holding
// the lock for long here.
ssb.mtx.Lock()
defer ssb.mtx.Unlock()
info, err := ssb.os.SetInfo()
Expand All @@ -48,6 +49,13 @@ func (ssb *SetSyncBase) Count() (int, error) {
return info.Count, nil
}

// Advance implements SyncBase.
func (ssb *SetSyncBase) Advance() error {
ssb.mtx.Lock()
defer ssb.mtx.Unlock()
return ssb.os.Advance()
}

func (ssb *SetSyncBase) syncPeer(
ctx context.Context,
p p2p.Peer,
Expand Down
2 changes: 1 addition & 1 deletion sync2/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (s *P2PHashSync) start() (isWaiting bool) {
return ctx.Err()
case <-ticker.C:
s.logger.Debug("advancing OrderedSet on timer")
if err := s.os.Advance(); err != nil {
if err := s.syncBase.Advance(); err != nil {
s.logger.Error("error advancing the set", zap.Error(err))
}
}
Expand Down
Loading