Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use context to wait for cancelation on GetObject() #1666

Merged
merged 1 commit into from
Jun 17, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 21 additions & 14 deletions api-get-object.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ func (c *Client) GetObject(ctx context.Context, bucketName, objectName string, o
return nil, err
}

gctx, cancel := context.WithCancel(ctx)

// Detect if snowball is server location we are talking to.
var snowball bool
if location, ok := c.bucketLocCache.Get(bucketName); ok {
Expand All @@ -59,8 +61,6 @@ func (c *Client) GetObject(ctx context.Context, bucketName, objectName string, o
reqCh := make(chan getRequest)
// Create response channel.
resCh := make(chan getResponse)
// Create done channel.
doneCh := make(chan struct{})

// This routine feeds partial object data as and when the caller reads.
go func() {
Expand All @@ -73,8 +73,8 @@ func (c *Client) GetObject(ctx context.Context, bucketName, objectName string, o
// Loop through the incoming control messages and read data.
for {
select {
// When the done channel is closed exit our routine.
case <-doneCh:
// When context is closed exit our routine.
case <-gctx.Done():
// Close the http response body before returning.
// This ends the connection with the server.
if httpReader != nil {
Expand All @@ -98,7 +98,7 @@ func (c *Client) GetObject(ctx context.Context, bucketName, objectName string, o
} else if req.Offset > 0 {
opts.SetRange(req.Offset, 0)
}
httpReader, objectInfo, _, err = c.getObject(ctx, bucketName, objectName, opts)
httpReader, objectInfo, _, err = c.getObject(gctx, bucketName, objectName, opts)
if err != nil {
resCh <- getResponse{Error: err}
return
Expand Down Expand Up @@ -140,7 +140,7 @@ func (c *Client) GetObject(ctx context.Context, bucketName, objectName string, o

// Remove range header if already set, for stat Operations to get original file size.
delete(opts.headers, "Range")
objectInfo, err = c.StatObject(ctx, bucketName, objectName, StatObjectOptions(opts))
objectInfo, err = c.StatObject(gctx, bucketName, objectName, StatObjectOptions(opts))
if err != nil {
resCh <- getResponse{
Error: err,
Expand All @@ -163,7 +163,7 @@ func (c *Client) GetObject(ctx context.Context, bucketName, objectName string, o
if etag != "" && !snowball {
opts.SetMatchETag(etag)
}
objectInfo, err := c.StatObject(ctx, bucketName, objectName, StatObjectOptions(opts))
objectInfo, err := c.StatObject(gctx, bucketName, objectName, StatObjectOptions(opts))
if err != nil {
resCh <- getResponse{
Error: err,
Expand Down Expand Up @@ -203,7 +203,7 @@ func (c *Client) GetObject(ctx context.Context, bucketName, objectName string, o
// Remove range header if already set
delete(opts.headers, "Range")
}
httpReader, objectInfo, _, err = c.getObject(ctx, bucketName, objectName, opts)
httpReader, objectInfo, _, err = c.getObject(gctx, bucketName, objectName, opts)
if err != nil {
resCh <- getResponse{
Error: err,
Expand Down Expand Up @@ -250,7 +250,7 @@ func (c *Client) GetObject(ctx context.Context, bucketName, objectName string, o
}()

// Create a newObject through the information sent back by reqCh.
return newObject(reqCh, resCh, doneCh), nil
return newObject(gctx, cancel, reqCh, resCh), nil
}

// get request message container to communicate with internal
Expand Down Expand Up @@ -283,7 +283,8 @@ type Object struct {
// User allocated and defined.
reqCh chan<- getRequest
resCh <-chan getResponse
doneCh chan<- struct{}
ctx context.Context
cancel context.CancelFunc
currOffset int64
objectInfo ObjectInfo

Expand Down Expand Up @@ -311,7 +312,12 @@ type Object struct {
// as any error encountered. For all first requests sent on the object
// it is also responsible for sending back the objectInfo.
func (o *Object) doGetRequest(request getRequest) (getResponse, error) {
o.reqCh <- request
select {
case <-o.ctx.Done():
return getResponse{}, o.ctx.Err()
case o.reqCh <- request:
}

response := <-o.resCh

// Return any error to the top level.
Expand Down Expand Up @@ -615,7 +621,7 @@ func (o *Object) Close() (err error) {
}

// Close successfully.
close(o.doneCh)
o.cancel()

// Save for future operations.
errMsg := "Object is already closed. Bad file descriptor."
Expand All @@ -627,12 +633,13 @@ func (o *Object) Close() (err error) {

// newObject instantiates a new *minio.Object*
// ObjectInfo will be set by setObjectInfo
func newObject(reqCh chan<- getRequest, resCh <-chan getResponse, doneCh chan<- struct{}) *Object {
func newObject(ctx context.Context, cancel context.CancelFunc, reqCh chan<- getRequest, resCh <-chan getResponse) *Object {
return &Object{
ctx: ctx,
cancel: cancel,
mutex: &sync.Mutex{},
reqCh: reqCh,
resCh: resCh,
doneCh: doneCh,
}
}

Expand Down