diff --git a/api-remove.go b/api-remove.go index fd3f1e120..0fee90226 100644 --- a/api-remove.go +++ b/api-remove.go @@ -136,11 +136,11 @@ func (c *Client) RemoveObject(ctx context.Context, bucketName, objectName string return err } - return c.removeObject(ctx, bucketName, objectName, opts) + res := c.removeObject(ctx, bucketName, objectName, opts) + return res.Err } -func (c *Client) removeObject(ctx context.Context, bucketName, objectName string, opts RemoveObjectOptions) error { - +func (c *Client) removeObject(ctx context.Context, bucketName, objectName string, opts RemoveObjectOptions) RemoveObjectResult { // Get resources properly escaped and lined up before // using them in http request. urlValues := make(url.Values) @@ -181,19 +181,25 @@ func (c *Client) removeObject(ctx context.Context, bucketName, objectName string }) defer closeResponse(resp) if err != nil { - return err + return RemoveObjectResult{Err: err} } if resp != nil { // if some unexpected error happened and max retry is reached, we want to let client know if resp.StatusCode != http.StatusNoContent { - return httpRespToErrorResponse(resp, bucketName, objectName) + err := httpRespToErrorResponse(resp, bucketName, objectName) + return RemoveObjectResult{Err: err} } } // DeleteObject always responds with http '204' even for // objects which do not exist. So no need to handle them // specifically. - return nil + return RemoveObjectResult{ + ObjectName: objectName, + ObjectVersionID: opts.VersionID, + DeleteMarker: resp.Header.Get("x-amz-delete-marker") == "true", + DeleteMarkerVersionID: resp.Header.Get("x-amz-version-id"), + } } // RemoveObjectError - container of Multi Delete S3 API error @@ -203,6 +209,17 @@ type RemoveObjectError struct { Err error } +// RemoveObjectResult - container of Multi Delete S3 API result +type RemoveObjectResult struct { + ObjectName string + ObjectVersionID string + + DeleteMarker bool + DeleteMarkerVersionID string + + Err error +} + // generateRemoveMultiObjects - generate the XML request for remove multi objects request func generateRemoveMultiObjectsRequest(objects []ObjectInfo) []byte { delObjects := []deleteObject{} @@ -212,21 +229,32 @@ func generateRemoveMultiObjectsRequest(objects []ObjectInfo) []byte { VersionID: obj.VersionID, }) } - xmlBytes, _ := xml.Marshal(deleteMultiObjects{Objects: delObjects, Quiet: true}) + xmlBytes, _ := xml.Marshal(deleteMultiObjects{Objects: delObjects, Quiet: false}) return xmlBytes } // processRemoveMultiObjectsResponse - parse the remove multi objects web service // and return the success/failure result status for each object -func processRemoveMultiObjectsResponse(body io.Reader, objects []ObjectInfo, errorCh chan<- RemoveObjectError) { +func processRemoveMultiObjectsResponse(body io.Reader, objects []ObjectInfo, resultCh chan<- RemoveObjectResult) { // Parse multi delete XML response rmResult := &deleteMultiObjectsResult{} err := xmlDecoder(body, rmResult) if err != nil { - errorCh <- RemoveObjectError{ObjectName: "", Err: err} + resultCh <- RemoveObjectResult{ObjectName: "", Err: err} return } + // Fill deletion that returned success + for _, obj := range rmResult.DeletedObjects { + resultCh <- RemoveObjectResult{ + ObjectName: obj.Key, + // Only filled with versioned buckets + ObjectVersionID: obj.VersionID, + DeleteMarker: obj.DeleteMarker, + DeleteMarkerVersionID: obj.DeleteMarkerVersionID, + } + } + // Fill deletion that returned an error. for _, obj := range rmResult.UnDeletedObjects { // Version does not exist is not an error ignore and continue. @@ -234,9 +262,9 @@ func processRemoveMultiObjectsResponse(body io.Reader, objects []ObjectInfo, err case "InvalidArgument", "NoSuchVersion": continue } - errorCh <- RemoveObjectError{ - ObjectName: obj.Key, - VersionID: obj.VersionID, + resultCh <- RemoveObjectResult{ + ObjectName: obj.Key, + ObjectVersionID: obj.VersionID, Err: ErrorResponse{ Code: obj.Code, Message: obj.Message, @@ -273,10 +301,54 @@ func (c *Client) RemoveObjects(ctx context.Context, bucketName string, objectsCh return errorCh } - go c.removeObjects(ctx, bucketName, objectsCh, errorCh, opts) + resultCh := make(chan RemoveObjectResult, 1) + go c.removeObjects(ctx, bucketName, objectsCh, resultCh, opts) + go func() { + defer close(errorCh) + for res := range resultCh { + // Send only errors to the error channel + if res.Err == nil { + continue + } + errorCh <- RemoveObjectError{ + ObjectName: res.ObjectName, + VersionID: res.ObjectVersionID, + Err: res.Err, + } + } + }() + return errorCh } +// RemoveObjectsWithResult removes multiple objects from a bucket while +// it is possible to specify objects versions which are received from +// objectsCh. Remove results, successes and failures are sent back via +// RemoveObjectResult channel +func (c *Client) RemoveObjectsWithResult(ctx context.Context, bucketName string, objectsCh <-chan ObjectInfo, opts RemoveObjectsOptions) <-chan RemoveObjectResult { + resultCh := make(chan RemoveObjectResult, 1) + + // Validate if bucket name is valid. + if err := s3utils.CheckValidBucketName(bucketName); err != nil { + defer close(resultCh) + resultCh <- RemoveObjectResult{ + Err: err, + } + return resultCh + } + // Validate objects channel to be properly allocated. + if objectsCh == nil { + defer close(resultCh) + resultCh <- RemoveObjectResult{ + Err: errInvalidArgument("Objects channel cannot be nil"), + } + return resultCh + } + + go c.removeObjects(ctx, bucketName, objectsCh, resultCh, opts) + return resultCh +} + // Return true if the character is within the allowed characters in an XML 1.0 document // The list of allowed characters can be found here: https://www.w3.org/TR/xml/#charsets func validXMLChar(r rune) (ok bool) { @@ -298,14 +370,14 @@ func hasInvalidXMLChar(str string) bool { } // Generate and call MultiDelete S3 requests based on entries received from objectsCh -func (c *Client) removeObjects(ctx context.Context, bucketName string, objectsCh <-chan ObjectInfo, errorCh chan<- RemoveObjectError, opts RemoveObjectsOptions) { +func (c *Client) removeObjects(ctx context.Context, bucketName string, objectsCh <-chan ObjectInfo, resultCh chan<- RemoveObjectResult, opts RemoveObjectsOptions) { maxEntries := 1000 finish := false urlValues := make(url.Values) urlValues.Set("delete", "") - // Close error channel when Multi delete finishes. - defer close(errorCh) + // Close result channel when Multi delete finishes. + defer close(resultCh) // Loop over entries by 1000 and call MultiDelete requests for { @@ -319,22 +391,20 @@ func (c *Client) removeObjects(ctx context.Context, bucketName string, objectsCh for object := range objectsCh { if hasInvalidXMLChar(object.Key) { // Use single DELETE so the object name will be in the request URL instead of the multi-delete XML document. - err := c.removeObject(ctx, bucketName, object.Key, RemoveObjectOptions{ + removeResult := c.removeObject(ctx, bucketName, object.Key, RemoveObjectOptions{ VersionID: object.VersionID, GovernanceBypass: opts.GovernanceBypass, }) - if err != nil { + if err := removeResult.Err; err != nil { // Version does not exist is not an error ignore and continue. switch ToErrorResponse(err).Code { case "InvalidArgument", "NoSuchVersion": continue } - errorCh <- RemoveObjectError{ - ObjectName: object.Key, - VersionID: object.VersionID, - Err: err, - } + resultCh <- removeResult } + + resultCh <- removeResult continue } @@ -374,22 +444,22 @@ func (c *Client) removeObjects(ctx context.Context, bucketName string, objectsCh if resp != nil { if resp.StatusCode != http.StatusOK { e := httpRespToErrorResponse(resp, bucketName, "") - errorCh <- RemoveObjectError{ObjectName: "", Err: e} + resultCh <- RemoveObjectResult{ObjectName: "", Err: e} } } if err != nil { for _, b := range batch { - errorCh <- RemoveObjectError{ - ObjectName: b.Key, - VersionID: b.VersionID, - Err: err, + resultCh <- RemoveObjectResult{ + ObjectName: b.Key, + ObjectVersionID: b.VersionID, + Err: err, } } continue } // Process multiobjects remove xml response - processRemoveMultiObjectsResponse(resp.Body, batch, errorCh) + processRemoveMultiObjectsResponse(resp.Body, batch, resultCh) closeResponse(resp) } diff --git a/api-s3-datatypes.go b/api-s3-datatypes.go index 948f8a740..592d4cdcc 100644 --- a/api-s3-datatypes.go +++ b/api-s3-datatypes.go @@ -335,7 +335,7 @@ type deletedObject struct { VersionID string `xml:"VersionId,omitempty"` // These fields are ignored. DeleteMarker bool - DeleteMarkerVersionID string + DeleteMarkerVersionID string `xml:"DeleteMarkerVersionId,omitempty"` } // nonDeletedObject container for Error element (failed deletion) in MultiObjects Delete XML response diff --git a/functional_tests.go b/functional_tests.go index 49c781f60..49c664ae5 100644 --- a/functional_tests.go +++ b/functional_tests.go @@ -2628,6 +2628,138 @@ func testRemoveMultipleObjects() { successLogger(testName, function, args, startTime).Info() } +// Test removing multiple objects and check for results +func testRemoveMultipleObjectsWithResult() { + // initialize logging params + startTime := time.Now() + testName := getFuncName() + function := "RemoveObjects(bucketName, objectsCh)" + args := map[string]interface{}{ + "bucketName": "", + } + + // Seed random based on current time. + rand.Seed(time.Now().Unix()) + + // Instantiate new minio client object. + c, err := minio.New(os.Getenv(serverEndpoint), + &minio.Options{ + Creds: credentials.NewStaticV4(os.Getenv(accessKey), os.Getenv(secretKey), ""), + Secure: mustParseBool(os.Getenv(enableHTTPS)), + }) + if err != nil { + logError(testName, function, args, startTime, "", "MinIO client object creation failed", err) + return + } + + // Set user agent. + c.SetAppInfo("MinIO-go-FunctionalTest", "0.1.0") + + // Enable tracing, write to stdout. + // c.TraceOn(os.Stderr) + + // Generate a new random bucket name. + bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test-") + args["bucketName"] = bucketName + + // Make a new bucket. + err = c.MakeBucket(context.Background(), bucketName, minio.MakeBucketOptions{Region: "us-east-1", ObjectLocking: true}) + if err != nil { + logError(testName, function, args, startTime, "", "MakeBucket failed", err) + return + } + + defer cleanupVersionedBucket(bucketName, c) + + r := bytes.NewReader(bytes.Repeat([]byte("a"), 8)) + + nrObjects := 10 + nrLockedObjects := 5 + + objectsCh := make(chan minio.ObjectInfo) + + go func() { + defer close(objectsCh) + // Upload objects and send them to objectsCh + for i := 0; i < nrObjects; i++ { + objectName := "sample" + strconv.Itoa(i) + ".txt" + info, err := c.PutObject(context.Background(), bucketName, objectName, r, 8, + minio.PutObjectOptions{ContentType: "application/octet-stream"}) + if err != nil { + logError(testName, function, args, startTime, "", "PutObject failed", err) + return + } + if i < nrLockedObjects { + // t := time.Date(2130, time.April, 25, 14, 0, 0, 0, time.UTC) + t := time.Now().Add(5 * time.Minute) + m := minio.RetentionMode(minio.Governance) + opts := minio.PutObjectRetentionOptions{ + GovernanceBypass: false, + RetainUntilDate: &t, + Mode: &m, + VersionID: info.VersionID, + } + err = c.PutObjectRetention(context.Background(), bucketName, objectName, opts) + if err != nil { + logError(testName, function, args, startTime, "", "Error setting retention", err) + return + } + } + + objectsCh <- minio.ObjectInfo{ + Key: info.Key, + VersionID: info.VersionID, + } + } + }() + + // Call RemoveObjects API + resultCh := c.RemoveObjectsWithResult(context.Background(), bucketName, objectsCh, minio.RemoveObjectsOptions{}) + + var foundNil, foundErr int + + for { + // Check if errorCh doesn't receive any error + select { + case deleteRes, ok := <-resultCh: + if !ok { + goto out + } + if deleteRes.ObjectName == "" { + logError(testName, function, args, startTime, "", "Unexpected object name", nil) + return + } + if deleteRes.ObjectVersionID == "" { + logError(testName, function, args, startTime, "", "Unexpected object version ID", nil) + return + } + + if deleteRes.Err == nil { + foundNil++ + } else { + foundErr++ + } + } + } +out: + if foundNil+foundErr != nrObjects { + logError(testName, function, args, startTime, "", "Unexpected number of results", nil) + return + } + + if foundNil != nrObjects-nrLockedObjects { + logError(testName, function, args, startTime, "", "Unexpected number of nil errors", nil) + return + } + + if foundErr != nrLockedObjects { + logError(testName, function, args, startTime, "", "Unexpected number of errors", nil) + return + } + + successLogger(testName, function, args, startTime).Info() +} + // Tests FPutObject of a big file to trigger multipart func testFPutObjectMultipart() { // initialize logging params @@ -12010,6 +12142,7 @@ func main() { testGetObjectClosedTwice() testGetObjectS3Zip() testRemoveMultipleObjects() + testRemoveMultipleObjectsWithResult() testFPutObjectMultipart() testFPutObject() testGetObjectReadSeekFunctional()