diff --git a/api-get-object.go b/api-get-object.go index 2bcdc30b8..b9f6ded99 100644 --- a/api-get-object.go +++ b/api-get-object.go @@ -40,6 +40,8 @@ func (c *Client) GetObject(ctx context.Context, bucketName, objectName string, o return nil, err } + gctx, cancel := context.WithCancel(ctx) + // Detect if snowball is server location we are talking to. var snowball bool if location, ok := c.bucketLocCache.Get(bucketName); ok { @@ -59,8 +61,6 @@ func (c *Client) GetObject(ctx context.Context, bucketName, objectName string, o reqCh := make(chan getRequest) // Create response channel. resCh := make(chan getResponse) - // Create done channel. - doneCh := make(chan struct{}) // This routine feeds partial object data as and when the caller reads. go func() { @@ -73,8 +73,8 @@ func (c *Client) GetObject(ctx context.Context, bucketName, objectName string, o // Loop through the incoming control messages and read data. for { select { - // When the done channel is closed exit our routine. - case <-doneCh: + // When context is closed exit our routine. + case <-gctx.Done(): // Close the http response body before returning. // This ends the connection with the server. if httpReader != nil { @@ -98,7 +98,7 @@ func (c *Client) GetObject(ctx context.Context, bucketName, objectName string, o } else if req.Offset > 0 { opts.SetRange(req.Offset, 0) } - httpReader, objectInfo, _, err = c.getObject(ctx, bucketName, objectName, opts) + httpReader, objectInfo, _, err = c.getObject(gctx, bucketName, objectName, opts) if err != nil { resCh <- getResponse{Error: err} return @@ -140,7 +140,7 @@ func (c *Client) GetObject(ctx context.Context, bucketName, objectName string, o // Remove range header if already set, for stat Operations to get original file size. delete(opts.headers, "Range") - objectInfo, err = c.StatObject(ctx, bucketName, objectName, StatObjectOptions(opts)) + objectInfo, err = c.StatObject(gctx, bucketName, objectName, StatObjectOptions(opts)) if err != nil { resCh <- getResponse{ Error: err, @@ -163,7 +163,7 @@ func (c *Client) GetObject(ctx context.Context, bucketName, objectName string, o if etag != "" && !snowball { opts.SetMatchETag(etag) } - objectInfo, err := c.StatObject(ctx, bucketName, objectName, StatObjectOptions(opts)) + objectInfo, err := c.StatObject(gctx, bucketName, objectName, StatObjectOptions(opts)) if err != nil { resCh <- getResponse{ Error: err, @@ -203,7 +203,7 @@ func (c *Client) GetObject(ctx context.Context, bucketName, objectName string, o // Remove range header if already set delete(opts.headers, "Range") } - httpReader, objectInfo, _, err = c.getObject(ctx, bucketName, objectName, opts) + httpReader, objectInfo, _, err = c.getObject(gctx, bucketName, objectName, opts) if err != nil { resCh <- getResponse{ Error: err, @@ -250,7 +250,7 @@ func (c *Client) GetObject(ctx context.Context, bucketName, objectName string, o }() // Create a newObject through the information sent back by reqCh. - return newObject(reqCh, resCh, doneCh), nil + return newObject(gctx, cancel, reqCh, resCh), nil } // get request message container to communicate with internal @@ -283,7 +283,8 @@ type Object struct { // User allocated and defined. reqCh chan<- getRequest resCh <-chan getResponse - doneCh chan<- struct{} + ctx context.Context + cancel context.CancelFunc currOffset int64 objectInfo ObjectInfo @@ -311,7 +312,12 @@ type Object struct { // as any error encountered. For all first requests sent on the object // it is also responsible for sending back the objectInfo. func (o *Object) doGetRequest(request getRequest) (getResponse, error) { - o.reqCh <- request + select { + case <-o.ctx.Done(): + return getResponse{}, o.ctx.Err() + case o.reqCh <- request: + } + response := <-o.resCh // Return any error to the top level. @@ -615,7 +621,7 @@ func (o *Object) Close() (err error) { } // Close successfully. - close(o.doneCh) + o.cancel() // Save for future operations. errMsg := "Object is already closed. Bad file descriptor." @@ -627,12 +633,13 @@ func (o *Object) Close() (err error) { // newObject instantiates a new *minio.Object* // ObjectInfo will be set by setObjectInfo -func newObject(reqCh chan<- getRequest, resCh <-chan getResponse, doneCh chan<- struct{}) *Object { +func newObject(ctx context.Context, cancel context.CancelFunc, reqCh chan<- getRequest, resCh <-chan getResponse) *Object { return &Object{ + ctx: ctx, + cancel: cancel, mutex: &sync.Mutex{}, reqCh: reqCh, resCh: resCh, - doneCh: doneCh, } }