Skip to content

Commit

Permalink
use zap in timesync (#6195)
Browse files Browse the repository at this point in the history
## Motivation

Part of the effort to migrate to zap.
  • Loading branch information
poszu committed Aug 1, 2024
1 parent 867a674 commit 3324aef
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 93 deletions.
4 changes: 0 additions & 4 deletions log/errcode/fatalerrcode.go

This file was deleted.

2 changes: 1 addition & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1230,7 +1230,7 @@ func (app *App) initServices(ctx context.Context) error {
app.ptimesync = peersync.New(
app.host,
app.host,
peersync.WithLog(app.addLogger(TimeSyncLogger, lg)),
peersync.WithLog(app.addLogger(TimeSyncLogger, lg).Zap()),
peersync.WithConfig(app.Config.TIME.Peersync),
)
}
Expand Down
37 changes: 0 additions & 37 deletions timesync/peersync/clockerror.go

This file was deleted.

84 changes: 39 additions & 45 deletions timesync/peersync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@ import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"

"github.com/spacemeshos/go-spacemesh/codec"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/p2p"
)

Expand All @@ -22,10 +21,10 @@ const (
)

var (
// ErrPeersNotSynced returned if system clock is out of sync with peers clock for configured period of time.
ErrPeersNotSynced = errors.New("timesync: peers are not time synced, make sure your system clock is accurate")
// ErrTimesyncFailed returned if we weren't able to collect enough clock samples from peers.
ErrTimesyncFailed = errors.New("timesync: failed request")
// errPeersNotSynced returned if system clock is out of sync with peers clock for configured period of time.
errPeersNotSynced = errors.New("timesync: peers are not time synced")
// errTimesyncFailed returned if we weren't able to collect enough clock samples from peers.
errTimesyncFailed = errors.New("timesync: failed request")
)

//go:generate mockgen -typed -package=mocks -destination=./mocks/mocks.go -source=./sync.go
Expand Down Expand Up @@ -99,7 +98,7 @@ func WithContext(ctx context.Context) Option {
}

// WithLog modifies Log used in Sync.
func WithLog(lg log.Log) Option {
func WithLog(lg *zap.Logger) Option {
return func(s *Sync) {
s.log = lg
}
Expand All @@ -115,7 +114,7 @@ func WithConfig(config Config) Option {
// New creates Sync instance and returns pointer.
func New(h host.Host, peers getPeers, opts ...Option) *Sync {
sync := &Sync{
log: log.NewNop(),
log: zap.NewNop(),
ctx: context.Background(),
time: systemTime{},
h: h,
Expand All @@ -132,10 +131,8 @@ func New(h host.Host, peers getPeers, opts ...Option) *Sync {

// Sync manages background worker that compares peers time with system time.
type Sync struct {
errCnt uint32

config Config
log log.Log
log *zap.Logger
time Time
h host.Host
peers getPeers
Expand All @@ -151,15 +148,15 @@ func (s *Sync) streamHandler(stream network.Stream) {
defer stream.SetDeadline(time.Time{})
var request Request
if _, err := codec.DecodeFrom(stream, &request); err != nil {
s.log.With().Debug("can't decode request", log.Err(err))
s.log.Debug("can't decode request", zap.Error(err))
return
}
resp := Response{
ID: request.ID,
Timestamp: uint64(s.time.Now().UnixNano()),
}
if _, err := codec.EncodeTo(stream, &resp); err != nil {
s.log.With().Debug("can't encode response", log.Err(err))
s.log.Debug("can't encode response", zap.Error(err))
}
}

Expand Down Expand Up @@ -188,48 +185,48 @@ func (s *Sync) Wait() error {

func (s *Sync) run() error {
var (
timer *time.Timer
round uint64
timer *time.Timer
round uint64
failures int
)
s.log.With().Debug("started sync background worker")
defer s.log.With().Debug("exiting sync background worker")
s.log.Debug("started sync background worker")
defer s.log.Debug("exiting sync background worker")
for {
prs := s.peers.GetPeers()
timeout := s.config.RoundRetryInterval
if len(prs) >= s.config.RequiredResponses {
s.log.With().Debug("starting time sync round with peers",
log.Uint64("round", round),
log.Int("peers_count", len(prs)),
log.Uint32("errors_count", atomic.LoadUint32(&s.errCnt)),
s.log.Debug("starting time sync round with peers",
zap.Uint64("round", round),
zap.Int("peers_count", len(prs)),
zap.Int("errors_count", failures),
)
ctx, cancel := context.WithTimeout(s.ctx, s.config.RoundTimeout)
offset, err := s.GetOffset(ctx, round, prs)
cancel()
if err == nil {
if offset > s.config.MaxClockOffset || (offset < 0 && -offset > s.config.MaxClockOffset) {
s.log.With().Warning("peers offset is larger than max allowed clock difference",
log.Uint64("round", round),
log.Duration("offset", offset),
log.Duration("max_offset", s.config.MaxClockOffset),
failures += 1
s.log.Warn("peers offset is larger than max allowed clock difference",
zap.Uint64("round", round),
zap.Duration("offset", offset),
zap.Duration("max_offset", s.config.MaxClockOffset),
)
if atomic.AddUint32(&s.errCnt, 1) == uint32(s.config.MaxOffsetErrors) {
return clockError{
err: ErrPeersNotSynced,
details: clockErrorDetails{Drift: offset},
}
if failures == s.config.MaxOffsetErrors {
s.log.Error("peers are not time synced, make sure your system clock is accurate")
return fmt.Errorf("%w: drift = %v", errPeersNotSynced, offset)
}
} else {
s.log.With().Debug("peers offset is within max allowed clock difference",
log.Uint64("round", round),
log.Duration("offset", offset),
log.Duration("max_offset", s.config.MaxClockOffset),
s.log.Debug("peers offset is within max allowed clock difference",
zap.Uint64("round", round),
zap.Duration("offset", offset),
zap.Duration("max_offset", s.config.MaxClockOffset),
)
atomic.StoreUint32(&s.errCnt, 0)
failures = 0
}
offsetGauge.Set(offset.Seconds())
timeout = s.config.RoundInterval
} else {
s.log.With().Error("failed to fetch offset from peers", log.Err(err))
s.log.Error("failed to fetch offset from peers", zap.Error(err))
}
round++
}
Expand Down Expand Up @@ -257,30 +254,27 @@ func (s *Sync) GetOffset(ctx context.Context, id uint64, prs []p2p.Peer) (time.D
}
wg sync.WaitGroup
)
buf, err := codec.Encode(&Request{ID: id})
if err != nil {
s.log.With().Panic("can't encode request to bytes", log.Err(err))
}
buf := codec.MustEncode(&Request{ID: id})

for _, pid := range prs {
wg.Add(1)
go func(pid p2p.Peer) {
defer wg.Done()
logger := s.log.WithFields(log.Stringer("pid", pid)).With()
stream, err := s.h.NewStream(network.WithNoDial(ctx, "existing connection"), pid, protocolName)
if err != nil {
logger.Debug("failed to create new stream", log.Err(err))
s.log.Debug("failed to create new stream", zap.Error(err), zap.Stringer("pid", pid))
return
}
defer stream.Close()
_ = stream.SetDeadline(s.time.Now().Add(s.config.RoundTimeout))
defer stream.SetDeadline(time.Time{})
if _, err := stream.Write(buf); err != nil {
logger.Debug("failed to send a request", log.Err(err))
s.log.Debug("failed to send a request", zap.Error(err), zap.Stringer("pid", pid))
return
}
var resp Response
if _, err := codec.DecodeFrom(stream, &resp); err != nil {
logger.Debug("failed to read response from peer", log.Err(err))
s.log.Debug("failed to read response from peer", zap.Error(err), zap.Stringer("pid", pid))
return
}
select {
Expand All @@ -299,5 +293,5 @@ func (s *Sync) GetOffset(ctx context.Context, id uint64, prs []p2p.Peer) (time.D
if round.Ready() {
return round.Offset(), nil
}
return 0, fmt.Errorf("%w: failed on timeout", ErrTimesyncFailed)
return 0, fmt.Errorf("%w: failed on timeout", errTimesyncFailed)
}
12 changes: 6 additions & 6 deletions timesync/peersync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
"github.com/spacemeshos/go-scale/tester"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"go.uber.org/zap/zaptest"

"github.com/spacemeshos/go-spacemesh/log/logtest"
"github.com/spacemeshos/go-spacemesh/p2p"
"github.com/spacemeshos/go-spacemesh/timesync/peersync/mocks"
)
Expand Down Expand Up @@ -70,7 +70,7 @@ func TestSyncGetOffset(t *testing.T) {

sync := New(mesh.Hosts()[0], nil, WithTime(tm))
offset, err := sync.GetOffset(context.TODO(), 0, peers)
require.ErrorIs(t, err, ErrTimesyncFailed)
require.ErrorIs(t, err, errTimesyncFailed)
require.Empty(t, offset)
})
}
Expand Down Expand Up @@ -118,7 +118,7 @@ func TestSyncTerminateOnError(t *testing.T) {
}()
select {
case err := <-errors:
require.ErrorContains(t, err, ErrPeersNotSynced.Error())
require.ErrorIs(t, err, errPeersNotSynced)
case <-time.After(100 * time.Millisecond):
require.FailNow(t, "timed out waiting for sync to fail")
}
Expand All @@ -132,7 +132,7 @@ func TestSyncSimulateMultiple(t *testing.T) {

delays := []time.Duration{0, 1200 * time.Millisecond, 1900 * time.Millisecond, 10 * time.Second}
instances := []*Sync{}
errors := []error{ErrPeersNotSynced, nil, nil, ErrPeersNotSynced}
errors := []error{errPeersNotSynced, nil, nil, errPeersNotSynced}
mesh, err := mocknet.FullMeshLinked(len(delays))
require.NoError(t, err)
hosts := []*p2p.Host{}
Expand All @@ -150,7 +150,7 @@ func TestSyncSimulateMultiple(t *testing.T) {
sync := New(hosts[i], hosts[i],
WithConfig(config),
WithTime(delayedTime(delay)),
WithLog(logtest.New(t).Named(fmt.Sprintf("%d-%s", i, hosts[i].ID()))),
WithLog(zaptest.NewLogger(t).Named(fmt.Sprintf("%d-%s", i, hosts[i].ID()))),
)
instances = append(instances, sync)
}
Expand All @@ -168,7 +168,7 @@ func TestSyncSimulateMultiple(t *testing.T) {
}()
select {
case err := <-wait:
require.ErrorContains(t, err, errors[i].Error())
require.ErrorIs(t, err, errors[i])
case <-time.After(1000 * time.Millisecond):
require.FailNowf(t, "timed out waiting for an error", "node %d", i)
}
Expand Down

0 comments on commit 3324aef

Please sign in to comment.