Skip to content

Commit

Permalink
Add PutObjectWithMetadata() API
Browse files Browse the repository at this point in the history
To be able to send a bunch of metadata for all types of PUT uploads
  • Loading branch information
vadmeste committed Dec 13, 2016
1 parent f69b61d commit 8ecf49c
Show file tree
Hide file tree
Showing 7 changed files with 200 additions and 59 deletions.
15 changes: 5 additions & 10 deletions api-put-object-common.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func hashCopyN(hashAlgorithms map[string]hash.Hash, hashSums map[string][]byte,

// getUploadID - fetch upload id if already present for an object name
// or initiate a new request to fetch a new upload id.
func (c Client) newUploadID(bucketName, objectName, contentType string) (uploadID string, err error) {
func (c Client) newUploadID(bucketName, objectName string, metaData map[string][]string) (uploadID string, err error) {
// Input validation.
if err := isValidBucketName(bucketName); err != nil {
return "", err
Expand All @@ -175,13 +175,8 @@ func (c Client) newUploadID(bucketName, objectName, contentType string) (uploadI
return "", err
}

// Set content Type to default if empty string.
if contentType == "" {
contentType = "application/octet-stream"
}

// Initiate multipart upload for an object.
initMultipartUploadResult, err := c.initiateMultipartUpload(bucketName, objectName, contentType)
initMultipartUploadResult, err := c.initiateMultipartUpload(bucketName, objectName, metaData)
if err != nil {
return "", err
}
Expand All @@ -190,7 +185,7 @@ func (c Client) newUploadID(bucketName, objectName, contentType string) (uploadI

// getMpartUploadSession returns the upload id and the uploaded parts to continue a previous upload session
// or initiate a new multipart session if no current one found
func (c Client) getMpartUploadSession(bucketName, objectName, contentType string) (string, map[int]objectPart, error) {
func (c Client) getMpartUploadSession(bucketName, objectName string, metaData map[string][]string) (string, map[int]objectPart, error) {
// A map of all uploaded parts.
var partsInfo map[int]objectPart
var err error
Expand All @@ -202,7 +197,7 @@ func (c Client) getMpartUploadSession(bucketName, objectName, contentType string

if uploadID == "" {
// Initiates a new multipart request
uploadID, err = c.newUploadID(bucketName, objectName, contentType)
uploadID, err = c.newUploadID(bucketName, objectName, metaData)
if err != nil {
return "", nil, err
}
Expand All @@ -213,7 +208,7 @@ func (c Client) getMpartUploadSession(bucketName, objectName, contentType string
// When the server returns NoSuchUpload even if its previouls acknowleged the existance of the upload id,
// initiate a new multipart upload
if respErr, ok := err.(ErrorResponse); ok && respErr.Code == "NoSuchUpload" {
uploadID, err = c.newUploadID(bucketName, objectName, contentType)
uploadID, err = c.newUploadID(bucketName, objectName, metaData)
if err != nil {
return "", nil, err
}
Expand Down
18 changes: 11 additions & 7 deletions api-put-object-file.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ func (c Client) FPutObject(bucketName, objectName, filePath, contentType string)
return 0, ErrEntityTooLarge(fileSize, maxMultipartPutObjectSize, bucketName, objectName)
}

objMetadata := make(map[string][]string)

// Set contentType based on filepath extension if not given or default
// value of "binary/octet-stream" if the extension has no associated type.
if contentType == "" {
Expand All @@ -70,6 +72,8 @@ func (c Client) FPutObject(bucketName, objectName, filePath, contentType string)
}
}

objMetadata["Content-Type"] = []string{contentType}

// NOTE: Google Cloud Storage multipart Put is not compatible with Amazon S3 APIs.
// Current implementation will only upload a maximum of 5GiB to Google Cloud Storage servers.
if isGoogleEndpoint(c.endpointURL) {
Expand All @@ -82,7 +86,7 @@ func (c Client) FPutObject(bucketName, objectName, filePath, contentType string)
}
}
// Do not compute MD5 for Google Cloud Storage. Uploads up to 5GiB in size.
return c.putObjectNoChecksum(bucketName, objectName, fileReader, fileSize, contentType, nil)
return c.putObjectNoChecksum(bucketName, objectName, fileReader, fileSize, objMetadata, nil)
}

// NOTE: S3 doesn't allow anonymous multipart requests.
Expand All @@ -97,15 +101,15 @@ func (c Client) FPutObject(bucketName, objectName, filePath, contentType string)
}
// Do not compute MD5 for anonymous requests to Amazon
// S3. Uploads up to 5GiB in size.
return c.putObjectNoChecksum(bucketName, objectName, fileReader, fileSize, contentType, nil)
return c.putObjectNoChecksum(bucketName, objectName, fileReader, fileSize, objMetadata, nil)
}

// Small object upload is initiated for uploads for input data size smaller than 5MiB.
if fileSize < minPartSize && fileSize >= 0 {
return c.putObjectSingle(bucketName, objectName, fileReader, fileSize, contentType, nil)
return c.putObjectSingle(bucketName, objectName, fileReader, fileSize, objMetadata, nil)
}
// Upload all large objects as multipart.
n, err = c.putObjectMultipartFromFile(bucketName, objectName, fileReader, fileSize, contentType, nil)
n, err = c.putObjectMultipartFromFile(bucketName, objectName, fileReader, fileSize, objMetadata, nil)
if err != nil {
errResp := ToErrorResponse(err)
// Verify if multipart functionality is not available, if not
Expand All @@ -116,7 +120,7 @@ func (c Client) FPutObject(bucketName, objectName, filePath, contentType string)
return 0, ErrEntityTooLarge(fileSize, maxSinglePutObjectSize, bucketName, objectName)
}
// Fall back to uploading as single PutObject operation.
return c.putObjectSingle(bucketName, objectName, fileReader, fileSize, contentType, nil)
return c.putObjectSingle(bucketName, objectName, fileReader, fileSize, objMetadata, nil)
}
return n, err
}
Expand All @@ -131,7 +135,7 @@ func (c Client) FPutObject(bucketName, objectName, filePath, contentType string)
// against MD5SUM of each individual parts. This function also
// effectively utilizes file system capabilities of reading from
// specific sections and not having to create temporary files.
func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileReader io.ReaderAt, fileSize int64, contentType string, progress io.Reader) (int64, error) {
func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileReader io.ReaderAt, fileSize int64, metaData map[string][]string, progress io.Reader) (int64, error) {
// Input validation.
if err := isValidBucketName(bucketName); err != nil {
return 0, err
Expand All @@ -141,7 +145,7 @@ func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileRe
}

// Get the upload id of a previously partially uploaded object or initiate a new multipart upload
uploadID, partsInfo, err := c.getMpartUploadSession(bucketName, objectName, contentType)
uploadID, partsInfo, err := c.getMpartUploadSession(bucketName, objectName, metaData)
if err != nil {
return 0, err
}
Expand Down
29 changes: 17 additions & 12 deletions api-put-object-multipart.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ import (
// If we exhaust all the known types, code proceeds to use stream as
// is where each part is re-downloaded, checksummed and verified
// before upload.
func (c Client) putObjectMultipart(bucketName, objectName string, reader io.Reader, size int64, contentType string, progress io.Reader) (n int64, err error) {
func (c Client) putObjectMultipart(bucketName, objectName string, reader io.Reader, size int64, metaData map[string][]string, progress io.Reader) (n int64, err error) {
if size > 0 && size > minPartSize {
// Verify if reader is *os.File, then use file system functionalities.
if isFile(reader) {
return c.putObjectMultipartFromFile(bucketName, objectName, reader.(*os.File), size, contentType, progress)
return c.putObjectMultipartFromFile(bucketName, objectName, reader.(*os.File), size, metaData, progress)
}
// Verify if reader is *minio.Object or io.ReaderAt.
// NOTE: Verification of object is kept for a specific purpose
Expand All @@ -58,17 +58,17 @@ func (c Client) putObjectMultipart(bucketName, objectName string, reader io.Read
// and such a functionality is used in the subsequent code
// path.
if isObject(reader) || isReadAt(reader) {
return c.putObjectMultipartFromReadAt(bucketName, objectName, reader.(io.ReaderAt), size, contentType, progress)
return c.putObjectMultipartFromReadAt(bucketName, objectName, reader.(io.ReaderAt), size, metaData, progress)
}
}
// For any other data size and reader type we do generic multipart
// approach by staging data in temporary files and uploading them.
return c.putObjectMultipartStream(bucketName, objectName, reader, size, contentType, progress)
return c.putObjectMultipartStream(bucketName, objectName, reader, size, metaData, progress)
}

// putObjectStream uploads files bigger than 5MiB, and also supports
// special case where size is unknown i.e '-1'.
func (c Client) putObjectMultipartStream(bucketName, objectName string, reader io.Reader, size int64, contentType string, progress io.Reader) (n int64, err error) {
func (c Client) putObjectMultipartStream(bucketName, objectName string, reader io.Reader, size int64, metaData map[string][]string, progress io.Reader) (n int64, err error) {
// Input validation.
if err := isValidBucketName(bucketName); err != nil {
return 0, err
Expand All @@ -84,7 +84,7 @@ func (c Client) putObjectMultipartStream(bucketName, objectName string, reader i
var complMultipartUpload completeMultipartUpload

// Get the upload id of a previously partially uploaded object or initiate a new multipart upload
uploadID, partsInfo, err := c.getMpartUploadSession(bucketName, objectName, contentType)
uploadID, partsInfo, err := c.getMpartUploadSession(bucketName, objectName, metaData)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -199,7 +199,7 @@ func (c Client) putObjectMultipartStream(bucketName, objectName string, reader i
}

// initiateMultipartUpload - Initiates a multipart upload and returns an upload ID.
func (c Client) initiateMultipartUpload(bucketName, objectName, contentType string) (initiateMultipartUploadResult, error) {
func (c Client) initiateMultipartUpload(bucketName, objectName string, metaData map[string][]string) (initiateMultipartUploadResult, error) {
// Input validation.
if err := isValidBucketName(bucketName); err != nil {
return initiateMultipartUploadResult{}, err
Expand All @@ -212,13 +212,18 @@ func (c Client) initiateMultipartUpload(bucketName, objectName, contentType stri
urlValues := make(url.Values)
urlValues.Set("uploads", "")

if contentType == "" {
contentType = "application/octet-stream"
}

// Set ContentType header.
customHeader := make(http.Header)
customHeader.Set("Content-Type", contentType)
for k, v := range metaData {
if len(v) > 0 {
customHeader.Set(k, v[0])
}
}

// Set a default content-type header if the latter is not provided
if v, ok := metaData["Content-Type"]; !ok || len(v) == 0 {
customHeader.Set("Content-Type", "application/octet-stream")
}

reqMetadata := requestMetadata{
bucketName: bucketName,
Expand Down
19 changes: 13 additions & 6 deletions api-put-object-progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,15 @@ import (
"strings"
)

// PutObjectWithProgress - With progress.
// PutObjectWithProgress - with progress.
func (c Client) PutObjectWithProgress(bucketName, objectName string, reader io.Reader, contentType string, progress io.Reader) (n int64, err error) {
metaData := make(map[string][]string)
metaData["Content-Type"] = []string{contentType}
return c.PutObjectWithMetadata(bucketName, objectName, reader, metaData, progress)
}

// PutObjectWithMetadata - with metadata.
func (c Client) PutObjectWithMetadata(bucketName, objectName string, reader io.Reader, metaData map[string][]string, progress io.Reader) (n int64, err error) {
// Input validation.
if err := isValidBucketName(bucketName); err != nil {
return 0, err
Expand Down Expand Up @@ -63,7 +70,7 @@ func (c Client) PutObjectWithProgress(bucketName, objectName string, reader io.R
return 0, ErrEntityTooLarge(size, maxSinglePutObjectSize, bucketName, objectName)
}
// Do not compute MD5 for Google Cloud Storage. Uploads up to 5GiB in size.
return c.putObjectNoChecksum(bucketName, objectName, reader, size, contentType, progress)
return c.putObjectNoChecksum(bucketName, objectName, reader, size, metaData, progress)
}

// NOTE: S3 doesn't allow anonymous multipart requests.
Expand All @@ -81,15 +88,15 @@ func (c Client) PutObjectWithProgress(bucketName, objectName string, reader io.R
}
// Do not compute MD5 for anonymous requests to Amazon
// S3. Uploads up to 5GiB in size.
return c.putObjectNoChecksum(bucketName, objectName, reader, size, contentType, progress)
return c.putObjectNoChecksum(bucketName, objectName, reader, size, metaData, progress)
}

// putSmall object.
if size < minPartSize && size >= 0 {
return c.putObjectSingle(bucketName, objectName, reader, size, contentType, progress)
return c.putObjectSingle(bucketName, objectName, reader, size, metaData, progress)
}
// For all sizes greater than 5MiB do multipart.
n, err = c.putObjectMultipart(bucketName, objectName, reader, size, contentType, progress)
n, err = c.putObjectMultipart(bucketName, objectName, reader, size, metaData, progress)
if err != nil {
errResp := ToErrorResponse(err)
// Verify if multipart functionality is not available, if not
Expand All @@ -100,7 +107,7 @@ func (c Client) PutObjectWithProgress(bucketName, objectName string, reader io.R
return 0, ErrEntityTooLarge(size, maxSinglePutObjectSize, bucketName, objectName)
}
// Fall back to uploading as single PutObject operation.
return c.putObjectSingle(bucketName, objectName, reader, size, contentType, progress)
return c.putObjectSingle(bucketName, objectName, reader, size, metaData, progress)
}
return n, err
}
Expand Down
4 changes: 2 additions & 2 deletions api-put-object-readat.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func shouldUploadPartReadAt(objPart objectPart, uploadReq uploadPartReq) bool {
// temporary files for staging all the data, these temporary files are
// cleaned automatically when the caller i.e http client closes the
// stream after uploading all the contents successfully.
func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, reader io.ReaderAt, size int64, contentType string, progress io.Reader) (n int64, err error) {
func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, reader io.ReaderAt, size int64, metaData map[string][]string, progress io.Reader) (n int64, err error) {
// Input validation.
if err := isValidBucketName(bucketName); err != nil {
return 0, err
Expand All @@ -73,7 +73,7 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read
}

// Get the upload id of a previously partially uploaded object or initiate a new multipart upload
uploadID, partsInfo, err := c.getMpartUploadSession(bucketName, objectName, contentType)
uploadID, partsInfo, err := c.getMpartUploadSession(bucketName, objectName, metaData)
if err != nil {
return 0, err
}
Expand Down
37 changes: 22 additions & 15 deletions api-put-object.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func (c Client) PutObject(bucketName, objectName string, reader io.Reader, conte

// putObjectNoChecksum special function used Google Cloud Storage. This special function
// is used for Google Cloud Storage since Google's multipart API is not S3 compatible.
func (c Client) putObjectNoChecksum(bucketName, objectName string, reader io.Reader, size int64, contentType string, progress io.Reader) (n int64, err error) {
func (c Client) putObjectNoChecksum(bucketName, objectName string, reader io.Reader, size int64, metaData map[string][]string, progress io.Reader) (n int64, err error) {
// Input validation.
if err := isValidBucketName(bucketName); err != nil {
return 0, err
Expand All @@ -168,7 +168,7 @@ func (c Client) putObjectNoChecksum(bucketName, objectName string, reader io.Rea

// This function does not calculate sha256 and md5sum for payload.
// Execute put object.
st, err := c.putObjectDo(bucketName, objectName, readSeeker, nil, nil, size, contentType)
st, err := c.putObjectDo(bucketName, objectName, readSeeker, nil, nil, size, metaData)
if err != nil {
return 0, err
}
Expand All @@ -180,7 +180,7 @@ func (c Client) putObjectNoChecksum(bucketName, objectName string, reader io.Rea

// putObjectSingle is a special function for uploading single put object request.
// This special function is used as a fallback when multipart upload fails.
func (c Client) putObjectSingle(bucketName, objectName string, reader io.Reader, size int64, contentType string, progress io.Reader) (n int64, err error) {
func (c Client) putObjectSingle(bucketName, objectName string, reader io.Reader, size int64, metaData map[string][]string, progress io.Reader) (n int64, err error) {
// Input validation.
if err := isValidBucketName(bucketName); err != nil {
return 0, err
Expand Down Expand Up @@ -236,7 +236,7 @@ func (c Client) putObjectSingle(bucketName, objectName string, reader io.Reader,
}
}
// Execute put object.
st, err := c.putObjectDo(bucketName, objectName, reader, hashSums["md5"], hashSums["sha256"], size, contentType)
st, err := c.putObjectDo(bucketName, objectName, reader, hashSums["md5"], hashSums["sha256"], size, metaData)
if err != nil {
return 0, err
}
Expand All @@ -254,7 +254,7 @@ func (c Client) putObjectSingle(bucketName, objectName string, reader io.Reader,

// putObjectDo - executes the put object http operation.
// NOTE: You must have WRITE permissions on a bucket to add an object to it.
func (c Client) putObjectDo(bucketName, objectName string, reader io.Reader, md5Sum []byte, sha256Sum []byte, size int64, contentType string) (ObjectInfo, error) {
func (c Client) putObjectDo(bucketName, objectName string, reader io.Reader, md5Sum []byte, sha256Sum []byte, size int64, metaData map[string][]string) (ObjectInfo, error) {
// Input validation.
if err := isValidBucketName(bucketName); err != nil {
return ObjectInfo{}, err
Expand All @@ -271,13 +271,20 @@ func (c Client) putObjectDo(bucketName, objectName string, reader io.Reader, md5
return ObjectInfo{}, ErrEntityTooLarge(size, maxSinglePutObjectSize, bucketName, objectName)
}

if strings.TrimSpace(contentType) == "" {
contentType = "application/octet-stream"
}

// Set headers.
customHeader := make(http.Header)
customHeader.Set("Content-Type", contentType)

// Set metadata to headers
for k, v := range metaData {
if len(v) > 0 {
customHeader.Set(k, v[0])
}
}

// If Content-Type is not provided, set the default application/octet-stream one
if v, ok := metaData["Content-Type"]; !ok || len(v) == 0 {
customHeader.Set("Content-Type", "application/octet-stream")
}

// Populate request metadata.
reqMetadata := requestMetadata{
Expand All @@ -302,13 +309,13 @@ func (c Client) putObjectDo(bucketName, objectName string, reader io.Reader, md5
}
}

var metadata ObjectInfo
var objInfo ObjectInfo
// Trim off the odd double quotes from ETag in the beginning and end.
metadata.ETag = strings.TrimPrefix(resp.Header.Get("ETag"), "\"")
metadata.ETag = strings.TrimSuffix(metadata.ETag, "\"")
objInfo.ETag = strings.TrimPrefix(resp.Header.Get("ETag"), "\"")
objInfo.ETag = strings.TrimSuffix(objInfo.ETag, "\"")
// A success here means data was written to server successfully.
metadata.Size = size
objInfo.Size = size

// Return here.
return metadata, nil
return objInfo, nil
}
Loading

0 comments on commit 8ecf49c

Please sign in to comment.