Skip to content

Commit

Permalink
Add API to compose objects through server-side copying
Browse files Browse the repository at this point in the history
The new ComposeObject API provides a way to create objects by
concatenating existing objects. It takes a list of source objects
along with optional start-end range specifications, and concatenates
them into a new object.

The API supports:

* Create an object from upto 10000 existing objects.
* Create objects upto 5TiB in size, from source objects of any size.
* Support copy-conditions on each source object separately.
* Support SSE-C (i.e. Server-Side-Encryption with Customer provided
  key) for both encryption of destination object, and decryption of
  source objects.
* Support for setting/replacing custom metadata in the destination
  object.

This API has been used to refactor the CopyObject API - that API now
supports source objects of any size, SSE-C for source and destination,
and settings custom metadata.
  • Loading branch information
donatello committed Jun 27, 2017
1 parent 4dde80b commit 6316b75
Show file tree
Hide file tree
Showing 8 changed files with 970 additions and 115 deletions.
505 changes: 505 additions & 0 deletions api-compose-object.go

Large diffs are not rendered by default.

56 changes: 3 additions & 53 deletions api-put-object-copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,57 +16,7 @@

package minio

import (
"net/http"

"github.com/minio/minio-go/pkg/s3utils"
)

// CopyObject - copy a source object into a new object with the provided name in the provided bucket
func (c Client) CopyObject(bucketName string, objectName string, objectSource string, cpCond CopyConditions) error {
// Input validation.
if err := s3utils.CheckValidBucketName(bucketName); err != nil {
return err
}
if err := s3utils.CheckValidObjectName(objectName); err != nil {
return err
}
if objectSource == "" {
return ErrInvalidArgument("Object source cannot be empty.")
}

// customHeaders apply headers.
customHeaders := make(http.Header)
for _, cond := range cpCond.conditions {
customHeaders.Set(cond.key, cond.value)
}

// Set copy source.
customHeaders.Set("x-amz-copy-source", s3utils.EncodePath(objectSource))

// Execute PUT on objectName.
resp, err := c.executeMethod("PUT", requestMetadata{
bucketName: bucketName,
objectName: objectName,
customHeader: customHeaders,
})
defer closeResponse(resp)
if err != nil {
return err
}
if resp != nil {
if resp.StatusCode != http.StatusOK {
return httpRespToErrorResponse(resp, bucketName, objectName)
}
}

// Decode copy response on success.
cpObjRes := copyObjectResult{}
err = xmlDecoder(resp.Body, &cpObjRes)
if err != nil {
return err
}

// Return nil on success.
return nil
// CopyObject - copy a source object into a new object
func (c Client) CopyObject(dst DestinationInfo, src SourceInfo) error {
return c.ComposeObject(dst, []SourceInfo{src})
}
147 changes: 140 additions & 7 deletions api_functional_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"net/http"
"net/url"
"os"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -948,18 +949,19 @@ func TestCopyObjectV2(t *testing.T) {
len(buf), n)
}

// Set copy conditions.
copyConds := CopyConditions{}
err = copyConds.SetModified(time.Date(2014, time.April, 0, 0, 0, 0, 0, time.UTC))
dst, err := NewDestinationInfo(bucketName+"-copy", objectName+"-copy", nil, nil)
if err != nil {
t.Fatal("Error:", err)
t.Fatal(err)
}

// Copy source.
copySource := bucketName + "/" + objectName
src := NewSourceInfo(bucketName, objectName, nil)
err = src.SetModifiedSinceCond(time.Date(2014, time.April, 0, 0, 0, 0, 0, time.UTC))
if err != nil {
t.Fatal("Error:", err)
}

// Perform the Copy
err = c.CopyObject(bucketName+"-copy", objectName+"-copy", copySource, copyConds)
err = c.CopyObject(dst, src)
if err != nil {
t.Fatal("Error:", err, bucketName+"-copy", objectName+"-copy")
}
Expand Down Expand Up @@ -1297,3 +1299,134 @@ func TestFunctionalV2(t *testing.T) {
t.Fatal("Error: ", err)
}
}

func testComposeObjectErrorCases(c *Client, t *testing.T) {
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")

// Make a new bucket in 'us-east-1' (source bucket).
err := c.MakeBucket(bucketName, "us-east-1")
if err != nil {
t.Fatal("Error:", err, bucketName)
}

// Test that more than 10K source objects cannot be
// concatenated.
srcArr := [10001]SourceInfo{}
srcSlice := srcArr[:]
dst, err := NewDestinationInfo(bucketName, "object", nil, nil)
if err != nil {
t.Fatal(err)
}

if err := c.ComposeObject(dst, srcSlice); err == nil {
t.Fatal("Error was expected.")
} else if err.Error() != "There must be as least one and upto 10000 source objects." {
t.Fatal("Got unexpected error: ", err)
}

// Create a source with invalid offset spec and check that
// error is returned:
// 1. Create the source object.
const badSrcSize = 5 * 1024 * 1024
buf := bytes.Repeat([]byte("1"), badSrcSize)
_, err = c.PutObject(bucketName, "badObject", bytes.NewReader(buf), "")
if err != nil {
t.Fatal("Error:", err)
}
// 2. Set invalid range spec on the object (going beyond
// object size)
badSrc := NewSourceInfo(bucketName, "badObject", nil)
err = badSrc.SetRange(1, badSrcSize)
if err != nil {
t.Fatal("Error:", err)
}
// 3. ComposeObject call should fail.
if err := c.ComposeObject(dst, []SourceInfo{badSrc}); err == nil {
t.Fatal("Error was expected.")
} else if !strings.Contains(err.Error(), "has invalid segment-to-copy") {
t.Fatal("Got unexpected error: ", err)
}
}

// Test expected error cases
func TestComposeObjectErrorCasesV2(t *testing.T) {
if testing.Short() {
t.Skip("skipping functional tests for the short runs")
}

// Instantiate new minio client object
c, err := NewV2(
os.Getenv(serverEndpoint),
os.Getenv(accessKey),
os.Getenv(secretKey),
mustParseBool(os.Getenv(enableSecurity)),
)
if err != nil {
t.Fatal("Error:", err)
}

testComposeObjectErrorCases(c, t)
}

func testComposeMultipleSources(c *Client, t *testing.T) {
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
// Make a new bucket in 'us-east-1' (source bucket).
err := c.MakeBucket(bucketName, "us-east-1")
if err != nil {
t.Fatal("Error:", err, bucketName)
}

// Upload a small source object
const srcSize = 1024 * 1024 * 5
buf := bytes.Repeat([]byte("1"), srcSize)
_, err = c.PutObject(bucketName, "srcObject", bytes.NewReader(buf), "binary/octet-stream")
if err != nil {
t.Fatal("Error:", err)
}

// We will append 10 copies of the object.
srcs := []SourceInfo{}
for i := 0; i < 10; i++ {
srcs = append(srcs, NewSourceInfo(bucketName, "srcObject", nil))
}

dst, err := NewDestinationInfo(bucketName, "dstObject", nil, nil)
if err != nil {
t.Fatal(err)
}
err = c.ComposeObject(dst, srcs)
if err != nil {
t.Fatal("Error:", err)
}

objProps, err := c.StatObject(bucketName, "dstObject")
if err != nil {
t.Fatal("Error:", err)
}

if objProps.Size != 10*srcSize {
t.Fatal("Size mismatched! Expected:", 10000*srcSize, "but got:", objProps.Size)
}
}

// Test concatenating multiple objects objects
func TestCompose10KSourcesV2(t *testing.T) {
if testing.Short() {
t.Skip("skipping functional tests for the short runs")
}

// Instantiate new minio client object
c, err := NewV2(
os.Getenv(serverEndpoint),
os.Getenv(accessKey),
os.Getenv(secretKey),
mustParseBool(os.Getenv(enableSecurity)),
)
if err != nil {
t.Fatal("Error:", err)
}

testComposeMultipleSources(c, t)
}
74 changes: 59 additions & 15 deletions api_functional_v4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1743,41 +1743,45 @@ func TestCopyObject(t *testing.T) {
t.Fatal("Error:", err)
}

// Copy Source
src := NewSourceInfo(bucketName, objectName, nil)

// Set copy conditions.
copyConds := CopyConditions{}

// Start by setting wrong conditions
err = copyConds.SetModified(time.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC))
// All invalid conditions first.
err = src.SetModifiedSinceCond(time.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC))
if err == nil {
t.Fatal("Error:", err)
}
err = copyConds.SetUnmodified(time.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC))
err = src.SetUnmodifiedSinceCond(time.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC))
if err == nil {
t.Fatal("Error:", err)
}
err = copyConds.SetMatchETag("")
err = src.SetMatchETagCond("")
if err == nil {
t.Fatal("Error:", err)
}
err = copyConds.SetMatchETagExcept("")
err = src.SetMatchETagExceptCond("")
if err == nil {
t.Fatal("Error:", err)
}

err = copyConds.SetModified(time.Date(2014, time.April, 0, 0, 0, 0, 0, time.UTC))
err = src.SetModifiedSinceCond(time.Date(2014, time.April, 0, 0, 0, 0, 0, time.UTC))
if err != nil {
t.Fatal("Error:", err)
}
err = copyConds.SetMatchETag(objInfo.ETag)
err = src.SetMatchETagCond(objInfo.ETag)
if err != nil {
t.Fatal("Error:", err)
}

// Copy source.
copySource := bucketName + "/" + objectName
dst, err := NewDestinationInfo(bucketName+"-copy", objectName+"-copy", nil, nil)
if err != nil {
t.Fatal(err)
}

// Perform the Copy
err = c.CopyObject(bucketName+"-copy", objectName+"-copy", copySource, copyConds)
err = c.CopyObject(dst, src)
if err != nil {
t.Fatal("Error:", err, bucketName+"-copy", objectName+"-copy")
}
Expand Down Expand Up @@ -1807,18 +1811,18 @@ func TestCopyObject(t *testing.T) {
}

// CopyObject again but with wrong conditions
copyConds = CopyConditions{}
err = copyConds.SetUnmodified(time.Date(2014, time.April, 0, 0, 0, 0, 0, time.UTC))
src = NewSourceInfo(bucketName, objectName, nil)
err = src.SetUnmodifiedSinceCond(time.Date(2014, time.April, 0, 0, 0, 0, 0, time.UTC))
if err != nil {
t.Fatal("Error:", err)
}
err = copyConds.SetMatchETagExcept(objInfo.ETag)
err = src.SetMatchETagExceptCond(objInfo.ETag)
if err != nil {
t.Fatal("Error:", err)
}

// Perform the Copy which should fail
err = c.CopyObject(bucketName+"-copy", objectName+"-copy", copySource, copyConds)
err = c.CopyObject(dst, src)
if err == nil {
t.Fatal("Error:", err, bucketName+"-copy", objectName+"-copy should fail")
}
Expand Down Expand Up @@ -2586,3 +2590,43 @@ func TestPutObjectUploadSeekedObject(t *testing.T) {
t.Fatal("Error:", err)
}
}

// Test expected error cases
func TestComposeObjectErrorCases(t *testing.T) {
if testing.Short() {
t.Skip("skipping functional tests for the short runs")
}

// Instantiate new minio client object
c, err := NewV4(
os.Getenv(serverEndpoint),
os.Getenv(accessKey),
os.Getenv(secretKey),
mustParseBool(os.Getenv(enableSecurity)),
)
if err != nil {
t.Fatal("Error:", err)
}

testComposeObjectErrorCases(c, t)
}

// Test concatenating 10K objects
func TestCompose10KSources(t *testing.T) {
if testing.Short() {
t.Skip("skipping functional tests for the short runs")
}

// Instantiate new minio client object
c, err := NewV4(
os.Getenv(serverEndpoint),
os.Getenv(accessKey),
os.Getenv(secretKey),
mustParseBool(os.Getenv(enableSecurity)),
)
if err != nil {
t.Fatal("Error:", err)
}

testComposeMultipleSources(c, t)
}
10 changes: 9 additions & 1 deletion constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,18 @@ package minio

/// Multipart upload defaults.

// miniPartSize - minimum part size 64MiB per object after which
// absMinPartSize - absolute minimum part size (5 MiB) below which
// a part in a multipart upload may not be uploaded.
const absMinPartSize = 1024 * 1024 * 5

// minPartSize - minimum part size 64MiB per object after which
// putObject behaves internally as multipart.
const minPartSize = 1024 * 1024 * 64

// copyPartSize - default (and maximum) part size to copy in a
// copy-object request (5GiB)
const copyPartSize = 1024 * 1024 * 1024 * 5

// maxPartsCount - maximum number of parts for a single multipart session.
const maxPartsCount = 10000

Expand Down
Loading

0 comments on commit 6316b75

Please sign in to comment.