Skip to content

Commit

Permalink
Add API to compose objects through server-side copying
Browse files Browse the repository at this point in the history
The new ComposeObject API provides a way to create objects by
concatenating existing objects. It takes a list of source objects
along with optional start-end range specifications, and concatenates
them into a new object.

The API supports:

* Create an object from upto 10000 existing objects.
* Create objects upto 5TiB in size, from source objects of any size.
* Support copy-conditions on each source object separately.
* Support SSE-C (i.e. Server-Side-Encryption with Customer provided
  key) for both encryption of destination object, and decryption of
  source objects.
* Support for setting/replacing custom metadata in the destination
  object.

This API has been used to refactor the CopyObject API - that API now
supports source objects of any size, SSE-C for source and destination,
and settings custom metadata.
  • Loading branch information
donatello committed Jun 30, 2017
1 parent 982f4fb commit e773fe7
Show file tree
Hide file tree
Showing 9 changed files with 1,067 additions and 115 deletions.
509 changes: 509 additions & 0 deletions api-compose-object.go

Large diffs are not rendered by default.

88 changes: 88 additions & 0 deletions api-compose-object_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Minio Go Library for Amazon S3 Compatible Cloud Storage (C) 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package minio

import (
"reflect"
"testing"
)

const (
gb1 = 1024 * 1024 * 1024
gb5 = 5 * gb1
gb5p1 = gb5 + 1
gb10p1 = 2*gb5 + 1
gb10p2 = 2*gb5 + 2
)

func TestPartsRequired(t *testing.T) {
testCases := []struct {
size, ref int64
}{
{0, 0},
{1, 1},
{gb5, 1},
{2 * gb5, 2},
{gb10p1, 3},
{gb10p2, 3},
}

for i, testCase := range testCases {
res := partsRequired(testCase.size)
if res != testCase.ref {
t.Errorf("Test %d - output did not match with reference results", i+1)
}
}
}

func TestCalculateEvenSplits(t *testing.T) {

testCases := []struct {
// input size and source object
size int64
src SourceInfo

// output part-indexes
starts, ends []int64
}{
{0, SourceInfo{start: -1}, nil, nil},
{1, SourceInfo{start: -1}, []int64{0}, []int64{0}},
{1, SourceInfo{start: 0}, []int64{0}, []int64{0}},

{gb1, SourceInfo{start: -1}, []int64{0}, []int64{gb1 - 1}},
{gb5, SourceInfo{start: -1}, []int64{0}, []int64{gb5 - 1}},

// 2 part splits
{gb5p1, SourceInfo{start: -1}, []int64{0, gb5/2 + 1}, []int64{gb5 / 2, gb5}},
{gb5p1, SourceInfo{start: -1}, []int64{0, gb5/2 + 1}, []int64{gb5 / 2, gb5}},

// 3 part splits
{gb10p1, SourceInfo{start: -1},
[]int64{0, gb10p1/3 + 1, 2*gb10p1/3 + 1},
[]int64{gb10p1 / 3, 2 * gb10p1 / 3, gb10p1 - 1}},

{gb10p2, SourceInfo{start: -1},
[]int64{0, gb10p2 / 3, 2 * gb10p2 / 3},
[]int64{gb10p2/3 - 1, 2*gb10p2/3 - 1, gb10p2 - 1}},
}

for i, testCase := range testCases {
resStart, resEnd := calculateEvenSplits(testCase.size, testCase.src)
if !reflect.DeepEqual(testCase.starts, resStart) || !reflect.DeepEqual(testCase.ends, resEnd) {
t.Errorf("Test %d - output did not match with reference results", i+1)
}
}
}
56 changes: 3 additions & 53 deletions api-put-object-copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,57 +16,7 @@

package minio

import (
"net/http"

"github.com/minio/minio-go/pkg/s3utils"
)

// CopyObject - copy a source object into a new object with the provided name in the provided bucket
func (c Client) CopyObject(bucketName string, objectName string, objectSource string, cpCond CopyConditions) error {
// Input validation.
if err := s3utils.CheckValidBucketName(bucketName); err != nil {
return err
}
if err := s3utils.CheckValidObjectName(objectName); err != nil {
return err
}
if objectSource == "" {
return ErrInvalidArgument("Object source cannot be empty.")
}

// customHeaders apply headers.
customHeaders := make(http.Header)
for _, cond := range cpCond.conditions {
customHeaders.Set(cond.key, cond.value)
}

// Set copy source.
customHeaders.Set("x-amz-copy-source", s3utils.EncodePath(objectSource))

// Execute PUT on objectName.
resp, err := c.executeMethod("PUT", requestMetadata{
bucketName: bucketName,
objectName: objectName,
customHeader: customHeaders,
})
defer closeResponse(resp)
if err != nil {
return err
}
if resp != nil {
if resp.StatusCode != http.StatusOK {
return httpRespToErrorResponse(resp, bucketName, objectName)
}
}

// Decode copy response on success.
cpObjRes := copyObjectResult{}
err = xmlDecoder(resp.Body, &cpObjRes)
if err != nil {
return err
}

// Return nil on success.
return nil
// CopyObject - copy a source object into a new object
func (c Client) CopyObject(dst DestinationInfo, src SourceInfo) error {
return c.ComposeObject(dst, []SourceInfo{src})
}
152 changes: 145 additions & 7 deletions api_functional_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"net/http"
"net/url"
"os"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -757,18 +758,19 @@ func TestCopyObjectV2(t *testing.T) {
len(buf), n)
}

// Set copy conditions.
copyConds := CopyConditions{}
err = copyConds.SetModified(time.Date(2014, time.April, 0, 0, 0, 0, 0, time.UTC))
dst, err := NewDestinationInfo(bucketName+"-copy", objectName+"-copy", nil, nil)
if err != nil {
t.Fatal("Error:", err)
t.Fatal(err)
}

// Copy source.
copySource := bucketName + "/" + objectName
src := NewSourceInfo(bucketName, objectName, nil)
err = src.SetModifiedSinceCond(time.Date(2014, time.April, 0, 0, 0, 0, 0, time.UTC))
if err != nil {
t.Fatal("Error:", err)
}

// Perform the Copy
err = c.CopyObject(bucketName+"-copy", objectName+"-copy", copySource, copyConds)
err = c.CopyObject(dst, src)
if err != nil {
t.Fatal("Error:", err, bucketName+"-copy", objectName+"-copy")
}
Expand Down Expand Up @@ -1106,3 +1108,139 @@ func TestFunctionalV2(t *testing.T) {
t.Fatal("Error: ", err)
}
}

func testComposeObjectErrorCases(c *Client, t *testing.T) {
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")

// Make a new bucket in 'us-east-1' (source bucket).
err := c.MakeBucket(bucketName, "us-east-1")
if err != nil {
t.Fatal("Error:", err, bucketName)
}

// Test that more than 10K source objects cannot be
// concatenated.
srcArr := [10001]SourceInfo{}
srcSlice := srcArr[:]
dst, err := NewDestinationInfo(bucketName, "object", nil, nil)
if err != nil {
t.Fatal(err)
}

if err := c.ComposeObject(dst, srcSlice); err == nil {
t.Fatal("Error was expected.")
} else if err.Error() != "There must be as least one and upto 10000 source objects." {
t.Fatal("Got unexpected error: ", err)
}

// Create a source with invalid offset spec and check that
// error is returned:
// 1. Create the source object.
const badSrcSize = 5 * 1024 * 1024
buf := bytes.Repeat([]byte("1"), badSrcSize)
_, err = c.PutObject(bucketName, "badObject", bytes.NewReader(buf), "")
if err != nil {
t.Fatal("Error:", err)
}
// 2. Set invalid range spec on the object (going beyond
// object size)
badSrc := NewSourceInfo(bucketName, "badObject", nil)
err = badSrc.SetRange(1, badSrcSize)
if err != nil {
t.Fatal("Error:", err)
}
// 3. ComposeObject call should fail.
if err := c.ComposeObject(dst, []SourceInfo{badSrc}); err == nil {
t.Fatal("Error was expected.")
} else if !strings.Contains(err.Error(), "has invalid segment-to-copy") {
t.Fatal("Got unexpected error: ", err)
}
}

// Test expected error cases
func TestComposeObjectErrorCasesV2(t *testing.T) {
if testing.Short() {
t.Skip("skipping functional tests for the short runs")
}

// Instantiate new minio client object
c, err := NewV2(
os.Getenv(serverEndpoint),
os.Getenv(accessKey),
os.Getenv(secretKey),
mustParseBool(os.Getenv(enableSecurity)),
)
if err != nil {
t.Fatal("Error:", err)
}

testComposeObjectErrorCases(c, t)
}

func testComposeMultipleSources(c *Client, t *testing.T) {
// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")
// Make a new bucket in 'us-east-1' (source bucket).
err := c.MakeBucket(bucketName, "us-east-1")
if err != nil {
t.Fatal("Error:", err, bucketName)
}

// Upload a small source object
const srcSize = 1024 * 1024 * 5
buf := bytes.Repeat([]byte("1"), srcSize)
_, err = c.PutObject(bucketName, "srcObject", bytes.NewReader(buf), "binary/octet-stream")
if err != nil {
t.Fatal("Error:", err)
}

// We will append 10 copies of the object.
srcs := []SourceInfo{}
for i := 0; i < 10; i++ {
srcs = append(srcs, NewSourceInfo(bucketName, "srcObject", nil))
}
// make the last part very small
err = srcs[9].SetRange(0, 0)
if err != nil {
t.Fatal("unexpected error:", err)
}

dst, err := NewDestinationInfo(bucketName, "dstObject", nil, nil)
if err != nil {
t.Fatal(err)
}
err = c.ComposeObject(dst, srcs)
if err != nil {
t.Fatal("Error:", err)
}

objProps, err := c.StatObject(bucketName, "dstObject")
if err != nil {
t.Fatal("Error:", err)
}

if objProps.Size != 9*srcSize+1 {
t.Fatal("Size mismatched! Expected:", 10000*srcSize, "but got:", objProps.Size)
}
}

// Test concatenating multiple objects objects
func TestCompose10KSourcesV2(t *testing.T) {
if testing.Short() {
t.Skip("skipping functional tests for the short runs")
}

// Instantiate new minio client object
c, err := NewV2(
os.Getenv(serverEndpoint),
os.Getenv(accessKey),
os.Getenv(secretKey),
mustParseBool(os.Getenv(enableSecurity)),
)
if err != nil {
t.Fatal("Error:", err)
}

testComposeMultipleSources(c, t)
}
Loading

0 comments on commit e773fe7

Please sign in to comment.