Skip to content

Commit

Permalink
fix(storage): add mutex around uses of mrd variables (#11405)
Browse files Browse the repository at this point in the history
  • Loading branch information
BrennaEpp authored Jan 8, 2025
1 parent 893d27a commit 54bfc32
Showing 1 changed file with 16 additions and 6 deletions.
22 changes: 16 additions & 6 deletions storage/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1190,7 +1190,9 @@ func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params
for {
select {
case <-rr.ctx.Done():
rr.mu.Lock()
rr.done = true
rr.mu.Unlock()
return
case <-rr.managerRetry:
return
Expand Down Expand Up @@ -1349,7 +1351,9 @@ func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params
rr.mu.Unlock()
}

rr.mu.Lock()
rr.objectSize = size
rr.mu.Unlock()

go streamManager()
go streamReceiver()
Expand Down Expand Up @@ -1424,8 +1428,12 @@ func (r *gRPCBidiReader) reopenStream(failSpec []rangeSpec) error {

// Add will add current range to stream.
func (mr *gRPCBidiReader) add(output io.Writer, offset, limit int64, callback func(int64, int64, error)) {
if offset > mr.objectSize {
callback(offset, limit, fmt.Errorf("offset larger than size of object: %v", mr.objectSize))
mr.mu.Lock()
objectSize := mr.objectSize
mr.mu.Unlock()

if offset > objectSize {
callback(offset, limit, fmt.Errorf("offset larger than size of object: %v", objectSize))
return
}
if limit < 0 {
Expand Down Expand Up @@ -1463,8 +1471,10 @@ func (mr *gRPCBidiReader) close() error {
if mr.cancel != nil {
mr.cancel()
}
mr.mu.Lock()
mr.done = true
mr.activeTask = 0
mr.mu.Unlock()
mr.closeReceiver <- true
mr.closeManager <- true
return nil
Expand Down Expand Up @@ -1877,11 +1887,11 @@ type gRPCBidiReader struct {
closeManager chan bool
managerRetry chan bool
receiverRetry chan bool
mu sync.Mutex // protects all vars in gRPCBidiReader from concurrent access
mp map[int64]rangeSpec // always use the mutex when accessing the map
mu sync.Mutex // protects map from concurrent access.
done bool
activeTask int64
objectSize int64
done bool // always use the mutex when accessing this variable
activeTask int64 // always use the mutex when accessing this variable
objectSize int64 // always use the mutex when accessing this variable
retrier func(error, string)
streamRecreation bool // This helps us identify if stream recreation is in progress or not. If stream recreation gets called from two goroutine then this will stop second one.
}
Expand Down

0 comments on commit 54bfc32

Please sign in to comment.