Skip to content

Commit

Permalink
[index] Aggregating results on storage side (#1463)
Browse files Browse the repository at this point in the history
  • Loading branch information
arnikola authored Mar 26, 2019
1 parent b27738b commit 5e4d4f6
Show file tree
Hide file tree
Showing 34 changed files with 2,272 additions and 214 deletions.
55 changes: 42 additions & 13 deletions src/dbnode/generated-source-files.mk
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,16 @@ genny-all: genny-map-all genny-arraypool-all genny-leakcheckpool-all

# Map generation rule for all generated maps
.PHONY: genny-map-all
genny-map-all: \
genny-map-client-received-blocks \
genny-map-storage-block-retriever \
genny-map-storage-bootstrap-result \
genny-map-storage \
genny-map-storage-namespace-metadata \
genny-map-storage-repair \
genny-map-storage-index-results \
genny-map-all: \
genny-map-client-received-blocks \
genny-map-storage-block-retriever \
genny-map-storage-bootstrap-result \
genny-map-storage \
genny-map-storage-namespace-metadata \
genny-map-storage-repair \
genny-map-storage-index-results \
genny-map-storage-index-aggregate-values \
genny-map-storage-index-aggregation-results \
genny-map-storage-bootstrap-bootstrapper-commitlog

# Map generation rule for client/receivedBlocksMap
Expand Down Expand Up @@ -131,11 +133,11 @@ genny-map-storage-repair: install-m3x-repo
# Map generation rule for storage/bootstrap/bootstrapper/commitlog
.PHONY: genny-map-storage-bootstrap-bootstrapper-commitlog
genny-map-storage-bootstrap-bootstrapper-commitlog: install-m3x-repo
cd $(m3x_package_path) && make idhashmap-gen \
pkg=commitlog \
value_type=metadataAndEncodersByTime \
cd $(m3x_package_path) && make idhashmap-gen \
pkg=commitlog \
value_type=metadataAndEncodersByTime \
target_package=$(m3db_package)/src/dbnode/storage/bootstrap/bootstrapper/commitlog \
rename_constructor=newMetadataAndEncodersByTimeMap \
rename_constructor=newMetadataAndEncodersByTimeMap \
rename_constructor_options=newMetadataAndEncodersByTimeMapOptions
# Rename both generated map and constructor files
mv -f $(m3db_package_path)/src/dbnode/storage/bootstrap/bootstrapper/commitlog/map_gen.go $(m3db_package_path)/src/dbnode/storage/bootstrap/bootstrapper/commitlog/metadata_and_encoders_by_time_map_gen.go
Expand All @@ -155,6 +157,33 @@ genny-map-storage-index-results: install-m3x-repo
# Rename generated map file
mv -f $(m3db_package_path)/src/dbnode/storage/index/map_gen.go $(m3db_package_path)/src/dbnode/storage/index/results_map_gen.go

# Map generation rule for storage/index/AggregateValuesMap.
# Runs the genny hashmap generator with ident.ID keys and empty struct{}
# values (i.e. a set of IDs), renaming the generated type to the
# AggregateValues* prefix; rename_nogen_key/value skip generating the
# key/value types since both already exist.
.PHONY: genny-map-storage-index-aggregate-values
genny-map-storage-index-aggregate-values: install-m3x-repo
cd $(m3x_package_path) && make hashmap-gen \
pkg=index \
key_type=ident.ID \
value_type=struct{} \
rename_type_prefix=AggregateValues \
rename_nogen_key=true \
rename_nogen_value=true \
target_package=$(m3db_package)/src/dbnode/storage/index
# Rename generated map file so it does not collide with the other genny
# maps generated into the same storage/index package.
mv -f $(m3db_package_path)/src/dbnode/storage/index/map_gen.go $(m3db_package_path)/src/dbnode/storage/index/aggregate_values_map_gen.go

# Map generation rule for storage/index/AggregateResultsMap.
# Depends on genny-map-storage-index-aggregate-values because the generated
# map's value_type (AggregateValues) is produced by that rule.
.PHONY: genny-map-storage-index-aggregation-results
genny-map-storage-index-aggregation-results: install-m3x-repo genny-map-storage-index-aggregate-values
cd $(m3x_package_path) && make idhashmap-gen \
pkg=index \
value_type=AggregateValues \
rename_type_prefix=AggregateResults \
target_package=$(m3db_package)/src/dbnode/storage/index
# Rename generated map file so it does not collide with the other genny
# maps generated into the same storage/index package.
mv -f $(m3db_package_path)/src/dbnode/storage/index/map_gen.go $(m3db_package_path)/src/dbnode/storage/index/aggregate_results_map_gen.go
# This map has a custom constructor; delete the genny-generated one so the
# hand-written constructor is the only one in the package.
rm -f $(m3db_package_path)/src/dbnode/storage/index/new_map_gen.go

# generation rule for all generated arraypools
.PHONY: genny-arraypool-all
genny-arraypool-all: genny-arraypool-node-segments
Expand All @@ -173,7 +202,7 @@ genny-arraypool-node-segments: install-m3x-repo

# generation rule for all generated leakcheckpools
.PHONY: genny-leakcheckpool-all
genny-leakcheckpool-all: genny-leakcheckpool-fetch-tagged-attempt \
genny-leakcheckpool-all: genny-leakcheckpool-fetch-tagged-attempt \
genny-leakcheckpool-fetch-state \
genny-leakcheckpool-fetch-tagged-op

Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/generated/mocks/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
//go:generate sh -c "mockgen -package=xio $PACKAGE/src/dbnode/x/xio SegmentReader,SegmentReaderPool | genclean -pkg $PACKAGE/src/dbnode/x/xio -out $GOPATH/src/$PACKAGE/src/dbnode/x/xio/io_mock.go"
//go:generate sh -c "mockgen -package=digest -destination=$GOPATH/src/$PACKAGE/src/dbnode/digest/digest_mock.go $PACKAGE/src/dbnode/digest ReaderWithDigest"
//go:generate sh -c "mockgen -package=series $PACKAGE/src/dbnode/storage/series DatabaseSeries,QueryableBlockRetriever | genclean -pkg $PACKAGE/src/dbnode/storage/series -out $GOPATH/src/$PACKAGE/src/dbnode/storage/series/series_mock.go"
//go:generate sh -c "mockgen -package=index $PACKAGE/src/dbnode/storage/index Results,Block,OnIndexSeries | genclean -pkg $PACKAGE/src/dbnode/storage/index -out $GOPATH/src/$PACKAGE/src/dbnode/storage/index/index_mock.go"
//go:generate sh -c "mockgen -package=index $PACKAGE/src/dbnode/storage/index QueryResults,AggregateResults,Block,OnIndexSeries | genclean -pkg $PACKAGE/src/dbnode/storage/index -out $GOPATH/src/$PACKAGE/src/dbnode/storage/index/index_mock.go"

// mockgen rules for generating mocks for unexported interfaces (file mode)
//go:generate sh -c "mockgen -package=encoding -destination=$GOPATH/src/$PACKAGE/src/dbnode/encoding/encoding_mock.go -source=$GOPATH/src/$PACKAGE/src/dbnode/encoding/types.go"
Expand Down
26 changes: 13 additions & 13 deletions src/dbnode/network/server/tchannelthrift/node/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ func TestServiceQuery(t *testing.T) {
require.NoError(t, err)
qry := index.Query{Query: req}

resMap := index.NewResults(ident.StringID(nsID),
index.ResultsOptions{}, testIndexOptions)
resMap := index.NewQueryResults(ident.StringID(nsID),
index.QueryResultsOptions{}, testIndexOptions)
resMap.Map().Set(ident.StringID("foo"), ident.NewTags(
ident.StringTag(tags["foo"][0].name, tags["foo"][0].value),
ident.StringTag(tags["foo"][1].name, tags["foo"][1].value),
Expand All @@ -210,7 +210,7 @@ func TestServiceQuery(t *testing.T) {
StartInclusive: start,
EndExclusive: end,
Limit: 10,
}).Return(index.QueryResults{Results: resMap, Exhaustive: true}, nil)
}).Return(index.QueryResult{Results: resMap, Exhaustive: true}, nil)

limit := int64(10)
r, err := service.Query(tctx, &rpc.QueryRequest{
Expand Down Expand Up @@ -329,7 +329,7 @@ func TestServiceQueryUnknownErr(t *testing.T) {
StartInclusive: start,
EndExclusive: end,
Limit: 10,
}).Return(index.QueryResults{}, unknownErr)
}).Return(index.QueryResult{}, unknownErr)

limit := int64(10)
_, err = service.Query(tctx, &rpc.QueryRequest{
Expand Down Expand Up @@ -1064,8 +1064,8 @@ func TestServiceFetchTagged(t *testing.T) {
require.NoError(t, err)
qry := index.Query{Query: req}

resMap := index.NewResults(ident.StringID(nsID),
index.ResultsOptions{}, testIndexOptions)
resMap := index.NewQueryResults(ident.StringID(nsID),
index.QueryResultsOptions{}, testIndexOptions)
resMap.Map().Set(ident.StringID("foo"), ident.NewTags(
ident.StringTag("foo", "bar"),
ident.StringTag("baz", "dxk"),
Expand All @@ -1083,7 +1083,7 @@ func TestServiceFetchTagged(t *testing.T) {
StartInclusive: start,
EndExclusive: end,
Limit: 10,
}).Return(index.QueryResults{Results: resMap, Exhaustive: true}, nil)
}).Return(index.QueryResult{Results: resMap, Exhaustive: true}, nil)

startNanos, err := convert.ToValue(start, rpc.TimeType_UNIX_NANOSECONDS)
require.NoError(t, err)
Expand Down Expand Up @@ -1161,8 +1161,8 @@ func TestServiceFetchTaggedIsOverloaded(t *testing.T) {
req, err := idx.NewRegexpQuery([]byte("foo"), []byte("b.*"))
require.NoError(t, err)

resMap := index.NewResults(ident.StringID(nsID),
index.ResultsOptions{}, testIndexOptions)
resMap := index.NewQueryResults(ident.StringID(nsID),
index.QueryResultsOptions{}, testIndexOptions)
resMap.Map().Set(ident.StringID("foo"), ident.NewTags(
ident.StringTag("foo", "bar"),
ident.StringTag("baz", "dxk"),
Expand Down Expand Up @@ -1214,8 +1214,8 @@ func TestServiceFetchTaggedNoData(t *testing.T) {
require.NoError(t, err)
qry := index.Query{Query: req}

resMap := index.NewResults(ident.StringID(nsID),
index.ResultsOptions{}, testIndexOptions)
resMap := index.NewQueryResults(ident.StringID(nsID),
index.QueryResultsOptions{}, testIndexOptions)
resMap.Map().Set(ident.StringID("foo"), ident.Tags{})
resMap.Map().Set(ident.StringID("bar"), ident.Tags{})
mockDB.EXPECT().QueryIDs(
Expand All @@ -1226,7 +1226,7 @@ func TestServiceFetchTaggedNoData(t *testing.T) {
StartInclusive: start,
EndExclusive: end,
Limit: 10,
}).Return(index.QueryResults{Results: resMap, Exhaustive: true}, nil)
}).Return(index.QueryResult{Results: resMap, Exhaustive: true}, nil)

startNanos, err := convert.ToValue(start, rpc.TimeType_UNIX_NANOSECONDS)
require.NoError(t, err)
Expand Down Expand Up @@ -1299,7 +1299,7 @@ func TestServiceFetchTaggedErrs(t *testing.T) {
StartInclusive: start,
EndExclusive: end,
Limit: 10,
}).Return(index.QueryResults{}, fmt.Errorf("random err"))
}).Return(index.QueryResult{}, fmt.Errorf("random err"))
_, err = service.FetchTagged(tctx, &rpc.FetchTaggedRequest{
NameSpace: []byte(nsID),
Query: data,
Expand Down
18 changes: 13 additions & 5 deletions src/dbnode/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1201,8 +1201,10 @@ func withEncodingAndPoolingOptions(
postingsListOpts := poolOptions(policy.PostingsListPool, scope.SubScope("postingslist-pool"))
postingsList := postings.NewPool(postingsListOpts, roaring.NewPostingsList)

resultsPool := index.NewResultsPool(
poolOptions(policy.IndexResultsPool, scope.SubScope("index-results-pool")))
queryResultsPool := index.NewQueryResultsPool(
poolOptions(policy.IndexResultsPool, scope.SubScope("index-query-results-pool")))
aggregateQueryResultsPool := index.NewAggregateResultsPool(
poolOptions(policy.IndexResultsPool, scope.SubScope("index-aggregate-results-pool")))

indexOpts := opts.IndexOptions().
SetInstrumentOptions(iopts).
Expand All @@ -1219,12 +1221,18 @@ func withEncodingAndPoolingOptions(
SetPostingsListPool(postingsList)).
SetIdentifierPool(identifierPool).
SetCheckedBytesPool(bytesPool).
SetResultsPool(resultsPool)
SetQueryResultsPool(queryResultsPool).
SetAggregateResultsPool(aggregateQueryResultsPool)

resultsPool.Init(func() index.Results {
queryResultsPool.Init(func() index.QueryResults {
// NB(r): Need to initialize after setting the index opts so
// it sees the same reference of the options as is set for the DB.
return index.NewResults(nil, index.ResultsOptions{}, indexOpts)
return index.NewQueryResults(nil, index.QueryResultsOptions{}, indexOpts)
})
aggregateQueryResultsPool.Init(func() index.AggregateResults {
// NB(r): Need to initialize after setting the index opts so
// it sees the same reference of the options as is set for the DB.
return index.NewAggregateResults(nil, index.AggregateResultsOptions{}, indexOpts)
})

return opts.SetIndexOptions(indexOpts)
Expand Down
20 changes: 18 additions & 2 deletions src/dbnode/storage/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,16 +707,32 @@ func (d *db) QueryIDs(
namespace ident.ID,
query index.Query,
opts index.QueryOptions,
) (index.QueryResults, error) {
) (index.QueryResult, error) {
n, err := d.namespaceFor(namespace)
if err != nil {
d.metrics.unknownNamespaceQueryIDs.Inc(1)
return index.QueryResults{}, err
return index.QueryResult{}, err
}

return n.QueryIDs(ctx, query, opts)
}

// AggregateQuery resolves the given namespace ID to its database namespace
// and delegates the aggregation query to it, returning that namespace's
// result unchanged.
//
// NOTE(review): the unknown-namespace error path increments
// d.metrics.unknownNamespaceQueryIDs, which is the QueryIDs counter —
// confirm whether a dedicated AggregateQuery counter was intended.
func (d *db) AggregateQuery(
	ctx context.Context,
	namespace ident.ID,
	query index.Query,
	opts index.QueryOptions,
	aggResultOpts index.AggregateResultsOptions,
) (index.AggregateQueryResult, error) {
	ns, err := d.namespaceFor(namespace)
	if err == nil {
		return ns.AggregateQuery(ctx, query, opts, aggResultOpts)
	}
	d.metrics.unknownNamespaceQueryIDs.Inc(1)
	return index.AggregateQueryResult{}, err
}

func (d *db) ReadEncoded(
ctx context.Context,
namespace ident.ID,
Expand Down
19 changes: 15 additions & 4 deletions src/dbnode/storage/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -701,10 +701,12 @@ func testDatabaseNamespaceIndexFunctions(t *testing.T, commitlogEnabled bool) {
1.0, xtime.Second, nil))

var (
q = index.Query{}
opts = index.QueryOptions{}
res = index.QueryResults{}
err error
q = index.Query{}
opts = index.QueryOptions{}
res = index.QueryResult{}
aggOpts = index.AggregateResultsOptions{}
aggRes = index.AggregateQueryResult{}
err error
)

ns.EXPECT().QueryIDs(ctx, q, opts).Return(res, nil)
Expand All @@ -715,6 +717,15 @@ func testDatabaseNamespaceIndexFunctions(t *testing.T, commitlogEnabled bool) {
_, err = d.QueryIDs(ctx, ident.StringID("testns"), q, opts)
require.Error(t, err)

ns.EXPECT().AggregateQuery(ctx, q, opts, aggOpts).Return(aggRes, nil)
_, err = d.AggregateQuery(ctx, ident.StringID("testns"), q, opts, aggOpts)
require.NoError(t, err)

ns.EXPECT().AggregateQuery(ctx, q, opts, aggOpts).
Return(aggRes, fmt.Errorf("random err"))
_, err = d.AggregateQuery(ctx, ident.StringID("testns"), q, opts, aggOpts)
require.Error(t, err)

ns.EXPECT().Close().Return(nil)

// Ensure commitlog is set before closing because this will call commitlog.Close()
Expand Down
Loading

0 comments on commit 5e4d4f6

Please sign in to comment.