From 46408193d10190637f022e384f9b7a752b78a0c6 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Wed, 20 Mar 2019 17:10:47 -0400 Subject: [PATCH 01/12] add QueueSize() method to commitlog --- src/dbnode/persist/fs/commitlog/commit_log.go | 4 ++ .../persist/fs/commitlog/commit_log_test.go | 49 +++++++++++++++++-- src/dbnode/persist/fs/commitlog/types.go | 4 ++ 3 files changed, 52 insertions(+), 5 deletions(-) diff --git a/src/dbnode/persist/fs/commitlog/commit_log.go b/src/dbnode/persist/fs/commitlog/commit_log.go index c61d8044a8..dc2789c151 100644 --- a/src/dbnode/persist/fs/commitlog/commit_log.go +++ b/src/dbnode/persist/fs/commitlog/commit_log.go @@ -355,6 +355,10 @@ func (l *commitLog) RotateLogs() (persist.CommitLogFile, error) { return file, nil } +func (l *commitLog) QueueSize() int64 { + return atomic.LoadInt64(&l.numWritesInQueue) +} + func (l *commitLog) flushEvery(interval time.Duration) { // Periodically flush the underlying commit log writer to cover // the case when writes stall for a considerable time diff --git a/src/dbnode/persist/fs/commitlog/commit_log_test.go b/src/dbnode/persist/fs/commitlog/commit_log_test.go index 8c67210954..ebd6f3b218 100644 --- a/src/dbnode/persist/fs/commitlog/commit_log_test.go +++ b/src/dbnode/persist/fs/commitlog/commit_log_test.go @@ -587,7 +587,7 @@ func TestCommitLogWriteErrorOnClosed(t *testing.T) { } func TestCommitLogWriteErrorOnFull(t *testing.T) { - // Set backlog of size one and don't automatically flush + // Set backlog of size one and don't automatically flush. backlogQueueSize := 1 flushInterval := time.Duration(0) opts, _ := newTestOptions(t, overrides{ @@ -610,24 +610,63 @@ func TestCommitLogWriteErrorOnFull(t *testing.T) { for { if err := commitLog.Write(ctx, series, dp, unit, nil); err != nil { - // Ensure queue full error + // Ensure queue full error. require.Equal(t, ErrCommitLogQueueFull, err) + require.Equal(t, int64(backlogQueueSize), commitLog.QueueSize()) break } writes = append(writes, testWrite{series, dp.Timestamp, dp.Value, unit, nil, nil}) - // Increment timestamp and value for next write + // Increment timestamp and value for next write. dp.Timestamp = dp.Timestamp.Add(time.Second) dp.Value += 1.0 } - // Close and consequently flush + // Close and consequently flush. require.NoError(t, commitLog.Close()) - // Assert write flushed by reading the commit log + // Assert write flushed by reading the commit log. assertCommitLogWritesByIterating(t, commitLog, writes) } +func TestCommitLogQueueSize(t *testing.T) { + // Set backlog of size one and don't automatically flush. + backlogQueueSize := 1 + flushInterval := time.Duration(0) + opts, _ := newTestOptions(t, overrides{ + backlogQueueSize: &backlogQueueSize, + flushInterval: &flushInterval, + strategy: StrategyWriteBehind, + }) + defer cleanup(t, opts) + + commitLog := newTestCommitLog(t, opts) + defer commitLog.Close() + + // Test filling queue. + var writes []testWrite + series := testSeries(0, "foo.bar", testTags1, 127) + dp := ts.Datapoint{Timestamp: time.Now(), Value: 123.456} + unit := xtime.Millisecond + + ctx := context.NewContext() + defer ctx.Close() + + for { + if err := commitLog.Write(ctx, series, dp, unit, nil); err != nil { + // Ensure queue full error. + require.Equal(t, ErrCommitLogQueueFull, err) + require.Equal(t, int64(backlogQueueSize), commitLog.QueueSize()) + break + } + writes = append(writes, testWrite{series, dp.Timestamp, dp.Value, unit, nil, nil}) + + // Increment timestamp and value for next write. + dp.Timestamp = dp.Timestamp.Add(time.Second) + dp.Value += 1.0 + } +} + func TestCommitLogFailOnWriteError(t *testing.T) { opts, scope := newTestOptions(t, overrides{ strategy: StrategyWriteBehind, diff --git a/src/dbnode/persist/fs/commitlog/types.go b/src/dbnode/persist/fs/commitlog/types.go index 2ee1310829..9e76777ae9 100644 --- a/src/dbnode/persist/fs/commitlog/types.go +++ b/src/dbnode/persist/fs/commitlog/types.go @@ -78,6 +78,10 @@ type CommitLog interface { // RotateLogs rotates the commitlog and returns the File that represents // the new commitlog file. RotateLogs() (persist.CommitLogFile, error) + + // QueueSize returns the number of writes that are currently in the commitlog + // queue. + QueueSize() int64 } // Iterator provides an iterator for commit logs From 42bb66d70d72442b3da5fcee55feedab85e01510 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Wed, 20 Mar 2019 17:21:50 -0400 Subject: [PATCH 02/12] change db.IsOverloaded to use queue size --- src/dbnode/persist/fs/commitlog/commit_log.go | 2 +- .../persist/fs/commitlog/commit_log_mock.go | 16 +++++++++++- .../persist/fs/commitlog/commit_log_test.go | 2 +- src/dbnode/persist/fs/commitlog/types.go | 4 +-- src/dbnode/storage/database.go | 22 +++------------- src/dbnode/storage/database_test.go | 25 +++++++++++++++++++ 6 files changed, 47 insertions(+), 24 deletions(-) diff --git a/src/dbnode/persist/fs/commitlog/commit_log.go b/src/dbnode/persist/fs/commitlog/commit_log.go index dc2789c151..d499db4677 100644 --- a/src/dbnode/persist/fs/commitlog/commit_log.go +++ b/src/dbnode/persist/fs/commitlog/commit_log.go @@ -355,7 +355,7 @@ func (l *commitLog) RotateLogs() (persist.CommitLogFile, error) { return file, nil } -func (l *commitLog) QueueSize() int64 { +func (l *commitLog) QueueLength() int64 { return atomic.LoadInt64(&l.numWritesInQueue) } diff --git a/src/dbnode/persist/fs/commitlog/commit_log_mock.go b/src/dbnode/persist/fs/commitlog/commit_log_mock.go index 2499b266b8..8aa15f2c37 100644 --- a/src/dbnode/persist/fs/commitlog/commit_log_mock.go +++ b/src/dbnode/persist/fs/commitlog/commit_log_mock.go @@ -1,7 +1,7 @@ // Code generated by MockGen. DO NOT EDIT. // Source: github.com/m3db/m3/src/dbnode/persist/fs/commitlog/types.go -// Copyright (c) 2018 Uber Technologies, Inc. +// Copyright (c) 2019 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -150,6 +150,20 @@ func (mr *MockCommitLogMockRecorder) RotateLogs() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RotateLogs", reflect.TypeOf((*MockCommitLog)(nil).RotateLogs)) } +// QueueLength mocks base method +func (m *MockCommitLog) QueueLength() int64 { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "QueueLength") + ret0, _ := ret[0].(int64) + return ret0 +} + +// QueueLength indicates an expected call of QueueLength +func (mr *MockCommitLogMockRecorder) QueueLength() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueueLength", reflect.TypeOf((*MockCommitLog)(nil).QueueLength)) +} + // MockIterator is a mock of Iterator interface type MockIterator struct { ctrl *gomock.Controller diff --git a/src/dbnode/persist/fs/commitlog/commit_log_test.go b/src/dbnode/persist/fs/commitlog/commit_log_test.go index ebd6f3b218..fec386e89c 100644 --- a/src/dbnode/persist/fs/commitlog/commit_log_test.go +++ b/src/dbnode/persist/fs/commitlog/commit_log_test.go @@ -612,7 +612,7 @@ func TestCommitLogWriteErrorOnFull(t *testing.T) { if err := commitLog.Write(ctx, series, dp, unit, nil); err != nil { // Ensure queue full error. require.Equal(t, ErrCommitLogQueueFull, err) - require.Equal(t, int64(backlogQueueSize), commitLog.QueueSize()) + require.Equal(t, int64(backlogQueueSize), commitLog.QueueLength()) break } writes = append(writes, testWrite{series, dp.Timestamp, dp.Value, unit, nil, nil}) diff --git a/src/dbnode/persist/fs/commitlog/types.go b/src/dbnode/persist/fs/commitlog/types.go index 9e76777ae9..2c6d041975 100644 --- a/src/dbnode/persist/fs/commitlog/types.go +++ b/src/dbnode/persist/fs/commitlog/types.go @@ -79,9 +79,9 @@ type CommitLog interface { // the new commitlog file. RotateLogs() (persist.CommitLogFile, error) - // QueueSize returns the number of writes that are currently in the commitlog + // QueueLength returns the number of writes that are currently in the commitlog // queue. - QueueSize() int64 + QueueLength() int64 } // Iterator provides an iterator for commit logs diff --git a/src/dbnode/storage/database.go b/src/dbnode/storage/database.go index f424aecc62..fb7cc5c041 100644 --- a/src/dbnode/storage/database.go +++ b/src/dbnode/storage/database.go @@ -36,7 +36,6 @@ import ( "github.com/m3db/m3/src/dbnode/storage/index" "github.com/m3db/m3/src/dbnode/storage/namespace" "github.com/m3db/m3/src/dbnode/ts" - "github.com/m3db/m3/src/dbnode/x/xcounter" "github.com/m3db/m3/src/dbnode/x/xio" "github.com/m3db/m3x/context" xerrors "github.com/m3db/m3x/errors" @@ -101,10 +100,6 @@ type db struct { metrics databaseMetrics log xlog.Logger - errors xcounter.FrequencyCounter - errWindow time.Duration - errThreshold int64 - writeBatchPool *ts.WriteBatchPool } @@ -174,9 +169,6 @@ func NewDatabase( scope: scope, metrics: newDatabaseMetrics(scope), log: logger, - errors: xcounter.NewFrequencyCounter(opts.ErrorCounterOptions()), - errWindow: opts.ErrorWindowForLoad(), - errThreshold: opts.ErrorThresholdForLoad(), writeBatchPool: opts.WriteBatchPool(), } @@ -560,9 +552,6 @@ func (d *db) Write( dp := ts.Datapoint{Timestamp: timestamp, Value: value} err = d.commitLog.Write(ctx, series, dp, unit, annotation) - if err == commitlog.ErrCommitLogQueueFull { - d.errors.Record(1) - } if err != nil { return err } @@ -597,9 +586,6 @@ func (d *db) WriteTagged( dp := ts.Datapoint{Timestamp: timestamp, Value: value} err = d.commitLog.Write(ctx, series, dp, unit, annotation) - if err == commitlog.ErrCommitLogQueueFull { - d.errors.Record(1) - } if err != nil { return err } @@ -716,10 +702,6 @@ func (d *db) writeBatch( } err = d.commitLog.WriteBatch(ctx, writes) - if err == commitlog.ErrCommitLogQueueFull { - numFailedWrites := int64(len(writes.Iter())) - d.errors.Record(numFailedWrites) - } if err != nil { return err } @@ -871,7 +853,9 @@ func (d *db) Truncate(namespace ident.ID) (int64, error) { } func (d *db) IsOverloaded() bool { - return d.errors.Count(d.errWindow) > d.errThreshold + queueSize := float64(d.commitLog.QueueLength()) + queueCapacity := float64(d.opts.CommitLogOptions().BacklogQueueSize()) + return queueSize >= 0.9*queueCapacity } func (d *db) BootstrapState() DatabaseBootstrapState { diff --git a/src/dbnode/storage/database_test.go b/src/dbnode/storage/database_test.go index a3b15810bc..ebb7816399 100644 --- a/src/dbnode/storage/database_test.go +++ b/src/dbnode/storage/database_test.go @@ -31,6 +31,7 @@ import ( "github.com/m3db/m3/src/cluster/shard" "github.com/m3db/m3/src/dbnode/client" "github.com/m3db/m3/src/dbnode/persist/fs" + "github.com/m3db/m3/src/dbnode/persist/fs/commitlog" "github.com/m3db/m3/src/dbnode/retention" "github.com/m3db/m3/src/dbnode/runtime" "github.com/m3db/m3/src/dbnode/sharding" @@ -1173,3 +1174,27 @@ func TestUpdateBatchWriterBasedOnShardResults(t *testing.T) { d.commitLog = commitlog require.NoError(t, d.Close()) } + +func TestDatabaseIsOverloaded(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + d, mapCh, _ := newTestDatabase(t, ctrl, BootstrapNotStarted) + defer func() { + close(mapCh) + }() + + queueCapacity := 100 + d.opts = d.opts.SetCommitLogOptions( + d.opts.CommitLogOptions().SetBacklogQueueSize(100), + ) + + mockCL := commitlog.NewMockCommitLog(ctrl) + d.commitLog = mockCl + + mockCL.EXPECT().QueueLength().Return(89) + require.Equal(t, false, d.IsOverloaded()) + + mockCL.EXPECT().QueueLength().Return(90) + require.Equal(t, true, d.IsOverloaded()) +} From df2e55bb46f569b93334b031e24d4b7d6cceef6b Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Wed, 20 Mar 2019 17:24:57 -0400 Subject: [PATCH 03/12] add constant, comment, and fix test --- src/dbnode/storage/database.go | 10 +++++++++- src/dbnode/storage/database_test.go | 7 +++---- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/src/dbnode/storage/database.go b/src/dbnode/storage/database.go index fb7cc5c041..fd4373dcd4 100644 --- a/src/dbnode/storage/database.go +++ b/src/dbnode/storage/database.go @@ -46,6 +46,14 @@ import ( "github.com/uber-go/tally" ) +const ( + // The database is considered overloaded if the queue size is 90% or more + // of the maximum capacity. We set this below 1.0 because checking the queue + // lengthy is racey so we're gonna burst past this value anyways, and the buffer + // give us breathing room to recover. + commitLogQueueCapacityOverloadedFactor = 0.9 +) + var ( // errDatabaseAlreadyOpen raised when trying to open a database that is already open. errDatabaseAlreadyOpen = errors.New("database is already open") @@ -855,7 +863,7 @@ func (d *db) Truncate(namespace ident.ID) (int64, error) { func (d *db) IsOverloaded() bool { queueSize := float64(d.commitLog.QueueLength()) queueCapacity := float64(d.opts.CommitLogOptions().BacklogQueueSize()) - return queueSize >= 0.9*queueCapacity + return queueSize >= commitLogQueueCapacityOverloadedFactor*queueCapacity } func (d *db) BootstrapState() DatabaseBootstrapState { diff --git a/src/dbnode/storage/database_test.go b/src/dbnode/storage/database_test.go index ebb7816399..fd11a1ab8e 100644 --- a/src/dbnode/storage/database_test.go +++ b/src/dbnode/storage/database_test.go @@ -1184,17 +1184,16 @@ func TestDatabaseIsOverloaded(t *testing.T) { close(mapCh) }() - queueCapacity := 100 d.opts = d.opts.SetCommitLogOptions( d.opts.CommitLogOptions().SetBacklogQueueSize(100), ) mockCL := commitlog.NewMockCommitLog(ctrl) - d.commitLog = mockCl + d.commitLog = mockCL - mockCL.EXPECT().QueueLength().Return(89) + mockCL.EXPECT().QueueLength().Return(int64(89)) require.Equal(t, false, d.IsOverloaded()) - mockCL.EXPECT().QueueLength().Return(90) + mockCL.EXPECT().QueueLength().Return(int64(90)) require.Equal(t, true, d.IsOverloaded()) } From 6d27c782c0000a873dbfb720a5d54e49107abc33 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Wed, 20 Mar 2019 18:40:27 -0400 Subject: [PATCH 04/12] Add tests --- .../server/tchannelthrift/node/service.go | 20 ++++ .../tchannelthrift/node/service_test.go | 98 +++++++++++++++++++ 2 files changed, 118 insertions(+) diff --git a/src/dbnode/network/server/tchannelthrift/node/service.go b/src/dbnode/network/server/tchannelthrift/node/service.go index 1c780d2a5d..525c8be427 100644 --- a/src/dbnode/network/server/tchannelthrift/node/service.go +++ b/src/dbnode/network/server/tchannelthrift/node/service.go @@ -752,6 +752,11 @@ func (s *service) getBlocksMetadataV2FromResult( } func (s *service) Write(tctx thrift.Context, req *rpc.WriteRequest) error { + if s.db.IsOverloaded() { + s.metrics.overloadRejected.Inc(1) + return tterrors.NewInternalError(errServerIsOverloaded) + } + callStart := s.nowFn() ctx := tchannelthrift.Context(tctx) @@ -788,6 +793,11 @@ func (s *service) Write(tctx thrift.Context, req *rpc.WriteRequest) error { } func (s *service) WriteTagged(tctx thrift.Context, req *rpc.WriteTaggedRequest) error { + if s.db.IsOverloaded() { + s.metrics.overloadRejected.Inc(1) + return tterrors.NewInternalError(errServerIsOverloaded) + } + callStart := s.nowFn() ctx := tchannelthrift.Context(tctx) @@ -836,6 +846,11 @@ func (s *service) WriteTagged(tctx thrift.Context, req *rpc.WriteTaggedRequest) } func (s *service) WriteBatchRaw(tctx thrift.Context, req *rpc.WriteBatchRawRequest) error { + if s.db.IsOverloaded() { + s.metrics.overloadRejected.Inc(1) + return tterrors.NewInternalError(errServerIsOverloaded) + } + callStart := s.nowFn() ctx := tchannelthrift.Context(tctx) @@ -913,6 +928,11 @@ func (s *service) WriteBatchRaw(tctx thrift.Context, req *rpc.WriteBatchRawReque } func (s *service) WriteTaggedBatchRaw(tctx thrift.Context, req *rpc.WriteTaggedBatchRawRequest) error { + if s.db.IsOverloaded() { + s.metrics.overloadRejected.Inc(1) + return tterrors.NewInternalError(errServerIsOverloaded) + } + callStart := s.nowFn() ctx := tchannelthrift.Context(tctx) diff --git a/src/dbnode/network/server/tchannelthrift/node/service_test.go b/src/dbnode/network/server/tchannelthrift/node/service_test.go index acdc0cd8a7..f8d2eec2c8 100644 --- a/src/dbnode/network/server/tchannelthrift/node/service_test.go +++ b/src/dbnode/network/server/tchannelthrift/node/service_test.go @@ -1347,6 +1347,32 @@ func TestServiceWrite(t *testing.T) { require.NoError(t, err) } +func TestServiceWriteOverloaded(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockDB := storage.NewMockDatabase(ctrl) + mockDB.EXPECT().Options().Return(testStorageOpts).AnyTimes() + + service := NewService(mockDB, testTChannelThriftOptions).(*service) + + tctx, _ := tchannelthrift.NewContext(time.Minute) + ctx := tchannelthrift.Context(tctx) + defer ctx.Close() + + mockDB.EXPECT().IsOverloaded().Return(true) + err := service.Write(tctx, &rpc.WriteRequest{ + NameSpace: "metrics", + ID: "foo", + Datapoint: &rpc.Datapoint{ + Timestamp: time.Now().Unix(), + TimestampTimeType: rpc.TimeType_UNIX_SECONDS, + Value: 42.42, + }, + }) + require.Equal(t, errServerIsOverloaded, err) +} + func TestServiceWriteTagged(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -1397,6 +1423,32 @@ func TestServiceWriteTagged(t *testing.T) { require.NoError(t, err) } +func TestServiceWriteTaggedOverloaded(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockDB := storage.NewMockDatabase(ctrl) + mockDB.EXPECT().Options().Return(testStorageOpts).AnyTimes() + + service := NewService(mockDB, testTChannelThriftOptions).(*service) + + tctx, _ := tchannelthrift.NewContext(time.Minute) + ctx := tchannelthrift.Context(tctx) + defer ctx.Close() + + mockDB.EXPECT().IsOverloaded().Return(true) + err := service.WriteTagged(tctx, &rpc.WriteTaggedRequest{ + NameSpace: "metrics", + ID: "foo", + Datapoint: &rpc.Datapoint{ + Timestamp: time.Now().Unix(), + TimestampTimeType: rpc.TimeType_UNIX_SECONDS, + Value: 42.42, + }, + }) + require.Equal(t, tterrors.NewInternalError(errServerIsOverloaded), err) +} + func TestServiceWriteBatchRaw(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -1450,6 +1502,32 @@ func TestServiceWriteBatchRaw(t *testing.T) { require.NoError(t, err) } +func TestServiceWriteBatchRawOverloaded(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockDB := storage.NewMockDatabase(ctrl) + mockDB.EXPECT().Options().Return(testStorageOpts).AnyTimes() + + service := NewService(mockDB, testTChannelThriftOptions).(*service) + + tctx, _ := tchannelthrift.NewContext(time.Minute) + ctx := tchannelthrift.Context(tctx) + defer ctx.Close() + + nsID := "metrics" + writeBatch := ts.NewWriteBatch(0, ident.StringID(nsID), nil) + mockDB.EXPECT(). + BatchWriter(ident.NewIDMatcher(nsID), 0). + Return(writeBatch, nil) + + mockDB.EXPECT().IsOverloaded().Return(true) + err := service.WriteBatchRaw(tctx, &rpc.WriteBatchRawRequest{ + NameSpace: []byte(nsID), + }) + require.Equal(t, tterrors.NewInternalError(errServerIsOverloaded), err) +} + func TestServiceWriteTaggedBatchRaw(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -1515,6 +1593,26 @@ func TestServiceWriteTaggedBatchRaw(t *testing.T) { require.NoError(t, err) } +func TestServiceWriteTaggedBatchRawOverloaded(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockDB := storage.NewMockDatabase(ctrl) + mockDB.EXPECT().Options().Return(testStorageOpts).AnyTimes() + + service := NewService(mockDB, testTChannelThriftOptions).(*service) + + tctx, _ := tchannelthrift.NewContext(time.Minute) + ctx := tchannelthrift.Context(tctx) + defer ctx.Close() + + mockDB.EXPECT().IsOverloaded().Return(true) + err := service.WriteTaggedBatchRaw(tctx, &rpc.WriteTaggedBatchRawRequest{ + NameSpace: []byte("metrics"), + }) + require.Equal(t, tterrors.NewInternalError(errServerIsOverloaded), err) +} + func TestServiceWriteTaggedBatchRawUnknownError(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() From 184ea372a2d9b4e4bebd2a645d1e42863c1b0c72 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Wed, 20 Mar 2019 18:41:28 -0400 Subject: [PATCH 05/12] Fix tests --- src/dbnode/network/server/tchannelthrift/node/service_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/dbnode/network/server/tchannelthrift/node/service_test.go b/src/dbnode/network/server/tchannelthrift/node/service_test.go index f8d2eec2c8..c86c64db28 100644 --- a/src/dbnode/network/server/tchannelthrift/node/service_test.go +++ b/src/dbnode/network/server/tchannelthrift/node/service_test.go @@ -1335,6 +1335,7 @@ func TestServiceWrite(t *testing.T) { Write(ctx, ident.NewIDMatcher(nsID), ident.NewIDMatcher(id), at, value, xtime.Second, nil). Return(nil) + mockDB.EXPECT().IsOverloaded().Return(false) err := service.Write(tctx, &rpc.WriteRequest{ NameSpace: nsID, ID: id, @@ -1370,7 +1371,7 @@ func TestServiceWriteOverloaded(t *testing.T) { Value: 42.42, }, }) - require.Equal(t, errServerIsOverloaded, err) + require.Equal(t, tterrors.NewInternalError(errServerIsOverloaded), err) } func TestServiceWriteTagged(t *testing.T) { From fa93a3d8c0b1305dbd18574f7917345e5e9147ba Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Wed, 20 Mar 2019 18:43:53 -0400 Subject: [PATCH 06/12] Fix more tests --- .../server/tchannelthrift/node/service_test.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/dbnode/network/server/tchannelthrift/node/service_test.go b/src/dbnode/network/server/tchannelthrift/node/service_test.go index c86c64db28..ace7217b25 100644 --- a/src/dbnode/network/server/tchannelthrift/node/service_test.go +++ b/src/dbnode/network/server/tchannelthrift/node/service_test.go @@ -1420,6 +1420,7 @@ func TestServiceWriteTagged(t *testing.T) { Value: tagValues[i], }) } + mockDB.EXPECT().IsOverloaded().Return(false) err := service.WriteTagged(tctx, request) require.NoError(t, err) } @@ -1496,6 +1497,7 @@ func TestServiceWriteBatchRaw(t *testing.T) { elements = append(elements, elem) } + mockDB.EXPECT().IsOverloaded().Return(false) err := service.WriteBatchRaw(tctx, &rpc.WriteBatchRawRequest{ NameSpace: []byte(nsID), Elements: elements, @@ -1516,15 +1518,9 @@ func TestServiceWriteBatchRawOverloaded(t *testing.T) { ctx := tchannelthrift.Context(tctx) defer ctx.Close() - nsID := "metrics" - writeBatch := ts.NewWriteBatch(0, ident.StringID(nsID), nil) - mockDB.EXPECT(). - BatchWriter(ident.NewIDMatcher(nsID), 0). - Return(writeBatch, nil) - mockDB.EXPECT().IsOverloaded().Return(true) err := service.WriteBatchRaw(tctx, &rpc.WriteBatchRawRequest{ - NameSpace: []byte(nsID), + NameSpace: []byte("metrics"), }) require.Equal(t, tterrors.NewInternalError(errServerIsOverloaded), err) } @@ -1587,6 +1583,7 @@ func TestServiceWriteTaggedBatchRaw(t *testing.T) { elements = append(elements, elem) } + mockDB.EXPECT().IsOverloaded().Return(true) err := service.WriteTaggedBatchRaw(tctx, &rpc.WriteTaggedBatchRawRequest{ NameSpace: []byte(nsID), Elements: elements, @@ -1667,6 +1664,7 @@ func TestServiceWriteTaggedBatchRawUnknownError(t *testing.T) { elements = append(elements, elem) } + mockDB.EXPECT().IsOverloaded().Return(false) err := service.WriteTaggedBatchRaw(tctx, &rpc.WriteTaggedBatchRawRequest{ NameSpace: []byte(nsID), Elements: elements, From cda36811d07a1c7d1f25ec0ebbd46b94c9e2ee55 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Wed, 20 Mar 2019 18:44:16 -0400 Subject: [PATCH 07/12] fix last test --- src/dbnode/network/server/tchannelthrift/node/service_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dbnode/network/server/tchannelthrift/node/service_test.go b/src/dbnode/network/server/tchannelthrift/node/service_test.go index ace7217b25..8d7938b425 100644 --- a/src/dbnode/network/server/tchannelthrift/node/service_test.go +++ b/src/dbnode/network/server/tchannelthrift/node/service_test.go @@ -1583,7 +1583,7 @@ func TestServiceWriteTaggedBatchRaw(t *testing.T) { elements = append(elements, elem) } - mockDB.EXPECT().IsOverloaded().Return(true) + mockDB.EXPECT().IsOverloaded().Return(false) err := service.WriteTaggedBatchRaw(tctx, &rpc.WriteTaggedBatchRawRequest{ NameSpace: []byte(nsID), Elements: elements, From cbf4082bcec1b062f5b90f85c53c785e2269c0e0 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Wed, 20 Mar 2019 18:50:12 -0400 Subject: [PATCH 08/12] improve commitlog queue length test --- src/dbnode/persist/fs/commitlog/commit_log_test.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/dbnode/persist/fs/commitlog/commit_log_test.go b/src/dbnode/persist/fs/commitlog/commit_log_test.go index fec386e89c..17383fef9e 100644 --- a/src/dbnode/persist/fs/commitlog/commit_log_test.go +++ b/src/dbnode/persist/fs/commitlog/commit_log_test.go @@ -629,9 +629,9 @@ func TestCommitLogWriteErrorOnFull(t *testing.T) { assertCommitLogWritesByIterating(t, commitLog, writes) } -func TestCommitLogQueueSize(t *testing.T) { +func TestCommitLogQueueLength(t *testing.T) { // Set backlog of size one and don't automatically flush. - backlogQueueSize := 1 + backlogQueueSize := 10 flushInterval := time.Duration(0) opts, _ := newTestOptions(t, overrides{ backlogQueueSize: &backlogQueueSize, @@ -643,7 +643,6 @@ func TestCommitLogQueueSize(t *testing.T) { commitLog := newTestCommitLog(t, opts) defer commitLog.Close() - // Test filling queue. var writes []testWrite series := testSeries(0, "foo.bar", testTags1, 127) dp := ts.Datapoint{Timestamp: time.Now(), Value: 123.456} @@ -652,15 +651,15 @@ func TestCommitLogQueueSize(t *testing.T) { ctx := context.NewContext() defer ctx.Close() - for { + for i := 0; ; i++ { + // Write in a loop and check the queue length until the queue is full. if err := commitLog.Write(ctx, series, dp, unit, nil); err != nil { - // Ensure queue full error. require.Equal(t, ErrCommitLogQueueFull, err) - require.Equal(t, int64(backlogQueueSize), commitLog.QueueSize()) break } - writes = append(writes, testWrite{series, dp.Timestamp, dp.Value, unit, nil, nil}) + require.Equal(t, int64(i), commitLog.QueueLength()) + writes = append(writes, testWrite{series, dp.Timestamp, dp.Value, unit, nil, nil}) // Increment timestamp and value for next write. dp.Timestamp = dp.Timestamp.Add(time.Second) dp.Value += 1.0 From b63b91604a44a2c3560fa4560e5cffbecbf08849 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Wed, 20 Mar 2019 18:51:39 -0400 Subject: [PATCH 09/12] fix commitlog test --- .../persist/fs/commitlog/commit_log_test.go | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/src/dbnode/persist/fs/commitlog/commit_log_test.go b/src/dbnode/persist/fs/commitlog/commit_log_test.go index 17383fef9e..1962bee701 100644 --- a/src/dbnode/persist/fs/commitlog/commit_log_test.go +++ b/src/dbnode/persist/fs/commitlog/commit_log_test.go @@ -643,23 +643,22 @@ func TestCommitLogQueueLength(t *testing.T) { commitLog := newTestCommitLog(t, opts) defer commitLog.Close() - var writes []testWrite - series := testSeries(0, "foo.bar", testTags1, 127) - dp := ts.Datapoint{Timestamp: time.Now(), Value: 123.456} - unit := xtime.Millisecond - - ctx := context.NewContext() + var ( + series = testSeries(0, "foo.bar", testTags1, 127) + dp = ts.Datapoint{Timestamp: time.Now(), Value: 123.456} + unit = xtime.Millisecond + ctx = context.NewContext() + ) defer ctx.Close() for i := 0; ; i++ { // Write in a loop and check the queue length until the queue is full. + require.Equal(t, int64(i), commitLog.QueueLength()) if err := commitLog.Write(ctx, series, dp, unit, nil); err != nil { require.Equal(t, ErrCommitLogQueueFull, err) break } - require.Equal(t, int64(i), commitLog.QueueLength()) - writes = append(writes, testWrite{series, dp.Timestamp, dp.Value, unit, nil, nil}) // Increment timestamp and value for next write. dp.Timestamp = dp.Timestamp.Add(time.Second) dp.Value += 1.0 From eb01ae8a28d1a9484cb5a35e53a91778433fba87 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Wed, 20 Mar 2019 18:52:27 -0400 Subject: [PATCH 10/12] fix comment --- src/dbnode/storage/database.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/dbnode/storage/database.go b/src/dbnode/storage/database.go index fd4373dcd4..d757756145 100644 --- a/src/dbnode/storage/database.go +++ b/src/dbnode/storage/database.go @@ -49,8 +49,8 @@ import ( const ( // The database is considered overloaded if the queue size is 90% or more // of the maximum capacity. We set this below 1.0 because checking the queue - // lengthy is racey so we're gonna burst past this value anyways, and the buffer - // give us breathing room to recover. + // lengthy is racey so we're gonna burst past this value anyways and the buffer + // gives us breathing room to recover. commitLogQueueCapacityOverloadedFactor = 0.9 ) From d3be661f9a1a4924106c4e8dbe80d80bdeeda669 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Wed, 20 Mar 2019 19:21:18 -0400 Subject: [PATCH 11/12] fix lint errors --- src/dbnode/storage/database.go | 21 +++------------------ 1 file changed, 3 insertions(+), 18 deletions(-) diff --git a/src/dbnode/storage/database.go b/src/dbnode/storage/database.go index d757756145..e8034a39ae 100644 --- a/src/dbnode/storage/database.go +++ b/src/dbnode/storage/database.go @@ -559,12 +559,7 @@ func (d *db) Write( } dp := ts.Datapoint{Timestamp: timestamp, Value: value} - err = d.commitLog.Write(ctx, series, dp, unit, annotation) - if err != nil { - return err - } - - return nil + return d.commitLog.Write(ctx, series, dp, unit, annotation) } func (d *db) WriteTagged( @@ -593,12 +588,7 @@ func (d *db) WriteTagged( } dp := ts.Datapoint{Timestamp: timestamp, Value: value} - err = d.commitLog.Write(ctx, series, dp, unit, annotation) - if err != nil { - return err - } - - return nil + return d.commitLog.Write(ctx, series, dp, unit, annotation) } func (d *db) BatchWriter(namespace ident.ID, batchSize int) (ts.BatchWriter, error) { @@ -709,12 +699,7 @@ func (d *db) writeBatch( return nil } - err = d.commitLog.WriteBatch(ctx, writes) - if err != nil { - return err - } - - return nil + return d.commitLog.WriteBatch(ctx, writes) } func (d *db) QueryIDs( From e0d076b65e6cc76554f1df0a9b42d3bc2738037f Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Wed, 20 Mar 2019 23:17:48 -0400 Subject: [PATCH 12/12] delete frequency counter package --- src/dbnode/storage/options.go | 13 -- src/dbnode/storage/storage_mock.go | 29 ---- src/dbnode/storage/types.go | 7 - src/dbnode/x/xcounter/frequency_counter.go | 141 ------------------ .../x/xcounter/frequency_counter_test.go | 90 ----------- src/dbnode/x/xcounter/options.go | 60 -------- 6 files changed, 340 deletions(-) delete mode 100644 src/dbnode/x/xcounter/frequency_counter.go delete mode 100644 src/dbnode/x/xcounter/frequency_counter_test.go delete mode 100644 src/dbnode/x/xcounter/options.go diff --git a/src/dbnode/storage/options.go b/src/dbnode/storage/options.go index c1c9dea08f..c748e580b0 100644 --- a/src/dbnode/storage/options.go +++ b/src/dbnode/storage/options.go @@ -42,7 +42,6 @@ import ( "github.com/m3db/m3/src/dbnode/storage/repair" "github.com/m3db/m3/src/dbnode/storage/series" "github.com/m3db/m3/src/dbnode/ts" - "github.com/m3db/m3/src/dbnode/x/xcounter" "github.com/m3db/m3/src/dbnode/x/xio" "github.com/m3db/m3x/context" "github.com/m3db/m3x/ident" @@ -113,7 +112,6 @@ type options struct { blockOpts block.Options commitLogOpts commitlog.Options runtimeOptsMgr m3dbruntime.OptionsManager - errCounterOpts xcounter.Options errWindowForLoad time.Duration errThresholdForLoad int64 indexingEnabled bool @@ -167,7 +165,6 @@ func newOptions(poolOpts pool.ObjectPoolOptions) Options { blockOpts: block.NewOptions(), commitLogOpts: commitlog.NewOptions(), runtimeOptsMgr: m3dbruntime.NewOptionsManager(), - errCounterOpts: xcounter.NewOptions(), errWindowForLoad: defaultErrorWindowForLoad, errThresholdForLoad: defaultErrorThresholdForLoad, indexingEnabled: defaultIndexingEnabled, @@ -311,16 +308,6 @@ func (o *options) RuntimeOptionsManager() m3dbruntime.OptionsManager { return o.runtimeOptsMgr } -func (o *options) SetErrorCounterOptions(value xcounter.Options) Options { - opts := *o - opts.errCounterOpts = value - return &opts -} - -func (o *options) ErrorCounterOptions() xcounter.Options { - return o.errCounterOpts -} - func (o *options) SetErrorWindowForLoad(value time.Duration) Options { opts := *o opts.errWindowForLoad = value diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go index ca3c530506..6bd38dda5f 100644 --- a/src/dbnode/storage/storage_mock.go +++ b/src/dbnode/storage/storage_mock.go @@ -43,7 +43,6 @@ import ( "github.com/m3db/m3/src/dbnode/storage/repair" "github.com/m3db/m3/src/dbnode/storage/series" "github.com/m3db/m3/src/dbnode/ts" - "github.com/m3db/m3/src/dbnode/x/xcounter" "github.com/m3db/m3/src/dbnode/x/xio" "github.com/m3db/m3x/context" "github.com/m3db/m3x/ident" @@ -2835,34 +2834,6 @@ func (mr *MockOptionsMockRecorder) RuntimeOptionsManager() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RuntimeOptionsManager", reflect.TypeOf((*MockOptions)(nil).RuntimeOptionsManager)) } -// SetErrorCounterOptions mocks base method -func (m *MockOptions) SetErrorCounterOptions(value xcounter.Options) Options { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SetErrorCounterOptions", value) - ret0, _ := ret[0].(Options) - return ret0 -} - -// SetErrorCounterOptions indicates an expected call of SetErrorCounterOptions -func (mr *MockOptionsMockRecorder) SetErrorCounterOptions(value interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetErrorCounterOptions", reflect.TypeOf((*MockOptions)(nil).SetErrorCounterOptions), value) -} - -// ErrorCounterOptions mocks base method -func (m *MockOptions) ErrorCounterOptions() xcounter.Options { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ErrorCounterOptions") - ret0, _ := ret[0].(xcounter.Options) - return ret0 -} - -// ErrorCounterOptions indicates an expected call of ErrorCounterOptions -func (mr *MockOptionsMockRecorder) ErrorCounterOptions() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ErrorCounterOptions", reflect.TypeOf((*MockOptions)(nil).ErrorCounterOptions)) -} - // SetErrorWindowForLoad mocks base method func (m *MockOptions) SetErrorWindowForLoad(value time.Duration) Options { m.ctrl.T.Helper() diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index 782e69b92c..331a3ce440 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -39,7 +39,6 @@ import ( "github.com/m3db/m3/src/dbnode/storage/repair" "github.com/m3db/m3/src/dbnode/storage/series" "github.com/m3db/m3/src/dbnode/ts" - "github.com/m3db/m3/src/dbnode/x/xcounter" "github.com/m3db/m3/src/dbnode/x/xio" "github.com/m3db/m3x/context" "github.com/m3db/m3x/ident" @@ -736,12 +735,6 @@ type Options interface { // RuntimeOptionsManager returns the runtime options manager. RuntimeOptionsManager() runtime.OptionsManager - // SetErrorCounterOptions sets the error counter options. - SetErrorCounterOptions(value xcounter.Options) Options - - // ErrorCounterOptions returns the error counter options. - ErrorCounterOptions() xcounter.Options - // SetErrorWindowForLoad sets the error window for load. SetErrorWindowForLoad(value time.Duration) Options diff --git a/src/dbnode/x/xcounter/frequency_counter.go b/src/dbnode/x/xcounter/frequency_counter.go deleted file mode 100644 index 32467b8b8f..0000000000 --- a/src/dbnode/x/xcounter/frequency_counter.go +++ /dev/null @@ -1,141 +0,0 @@ -// Copyright (c) 2017 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package xcounter - -import ( - "math" - "sync" - "sync/atomic" - "time" - - "github.com/m3db/m3x/clock" -) - -type frequencyBucket struct { - sync.RWMutex - - timestamp time.Time - count int64 -} - -// record records the frequency value n for a given time. -// If the frequency value is too old, it's discarded. -// Returns true if the value is processed, false if it -// requires further processing. -func (b *frequencyBucket) record(t time.Time, n int64) { - b.RLock() - if processed := b.recordWithLock(t, n); processed { - b.RUnlock() - return - } - b.RUnlock() - - b.Lock() - if processed := b.recordWithLock(t, n); processed { - b.Unlock() - return - } - b.timestamp = t - b.count = n - b.Unlock() -} - -// countFor returns the frequency value if the bucket -// timestamp matches the given time, 0 otherwise -func (b *frequencyBucket) countFor(t time.Time) int64 { - b.RLock() - if b.timestamp != t { - b.RUnlock() - return 0 - } - count := b.count - b.RUnlock() - - return count -} - -func (b *frequencyBucket) recordWithLock(t time.Time, n int64) bool { - // The information is too old to keep - if t.Before(b.timestamp) { - return true - } - // The timestamp matches so it's safe to record - if t.Equal(b.timestamp) { - // NB(xichen): use atomics here so multiple goroutines - // can record at the same time - atomic.AddInt64(&b.count, n) - return true - } - // Otherwise this bucket is stale, don't record - return false -} - -// FrequencyCounter keeps track of the frequency counts for the -// previous interval and the current interval -type FrequencyCounter struct { - nowFn clock.NowFn - interval time.Duration - buckets []frequencyBucket -} - -// NewFrequencyCounter creates a new frequency counter -func NewFrequencyCounter(opts Options) FrequencyCounter { - return FrequencyCounter{ - nowFn: time.Now, - interval: opts.Interval(), - buckets: make([]frequencyBucket, opts.NumBuckets()), - } -} - -// Record records a frequency value in the corresponding bucket -func (c *FrequencyCounter) Record(n int64) { - now := c.nowFn().Truncate(c.interval) - bucketIdx := c.bucketIdx(now) - c.buckets[bucketIdx].record(now, n) -} - -// Count returns the frequency count between now - dur and now -func (c *FrequencyCounter) Count(dur time.Duration) int64 { - if dur <= 0 { - return 0 - } - var ( - n = int(math.Min(float64(dur/c.interval), float64(len(c.buckets)))) - now = c.nowFn().Truncate(c.interval) - currIdx = c.bucketIdx(now) - currTime = now - total int64 - ) - // NB(xichen): we discount the current bucket because it may be incomplete - for i := 0; i < n; i++ { - currTime = currTime.Add(-c.interval) - currIdx-- - if currIdx < 0 { - currIdx += len(c.buckets) - } - total += c.buckets[currIdx].countFor(currTime) - } - return total -} - -func (c *FrequencyCounter) bucketIdx(now time.Time) int { - return int((now.UnixNano() / int64(c.interval)) % int64(len(c.buckets))) -} diff --git a/src/dbnode/x/xcounter/frequency_counter_test.go b/src/dbnode/x/xcounter/frequency_counter_test.go deleted file mode 100644 index 3b52b2a4af..0000000000 --- a/src/dbnode/x/xcounter/frequency_counter_test.go +++ /dev/null @@ -1,90 +0,0 @@ -// Copyright (c) 2017 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package xcounter - -import ( - "sync" - "testing" - "time" - - "github.com/stretchr/testify/require" -) - -func TestFrequencyCounterSequential(t *testing.T) { - numBuckets := 10 - interval := time.Second - opts := NewOptions().SetNumBuckets(numBuckets).SetInterval(interval) - c := NewFrequencyCounter(opts) - var now time.Time - c.nowFn = func() time.Time { return now } - - // Record a value in each bucket - now = time.Now().Truncate(interval) - iter := 10 - for i := 0; i < iter; i++ { - now = now.Add(interval) - c.Record(1) - } - require.Equal(t, int64(iter-1), c.Count(time.Duration(iter)*interval)) - - // Move time forward - now = now.Add(interval) - c.Record(2) - require.Equal(t, int64(iter-1), c.Count(time.Duration(iter)*interval)) - - // Move time forward again - now = now.Add(interval) - c.Record(2) - require.Equal(t, int64(iter), c.Count(time.Duration(iter)*interval)) - - // Move time forward far into future - now = now.Add(20 * interval) - require.Equal(t, int64(0), c.Count(time.Duration(iter)*interval)) -} - -func TestFrequencyCounterParallel(t *testing.T) { - numBuckets := 10 - interval := time.Second - opts := NewOptions().SetNumBuckets(numBuckets).SetInterval(interval) - c := NewFrequencyCounter(opts) - var now time.Time - c.nowFn = func() time.Time { return now } - - var ( - wg sync.WaitGroup - numWorkers = 10 - iter = 10 - ) - now = time.Now().Truncate(interval) - for i := 0; i < iter; i++ { - now = now.Add(interval) - wg.Add(numWorkers) - for j := 0; j < numWorkers; j++ { - go func() { - defer wg.Done() - - c.Record(1) - }() - } - wg.Wait() - } - require.Equal(t, int64(iter-1)*int64(numWorkers), c.Count(time.Duration(iter)*interval)) -} diff --git a/src/dbnode/x/xcounter/options.go b/src/dbnode/x/xcounter/options.go deleted file mode 100644 index e0964e4b4b..0000000000 --- a/src/dbnode/x/xcounter/options.go +++ /dev/null @@ -1,60 +0,0 @@ -// Copyright (c) 2017 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package xcounter - -import "time" - -const ( - defaultNumBuckets = 60 - defaultInterval = time.Second -) - -// Options controls the parameters for frequency counters -type Options struct { - numBuckets int - interval time.Duration -} - -// NewOptions creates new options -func NewOptions() Options { - return Options{ - numBuckets: defaultNumBuckets, - interval: defaultInterval, - } -} - -// NumBuckets returns the number of buckets -func (o Options) NumBuckets() int { return o.numBuckets } - -// Interval returns the interval associated with each bucket -func (o Options) Interval() time.Duration { return o.interval } - -// SetNumBuckets sets the number of buckets -func (o Options) SetNumBuckets(value int) Options { - o.numBuckets = value - return o -} - -// SetInterval sets the interval -func (o Options) SetInterval(value time.Duration) Options { - o.interval = value - return o -}