From 87c840151fb781e1f32bb805415aeb32bb3b247c Mon Sep 17 00:00:00 2001 From: linasm Date: Mon, 1 Jun 2020 13:49:15 +0300 Subject: [PATCH 1/4] [dbnode] Track loaded docs per completed query --- src/dbnode/storage/index.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/dbnode/storage/index.go b/src/dbnode/storage/index.go index e079bbd011..946e772f77 100644 --- a/src/dbnode/storage/index.go +++ b/src/dbnode/storage/index.go @@ -1359,6 +1359,8 @@ func (i *nsIndex) queryWithSpan( } } + i.metrics.loadedDocsPerQuery.RecordValue(float64(results.Size())) + state.Lock() // Take reference to vars to return while locked. exhaustive := state.exhaustive @@ -1844,6 +1846,8 @@ type nsIndexMetrics struct { insertEndToEndLatency tally.Timer blocksEvictedMutableSegments tally.Counter blockMetrics nsIndexBlocksMetrics + + loadedDocsPerQuery tally.Histogram } func newNamespaceIndexMetrics( @@ -1889,6 +1893,10 @@ func newNamespaceIndexMetrics( "insert-end-to-end-latency", iopts.TimerOptions()), blocksEvictedMutableSegments: scope.Counter("blocks-evicted-mutable-segments"), blockMetrics: newNamespaceIndexBlocksMetrics(opts, blocksScope), + loadedDocsPerQuery: scope.Histogram( + "loaded-docs-per-query", + tally.MustMakeExponentialValueBuckets(10, 10, 7), + ), } } From a3ddddc60fe790937c5ff975c11c3666f191790a Mon Sep 17 00:00:00 2001 From: linasm Date: Mon, 1 Jun 2020 16:16:06 +0300 Subject: [PATCH 2/4] Adjust bucket sizes --- src/dbnode/storage/index.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dbnode/storage/index.go b/src/dbnode/storage/index.go index 946e772f77..dfeea9faa7 100644 --- a/src/dbnode/storage/index.go +++ b/src/dbnode/storage/index.go @@ -1895,7 +1895,7 @@ func newNamespaceIndexMetrics( blockMetrics: newNamespaceIndexBlocksMetrics(opts, blocksScope), loadedDocsPerQuery: scope.Histogram( "loaded-docs-per-query", - tally.MustMakeExponentialValueBuckets(10, 10, 7), + tally.MustMakeExponentialValueBuckets(10, 2, 16), ), } } From 9861846d54111ac8491c1d878171fb9def36a852 Mon Sep 17 00:00:00 2001 From: linasm Date: Thu, 4 Jun 2020 15:01:28 +0300 Subject: [PATCH 3/4] Count all the documents observed while building the result --- src/dbnode/storage/index.go | 2 +- src/dbnode/storage/index/aggregate_results.go | 12 ++++++- .../storage/index/aggregate_results_test.go | 36 +++++++++++++++++++ src/dbnode/storage/index/results.go | 12 ++++++- src/dbnode/storage/index/results_test.go | 29 +++++++++++++++ src/dbnode/storage/index/types.go | 3 ++ 6 files changed, 91 insertions(+), 3 deletions(-) diff --git a/src/dbnode/storage/index.go b/src/dbnode/storage/index.go index dfeea9faa7..73d0700a9e 100644 --- a/src/dbnode/storage/index.go +++ b/src/dbnode/storage/index.go @@ -1359,7 +1359,7 @@ func (i *nsIndex) queryWithSpan( } } - i.metrics.loadedDocsPerQuery.RecordValue(float64(results.Size())) + i.metrics.loadedDocsPerQuery.RecordValue(float64(results.TotalDocsCount())) state.Lock() // Take reference to vars to return while locked. diff --git a/src/dbnode/storage/index/aggregate_results.go b/src/dbnode/storage/index/aggregate_results.go index 9d1377c18a..5e48c43ff1 100644 --- a/src/dbnode/storage/index/aggregate_results.go +++ b/src/dbnode/storage/index/aggregate_results.go @@ -41,7 +41,8 @@ type aggregatedResults struct { nsID ident.ID aggregateOpts AggregateResultsOptions - resultsMap *AggregateResultsMap + resultsMap *AggregateResultsMap + totalDocsCount int idPool ident.Pool bytesPool pool.CheckedBytesPool @@ -94,6 +95,7 @@ func (r *aggregatedResults) Reset( // reset all keys in the map next r.resultsMap.Reset() + r.totalDocsCount = 0 // NB: could do keys+value in one step but I'm trying to avoid // using an internal method of a code-gen'd type. @@ -104,6 +106,7 @@ func (r *aggregatedResults) AddDocuments(batch []doc.Document) (int, error) { r.Lock() err := r.addDocumentsBatchWithLock(batch) size := r.resultsMap.Len() + r.totalDocsCount += len(batch) r.Unlock() return size, err } @@ -298,6 +301,13 @@ func (r *aggregatedResults) Size() int { return l } +func (r *aggregatedResults) TotalDocsCount() int { + r.RLock() + count := r.totalDocsCount + r.RUnlock() + return count +} + func (r *aggregatedResults) Finalize() { r.Reset(nil, AggregateResultsOptions{}) if r.pool == nil { diff --git a/src/dbnode/storage/index/aggregate_results_test.go b/src/dbnode/storage/index/aggregate_results_test.go index 69d6c3d058..1ab470a187 100644 --- a/src/dbnode/storage/index/aggregate_results_test.go +++ b/src/dbnode/storage/index/aggregate_results_test.go @@ -55,10 +55,16 @@ func TestAggResultsInsertInvalid(t *testing.T) { require.Error(t, err) require.Equal(t, 0, size) + require.Equal(t, 0, res.Size()) + require.Equal(t, 1, res.TotalDocsCount()) + dInvalid = genDoc("", "foo") size, err = res.AddDocuments([]doc.Document{dInvalid}) require.Error(t, err) require.Equal(t, 0, size) + + require.Equal(t, 0, res.Size()) + require.Equal(t, 2, res.TotalDocsCount()) } func TestAggResultsInsertEmptyTermValue(t *testing.T) { @@ -67,6 +73,21 @@ func TestAggResultsInsertEmptyTermValue(t *testing.T) { size, err := res.AddDocuments([]doc.Document{dValidEmptyTerm}) require.NoError(t, err) require.Equal(t, 1, size) + + require.Equal(t, 1, res.Size()) + require.Equal(t, 1, res.TotalDocsCount()) +} + +func TestAggResultsInsertBatchOfTwo(t *testing.T) { + res := NewAggregateResults(nil, AggregateResultsOptions{}, testOpts) + d1 := genDoc("d1", "") + d2 := genDoc("d2", "") + size, err := res.AddDocuments([]doc.Document{d1, d2}) + require.NoError(t, err) + require.Equal(t, 2, size) + + require.Equal(t, 2, res.Size()) + require.Equal(t, 2, res.TotalDocsCount()) } func TestAggResultsTermOnlyInsert(t *testing.T) { @@ -78,15 +99,24 @@ func TestAggResultsTermOnlyInsert(t *testing.T) { require.Error(t, err) require.Equal(t, 0, size) + require.Equal(t, 0, res.Size()) + require.Equal(t, 1, res.TotalDocsCount()) + dInvalid = genDoc("", "foo") size, err = res.AddDocuments([]doc.Document{dInvalid}) require.Error(t, err) require.Equal(t, 0, size) + require.Equal(t, 0, res.Size()) + require.Equal(t, 2, res.TotalDocsCount()) + valid := genDoc("foo", "") size, err = res.AddDocuments([]doc.Document{valid}) require.NoError(t, err) require.Equal(t, 1, size) + + require.Equal(t, 1, res.Size()) + require.Equal(t, 3, res.TotalDocsCount()) } func testAggResultsInsertIdempotency(t *testing.T, res AggregateResults) { @@ -95,9 +125,15 @@ func testAggResultsInsertIdempotency(t *testing.T, res AggregateResults) { require.NoError(t, err) require.Equal(t, 1, size) + require.Equal(t, 1, res.Size()) + require.Equal(t, 1, res.TotalDocsCount()) + size, err = res.AddDocuments([]doc.Document{dValid}) require.NoError(t, err) require.Equal(t, 1, size) + + require.Equal(t, 1, res.Size()) + require.Equal(t, 2, res.TotalDocsCount()) } func TestAggResultsInsertIdempotency(t *testing.T) { diff --git a/src/dbnode/storage/index/results.go b/src/dbnode/storage/index/results.go index 48a35d2e9c..254083a8b0 100644 --- a/src/dbnode/storage/index/results.go +++ b/src/dbnode/storage/index/results.go @@ -40,7 +40,8 @@ type results struct { nsID ident.ID opts QueryResultsOptions - resultsMap *ResultsMap + resultsMap *ResultsMap + totalDocsCount int idPool ident.Pool bytesPool pool.CheckedBytesPool @@ -88,6 +89,7 @@ func (r *results) Reset(nsID ident.ID, opts QueryResultsOptions) { // Reset all keys in the map next, this will finalize the keys. r.resultsMap.Reset() + r.totalDocsCount = 0 // NB: could do keys+value in one step but I'm trying to avoid // using an internal method of a code-gen'd type. @@ -101,6 +103,7 @@ func (r *results) AddDocuments(batch []doc.Document) (int, error) { r.Lock() err := r.addDocumentsBatchWithLock(batch) size := r.resultsMap.Len() + r.totalDocsCount += len(batch) r.Unlock() return size, err } @@ -173,6 +176,13 @@ func (r *results) Size() int { return v } +func (r *results) TotalDocsCount() int { + r.RLock() + count := r.totalDocsCount + r.RUnlock() + return count +} + func (r *results) Finalize() { // Reset locks so cannot hold onto lock for call to Finalize. r.Reset(nil, QueryResultsOptions{}) diff --git a/src/dbnode/storage/index/results_test.go b/src/dbnode/storage/index/results_test.go index 7ed6d4c17a..0b962c930f 100644 --- a/src/dbnode/storage/index/results_test.go +++ b/src/dbnode/storage/index/results_test.go @@ -59,6 +59,9 @@ func TestResultsInsertInvalid(t *testing.T) { size, err := res.AddDocuments([]doc.Document{dInvalid}) require.Error(t, err) require.Equal(t, 0, size) + + require.Equal(t, 0, res.Size()) + require.Equal(t, 1, res.TotalDocsCount()) } func TestResultsInsertIdempotency(t *testing.T) { @@ -68,9 +71,27 @@ func TestResultsInsertIdempotency(t *testing.T) { require.NoError(t, err) require.Equal(t, 1, size) + require.Equal(t, 1, res.Size()) + require.Equal(t, 1, res.TotalDocsCount()) + size, err = res.AddDocuments([]doc.Document{dValid}) require.NoError(t, err) require.Equal(t, 1, size) + + require.Equal(t, 1, res.Size()) + require.Equal(t, 2, res.TotalDocsCount()) +} + +func TestResultsInsertBatchOfTwo(t *testing.T) { + res := NewQueryResults(nil, QueryResultsOptions{}, testOpts) + d1 := doc.Document{ID: []byte("d1")} + d2 := doc.Document{ID: []byte("d2")} + size, err := res.AddDocuments([]doc.Document{d1, d2}) + require.NoError(t, err) + require.Equal(t, 2, size) + + require.Equal(t, 2, res.Size()) + require.Equal(t, 2, res.TotalDocsCount()) } func TestResultsFirstInsertWins(t *testing.T) { @@ -80,6 +101,9 @@ func TestResultsFirstInsertWins(t *testing.T) { require.NoError(t, err) require.Equal(t, 1, size) + require.Equal(t, 1, res.Size()) + require.Equal(t, 1, res.TotalDocsCount()) + tags, ok := res.Map().Get(ident.StringID("abc")) require.True(t, ok) require.Equal(t, 0, tags.Remaining()) @@ -92,6 +116,9 @@ func TestResultsFirstInsertWins(t *testing.T) { require.NoError(t, err) require.Equal(t, 1, size) + require.Equal(t, 1, res.Size()) + require.Equal(t, 2, res.TotalDocsCount()) + tags, ok = res.Map().Get(ident.StringID("abc")) require.True(t, ok) require.Equal(t, 0, tags.Remaining()) @@ -171,6 +198,7 @@ func TestResultsReset(t *testing.T) { require.False(t, ok) require.Equal(t, 0, tags.Remaining()) require.Equal(t, 0, res.Size()) + require.Equal(t, 0, res.TotalDocsCount()) } func TestResultsResetNamespaceClones(t *testing.T) { @@ -206,6 +234,7 @@ func TestFinalize(t *testing.T) { tags, ok = res.Map().Get(ident.StringID("abc")) require.False(t, ok) require.Equal(t, 0, res.Size()) + require.Equal(t, 0, res.TotalDocsCount()) for _, entry := range res.Map().Iter() { id, _ := entry.Key(), entry.Value() diff --git a/src/dbnode/storage/index/types.go b/src/dbnode/storage/index/types.go index 2d0863585a..08ebe49ed3 100644 --- a/src/dbnode/storage/index/types.go +++ b/src/dbnode/storage/index/types.go @@ -126,6 +126,9 @@ type BaseResults interface { // Size returns the number of IDs tracked. Size() int + // TotalDocsCount returns the total number of documents observed. + TotalDocsCount() int + // AddDocuments adds the batch of documents to the results set, it will // take a copy of the bytes backing the documents so the original can be // modified after this function returns without affecting the results map. From 2edc85daaaec0ac030e4c95ae1ad84a8b5bc4219 Mon Sep 17 00:00:00 2001 From: linasm Date: Thu, 4 Jun 2020 16:11:33 +0300 Subject: [PATCH 4/4] Regenerate mocks --- src/dbnode/storage/index/index_mock.go | 42 ++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/src/dbnode/storage/index/index_mock.go b/src/dbnode/storage/index/index_mock.go index 10485f55d2..e7577a93dd 100644 --- a/src/dbnode/storage/index/index_mock.go +++ b/src/dbnode/storage/index/index_mock.go @@ -100,6 +100,20 @@ func (mr *MockBaseResultsMockRecorder) Size() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Size", reflect.TypeOf((*MockBaseResults)(nil).Size)) } +// TotalDocsCount mocks base method +func (m *MockBaseResults) TotalDocsCount() int { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TotalDocsCount") + ret0, _ := ret[0].(int) + return ret0 +} + +// TotalDocsCount indicates an expected call of TotalDocsCount +func (mr *MockBaseResultsMockRecorder) TotalDocsCount() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TotalDocsCount", reflect.TypeOf((*MockBaseResults)(nil).TotalDocsCount)) +} + // AddDocuments mocks base method func (m *MockBaseResults) AddDocuments(batch []doc.Document) (int, error) { m.ctrl.T.Helper() @@ -178,6 +192,20 @@ func (mr *MockQueryResultsMockRecorder) Size() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Size", reflect.TypeOf((*MockQueryResults)(nil).Size)) } +// TotalDocsCount mocks base method +func (m *MockQueryResults) TotalDocsCount() int { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TotalDocsCount") + ret0, _ := ret[0].(int) + return ret0 +} + +// TotalDocsCount indicates an expected call of TotalDocsCount +func (mr *MockQueryResultsMockRecorder) TotalDocsCount() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TotalDocsCount", reflect.TypeOf((*MockQueryResults)(nil).TotalDocsCount)) +} + // AddDocuments mocks base method func (m *MockQueryResults) AddDocuments(batch []doc.Document) (int, error) { m.ctrl.T.Helper() @@ -343,6 +371,20 @@ func (mr *MockAggregateResultsMockRecorder) Size() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Size", reflect.TypeOf((*MockAggregateResults)(nil).Size)) } +// TotalDocsCount mocks base method +func (m *MockAggregateResults) TotalDocsCount() int { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TotalDocsCount") + ret0, _ := ret[0].(int) + return ret0 +} + +// TotalDocsCount indicates an expected call of TotalDocsCount +func (mr *MockAggregateResultsMockRecorder) TotalDocsCount() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TotalDocsCount", reflect.TypeOf((*MockAggregateResults)(nil).TotalDocsCount)) +} + // AddDocuments mocks base method func (m *MockAggregateResults) AddDocuments(batch []doc.Document) (int, error) { m.ctrl.T.Helper()