diff --git a/internal/pkg/ntp/ntp.go b/internal/pkg/ntp/ntp.go index 7c78b8e522..05dac14dd7 100644 --- a/internal/pkg/ntp/ntp.go +++ b/internal/pkg/ntp/ntp.go @@ -392,18 +392,15 @@ func (syncer *Syncer) queryNTP(server string) (*Measurement, error) { ) validationError := resp.Validate() + if validationError != nil { + return nil, validationError + } - measurement := &Measurement{ + return &Measurement{ ClockOffset: resp.ClockOffset, Leap: resp.Leap, - Spike: false, - } - - if validationError == nil { - measurement.Spike = syncer.isSpike(resp) - } - - return measurement, validationError + Spike: syncer.isSpike(resp), + }, nil } // log2i returns 0 for v == 0 and v == 1. diff --git a/internal/pkg/ntp/ntp_test.go b/internal/pkg/ntp/ntp_test.go index fcdc5bff1f..f2cc0ce172 100644 --- a/internal/pkg/ntp/ntp_test.go +++ b/internal/pkg/ntp/ntp_test.go @@ -31,8 +31,9 @@ type NTPSuite struct { systemClock time.Time clockAdjustments []time.Duration - failingServer int - spikyServer int + failingServer int + spikyServer int + kissOfDeathServer int } func TestNTPSuite(t *testing.T) { @@ -48,6 +49,8 @@ func (suite *NTPSuite) SetupTest() { suite.systemClock = time.Now().UTC() suite.clockAdjustments = nil suite.failingServer = 0 + suite.spikyServer = 0 + suite.kissOfDeathServer = 0 } func (suite *NTPSuite) getSystemClock() time.Time { @@ -72,6 +75,7 @@ func (suite *NTPSuite) adjustSystemClock(val *unix.Timex) (status timex.State, e return } +//nolint:gocyclo func (suite *NTPSuite) fakeQuery(host string) (resp *beevikntp.Response, err error) { switch host { case "127.0.0.1": // error @@ -160,6 +164,26 @@ func (suite *NTPSuite) fakeQuery(host string) (resp *beevikntp.Response, err err suite.Require().NoError(resp.Validate()) return resp, nil + case "127.0.0.8": // kiss of death alternating + suite.kissOfDeathServer++ + + if suite.kissOfDeathServer%2 == 1 { + return &beevikntp.Response{ // kiss of death + Stratum: 0, + Time: suite.systemClock, + ReferenceTime: suite.systemClock, + ClockOffset: 2 * time.Millisecond, + RTT: time.Millisecond / 2, + }, nil + } else { + return &beevikntp.Response{ // normal response + Stratum: 1, + Time: suite.systemClock, + ReferenceTime: suite.systemClock, + ClockOffset: time.Millisecond, + RTT: time.Millisecond / 2, + }, nil + } default: return nil, fmt.Errorf("unknown host %q", host) } @@ -242,6 +266,60 @@ func (suite *NTPSuite) TestSyncContinuous() { wg.Wait() } +//nolint:dupl +func (suite *NTPSuite) TestSyncKissOfDeath() { + syncer := ntp.NewSyncer(zaptest.NewLogger(suite.T()).With(zap.String("controller", "ntp")), []string{"127.0.0.8"}) + + syncer.AdjustTime = suite.adjustSystemClock + syncer.CurrentTime = suite.getSystemClock + syncer.NTPQuery = suite.fakeQuery + + syncer.MinPoll = time.Second + syncer.MaxPoll = time.Second + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var wg sync.WaitGroup + + wg.Add(1) + + go func() { + defer wg.Done() + + syncer.Run(ctx) + }() + + select { + case <-syncer.Synced(): + case <-time.After(10 * time.Second): + suite.Assert().Fail("time sync timeout") + } + + suite.Assert().NoError( + retry.Constant(10*time.Second, retry.WithUnits(100*time.Millisecond)).Retry(func() error { + suite.clockLock.Lock() + defer suite.clockLock.Unlock() + + if len(suite.clockAdjustments) < 2 { + return retry.ExpectedErrorf("not enough syncs") + } + + for _, adj := range suite.clockAdjustments { + // kiss of death syncs should be ignored + suite.Assert().Equal(time.Millisecond, adj) + } + + return nil + }), + ) + + cancel() + + wg.Wait() +} + +//nolint:dupl func (suite *NTPSuite) TestSyncWithSpikes() { syncer := ntp.NewSyncer(zaptest.NewLogger(suite.T()).With(zap.String("controller", "ntp")), []string{"127.0.0.7"})