diff --git a/api-put-object-streaming.go b/api-put-object-streaming.go index 98354900a..7f145bb97 100644 --- a/api-put-object-streaming.go +++ b/api-put-object-streaming.go @@ -156,8 +156,18 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN // Receive each part number from the channel allowing three parallel uploads. for w := 1; w <= opts.getNumThreads(); w++ { go func(w int, partSize int64) { - // Each worker will draw from the part channel and upload in parallel. - for uploadReq := range uploadPartsCh { + for { + var uploadReq uploadPartReq + var ok bool + select { + case <-ctx.Done(): + return + case uploadReq, ok = <-uploadPartsCh: + if !ok { + return + } + // Each worker will draw from the part channel and upload in parallel. + } // If partNumber was not uploaded we calculate the missing // part offset and size. For all other part numbers we @@ -214,17 +224,22 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN // Gather the responses as they occur and update any // progress bar. for u := 1; u <= totalPartsCount; u++ { - uploadRes := <-uploadedPartsCh - if uploadRes.Error != nil { - return UploadInfo{}, uploadRes.Error + select { + case <-ctx.Done(): + return UploadInfo{}, ctx.Err() + case uploadRes := <-uploadedPartsCh: + if uploadRes.Error != nil { + return UploadInfo{}, uploadRes.Error + } + + // Update the totalUploadedSize. + totalUploadedSize += uploadRes.Size + // Store the parts to be completed in order. + complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{ + ETag: uploadRes.Part.ETag, + PartNumber: uploadRes.Part.PartNumber, + }) } - // Update the totalUploadedSize. - totalUploadedSize += uploadRes.Size - // Store the parts to be completed in order. - complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{ - ETag: uploadRes.Part.ETag, - PartNumber: uploadRes.Part.PartNumber, - }) } // Verify if we uploaded all the data.