run multiple queries per table at once with boltdb-shipper #2656

Merged

18 changes: 13 additions & 5 deletions pkg/storage/stores/shipper/downloads/table.go
@@ -198,8 +198,8 @@ func (t *Table) Close() {
t.dbs = map[string]*downloadedFile{}
}

// Queries all the dbs for index.
func (t *Table) Query(ctx context.Context, query chunk.IndexQuery, callback chunk_util.Callback) error {
// MultiQueries runs multiple queries without having to take lock multiple times for each query.
func (t *Table) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, callback chunk_util.Callback) error {
// let us check if table is ready for use while also honoring the context timeout
select {
case <-ctx.Done():
@@ -216,10 +216,18 @@ func (t *Table) Query(ctx context.Context, query chunk.IndexQuery, callback chun

t.lastUsedAt = time.Now()

for _, db := range t.dbs {
if err := t.boltDBIndexClient.QueryDB(ctx, db.boltdb, query, callback); err != nil {
return err
log, ctx := spanlogger.New(ctx, "Shipper.Downloads.Table.MultiQueries")
defer log.Span.Finish()

level.Debug(log).Log("table-name", t.name, "query-count", len(queries))

for name, db := range t.dbs {
for _, query := range queries {
if err := t.boltDBIndexClient.QueryDB(ctx, db.boltdb, query, callback); err != nil {
return err
}
}
level.Debug(log).Log("queried-db", name)
}

return nil
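
The change above flips the shape of the downloads-side query path: instead of re-running the readiness check and scanning the table's dbs once per IndexQuery, a whole batch of queries is served in a single pass. A minimal, self-contained sketch of that pattern with stand-in types (not the real Loki/boltdb types):

```go
// Minimal sketch of the MultiQueries access pattern with stand-in types
// (not the real Loki/boltdb types): one pass over the table's dbs serves
// the whole batch of queries.
package main

import "fmt"

// indexQuery is a stand-in for chunk.IndexQuery.
type indexQuery struct{ valueEqual string }

type table struct {
    // dbs is a stand-in for the table's map of downloaded boltdb files.
    dbs map[string][]string
}

// multiQueries runs every query against every db in a single pass.
func (t *table) multiQueries(queries []indexQuery, callback func(db, value string)) {
    for name, values := range t.dbs {
        for _, q := range queries {
            for _, v := range values {
                if v == q.valueEqual {
                    callback(name, v)
                }
            }
        }
    }
}

func main() {
    t := &table{dbs: map[string][]string{
        "db1": {"0", "1", "2"},
        "db2": {"2", "3", "4"},
    }}
    queries := []indexQuery{{valueEqual: "2"}, {valueEqual: "3"}}
    t.multiQueries(queries, func(db, value string) {
        fmt.Printf("matched %s in %s\n", value, db)
    })
}
```

The outer loop stays over the dbs, so each db is visited once per batch with all queries applied while it is at hand.
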
24 changes: 17 additions & 7 deletions pkg/storage/stores/shipper/downloads/table_manager.go
@@ -13,6 +13,8 @@ import (
"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/loki/pkg/storage/stores/shipper/util"
)

const cacheCleanupInterval = time.Hour
@@ -102,27 +104,35 @@ func (tm *TableManager) Stop() {
}

func (tm *TableManager) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback chunk_util.Callback) error {
return chunk_util.DoParallelQueries(ctx, tm.query, queries, callback)
queriesByTable := util.QueriesByTable(queries)
for tableName, queries := range queriesByTable {
err := tm.query(ctx, tableName, queries, callback)
if err != nil {
return err
}
}

return nil
}

func (tm *TableManager) query(ctx context.Context, query chunk.IndexQuery, callback chunk_util.Callback) error {
func (tm *TableManager) query(ctx context.Context, tableName string, queries []chunk.IndexQuery, callback chunk_util.Callback) error {
log, ctx := spanlogger.New(ctx, "Shipper.Downloads.Query")
defer log.Span.Finish()

level.Debug(log).Log("table-name", query.TableName)
level.Debug(log).Log("table-name", tableName)

table := tm.getOrCreateTable(ctx, query.TableName)
table := tm.getOrCreateTable(ctx, tableName)

err := table.Query(ctx, query, callback)
err := util.DoParallelQueries(ctx, table, queries, callback)
if err != nil {
if table.Err() != nil {
// table is in invalid state, remove the table so that next queries re-create it.
tm.tablesMtx.Lock()
defer tm.tablesMtx.Unlock()

level.Error(pkg_util.Logger).Log("msg", fmt.Sprintf("table %s has some problem, cleaning it up", query.TableName), "err", table.Err())
level.Error(pkg_util.Logger).Log("msg", fmt.Sprintf("table %s has some problem, cleaning it up", tableName), "err", table.Err())

delete(tm.tables, query.TableName)
delete(tm.tables, tableName)
return table.Err()
}
}
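
Together with the new util helpers shown at the bottom of this diff, QueryPages now groups incoming queries by table before dispatching. A rough sketch of that grouping step with a simplified stand-in for chunk.IndexQuery (the table names below are examples only):

```go
// Rough sketch of the new dispatch flow in QueryPages: queries are grouped
// by table name first, then each group is sent to its table in one call.
package main

import "fmt"

// indexQuery is a simplified stand-in for chunk.IndexQuery.
type indexQuery struct {
    tableName  string
    valueEqual string
}

// queriesByTable mirrors what util.QueriesByTable does: group queries by table.
func queriesByTable(queries []indexQuery) map[string][]indexQuery {
    grouped := make(map[string][]indexQuery)
    for _, q := range queries {
        grouped[q.tableName] = append(grouped[q.tableName], q)
    }
    return grouped
}

func main() {
    // Example table names, chosen only for illustration.
    queries := []indexQuery{
        {tableName: "index_2654", valueEqual: "1"},
        {tableName: "index_2655", valueEqual: "2"},
        {tableName: "index_2654", valueEqual: "3"},
    }
    for tableName, group := range queriesByTable(queries) {
        // The real code calls tm.query(ctx, tableName, group, callback) here,
        // which resolves the table and runs the whole group against it.
        fmt.Printf("table %s receives %d queries\n", tableName, len(group))
    }
}
```

Grouping first means the per-table work (readiness check, error handling, cleanup of a broken table) happens once per table rather than once per query.
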
20 changes: 14 additions & 6 deletions pkg/storage/stores/shipper/downloads/table_test.go
@@ -6,6 +6,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
"strconv"
"testing"
"time"

@@ -58,7 +59,7 @@ func buildTestTable(t *testing.T, tableName, path string) (*Table, *local.BoltIn
}
}

func TestTable_Query(t *testing.T) {
func TestTable_MultiQueries(t *testing.T) {
tempDir, err := ioutil.TempDir("", "table-writes")
require.NoError(t, err)

@@ -90,7 +91,7 @@ func TestTable_Query(t *testing.T) {
stopFunc()
}()

testutil.TestSingleQuery(t, chunk.IndexQuery{}, table, 0, 30)
// build queries each looking for specific value from all the dbs
var queries []chunk.IndexQuery
for i := 5; i < 25; i++ {
queries = append(queries, chunk.IndexQuery{ValueEqual: []byte(strconv.Itoa(i))})
}

// query the loaded table to see if it has right data.
testutil.TestSingleTableQuery(t, queries, table, 5, 20)
}

func TestTable_Sync(t *testing.T) {
@@ -136,7 +144,7 @@ func TestTable_Sync(t *testing.T) {
}()

// query table to see it has expected records setup
testutil.TestSingleQuery(t, chunk.IndexQuery{}, table, 0, 30)
testutil.TestSingleTableQuery(t, []chunk.IndexQuery{{}}, table, 0, 30)

// add a sleep since we are updating a file and CI is sometimes too fast to create a difference in mtime of files
time.Sleep(time.Second)
@@ -150,7 +158,7 @@ func TestTable_Sync(t *testing.T) {
require.NoError(t, table.Sync(context.Background()))

// query and verify table has expected records from new and updated db and the records from deleted db are gone
testutil.TestSingleQuery(t, chunk.IndexQuery{}, table, 10, 40)
testutil.TestSingleTableQuery(t, []chunk.IndexQuery{{}}, table, 10, 40)

// verify files in cache where dbs for the table are synced to double check.
expectedFilesInDir := map[string]struct{}{
@@ -187,7 +195,7 @@ func TestTable_LastUsedAt(t *testing.T) {
require.InDelta(t, time.Now().Add(-time.Hour).Unix(), table.LastUsedAt().Unix(), 1)

// query the table which should set the last used at to now.
err = table.Query(context.Background(), chunk.IndexQuery{}, func(query chunk.IndexQuery, batch chunk.ReadBatch) bool {
err = table.MultiQueries(context.Background(), []chunk.IndexQuery{{}}, func(query chunk.IndexQuery, batch chunk.ReadBatch) bool {
return true
})
require.NoError(t, err)
@@ -226,7 +234,7 @@ func TestTable_doParallelDownload(t *testing.T) {

// ensure that we have `tc` number of files downloaded and opened.
require.Len(t, table.dbs, tc)
testutil.TestSingleQuery(t, chunk.IndexQuery{}, table, 0, tc*10)
testutil.TestSingleTableQuery(t, []chunk.IndexQuery{{}}, table, 0, tc*10)
})
}
}
6 changes: 3 additions & 3 deletions pkg/storage/stores/shipper/testutil/testutil.go
@@ -43,15 +43,15 @@ func AddRecordsToBatch(batch chunk.WriteBatch, tableName string, start, numRecor
}

type SingleTableQuerier interface {
Query(ctx context.Context, query chunk.IndexQuery, callback chunk_util.Callback) error
MultiQueries(ctx context.Context, queries []chunk.IndexQuery, callback chunk_util.Callback) error
}

func TestSingleQuery(t *testing.T, query chunk.IndexQuery, querier SingleTableQuerier, start, numRecords int) {
func TestSingleTableQuery(t *testing.T, queries []chunk.IndexQuery, querier SingleTableQuerier, start, numRecords int) {
minValue := start
maxValue := start + numRecords
fetchedRecords := make(map[string]string)

err := querier.Query(context.Background(), query, makeTestCallback(t, minValue, maxValue, fetchedRecords))
err := querier.MultiQueries(context.Background(), queries, makeTestCallback(t, minValue, maxValue, fetchedRecords))

require.NoError(t, err)
require.Len(t, fetchedRecords, numRecords)
10 changes: 6 additions & 4 deletions pkg/storage/stores/shipper/uploads/table.go
@@ -98,14 +98,16 @@ func newTableWithDBs(dbs map[string]*bbolt.DB, path, uploader string, storageCli
}, nil
}

// Query serves the index by querying all the open dbs.
func (lt *Table) Query(ctx context.Context, query chunk.IndexQuery, callback chunk_util.Callback) error {
// MultiQueries runs multiple queries without having to take lock multiple times for each query.
func (lt *Table) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, callback chunk_util.Callback) error {
lt.dbsMtx.RLock()
defer lt.dbsMtx.RUnlock()

for _, db := range lt.dbs {
if err := lt.boltdbIndexClient.QueryDB(ctx, db, query, callback); err != nil {
return err
for _, query := range queries {
if err := lt.boltdbIndexClient.QueryDB(ctx, db, query, callback); err != nil {
return err
}
}
}

18 changes: 14 additions & 4 deletions pkg/storage/stores/shipper/uploads/table_manager.go
@@ -18,6 +18,8 @@ import (
"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/loki/pkg/storage/stores/shipper/util"
)

type Config struct {
@@ -88,22 +90,30 @@ func (tm *TableManager) Stop() {
}

func (tm *TableManager) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback chunk_util.Callback) error {
return chunk_util.DoParallelQueries(ctx, tm.query, queries, callback)
queriesByTable := util.QueriesByTable(queries)
for tableName, queries := range queriesByTable {
err := tm.query(ctx, tableName, queries, callback)
if err != nil {
return err
}
}

return nil
}

func (tm *TableManager) query(ctx context.Context, query chunk.IndexQuery, callback chunk_util.Callback) error {
func (tm *TableManager) query(ctx context.Context, tableName string, queries []chunk.IndexQuery, callback chunk_util.Callback) error {
tm.tablesMtx.RLock()
defer tm.tablesMtx.RUnlock()

log, ctx := spanlogger.New(ctx, "Shipper.Uploads.Query")
defer log.Span.Finish()

table, ok := tm.tables[query.TableName]
table, ok := tm.tables[tableName]
if !ok {
return nil
}

return table.Query(ctx, query, callback)
return util.DoParallelQueries(ctx, table, queries, callback)
}

func (tm *TableManager) BatchWrite(ctx context.Context, batch chunk.WriteBatch) error {
4 changes: 2 additions & 2 deletions pkg/storage/stores/shipper/uploads/table_manager_test.go
@@ -101,7 +101,7 @@ func TestLoadTables(t *testing.T) {
require.True(t, !stat.IsDir())

for tableName, expectedIndex := range expectedTables {
testutil.TestSingleQuery(t, chunk.IndexQuery{TableName: tableName}, tm.tables[tableName], expectedIndex.start, expectedIndex.numRecords)
testutil.TestSingleTableQuery(t, []chunk.IndexQuery{{TableName: tableName}}, tm.tables[tableName], expectedIndex.start, expectedIndex.numRecords)
}
}

@@ -137,7 +137,7 @@ func TestTableManager_BatchWrite(t *testing.T) {
require.Len(t, tm.tables, len(tc))

for tableName, expectedIndex := range tc {
testutil.TestSingleQuery(t, chunk.IndexQuery{TableName: tableName}, tm.tables[tableName], expectedIndex.start, expectedIndex.numRecords)
testutil.TestSingleTableQuery(t, []chunk.IndexQuery{{TableName: tableName}}, tm.tables[tableName], expectedIndex.start, expectedIndex.numRecords)
}
}

53 changes: 50 additions & 3 deletions pkg/storage/stores/shipper/uploads/table_test.go
@@ -7,6 +7,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
"strconv"
"testing"
"time"

@@ -88,7 +89,7 @@ func TestLoadTable(t *testing.T) {
}()

// query the loaded table to see if it has right data.
testutil.TestSingleQuery(t, chunk.IndexQuery{}, table, 0, 20)
testutil.TestSingleTableQuery(t, []chunk.IndexQuery{{}}, table, 0, 20)
}

func TestTable_Write(t *testing.T) {
@@ -147,7 +148,7 @@ func TestTable_Write(t *testing.T) {
require.True(t, ok)

// test that the table has current + previous records
testutil.TestSingleQuery(t, chunk.IndexQuery{}, table, 0, (i+1)*10)
testutil.TestSingleTableQuery(t, []chunk.IndexQuery{{}}, table, 0, (i+1)*10)
testutil.TestSingleDBQuery(t, chunk.IndexQuery{}, db, boltIndexClient, i*10, 10)
})
}
@@ -439,7 +440,7 @@ func TestTable_ImmutableUploads(t *testing.T) {
dir, err := ioutil.ReadDir(filepath.Join(objectStorageDir, table.name))
require.NoError(t, err)
for _, d := range dir {
os.RemoveAll(filepath.Join(objectStorageDir, table.name, d.Name()))
require.NoError(t, os.RemoveAll(filepath.Join(objectStorageDir, table.name, d.Name())))
}

// force upload of dbs
@@ -450,3 +451,49 @@ func TestTable_ImmutableUploads(t *testing.T) {
require.NoFileExists(t, filepath.Join(objectStorageDir, table.buildObjectKey(fmt.Sprint(expectedDB))))
}
}

func TestTable_MultiQueries(t *testing.T) {
indexPath, err := ioutil.TempDir("", "table-multi-queries")
require.NoError(t, err)

defer func() {
require.NoError(t, os.RemoveAll(indexPath))
}()

boltDBIndexClient, err := local.NewBoltDBIndexClient(local.BoltDBConfig{Directory: indexPath})
require.NoError(t, err)

defer func() {
boltDBIndexClient.Stop()
}()

// setup some dbs for a table at a path.
tablePath := testutil.SetupDBTablesAtPath(t, "test-table", indexPath, map[string]testutil.DBRecords{
"db1": {
Start: 0,
NumRecords: 10,
},
"db2": {
Start: 10,
NumRecords: 10,
},
}, false)

// try loading the table.
table, err := LoadTable(tablePath, "test", nil, boltDBIndexClient)
require.NoError(t, err)
require.NotNil(t, table)

defer func() {
table.Stop()
}()

// build queries each looking for specific value from all the dbs
var queries []chunk.IndexQuery
for i := 5; i < 15; i++ {
queries = append(queries, chunk.IndexQuery{ValueEqual: []byte(strconv.Itoa(i))})
}

// query the loaded table to see if it has right data.
testutil.TestSingleTableQuery(t, queries, table, 5, 10)
}
50 changes: 50 additions & 0 deletions pkg/storage/stores/shipper/util/queries.go
@@ -0,0 +1,50 @@
package util

import (
"context"

"github.com/cortexproject/cortex/pkg/chunk"
chunk_util "github.com/cortexproject/cortex/pkg/chunk/util"
"github.com/cortexproject/cortex/pkg/util"
)

const maxQueriesPerGoroutine = 100

type TableQuerier interface {
MultiQueries(ctx context.Context, queries []chunk.IndexQuery, callback chunk_util.Callback) error
}

// QueriesByTable groups and returns queries by tables.
func QueriesByTable(queries []chunk.IndexQuery) map[string][]chunk.IndexQuery {
queriesByTable := make(map[string][]chunk.IndexQuery)
for _, query := range queries {
if _, ok := queriesByTable[query.TableName]; !ok {
queriesByTable[query.TableName] = []chunk.IndexQuery{}
}

queriesByTable[query.TableName] = append(queriesByTable[query.TableName], query)
}

return queriesByTable
}

func DoParallelQueries(ctx context.Context, tableQuerier TableQuerier, queries []chunk.IndexQuery, callback chunk_util.Callback) error {
errs := make(chan error)

for i := 0; i < len(queries); i += maxQueriesPerGoroutine {
q := queries[i:util.Min(i+maxQueriesPerGoroutine, len(queries))]
go func(queries []chunk.IndexQuery) {
errs <- tableQuerier.MultiQueries(ctx, queries, callback)
}(q)
}

var lastErr error
for i := 0; i < len(queries); i += maxQueriesPerGoroutine {
err := <-errs
if err != nil {
lastErr = err
}
}

return lastErr
}
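
For readers skimming the new helper: DoParallelQueries slices the per-table queries into batches of at most maxQueriesPerGoroutine (100), pushes each batch through MultiQueries in its own goroutine, and returns the last error it collected. A self-contained illustration of that fan-out with stub types (the real code operates on chunk.IndexQuery, passes a chunk_util.Callback through, and clamps the slice bound with Cortex's util.Min):

```go
// Illustration of the batching and fan-out used by DoParallelQueries,
// with stub types instead of the real chunk.IndexQuery/callback types.
package main

import (
    "context"
    "fmt"
    "sync/atomic"
)

const maxQueriesPerGoroutine = 100

// query is a stand-in for chunk.IndexQuery.
type query struct{ id int }

// tableQuerier is the shape DoParallelQueries needs: something that can
// serve a whole batch of queries in one call.
type tableQuerier interface {
    multiQueries(ctx context.Context, queries []query) error
}

func doParallelQueries(ctx context.Context, querier tableQuerier, queries []query) error {
    errs := make(chan error)

    batches := 0
    for i := 0; i < len(queries); i += maxQueriesPerGoroutine {
        end := i + maxQueriesPerGoroutine
        if end > len(queries) { // the real code uses util.Min for this clamp
            end = len(queries)
        }
        batches++
        go func(batch []query) {
            errs <- querier.multiQueries(ctx, batch)
        }(queries[i:end])
    }

    // Drain one result per batch so no goroutine is left blocked on errs.
    var lastErr error
    for i := 0; i < batches; i++ {
        if err := <-errs; err != nil {
            lastErr = err
        }
    }
    return lastErr
}

// countingQuerier just records how many batches it was asked to serve.
type countingQuerier struct{ batches int64 }

func (c *countingQuerier) multiQueries(_ context.Context, _ []query) error {
    atomic.AddInt64(&c.batches, 1)
    return nil
}

func main() {
    queries := make([]query, 250)
    for i := range queries {
        queries[i] = query{id: i}
    }

    c := &countingQuerier{}
    if err := doParallelQueries(context.Background(), c, queries); err != nil {
        fmt.Println("query error:", err)
    }
    fmt.Println("batches run:", atomic.LoadInt64(&c.batches)) // 3: 100 + 100 + 50
}
```

Because batches run concurrently, the callback handed to MultiQueries can be invoked from multiple goroutines, so callers need to supply one that is safe for concurrent use.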