Skip to content

Commit

Permalink
store/bucket: wait until chunk loading ends in Close() (thanos-io#6582)
Browse files Browse the repository at this point in the history
Chunk reader needs to wait until the chunk loading ends in Close()
because otherwise there will be a race between appending to r.chunkBytes
and reading from it.

Signed-off-by: Giedrius Statkevičius <[email protected]>
  • Loading branch information
GiedriusS authored Aug 16, 2023
1 parent 0543285 commit 51da039
Showing 1 changed file with 29 additions and 0 deletions.
29 changes: 29 additions & 0 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -3207,6 +3207,10 @@ type bucketChunkReader struct {
mtx sync.Mutex
stats *queryStats
chunkBytes []*[]byte // Byte slice to return to the chunk pool on close.

loadingChunksMtx sync.Mutex
loadingChunks bool
finishLoadingChks chan struct{}
}

func newBucketChunkReader(block *bucketBlock) *bucketChunkReader {
Expand All @@ -3221,9 +3225,22 @@ func (r *bucketChunkReader) reset() {
for i := range r.toLoad {
r.toLoad[i] = r.toLoad[i][:0]
}
r.loadingChunksMtx.Lock()
r.loadingChunks = false
r.finishLoadingChks = make(chan struct{})
r.loadingChunksMtx.Unlock()
}

func (r *bucketChunkReader) Close() error {
// NOTE(GiedriusS): we need to wait until loading chunks because loading
// chunks modifies r.block.chunkPool.
r.loadingChunksMtx.Lock()
loadingChks := r.loadingChunks
r.loadingChunksMtx.Unlock()

if loadingChks {
<-r.finishLoadingChks
}
r.block.pendingReaders.Done()

for _, b := range r.chunkBytes {
Expand All @@ -3248,6 +3265,18 @@ func (r *bucketChunkReader) addLoad(id chunks.ChunkRef, seriesEntry, chunk int)

// load loads all added chunks and saves resulting aggrs to refs.
func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, calculateChunkChecksum bool, bytesLimiter BytesLimiter) error {
r.loadingChunksMtx.Lock()
r.loadingChunks = true
r.loadingChunksMtx.Unlock()

defer func() {
r.loadingChunksMtx.Lock()
r.loadingChunks = false
r.loadingChunksMtx.Unlock()

close(r.finishLoadingChks)
}()

g, ctx := errgroup.WithContext(ctx)

for seq, pIdxs := range r.toLoad {
Expand Down

0 comments on commit 51da039

Please sign in to comment.