diff --git a/storage/grpc_client.go b/storage/grpc_client.go index 4dae0fa90a8a..2d243bf9fe17 100644 --- a/storage/grpc_client.go +++ b/storage/grpc_client.go @@ -1190,7 +1190,9 @@ func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params for { select { case <-rr.ctx.Done(): + rr.mu.Lock() rr.done = true + rr.mu.Unlock() return case <-rr.managerRetry: return @@ -1349,7 +1351,9 @@ func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params rr.mu.Unlock() } + rr.mu.Lock() rr.objectSize = size + rr.mu.Unlock() go streamManager() go streamReceiver() @@ -1424,8 +1428,12 @@ func (r *gRPCBidiReader) reopenStream(failSpec []rangeSpec) error { // Add will add current range to stream. func (mr *gRPCBidiReader) add(output io.Writer, offset, limit int64, callback func(int64, int64, error)) { - if offset > mr.objectSize { - callback(offset, limit, fmt.Errorf("offset larger than size of object: %v", mr.objectSize)) + mr.mu.Lock() + objectSize := mr.objectSize + mr.mu.Unlock() + + if offset > objectSize { + callback(offset, limit, fmt.Errorf("offset larger than size of object: %v", objectSize)) return } if limit < 0 { @@ -1463,8 +1471,10 @@ func (mr *gRPCBidiReader) close() error { if mr.cancel != nil { mr.cancel() } + mr.mu.Lock() mr.done = true mr.activeTask = 0 + mr.mu.Unlock() mr.closeReceiver <- true mr.closeManager <- true return nil @@ -1877,11 +1887,11 @@ type gRPCBidiReader struct { closeManager chan bool managerRetry chan bool receiverRetry chan bool + mu sync.Mutex // protects all vars in gRPCBidiReader from concurrent access mp map[int64]rangeSpec // always use the mutex when accessing the map - mu sync.Mutex // protects map from concurrent access. - done bool - activeTask int64 - objectSize int64 + done bool // always use the mutex when accessing this variable + activeTask int64 // always use the mutex when accessing this variable + objectSize int64 // always use the mutex when accessing this variable retrier func(error, string) streamRecreation bool // This helps us identify if stream recreation is in progress or not. If stream recreation gets called from two goroutine then this will stop second one. }