From 1b12ab982637ae58573a3bbcdf6331e7d3002e86 Mon Sep 17 00:00:00 2001 From: wanjunlei <53003665+wanjunlei@users.noreply.github.com> Date: Sat, 28 Jan 2023 15:34:30 +0800 Subject: [PATCH] Store: Support disable cache index header file. (#5773) * Store: Support disable cache index header file. Signed-off-by: wanjunlei * Store: add a separate flag to disable caching index header file Signed-off-by: wanjunlei * Tools: add cleanup API for bucket web Signed-off-by: wanjunlei * resolve conversation Signed-off-by: wanjunlei * resolve conflicts Signed-off-by: wanjunlei * change the flag to `--cache-index-header` Signed-off-by: wanjunlei * Wrap mem writer in file writer Signed-off-by: wanjunlei * update CHANGELOG Signed-off-by: wanjunlei * update CHANGELOG Signed-off-by: wanjunlei * fix bug Signed-off-by: wanjunlei --------- Signed-off-by: wanjunlei Co-authored-by: wanjunlei --- CHANGELOG.md | 1 + cmd/thanos/store.go | 15 +- docs/components/store.md | 13 +- pkg/block/indexheader/binary_reader.go | 389 +++++++++++++------- pkg/block/indexheader/header_test.go | 12 +- pkg/block/indexheader/lazy_binary_reader.go | 39 +- pkg/store/bucket.go | 24 +- test/e2e/store_gateway_test.go | 224 +++++++++++ 8 files changed, 546 insertions(+), 171 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c9be78a554..a6fc6ae5ab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#5990](https://github.com/thanos-io/thanos/pull/5990) Cache/Redis: add support for Redis Sentinel via new option `master_name`. - [#6008](https://github.com/thanos-io/thanos/pull/6008) *: Add counter metric `gate_queries_total` to gate. +- [#5773](https://github.com/thanos-io/thanos/pull/5773) Store: Support disable cache index header file. 
### Changed diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index a9eb00f212..0a454a4c92 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -58,6 +58,7 @@ type storeConfig struct { indexCacheConfigs extflag.PathOrContent objStoreConfig extflag.PathOrContent dataDir string + cacheIndexHeader bool grpcConfig grpcConfig httpConfig httpConfig indexCacheSizeBytes units.Base2Bytes @@ -90,9 +91,12 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) { sc.httpConfig = *sc.httpConfig.registerFlag(cmd) sc.grpcConfig = *sc.grpcConfig.registerFlag(cmd) - cmd.Flag("data-dir", "Local data directory used for caching purposes (index-header, in-mem cache items and meta.jsons). If removed, no data will be lost, just store will have to rebuild the cache. NOTE: Putting raw blocks here will not cause the store to read them. For such use cases use Prometheus + sidecar."). + cmd.Flag("data-dir", "Local data directory used for caching purposes (index-header, in-mem cache items and meta.jsons). If removed, no data will be lost, just store will have to rebuild the cache. NOTE: Putting raw blocks here will not cause the store to read them. For such use cases use Prometheus + sidecar. Ignored if -no-cache-index-header option is specified."). Default("./data").StringVar(&sc.dataDir) + cmd.Flag("cache-index-header", "Cache TSDB index-headers on disk to reduce startup time. When set to true, Thanos Store will download index headers from remote object storage on startup and create a header file on disk. Use --data-dir to set the directory in which index headers will be downloaded."). + Default("true").BoolVar(&sc.cacheIndexHeader) + cmd.Flag("index-cache-size", "Maximum size of items held in the in-memory index cache. Ignored if --index-cache.config or --index-cache.config-file option is specified."). 
Default("250MB").BytesVar(&sc.indexCacheSizeBytes) @@ -237,6 +241,11 @@ func runStore( conf storeConfig, flagsMap map[string]string, ) error { + dataDir := conf.dataDir + if !conf.cacheIndexHeader { + dataDir = "" + } + grpcProbe := prober.NewGRPC() httpProbe := prober.NewHTTP() statusProber := prober.Combine( @@ -318,7 +327,7 @@ func runStore( } ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, time.Duration(conf.ignoreDeletionMarksDelay), conf.blockMetaFetchConcurrency) - metaFetcher, err := block.NewMetaFetcher(logger, conf.blockMetaFetchConcurrency, bkt, conf.dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg), + metaFetcher, err := block.NewMetaFetcher(logger, conf.blockMetaFetchConcurrency, bkt, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{ block.NewTimePartitionMetaFilter(conf.filterConf.MinTime, conf.filterConf.MaxTime), block.NewLabelShardedMetaFilter(relabelConfig), @@ -360,7 +369,7 @@ func runStore( bs, err := store.NewBucketStore( bkt, metaFetcher, - conf.dataDir, + dataDir, store.NewChunksLimiterFactory(conf.maxSampleCount/store.MaxSamplesPerChunk), // The samples limit is an approximation based on the max number of samples per chunk. store.NewSeriesLimiterFactory(conf.maxTouchedSeriesCount), store.NewBytesLimiterFactory(conf.maxDownloadedBytes), diff --git a/docs/components/store.md b/docs/components/store.md index 7f3df835df..a67411934c 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -36,6 +36,12 @@ Flags: Number of goroutines to use when constructing index-cache.json blocks from object storage. Must be equal or greater than 1. + --cache-index-header Cache TSDB index-headers on disk to reduce + startup time. When set to true, Thanos Store + will download index headers from remote object + storage on startup and create a header file on + disk. Use --data-dir to set the directory in + which index headers will be downloaded. 
--chunk-pool-size=2GB Maximum size of concurrently allocatable bytes reserved strictly to reuse for chunks in memory. @@ -47,9 +53,10 @@ Flags: purposes (index-header, in-mem cache items and meta.jsons). If removed, no data will be lost, just store will have to rebuild the cache. - NOTE: Putting raw blocks here will not cause - the store to read them. For such use cases use - Prometheus + sidecar. + NOTE: Putting raw blocks here will not + cause the store to read them. For such use + cases use Prometheus + sidecar. Ignored if + -no-cache-index-header option is specified. --grpc-address="0.0.0.0:10901" Listen ip:port address for gRPC endpoints (StoreAPI). Make sure this address is routable diff --git a/pkg/block/indexheader/binary_reader.go b/pkg/block/indexheader/binary_reader.go index 06f11c8f34..0c7d062c9c 100644 --- a/pkg/block/indexheader/binary_reader.go +++ b/pkg/block/indexheader/binary_reader.go @@ -5,6 +5,7 @@ package indexheader import ( "bufio" + "bytes" "context" "encoding/binary" "hash" @@ -69,57 +70,64 @@ type BinaryTOC struct { PostingsOffsetTable uint64 } -// WriteBinary build index-header file from the pieces of index in object storage. -func WriteBinary(ctx context.Context, bkt objstore.BucketReader, id ulid.ULID, filename string) (err error) { +// WriteBinary builds the index header from the pieces of index in object storage and caches it in a file when filename is non-empty. +func WriteBinary(ctx context.Context, bkt objstore.BucketReader, id ulid.ULID, filename string) ([]byte, error) { ir, indexVersion, err := newChunkedIndexReader(ctx, bkt, id) if err != nil { - return errors.Wrap(err, "new index reader") + return nil, errors.Wrap(err, "new index reader") + } + tmpFilename := "" + if filename != "" { + tmpFilename = filename + ".tmp" } - tmpFilename := filename + ".tmp" // Buffer for copying and encbuffers. // This also will control the size of file writer buffer. 
buf := make([]byte, 32*1024) - bw, err := newBinaryWriter(tmpFilename, buf) + bw, err := newBinaryWriter(id, tmpFilename, buf) if err != nil { - return errors.Wrap(err, "new binary index header writer") + return nil, errors.Wrap(err, "new binary index header writer") } defer runutil.CloseWithErrCapture(&err, bw, "close binary writer for %s", tmpFilename) if err := bw.AddIndexMeta(indexVersion, ir.toc.PostingsTable); err != nil { - return errors.Wrap(err, "add index meta") + return nil, errors.Wrap(err, "add index meta") } if err := ir.CopySymbols(bw.SymbolsWriter(), buf); err != nil { - return err + return nil, err } - if err := bw.f.Flush(); err != nil { - return errors.Wrap(err, "flush") + if err := bw.writer.Flush(); err != nil { + return nil, errors.Wrap(err, "flush") } if err := ir.CopyPostingsOffsets(bw.PostingOffsetsWriter(), buf); err != nil { - return err + return nil, err } - if err := bw.f.Flush(); err != nil { - return errors.Wrap(err, "flush") + if err := bw.writer.Flush(); err != nil { + return nil, errors.Wrap(err, "flush") } if err := bw.WriteTOC(); err != nil { - return errors.Wrap(err, "write index header TOC") + return nil, errors.Wrap(err, "write index header TOC") } - if err := bw.f.Flush(); err != nil { - return errors.Wrap(err, "flush") + if err := bw.writer.Flush(); err != nil { + return nil, errors.Wrap(err, "flush") } - if err := bw.f.f.Sync(); err != nil { - return errors.Wrap(err, "sync") + if err := bw.writer.Sync(); err != nil { + return nil, errors.Wrap(err, "sync") } - // Create index-header in atomic way, to avoid partial writes (e.g during restart or crash of store GW). - return os.Rename(tmpFilename, filename) + if tmpFilename != "" { + // Create index-header in atomic way, to avoid partial writes (e.g during restart or crash of store GW). 
+ return nil, os.Rename(tmpFilename, filename) + } + + return bw.Buffer(), nil } type chunkedIndexReader struct { @@ -231,7 +239,7 @@ func (r *chunkedIndexReader) CopyPostingsOffsets(w io.Writer, buf []byte) (err e // TODO(bwplotka): Add padding for efficient read. type binaryWriter struct { - f *FileWriter + writer PosWriter toc BinaryTOC @@ -241,38 +249,48 @@ type binaryWriter struct { crc32 hash.Hash } -func newBinaryWriter(fn string, buf []byte) (w *binaryWriter, err error) { - dir := filepath.Dir(fn) - - df, err := fileutil.OpenDir(dir) - if os.IsNotExist(err) { - if err := os.MkdirAll(dir, os.ModePerm); err != nil { - return nil, err - } - df, err = fileutil.OpenDir(dir) - } +func newBinaryWriter(id ulid.ULID, cacheFilename string, buf []byte) (w *binaryWriter, err error) { + var memoryWriter *MemoryWriter + memoryWriter, err = NewMemoryWriter(id, len(buf)) if err != nil { - return nil, err } + var binWriter PosWriter = memoryWriter - defer runutil.CloseWithErrCapture(&err, df, "dir close") + if cacheFilename != "" { + dir := filepath.Dir(cacheFilename) - if err := os.RemoveAll(fn); err != nil { - return nil, errors.Wrap(err, "remove any existing index at path") - } + df, err := fileutil.OpenDir(dir) + if os.IsNotExist(err) { + if err := os.MkdirAll(dir, os.ModePerm); err != nil { + return nil, err + } + df, err = fileutil.OpenDir(dir) + } + if err != nil { + return nil, err + } - // We use file writer for buffers not larger than reused one. - f, err := NewFileWriter(fn, len(buf)) - if err != nil { - return nil, err - } - if err := df.Sync(); err != nil { - return nil, errors.Wrap(err, "sync dir") + defer runutil.CloseWithErrCapture(&err, df, "dir close") + + if err := os.RemoveAll(cacheFilename); err != nil { + return nil, errors.Wrap(err, "remove any existing index at path") + } + + // We use file writer for buffers not larger than reused one. 
+ var fileWriter *FileWriter + fileWriter, err = NewFileWriter(cacheFilename, memoryWriter) + if err != nil { + return nil, err + } + if err := df.Sync(); err != nil { + return nil, errors.Wrap(err, "sync dir") + } + binWriter = fileWriter } w = &binaryWriter{ - f: f, + writer: binWriter, // Reusable memory. buf: encoding.Encbuf{B: buf}, @@ -283,38 +301,42 @@ func newBinaryWriter(fn string, buf []byte) (w *binaryWriter, err error) { w.buf.PutBE32(MagicIndex) w.buf.PutByte(BinaryFormatV1) - return w, w.f.Write(w.buf.Get()) + return w, w.writer.Write(w.buf.Get()) } -type FileWriter struct { - f *os.File - fbuf *bufio.Writer - pos uint64 - name string +type PosWriter interface { + Pos() uint64 + Write(bufs ...[]byte) error + Buffer() []byte + Flush() error + Sync() error + Close() error +} + +type MemoryWriter struct { + id ulid.ULID + buf bytes.Buffer + pos uint64 } // TODO(bwplotka): Added size to method, upstream this. -func NewFileWriter(name string, size int) (*FileWriter, error) { - f, err := os.OpenFile(filepath.Clean(name), os.O_CREATE|os.O_RDWR, 0600) - if err != nil { - return nil, err - } - return &FileWriter{ - f: f, - fbuf: bufio.NewWriterSize(f, size), - pos: 0, - name: name, +func NewMemoryWriter(id ulid.ULID, size int) (*MemoryWriter, error) { + var buf bytes.Buffer + return &MemoryWriter{ + id: id, + buf: buf, + pos: 0, }, nil } -func (fw *FileWriter) Pos() uint64 { - return fw.pos +func (mw *MemoryWriter) Pos() uint64 { + return mw.pos } -func (fw *FileWriter) Write(bufs ...[]byte) error { +func (mw *MemoryWriter) Write(bufs ...[]byte) error { for _, b := range bufs { - n, err := fw.fbuf.Write(b) - fw.pos += uint64(n) + n, err := mw.buf.Write(b) + mw.pos += uint64(n) if err != nil { return err } @@ -322,40 +344,83 @@ func (fw *FileWriter) Write(bufs ...[]byte) error { // offset references in v1 are only 4 bytes large. // Once we move to compressed/varint representations in those areas, this limitation // can be lifted. 
- if fw.pos > 16*math.MaxUint32 { - return errors.Errorf("%q exceeding max size of 64GiB", fw.name) + if mw.pos > 16*math.MaxUint32 { + return errors.Errorf("%q exceeding max size of 64GiB", mw.id) } } return nil } -func (fw *FileWriter) Flush() error { - return fw.fbuf.Flush() +func (mw *MemoryWriter) Buffer() []byte { + return mw.buf.Bytes() } -func (fw *FileWriter) WriteAt(buf []byte, pos uint64) error { - if err := fw.Flush(); err != nil { - return err - } - _, err := fw.f.WriteAt(buf, int64(pos)) - return err +func (mw *MemoryWriter) Flush() error { + return nil } -// AddPadding adds zero byte padding until the file size is a multiple size. -func (fw *FileWriter) AddPadding(size int) error { - p := fw.pos % uint64(size) - if p == 0 { - return nil +func (mw *MemoryWriter) Sync() error { + return nil +} + +func (mw *MemoryWriter) Close() error { + return mw.Flush() +} + +type FileWriter struct { + f *os.File + memWriter *MemoryWriter + fileWriter *bufio.Writer + name string +} + +// TODO(bwplotka): Added size to method, upstream this. 
+func NewFileWriter(name string, memWriter *MemoryWriter) (*FileWriter, error) { + f, err := os.OpenFile(filepath.Clean(name), os.O_CREATE|os.O_RDWR, 0600) + if err != nil { + return nil, err } - p = uint64(size) - p + return &FileWriter{ + f: f, + memWriter: memWriter, + fileWriter: bufio.NewWriterSize(f, memWriter.buf.Len()), + name: name, + }, nil +} + +func (fw *FileWriter) Pos() uint64 { + return fw.memWriter.Pos() +} - if err := fw.Write(make([]byte, p)); err != nil { - return errors.Wrap(err, "add padding") +func (fw *FileWriter) Write(bufs ...[]byte) error { + if err := fw.memWriter.Write(bufs...); err != nil { + return err + } + for _, b := range bufs { + _, err := fw.fileWriter.Write(b) + if err != nil { + return err + } } return nil } +func (fw *FileWriter) Buffer() []byte { + return fw.memWriter.Buffer() +} + +func (fw *FileWriter) Flush() error { + if err := fw.memWriter.Flush(); err != nil { + return err + } + + return fw.fileWriter.Flush() +} + func (fw *FileWriter) Close() error { + if err := fw.memWriter.Close(); err != nil { + return err + } if err := fw.Flush(); err != nil { return err } @@ -365,6 +430,13 @@ func (fw *FileWriter) Close() error { return fw.f.Close() } +func (fw *FileWriter) Sync() error { + if err := fw.memWriter.Sync(); err != nil { + return err + } + return fw.f.Sync() +} + func (fw *FileWriter) Remove() error { return os.Remove(fw.name) } @@ -373,16 +445,16 @@ func (w *binaryWriter) AddIndexMeta(indexVersion int, indexPostingOffsetTable ui w.buf.Reset() w.buf.PutByte(byte(indexVersion)) w.buf.PutBE64(indexPostingOffsetTable) - return w.f.Write(w.buf.Get()) + return w.writer.Write(w.buf.Get()) } func (w *binaryWriter) SymbolsWriter() io.Writer { - w.toc.Symbols = w.f.Pos() + w.toc.Symbols = w.writer.Pos() return w } func (w *binaryWriter) PostingOffsetsWriter() io.Writer { - w.toc.PostingsOffsetTable = w.f.Pos() + w.toc.PostingsOffsetTable = w.writer.Pos() return w } @@ -394,17 +466,21 @@ func (w *binaryWriter) WriteTOC() error 
{ w.buf.PutHash(w.crc32) - return w.f.Write(w.buf.Get()) + return w.writer.Write(w.buf.Get()) } func (w *binaryWriter) Write(p []byte) (int, error) { - n := w.f.Pos() - err := w.f.Write(p) - return int(w.f.Pos() - n), err + n := w.writer.Pos() + err := w.writer.Write(p) + return int(w.writer.Pos() - n), err +} + +func (w *binaryWriter) Buffer() []byte { + return w.writer.Buffer() } func (w *binaryWriter) Close() error { - return w.f.Close() + return w.writer.Close() } type postingValueOffsets struct { @@ -458,21 +534,45 @@ type BinaryReader struct { // NewBinaryReader loads or builds new index-header if not present on disk. func NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID, postingOffsetsInMemSampling int) (*BinaryReader, error) { - binfn := filepath.Join(dir, id.String(), block.IndexHeaderFilename) - br, err := newFileBinaryReader(binfn, postingOffsetsInMemSampling) - if err == nil { - return br, nil + if dir != "" { + binfn := filepath.Join(dir, id.String(), block.IndexHeaderFilename) + br, err := newFileBinaryReader(binfn, postingOffsetsInMemSampling) + if err == nil { + return br, nil + } + + level.Debug(logger).Log("msg", "failed to read index-header from disk; recreating", "path", binfn, "err", err) + + start := time.Now() + if _, err := WriteBinary(ctx, bkt, id, binfn); err != nil { + return nil, errors.Wrap(err, "write index header") + } + + level.Debug(logger).Log("msg", "built index-header file", "path", binfn, "elapsed", time.Since(start)) + return newFileBinaryReader(binfn, postingOffsetsInMemSampling) + } else { + buf, err := WriteBinary(ctx, bkt, id, "") + if err != nil { + return nil, errors.Wrap(err, "generate index header") + } + + return newMemoryBinaryReader(buf, postingOffsetsInMemSampling) } +} - level.Debug(logger).Log("msg", "failed to read index-header from disk; recreating", "path", binfn, "err", err) +func newMemoryBinaryReader(buf []byte, postingOffsetsInMemSampling int) (bw 
*BinaryReader, err error) { + r := &BinaryReader{ + b: realByteSlice(buf), + c: nil, + postings: map[string]*postingValueOffsets{}, + postingOffsetsInMemSampling: postingOffsetsInMemSampling, + } - start := time.Now() - if err := WriteBinary(ctx, bkt, id, binfn); err != nil { - return nil, errors.Wrap(err, "write index header") + if err := r.init(); err != nil { + return nil, err } - level.Debug(logger).Log("msg", "built index-header file", "path", binfn, "elapsed", time.Since(start)) - return newFileBinaryReader(binfn, postingOffsetsInMemSampling) + return r, nil } func newFileBinaryReader(path string, postingOffsetsInMemSampling int) (bw *BinaryReader, err error) { @@ -493,12 +593,44 @@ func newFileBinaryReader(path string, postingOffsetsInMemSampling int) (bw *Bina postingOffsetsInMemSampling: postingOffsetsInMemSampling, } + if err := r.init(); err != nil { + return nil, err + } + + return r, nil +} + +// newBinaryTOCFromByteSlice return parsed TOC from given index header byte slice. +func newBinaryTOCFromByteSlice(bs index.ByteSlice) (*BinaryTOC, error) { + if bs.Len() < binaryTOCLen { + return nil, encoding.ErrInvalidSize + } + b := bs.Range(bs.Len()-binaryTOCLen, bs.Len()) + + expCRC := binary.BigEndian.Uint32(b[len(b)-4:]) + d := encoding.Decbuf{B: b[:len(b)-4]} + + if d.Crc32(castagnoliTable) != expCRC { + return nil, errors.Wrap(encoding.ErrInvalidChecksum, "read index header TOC") + } + + if err := d.Err(); err != nil { + return nil, err + } + + return &BinaryTOC{ + Symbols: d.Be64(), + PostingsOffsetTable: d.Be64(), + }, nil +} + +func (r *BinaryReader) init() (err error) { // Verify header. 
if r.b.Len() < headerLen { - return nil, errors.Wrap(encoding.ErrInvalidSize, "index header's header") + return errors.Wrap(encoding.ErrInvalidSize, "index header's header") } if m := binary.BigEndian.Uint32(r.b.Range(0, 4)); m != MagicIndex { - return nil, errors.Errorf("invalid magic number %x", m) + return errors.Errorf("invalid magic number %x", m) } r.version = int(r.b.Range(4, 5)[0]) r.indexVersion = int(r.b.Range(5, 6)[0]) @@ -506,18 +638,18 @@ func newFileBinaryReader(path string, postingOffsetsInMemSampling int) (bw *Bina r.indexLastPostingEnd = int64(binary.BigEndian.Uint64(r.b.Range(6, headerLen))) if r.version != BinaryFormatV1 { - return nil, errors.Errorf("unknown index header file version %d", r.version) + return errors.Errorf("unknown index header file version %d", r.version) } r.toc, err = newBinaryTOCFromByteSlice(r.b) if err != nil { - return nil, errors.Wrap(err, "read index header TOC") + return errors.Wrap(err, "read index header TOC") } // TODO(bwplotka): Consider contributing to Prometheus to allow specifying custom number for symbolsFactor. r.symbols, err = index.NewSymbols(r.b, r.indexVersion, int(r.toc.Symbols)) if err != nil { - return nil, errors.Wrap(err, "read symbols") + return errors.Wrap(err, "read symbols") } var lastName, lastValue []byte @@ -543,7 +675,7 @@ func newFileBinaryReader(path string, postingOffsetsInMemSampling int) (bw *Bina prevRng = index.Range{Start: int64(postingsOffset + postingLengthFieldSize)} return nil }); err != nil { - return nil, errors.Wrap(err, "read postings table") + return errors.Wrap(err, "read postings table") } if string(lastName) != "" { prevRng.End = r.indexLastPostingEnd - crc32.Size @@ -562,7 +694,7 @@ func newFileBinaryReader(path string, postingOffsetsInMemSampling int) (bw *Bina if lastName != nil { // Always include last value for each label name, unless it was just added in previous iteration based // on valueCount. 
- if (valueCount-1)%postingOffsetsInMemSampling != 0 { + if (valueCount-1)%r.postingOffsetsInMemSampling != 0 { r.postings[string(lastName)].offsets = append(r.postings[string(lastName)].offsets, postingOffset{value: string(lastValue), tableOff: lastTableOff}) } r.postings[string(lastName)].lastValOffset = int64(postingsOffset - crc32.Size) @@ -577,16 +709,16 @@ func newFileBinaryReader(path string, postingOffsetsInMemSampling int) (bw *Bina lastTableOff = labelOffset valueCount++ - if (valueCount-1)%postingOffsetsInMemSampling == 0 { + if (valueCount-1)%r.postingOffsetsInMemSampling == 0 { r.postings[string(name)].offsets = append(r.postings[string(name)].offsets, postingOffset{value: string(value), tableOff: labelOffset}) } return nil }); err != nil { - return nil, errors.Wrap(err, "read postings table") + return errors.Wrap(err, "read postings table") } if lastName != nil { - if (valueCount-1)%postingOffsetsInMemSampling != 0 { + if (valueCount-1)%r.postingOffsetsInMemSampling != 0 { // Always include last value for each label name if not included already based on valueCount. r.postings[string(lastName)].offsets = append(r.postings[string(lastName)].offsets, postingOffset{value: string(lastValue), tableOff: lastTableOff}) } @@ -609,38 +741,14 @@ func newFileBinaryReader(path string, postingOffsetsInMemSampling int) (bw *Bina } off, err := r.symbols.ReverseLookup(k) if err != nil { - return nil, errors.Wrap(err, "reverse symbol lookup") + return errors.Wrap(err, "reverse symbol lookup") } r.nameSymbols[off] = k } r.dec = &index.Decoder{LookupSymbol: r.LookupSymbol} - return r, nil -} - -// newBinaryTOCFromByteSlice return parsed TOC from given index header byte slice. 
-func newBinaryTOCFromByteSlice(bs index.ByteSlice) (*BinaryTOC, error) { - if bs.Len() < binaryTOCLen { - return nil, encoding.ErrInvalidSize - } - b := bs.Range(bs.Len()-binaryTOCLen, bs.Len()) - - expCRC := binary.BigEndian.Uint32(b[len(b)-4:]) - d := encoding.Decbuf{B: b[:len(b)-4]} - - if d.Crc32(castagnoliTable) != expCRC { - return nil, errors.Wrap(encoding.ErrInvalidChecksum, "read index header TOC") - } - - if err := d.Err(); err != nil { - return nil, err - } - - return &BinaryTOC{ - Symbols: d.Be64(), - PostingsOffsetTable: d.Be64(), - }, nil + return nil } func (r *BinaryReader) IndexVersion() (int, error) { @@ -910,7 +1018,12 @@ func (r *BinaryReader) LabelNames() ([]string, error) { return labelNames, nil } -func (r *BinaryReader) Close() error { return r.c.Close() } +func (r *BinaryReader) Close() error { + if r.c == nil { + return nil + } + return r.c.Close() +} type realByteSlice []byte diff --git a/pkg/block/indexheader/header_test.go b/pkg/block/indexheader/header_test.go index 3f9605296e..d0d7eb5f7d 100644 --- a/pkg/block/indexheader/header_test.go +++ b/pkg/block/indexheader/header_test.go @@ -97,7 +97,8 @@ func TestReaders(t *testing.T) { t.Run("binary reader", func(t *testing.T) { fn := filepath.Join(tmpDir, id.String(), block.IndexHeaderFilename) - testutil.Ok(t, WriteBinary(ctx, bkt, id, fn)) + _, err := WriteBinary(ctx, bkt, id, fn) + testutil.Ok(t, err) br, err := NewBinaryReader(ctx, log.NewNopLogger(), nil, tmpDir, id, 3) testutil.Ok(t, err) @@ -170,7 +171,8 @@ func TestReaders(t *testing.T) { t.Run("lazy binary reader", func(t *testing.T) { fn := filepath.Join(tmpDir, id.String(), block.IndexHeaderFilename) - testutil.Ok(t, WriteBinary(ctx, bkt, id, fn)) + _, err := WriteBinary(ctx, bkt, id, fn) + testutil.Ok(t, err) br, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), nil, tmpDir, id, 3, NewLazyBinaryReaderMetrics(nil), nil) testutil.Ok(t, err) @@ -340,7 +342,8 @@ func BenchmarkBinaryWrite(t *testing.B) { t.ResetTimer() for i := 0; 
i < t.N; i++ { - testutil.Ok(t, WriteBinary(ctx, bkt, m.ULID, fn)) + _, err := WriteBinary(ctx, bkt, m.ULID, fn) + testutil.Ok(t, err) } } @@ -353,7 +356,8 @@ func BenchmarkBinaryReader(t *testing.B) { m := prepareIndexV2Block(t, tmpDir, bkt) fn := filepath.Join(tmpDir, m.ULID.String(), block.IndexHeaderFilename) - testutil.Ok(t, WriteBinary(ctx, bkt, m.ULID, fn)) + _, err = WriteBinary(ctx, bkt, m.ULID, fn) + testutil.Ok(t, err) t.ResetTimer() for i := 0; i < t.N; i++ { diff --git a/pkg/block/indexheader/lazy_binary_reader.go b/pkg/block/indexheader/lazy_binary_reader.go index 13bf476812..c3bee382c2 100644 --- a/pkg/block/indexheader/lazy_binary_reader.go +++ b/pkg/block/indexheader/lazy_binary_reader.go @@ -71,7 +71,6 @@ type LazyBinaryReader struct { logger log.Logger bkt objstore.BucketReader dir string - filepath string id ulid.ULID postingOffsetsInMemSampling int metrics *LazyBinaryReaderMetrics @@ -99,22 +98,23 @@ func NewLazyBinaryReader( metrics *LazyBinaryReaderMetrics, onClosed func(*LazyBinaryReader), ) (*LazyBinaryReader, error) { - filepath := filepath.Join(dir, id.String(), block.IndexHeaderFilename) - - // If the index-header doesn't exist we should download it. - if _, err := os.Stat(filepath); err != nil { - if !os.IsNotExist(err) { - return nil, errors.Wrap(err, "read index header") + if dir != "" { + indexHeaderFile := filepath.Join(dir, id.String(), block.IndexHeaderFilename) + // If the index-header doesn't exist we should download it. 
+ if _, err := os.Stat(indexHeaderFile); err != nil { + if !os.IsNotExist(err) { + return nil, errors.Wrap(err, "read index header") + } + + level.Debug(logger).Log("msg", "the index-header doesn't exist on disk; recreating", "path", indexHeaderFile) + + start := time.Now() + if _, err := WriteBinary(ctx, bkt, id, indexHeaderFile); err != nil { + return nil, errors.Wrap(err, "write index header") + } + + level.Debug(logger).Log("msg", "built index-header file", "path", indexHeaderFile, "elapsed", time.Since(start)) } - - level.Debug(logger).Log("msg", "the index-header doesn't exist on disk; recreating", "path", filepath) - - start := time.Now() - if err := WriteBinary(ctx, bkt, id, filepath); err != nil { - return nil, errors.Wrap(err, "write index header") - } - - level.Debug(logger).Log("msg", "built index-header file", "path", filepath, "elapsed", time.Since(start)) } return &LazyBinaryReader{ @@ -122,7 +122,6 @@ func NewLazyBinaryReader( logger: logger, bkt: bkt, dir: dir, - filepath: filepath, id: id, postingOffsetsInMemSampling: postingOffsetsInMemSampling, metrics: metrics, @@ -241,7 +240,7 @@ func (r *LazyBinaryReader) load() (returnErr error) { return r.readerErr } - level.Debug(r.logger).Log("msg", "lazy loading index-header file", "path", r.filepath) + level.Debug(r.logger).Log("msg", "lazy loading index-header", "block", r.id) r.metrics.loadCount.Inc() startTime := time.Now() @@ -249,11 +248,11 @@ func (r *LazyBinaryReader) load() (returnErr error) { if err != nil { r.metrics.loadFailedCount.Inc() r.readerErr = err - return errors.Wrapf(err, "lazy load index-header file at %s", r.filepath) + return errors.Wrapf(err, "lazy load index-header for block %s", r.id) } r.reader = reader - level.Debug(r.logger).Log("msg", "lazy loaded index-header file", "path", r.filepath, "elapsed", time.Since(startTime)) + level.Debug(r.logger).Log("msg", "lazy loaded index-header", "block", r.id, "elapsed", time.Since(startTime)) 
r.metrics.loadDuration.Observe(time.Since(startTime).Seconds()) return nil diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index b1fbd898a2..ad37f3254f 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -495,6 +495,10 @@ func NewBucketStore( return nil, errors.Wrap(err, "validate config") } + if dir == "" { + return s, nil + } + if err := os.MkdirAll(dir, 0750); err != nil { return nil, errors.Wrap(err, "create dir") } @@ -591,6 +595,10 @@ func (s *BucketStore) InitialSync(ctx context.Context) error { return errors.Wrap(err, "sync block") } + if s.dir == "" { + return nil + } + fis, err := os.ReadDir(s.dir) if err != nil { return errors.Wrap(err, "read dir") @@ -624,15 +632,20 @@ func (s *BucketStore) getBlock(id ulid.ULID) *bucketBlock { } func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err error) { - dir := filepath.Join(s.dir, meta.ULID.String()) + var dir string + if s.dir != "" { + dir = filepath.Join(s.dir, meta.ULID.String()) + } start := time.Now() level.Debug(s.logger).Log("msg", "loading new block", "id", meta.ULID) defer func() { if err != nil { s.metrics.blockLoadFailures.Inc() - if err2 := os.RemoveAll(dir); err2 != nil { - level.Warn(s.logger).Log("msg", "failed to remove block we cannot load", "err", err2) + if dir != "" { + if err2 := os.RemoveAll(dir); err2 != nil { + level.Warn(s.logger).Log("msg", "failed to remove block we cannot load", "err", err2) + } } level.Warn(s.logger).Log("msg", "loading block failed", "elapsed", time.Since(start), "id", meta.ULID, "err", err) } else { @@ -721,6 +734,11 @@ func (s *BucketStore) removeBlock(id ulid.ULID) error { if err := b.Close(); err != nil { return errors.Wrap(err, "close block") } + + if b.dir == "" { + return nil + } + return os.RemoveAll(b.dir) } diff --git a/test/e2e/store_gateway_test.go b/test/e2e/store_gateway_test.go index 9a6aaa0b3f..0b16f58bbd 100644 --- a/test/e2e/store_gateway_test.go +++ b/test/e2e/store_gateway_test.go @@ -279,6 +279,230 @@ 
metafile_content_ttl: 0s`, memcached.InternalEndpoint("memcached"))
 	// TODO(khyati) Let's add some case for compaction-meta.json once the PR will be merged: https://github.com/thanos-io/thanos/pull/2136.
 }
 
+// TestStoreGatewayNoCacheFile runs the store gateway with `--no-cache-index-header` and checks block sync, drop and query behavior.
+func TestStoreGatewayNoCacheFile(t *testing.T) {
+	t.Parallel()
+
+	e, err := e2e.NewDockerEnvironment("store-no-cache")
+	testutil.Ok(t, err)
+	t.Cleanup(e2ethanos.CleanScenario(t, e))
+
+	const bucket = "store-no-cache-test"
+	m := e2edb.NewMinio(e, "thanos-minio", bucket, e2edb.WithMinioTLS())
+	testutil.Ok(t, e2e.StartAndWaitReady(m))
+
+	s1 := e2ethanos.NewStoreGW(
+		e,
+		"1",
+		client.BucketConfig{
+			Type:   client.S3,
+			Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("http"), m.InternalDir()),
+		},
+		"",
+		[]string{"--no-cache-index-header"},
+		relabel.Config{
+			Action:       relabel.Drop,
+			Regex:        relabel.MustNewRegexp("value2"),
+			SourceLabels: model.LabelNames{"ext1"},
+		},
+	)
+	testutil.Ok(t, e2e.StartAndWaitReady(s1))
+
+	q := e2ethanos.NewQuerierBuilder(e, "1", s1.InternalEndpoint("grpc")).WithEnabledFeatures([]string{"promql-negative-offset", "promql-at-modifier"}).Init()
+	testutil.Ok(t, e2e.StartAndWaitReady(q))
+
+	dir := filepath.Join(e.SharedDir(), "tmp")
+	testutil.Ok(t, os.MkdirAll(dir, os.ModePerm)) // dir already includes the shared dir; do not join it again.
+
+	series := []labels.Labels{labels.FromStrings("a", "1", "b", "2")}
+	extLset := labels.FromStrings("ext1", "value1", "replica", "1")
+	extLset2 := labels.FromStrings("ext1", "value1", "replica", "2")
+	extLset3 := labels.FromStrings("ext1", "value2", "replica", "3")
+	extLset4 := labels.FromStrings("ext1", "value1", "replica", "3")
+
+	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
+	t.Cleanup(cancel)
+
+	now := time.Now()
+	id1, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset, 0, metadata.NoneFunc)
+	testutil.Ok(t, err)
+	id2, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset2, 0, metadata.NoneFunc)
+	testutil.Ok(t, err)
+	id3, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset3, 0, metadata.NoneFunc)
+	testutil.Ok(t, err)
+	id4, err := e2eutil.CreateBlock(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), extLset, 0, metadata.NoneFunc)
+	testutil.Ok(t, err)
+	l := log.NewLogfmtLogger(os.Stdout)
+	bkt, err := s3.NewBucketWithConfig(l,
+		e2ethanos.NewS3Config(bucket, m.Endpoint("http"), m.Dir()), "test-feed")
+	testutil.Ok(t, err)
+
+	testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, filepath.Join(dir, id1.String()), id1.String()))
+	testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, filepath.Join(dir, id2.String()), id2.String()))
+	testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, filepath.Join(dir, id3.String()), id3.String()))
+	testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, filepath.Join(dir, id4.String()), id4.String()))
+
+	// Wait for store to sync blocks.
+	// thanos_blocks_meta_synced: 2x loadedMeta 1x labelExcludedMeta 1x TooFreshMeta.
+	testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(4), "thanos_blocks_meta_synced"))
+	testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_blocks_meta_sync_failures_total"))
+
+	testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(2), "thanos_bucket_store_blocks_loaded"))
+	testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_block_drops_total"))
+	testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_block_load_failures_total"))
+
+	t.Run("query works", func(t *testing.T) {
+		queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string { return fmt.Sprintf("%s @ end()", testQuery) },
+			time.Now, promclient.QueryOptions{
+				Deduplicate: false,
+			},
+			[]model.Metric{
+				{
+					"a":       "1",
+					"b":       "2",
+					"ext1":    "value1",
+					"replica": "1",
+				},
+				{
+					"a":       "1",
+					"b":       "2",
+					"ext1":    "value1",
+					"replica": "2",
+				},
+			},
+		)
+
+		// 2 x postings, 2 x series, 2x chunks.
+		testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(6), "thanos_bucket_store_series_data_touched"))
+		testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(6), "thanos_bucket_store_series_data_fetched"))
+		testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(2), "thanos_bucket_store_series_blocks_queried"))
+
+		queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string { return testQuery },
+			time.Now, promclient.QueryOptions{
+				Deduplicate: true,
+			},
+			[]model.Metric{
+				{
+					"a":    "1",
+					"b":    "2",
+					"ext1": "value1",
+				},
+			},
+		)
+
+		testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(12), "thanos_bucket_store_series_data_touched"))
+		testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(8), "thanos_bucket_store_series_data_fetched"))
+		testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(2+2), "thanos_bucket_store_series_blocks_queried"))
+	})
+	t.Run("remove meta.json from id1 block", func(t *testing.T) {
+		testutil.Ok(t, bkt.Delete(ctx, path.Join(id1.String(), block.MetaFilename)))
+
+		// Wait for store to sync blocks.
+		// thanos_blocks_meta_synced: 1x loadedMeta 1x labelExcludedMeta 1x TooFreshMeta 1x noMeta.
+		testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(4), "thanos_blocks_meta_synced"))
+		testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_blocks_meta_sync_failures_total"))
+
+		testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(1), "thanos_bucket_store_blocks_loaded"))
+		testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(1), "thanos_bucket_store_block_drops_total"))
+		testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_block_load_failures_total"))
+
+		// TODO(bwplotka): Entries are still in LRU cache.
+		queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string { return testQuery },
+			time.Now, promclient.QueryOptions{
+				Deduplicate: false,
+			},
+			[]model.Metric{
+				{
+					"a":       "1",
+					"b":       "2",
+					"ext1":    "value1",
+					"replica": "2",
+				},
+			},
+		)
+		testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(4+1), "thanos_bucket_store_series_blocks_queried"))
+	})
+	t.Run("upload block id5, similar to id1", func(t *testing.T) {
+		id5, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset4, 0, metadata.NoneFunc)
+		testutil.Ok(t, err)
+		testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, filepath.Join(dir, id5.String()), id5.String()))
+
+		// Wait for store to sync blocks.
+		// thanos_blocks_meta_synced: 2x loadedMeta 1x labelExcludedMeta 1x TooFreshMeta 1x noMeta.
+		testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(5), "thanos_blocks_meta_synced"))
+		testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_blocks_meta_sync_failures_total"))
+
+		testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(2), "thanos_bucket_store_blocks_loaded"))
+		testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(1), "thanos_bucket_store_block_drops_total"))
+		testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_block_load_failures_total"))
+
+		queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string { return testQuery },
+			time.Now, promclient.QueryOptions{
+				Deduplicate: false,
+			},
+			[]model.Metric{
+				{
+					"a":       "1",
+					"b":       "2",
+					"ext1":    "value1",
+					"replica": "2",
+				},
+				{
+					"a":       "1",
+					"b":       "2",
+					"ext1":    "value1",
+					"replica": "3", // New block.
+				},
+			},
+		)
+		testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(5+2), "thanos_bucket_store_series_blocks_queried"))
+	})
+	t.Run("delete whole id2 block #yolo", func(t *testing.T) {
+		testutil.Ok(t, block.Delete(ctx, l, bkt, id2))
+
+		// Wait for store to sync blocks.
+		// thanos_blocks_meta_synced: 1x loadedMeta 1x labelExcludedMeta 1x TooFreshMeta 1x noMeta.
+		testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(4), "thanos_blocks_meta_synced"))
+		testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_blocks_meta_sync_failures_total"))
+
+		testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(1), "thanos_bucket_store_blocks_loaded"))
+		testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(1+1), "thanos_bucket_store_block_drops_total"))
+		testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(0), "thanos_bucket_store_block_load_failures_total"))
+
+		queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string { return testQuery },
+			time.Now, promclient.QueryOptions{
+				Deduplicate: false,
+			},
+			[]model.Metric{
+				{
+					"a":       "1",
+					"b":       "2",
+					"ext1":    "value1",
+					"replica": "3",
+				},
+			},
+		)
+		testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(7+1), "thanos_bucket_store_series_blocks_queried"))
+	})
+
+	t.Run("negative offset should work", func(t *testing.T) {
+		queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string { return "{a=\"1\"} offset -4h" },
+			func() time.Time { return time.Now().Add(-4 * time.Hour) }, promclient.QueryOptions{
+				Deduplicate: false,
+			},
+			[]model.Metric{
+				{
+					"a":       "1",
+					"b":       "2",
+					"ext1":    "value1",
+					"replica": "3",
+				},
+			},
+		)
+		testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(7+2), "thanos_bucket_store_series_blocks_queried"))
+	})
+}
+
 func TestStoreGatewayMemcachedCache(t *testing.T) {
 	t.Parallel()