forked from thanos-io/thanos
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
StoreGateway: Partition index-header download (thanos-io#6747)
* Partition index-header download Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]> * Use int division instead of float Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]> * Ignore errors in close() Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]> * Fix e2e Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]> * Use disk to buffer parts of index-header Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]> * Fix lint Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]> * Renaming variables Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]> * Increase partition size Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]> * Fix e2e failures Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]> * Refactoring Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]> * Fix e2e Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]> * Fix lint Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]> * Fix e2e Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]> * Cosmetic changes Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]> * Address review comments Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]> --------- Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]>
- Loading branch information
1 parent
dfe056b
commit 06d4407
Showing
4 changed files
with
325 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,231 @@ | ||
// Copyright (c) The Thanos Authors. | ||
// Licensed under the Apache License 2.0. | ||
|
||
package indexheader | ||
|
||
import ( | ||
"bufio" | ||
"bytes" | ||
"context" | ||
"fmt" | ||
"io" | ||
"os" | ||
"path/filepath" | ||
|
||
"github.com/google/uuid" | ||
"github.com/pkg/errors" | ||
"github.com/prometheus/prometheus/tsdb/fileutil" | ||
"github.com/thanos-io/objstore" | ||
"github.com/thanos-io/thanos/pkg/runutil" | ||
"golang.org/x/sync/errgroup" | ||
) | ||
|
||
// partitionSize is used for splitting range reads: each GetRange request is
// fetched as concurrent chunks of at most this many bytes.
const partitionSize = 16 * 1024 * 1024 // 16 MiB
|
||
// parallelBucketReader wraps a BucketReader so that GetRange requests are
// split into fixed-size partitions and downloaded concurrently.
type parallelBucketReader struct {
	objstore.BucketReader
	// tmpDir, when non-empty, is the directory where partitions are staged
	// on disk; when empty, partitions are buffered in memory.
	tmpDir string
	// partitionSize is the maximum byte length of a single partition.
	partitionSize int64
}
|
||
// WrapWithParallel wraps b so that GetRange calls are split into
// partitionSize-sized parts and downloaded in parallel. If tmpDir is empty
// the parts are buffered in memory, otherwise they are staged in temporary
// files under tmpDir.
func WrapWithParallel(b objstore.BucketReader, tmpDir string) objstore.BucketReader {
	return &parallelBucketReader{
		BucketReader:  b,
		tmpDir:        tmpDir,
		partitionSize: partitionSize,
	}
}
|
||
// GetRange reads the range in parallel. | ||
func (b *parallelBucketReader) GetRange(ctx context.Context, name string, off int64, length int64) (io.ReadCloser, error) { | ||
partFilePrefix := uuid.New().String() | ||
g, gctx := errgroup.WithContext(ctx) | ||
|
||
numParts := length / b.partitionSize | ||
if length%b.partitionSize > 0 { | ||
// A partial partition is remaining | ||
numParts += 1 | ||
} | ||
|
||
parts := make([]Part, 0, numParts) | ||
|
||
partId := 0 | ||
for o := off; o < off+length; o += b.partitionSize { | ||
l := b.partitionSize | ||
if o+l > off+length { | ||
// Partial partition | ||
l = length - (int64(partId) * b.partitionSize) | ||
} | ||
|
||
partOff := o | ||
partLength := l | ||
part, err := b.createPart(partFilePrefix, partId, int(partLength)) | ||
if err != nil { | ||
return nil, err | ||
} | ||
parts = append(parts, part) | ||
|
||
g.Go(func() error { | ||
rc, err := b.BucketReader.GetRange(gctx, name, partOff, partLength) | ||
defer runutil.CloseWithErrCapture(&err, rc, "close object") | ||
if err != nil { | ||
return errors.Wrap(err, fmt.Sprintf("get range part %v", partId)) | ||
} | ||
if _, err := io.Copy(part, rc); err != nil { | ||
return errors.Wrap(err, fmt.Sprintf("get range part %v", partId)) | ||
} | ||
return part.Flush() | ||
}) | ||
partId += 1 | ||
} | ||
|
||
if err := g.Wait(); err != nil { | ||
return nil, err | ||
} | ||
return newPartMerger(parts), nil | ||
} | ||
|
||
func (b *parallelBucketReader) createPart(partFilePrefix string, partId int, size int) (Part, error) { | ||
if b.tmpDir == "" { | ||
// Parts stored in memory | ||
return newPartBuffer(size), nil | ||
} | ||
|
||
partName := fmt.Sprintf("%s.part-%d", partFilePrefix, partId) | ||
filename := filepath.Join(b.tmpDir, partName) | ||
return newPartFile(filename) | ||
} | ||
|
||
// partMerger presents a sequence of downloaded parts as one contiguous
// io.ReadCloser: reads stream through the parts in order, and Close releases
// every part.
type partMerger struct {
	// closers releases each part (deleting temp files where applicable).
	closers []io.Closer
	// multiReader concatenates the parts' readers in download order.
	multiReader io.Reader
}
|
||
func newPartMerger(parts []Part) *partMerger { | ||
readers := make([]io.Reader, 0, len(parts)) | ||
closers := make([]io.Closer, 0, len(parts)) | ||
for _, p := range parts { | ||
readers = append(readers, p.(io.Reader)) | ||
closers = append(closers, p.(io.Closer)) | ||
} | ||
return &partMerger{ | ||
closers: closers, | ||
multiReader: io.MultiReader(readers...), | ||
} | ||
} | ||
|
||
// Read streams sequentially through the underlying parts via io.MultiReader.
func (m *partMerger) Read(b []byte) (n int, err error) {
	n, err = m.multiReader.Read(b)
	return
}
|
||
func (m *partMerger) Close() (err error) { | ||
var firstErr error = nil | ||
for _, c := range m.closers { | ||
if err := c.Close(); err != nil { | ||
if firstErr == nil { | ||
firstErr = err | ||
} | ||
} | ||
} | ||
return firstErr | ||
} | ||
|
||
// Part is the staging buffer for one partition of a ranged download. A part
// is written by its download goroutine, Flush-ed once complete, and then
// read back by the merger. Implementations also satisfy io.Closer.
type Part interface {
	// Read returns bytes previously written to the part.
	Read(buf []byte) (int, error)
	// Write appends bytes to the part.
	Write(buf []byte) (int, error)
	// Flush makes all written bytes visible to subsequent Reads.
	Flush() error
}
|
||
// partFile stores parts in temporary files. The buffered writer and reader
// share one file handle; Flush must be called after writing and before
// reading (it flushes, fsyncs, and rewinds the file).
type partFile struct {
	file *os.File
	// fileWriter buffers writes to file.
	fileWriter *bufio.Writer
	// fileReader buffers reads from file, starting at offset 0 after Flush.
	fileReader *bufio.Reader
}
|
||
func newPartFile(filename string) (*partFile, error) { | ||
dir := filepath.Dir(filename) | ||
df, err := fileutil.OpenDir(dir) | ||
if os.IsNotExist(err) { | ||
if err := os.MkdirAll(dir, os.ModePerm); err != nil { | ||
return nil, errors.Wrap(err, "create temp dir") | ||
} | ||
df, err = fileutil.OpenDir(dir) | ||
} | ||
if err != nil { | ||
return nil, errors.Wrap(err, "open temp dir") | ||
} | ||
|
||
if err := df.Sync(); err != nil { | ||
return nil, errors.Wrap(err, "sync dir") | ||
} | ||
|
||
if err := os.RemoveAll(filename); err != nil { | ||
return nil, errors.Wrap(err, "remove existing file") | ||
} | ||
f, err := os.OpenFile(filepath.Clean(filename), os.O_CREATE|os.O_RDWR, 0600) | ||
if err != nil { | ||
return nil, errors.Wrap(err, "open temp file") | ||
} | ||
return &partFile{ | ||
file: f, | ||
fileWriter: bufio.NewWriterSize(f, 32*1024), | ||
fileReader: bufio.NewReaderSize(f, 32*1024), | ||
}, nil | ||
} | ||
|
||
func (p *partFile) Close() error { | ||
if err := p.file.Close(); err != nil { | ||
return err | ||
} | ||
return os.Remove(p.file.Name()) | ||
} | ||
|
||
// Flush commits all buffered writes to disk (bufio flush, then fsync) and
// rewinds the file so subsequent Reads start from the beginning of the part.
func (p *partFile) Flush() error {
	if err := p.fileWriter.Flush(); err != nil {
		return err
	}
	if err := p.file.Sync(); err != nil {
		return err
	}
	// Seek is necessary because the part was just written to.
	_, err := p.file.Seek(0, io.SeekStart)
	return err
}
|
||
// Read returns bytes from the file through the buffered reader; call Flush
// first so writes are visible and the file is rewound.
func (p *partFile) Read(buf []byte) (int, error) {
	return p.fileReader.Read(buf)
}
|
||
// Write appends bytes to the file through the buffered writer; the data is
// not durable until Flush is called.
func (p *partFile) Write(buf []byte) (int, error) {
	return p.fileWriter.Write(buf)
}
|
||
// partBuffer keeps a downloaded part entirely in memory. It is used when no
// tmpDir is configured on the parallelBucketReader.
type partBuffer struct {
	buf *bytes.Buffer
}

// newPartBuffer returns an in-memory Part with capacity pre-allocated for
// size bytes, so writes do not have to grow the buffer.
func newPartBuffer(size int) *partBuffer {
	backing := make([]byte, 0, size)
	return &partBuffer{buf: bytes.NewBuffer(backing)}
}

// Close is a no-op: the memory is reclaimed by the garbage collector.
func (p *partBuffer) Close() error {
	return nil
}

// Read drains previously written bytes from the buffer.
func (p *partBuffer) Read(b []byte) (int, error) {
	return p.buf.Read(b)
}

// Write appends b to the in-memory buffer.
func (p *partBuffer) Write(b []byte) (int, error) {
	return p.buf.Write(b)
}

// Flush is a no-op: writes land directly in memory.
func (p *partBuffer) Flush() error {
	return nil
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
// Copyright (c) The Thanos Authors. | ||
// Licensed under the Apache License 2.0. | ||
|
||
package indexheader | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"crypto/rand" | ||
"io" | ||
"math/big" | ||
"testing" | ||
|
||
"github.com/efficientgo/core/testutil" | ||
"github.com/thanos-io/objstore" | ||
) | ||
|
||
func TestParallelBucket_InMemoryBuffering(t *testing.T) { | ||
bkt := objstore.NewInMemBucket() | ||
parallelBucket := ¶llelBucketReader{ | ||
BucketReader: bkt, | ||
tmpDir: "", | ||
partitionSize: 100, | ||
} | ||
testParallelBucket(t, bkt, parallelBucket) | ||
} | ||
|
||
func TestParallelBucket_TmpFileBuffering(t *testing.T) { | ||
bkt := objstore.NewInMemBucket() | ||
parallelBucket := ¶llelBucketReader{ | ||
BucketReader: bkt, | ||
tmpDir: t.TempDir(), | ||
partitionSize: 100, | ||
} | ||
testParallelBucket(t, bkt, parallelBucket) | ||
} | ||
|
||
func testParallelBucket(t *testing.T, bkt objstore.Bucket, parallelBucket *parallelBucketReader) { | ||
name := "test/data" | ||
ctx := context.Background() | ||
|
||
var size int64 = 10 * 1024 | ||
o, err := rand.Int(rand.Reader, big.NewInt(size/2)) | ||
testutil.Ok(t, err) | ||
offset := o.Int64() | ||
|
||
l, err := rand.Int(rand.Reader, big.NewInt(size/2)) | ||
testutil.Ok(t, err) | ||
length := l.Int64() | ||
|
||
randBytes := uploadRandom(t, ctx, bkt, name, size) | ||
|
||
r1, err := parallelBucket.GetRange(ctx, name, offset, length) | ||
testutil.Ok(t, err) | ||
|
||
parallelBytes, err := io.ReadAll(r1) | ||
testutil.Ok(t, err) | ||
testutil.Assert(t, length == int64(len(parallelBytes))) | ||
|
||
expectedBytes := randBytes[offset : offset+length] | ||
testutil.Assert(t, length == int64(len(expectedBytes))) | ||
testutil.Equals(t, expectedBytes, parallelBytes) | ||
|
||
r2, err := bkt.GetRange(ctx, name, offset, length) | ||
testutil.Ok(t, err) | ||
memoryBytes, err := io.ReadAll(r2) | ||
testutil.Ok(t, err) | ||
testutil.Assert(t, length == int64(len(memoryBytes))) | ||
testutil.Equals(t, memoryBytes, parallelBytes) | ||
|
||
err = r1.Close() | ||
testutil.Ok(t, err) | ||
|
||
err = r2.Close() | ||
testutil.Ok(t, err) | ||
} | ||
|
||
func uploadRandom(t *testing.T, ctx context.Context, bkt objstore.Bucket, name string, size int64) []byte { | ||
b := make([]byte, size) | ||
_, err := rand.Read(b) | ||
testutil.Ok(t, err) | ||
r := bytes.NewReader(b) | ||
err = bkt.Upload(ctx, name, r) | ||
testutil.Ok(t, err) | ||
|
||
return b | ||
} |