Skip to content

Commit

Permalink
syncv2: pairwise set reconciliation (#6350)
Browse files Browse the repository at this point in the history
## Motivation

The existing sync causes high resource usage, including memory consumption and network traffic, and it cannot replace gossip, which can itself be unreliable at times.
  • Loading branch information
ivan4th committed Oct 17, 2024
1 parent e20e5cf commit f9069f1
Show file tree
Hide file tree
Showing 34 changed files with 5,364 additions and 20 deletions.
Binary file added dev-docs/pairwise-sync.excalidraw.gz
Binary file not shown.
Binary file added dev-docs/pairwise-sync.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added dev-docs/ranges.excalidraw.gz
Binary file not shown.
Binary file added dev-docs/ranges.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
769 changes: 769 additions & 0 deletions dev-docs/sync2-set-reconciliation.md

Large diffs are not rendered by default.

25 changes: 15 additions & 10 deletions p2p/server/deadline_adjuster.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"errors"
"fmt"
"io"
"sync"
"sync/atomic"
"time"

"github.com/jonboulle/clockwork"
Expand Down Expand Up @@ -42,10 +44,11 @@ func (err *deadlineAdjusterError) Error() string {

type deadlineAdjuster struct {
peerStream
adjustMtx sync.Mutex
timeout time.Duration
hardTimeout time.Duration
totalRead int
totalWritten int
totalRead atomic.Int64
totalWritten atomic.Int64
start time.Time
clock clockwork.Clock
chunkSize int
Expand Down Expand Up @@ -78,8 +81,8 @@ func (dadj *deadlineAdjuster) augmentError(what string, err error) error {
what: what,
innerErr: err,
elapsed: dadj.clock.Now().Sub(dadj.start),
totalRead: dadj.totalRead,
totalWritten: dadj.totalWritten,
totalRead: int(dadj.totalRead.Load()),
totalWritten: int(dadj.totalWritten.Load()),
timeout: dadj.timeout,
hardTimeout: dadj.hardTimeout,
}
Expand All @@ -93,6 +96,8 @@ func (dadj *deadlineAdjuster) Close() error {
}

func (dadj *deadlineAdjuster) adjust() error {
dadj.adjustMtx.Lock()
defer dadj.adjustMtx.Unlock()
now := dadj.clock.Now()
if dadj.hardDeadline.IsZero() {
dadj.hardDeadline = now.Add(dadj.hardTimeout)
Expand All @@ -102,12 +107,12 @@ func (dadj *deadlineAdjuster) adjust() error {
}
// Do not adjust the deadline too often
adj := false
if dadj.totalRead > dadj.nextAdjustRead {
dadj.nextAdjustRead = dadj.totalRead + dadj.chunkSize
if int(dadj.totalRead.Load()) > dadj.nextAdjustRead {
dadj.nextAdjustRead = int(dadj.totalRead.Load()) + dadj.chunkSize
adj = true
}
if dadj.totalWritten > dadj.nextAdjustWrite {
dadj.nextAdjustWrite = dadj.totalWritten + dadj.chunkSize
if int(dadj.totalWritten.Load()) > dadj.nextAdjustWrite {
dadj.nextAdjustWrite = int(dadj.totalWritten.Load()) + dadj.chunkSize
adj = true
}
if adj {
Expand All @@ -133,7 +138,7 @@ func (dadj *deadlineAdjuster) Read(p []byte) (int, error) {
to := min(len(p), n+dadj.chunkSize)
nCur, err := dadj.peerStream.Read(p[n:to])
n += nCur
dadj.totalRead += nCur
dadj.totalRead.Add(int64(nCur))
if err != nil {
return n, dadj.augmentError("read", err)
}
Expand All @@ -154,7 +159,7 @@ func (dadj *deadlineAdjuster) Write(p []byte) (n int, err error) {
to := min(len(p), n+dadj.chunkSize)
nCur, err = dadj.peerStream.Write(p[n:to])
n += nCur
dadj.totalWritten += nCur
dadj.totalWritten.Add(int64(nCur))
if err != nil {
return n, dadj.augmentError("write", err)
}
Expand Down
6 changes: 5 additions & 1 deletion p2p/server/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,18 @@ import (
"github.com/spacemeshos/go-spacemesh/p2p/peerinfo"
)

//go:generate mockgen -typed -package=mocks -destination=./mocks/mocks.go -source=./interface.go -exclude_interfaces Host
//go:generate mockgen -typed -package=mocks -destination=./mocks/mocks.go -source=./interface.go -exclude_interfaces Host,PeerInfoHost

// Host is a subset of libp2p Host interface that needs to be implemented to be usable with server.
// PeerInfo is intentionally not part of this interface; hosts that can provide
// peer info implement PeerInfoHost in addition.
type Host interface {
SetStreamHandler(protocol.ID, network.StreamHandler)
NewStream(context.Context, peer.ID, ...protocol.ID) (network.Stream, error)
Network() network.Network
ConnManager() connmgr.ConnManager
}

// PeerInfoHost is a Host that additionally exposes peer info through its
// PeerInfo method.
type PeerInfoHost interface {
Host
// PeerInfo returns the peerinfo.PeerInfo associated with the host.
PeerInfo() peerinfo.PeerInfo
}

Expand Down
2 changes: 1 addition & 1 deletion p2p/server/mocks/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 11 additions & 4 deletions p2p/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,13 @@ type request struct {
received time.Time
}

// peerInfo returns the PeerInfo of the underlying host when that host
// implements PeerInfoHost, and nil otherwise.
func (s *Server) peerInfo() peerinfo.PeerInfo {
	pih, isPeerInfoHost := s.h.(PeerInfoHost)
	if !isPeerInfoHost {
		// The host does not expose peer info.
		return nil
	}
	return pih.PeerInfo()
}

func (s *Server) Run(ctx context.Context) error {
var eg errgroup.Group
for {
Expand Down Expand Up @@ -272,8 +279,8 @@ func (s *Server) Run(ctx context.Context) error {
}
ok := s.queueHandler(ctx, req.stream)
duration := time.Since(req.received)
if s.h.PeerInfo() != nil {
info := s.h.PeerInfo().EnsurePeerInfo(conn.RemotePeer())
if s.peerInfo() != nil {
info := s.peerInfo().EnsurePeerInfo(conn.RemotePeer())
info.ServerStats.RequestDone(duration, ok)
}
if s.metrics != nil {
Expand Down Expand Up @@ -448,8 +455,8 @@ func (s *Server) streamRequest(
if err != nil {
return nil, nil, err
}
if s.h.PeerInfo() != nil {
info = s.h.PeerInfo().EnsurePeerInfo(stream.Conn().RemotePeer())
if s.peerInfo() != nil {
info = s.peerInfo().EnsurePeerInfo(stream.Conn().RemotePeer())
}
dadj := newDeadlineAdjuster(stream, s.timeout, s.hardTimeout)
defer func() {
Expand Down
8 changes: 4 additions & 4 deletions p2p/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,11 @@ func TestServer(t *testing.T) {
require.NotEmpty(t, srvConns)
require.Equal(t, n+1, srv1.NumAcceptedRequests())

clientInfo := client.h.PeerInfo().EnsurePeerInfo(srvID)
clientInfo := client.peerInfo().EnsurePeerInfo(srvID)
require.Equal(t, 1, clientInfo.ClientStats.SuccessCount())
require.Zero(t, clientInfo.ClientStats.FailureCount())

serverInfo := srv1.h.PeerInfo().EnsurePeerInfo(mesh.Hosts()[0].ID())
serverInfo := srv1.peerInfo().EnsurePeerInfo(mesh.Hosts()[0].ID())
require.Eventually(t, func() bool {
return serverInfo.ServerStats.SuccessCount() == 1
}, 10*time.Second, 10*time.Millisecond)
Expand All @@ -144,11 +144,11 @@ func TestServer(t *testing.T) {
require.ErrorContains(t, err, testErr.Error())
require.Equal(t, n+1, srv1.NumAcceptedRequests())

clientInfo := client.h.PeerInfo().EnsurePeerInfo(srvID)
clientInfo := client.peerInfo().EnsurePeerInfo(srvID)
require.Zero(t, clientInfo.ClientStats.SuccessCount())
require.Equal(t, 1, clientInfo.ClientStats.FailureCount())

serverInfo := srv2.h.PeerInfo().EnsurePeerInfo(mesh.Hosts()[0].ID())
serverInfo := srv2.peerInfo().EnsurePeerInfo(mesh.Hosts()[0].ID())
require.Eventually(t, func() bool {
return serverInfo.ServerStats.FailureCount() == 1
}, 10*time.Second, 10*time.Millisecond)
Expand Down
69 changes: 69 additions & 0 deletions sync2/rangesync/dispatcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package rangesync

import (
"context"
"fmt"
"io"
"sync"

"github.com/libp2p/go-libp2p/core/host"
"go.uber.org/zap"

"github.com/spacemeshos/go-spacemesh/p2p/server"
)

// Handler is a function that handles a request for a Dispatcher.
// It is invoked by Dispatcher.Dispatch with the request context and the
// stream to communicate over. A non-nil error returned by the handler is
// wrapped with the handler name and returned from Dispatch.
type Handler func(ctx context.Context, s io.ReadWriter) error

// Dispatcher multiplexes a P2P Server to multiple set reconcilers.
// Handlers are registered by name via Register; Dispatch interprets the
// request payload as a handler name and invokes the matching handler.
type Dispatcher struct {
// Server is the embedded P2P server. NewDispatcher leaves it unset;
// it is assigned by SetupServer.
*server.Server
// mtx is held while reading or writing handlers.
mtx sync.Mutex
// logger is used for debug logging of dispatched requests.
logger *zap.Logger
// handlers maps a handler name to the Handler registered under it.
handlers map[string]Handler
}

// NewDispatcher creates a new Dispatcher that logs to the given logger and
// has no handlers registered. The embedded Server is not set here; use
// SetupServer to create it.
func NewDispatcher(logger *zap.Logger) *Dispatcher {
	d := new(Dispatcher)
	d.logger = logger
	d.handlers = map[string]Handler{}
	return d
}

// SetupServer creates a new P2P Server for the Dispatcher, using the
// Dispatcher's Dispatch method as the request handler. The created server is
// stored in the embedded Server field and also returned.
func (d *Dispatcher) SetupServer(
	host host.Host,
	proto string,
	opts ...server.Opt,
) *server.Server {
	srv := server.New(host, proto, d.Dispatch, opts...)
	d.Server = srv
	return srv
}

// Register registers a handler with a Dispatcher.
// The handler is stored under name; registering the same name again replaces
// the previously stored handler. The handler map is updated while holding the
// Dispatcher's mutex.
func (d *Dispatcher) Register(name string, h Handler) {
d.mtx.Lock()
defer d.mtx.Unlock()
d.handlers[name] = h
}

// Dispatch dispatches a request to a handler.
// The request payload req is interpreted as the name of the handler to run.
// An error is returned if no handler is registered under that name, or if the
// handler itself fails, in which case the handler's error is wrapped with the
// handler name.
func (d *Dispatcher) Dispatch(
	ctx context.Context,
	req []byte,
	stream io.ReadWriter,
) (err error) {
	handlerName := string(req)

	// Hold the mutex only for the map lookup, not while the handler runs.
	handle, found := func() (Handler, bool) {
		d.mtx.Lock()
		defer d.mtx.Unlock()
		h, ok := d.handlers[handlerName]
		return h, ok
	}()
	if !found {
		return fmt.Errorf("no handler named %q", handlerName)
	}

	d.logger.Debug("dispatch", zap.String("handler", handlerName))
	if err = handle(ctx, stream); err != nil {
		return fmt.Errorf("handler %q: %w", handlerName, err)
	}
	return nil
}
78 changes: 78 additions & 0 deletions sync2/rangesync/dispatcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package rangesync_test

import (
"bytes"
"context"
"io"
"testing"
"time"

mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"

"github.com/spacemeshos/go-spacemesh/p2p/server"
"github.com/spacemeshos/go-spacemesh/sync2/rangesync"
)

// makeFakeDispHandler returns a handler that sends a single range-contents
// message followed by an end-round message over a wire conduit. The range
// bounds are both a 32-byte key filled with byte(n), and the count is n.
// The handler always returns nil.
func makeFakeDispHandler(n int) rangesync.Handler {
	handler := func(ctx context.Context, stream io.ReadWriter) error {
		key := rangesync.KeyBytes(bytes.Repeat([]byte{byte(n)}, 32))
		conduit := rangesync.StartWireConduit(ctx, stream)
		defer conduit.End()
		sender := rangesync.Sender{conduit}
		sender.SendRangeContents(key, key, n)
		sender.SendEndRound()
		return nil
	}
	return handler
}

// TestDispatcher checks that a Dispatcher routes each request to the handler
// registered under the name carried in the request payload, by verifying the
// messages each fake handler sends back.
func TestDispatcher(t *testing.T) {
	mesh, err := mocknet.FullMeshConnected(2)
	require.NoError(t, err)

	disp := rangesync.NewDispatcher(zaptest.NewLogger(t))
	disp.Register("a", makeFakeDispHandler(42))
	disp.Register("b", makeFakeDispHandler(43))
	disp.Register("c", makeFakeDispHandler(44))

	proto := "itest"
	opts := []server.Opt{
		server.WithTimeout(10 * time.Second),
		server.WithLog(zaptest.NewLogger(t)),
	}

	// Server side: host 0 serves requests through the dispatcher.
	srv := disp.SetupServer(mesh.Hosts()[0], proto, opts...)
	require.Equal(t, srv, disp.Server)
	runRequester(t, srv)
	srvPeerID := mesh.Hosts()[0].ID()

	// Client side: host 1 issues the requests.
	client := server.New(mesh.Hosts()[1], proto, disp.Dispatch, opts...)

	cases := []struct {
		name string
		want int
	}{
		{"a", 42},
		{"b", 43},
		{"c", 44},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			onStream := func(ctx context.Context, stream io.ReadWriter) error {
				conduit := rangesync.StartWireConduit(ctx, stream)
				defer conduit.End()

				// First message: range contents produced by the selected handler.
				msg, err := conduit.NextMessage()
				require.NoError(t, err)
				require.Equal(t, rangesync.MessageTypeRangeContents, msg.Type())
				expKey := rangesync.KeyBytes(bytes.Repeat([]byte{byte(tc.want)}, 32))
				require.Equal(t, expKey, msg.X())
				require.Equal(t, expKey, msg.Y())
				require.Equal(t, tc.want, msg.Count())

				// Second message: end of round.
				msg, err = conduit.NextMessage()
				require.NoError(t, err)
				require.Equal(t, rangesync.MessageTypeEndRound, msg.Type())
				return nil
			}
			require.NoError(t, client.StreamRequest(
				context.Background(), srvPeerID, []byte(tc.name),
				onStream))
		})
	}
}
Loading

0 comments on commit f9069f1

Please sign in to comment.