Skip to content

Commit

Permalink
Add RemoveObjectsWithResult() to return delete-marker & delete-marker…
Browse files Browse the repository at this point in the history
…-version-id

Using this API will make it easier to get the delete marker version id
after removing an object in a versioned bucket.
  • Loading branch information
Anis Elleuch committed Nov 25, 2021
1 parent e517704 commit 85b031e
Show file tree
Hide file tree
Showing 3 changed files with 234 additions and 30 deletions.
128 changes: 99 additions & 29 deletions api-remove.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,11 @@ func (c *Client) RemoveObject(ctx context.Context, bucketName, objectName string
return err
}

return c.removeObject(ctx, bucketName, objectName, opts)
res := c.removeObject(ctx, bucketName, objectName, opts)
return res.Err
}

func (c *Client) removeObject(ctx context.Context, bucketName, objectName string, opts RemoveObjectOptions) error {

func (c *Client) removeObject(ctx context.Context, bucketName, objectName string, opts RemoveObjectOptions) RemoveObjectResult {
// Get resources properly escaped and lined up before
// using them in http request.
urlValues := make(url.Values)
Expand Down Expand Up @@ -181,19 +181,25 @@ func (c *Client) removeObject(ctx context.Context, bucketName, objectName string
})
defer closeResponse(resp)
if err != nil {
return err
return RemoveObjectResult{Err: err}
}
if resp != nil {
// if some unexpected error happened and max retry is reached, we want to let client know
if resp.StatusCode != http.StatusNoContent {
return httpRespToErrorResponse(resp, bucketName, objectName)
err := httpRespToErrorResponse(resp, bucketName, objectName)
return RemoveObjectResult{Err: err}
}
}

// DeleteObject always responds with http '204' even for
// objects which do not exist. So no need to handle them
// specifically.
return nil
return RemoveObjectResult{
ObjectName: objectName,
ObjectVersionID: opts.VersionID,
DeleteMarker: resp.Header.Get("x-amz-delete-marker") == "true",
DeleteMarkerVersionID: resp.Header.Get("x-amz-version-id"),
}
}

// RemoveObjectError - container of Multi Delete S3 API error
Expand All @@ -203,6 +209,17 @@ type RemoveObjectError struct {
Err error
}

// RemoveObjectResult - container of Multi Delete S3 API result
type RemoveObjectResult struct {
ObjectName string
ObjectVersionID string

DeleteMarker bool
DeleteMarkerVersionID string

Err error
}

// generateRemoveMultiObjects - generate the XML request for remove multi objects request
func generateRemoveMultiObjectsRequest(objects []ObjectInfo) []byte {
delObjects := []deleteObject{}
Expand All @@ -212,31 +229,42 @@ func generateRemoveMultiObjectsRequest(objects []ObjectInfo) []byte {
VersionID: obj.VersionID,
})
}
xmlBytes, _ := xml.Marshal(deleteMultiObjects{Objects: delObjects, Quiet: true})
xmlBytes, _ := xml.Marshal(deleteMultiObjects{Objects: delObjects, Quiet: false})
return xmlBytes
}

// processRemoveMultiObjectsResponse - parse the remove multi objects web service
// and return the success/failure result status for each object
func processRemoveMultiObjectsResponse(body io.Reader, objects []ObjectInfo, errorCh chan<- RemoveObjectError) {
func processRemoveMultiObjectsResponse(body io.Reader, objects []ObjectInfo, resultCh chan<- RemoveObjectResult) {
// Parse multi delete XML response
rmResult := &deleteMultiObjectsResult{}
err := xmlDecoder(body, rmResult)
if err != nil {
errorCh <- RemoveObjectError{ObjectName: "", Err: err}
resultCh <- RemoveObjectResult{ObjectName: "", Err: err}
return
}

// Fill deletion that returned success
for _, obj := range rmResult.DeletedObjects {
resultCh <- RemoveObjectResult{
ObjectName: obj.Key,
// Only filled with versioned buckets
ObjectVersionID: obj.VersionID,
DeleteMarker: obj.DeleteMarker,
DeleteMarkerVersionID: obj.DeleteMarkerVersionID,
}
}

// Fill deletion that returned an error.
for _, obj := range rmResult.UnDeletedObjects {
// Version does not exist is not an error ignore and continue.
switch obj.Code {
case "InvalidArgument", "NoSuchVersion":
continue
}
errorCh <- RemoveObjectError{
ObjectName: obj.Key,
VersionID: obj.VersionID,
resultCh <- RemoveObjectResult{
ObjectName: obj.Key,
ObjectVersionID: obj.VersionID,
Err: ErrorResponse{
Code: obj.Code,
Message: obj.Message,
Expand Down Expand Up @@ -273,10 +301,54 @@ func (c *Client) RemoveObjects(ctx context.Context, bucketName string, objectsCh
return errorCh
}

go c.removeObjects(ctx, bucketName, objectsCh, errorCh, opts)
resultCh := make(chan RemoveObjectResult, 1)
go c.removeObjects(ctx, bucketName, objectsCh, resultCh, opts)
go func() {
defer close(errorCh)
for res := range resultCh {
// Send only errors to the error channel
if res.Err == nil {
continue
}
errorCh <- RemoveObjectError{
ObjectName: res.ObjectName,
VersionID: res.ObjectVersionID,
Err: res.Err,
}
}
}()

return errorCh
}

// RemoveObjectsWithResult removes multiple objects from a bucket while
// it is possible to specify objects versions which are received from
// objectsCh. Remove results, successes and failures are sent back via
// RemoveObjectResult channel
func (c *Client) RemoveObjectsWithResult(ctx context.Context, bucketName string, objectsCh <-chan ObjectInfo, opts RemoveObjectsOptions) <-chan RemoveObjectResult {
resultCh := make(chan RemoveObjectResult, 1)

// Validate if bucket name is valid.
if err := s3utils.CheckValidBucketName(bucketName); err != nil {
defer close(resultCh)
resultCh <- RemoveObjectResult{
Err: err,
}
return resultCh
}
// Validate objects channel to be properly allocated.
if objectsCh == nil {
defer close(resultCh)
resultCh <- RemoveObjectResult{
Err: errInvalidArgument("Objects channel cannot be nil"),
}
return resultCh
}

go c.removeObjects(ctx, bucketName, objectsCh, resultCh, opts)
return resultCh
}

// Return true if the character is within the allowed characters in an XML 1.0 document
// The list of allowed characters can be found here: https://www.w3.org/TR/xml/#charsets
func validXMLChar(r rune) (ok bool) {
Expand All @@ -298,14 +370,14 @@ func hasInvalidXMLChar(str string) bool {
}

// Generate and call MultiDelete S3 requests based on entries received from objectsCh
func (c *Client) removeObjects(ctx context.Context, bucketName string, objectsCh <-chan ObjectInfo, errorCh chan<- RemoveObjectError, opts RemoveObjectsOptions) {
func (c *Client) removeObjects(ctx context.Context, bucketName string, objectsCh <-chan ObjectInfo, resultCh chan<- RemoveObjectResult, opts RemoveObjectsOptions) {
maxEntries := 1000
finish := false
urlValues := make(url.Values)
urlValues.Set("delete", "")

// Close error channel when Multi delete finishes.
defer close(errorCh)
// Close result channel when Multi delete finishes.
defer close(resultCh)

// Loop over entries by 1000 and call MultiDelete requests
for {
Expand All @@ -319,22 +391,20 @@ func (c *Client) removeObjects(ctx context.Context, bucketName string, objectsCh
for object := range objectsCh {
if hasInvalidXMLChar(object.Key) {
// Use single DELETE so the object name will be in the request URL instead of the multi-delete XML document.
err := c.removeObject(ctx, bucketName, object.Key, RemoveObjectOptions{
removeResult := c.removeObject(ctx, bucketName, object.Key, RemoveObjectOptions{
VersionID: object.VersionID,
GovernanceBypass: opts.GovernanceBypass,
})
if err != nil {
if err := removeResult.Err; err != nil {
// Version does not exist is not an error ignore and continue.
switch ToErrorResponse(err).Code {
case "InvalidArgument", "NoSuchVersion":
continue
}
errorCh <- RemoveObjectError{
ObjectName: object.Key,
VersionID: object.VersionID,
Err: err,
}
resultCh <- removeResult
}

resultCh <- removeResult
continue
}

Expand Down Expand Up @@ -374,22 +444,22 @@ func (c *Client) removeObjects(ctx context.Context, bucketName string, objectsCh
if resp != nil {
if resp.StatusCode != http.StatusOK {
e := httpRespToErrorResponse(resp, bucketName, "")
errorCh <- RemoveObjectError{ObjectName: "", Err: e}
resultCh <- RemoveObjectResult{ObjectName: "", Err: e}
}
}
if err != nil {
for _, b := range batch {
errorCh <- RemoveObjectError{
ObjectName: b.Key,
VersionID: b.VersionID,
Err: err,
resultCh <- RemoveObjectResult{
ObjectName: b.Key,
ObjectVersionID: b.VersionID,
Err: err,
}
}
continue
}

// Process multiobjects remove xml response
processRemoveMultiObjectsResponse(resp.Body, batch, errorCh)
processRemoveMultiObjectsResponse(resp.Body, batch, resultCh)

closeResponse(resp)
}
Expand Down
2 changes: 1 addition & 1 deletion api-s3-datatypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ type deletedObject struct {
VersionID string `xml:"VersionId,omitempty"`
// These fields are ignored.
DeleteMarker bool
DeleteMarkerVersionID string
DeleteMarkerVersionID string `xml:"DeleteMarkerVersionId,omitempty"`
}

// nonDeletedObject container for Error element (failed deletion) in MultiObjects Delete XML response
Expand Down
Loading

0 comments on commit 85b031e

Please sign in to comment.