Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Ensure correct block order on reload #335

Merged
merged 1 commit into from
May 28, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestSetCompactionFailed(t *testing.T) {
testutil.Ok(t, err)
defer os.RemoveAll(tmpdir)

b := createEmptyBlock(t, tmpdir)
b := createEmptyBlock(t, tmpdir, &BlockMeta{Version: 2})

testutil.Equals(t, false, b.meta.Compaction.Failed)
testutil.Ok(t, b.setCompactionFailed())
Expand All @@ -55,10 +55,11 @@ func TestSetCompactionFailed(t *testing.T) {
testutil.Equals(t, true, b.meta.Compaction.Failed)
}

func createEmptyBlock(t *testing.T, dir string) *Block {
// createEmpty block creates a block with the given meta but without any data.
func createEmptyBlock(t *testing.T, dir string, meta *BlockMeta) *Block {
testutil.Ok(t, os.MkdirAll(dir, 0777))

testutil.Ok(t, writeMetaFile(dir, &BlockMeta{Version: 2}))
testutil.Ok(t, writeMetaFile(dir, meta))

ir, err := index.NewWriter(filepath.Join(dir, indexFilename))
testutil.Ok(t, err)
Expand Down
3 changes: 3 additions & 0 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,9 @@ func (db *DB) reload(deleteable ...string) (err error) {
blocks = append(blocks, b)
exist[meta.ULID] = struct{}{}
}
sort.Slice(blocks, func(i, j int) bool {
return blocks[i].Meta().MinTime < blocks[j].Meta().MinTime
})

if err := validateBlockSequence(blocks); err != nil {
return errors.Wrap(err, "invalid block sequence")
Expand Down
27 changes: 27 additions & 0 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ import (
"math"
"math/rand"
"os"
"path/filepath"
"sort"
"testing"

"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil"
Expand Down Expand Up @@ -63,6 +65,31 @@ func query(t testing.TB, q Querier, matchers ...labels.Matcher) map[string][]sam
return result
}

// Ensure that blocks are held in memory in their time order
// and not in ULID order as they are read from the directory.
func TestDB_reloadOrder(t *testing.T) {
db, close := openTestDB(t, nil)
defer close()
defer db.Close()

metas := []*BlockMeta{
{ULID: ulid.MustNew(100, nil), MinTime: 90, MaxTime: 100},
{ULID: ulid.MustNew(200, nil), MinTime: 70, MaxTime: 80},
{ULID: ulid.MustNew(300, nil), MinTime: 100, MaxTime: 110},
}
for _, m := range metas {
bdir := filepath.Join(db.Dir(), m.ULID.String())
createEmptyBlock(t, bdir, m)
}

testutil.Ok(t, db.reload())
blocks := db.Blocks()
testutil.Equals(t, 3, len(blocks))
testutil.Equals(t, *metas[1], blocks[0].Meta())
testutil.Equals(t, *metas[0], blocks[1].Meta())
testutil.Equals(t, *metas[2], blocks[2].Meta())
}

func TestDataAvailableOnlyAfterCommit(t *testing.T) {
db, close := openTestDB(t, nil)
defer close()
Expand Down