Skip to content

Commit

Permalink
sync2: advance OrderedSet on timer (#6559)
Browse files Browse the repository at this point in the history
## Motivation

`OrderedSet` is bound to a "snapshot" of its table, which is bounded by the maximum `rowid` value at the time the snapshot was taken. For `OrderedSet` to ingest newly added rows, its `Advance()` method must be called. In the case of `DBSet`, `Advance()` updates the underlying `FPTree` with freshly added entries, which it discovers by selecting items with `rowid` > `maxSnapshotRowID`.

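Before this change, `Advance()` was called after every peer sync, including both incoming sync requests handled by the server and requests initiated by active sync, regardless of whether any new items were received. This caused unnecessary database access that can be avoided. With this change, `Advance()` is no longer called unconditionally after each sync; in addition, the set is advanced periodically on a timer (the new `advance-interval` setting, 1 minute by default).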
  • Loading branch information
ivan4th committed Dec 20, 2024
1 parent ef8ba69 commit 73f3dfc
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 60 deletions.
7 changes: 4 additions & 3 deletions sync2/multipeer/setsyncbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,11 @@ func (ssb *SetSyncBase) syncPeer(
if err := ssb.handler.Commit(ctx, p, ssb.os, sr); err != nil {
return fmt.Errorf("commit: %w", err)
}
ssb.mtx.Lock()
defer ssb.mtx.Unlock()
return ssb.os.Advance()
}
ssb.mtx.Lock()
defer ssb.mtx.Unlock()
return ssb.os.Advance()
return nil
}

func (ssb *SetSyncBase) Sync(ctx context.Context, p p2p.Peer, x, y rangesync.KeyBytes) error {
Expand Down
29 changes: 24 additions & 5 deletions sync2/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Config struct {
MaxAttempts uint `mapstructure:"max-attempts"`
MaxBatchRetries uint `mapstructure:"max-batch-retries"`
FailedBatchDelay time.Duration `mapstructure:"failed-batch-delay"`
AdvanceInterval time.Duration `mapstructure:"advance-interval"`
}

func (cfg *Config) Validate(logger *zap.Logger) bool {
Expand All @@ -45,6 +46,10 @@ func (cfg *Config) Validate(logger *zap.Logger) bool {
logger.Error("max-attempts must be at least 1")
r = false
}
if cfg.AdvanceInterval <= 0 {
logger.Error("advance-interval must be positive")
r = false
}
return r
}

Expand All @@ -60,6 +65,7 @@ func DefaultConfig() Config {
MaxAttempts: 3,
MaxBatchRetries: 3,
FailedBatchDelay: 10 * time.Second,
AdvanceInterval: 1 * time.Minute,
}
}

Expand Down Expand Up @@ -143,18 +149,31 @@ func (s *P2PHashSync) start() (isWaiting bool) {
isWaiting = true
s.startOnce.Do(func() {
isWaiting = false
var ctx context.Context
ctx, s.cancel = context.WithCancel(context.Background())
if s.enableActiveSync {
s.eg.Go(func() error {
defer s.running.Store(false)
var ctx context.Context
ctx, s.cancel = context.WithCancel(context.Background())
return s.reconciler.Run(ctx, s.kickCh)
})
return
} else {
s.logger.Info("active syncv2 is disabled")
return
}
s.eg.Go(func() error {
ticker := time.NewTicker(s.cfg.AdvanceInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
s.logger.Debug("advancing OrderedSet on timer")
if err := s.os.Advance(); err != nil {
s.logger.Error("error advancing the set", zap.Error(err))
}
}
}
})
})
return isWaiting
}
Expand All @@ -177,7 +196,7 @@ func (s *P2PHashSync) StartAndSync(ctx context.Context) error {

// Stop stops the multi-peer reconciler.
func (s *P2PHashSync) Stop() {
if !s.enableActiveSync || !s.running.Load() {
if !s.running.Load() {
return
}
if s.cancel != nil {
Expand Down
116 changes: 72 additions & 44 deletions sync2/p2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,32 +24,51 @@ import (
"github.com/spacemeshos/go-spacemesh/sync2/rangesync"
)

type fakeHandler struct {
mtx *sync.Mutex
committed map[string]struct{}
type dumbSet struct {
*rangesync.DumbSet
mtx sync.Mutex
committed map[string]struct{}
advanceCount int
}

func (fh *fakeHandler) Commit(
// addCommitted records every key yielded by seq in the pending "committed"
// map, creating the map on first use. The keys are only remembered here;
// they are moved into the underlying set by Advance. The whole operation,
// including the final seq.Error() call, runs with ds.mtx held.
// It returns whatever seq.Error() reports after the iteration.
func (ds *dumbSet) addCommitted(seq rangesync.SeqResult) error {
	ds.mtx.Lock()
	defer ds.mtx.Unlock()

	pending := ds.committed
	if pending == nil {
		// Lazily allocate so that the zero value of dumbSet is usable.
		pending = make(map[string]struct{})
		ds.committed = pending
	}
	for key := range seq.Seq {
		pending[string(key)] = struct{}{}
	}
	return seq.Error()
}

// Advance flushes all keys previously recorded by addCommitted into the
// embedded DumbSet via AddUnchecked, empties the pending map, bumps the
// advance counter and finally delegates to the embedded DumbSet's Advance,
// returning its result. All of this happens with ds.mtx held.
func (ds *dumbSet) Advance() error {
	ds.mtx.Lock()
	defer ds.mtx.Unlock()

	inner := ds.DumbSet
	for key := range ds.committed {
		inner.AddUnchecked(rangesync.KeyBytes(key))
	}
	// The pending keys are now part of the set; drop them so that they are
	// not re-added on the next call.
	clear(ds.committed)
	ds.advanceCount += 1
	return inner.Advance()
}

// AdvanceCount reports how many times Advance has been invoked on this set.
// The counter is read with ds.mtx held.
func (ds *dumbSet) AdvanceCount() int {
	ds.mtx.Lock()
	count := ds.advanceCount
	ds.mtx.Unlock()
	return count
}

type fakeHandler struct{}

func (fh fakeHandler) Commit(
ctx context.Context,
peer p2p.Peer,
base rangesync.OrderedSet,
received rangesync.SeqResult,
) error {
fh.mtx.Lock()
defer fh.mtx.Unlock()
for k := range received.Seq {
fh.committed[string(k)] = struct{}{}
}
return nil
}

func (fh *fakeHandler) committedItems() (items []rangesync.KeyBytes) {
fh.mtx.Lock()
defer fh.mtx.Unlock()
for k := range fh.committed {
items = append(items, rangesync.KeyBytes(k))
}
return items
return base.(*dumbSet).addCommitted(received)
}

func TestP2P(t *testing.T) {
Expand All @@ -63,13 +82,11 @@ func TestP2P(t *testing.T) {
mesh, err := mocknet.FullMeshConnected(numNodes)
require.NoError(t, err)
hs := make([]*sync2.P2PHashSync, numNodes)
handlers := make([]*fakeHandler, numNodes)
initialSet := make([]rangesync.KeyBytes, numHashes)
for n := range initialSet {
initialSet[n] = rangesync.RandomKeyBytes(32)
}
var eg errgroup.Group
var mtx sync.Mutex
defer eg.Wait()
for n := range hs {
ps := peers.New()
Expand All @@ -83,35 +100,31 @@ func TestP2P(t *testing.T) {
cfg := sync2.DefaultConfig()
cfg.SyncInterval = 100 * time.Millisecond
cfg.MaxDepth = maxDepth
cfg.AdvanceInterval = 200 * time.Millisecond
host := mesh.Hosts()[n]
handlers[n] = &fakeHandler{
mtx: &mtx,
committed: make(map[string]struct{}),
ds := dumbSet{DumbSet: new(rangesync.DumbSet)}
ds.SetAllowMultiReceive(true)
if n == 0 {
for _, h := range initialSet {
ds.AddUnchecked(h)
}
}
var os rangesync.DumbSet
d := rangesync.NewDispatcher(logger)
srv := d.SetupServer(host, "sync2test", server.WithLog(logger))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
eg.Go(func() error { return srv.Run(ctx) })
hs[n], err = sync2.NewP2PHashSync(
logger.Named(fmt.Sprintf("node%d", n)),
d, "test", &os, keyLen, ps, handlers[n], cfg, true)
d, "test", &ds, keyLen, ps, fakeHandler{}, cfg, true)
require.NoError(t, err)
require.NoError(t, hs[n].Load())
is := hs[n].Set().(*rangesync.DumbSet)
is.SetAllowMultiReceive(true)
if n == 0 {
for _, h := range initialSet {
is.AddUnchecked(h)
}
}
require.False(t, hs[n].Synced())
hs[n].Start()
}

require.Eventually(t, func() bool {
for n, hsync := range hs {
for _, hsync := range hs {
// use a snapshot to avoid races
if !hsync.Synced() {
return false
Expand All @@ -120,17 +133,12 @@ func TestP2P(t *testing.T) {
require.NoError(t, hsync.Set().WithCopy(
context.Background(),
func(os rangesync.OrderedSet) error {
for _, k := range handlers[n].committedItems() {
os.(*rangesync.DumbSet).AddUnchecked(k)
}
empty, err := os.Empty()
require.NoError(t, err)
if empty {
r = false
} else {
k, err := os.Items().First()
require.NoError(t, err)
info, err := os.GetRangeInfo(k, k)
info, err := os.GetRangeInfo(nil, nil)
require.NoError(t, err)
if info.Count < numHashes {
r = false
Expand All @@ -145,19 +153,38 @@ func TestP2P(t *testing.T) {
return true
}, 30*time.Second, 300*time.Millisecond)

advCounts := make([]int, len(hs))
for n, hsync := range hs {
hsync.Stop()
require.NoError(t, hsync.Set().WithCopy(
context.Background(),
func(os rangesync.OrderedSet) error {
for _, k := range handlers[n].committedItems() {
os.(*rangesync.DumbSet).AddUnchecked(k)
}
actualItems, err := os.Items().Collect()
require.NoError(t, err)
require.ElementsMatch(t, initialSet, actualItems)
return nil
}))
// OrderedSet is advanced after each sync.
// The first set may not be advanced initially here b/c it does
// not receive any new items.
// There's some chance Advance() was called on it by timer handler.
advCounts[n] = hsync.Set().(*dumbSet).AdvanceCount()
if n > 0 {
require.NotZero(t, advCounts[n])
}
}

// Make sure OrderedSet is advanced without syncs using timer
require.Eventually(t, func() bool {
for n, hsync := range hs {
if hsync.Set().(*dumbSet).AdvanceCount() <= advCounts[n] {
return false
}
}
return true
}, 30*time.Second, 300*time.Millisecond)

for _, hsync := range hs {
hsync.Stop()
}
}

Expand Down Expand Up @@ -197,6 +224,7 @@ func TestConfigValidation(t *testing.T) {
"max-depth must be at least 1",
"batch-size must be at least 1",
"max-attempts must be at least 1",
"advance-interval must be positive",
},
},
{
Expand Down
27 changes: 19 additions & 8 deletions sync2/rangesync/dumbset.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/md5"
"errors"
"slices"
"sync"
"time"

"github.com/spacemeshos/go-spacemesh/hash"
Expand Down Expand Up @@ -106,23 +107,26 @@ func realFPFunc(items []KeyBytes) Fingerprint {
// DumbSet is a simple OrderedSet implementation that doesn't include any optimizations.
// It is intended to be only used in tests.
type DumbSet struct {
keys []KeyBytes
received map[string]int
added map[string]bool
allowMutiReceive bool
FPFunc func(items []KeyBytes) Fingerprint
copyMtx sync.Mutex
keys []KeyBytes
received map[string]int
added map[string]bool
allowMultiReceive bool
FPFunc func(items []KeyBytes) Fingerprint
}

var _ OrderedSet = &DumbSet{}

// SetAllowMultiReceive sets whether the set allows receiving the same item multiple times.
func (ds *DumbSet) SetAllowMultiReceive(allow bool) {
ds.allowMutiReceive = allow
ds.allowMultiReceive = allow
}

// AddUnchecked adds an item to the set without registering the item for checks,
// unlike Add and Receive.
func (ds *DumbSet) AddUnchecked(id KeyBytes) {
ds.copyMtx.Lock()
defer ds.copyMtx.Unlock()
if len(ds.keys) == 0 {
ds.keys = []KeyBytes{id}
}
Expand Down Expand Up @@ -180,7 +184,7 @@ func (ds *DumbSet) Receive(id KeyBytes) error {
}
sid := string(id)
ds.received[sid]++
if !ds.allowMutiReceive && ds.received[sid] > 1 {
if !ds.allowMultiReceive && ds.received[sid] > 1 {
panic("item already received: " + id.String())
}
return nil
Expand Down Expand Up @@ -306,7 +310,14 @@ func (ds *DumbSet) Items() SeqResult {

// WithCopy implements OrderedSet.
func (ds *DumbSet) WithCopy(_ context.Context, toCall func(OrderedSet) error) error {
return toCall(&DumbSet{keys: slices.Clone(ds.keys)})
ds.copyMtx.Lock()
copy := &DumbSet{
keys: slices.Clone(ds.keys),
allowMultiReceive: ds.allowMultiReceive,
FPFunc: ds.FPFunc,
}
ds.copyMtx.Unlock()
return toCall(copy)
}

// Recent implements OrderedSet.
Expand Down

0 comments on commit 73f3dfc

Please sign in to comment.