Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve M3DBs ability to apply back pressure on writes #1482

Merged
merged 12 commits into from
Mar 21, 2019
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions src/dbnode/network/server/tchannelthrift/node/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,11 @@ func (s *service) getBlocksMetadataV2FromResult(
}

func (s *service) Write(tctx thrift.Context, req *rpc.WriteRequest) error {
if s.db.IsOverloaded() {
s.metrics.overloadRejected.Inc(1)
return tterrors.NewInternalError(errServerIsOverloaded)
}

callStart := s.nowFn()
ctx := tchannelthrift.Context(tctx)

Expand Down Expand Up @@ -788,6 +793,11 @@ func (s *service) Write(tctx thrift.Context, req *rpc.WriteRequest) error {
}

func (s *service) WriteTagged(tctx thrift.Context, req *rpc.WriteTaggedRequest) error {
if s.db.IsOverloaded() {
s.metrics.overloadRejected.Inc(1)
return tterrors.NewInternalError(errServerIsOverloaded)
}

callStart := s.nowFn()
ctx := tchannelthrift.Context(tctx)

Expand Down Expand Up @@ -836,6 +846,11 @@ func (s *service) WriteTagged(tctx thrift.Context, req *rpc.WriteTaggedRequest)
}

func (s *service) WriteBatchRaw(tctx thrift.Context, req *rpc.WriteBatchRawRequest) error {
if s.db.IsOverloaded() {
s.metrics.overloadRejected.Inc(1)
return tterrors.NewInternalError(errServerIsOverloaded)
}

callStart := s.nowFn()
ctx := tchannelthrift.Context(tctx)

Expand Down Expand Up @@ -913,6 +928,11 @@ func (s *service) WriteBatchRaw(tctx thrift.Context, req *rpc.WriteBatchRawReque
}

func (s *service) WriteTaggedBatchRaw(tctx thrift.Context, req *rpc.WriteTaggedBatchRawRequest) error {
if s.db.IsOverloaded() {
s.metrics.overloadRejected.Inc(1)
return tterrors.NewInternalError(errServerIsOverloaded)
}

callStart := s.nowFn()
ctx := tchannelthrift.Context(tctx)

Expand Down
97 changes: 97 additions & 0 deletions src/dbnode/network/server/tchannelthrift/node/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1335,6 +1335,7 @@ func TestServiceWrite(t *testing.T) {
Write(ctx, ident.NewIDMatcher(nsID), ident.NewIDMatcher(id), at, value, xtime.Second, nil).
Return(nil)

mockDB.EXPECT().IsOverloaded().Return(false)
err := service.Write(tctx, &rpc.WriteRequest{
NameSpace: nsID,
ID: id,
Expand All @@ -1347,6 +1348,32 @@ func TestServiceWrite(t *testing.T) {
require.NoError(t, err)
}

func TestServiceWriteOverloaded(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockDB := storage.NewMockDatabase(ctrl)
mockDB.EXPECT().Options().Return(testStorageOpts).AnyTimes()

service := NewService(mockDB, testTChannelThriftOptions).(*service)

tctx, _ := tchannelthrift.NewContext(time.Minute)
ctx := tchannelthrift.Context(tctx)
defer ctx.Close()

mockDB.EXPECT().IsOverloaded().Return(true)
err := service.Write(tctx, &rpc.WriteRequest{
NameSpace: "metrics",
ID: "foo",
Datapoint: &rpc.Datapoint{
Timestamp: time.Now().Unix(),
TimestampTimeType: rpc.TimeType_UNIX_SECONDS,
Value: 42.42,
},
})
require.Equal(t, tterrors.NewInternalError(errServerIsOverloaded), err)
}

func TestServiceWriteTagged(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down Expand Up @@ -1393,10 +1420,37 @@ func TestServiceWriteTagged(t *testing.T) {
Value: tagValues[i],
})
}
mockDB.EXPECT().IsOverloaded().Return(false)
err := service.WriteTagged(tctx, request)
require.NoError(t, err)
}

func TestServiceWriteTaggedOverloaded(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockDB := storage.NewMockDatabase(ctrl)
mockDB.EXPECT().Options().Return(testStorageOpts).AnyTimes()

service := NewService(mockDB, testTChannelThriftOptions).(*service)

tctx, _ := tchannelthrift.NewContext(time.Minute)
ctx := tchannelthrift.Context(tctx)
defer ctx.Close()

mockDB.EXPECT().IsOverloaded().Return(true)
err := service.WriteTagged(tctx, &rpc.WriteTaggedRequest{
NameSpace: "metrics",
ID: "foo",
Datapoint: &rpc.Datapoint{
Timestamp: time.Now().Unix(),
TimestampTimeType: rpc.TimeType_UNIX_SECONDS,
Value: 42.42,
},
})
require.Equal(t, tterrors.NewInternalError(errServerIsOverloaded), err)
}

func TestServiceWriteBatchRaw(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down Expand Up @@ -1443,13 +1497,34 @@ func TestServiceWriteBatchRaw(t *testing.T) {
elements = append(elements, elem)
}

mockDB.EXPECT().IsOverloaded().Return(false)
err := service.WriteBatchRaw(tctx, &rpc.WriteBatchRawRequest{
NameSpace: []byte(nsID),
Elements: elements,
})
require.NoError(t, err)
}

func TestServiceWriteBatchRawOverloaded(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockDB := storage.NewMockDatabase(ctrl)
mockDB.EXPECT().Options().Return(testStorageOpts).AnyTimes()

service := NewService(mockDB, testTChannelThriftOptions).(*service)

tctx, _ := tchannelthrift.NewContext(time.Minute)
ctx := tchannelthrift.Context(tctx)
defer ctx.Close()

mockDB.EXPECT().IsOverloaded().Return(true)
err := service.WriteBatchRaw(tctx, &rpc.WriteBatchRawRequest{
NameSpace: []byte("metrics"),
})
require.Equal(t, tterrors.NewInternalError(errServerIsOverloaded), err)
}

func TestServiceWriteTaggedBatchRaw(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down Expand Up @@ -1508,13 +1583,34 @@ func TestServiceWriteTaggedBatchRaw(t *testing.T) {
elements = append(elements, elem)
}

mockDB.EXPECT().IsOverloaded().Return(false)
err := service.WriteTaggedBatchRaw(tctx, &rpc.WriteTaggedBatchRawRequest{
NameSpace: []byte(nsID),
Elements: elements,
})
require.NoError(t, err)
}

func TestServiceWriteTaggedBatchRawOverloaded(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockDB := storage.NewMockDatabase(ctrl)
mockDB.EXPECT().Options().Return(testStorageOpts).AnyTimes()

service := NewService(mockDB, testTChannelThriftOptions).(*service)

tctx, _ := tchannelthrift.NewContext(time.Minute)
ctx := tchannelthrift.Context(tctx)
defer ctx.Close()

mockDB.EXPECT().IsOverloaded().Return(true)
err := service.WriteTaggedBatchRaw(tctx, &rpc.WriteTaggedBatchRawRequest{
NameSpace: []byte("metrics"),
})
require.Equal(t, tterrors.NewInternalError(errServerIsOverloaded), err)
}

func TestServiceWriteTaggedBatchRawUnknownError(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down Expand Up @@ -1568,6 +1664,7 @@ func TestServiceWriteTaggedBatchRawUnknownError(t *testing.T) {
elements = append(elements, elem)
}

mockDB.EXPECT().IsOverloaded().Return(false)
err := service.WriteTaggedBatchRaw(tctx, &rpc.WriteTaggedBatchRawRequest{
NameSpace: []byte(nsID),
Elements: elements,
Expand Down
4 changes: 4 additions & 0 deletions src/dbnode/persist/fs/commitlog/commit_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,10 @@ func (l *commitLog) RotateLogs() (persist.CommitLogFile, error) {
return file, nil
}

func (l *commitLog) QueueLength() int64 {
return atomic.LoadInt64(&l.numWritesInQueue)
}

func (l *commitLog) flushEvery(interval time.Duration) {
// Periodically flush the underlying commit log writer to cover
// the case when writes stall for a considerable time
Expand Down
16 changes: 15 additions & 1 deletion src/dbnode/persist/fs/commitlog/commit_log_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

47 changes: 42 additions & 5 deletions src/dbnode/persist/fs/commitlog/commit_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ func TestCommitLogWriteErrorOnClosed(t *testing.T) {
}

func TestCommitLogWriteErrorOnFull(t *testing.T) {
// Set backlog of size one and don't automatically flush
// Set backlog of size one and don't automatically flush.
backlogQueueSize := 1
flushInterval := time.Duration(0)
opts, _ := newTestOptions(t, overrides{
Expand All @@ -610,24 +610,61 @@ func TestCommitLogWriteErrorOnFull(t *testing.T) {

for {
if err := commitLog.Write(ctx, series, dp, unit, nil); err != nil {
// Ensure queue full error
// Ensure queue full error.
require.Equal(t, ErrCommitLogQueueFull, err)
require.Equal(t, int64(backlogQueueSize), commitLog.QueueLength())
break
}
writes = append(writes, testWrite{series, dp.Timestamp, dp.Value, unit, nil, nil})

// Increment timestamp and value for next write
// Increment timestamp and value for next write.
dp.Timestamp = dp.Timestamp.Add(time.Second)
dp.Value += 1.0
}

// Close and consequently flush
// Close and consequently flush.
require.NoError(t, commitLog.Close())

// Assert write flushed by reading the commit log
// Assert write flushed by reading the commit log.
assertCommitLogWritesByIterating(t, commitLog, writes)
}

func TestCommitLogQueueLength(t *testing.T) {
// Set backlog of size one and don't automatically flush.
backlogQueueSize := 10
flushInterval := time.Duration(0)
opts, _ := newTestOptions(t, overrides{
backlogQueueSize: &backlogQueueSize,
flushInterval: &flushInterval,
strategy: StrategyWriteBehind,
})
defer cleanup(t, opts)

commitLog := newTestCommitLog(t, opts)
defer commitLog.Close()

var (
series = testSeries(0, "foo.bar", testTags1, 127)
dp = ts.Datapoint{Timestamp: time.Now(), Value: 123.456}
unit = xtime.Millisecond
ctx = context.NewContext()
)
defer ctx.Close()

for i := 0; ; i++ {
// Write in a loop and check the queue length until the queue is full.
require.Equal(t, int64(i), commitLog.QueueLength())
if err := commitLog.Write(ctx, series, dp, unit, nil); err != nil {
require.Equal(t, ErrCommitLogQueueFull, err)
break
}

// Increment timestamp and value for next write.
dp.Timestamp = dp.Timestamp.Add(time.Second)
dp.Value += 1.0
}
}

func TestCommitLogFailOnWriteError(t *testing.T) {
opts, scope := newTestOptions(t, overrides{
strategy: StrategyWriteBehind,
Expand Down
4 changes: 4 additions & 0 deletions src/dbnode/persist/fs/commitlog/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ type CommitLog interface {
// RotateLogs rotates the commitlog and returns the File that represents
// the new commitlog file.
RotateLogs() (persist.CommitLogFile, error)

// QueueLength returns the number of writes that are currently in the commitlog
// queue.
QueueLength() int64
}

// Iterator provides an iterator for commit logs
Expand Down
Loading