diff --git a/CHANGELOG.md b/CHANGELOG.md index 8e3c01f2..84e677d8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,8 +1,10 @@ ## master / unreleased + - [FEATURE] New public `Scanner interface` to allow building custom CLI tools to repair or handle unrepairable blocks. + - [FEATURE] New `scan` command for the `tsdb cli` tool that uses the new tsdb scan interface to run a repair and move all unrepairable blocks out of the data folder to unblock Prometheus at the next startup. - [CHANGE] New `WALSegmentSize` option to override the `DefaultOptions.WALSegmentSize`. Added to allow using smaller wal files. For example using tmpfs on a RPI to minimise the SD card wear out from the constant WAL writes. As part of this change the `DefaultOptions.WALSegmentSize` constant was also exposed. ## 0.3.1 -- [BUGFIX] Fixed most windows test and some actual bugs for unclosed file readers. + - [BUGFIX] Fixed most windows test and some actual bugs for unclosed file readers. ## 0.3.0 - [CHANGE] `LastCheckpoint()` used to return just the segment name and now it returns the full relative path. 
diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go index e51a4995..28552906 100644 --- a/cmd/tsdb/main.go +++ b/cmd/tsdb/main.go @@ -31,8 +31,10 @@ import ( "time" "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" "github.com/pkg/errors" "github.com/prometheus/tsdb" + "github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/labels" "gopkg.in/alecthomas/kingpin.v2" ) @@ -48,6 +50,10 @@ func main() { listCmd = cli.Command("ls", "list db blocks") listCmdHumanReadable = listCmd.Flag("human-readable", "print human readable values").Short('h').Bool() listPath = listCmd.Arg("db path", "database path (default is "+filepath.Join("benchout", "storage")+")").Default(filepath.Join("benchout", "storage")).String() + scanCmd = cli.Command("scan", "scans the db and promts to remove corrupted blocks") + scanCmdHumanReadable = scanCmd.Flag("human-readable", "print human readable values").Short('h').Bool() + scanPath = scanCmd.Arg("dir", "database path (default is current dir ./)").Default("./").ExistingDir() + logger = level.NewFilter(log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)), level.AllowError()) ) switch kingpin.MustParse(cli.Parse(os.Args[1:])) { @@ -58,16 +64,201 @@ func main() { samplesFile: *benchSamplesFile, } wb.run() + case listCmd.FullCommand(): db, err := tsdb.Open(*listPath, nil, nil, nil) if err != nil { exitWithError(err) } printBlocks(db.Blocks(), listCmdHumanReadable) + + case scanCmd.FullCommand(): + if err := scanTmps(*scanPath, scanCmdHumanReadable); err != nil { + exitWithError(err) + } + + scan, err := tsdb.NewDBScanner(*scanPath, logger) + if err != nil { + exitWithError(err) + } + if err := scanTombstones(scan, scanCmdHumanReadable); err != nil { + exitWithError(err) + } + if err := scanIndexes(scan, scanCmdHumanReadable); err != nil { + exitWithError(err) + } + if err := scanOverlappingBlocks(scan, scanCmdHumanReadable); err != nil { + exitWithError(err) + } + + fmt.Println("Scan complete!") } 
flag.CommandLine.Set("log.level", "debug") } +func scanOverlappingBlocks(scan tsdb.Scanner, hformat *bool) error { + overlaps, err := scan.Overlapping() + if err != nil { + return err + } + if len(overlaps) > 0 { + fmt.Println("Overlaping blocks.") + fmt.Println("Deleting these will remove all data in the listed time range.") + var blocksDel []*tsdb.Block + for t, overBcks := range overlaps { + var ULIDS string + for _, b := range overBcks { + ULIDS = ULIDS + b.Meta().ULID.String() + " " + } + fmt.Printf("overlapping blocks : %v %v-%v \n", ULIDS, time.Unix(t.Min/1000, 0).Format("06/01/02 15:04:05"), time.Unix(t.Max/1000, 0).Format("15:04:05 06/01/02")) + + var largest int + for i, b := range overBcks { + if b.Meta().Stats.NumSamples > overBcks[largest].Meta().Stats.NumSamples { + largest = i + } + } + fmt.Printf("\nBlock %v contains highest samples count and is ommited from the deletion list! \n\n", overBcks[largest]) + // Remove the largest block from the slice. + o := append(overBcks[:largest], overBcks[largest+1:]...) + // Add this range to all blocks for deletion. + blocksDel = append(blocksDel, o...) + } + + var paths []string + for _, b := range blocksDel { + paths = append(paths, b.Dir()) + } + printBlocks(blocksDel, hformat) + moveTo := filepath.Join(scan.Dir(), "overlappingBlocks") + confirmed, err := confirm("Confirm moving the overlapping blocks to: " + moveTo) + if err != nil { + return err + } + if confirmed { + for _, file := range paths { + fileutil.Replace(file, moveTo) + } + } + } + return nil +} + +func scanIndexes(scan tsdb.Scanner, hformat *bool) error { + corrupted, err := scan.Index() + if err != nil { + return err + } + + for cause, bdirs := range corrupted { + fmt.Println("Blocks with corrupted index! 
\n", cause) + printFiles(bdirs, hformat) + + moveTo := filepath.Join(scan.Dir(), "blocksWithInvalidIndexes") + confirmed, err := confirm("Confirm moving corrupted indexes to: " + moveTo) + if err != nil { + return err + } + if confirmed { + for _, file := range bdirs { + fileutil.Replace(file, moveTo) + } + } + } + return nil +} + +func scanTombstones(scan tsdb.Scanner, hformat *bool) error { + invalid, err := scan.Tombstones() + if err != nil { + return errors.Wrap(err, "scannings Tombstones") + } + + if len(invalid) > 0 { + fmt.Println("Tombstones include data to be deleted so removing these will cancel deleting these timeseries.") + for cause, files := range invalid { + for _, p := range files { + _, file := filepath.Split(p) + if file != "tombstone" { + return fmt.Errorf("path doesn't contain a valid tombstone filename: %v", p) + } + } + fmt.Println("invalid tombstones:", cause) + printFiles(files, hformat) + moveTo := filepath.Join(scan.Dir(), "badTombstones") + confirmed, err := confirm("Confirm moving corrupted tombstones to: " + moveTo) + if err != nil { + return err + } + if confirmed { + for _, file := range files { + fileutil.Replace(file, moveTo) + } + } + } + } + return nil +} + +func scanTmps(scanPath string, hformat *bool) error { + var files []string + filepath.Walk(scanPath, func(path string, f os.FileInfo, _ error) error { + if filepath.Ext(path) == ".tmp" { + files = append(files, path) + } + return nil + }) + if len(files) > 0 { + fmt.Println(` + These are usually caused by a crash or some incomplete operation and + are safe to delete as long as no other application is currently using this database.`) + printFiles(files, hformat) + confirmed, err := confirm("DELETE") + if err != nil { + return err + } + if confirmed { + if err := delAll(files); err != nil { + return errors.Wrap(err, "deleting temp files") + } + } + } + return nil +} + +func delAll(paths []string) error { + for _, p := range paths { + if err := os.RemoveAll(p); err != nil { + 
return errors.Wrapf(err, "error deleting:%v", p) + } + } + return nil +} + +func confirm(action string) (bool, error) { + for x := 0; x < 3; x++ { + fmt.Println(action, " (y/N)?") + var s string + _, err := fmt.Scanln(&s) + if err != nil { + return false, err + } + + s = strings.TrimSpace(s) + s = strings.ToLower(s) + + if s == "y" || s == "yes" { + return true, nil + } + if s == "n" || s == "no" { + return false, nil + } + fmt.Println(s, "is not a valid answer") + } + fmt.Printf("Bailing out, too many invalid answers! \n\n") + return false, nil +} + type writeBenchmark struct { outPath string samplesFile string @@ -248,23 +439,22 @@ func (b *writeBenchmark) startProfiling() { // Start CPU profiling. b.cpuprof, err = os.Create(filepath.Join(b.outPath, "cpu.prof")) if err != nil { - exitWithError(fmt.Errorf("bench: could not create cpu profile: %v", err)) + exitWithError(errors.Wrap(err, "bench: could not create cpu profile")) } if err := pprof.StartCPUProfile(b.cpuprof); err != nil { - exitWithError(fmt.Errorf("bench: could not start CPU profile: %v", err)) + exitWithError(errors.Wrap(err, "bench: could not start CPU profile")) } - // Start memory profiling. b.memprof, err = os.Create(filepath.Join(b.outPath, "mem.prof")) if err != nil { - exitWithError(fmt.Errorf("bench: could not create memory profile: %v", err)) + exitWithError(errors.Wrap(err, "bench: could not create memory profile: %v")) } runtime.MemProfileRate = 64 * 1024 // Start fatal profiling. 
b.blockprof, err = os.Create(filepath.Join(b.outPath, "block.prof")) if err != nil { - exitWithError(fmt.Errorf("bench: could not create block profile: %v", err)) + exitWithError(errors.Wrap(err, "bench: could not create block profile: %v")) } runtime.SetBlockProfileRate(20) @@ -357,22 +547,40 @@ func exitWithError(err error) { os.Exit(1) } +func printFiles(files []string, humanReadable *bool) { + tw := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0) + defer tw.Flush() + + fmt.Fprintln(tw, "PATH\tSIZE\tDATE\t") + for _, path := range files { + f, e := os.Stat(path) + if e != nil { + exitWithError(e) + } + fmt.Fprintf(tw, + "%v\t%v\t%v\n", + path, f.Size(), getFormatedTime(f.ModTime().Unix(), humanReadable), + ) + } +} + func printBlocks(blocks []*tsdb.Block, humanReadable *bool) { tw := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0) defer tw.Flush() - fmt.Fprintln(tw, "BLOCK ULID\tMIN TIME\tMAX TIME\tNUM SAMPLES\tNUM CHUNKS\tNUM SERIES") + fmt.Fprintln(tw, "BLOCK ULID\tMIN TIME\tMAX TIME\tNUM SAMPLES\tNUM CHUNKS\tNUM SERIES\tPATH") for _, b := range blocks { meta := b.Meta() fmt.Fprintf(tw, - "%v\t%v\t%v\t%v\t%v\t%v\n", + "%v\t%v\t%v\t%v\t%v\t%v\t%v\n", meta.ULID, getFormatedTime(meta.MinTime, humanReadable), getFormatedTime(meta.MaxTime, humanReadable), meta.Stats.NumSamples, meta.Stats.NumChunks, meta.Stats.NumSeries, + b.Dir(), ) } } diff --git a/db.go b/db.go index e5a057cb..384fe1c0 100644 --- a/db.go +++ b/db.go @@ -558,8 +558,9 @@ func (db *DB) reload() (err error) { sort.Slice(blocks, func(i, j int) bool { return blocks[i].Meta().MinTime < blocks[j].Meta().MinTime }) - if err := validateBlockSequence(blocks); err != nil { - return errors.Wrap(err, "invalid block sequence") + + if overlaps := OverlappingBlocks(blocks); len(overlaps) > 0 { + return errors.Errorf("invalid block sequence , block time ranges overlap: %s", overlaps) } // Swap in new blocks first for subsequently created readers to be seen. 
@@ -595,45 +596,26 @@ func (db *DB) reload() (err error) { return errors.Wrap(db.head.Truncate(maxt), "head truncate failed") } -// validateBlockSequence returns error if given block meta files indicate that some blocks overlaps within sequence. -func validateBlockSequence(bs []*Block) error { - if len(bs) <= 1 { - return nil - } - - var metas []BlockMeta - for _, b := range bs { - metas = append(metas, b.meta) - } - - overlaps := OverlappingBlocks(metas) - if len(overlaps) > 0 { - return errors.Errorf("block time ranges overlap: %s", overlaps) - } - - return nil -} - // TimeRange specifies minTime and maxTime range. type TimeRange struct { Min, Max int64 } // Overlaps contains overlapping blocks aggregated by overlapping range. -type Overlaps map[TimeRange][]BlockMeta +type Overlaps map[TimeRange][]*Block // String returns human readable string form of overlapped blocks. func (o Overlaps) String() string { var res []string for r, overlaps := range o { var groups []string - for _, m := range overlaps { + for _, b := range overlaps { groups = append(groups, fmt.Sprintf( "", - m.ULID.String(), - m.MinTime, - m.MaxTime, - (time.Duration((m.MaxTime-m.MinTime)/1000)*time.Second).String(), + b.Meta().ULID.String(), + b.Meta().MinTime, + b.Meta().MaxTime, + (time.Duration((b.Meta().MaxTime-b.Meta().MinTime)/1000)*time.Second).String(), )) } res = append(res, fmt.Sprintf( @@ -648,15 +630,15 @@ func (o Overlaps) String() string { } // OverlappingBlocks returns all overlapping blocks from given meta files. -func OverlappingBlocks(bm []BlockMeta) Overlaps { +func OverlappingBlocks(bm []*Block) Overlaps { if len(bm) <= 1 { return nil } var ( - overlaps [][]BlockMeta + overlaps [][]*Block // pending contains not ended blocks in regards to "current" timestamp. - pending = []BlockMeta{bm[0]} + pending = []*Block{bm[0]} // continuousPending helps to aggregate same overlaps to single group. 
continuousPending = true ) @@ -665,11 +647,11 @@ func OverlappingBlocks(bm []BlockMeta) Overlaps { // We check if any of the pending block finished (blocks that we have seen before, but their maxTime was still ahead current // timestamp). If not, it means they overlap with our current block. In the same time current block is assumed pending. for _, b := range bm[1:] { - var newPending []BlockMeta + var newPending []*Block for _, p := range pending { // "b.MinTime" is our current time. - if b.MinTime >= p.MaxTime { + if b.Meta().MinTime >= p.Meta().MaxTime { continuousPending = false continue } @@ -700,12 +682,12 @@ func OverlappingBlocks(bm []BlockMeta) Overlaps { minRange := TimeRange{Min: 0, Max: math.MaxInt64} for _, b := range overlap { - if minRange.Max > b.MaxTime { - minRange.Max = b.MaxTime + if minRange.Max > b.Meta().MaxTime { + minRange.Max = b.Meta().MaxTime } - if minRange.Min < b.MinTime { - minRange.Min = b.MinTime + if minRange.Min < b.Meta().MinTime { + minRange.Min = b.Meta().MinTime } } overlapGroups[minRange] = overlap diff --git a/db_test.go b/db_test.go index 691c73e4..ec4a8cfa 100644 --- a/db_test.go +++ b/db_test.go @@ -1050,87 +1050,87 @@ func expandSeriesSet(ss SeriesSet) ([]labels.Labels, error) { func TestOverlappingBlocksDetectsAllOverlaps(t *testing.T) { // Create 10 blocks that does not overlap (0-10, 10-20, ..., 100-110) but in reverse order to ensure our algorithm // will handle that. - var metas = make([]BlockMeta, 11) + var blocks = make([]*Block, 11) for i := 10; i >= 0; i-- { - metas[i] = BlockMeta{MinTime: int64(i * 10), MaxTime: int64((i + 1) * 10)} + blocks[i] = &Block{meta: BlockMeta{MinTime: int64(i * 10), MaxTime: int64((i + 1) * 10)}} } - testutil.Assert(t, len(OverlappingBlocks(metas)) == 0, "we found unexpected overlaps") + testutil.Assert(t, len(OverlappingBlocks(blocks)) == 0, "we found unexpected overlaps") // Add overlapping blocks. 
We've to establish order again since we aren't interested // in trivial overlaps caused by unorderedness. - add := func(ms ...BlockMeta) []BlockMeta { - repl := append(append([]BlockMeta{}, metas...), ms...) + add := func(bs ...*Block) []*Block { + repl := append(append([]*Block{}, blocks...), bs...) sort.Slice(repl, func(i, j int) bool { - return repl[i].MinTime < repl[j].MinTime + return repl[i].Meta().MinTime < repl[j].Meta().MinTime }) return repl } // o1 overlaps with 10-20. - o1 := BlockMeta{MinTime: 15, MaxTime: 17} + o1 := &Block{meta: BlockMeta{MinTime: 15, MaxTime: 17}} testutil.Equals(t, Overlaps{ - {Min: 15, Max: 17}: {metas[1], o1}, + {Min: 15, Max: 17}: {blocks[1], o1}, }, OverlappingBlocks(add(o1))) // o2 overlaps with 20-30 and 30-40. - o2 := BlockMeta{MinTime: 21, MaxTime: 31} + o2 := &Block{meta: BlockMeta{MinTime: 21, MaxTime: 31}} testutil.Equals(t, Overlaps{ - {Min: 21, Max: 30}: {metas[2], o2}, - {Min: 30, Max: 31}: {o2, metas[3]}, + {Min: 21, Max: 30}: {blocks[2], o2}, + {Min: 30, Max: 31}: {o2, blocks[3]}, }, OverlappingBlocks(add(o2))) // o3a and o3b overlaps with 30-40 and each other. - o3a := BlockMeta{MinTime: 33, MaxTime: 39} - o3b := BlockMeta{MinTime: 34, MaxTime: 36} + o3a := &Block{meta: BlockMeta{MinTime: 33, MaxTime: 39}} + o3b := &Block{meta: BlockMeta{MinTime: 34, MaxTime: 36}} testutil.Equals(t, Overlaps{ - {Min: 34, Max: 36}: {metas[3], o3a, o3b}, + {Min: 34, Max: 36}: {blocks[3], o3a, o3b}, }, OverlappingBlocks(add(o3a, o3b))) // o4 is 1:1 overlap with 50-60. - o4 := BlockMeta{MinTime: 50, MaxTime: 60} + o4 := &Block{meta: BlockMeta{MinTime: 50, MaxTime: 60}} testutil.Equals(t, Overlaps{ - {Min: 50, Max: 60}: {metas[5], o4}, + {Min: 50, Max: 60}: {blocks[5], o4}, }, OverlappingBlocks(add(o4))) // o5 overlaps with 60-70, 70-80 and 80-90. 
- o5 := BlockMeta{MinTime: 61, MaxTime: 85} + o5 := &Block{meta: BlockMeta{MinTime: 61, MaxTime: 85}} testutil.Equals(t, Overlaps{ - {Min: 61, Max: 70}: {metas[6], o5}, - {Min: 70, Max: 80}: {o5, metas[7]}, - {Min: 80, Max: 85}: {o5, metas[8]}, + {Min: 61, Max: 70}: {blocks[6], o5}, + {Min: 70, Max: 80}: {o5, blocks[7]}, + {Min: 80, Max: 85}: {o5, blocks[8]}, }, OverlappingBlocks(add(o5))) // o6a overlaps with 90-100, 100-110 and o6b, o6b overlaps with 90-100 and o6a. - o6a := BlockMeta{MinTime: 92, MaxTime: 105} - o6b := BlockMeta{MinTime: 94, MaxTime: 99} + o6a := &Block{meta: BlockMeta{MinTime: 92, MaxTime: 105}} + o6b := &Block{meta: BlockMeta{MinTime: 94, MaxTime: 99}} testutil.Equals(t, Overlaps{ - {Min: 94, Max: 99}: {metas[9], o6a, o6b}, - {Min: 100, Max: 105}: {o6a, metas[10]}, + {Min: 94, Max: 99}: {blocks[9], o6a, o6b}, + {Min: 100, Max: 105}: {o6a, blocks[10]}, }, OverlappingBlocks(add(o6a, o6b))) // All together. testutil.Equals(t, Overlaps{ - {Min: 15, Max: 17}: {metas[1], o1}, - {Min: 21, Max: 30}: {metas[2], o2}, {Min: 30, Max: 31}: {o2, metas[3]}, - {Min: 34, Max: 36}: {metas[3], o3a, o3b}, - {Min: 50, Max: 60}: {metas[5], o4}, - {Min: 61, Max: 70}: {metas[6], o5}, {Min: 70, Max: 80}: {o5, metas[7]}, {Min: 80, Max: 85}: {o5, metas[8]}, - {Min: 94, Max: 99}: {metas[9], o6a, o6b}, {Min: 100, Max: 105}: {o6a, metas[10]}, + {Min: 15, Max: 17}: {blocks[1], o1}, + {Min: 21, Max: 30}: {blocks[2], o2}, {Min: 30, Max: 31}: {o2, blocks[3]}, + {Min: 34, Max: 36}: {blocks[3], o3a, o3b}, + {Min: 50, Max: 60}: {blocks[5], o4}, + {Min: 61, Max: 70}: {blocks[6], o5}, {Min: 70, Max: 80}: {o5, blocks[7]}, {Min: 80, Max: 85}: {o5, blocks[8]}, + {Min: 94, Max: 99}: {blocks[9], o6a, o6b}, {Min: 100, Max: 105}: {o6a, blocks[10]}, }, OverlappingBlocks(add(o1, o2, o3a, o3b, o4, o5, o6a, o6b))) // Additional case. 
- var nc1 []BlockMeta - nc1 = append(nc1, BlockMeta{MinTime: 1, MaxTime: 5}) - nc1 = append(nc1, BlockMeta{MinTime: 2, MaxTime: 3}) - nc1 = append(nc1, BlockMeta{MinTime: 2, MaxTime: 3}) - nc1 = append(nc1, BlockMeta{MinTime: 2, MaxTime: 3}) - nc1 = append(nc1, BlockMeta{MinTime: 2, MaxTime: 3}) - nc1 = append(nc1, BlockMeta{MinTime: 2, MaxTime: 6}) - nc1 = append(nc1, BlockMeta{MinTime: 3, MaxTime: 5}) - nc1 = append(nc1, BlockMeta{MinTime: 5, MaxTime: 7}) - nc1 = append(nc1, BlockMeta{MinTime: 7, MaxTime: 10}) - nc1 = append(nc1, BlockMeta{MinTime: 8, MaxTime: 9}) + var nc1 []*Block + nc1 = append(nc1, &Block{meta: BlockMeta{MinTime: 1, MaxTime: 5}}) + nc1 = append(nc1, &Block{meta: BlockMeta{MinTime: 2, MaxTime: 3}}) + nc1 = append(nc1, &Block{meta: BlockMeta{MinTime: 2, MaxTime: 3}}) + nc1 = append(nc1, &Block{meta: BlockMeta{MinTime: 2, MaxTime: 3}}) + nc1 = append(nc1, &Block{meta: BlockMeta{MinTime: 2, MaxTime: 3}}) + nc1 = append(nc1, &Block{meta: BlockMeta{MinTime: 2, MaxTime: 6}}) + nc1 = append(nc1, &Block{meta: BlockMeta{MinTime: 3, MaxTime: 5}}) + nc1 = append(nc1, &Block{meta: BlockMeta{MinTime: 5, MaxTime: 7}}) + nc1 = append(nc1, &Block{meta: BlockMeta{MinTime: 7, MaxTime: 10}}) + nc1 = append(nc1, &Block{meta: BlockMeta{MinTime: 8, MaxTime: 9}}) testutil.Equals(t, Overlaps{ {Min: 2, Max: 3}: {nc1[0], nc1[1], nc1[2], nc1[3], nc1[4], nc1[5]}, // 1-5, 2-3, 2-3, 2-3, 2-3, 2,6 {Min: 3, Max: 5}: {nc1[0], nc1[5], nc1[6]}, // 1-5, 2-6, 3-5 diff --git a/repair_test.go b/repair_test.go index b3bf0acb..e0af0415 100644 --- a/repair_test.go +++ b/repair_test.go @@ -69,10 +69,10 @@ func TestRepairBadIndexVersion(t *testing.T) { testutil.NotOk(t, err) // Touch chunks dir in block. 
- os.MkdirAll(filepath.Join(dbDir, "chunks"), 0777) - defer os.RemoveAll(filepath.Join(dbDir, "chunks")) + os.MkdirAll(chunkDir(dbDir), 0777) + defer os.RemoveAll(chunkDir(dbDir)) - r, err := index.NewFileReader(filepath.Join(dbDir, "index")) + r, err := index.NewFileReader(filepath.Join(dbDir, indexFilename)) testutil.Ok(t, err) p, err := r.Postings("b", "1") testutil.Ok(t, err) diff --git a/scan.go b/scan.go new file mode 100644 index 00000000..5c409f27 --- /dev/null +++ b/scan.go @@ -0,0 +1,255 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tsdb + +import ( + "fmt" + "sort" + + "github.com/go-kit/kit/log" + "github.com/pkg/errors" + "github.com/prometheus/tsdb/chunks" + "github.com/prometheus/tsdb/index" + "github.com/prometheus/tsdb/labels" +) + +// Scanner provides an interface for scanning different building components of a block. +type Scanner interface { + // Tombstones returns a list of block paths with invalid Tombstones grouped by the error. + Tombstones() (map[string][]string, error) + // Meta returns a list of block paths with invalid Meta file grouped by the error. + Meta() (map[string][]string, error) + // Index returns a list of block paths with invalid Index grouped by the error. + Index() (map[string][]string, error) + // Overlapping returns a list of blocks with overlapping time ranges. + Overlapping() (Overlaps, error) + // Dir returns the scanned directory. 
+ Dir() string +} + +// DBScanner is the main struct for the scanner. +type DBScanner struct { + db *DB + logger log.Logger +} + +// NewDBScanner initializes a new database scanner. +func NewDBScanner(dir string, l log.Logger) (*DBScanner, error) { + blocks, err := blockDirs(dir) + if err != nil { + return nil, errors.Wrapf(err, "listing blocks") + } + if len(blocks) == 0 { + return nil, fmt.Errorf("directory doesn't include any blocks:%v", dir) + } + dbScanner := &DBScanner{ + db: &DB{ + dir: dir, + logger: l, + opts: &Options{}, + }, + logger: l, + } + return dbScanner, nil +} + +func (s *DBScanner) Dir() string { + return s.db.Dir() +} + +func (s *DBScanner) Tombstones() (map[string][]string, error) { + dirs, err := blockDirs(s.db.dir) + if err != nil { + return nil, err + } + + inv := make(map[string][]string) + for _, dir := range dirs { + if _, err = readTombstones(dir); err != nil { + inv[err.Error()] = append(inv[err.Error()], dir) + } + } + return inv, nil +} + +func (s *DBScanner) Meta() (map[string][]string, error) { + dirs, err := blockDirs(s.db.dir) + if err != nil { + return nil, err + } + + inv := make(map[string][]string) + for _, dir := range dirs { + if _, err = readMetaFile(dir); err != nil { + inv[err.Error()] = append(inv[err.Error()], dir) + } + } + return inv, nil +} + +func (s *DBScanner) Overlapping() (Overlaps, error) { + dirs, err := blockDirs(s.db.dir) + if err != nil { + return nil, err + } + + var blocks []*Block + for _, dir := range dirs { + if meta, err := readMetaFile(dir); err == nil { + blocks = append(blocks, &Block{ + meta: *meta, + dir: dir, + }) + } + } + sort.Slice(blocks, func(i, j int) bool { + return blocks[i].Meta().MinTime < blocks[j].Meta().MinTime + }) + return OverlappingBlocks(blocks), nil +} + +func (s *DBScanner) Index() (map[string][]string, error) { + inv := make(map[string][]string) + dirs, err := blockDirs(s.db.dir) + if err != nil { + return nil, err + } + + for _, dir := range dirs { + stats, err := 
indexStats(dir) + if err != nil { + inv[err.Error()] = append(inv[err.Error()], dir) + continue + } + + if stats.ErrSummary() != nil { + inv[err.Error()] = append(inv[err.Error()], dir) + continue + } + } + return inv, nil +} + +func indexStats(bdir string) (IndexStats, error) { + block, err := OpenBlock(bdir, nil) + if err != nil { + return IndexStats{}, errors.Wrap(err, "open block") + } + defer block.Close() + + indexr, err := block.Index() + if err != nil { + return IndexStats{}, errors.Wrap(err, "open block index") + } + defer indexr.Close() + + stats := IndexStats{ + MinTime: block.Meta().MinTime, + MaxTime: block.Meta().MaxTime, + BlockDir: bdir, + } + + p, err := indexr.Postings(index.AllPostingsKey()) + if err != nil { + return IndexStats{}, errors.Wrap(err, "get all postings") + } + var ( + lastLset labels.Labels + lset labels.Labels + chks []chunks.Meta + ) + + for p.Next() { + lastLset = append(lastLset[:0], lset...) + + id := p.At() + + if err = indexr.Series(id, &lset, &chks); err != nil { + return IndexStats{}, errors.Wrap(err, "read series") + + } + + if len(lset) == 0 { + return IndexStats{}, errors.Errorf("empty label set detected for series %d", id) + } + if lastLset != nil && labels.Compare(lset, lastLset) <= 0 { + return IndexStats{}, errors.Errorf("series are the same or out of order - current:%v previous:%v", lset, lastLset) + } + l0 := lset[0] + for _, l := range lset[1:] { + if l.Name <= l0.Name { + return IndexStats{}, errors.Errorf("out-of-order label set %s for series %d", lset, id) + } + l0 = l + } + if len(chks) == 0 { + return IndexStats{}, errors.Errorf("empty chunks for series %d", id) + } + + ooo := 0 + + stats.ChunksTotal += len(chks) + + for i, c := range chks { + if i > 0 { + // Overlapping chunks. 
+ if c.OverlapsClosedInterval(chks[i-1].MinTime, chks[i-1].MaxTime) { + ooo++ + } + } + + if !c.OverlapsClosedInterval(block.Meta().MinTime, block.Meta().MaxTime) { + stats.ChunksEntireOutsiders++ + } else if c.MinTime < block.Meta().MinTime || c.MaxTime > block.Meta().MaxTime { + stats.ChunksPartialOutsiders++ + + } + + } + if ooo > 0 { + stats.ChunksOverlapingTotal += ooo + } + } + if p.Err() != nil { + return IndexStats{}, errors.Wrap(err, "walk postings") + } + + return stats, nil +} + +// IndexStats holds useful index counters to asses the index health. +type IndexStats struct { + BlockDir string + ChunksTotal int + ChunksOverlapingTotal int + ChunksPartialOutsiders int + ChunksEntireOutsiders int + MinTime, MaxTime int64 +} + +// ErrSummary implements the error interface and returns a formated index stats. +func (i IndexStats) ErrSummary() error { + if i.ChunksOverlapingTotal > 0 || + i.ChunksPartialOutsiders > 0 || + i.ChunksEntireOutsiders > 0 { + return errors.Errorf(`Time Range: %v - %v, Total Chunks:%v, Total Overlaping Chunks:%v Chunks partially outside:%v, Chunks completely outside:%v`, + i.MinTime, i.MaxTime, + i.ChunksTotal, + i.ChunksOverlapingTotal, + i.ChunksPartialOutsiders, + i.ChunksEntireOutsiders, + ) + } + return nil +} diff --git a/scan_test.go b/scan_test.go new file mode 100644 index 00000000..0e8577c0 --- /dev/null +++ b/scan_test.go @@ -0,0 +1,115 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package tsdb + +import ( + "os" + "path/filepath" + "testing" + + "github.com/go-kit/kit/log" + "github.com/prometheus/tsdb/testutil" +) + +func TestScanning(t *testing.T) { + // Create some blocks to work with. + tmp := testutil.NewTemporaryDirectory("scanTest", t) + defer tmp.Close() + block := createPopulatedBlock(t, tmp.Path(), 1, 1, 10) + defer block.Close() + b := createPopulatedBlock(t, tmp.Path(), 1, 10, 20) + defer b.Close() + + scanner, err := NewDBScanner(tmp.Path(), log.NewLogfmtLogger(os.Stderr)) + testutil.Ok(t, err) + + // Test that the scanner reports all current blocks as healthy. + corr, err := scanner.Index() + testutil.Ok(t, err) + testutil.Equals(t, 0, len(corr)) + + corr, err = scanner.Tombstones() + testutil.Ok(t, err) + testutil.Equals(t, 0, len(corr)) + + corr, err = scanner.Meta() + testutil.Ok(t, err) + testutil.Equals(t, 0, len(corr)) + + corrO, err := scanner.Overlapping() + testutil.Ok(t, err) + testutil.Equals(t, 0, len(corrO)) + + // Corrupt the block Meta file and check that the scanner reports it. + f, err := os.OpenFile(filepath.Join(block.Dir(), metaFilename), os.O_WRONLY, 0666) + testutil.Ok(t, err) + _, err = f.Write([]byte{0}) + testutil.Ok(t, err) + f.Close() + + corr, err = scanner.Meta() + testutil.Ok(t, err) + testutil.Equals(t, 1, len(corr)) + for _, blocks := range corr { + for _, fname := range blocks { + testutil.Equals(t, block.Dir(), fname) + } + } + + // Corrupt the block Index and check that the scanner reports it. 
+ f, err = os.OpenFile(filepath.Join(block.Dir(), indexFilename), os.O_WRONLY, 0666) + testutil.Ok(t, err) + _, err = f.Write([]byte{0}) + testutil.Ok(t, err) + f.Close() + + corr, err = scanner.Index() + testutil.Ok(t, err) + testutil.Equals(t, 1, len(corr)) + for _, blocks := range corr { + for _, fname := range blocks { + testutil.Equals(t, block.Dir(), fname) + } + } + + // Corrupt the block Tombstone and check that the scanner reports it. + f, err = os.OpenFile(filepath.Join(block.Dir(), tombstoneFilename), os.O_WRONLY, 0666) + testutil.Ok(t, err) + _, err = f.Write([]byte{0}) + testutil.Ok(t, err) + testutil.Ok(t, f.Close()) + + corr, err = scanner.Tombstones() + testutil.Ok(t, err) + testutil.Equals(t, 1, len(corr)) + for _, blocks := range corr { + for _, fname := range blocks { + testutil.Equals(t, block.Dir(), fname) + } + } + + // Create an overlapping block and check that the scanner reports it. + overlapExp := createPopulatedBlock(t, tmp.Path(), 1, 15, 20) + defer overlapExp.Close() + corrO, err = scanner.Overlapping() + testutil.Ok(t, err) + testutil.Equals(t, 1, len(corrO)) + for overlap, blocks := range corrO { + for _, overlapAct := range blocks[1:] { // Skip the original block that overlaps. + testutil.Equals(t, overlapExp.Dir(), overlapAct.Dir()) + testutil.Equals(t, overlapExp.Meta().MinTime, overlap.Min) + testutil.Equals(t, overlapExp.Meta().MaxTime, overlap.Max) + } + } +}