diff --git a/docs/components/store.md b/docs/components/store.md
index f071e9475e..2cc130d9e9 100644
--- a/docs/components/store.md
+++ b/docs/components/store.md
@@ -221,3 +221,62 @@ While the remaining settings are **optional**:
 - `max_get_multi_concurrency`: maximum number of concurrent connections when fetching keys. If set to `0`, the concurrency is unlimited.
 - `max_get_multi_batch_size`: maximum number of keys a single underlying operation should fetch. If more keys are specified, keys are internally split into multiple batches and fetched concurrently, honoring `max_get_multi_concurrency`. If set to `0`, the batch size is unlimited.
 - `dns_provider_update_interval`: the DNS discovery update interval.
+
+
+## Index Header
+
+In order to query series inside blocks from object storage, the Store Gateway has to know certain initial information about each block, such as:
+
+* the symbols table, to unintern string values
+* postings offsets, for posting lookups
+
+To achieve this, on startup an `index-header` is built for each block from pieces of the original block's index and stored on disk.
+This `index-header` file is then memory-mapped and used by the Store Gateway.
+
+### Format (version 1)
+
+The following describes the format of the `index-header` file found in each block's subdirectory of the Store Gateway's local directory.
+It is terminated by a table of contents which serves as an entry point into the index.
+
+```
+┌─────────────────────────────┬───────────────────────────────┐
+│ magic(0xBAAAD792) <4b>      │ version(1) <1 byte>           │
+├─────────────────────────────┬───────────────────────────────┤
+│ index version(2) <1 byte>   │ index PostingOffsetTable <8b> │
+├─────────────────────────────┴───────────────────────────────┤
+│ ┌─────────────────────────────────────────────────────────┐ │
+│ │ Symbol Table (exact copy from original index)           │ │
+│ ├─────────────────────────────────────────────────────────┤ │
+│ │ Posting Offset Table (exact copy from index)            │ │
+│ ├─────────────────────────────────────────────────────────┤ │
+│ │ TOC                                                     │ │
+│ └─────────────────────────────────────────────────────────┘ │
+└─────────────────────────────────────────────────────────────┘
+```
+
+When the index is written, an arbitrary number of padding bytes may be added between the main sections outlined above. When sequentially scanning through the file, any zero bytes after a section's specified length must be skipped.
+
+Most of the sections described below start with a `len` field. It always specifies the number of bytes just before the trailing CRC32 checksum, which is always calculated over those `len` bytes.
+
+### Symbol Table
+
+See [Symbols](https://github.com/prometheus/prometheus/blob/d782387f814753b0118d402ec8cdbdef01bf9079/tsdb/docs/format/index.md#symbol-table)
+
+### Postings Offset Table
+
+See [Posting Offset Table](https://github.com/prometheus/prometheus/blob/d782387f814753b0118d402ec8cdbdef01bf9079/tsdb/docs/format/index.md#postings-offset-table)
+
+### TOC
+
+The table of contents serves as an entry point to the entire index and points to the various sections in the file.
+If a reference is zero, the respective section does not exist and empty results should be returned upon lookup.
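For illustration, here is a minimal sketch (using only the Go standard library) of how this TOC, laid out in the diagram below, could be decoded. The `decodeTOC` helper and its error messages are hypothetical, not part of this change:

```go
package example

import (
	"encoding/binary"
	"errors"
	"hash/crc32"
)

// decodeTOC reads the two section references from the trailing 20 bytes of an
// index-header file: ref(symbols) <8b>, ref(postings offset table) <8b>, and
// a CRC32 (Castagnoli) <4b> computed over the 16 preceding bytes.
func decodeTOC(b []byte) (symbols, postingsOffsetTable uint64, err error) {
	const tocLen = 8 + 8 + crc32.Size
	if len(b) < tocLen {
		return 0, 0, errors.New("file too small to contain a TOC")
	}
	toc := b[len(b)-tocLen:]
	expCRC := binary.BigEndian.Uint32(toc[16:])
	if crc32.Checksum(toc[:16], crc32.MakeTable(crc32.Castagnoli)) != expCRC {
		return 0, 0, errors.New("index-header TOC checksum mismatch")
	}
	return binary.BigEndian.Uint64(toc[0:8]), binary.BigEndian.Uint64(toc[8:16]), nil
}
```

The reader added in this diff (`newBinaryTOCFromByteSlice`) performs the same decoding via Prometheus's `encoding.Decbuf` helpers.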
+ +``` +┌─────────────────────────────────────────┐ +│ ref(symbols) <8b> │ +├─────────────────────────────────────────┤ +│ ref(postings offset table) <8b> │ +├─────────────────────────────────────────┤ +│ CRC32 <4b> │ +└─────────────────────────────────────────┘ +``` diff --git a/go.mod b/go.mod index aa141722b1..11cadaff65 100644 --- a/go.mod +++ b/go.mod @@ -70,7 +70,7 @@ require ( github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4 github.com/prometheus/common v0.7.0 github.com/prometheus/procfs v0.0.6 // indirect - github.com/prometheus/prometheus v1.8.2-0.20200107122003-4708915ac6ef // master ~ v2.15.2 + github.com/prometheus/prometheus v1.8.2-0.20200110114423-1e64d757f711 // master ~ v2.15.2 github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da // indirect github.com/satori/go.uuid v1.2.0 // indirect github.com/smartystreets/assertions v1.0.1 // indirect diff --git a/go.sum b/go.sum index 640a65ba00..bb7f998443 100644 --- a/go.sum +++ b/go.sum @@ -446,8 +446,8 @@ github.com/prometheus/procfs v0.0.5/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDa github.com/prometheus/procfs v0.0.6 h1:0qbH+Yqu/cj1ViVLvEWCP6qMQ4efWUj6bQqOEA0V0U4= github.com/prometheus/procfs v0.0.6/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= github.com/prometheus/prometheus v0.0.0-20180315085919-58e2a31db8de/go.mod h1:oAIUtOny2rjMX0OWN5vPR5/q/twIROJvdqnQKDdil/s= -github.com/prometheus/prometheus v1.8.2-0.20200107122003-4708915ac6ef h1:pYYKXo/zGx25kyViw+Gdbxd0ItIg+vkVKpwgWUEyIc4= -github.com/prometheus/prometheus v1.8.2-0.20200107122003-4708915ac6ef/go.mod h1:7U90zPoLkWjEIQcy/rweQla82OCTUzxVHE51G3OhJbI= +github.com/prometheus/prometheus v1.8.2-0.20200110114423-1e64d757f711 h1:uEq+8hKI4kfycPLSKNw844YYkdMNpC2eZpov73AvlFk= +github.com/prometheus/prometheus v1.8.2-0.20200110114423-1e64d757f711/go.mod h1:7U90zPoLkWjEIQcy/rweQla82OCTUzxVHE51G3OhJbI= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= diff --git a/pkg/block/block.go b/pkg/block/block.go index 6fb43505cc..147a82f068 100644 --- a/pkg/block/block.go +++ b/pkg/block/block.go @@ -27,8 +27,10 @@ const ( MetaFilename = "meta.json" // IndexFilename is the known index file for block index. IndexFilename = "index" - // IndexCacheFilename is the canonical name for index cache file that stores essential information needed. + // IndexCacheFilename is the canonical name for json index cache file that stores essential information. IndexCacheFilename = "index.cache.json" + // IndexHeaderFilename is the canonical name for binary index header file that stores essential information. + IndexHeaderFilename = "index-header" // ChunksDirname is the known dir name for chunks with compressed samples. 
ChunksDirname = "chunks" diff --git a/pkg/block/block_test.go b/pkg/block/block_test.go index 18f71c982e..35d22871f5 100644 --- a/pkg/block/block_test.go +++ b/pkg/block/block_test.go @@ -2,7 +2,6 @@ package block import ( "context" - "io" "io/ioutil" "os" "path" @@ -12,7 +11,6 @@ import ( "github.com/fortytw2/leaktest" "github.com/go-kit/kit/log" - "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/labels" "github.com/thanos-io/thanos/pkg/objstore/inmem" "github.com/thanos-io/thanos/pkg/testutil" @@ -104,7 +102,7 @@ func TestUpload(t *testing.T) { testutil.NotOk(t, err) testutil.Assert(t, strings.HasSuffix(err.Error(), "/meta.json: no such file or directory"), "") } - testutil.Ok(t, cpy(path.Join(tmpDir, b1.String(), MetaFilename), path.Join(tmpDir, "test", b1.String(), MetaFilename))) + testutil.Copy(t, path.Join(tmpDir, b1.String(), MetaFilename), path.Join(tmpDir, "test", b1.String(), MetaFilename)) { // Missing chunks. err := Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, "test", b1.String())) @@ -115,7 +113,7 @@ func TestUpload(t *testing.T) { testutil.Equals(t, 1, len(bkt.Objects())) } testutil.Ok(t, os.MkdirAll(path.Join(tmpDir, "test", b1.String(), ChunksDirname), os.ModePerm)) - testutil.Ok(t, cpy(path.Join(tmpDir, b1.String(), ChunksDirname, "000001"), path.Join(tmpDir, "test", b1.String(), ChunksDirname, "000001"))) + testutil.Copy(t, path.Join(tmpDir, b1.String(), ChunksDirname, "000001"), path.Join(tmpDir, "test", b1.String(), ChunksDirname, "000001")) { // Missing index file. err := Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, "test", b1.String())) @@ -125,7 +123,7 @@ func TestUpload(t *testing.T) { // Only debug meta.json present. testutil.Equals(t, 1, len(bkt.Objects())) } - testutil.Ok(t, cpy(path.Join(tmpDir, b1.String(), IndexFilename), path.Join(tmpDir, "test", b1.String(), IndexFilename))) + testutil.Copy(t, path.Join(tmpDir, b1.String(), IndexFilename), path.Join(tmpDir, "test", b1.String(), IndexFilename)) testutil.Ok(t, os.Remove(path.Join(tmpDir, "test", b1.String(), MetaFilename))) { // Missing meta.json file. @@ -136,7 +134,7 @@ func TestUpload(t *testing.T) { // Only debug meta.json present. testutil.Equals(t, 1, len(bkt.Objects())) } - testutil.Ok(t, cpy(path.Join(tmpDir, b1.String(), MetaFilename), path.Join(tmpDir, "test", b1.String(), MetaFilename))) + testutil.Copy(t, path.Join(tmpDir, b1.String(), MetaFilename), path.Join(tmpDir, "test", b1.String(), MetaFilename)) { // Full block. 
testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, "test", b1.String())))
@@ -170,31 +168,6 @@ func TestUpload(t *testing.T) {
 	}
 }
 
-func cpy(src, dst string) error {
-	sourceFileStat, err := os.Stat(src)
-	if err != nil {
-		return err
-	}
-
-	if !sourceFileStat.Mode().IsRegular() {
-		return errors.Errorf("%s is not a regular file", src)
-	}
-
-	source, err := os.Open(src)
-	if err != nil {
-		return err
-	}
-	defer source.Close()
-
-	destination, err := os.Create(dst)
-	if err != nil {
-		return err
-	}
-	defer destination.Close()
-	_, err = io.Copy(destination, source)
-	return err
-}
-
 func TestDelete(t *testing.T) {
 	defer leaktest.CheckTimeout(t, 10*time.Second)()
diff --git a/pkg/block/indexheader/binary_reader.go b/pkg/block/indexheader/binary_reader.go
new file mode 100644
index 0000000000..bcb10380ae
--- /dev/null
+++ b/pkg/block/indexheader/binary_reader.go
@@ -0,0 +1,802 @@
+package indexheader
+
+import (
+	"bufio"
+	"context"
+	"encoding/binary"
+	"hash"
+	"hash/crc32"
+	"io"
+	"io/ioutil"
+	"math"
+	"os"
+	"path/filepath"
+	"sort"
+	"unsafe"
+
+	"github.com/go-kit/kit/log"
+	"github.com/go-kit/kit/log/level"
+	"github.com/oklog/ulid"
+	"github.com/pkg/errors"
+	"github.com/prometheus/prometheus/tsdb/encoding"
+	"github.com/prometheus/prometheus/tsdb/fileutil"
+	"github.com/prometheus/prometheus/tsdb/index"
+	"github.com/thanos-io/thanos/pkg/block"
+	"github.com/thanos-io/thanos/pkg/objstore"
+	"github.com/thanos-io/thanos/pkg/runutil"
+)
+
+const (
+	// BinaryFormatV1 represents the first version of the index-header file.
+	BinaryFormatV1 = 1
+
+	indexTOCLen  = 6*8 + crc32.Size
+	binaryTOCLen = 2*8 + crc32.Size
+	// headerLen represents the number of bytes reserved at the start of the index-header file for its header.
+	headerLen = 4 + 1 + 1 + 8
+
+	// MagicIndex are 4 bytes at the head of an index-header file.
+	MagicIndex = 0xBAAAD792
+
+	symbolFactor = 32
+)
+
+// The table gets initialized with sync.Once but may still cause a race
+// with any other use of the crc32 package anywhere. Thus we initialize it
+// before.
+var castagnoliTable *crc32.Table
+
+func init() {
+	castagnoliTable = crc32.MakeTable(crc32.Castagnoli)
+}
+
+// newCRC32 initializes a CRC32 hash with a preconfigured polynomial, so the
+// polynomial may be easily changed in one location at a later time, if necessary.
+func newCRC32() hash.Hash32 {
+	return crc32.New(castagnoliTable)
+}
+
+// BinaryTOC is the table of contents for the index-header file.
+type BinaryTOC struct {
+	// Symbols holds the start of the symbols section, the same as in the index related to this index-header.
+	Symbols uint64
+	// PostingsOffsetTable holds the start of the Postings Offset Table section, the same as in the index related to this index-header.
+	PostingsOffsetTable uint64
+}
+
+// WriteBinary builds the index-header file from pieces of the index in object storage.
+func WriteBinary(ctx context.Context, bkt objstore.BucketReader, id ulid.ULID, fn string) (err error) {
+	ir, indexVersion, err := newChunkedIndexReader(ctx, bkt, id)
+	if err != nil {
+		return errors.Wrap(err, "new index reader")
+	}
+
+	// Buffer for copying and for the encoding buffers.
+	// It also controls the size of the file writer's buffer.
+ buf := make([]byte, 32*1024) + bw, err := newBinaryWriter(fn, buf) + if err != nil { + return errors.Wrap(err, "new binary index header writer") + } + defer runutil.CloseWithErrCapture(&err, bw, "close binary writer for %s", fn) + + if err := bw.AddIndexMeta(indexVersion, ir.toc.PostingsTable); err != nil { + return errors.Wrap(err, "add index meta") + } + + if err := ir.CopySymbols(bw.SymbolsWriter(), buf); err != nil { + return err + } + + if err := bw.f.Flush(); err != nil { + return errors.Wrap(err, "flush") + } + + if err := ir.CopyPostingsOffsets(bw.PostingOffsetsWriter(), buf); err != nil { + return err + } + + if err := bw.f.Flush(); err != nil { + return errors.Wrap(err, "flush") + } + + if err := bw.WriteTOC(); err != nil { + return errors.Wrap(err, "write index header TOC") + } + return nil +} + +type chunkedIndexReader struct { + ctx context.Context + path string + size uint64 + bkt objstore.BucketReader + toc *index.TOC +} + +func newChunkedIndexReader(ctx context.Context, bkt objstore.BucketReader, id ulid.ULID) (*chunkedIndexReader, int, error) { + indexFilepath := filepath.Join(id.String(), block.IndexFilename) + size, err := bkt.ObjectSize(ctx, indexFilepath) + if err != nil { + return nil, 0, errors.Wrapf(err, "get object size of %s", indexFilepath) + } + + rc, err := bkt.GetRange(ctx, indexFilepath, 0, index.HeaderLen) + if err != nil { + return nil, 0, errors.Wrapf(err, "get TOC from object storage of %s", indexFilepath) + } + + b, err := ioutil.ReadAll(rc) + if err != nil { + runutil.CloseWithErrCapture(&err, rc, "close reader") + return nil, 0, errors.Wrapf(err, "get header from object storage of %s", indexFilepath) + } + + if err := rc.Close(); err != nil { + return nil, 0, errors.Wrap(err, "close reader") + } + + if m := binary.BigEndian.Uint32(b[0:4]); m != index.MagicIndex { + return nil, 0, errors.Errorf("invalid magic number %x for %s", m, indexFilepath) + } + + version := int(b[4:5][0]) + + if version != index.FormatV1 && version != index.FormatV2 { + return nil, 0, errors.Errorf("not supported index file version %d of %s", version, indexFilepath) + } + + ir := &chunkedIndexReader{ + ctx: ctx, + path: indexFilepath, + size: size, + bkt: bkt, + } + + toc, err := ir.readTOC() + if err != nil { + return nil, 0, err + } + ir.toc = toc + + return ir, version, nil +} + +func (r *chunkedIndexReader) readTOC() (*index.TOC, error) { + rc, err := r.bkt.GetRange(r.ctx, r.path, int64(r.size-indexTOCLen-crc32.Size), indexTOCLen+crc32.Size) + if err != nil { + return nil, errors.Wrapf(err, "get TOC from object storage of %s", r.path) + } + + tocBytes, err := ioutil.ReadAll(rc) + if err != nil { + runutil.CloseWithErrCapture(&err, rc, "close toc reader") + return nil, errors.Wrapf(err, "get TOC from object storage of %s", r.path) + } + + if err := rc.Close(); err != nil { + return nil, errors.Wrap(err, "close toc reader") + } + + toc, err := index.NewTOCFromByteSlice(realByteSlice(tocBytes)) + if err != nil { + return nil, errors.Wrap(err, "new TOC") + } + return toc, nil +} + +func (r *chunkedIndexReader) CopySymbols(w io.Writer, buf []byte) (err error) { + rc, err := r.bkt.GetRange(r.ctx, r.path, int64(r.toc.Symbols), int64(r.toc.Series-r.toc.Symbols)) + if err != nil { + return errors.Wrapf(err, "get symbols from object storage of %s", r.path) + } + defer runutil.CloseWithErrCapture(&err, rc, "close symbol reader") + + if _, err := io.CopyBuffer(w, rc, buf); err != nil { + return errors.Wrap(err, "copy symbols") + } + + return nil +} + +func (r *chunkedIndexReader) 
CopyPostingsOffsets(w io.Writer, buf []byte) (err error) { + rc, err := r.bkt.GetRange(r.ctx, r.path, int64(r.toc.PostingsTable), int64(r.size-r.toc.PostingsTable)) + if err != nil { + return errors.Wrapf(err, "get posting offset table from object storage of %s", r.path) + } + defer runutil.CloseWithErrCapture(&err, rc, "close posting offsets reader") + + if _, err := io.CopyBuffer(w, rc, buf); err != nil { + return errors.Wrap(err, "copy posting offsets") + } + + return nil +} + +type binaryWriter struct { + f *FileWriter + + toc BinaryTOC + + // Reusable memory. + buf encoding.Encbuf + + crc32 hash.Hash +} + +func newBinaryWriter(fn string, buf []byte) (w *binaryWriter, err error) { + dir := filepath.Dir(fn) + + df, err := fileutil.OpenDir(dir) + if err != nil { + return nil, err + } + defer runutil.CloseWithErrCapture(&err, df, "dir close") + + if err := os.RemoveAll(fn); err != nil { + return nil, errors.Wrap(err, "remove any existing index at path") + } + + // We use file writer for buffers not larger than reused one. + f, err := NewFileWriter(fn, len(buf)) + if err != nil { + return nil, err + } + if err := df.Sync(); err != nil { + return nil, errors.Wrap(err, "sync dir") + } + + w = &binaryWriter{ + f: f, + + // Reusable memory. + buf: encoding.Encbuf{B: buf}, + crc32: newCRC32(), + } + + w.buf.Reset() + w.buf.PutBE32(MagicIndex) + w.buf.PutByte(BinaryFormatV1) + + return w, w.f.Write(w.buf.Get()) +} + +type FileWriter struct { + f *os.File + fbuf *bufio.Writer + pos uint64 + name string +} + +// TODO(bwplotka): Added size to method, upstream this. +func NewFileWriter(name string, size int) (*FileWriter, error) { + f, err := os.OpenFile(name, os.O_CREATE|os.O_RDWR, 0666) + if err != nil { + return nil, err + } + return &FileWriter{ + f: f, + fbuf: bufio.NewWriterSize(f, size), + pos: 0, + name: name, + }, nil +} + +func (fw *FileWriter) Pos() uint64 { + return fw.pos +} + +func (fw *FileWriter) Write(bufs ...[]byte) error { + for _, b := range bufs { + n, err := fw.fbuf.Write(b) + fw.pos += uint64(n) + if err != nil { + return err + } + // For now the index file must not grow beyond 64GiB. Some of the fixed-sized + // offset references in v1 are only 4 bytes large. + // Once we move to compressed/varint representations in those areas, this limitation + // can be lifted. + if fw.pos > 16*math.MaxUint32 { + return errors.Errorf("%q exceeding max size of 64GiB", fw.name) + } + } + return nil +} + +func (fw *FileWriter) Flush() error { + return fw.fbuf.Flush() +} + +func (fw *FileWriter) WriteAt(buf []byte, pos uint64) error { + if err := fw.Flush(); err != nil { + return err + } + _, err := fw.f.WriteAt(buf, int64(pos)) + return err +} + +// AddPadding adds zero byte padding until the file size is a multiple size. 
+func (fw *FileWriter) AddPadding(size int) error { + p := fw.pos % uint64(size) + if p == 0 { + return nil + } + p = uint64(size) - p + + if err := fw.Write(make([]byte, p)); err != nil { + return errors.Wrap(err, "add padding") + } + return nil +} + +func (fw *FileWriter) Close() error { + if err := fw.Flush(); err != nil { + return err + } + if err := fw.f.Sync(); err != nil { + return err + } + return fw.f.Close() +} + +func (fw *FileWriter) Remove() error { + return os.Remove(fw.name) +} + +func (w *binaryWriter) AddIndexMeta(indexVersion int, indexPostingOffsetTable uint64) error { + w.buf.Reset() + w.buf.PutByte(byte(indexVersion)) + w.buf.PutBE64(indexPostingOffsetTable) + return w.f.Write(w.buf.Get()) +} + +func (w *binaryWriter) SymbolsWriter() io.Writer { + w.toc.Symbols = w.f.Pos() + return w +} + +func (w *binaryWriter) PostingOffsetsWriter() io.Writer { + w.toc.PostingsOffsetTable = w.f.Pos() + return w +} + +func (w *binaryWriter) WriteTOC() error { + w.buf.Reset() + + w.buf.PutBE64(w.toc.Symbols) + w.buf.PutBE64(w.toc.PostingsOffsetTable) + + w.buf.PutHash(w.crc32) + + return w.f.Write(w.buf.Get()) +} + +func (w *binaryWriter) Write(p []byte) (int, error) { + n := w.f.Pos() + err := w.f.Write(p) + return int(w.f.Pos() - n), err +} + +func (w *binaryWriter) Close() error { + return w.f.Close() +} + +type postingOffset struct { + // label value. + value string + // offset of this entry in posting offset table in index-header file. + tableOff int +} + +type BinaryReader struct { + b index.ByteSlice + toc *BinaryTOC + + // Close that releases the underlying resources of the byte slice. + c io.Closer + + // Map of LabelName to a list of some LabelValues's position in the offset table. + // The first and last values for each name are always present. + postings map[string][]postingOffset + // For the v1 format, labelname -> labelvalue -> offset. + postingsV1 map[string]map[string]index.Range + + symbols *index.Symbols + nameSymbols map[uint32]string // Cache of the label name symbol lookups, + // as there are not many and they are half of all lookups. + + dec *index.Decoder + + version int + indexVersion int + indexLastPostingEnd int64 +} + +// NewBinaryReader loads or builds new index-header if not present on disk. +func NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID) (*BinaryReader, error) { + binfn := filepath.Join(dir, id.String(), block.IndexHeaderFilename) + br, err := newFileBinaryReader(binfn) + if err == nil { + return br, nil + } + + level.Warn(logger).Log("msg", "failed to read index-header from disk; recreating", "path", binfn, "err", err) + + if err := WriteBinary(ctx, bkt, id, binfn); err != nil { + return nil, errors.Wrap(err, "write index header") + } + + level.Debug(logger).Log("msg", "build index-header file", "path", binfn, "err", err) + + return newFileBinaryReader(binfn) +} + +func newFileBinaryReader(path string) (bw *BinaryReader, err error) { + f, err := fileutil.OpenMmapFile(path) + if err != nil { + return nil, err + } + defer func() { + if err != nil { + runutil.CloseWithErrCapture(&err, f, "index header close") + } + }() + + r := &BinaryReader{ + b: realByteSlice(f.Bytes()), + c: f, + postings: map[string][]postingOffset{}, + } + + // Verify header. 
+ if r.b.Len() < headerLen { + return nil, errors.Wrap(encoding.ErrInvalidSize, "index header's header") + } + if m := binary.BigEndian.Uint32(r.b.Range(0, 4)); m != MagicIndex { + return nil, errors.Errorf("invalid magic number %x", m) + } + r.version = int(r.b.Range(4, 5)[0]) + r.indexVersion = int(r.b.Range(5, 6)[0]) + + r.indexLastPostingEnd = int64(binary.BigEndian.Uint64(r.b.Range(6, headerLen))) + + if r.version != BinaryFormatV1 { + return nil, errors.Errorf("unknown index header file version %d", r.version) + } + + r.toc, err = newBinaryTOCFromByteSlice(r.b) + if err != nil { + return nil, errors.Wrap(err, "read index header TOC") + } + + r.symbols, err = index.NewSymbols(r.b, r.indexVersion, int(r.toc.Symbols)) + if err != nil { + return nil, errors.Wrap(err, "read symbols") + } + + var lastKey []string + if r.indexVersion == index.FormatV1 { + // Earlier V1 formats don't have a sorted postings offset table, so + // load the whole offset table into memory. + r.postingsV1 = map[string]map[string]index.Range{} + + var prevRng index.Range + if err := index.ReadOffsetTable(r.b, r.toc.PostingsOffsetTable, func(key []string, off uint64, _ int) error { + if len(key) != 2 { + return errors.Errorf("unexpected key length for posting table %d", len(key)) + } + // TODO(bwplotka): This is wrong, probably we have to sort. + if lastKey != nil { + prevRng.End = int64(off + 4) + r.postingsV1[lastKey[0]][lastKey[1]] = prevRng + } + + if _, ok := r.postingsV1[key[0]]; !ok { + r.postingsV1[key[0]] = map[string]index.Range{} + r.postings[key[0]] = nil // Used to get a list of labelnames in places. + } + + lastKey = key + prevRng = index.Range{Start: int64(off + 4)} + return nil + }); err != nil { + return nil, errors.Wrap(err, "read postings table") + } + if lastKey != nil { + prevRng.End = r.indexLastPostingEnd + 4 + r.postingsV1[lastKey[0]][lastKey[1]] = prevRng + } + } else { + lastTableOff := 0 + valueCount := 0 + + // For the postings offset table we keep every label name but only every nth + // label value (plus the first and last one), to save memory. + if err := index.ReadOffsetTable(r.b, r.toc.PostingsOffsetTable, func(key []string, off uint64, tableOff int) error { + if len(key) != 2 { + return errors.Errorf("unexpected key length for posting table %d", len(key)) + } + + if _, ok := r.postings[key[0]]; !ok { + // Next label name. + r.postings[key[0]] = []postingOffset{} + if lastKey != nil { + // Always include last value for each label name. + r.postings[lastKey[0]] = append(r.postings[lastKey[0]], postingOffset{value: lastKey[1], tableOff: lastTableOff}) + } + valueCount = 0 + } + + if valueCount%symbolFactor == 0 { + r.postings[key[0]] = append(r.postings[key[0]], postingOffset{value: key[1], tableOff: tableOff}) + lastKey = nil + return nil + } + lastKey = key + lastTableOff = tableOff + valueCount++ + return nil + }); err != nil { + return nil, errors.Wrap(err, "read postings table") + } + if lastKey != nil { + r.postings[lastKey[0]] = append(r.postings[lastKey[0]], postingOffset{value: lastKey[1], tableOff: lastTableOff}) + } + // Trim any extra space in the slices. 
+ for k, v := range r.postings { + l := make([]postingOffset, len(v)) + copy(l, v) + r.postings[k] = l + } + } + + r.nameSymbols = make(map[uint32]string, len(r.postings)) + for k := range r.postings { + if k == "" { + continue + } + off, err := r.symbols.ReverseLookup(k) + if err != nil { + return nil, errors.Wrap(err, "reverse symbol lookup") + } + r.nameSymbols[off] = k + } + + r.dec = &index.Decoder{LookupSymbol: r.LookupSymbol} + + return r, nil +} + +// newBinaryTOCFromByteSlice return parsed TOC from given index header byte slice. +func newBinaryTOCFromByteSlice(bs index.ByteSlice) (*BinaryTOC, error) { + if bs.Len() < binaryTOCLen { + return nil, encoding.ErrInvalidSize + } + b := bs.Range(bs.Len()-binaryTOCLen, bs.Len()) + + expCRC := binary.BigEndian.Uint32(b[len(b)-4:]) + d := encoding.Decbuf{B: b[:len(b)-4]} + + if d.Crc32(castagnoliTable) != expCRC { + return nil, errors.Wrap(encoding.ErrInvalidChecksum, "read index header TOC") + } + + if err := d.Err(); err != nil { + return nil, err + } + + return &BinaryTOC{ + Symbols: d.Be64(), + PostingsOffsetTable: d.Be64(), + }, nil +} + +func (r BinaryReader) IndexVersion() int { + return r.indexVersion +} + +// TODO(bwplotka): Get advantage of multi value offset fetch. +func (r BinaryReader) PostingsOffset(name string, value string) (index.Range, error) { + rngs, err := r.postingsOffset(name, value) + if err != nil { + return index.Range{}, err + } + if len(rngs) != 1 { + return index.Range{}, NotFoundRangeErr + } + return rngs[0], nil +} + +func (r BinaryReader) postingsOffset(name string, values ...string) ([]index.Range, error) { + rngs := make([]index.Range, 0, len(values)) + if r.indexVersion == index.FormatV1 { + e, ok := r.postingsV1[name] + if !ok { + return nil, nil + } + for _, v := range values { + rng, ok := e[v] + if !ok { + continue + } + rngs = append(rngs, rng) + } + return rngs, nil + } + + e, ok := r.postings[name] + if !ok { + return nil, nil + } + + if len(values) == 0 { + return nil, nil + } + + skip := 0 + valueIndex := 0 + for valueIndex < len(values) && values[valueIndex] < e[0].value { + // Discard values before the start. + valueIndex++ + } + + var tmpRngs []index.Range // The start, end offsets in the postings table in the original index file. + for valueIndex < len(values) { + value := values[valueIndex] + + i := sort.Search(len(e), func(i int) bool { return e[i].value >= value }) + if i == len(e) { + // We're past the end. + break + } + if i > 0 && e[i].value != value { + // Need to look from previous entry. + i-- + } + // Don't Crc32 the entire postings offset table, this is very slow + // so hope any issues were caught at startup. + d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsOffsetTable), nil) + d.Skip(e[i].tableOff) + + tmpRngs = tmpRngs[:0] + // Iterate on the offset table. + for d.Err() == nil { + if skip == 0 { + // These are always the same number of bytes, + // and it's faster to skip than parse. + skip = d.Len() + d.Uvarint() // Keycount. + d.UvarintBytes() // Label name. + skip -= d.Len() + } else { + d.Skip(skip) + } + v := d.UvarintBytes() // Label value. + postingOffset := int64(d.Uvarint64()) // Offset. + for string(v) >= value { + if string(v) == value { + // Actual posting is 4 bytes after offset, which includes length. 
+ tmpRngs = append(tmpRngs, index.Range{Start: postingOffset + 4}) + } + valueIndex++ + if valueIndex == len(values) { + break + } + value = values[valueIndex] + } + if i+1 == len(e) { + for i := range tmpRngs { + tmpRngs[i].End = r.indexLastPostingEnd + } + rngs = append(rngs, tmpRngs...) + // Need to go to a later postings offset entry, if there is one. + break + } + + if value >= e[i+1].value || valueIndex == len(values) { + d.Skip(skip) + d.UvarintBytes() // Label value. + postingOffset := int64(d.Uvarint64()) // Offset. + for j := range tmpRngs { + // Actual posting end is 4 bytes before next offset. + tmpRngs[j].End = postingOffset - 4 + } + rngs = append(rngs, tmpRngs...) + // Need to go to a later postings offset entry, if there is one. + break + } + } + if d.Err() != nil { + return nil, errors.Wrap(d.Err(), "get postings offset entry") + } + } + + return rngs, nil +} + +func (r BinaryReader) LookupSymbol(o uint32) (string, error) { + if s, ok := r.nameSymbols[o]; ok { + return s, nil + } + + if r.indexVersion == index.FormatV1 { + // For v1 little trick is needed. Refs are actual offset inside index, not index-header. This is different + // of the header length difference between two files. + o += headerLen - index.HeaderLen + } + + return r.symbols.Lookup(o) +} + +func (r BinaryReader) LabelValues(name string) ([]string, error) { + if r.indexVersion == index.FormatV1 { + e, ok := r.postingsV1[name] + if !ok { + return nil, nil + } + values := make([]string, 0, len(e)) + for k := range e { + values = append(values, k) + } + sort.Strings(values) + return values, nil + + } + e, ok := r.postings[name] + if !ok { + return nil, nil + } + if len(e) == 0 { + return nil, nil + } + values := make([]string, 0, len(e)*symbolFactor) + + d := encoding.NewDecbufAt(r.b, int(r.toc.PostingsOffsetTable), nil) + d.Skip(e[0].tableOff) + lastVal := e[len(e)-1].value + + skip := 0 + for d.Err() == nil { + if skip == 0 { + // These are always the same number of bytes, + // and it's faster to skip than parse. + skip = d.Len() + d.Uvarint() // Keycount. + d.UvarintBytes() // Label name. + skip -= d.Len() + } else { + d.Skip(skip) + } + s := yoloString(d.UvarintBytes()) // Label value. + values = append(values, s) + if s == lastVal { + break + } + d.Uvarint64() // Offset. + } + if d.Err() != nil { + return nil, errors.Wrap(d.Err(), "get postings offset entry") + } + return values, nil +} + +func yoloString(b []byte) string { + return *((*string)(unsafe.Pointer(&b))) +} + +func (r BinaryReader) LabelNames() []string { + allPostingsKeyName, _ := index.AllPostingsKey() + labelNames := make([]string, 0, len(r.postings)) + for name := range r.postings { + if name == allPostingsKeyName { + // This is not from any metric. + continue + } + labelNames = append(labelNames, name) + } + sort.Strings(labelNames) + return labelNames +} + +func (r *BinaryReader) Close() error { return r.c.Close() } diff --git a/pkg/block/indexheader/header.go b/pkg/block/indexheader/header.go index 2f3b2fbbcc..39934d5da9 100644 --- a/pkg/block/indexheader/header.go +++ b/pkg/block/indexheader/header.go @@ -1,19 +1,35 @@ package indexheader import ( + "io" + + "github.com/pkg/errors" "github.com/prometheus/prometheus/tsdb/index" ) -// NotFoundRange is a range returned by PostingsOffset when there is no posting for given name and value pairs. -// Has to be default value of index.Range. 
-var NotFoundRange = index.Range{}
+// NotFoundRangeErr is an error returned by PostingsOffset when there is no posting for the given name and value pair.
+var NotFoundRangeErr = errors.New("range not found")
 
-// Reader is an interface allowing to read essential, minimal number of index entries from the small portion of index file called header.
+// Reader is an interface for reading the essential, minimal set of index fields from the small portion of the index file called the header.
 type Reader interface {
+	io.Closer
+
+	// IndexVersion returns the version of the index.
 	IndexVersion() int
+
+	// PostingsOffset returns the start and end offsets of postings for the given name and value.
+	// The end offset might be bigger than the actual posting ending, but never larger than the whole index file.
+	// NotFoundRangeErr is returned when no postings can be found for the given name and value.
 	// TODO(bwplotka): Move to PostingsOffsets(name string, value ...string) []index.Range and benchmark.
-	PostingsOffset(name string, value string) index.Range
+	PostingsOffset(name string, value string) (index.Range, error)
+
+	// LookupSymbol returns the string for the given symbol reference.
+	// An error is returned if the symbol can't be found.
 	LookupSymbol(o uint32) (string, error)
-	LabelValues(name string) []string
+
+	// LabelValues returns all label values for the given label name, or an error if the lookup fails.
+	LabelValues(name string) ([]string, error)
+
+	// LabelNames returns all label names.
 	LabelNames() []string
 }
diff --git a/pkg/block/indexheader/header_test.go b/pkg/block/indexheader/header_test.go
index 3b360d126b..bcf9702065 100644
--- a/pkg/block/indexheader/header_test.go
+++ b/pkg/block/indexheader/header_test.go
@@ -2,14 +2,24 @@ package indexheader
 
 import (
 	"context"
+	"io"
 	"io/ioutil"
 	"os"
 	"path/filepath"
 	"testing"
 
 	"github.com/go-kit/kit/log"
+	"github.com/go-kit/kit/log/level"
+	"github.com/oklog/ulid"
+	"github.com/pkg/errors"
 	"github.com/prometheus/prometheus/pkg/labels"
+	"github.com/prometheus/prometheus/tsdb/fileutil"
 	"github.com/prometheus/prometheus/tsdb/index"
+	"github.com/thanos-io/thanos/pkg/block"
+	"github.com/thanos-io/thanos/pkg/block/metadata"
+	"github.com/thanos-io/thanos/pkg/objstore"
+	"github.com/thanos-io/thanos/pkg/objstore/filesystem"
+	"github.com/thanos-io/thanos/pkg/runutil"
 	"github.com/thanos-io/thanos/pkg/testutil"
 )
@@ -20,53 +30,346 @@ func TestReaders(t *testing.T) {
 	testutil.Ok(t, err)
 	defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }()
 
-	b, err := testutil.CreateBlock(ctx, tmpDir, []labels.Labels{
+	bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt"))
+	testutil.Ok(t, err)
+	defer func() { testutil.Ok(t, bkt.Close()) }()
+
+	// Create a block with index version 2.
+ id1, err := testutil.CreateBlock(ctx, tmpDir, []labels.Labels{ {{Name: "a", Value: "1"}}, {{Name: "a", Value: "2"}}, {{Name: "a", Value: "3"}}, {{Name: "a", Value: "4"}}, - {{Name: "a", Value: "1"}, {Name: "b", Value: "1"}}, - }, 100, 0, 1000, nil, 124) + {{Name: "a", Value: "1"}, {Name: "longer-string", Value: "1"}}, + }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124) testutil.Ok(t, err) - t.Run("JSON", func(t *testing.T) { - fn := filepath.Join(tmpDir, b.String(), "index.cache.json") - testutil.Ok(t, WriteJSON(log.NewNopLogger(), filepath.Join(tmpDir, b.String(), "index"), fn)) + testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, id1.String()))) - jr, err := NewJSONReader(ctx, log.NewNopLogger(), nil, tmpDir, b) - testutil.Ok(t, err) + // Copy block index version 1 for backward compatibility. + /* The block here was produced at the commit + 706602daed1487f7849990678b4ece4599745905 used in 2.0.0 with: + db, _ := Open("v1db", nil, nil, nil) + app := db.Appender() + app.Add(labels.FromStrings("foo", "bar"), 1, 2) + app.Add(labels.FromStrings("foo", "baz"), 3, 4) + app.Add(labels.FromStrings("foo", "meh"), 1000*3600*4, 4) // Not in the block. + // Make sure we've enough values for the lack of sorting of postings offsets to show up. + for i := 0; i < 100; i++ { + app.Add(labels.FromStrings("bar", strconv.FormatInt(int64(i), 10)), 0, 0) + } + app.Commit() + db.compact() + db.Close() + */ + + m, err := metadata.Read("./testdata/index_format_v1") + testutil.Ok(t, err) + testutil.Copy(t, "./testdata/index_format_v1", filepath.Join(tmpDir, m.ULID.String())) + + _, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(tmpDir, m.ULID.String()), metadata.Thanos{ + Labels: labels.Labels{{Name: "ext1", Value: "1"}}.Map(), + Downsample: metadata.ThanosDownsample{Resolution: 0}, + Source: metadata.TestSource, + }, &m.BlockMeta) + testutil.Ok(t, err) + testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, m.ULID.String()))) + + for _, id := range []ulid.ULID{id1, m.ULID} { + t.Run(id.String(), func(t *testing.T) { + indexFile, err := fileutil.OpenMmapFile(filepath.Join(tmpDir, id.String(), block.IndexFilename)) + testutil.Ok(t, err) + defer func() { _ = indexFile.Close() }() + + b := realByteSlice(indexFile.Bytes()) + + t.Run("binary", func(t *testing.T) { + fn := filepath.Join(tmpDir, id.String(), block.IndexHeaderFilename) + testutil.Ok(t, WriteBinary(ctx, bkt, id, fn)) + + br, err := NewBinaryReader(ctx, log.NewNopLogger(), nil, tmpDir, id) + testutil.Ok(t, err) + + defer func() { testutil.Ok(t, br.Close()) }() - testutil.Equals(t, 6, len(jr.symbols)) - testutil.Equals(t, 2, len(jr.lvals)) - testutil.Equals(t, 6, len(jr.postings)) + if id == id1 { + testutil.Equals(t, 1, br.version) + testutil.Equals(t, 2, br.indexVersion) + testutil.Equals(t, &BinaryTOC{Symbols: headerLen, PostingsOffsetTable: 50}, br.toc) + testutil.Equals(t, int64(330), br.indexLastPostingEnd) + testutil.Equals(t, 8, br.symbols.Size()) + testutil.Equals(t, 3, len(br.postings)) + testutil.Equals(t, 0, len(br.postingsV1)) + testutil.Equals(t, 2, len(br.nameSymbols)) + } + + compareIndexToHeader(t, b, br) + }) + + t.Run("json", func(t *testing.T) { + fn := filepath.Join(tmpDir, id.String(), block.IndexCacheFilename) + testutil.Ok(t, WriteJSON(log.NewNopLogger(), filepath.Join(tmpDir, id.String(), "index"), fn)) + + jr, err := NewJSONReader(ctx, log.NewNopLogger(), nil, tmpDir, id) + testutil.Ok(t, err) + + defer func() { testutil.Ok(t, jr.Close()) }() + + 
if id == id1 { + testutil.Equals(t, 6, len(jr.symbols)) + testutil.Equals(t, 2, len(jr.lvals)) + testutil.Equals(t, 6, len(jr.postings)) + } + + compareIndexToHeader(t, b, jr) + }) + }) + } - testReader(t, jr) - }) } -func testReader(t *testing.T, r Reader) { - testutil.Equals(t, 2, r.IndexVersion()) - exp := []string{"1", "2", "3", "4", "a", "b"} - for i := range exp { - r, err := r.LookupSymbol(uint32(i)) +func compareIndexToHeader(t *testing.T, indexByteSlice index.ByteSlice, headerReader Reader) { + indexReader, err := index.NewReader(indexByteSlice) + testutil.Ok(t, err) + defer func() { _ = indexReader.Close() }() + + testutil.Equals(t, indexReader.Version(), headerReader.IndexVersion()) + + if indexReader.Version() == index.FormatV2 { + // For v2 symbols ref sequential integers 0, 1, 2 etc. + iter := indexReader.Symbols() + i := 0 + for iter.Next() { + r, err := headerReader.LookupSymbol(uint32(i)) + testutil.Ok(t, err) + testutil.Equals(t, iter.At(), r) + + i++ + } + testutil.Ok(t, iter.Err()) + _, err := headerReader.LookupSymbol(uint32(i)) + testutil.NotOk(t, err) + + } else { + // For v1 symbols refs are actual offsets in the index. + symbols, err := getSymbolTable(indexByteSlice) testutil.Ok(t, err) - testutil.Equals(t, exp[i], r) + + for refs, sym := range symbols { + r, err := headerReader.LookupSymbol(refs) + testutil.Ok(t, err) + testutil.Equals(t, sym, r) + } + _, err = headerReader.LookupSymbol(200000) + testutil.NotOk(t, err) + } + + expLabelNames, err := indexReader.LabelNames() + testutil.Ok(t, err) + testutil.Equals(t, expLabelNames, headerReader.LabelNames()) + + expRanges, err := indexReader.PostingsRanges() + testutil.Ok(t, err) + + for _, lname := range expLabelNames { + expectedLabelVals, err := indexReader.LabelValues(lname) + testutil.Ok(t, err) + + vals, err := headerReader.LabelValues(lname) + testutil.Ok(t, err) + testutil.Equals(t, expectedLabelVals, vals) + + for i, v := range vals { + ptr, err := headerReader.PostingsOffset(lname, v) + testutil.Ok(t, err) + // For index-cache those values are exact. + // + // For binary they are exact except: + // * formatV2: last item posting offset. It's good enough if the value is larger than exact posting ending. + // * formatV1: all items. + if i == len(vals)-1 || indexReader.Version() == index.FormatV1 { + testutil.Equals(t, expRanges[labels.Label{Name: lname, Value: v}].Start, ptr.Start) + testutil.Assert(t, expRanges[labels.Label{Name: lname, Value: v}].End <= ptr.End, "got offset %v earlier than actual posting end %v ", ptr.End, expRanges[labels.Label{Name: lname, Value: v}].End) + continue + } + testutil.Equals(t, expRanges[labels.Label{Name: lname, Value: v}], ptr) + } } - _, err := r.LookupSymbol(uint32(len(exp))) + + vals, err := indexReader.LabelValues("not-existing") + testutil.Ok(t, err) + testutil.Equals(t, []string(nil), vals) + + vals, err = headerReader.LabelValues("not-existing") + testutil.Ok(t, err) + testutil.Equals(t, []string(nil), vals) + + _, err = headerReader.PostingsOffset("not-existing", "1") testutil.NotOk(t, err) +} - testutil.Equals(t, []string{"1", "2", "3", "4"}, r.LabelValues("a")) - testutil.Equals(t, []string{"1"}, r.LabelValues("b")) - testutil.Equals(t, []string{}, r.LabelValues("c")) +func prepareIndexV2Block(t testing.TB, tmpDir string, bkt objstore.Bucket) *metadata.Meta { + /* Copy index 6MB block index version 2. It was generated via thanosbench. 
Meta.json: + { + "ulid": "01DRBP4RNVZ94135ZA6B10EMRR", + "minTime": 1570766415000, + "maxTime": 1570939215001, + "stats": { + "numSamples": 115210000, + "numSeries": 10000, + "numChunks": 990000 + }, + "compaction": { + "level": 1, + "sources": [ + "01DRBP4RNVZ94135ZA6B10EMRR" + ] + }, + "version": 1, + "thanos": { + "labels": { + "cluster": "one", + "dataset": "continuous" + }, + "downsample": { + "resolution": 0 + }, + "source": "blockgen" + } + } + */ - testutil.Equals(t, []string{"a", "b"}, r.LabelNames()) + m, err := metadata.Read("./testdata/index_format_v2") + testutil.Ok(t, err) + testutil.Copy(t, "./testdata/index_format_v2", filepath.Join(tmpDir, m.ULID.String())) - ptr := r.PostingsOffset("a", "1") - testutil.Equals(t, index.Range{Start: 200, End: 212}, ptr) + _, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(tmpDir, m.ULID.String()), metadata.Thanos{ + Labels: labels.Labels{{Name: "ext1", Value: "1"}}.Map(), + Downsample: metadata.ThanosDownsample{Resolution: 0}, + Source: metadata.TestSource, + }, &m.BlockMeta) + testutil.Ok(t, err) + testutil.Ok(t, block.Upload(context.Background(), log.NewNopLogger(), bkt, filepath.Join(tmpDir, m.ULID.String()))) - ptr = r.PostingsOffset("a", "2") - testutil.Equals(t, index.Range{Start: 220, End: 228}, ptr) + return m +} - ptr = r.PostingsOffset("b", "2") - testutil.Equals(t, NotFoundRange, ptr) +func BenchmarkJSONWrite(t *testing.B) { + ctx := context.Background() + logger := log.NewNopLogger() + tmpDir, err := ioutil.TempDir("", "bench-indexheader") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() + + bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, bkt.Close()) }() + + m := prepareIndexV2Block(t, tmpDir, bkt) + testutil.Ok(t, os.MkdirAll(filepath.Join(tmpDir, "local", m.ULID.String()), os.ModePerm)) + fn := filepath.Join(tmpDir, m.ULID.String(), block.IndexCacheFilename) + t.ResetTimer() + for i := 0; i < t.N; i++ { + testutil.Ok(t, forceDownloadFile( + ctx, + logger, + bkt, + filepath.Join(m.ULID.String(), block.IndexFilename), + filepath.Join(tmpDir, "local", m.ULID.String(), block.IndexFilename), + )) + testutil.Ok(t, WriteJSON(logger, filepath.Join(tmpDir, "local", m.ULID.String(), block.IndexFilename), fn)) + } +} + +func forceDownloadFile(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, src, dst string) (err error) { + rc, err := bkt.Get(ctx, src) + if err != nil { + return errors.Wrapf(err, "get file %s", src) + } + defer runutil.CloseWithLogOnErr(logger, rc, "download block's file reader") + + f, err := os.OpenFile(dst, os.O_CREATE|os.O_RDWR, os.ModePerm) + if err != nil { + return errors.Wrap(err, "create file") + } + + if _, err := f.Seek(0, 0); err != nil { + return err + } + defer func() { + if err != nil { + if rerr := os.Remove(dst); rerr != nil { + level.Warn(logger).Log("msg", "failed to remove partially downloaded file", "file", dst, "err", rerr) + } + } + }() + defer runutil.CloseWithLogOnErr(logger, f, "download block's output file") + + if _, err = io.Copy(f, rc); err != nil { + return errors.Wrap(err, "copy object to file") + } + return nil +} + +func BenchmarkJSONReader(t *testing.B) { + logger := log.NewNopLogger() + tmpDir, err := ioutil.TempDir("", "bench-indexheader") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() + + bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, bkt.Close()) }() + 
+ m := prepareIndexV2Block(t, tmpDir, bkt) + fn := filepath.Join(tmpDir, m.ULID.String(), block.IndexCacheFilename) + testutil.Ok(t, WriteJSON(log.NewNopLogger(), filepath.Join(tmpDir, m.ULID.String(), block.IndexFilename), fn)) + + t.ResetTimer() + for i := 0; i < t.N; i++ { + jr, err := newFileJSONReader(logger, fn) + testutil.Ok(t, err) + testutil.Ok(t, jr.Close()) + } +} + +func BenchmarkBinaryWrite(t *testing.B) { + ctx := context.Background() + + tmpDir, err := ioutil.TempDir("", "bench-indexheader") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() + + bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, bkt.Close()) }() + + m := prepareIndexV2Block(t, tmpDir, bkt) + fn := filepath.Join(tmpDir, m.ULID.String(), block.IndexHeaderFilename) + + t.ResetTimer() + for i := 0; i < t.N; i++ { + testutil.Ok(t, WriteBinary(ctx, bkt, m.ULID, fn)) + } +} + +func BenchmarkBinaryReader(t *testing.B) { + ctx := context.Background() + tmpDir, err := ioutil.TempDir("", "bench-indexheader") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() + + bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) + testutil.Ok(t, err) + + m := prepareIndexV2Block(t, tmpDir, bkt) + fn := filepath.Join(tmpDir, m.ULID.String(), block.IndexHeaderFilename) + testutil.Ok(t, WriteBinary(ctx, bkt, m.ULID, fn)) + + t.ResetTimer() + for i := 0; i < t.N; i++ { + br, err := newFileBinaryReader(fn) + testutil.Ok(t, err) + testutil.Ok(t, br.Close()) + } } diff --git a/pkg/block/indexheader/json_reader.go b/pkg/block/indexheader/json_reader.go index cf0277a929..67b502e791 100644 --- a/pkg/block/indexheader/json_reader.go +++ b/pkg/block/indexheader/json_reader.go @@ -3,7 +3,6 @@ package indexheader import ( "context" "encoding/json" - "hash/crc32" "io/ioutil" "os" "path/filepath" @@ -58,15 +57,6 @@ func (b realByteSlice) Sub(start, end int) index.ByteSlice { return b[start:end] } -// The table gets initialized with sync.Once but may still cause a race -// with any other use of the crc32 package anywhere. Thus we initialize it -// before. -var castagnoliTable *crc32.Table - -func init() { - castagnoliTable = crc32.MakeTable(crc32.Castagnoli) -} - // readSymbols reads the symbol table fully into memory and allocates proper strings for them. // Strings backed by the mmap'd memory would cause memory faults if applications keep using them // after the reader is closed. @@ -126,7 +116,6 @@ func getSymbolTable(b index.ByteSlice) (map[uint32]string, error) { for o, s := range symbolsV2 { symbolsTable[uint32(o)] = s } - return symbolsTable, nil } @@ -206,11 +195,12 @@ type JSONReader struct { postings map[labels.Label]index.Range } +// NewJSONReader loads or builds new index-cache.json if not present on disk or object storage. func NewJSONReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID) (*JSONReader, error) { cachefn := filepath.Join(dir, id.String(), block.IndexCacheFilename) - jr, err := newJSONReaderFromFile(logger, cachefn) + jr, err := newFileJSONReader(logger, cachefn) if err == nil { - return jr, err + return jr, nil } if !os.IsNotExist(errors.Cause(err)) && errors.Cause(err) != jsonUnmarshalError { @@ -219,7 +209,7 @@ func NewJSONReader(ctx context.Context, logger log.Logger, bkt objstore.BucketRe // Try to download index cache file from object store. 
if err = objstore.DownloadFile(ctx, logger, bkt, filepath.Join(id.String(), block.IndexCacheFilename), cachefn); err == nil { - return newJSONReaderFromFile(logger, cachefn) + return newFileJSONReader(logger, cachefn) } if !bkt.IsObjNotFoundErr(errors.Cause(err)) && errors.Cause(err) != jsonUnmarshalError { @@ -243,16 +233,16 @@ func NewJSONReader(ctx context.Context, logger log.Logger, bkt objstore.BucketRe return nil, errors.Wrap(err, "write index cache") } - return newJSONReaderFromFile(logger, cachefn) + return newFileJSONReader(logger, cachefn) } // ReadJSON reads an index cache file. -func newJSONReaderFromFile(logger log.Logger, fn string) (*JSONReader, error) { +func newFileJSONReader(logger log.Logger, fn string) (*JSONReader, error) { f, err := os.Open(fn) if err != nil { return nil, errors.Wrap(err, "open file") } - defer runutil.CloseWithLogOnErr(logger, f, "index reader") + defer runutil.CloseWithLogOnErr(logger, f, "index cache json close") var v indexCache @@ -317,20 +307,29 @@ func (r *JSONReader) IndexVersion() int { func (r *JSONReader) LookupSymbol(o uint32) (string, error) { idx := int(o) if idx >= len(r.symbols) { - return "", errors.Errorf("bucketIndexReader: unknown symbol offset %d", o) + return "", errors.Errorf("indexJSONReader: unknown symbol offset %d", o) } - + // NOTE: This is not entirely correct, symbols slice can have gaps. Not fixing as JSON reader + // is replaced by index-header. return r.symbols[idx], nil } -func (r *JSONReader) PostingsOffset(name, value string) index.Range { - return r.postings[labels.Label{Name: name, Value: value}] +func (r *JSONReader) PostingsOffset(name, value string) (index.Range, error) { + rng, ok := r.postings[labels.Label{Name: name, Value: value}] + if !ok { + return index.Range{}, NotFoundRangeErr + } + return rng, nil } // LabelValues returns label values for single name. -func (r *JSONReader) LabelValues(name string) []string { - res := make([]string, 0, len(r.lvals[name])) - return append(res, r.lvals[name]...) +func (r *JSONReader) LabelValues(name string) ([]string, error) { + vals, ok := r.lvals[name] + if !ok { + return nil, nil + } + res := make([]string, 0, len(vals)) + return append(res, vals...), nil } // LabelNames returns a list of label names. 
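Because `PostingsOffset` now returns `(index.Range, error)` and signals a missing posting with `NotFoundRangeErr`, callers must distinguish an absent posting from a real failure. Below is a minimal sketch of that pattern, mirroring how `pkg/store/bucket.go` consumes the API in this diff; the `postingsFor` wrapper itself is hypothetical:

```go
package example

import (
	"github.com/pkg/errors"
	"github.com/prometheus/prometheus/tsdb/index"

	"github.com/thanos-io/thanos/pkg/block/indexheader"
)

// postingsFor resolves the posting range for a label pair. A missing posting
// yields ok == false with no error; any other error is treated as fatal.
func postingsFor(r indexheader.Reader, name, value string) (rng index.Range, ok bool, err error) {
	rng, err = r.PostingsOffset(name, value)
	if err == indexheader.NotFoundRangeErr {
		// The block simply has no postings for this label pair.
		return index.Range{}, false, nil
	}
	if err != nil {
		return index.Range{}, false, errors.Wrap(err, "index header PostingsOffset")
	}
	return rng, true, nil
}
```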
@@ -342,3 +341,5 @@ func (r *JSONReader) LabelNames() []string { sort.Strings(res) return res } + +func (r *JSONReader) Close() error { return nil } diff --git a/pkg/block/indexheader/testdata/index_format_v1/chunks/.gitkeep b/pkg/block/indexheader/testdata/index_format_v1/chunks/.gitkeep new file mode 100644 index 0000000000..e69de29bb2 diff --git a/pkg/block/indexheader/testdata/index_format_v1/index b/pkg/block/indexheader/testdata/index_format_v1/index new file mode 100644 index 0000000000..76b0a30929 Binary files /dev/null and b/pkg/block/indexheader/testdata/index_format_v1/index differ diff --git a/pkg/block/indexheader/testdata/index_format_v1/meta.json b/pkg/block/indexheader/testdata/index_format_v1/meta.json new file mode 100644 index 0000000000..d99ae6b49c --- /dev/null +++ b/pkg/block/indexheader/testdata/index_format_v1/meta.json @@ -0,0 +1,17 @@ +{ + "version": 1, + "ulid": "01DXXFZDYD1MQW6079WK0K6EDQ", + "minTime": 0, + "maxTime": 7200000, + "stats": { + "numSamples": 102, + "numSeries": 102, + "numChunks": 102 + }, + "compaction": { + "level": 1, + "sources": [ + "01DXXFZDYD1MQW6079WK0K6EDQ" + ] + } +} diff --git a/pkg/block/indexheader/testdata/index_format_v2/chunks/.gitkeep b/pkg/block/indexheader/testdata/index_format_v2/chunks/.gitkeep new file mode 100644 index 0000000000..e69de29bb2 diff --git a/pkg/block/indexheader/testdata/index_format_v2/index b/pkg/block/indexheader/testdata/index_format_v2/index new file mode 100644 index 0000000000..385479f00d Binary files /dev/null and b/pkg/block/indexheader/testdata/index_format_v2/index differ diff --git a/pkg/block/indexheader/testdata/index_format_v2/meta.json b/pkg/block/indexheader/testdata/index_format_v2/meta.json new file mode 100644 index 0000000000..66bc623f74 --- /dev/null +++ b/pkg/block/indexheader/testdata/index_format_v2/meta.json @@ -0,0 +1,27 @@ +{ + "ulid": "01DRBP4RNVZ94135ZA6B10EMRR", + "minTime": 1570766415000, + "maxTime": 1570939215001, + "stats": { + "numSamples": 115210000, + "numSeries": 10000, + "numChunks": 990000 + }, + "compaction": { + "level": 1, + "sources": [ + "01DRBP4RNVZ94135ZA6B10EMRR" + ] + }, + "version": 1, + "thanos": { + "labels": { + "cluster": "one", + "dataset": "continuous" + }, + "downsample": { + "resolution": 0 + }, + "source": "blockgen" + } +} diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index d069d9c2ef..190c2de747 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -996,7 +996,10 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR defer runutil.CloseWithLogOnErr(s.logger, indexr, "label values") // Do it via index reader to have pending reader registered correctly. - res := indexr.block.indexHeaderReader.LabelValues(req.Label) + res, err := indexr.block.indexHeaderReader.LabelValues(req.Label) + if err != nil { + return errors.Wrap(err, "index header label values") + } mtx.Lock() sets = append(sets, res) @@ -1288,7 +1291,12 @@ func (r *bucketIndexReader) ExpandedPostings(ms []*labels.Matcher) ([]uint64, er // NOTE: Derived from tsdb.PostingsForMatchers. for _, m := range ms { // Each group is separate to tell later what postings are intersecting with what. 
- postingGroups = append(postingGroups, toPostingGroup(r.block.indexHeaderReader.LabelValues, m)) + pg, err := toPostingGroup(r.block.indexHeaderReader.LabelValues, m) + if err != nil { + return nil, errors.Wrap(err, "toPostingGroup") + } + + postingGroups = append(postingGroups, pg) } if len(postingGroups) == 0 { @@ -1363,7 +1371,7 @@ func allWithout(p []index.Postings) index.Postings { } // NOTE: Derived from tsdb.postingsForMatcher. index.Merge is equivalent to map duplication. -func toPostingGroup(lvalsFn func(name string) []string, m *labels.Matcher) *postingGroup { +func toPostingGroup(lvalsFn func(name string) ([]string, error), m *labels.Matcher) (*postingGroup, error) { var matchingLabels labels.Labels // If the matcher selects an empty value, it selects all the series which don't @@ -1373,7 +1381,11 @@ func toPostingGroup(lvalsFn func(name string) []string, m *labels.Matcher) *post allName, allValue := index.AllPostingsKey() matchingLabels = append(matchingLabels, labels.Label{Name: allName, Value: allValue}) - for _, val := range lvalsFn(m.Name) { + vals, err := lvalsFn(m.Name) + if err != nil { + return nil, err + } + for _, val := range vals { if !m.Matches(val) { matchingLabels = append(matchingLabels, labels.Label{Name: m.Name, Value: val}) } @@ -1383,24 +1395,29 @@ func toPostingGroup(lvalsFn func(name string) []string, m *labels.Matcher) *post // This is known hack to return all series. // Ask for x != . Allow for that as Prometheus does, // even though it is expensive. - return newPostingGroup(matchingLabels, merge) + return newPostingGroup(matchingLabels, merge), nil } - return newPostingGroup(matchingLabels, allWithout) + return newPostingGroup(matchingLabels, allWithout), nil } // Fast-path for equal matching. if m.Type == labels.MatchEqual { - return newPostingGroup(labels.Labels{{Name: m.Name, Value: m.Value}}, merge) + return newPostingGroup(labels.Labels{{Name: m.Name, Value: m.Value}}, merge), nil } - for _, val := range lvalsFn(m.Name) { + vals, err := lvalsFn(m.Name) + if err != nil { + return nil, err + } + + for _, val := range vals { if m.Matches(val) { matchingLabels = append(matchingLabels, labels.Label{Name: m.Name, Value: val}) } } - return newPostingGroup(matchingLabels, merge) + return newPostingGroup(matchingLabels, merge), nil } type postingPtr struct { @@ -1440,13 +1457,17 @@ func (r *bucketIndexReader) fetchPostings(groups []*postingGroup) error { } // Cache miss; save pointer for actual posting in index stored in object store. - ptr := r.block.indexHeaderReader.PostingsOffset(key.Name, key.Value) - if ptr == indexheader.NotFoundRange { + ptr, err := r.block.indexHeaderReader.PostingsOffset(key.Name, key.Value) + if err == indexheader.NotFoundRangeErr { // This block does not have any posting for given key. 
g.Fill(j, index.EmptyPostings()) continue } + if err != nil { + return errors.Wrap(err, "index header PostingsOffset") + } + r.stats.postingsToFetch++ ptrs = append(ptrs, postingPtr{ptr: ptr, groupID: i, keyID: j}) } diff --git a/pkg/testutil/copy.go b/pkg/testutil/copy.go new file mode 100644 index 0000000000..c2c8bd480d --- /dev/null +++ b/pkg/testutil/copy.go @@ -0,0 +1,49 @@ +package testutil + +import ( + "io" + "os" + "path/filepath" + "testing" + + "github.com/pkg/errors" +) + +func Copy(t testing.TB, src, dst string) { + Ok(t, copyRecursive(src, dst)) +} + +func copyRecursive(src, dst string) error { + return filepath.Walk(src, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + + relPath, err := filepath.Rel(src, path) + if err != nil { + return err + } + + if info.IsDir() { + return os.MkdirAll(filepath.Join(dst, relPath), os.ModePerm) + } + + if !info.Mode().IsRegular() { + return errors.Errorf("%s is not a regular file", path) + } + + source, err := os.Open(path) + if err != nil { + return err + } + defer source.Close() + + destination, err := os.Create(filepath.Join(dst, relPath)) + if err != nil { + return err + } + defer destination.Close() + _, err = io.Copy(destination, source) + return err + }) +}
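Taken together, the new API can be exercised end to end: load a block's binary index-header and read label values from it. The following is a minimal sketch under the assumption that the block has already been uploaded to the bucket; the `labelValuesFor` wrapper is hypothetical, but it uses only names introduced by this diff:

```go
package example

import (
	"context"

	"github.com/go-kit/kit/log"
	"github.com/oklog/ulid"

	"github.com/thanos-io/thanos/pkg/block/indexheader"
	"github.com/thanos-io/thanos/pkg/objstore"
)

// labelValuesFor opens the index-header for the given block, building it from
// the index in object storage if it is not already cached under dir on disk,
// and returns all values of one label name.
func labelValuesFor(ctx context.Context, bkt objstore.BucketReader, dir string, id ulid.ULID, name string) ([]string, error) {
	r, err := indexheader.NewBinaryReader(ctx, log.NewNopLogger(), bkt, dir, id)
	if err != nil {
		return nil, err
	}
	defer func() { _ = r.Close() }() // Unmap the file; close error ignored in this sketch.
	return r.LabelValues(name)
}
```

Since `NewBinaryReader` falls back to `WriteBinary` when the on-disk file is missing or unreadable, the first call per block pays the one-off build cost; subsequent calls just mmap the existing file.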