Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dbnode] Report a histogram of loaded docs per query #2381

Merged
merged 7 commits into from
Jun 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/dbnode/storage/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1359,6 +1359,8 @@ func (i *nsIndex) queryWithSpan(
}
}

i.metrics.loadedDocsPerQuery.RecordValue(float64(results.TotalDocsCount()))

state.Lock()
// Take reference to vars to return while locked.
exhaustive := state.exhaustive
Expand Down Expand Up @@ -1844,6 +1846,8 @@ type nsIndexMetrics struct {
insertEndToEndLatency tally.Timer
blocksEvictedMutableSegments tally.Counter
blockMetrics nsIndexBlocksMetrics

loadedDocsPerQuery tally.Histogram
}

func newNamespaceIndexMetrics(
Expand Down Expand Up @@ -1889,6 +1893,10 @@ func newNamespaceIndexMetrics(
"insert-end-to-end-latency", iopts.TimerOptions()),
blocksEvictedMutableSegments: scope.Counter("blocks-evicted-mutable-segments"),
blockMetrics: newNamespaceIndexBlocksMetrics(opts, blocksScope),
loadedDocsPerQuery: scope.Histogram(
"loaded-docs-per-query",
tally.MustMakeExponentialValueBuckets(10, 2, 16),
),
}
}

Expand Down
12 changes: 11 additions & 1 deletion src/dbnode/storage/index/aggregate_results.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ type aggregatedResults struct {
nsID ident.ID
aggregateOpts AggregateResultsOptions

resultsMap *AggregateResultsMap
resultsMap *AggregateResultsMap
totalDocsCount int

idPool ident.Pool
bytesPool pool.CheckedBytesPool
Expand Down Expand Up @@ -94,6 +95,7 @@ func (r *aggregatedResults) Reset(

// reset all keys in the map next
r.resultsMap.Reset()
r.totalDocsCount = 0

// NB: could do keys+value in one step but I'm trying to avoid
// using an internal method of a code-gen'd type.
Expand All @@ -104,6 +106,7 @@ func (r *aggregatedResults) AddDocuments(batch []doc.Document) (int, error) {
r.Lock()
err := r.addDocumentsBatchWithLock(batch)
size := r.resultsMap.Len()
r.totalDocsCount += len(batch)
r.Unlock()
return size, err
}
Expand Down Expand Up @@ -298,6 +301,13 @@ func (r *aggregatedResults) Size() int {
return l
}

func (r *aggregatedResults) TotalDocsCount() int {
r.RLock()
count := r.totalDocsCount
r.RUnlock()
return count
}

func (r *aggregatedResults) Finalize() {
r.Reset(nil, AggregateResultsOptions{})
if r.pool == nil {
Expand Down
36 changes: 36 additions & 0 deletions src/dbnode/storage/index/aggregate_results_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,16 @@ func TestAggResultsInsertInvalid(t *testing.T) {
require.Error(t, err)
require.Equal(t, 0, size)

require.Equal(t, 0, res.Size())
require.Equal(t, 1, res.TotalDocsCount())

dInvalid = genDoc("", "foo")
size, err = res.AddDocuments([]doc.Document{dInvalid})
require.Error(t, err)
require.Equal(t, 0, size)

require.Equal(t, 0, res.Size())
require.Equal(t, 2, res.TotalDocsCount())
}

func TestAggResultsInsertEmptyTermValue(t *testing.T) {
Expand All @@ -67,6 +73,21 @@ func TestAggResultsInsertEmptyTermValue(t *testing.T) {
size, err := res.AddDocuments([]doc.Document{dValidEmptyTerm})
require.NoError(t, err)
require.Equal(t, 1, size)

require.Equal(t, 1, res.Size())
require.Equal(t, 1, res.TotalDocsCount())
}

func TestAggResultsInsertBatchOfTwo(t *testing.T) {
res := NewAggregateResults(nil, AggregateResultsOptions{}, testOpts)
d1 := genDoc("d1", "")
d2 := genDoc("d2", "")
size, err := res.AddDocuments([]doc.Document{d1, d2})
require.NoError(t, err)
require.Equal(t, 2, size)

require.Equal(t, 2, res.Size())
require.Equal(t, 2, res.TotalDocsCount())
}

func TestAggResultsTermOnlyInsert(t *testing.T) {
Expand All @@ -78,15 +99,24 @@ func TestAggResultsTermOnlyInsert(t *testing.T) {
require.Error(t, err)
require.Equal(t, 0, size)

require.Equal(t, 0, res.Size())
require.Equal(t, 1, res.TotalDocsCount())

dInvalid = genDoc("", "foo")
size, err = res.AddDocuments([]doc.Document{dInvalid})
require.Error(t, err)
require.Equal(t, 0, size)

require.Equal(t, 0, res.Size())
require.Equal(t, 2, res.TotalDocsCount())

valid := genDoc("foo", "")
size, err = res.AddDocuments([]doc.Document{valid})
require.NoError(t, err)
require.Equal(t, 1, size)

require.Equal(t, 1, res.Size())
require.Equal(t, 3, res.TotalDocsCount())
}

func testAggResultsInsertIdempotency(t *testing.T, res AggregateResults) {
Expand All @@ -95,9 +125,15 @@ func testAggResultsInsertIdempotency(t *testing.T, res AggregateResults) {
require.NoError(t, err)
require.Equal(t, 1, size)

require.Equal(t, 1, res.Size())
require.Equal(t, 1, res.TotalDocsCount())

size, err = res.AddDocuments([]doc.Document{dValid})
require.NoError(t, err)
require.Equal(t, 1, size)

require.Equal(t, 1, res.Size())
require.Equal(t, 2, res.TotalDocsCount())
}

func TestAggResultsInsertIdempotency(t *testing.T) {
Expand Down
42 changes: 42 additions & 0 deletions src/dbnode/storage/index/index_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 11 additions & 1 deletion src/dbnode/storage/index/results.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ type results struct {
nsID ident.ID
opts QueryResultsOptions

resultsMap *ResultsMap
resultsMap *ResultsMap
totalDocsCount int

idPool ident.Pool
bytesPool pool.CheckedBytesPool
Expand Down Expand Up @@ -88,6 +89,7 @@ func (r *results) Reset(nsID ident.ID, opts QueryResultsOptions) {

// Reset all keys in the map next, this will finalize the keys.
r.resultsMap.Reset()
r.totalDocsCount = 0

// NB: could do keys+value in one step but I'm trying to avoid
// using an internal method of a code-gen'd type.
Expand All @@ -101,6 +103,7 @@ func (r *results) AddDocuments(batch []doc.Document) (int, error) {
r.Lock()
err := r.addDocumentsBatchWithLock(batch)
size := r.resultsMap.Len()
r.totalDocsCount += len(batch)
r.Unlock()
return size, err
}
Expand Down Expand Up @@ -173,6 +176,13 @@ func (r *results) Size() int {
return v
}

func (r *results) TotalDocsCount() int {
r.RLock()
count := r.totalDocsCount
r.RUnlock()
return count
}

func (r *results) Finalize() {
// Reset locks so cannot hold onto lock for call to Finalize.
r.Reset(nil, QueryResultsOptions{})
Expand Down
29 changes: 29 additions & 0 deletions src/dbnode/storage/index/results_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ func TestResultsInsertInvalid(t *testing.T) {
size, err := res.AddDocuments([]doc.Document{dInvalid})
require.Error(t, err)
require.Equal(t, 0, size)

require.Equal(t, 0, res.Size())
require.Equal(t, 1, res.TotalDocsCount())
}

func TestResultsInsertIdempotency(t *testing.T) {
Expand All @@ -68,9 +71,27 @@ func TestResultsInsertIdempotency(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, size)

require.Equal(t, 1, res.Size())
require.Equal(t, 1, res.TotalDocsCount())

size, err = res.AddDocuments([]doc.Document{dValid})
require.NoError(t, err)
require.Equal(t, 1, size)

require.Equal(t, 1, res.Size())
require.Equal(t, 2, res.TotalDocsCount())
}

func TestResultsInsertBatchOfTwo(t *testing.T) {
res := NewQueryResults(nil, QueryResultsOptions{}, testOpts)
d1 := doc.Document{ID: []byte("d1")}
d2 := doc.Document{ID: []byte("d2")}
size, err := res.AddDocuments([]doc.Document{d1, d2})
require.NoError(t, err)
require.Equal(t, 2, size)

require.Equal(t, 2, res.Size())
require.Equal(t, 2, res.TotalDocsCount())
}

func TestResultsFirstInsertWins(t *testing.T) {
Expand All @@ -80,6 +101,9 @@ func TestResultsFirstInsertWins(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, size)

require.Equal(t, 1, res.Size())
require.Equal(t, 1, res.TotalDocsCount())

tags, ok := res.Map().Get(ident.StringID("abc"))
require.True(t, ok)
require.Equal(t, 0, tags.Remaining())
Expand All @@ -92,6 +116,9 @@ func TestResultsFirstInsertWins(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, size)

require.Equal(t, 1, res.Size())
require.Equal(t, 2, res.TotalDocsCount())

tags, ok = res.Map().Get(ident.StringID("abc"))
require.True(t, ok)
require.Equal(t, 0, tags.Remaining())
Expand Down Expand Up @@ -171,6 +198,7 @@ func TestResultsReset(t *testing.T) {
require.False(t, ok)
require.Equal(t, 0, tags.Remaining())
require.Equal(t, 0, res.Size())
require.Equal(t, 0, res.TotalDocsCount())
}

func TestResultsResetNamespaceClones(t *testing.T) {
Expand Down Expand Up @@ -206,6 +234,7 @@ func TestFinalize(t *testing.T) {
tags, ok = res.Map().Get(ident.StringID("abc"))
require.False(t, ok)
require.Equal(t, 0, res.Size())
require.Equal(t, 0, res.TotalDocsCount())

for _, entry := range res.Map().Iter() {
id, _ := entry.Key(), entry.Value()
Expand Down
3 changes: 3 additions & 0 deletions src/dbnode/storage/index/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ type BaseResults interface {
// Size returns the number of IDs tracked.
Size() int

// TotalDocsCount returns the total number of documents observed.
TotalDocsCount() int

// AddDocuments adds the batch of documents to the results set, it will
// take a copy of the bytes backing the documents so the original can be
// modified after this function returns without affecting the results map.
Expand Down