Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

spv: Implement per-peer increasing backoffs #2466

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 27 additions & 13 deletions chain/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,9 +542,11 @@ func (s *Syncer) Run(ctx context.Context) (err error) {

params := s.wallet.ChainParams()

ntfnCtx, ntfnCtxCancel := context.WithCancel(context.Background())
defer ntfnCtxCancel()
jrick marked this conversation as resolved.
Show resolved Hide resolved
s.notifier = &notifier{
syncer: s,
ctx: ctx,
ctx: ntfnCtx,
closed: make(chan struct{}),
}
addr, err := normalizeAddress(s.opts.Address, s.opts.DefaultPort)
Expand Down Expand Up @@ -589,12 +591,12 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
}
opts = append(opts, wsrpc.WithTLSConfig(tc))
}
client, err := wsrpc.Dial(ctx, addr, opts...)
wsClient, err := wsrpc.Dial(ctx, addr, opts...)
if err != nil {
return err
}
defer client.Close()
s.rpc = dcrd.New(client)
defer wsClient.Close()
s.rpc = dcrd.New(wsClient)

// Verify that the server is running on the expected network.
var netID wire.CurrencyNet
Expand Down Expand Up @@ -723,10 +725,27 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
return err
}

defer func() {
ntfnCtxCancel()

select {
case <-ctx.Done():
wsClient.Close()
default:
}

// Wait for notifications to finish before returning
<-s.notifier.closed
}()
jrick marked this conversation as resolved.
Show resolved Hide resolved

// Ensure wallet.Run cleanly finishes/is canceled first when outer
// context is canceled.
walletCtx, walletCtxCancel := context.WithCancel(context.Background())
jrick marked this conversation as resolved.
Show resolved Hide resolved
defer walletCtxCancel()
g.Go(func() error {
// Run wallet background goroutines (currently, this just runs
// mixclient).
return s.wallet.Run(ctx)
return s.wallet.Run(walletCtx)
})

// Request notifications for mixing messages.
Expand All @@ -739,18 +758,13 @@ func (s *Syncer) Run(ctx context.Context) (err error) {

log.Infof("Blockchain sync completed, wallet ready for general usage.")

// Wait for notifications to finish before returning
defer func() {
<-s.notifier.closed
}()

g.Go(func() error {
select {
case <-ctx.Done():
client.Close()
walletCtxCancel()
jrick marked this conversation as resolved.
Show resolved Hide resolved
return ctx.Err()
case <-client.Done():
return client.Err()
case <-wsClient.Done():
return wsClient.Err()
}
})
return g.Wait()
Expand Down
31 changes: 20 additions & 11 deletions dcrwallet.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,20 +523,25 @@ func spvLoop(ctx context.Context, w *wallet.Wallet) {
addr := &net.TCPAddr{IP: net.ParseIP("::1"), Port: 0}
amgrDir := filepath.Join(cfg.AppDataDir.Value, w.ChainParams().Name)
amgr := addrmgr.New(amgrDir, cfg.lookup)
lp := p2p.NewLocalPeer(w.ChainParams(), addr, amgr)
lp.SetDialFunc(cfg.dial)
lp.SetDisableRelayTx(cfg.SPVDisableRelayTx)
syncer := spv.NewSyncer(w, lp)
if len(cfg.SPVConnect) > 0 {
syncer.SetPersistentPeers(cfg.SPVConnect)
}
w.SetNetworkBackend(syncer)
for {
lp := p2p.NewLocalPeer(w.ChainParams(), addr, amgr)
lp.SetDialFunc(cfg.dial)
lp.SetDisableRelayTx(cfg.SPVDisableRelayTx)
syncer := spv.NewSyncer(w, lp)
if len(cfg.SPVConnect) > 0 {
syncer.SetPersistentPeers(cfg.SPVConnect)
}
err := syncer.Run(ctx)
if done(ctx) {
if err == nil || done(ctx) {
loggers.SyncLog.Infof("SPV synchronization stopped")
return
}
log.Errorf("SPV synchronization ended: %v", err)
loggers.SyncLog.Errorf("SPV synchronization stopped: %v", err)
select {
case <-ctx.Done():
return
case <-time.After(5 * time.Second):
}
}
}

Expand Down Expand Up @@ -571,7 +576,11 @@ func rpcSyncLoop(ctx context.Context, w *wallet.Wallet) {
syncer := chain.NewSyncer(w, rpcOptions)
err := syncer.Run(ctx)
if err != nil {
loggers.SyncLog.Errorf("Wallet synchronization stopped: %v", err)
if errors.Is(err, context.Canceled) || ctx.Err() != nil {
loggers.SyncLog.Infof("RPC synchronization stopped")
return
}
loggers.SyncLog.Errorf("RPC synchronization stopped: %v", err)
select {
case <-ctx.Done():
return
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,5 @@ require (
google.golang.org/genproto/googleapis/rpc v0.0.0-20241223144023-3abc09e42ca8 // indirect
lukechampine.com/blake3 v1.3.0 // indirect
)

replace github.com/decred/dcrd/mixing => github.com/jrick/dcrd/mixing v0.0.0-20250123211715-8a5ce5c2063a
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ github.com/decred/dcrd/gcs/v4 v4.1.0 h1:tpW7JW53yJZlgNwl/n2NL1b8NxHaIPRUyNuLMkB/
github.com/decred/dcrd/gcs/v4 v4.1.0/go.mod h1:nPTbGM/I3Ihe5KFvUmxZEqQP/jDZQjQ63+WEi/f4lqU=
github.com/decred/dcrd/hdkeychain/v3 v3.1.2 h1:x25WuuE7zM/20EynuVMyOhL0K8BwGBBsexGq8xTiHFA=
github.com/decred/dcrd/hdkeychain/v3 v3.1.2/go.mod h1:FnNJmZ7jqUDeAo6/c/xkQi5cuxh3EWtJeMmW6/Z8lcc=
github.com/decred/dcrd/mixing v0.4.2 h1:mpt2pNIFTI6L1hXrieAWJTQJv5t9WzHcNnhI+tnAG90=
github.com/decred/dcrd/mixing v0.4.2/go.mod h1:VF87lOn41kitgWVOwmXoB4qMYF7+bxItZXyw4JfW3EQ=
github.com/decred/dcrd/rpc/jsonrpc/types/v4 v4.3.0 h1:l0DnCcILTNrpy8APF3FLN312ChpkQaAuW30aC/RgBaw=
github.com/decred/dcrd/rpc/jsonrpc/types/v4 v4.3.0/go.mod h1:j+kkRPXPJB5S9VFOsx8SQLcU7PTFkPKRc1aCHN4ENzA=
github.com/decred/dcrd/rpcclient/v8 v8.0.1 h1:hd81e4w1KSqvPcozJlnz6XJfWKDNuahgooH/N5E8vOU=
Expand All @@ -78,6 +76,8 @@ github.com/jessevdk/go-flags v1.5.0 h1:1jKYvbxEjfUl0fmqTCOfonvskHHXMjBySTLW4y9LF
github.com/jessevdk/go-flags v1.5.0/go.mod h1:Fw0T6WPc1dYxT4mKEZRfG5kJhaTDP9pj1c2EWnYs/m4=
github.com/jrick/bitset v1.0.0 h1:Ws0PXV3PwXqWK2n7Vz6idCdrV/9OrBXgHEJi27ZB9Dw=
github.com/jrick/bitset v1.0.0/go.mod h1:ZOYB5Uvkla7wIEY4FEssPVi3IQXa02arznRaYaAEPe4=
github.com/jrick/dcrd/mixing v0.0.0-20250123211715-8a5ce5c2063a h1:wGTjDa+kmjKBkhnc8BACcU+OreLC+gxTFHLz5t+AFkw=
github.com/jrick/dcrd/mixing v0.0.0-20250123211715-8a5ce5c2063a/go.mod h1:VF87lOn41kitgWVOwmXoB4qMYF7+bxItZXyw4JfW3EQ=
github.com/jrick/logrotate v1.0.0 h1:lQ1bL/n9mBNeIXoTUoYRlK4dHuNJVofX9oWqBtPnSzI=
github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ=
github.com/jrick/wsrpc/v2 v2.3.8 h1:9vfM8o9g00HXQb/3D6+Y9Cy1uybjD7K1272vtdXXBps=
Expand Down
Loading
Loading