Skip to content

Commit

Permalink
mc: Vendorize minio-go; this brings in streaming support.
Browse files Browse the repository at this point in the history
This PR brings following changes

- Brings streaming support for mc uploads.
- Brings support for copying objects up to 5TiB
  as a consequence of ComposeObject implementation
  in minio-go.
  • Loading branch information
harshavardhana committed Jul 5, 2017
1 parent 05f588f commit e97b3aa
Show file tree
Hide file tree
Showing 25 changed files with 1,270 additions and 1,209 deletions.
72 changes: 16 additions & 56 deletions cmd/client-fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func (f *fsClient) put(reader io.Reader, size int64, metadata map[string][]strin
}

// Get stat to get the current size.
partSt, e := partFile.Stat()
partSt, e := os.Stat(objectPartPath)
if e != nil {
err := f.toClientError(e, objectPartPath)
return 0, err.Trace(objectPartPath)
Expand All @@ -246,73 +246,33 @@ func (f *fsClient) put(reader io.Reader, size int64, metadata map[string][]strin
// Current file offset.
var currentOffset = partSt.Size()

// Use ReadAt() capability when reader implements it, but also avoid it in two cases:
// *) reader represents a standard input/output stream since they return illegal seek error when ReadAt() is invoked
// *) we know in advance that reader will provide zero length data
if readerAt, ok := reader.(io.ReaderAt); ok && !isStdIO(reader) && size > 0 {
// Notify the progress bar if any till current size.
if progress != nil {
if _, e = io.CopyN(ioutil.Discard, progress, currentOffset); e != nil {
if !isStdIO(reader) && size > 0 {
reader = hookreader.NewHook(reader, progress)
if seeker, ok := reader.(io.Seeker); ok {
if _, e = seeker.Seek(currentOffset, 0); e != nil {
return 0, probe.NewError(e)
}
}

// Allocate buffer of 10MiB once.
readAtBuffer := make([]byte, 10*1024*1024)

// Loop through all offsets on incoming io.ReaderAt and write
// to the destination.
for currentOffset < size {
readAtSize, re := readerAt.ReadAt(readAtBuffer, currentOffset)
if re != nil && re != io.EOF {
// For any errors other than io.EOF, we return error
// and breakout.
err := f.toClientError(re, objectPartPath)
return 0, err.Trace(objectPartPath)
}
writtenSize, we := partFile.Write(readAtBuffer[:readAtSize])
if we != nil {
err := f.toClientError(we, objectPartPath)
return 0, err.Trace(objectPartPath)
}
// read size and subsequent write differ, a possible
// corruption return here.
if readAtSize != writtenSize {
// Unexpected write (less data was written than expected).
return 0, probe.NewError(UnexpectedShortWrite{
InputSize: readAtSize,
WriteSize: writtenSize,
})
}
// Notify the progress bar if any for written size.
if progress != nil {
if _, e = io.CopyN(ioutil.Discard, progress, int64(writtenSize)); e != nil {
return totalWritten, probe.NewError(e)
}
}
currentOffset += int64(writtenSize)
// Once we see io.EOF we break out of the loop.
if re == io.EOF {
break
// Discard bytes until currentOffset.
if _, e = io.CopyN(ioutil.Discard, progress, currentOffset); e != nil {
return 0, probe.NewError(e)
}
}
// Save currently copied total into totalWritten.
totalWritten = currentOffset
} else {
reader = hookreader.NewHook(reader, progress)
// Discard bytes until currentOffset.
if _, e = io.CopyN(ioutil.Discard, reader, currentOffset); e != nil {
return 0, probe.NewError(e)
}
var n int64
n, e = io.Copy(partFile, reader)
if e != nil {
return 0, probe.NewError(e)
}
// Save currently copied total into totalWritten.
totalWritten = n + currentOffset
}

n, e := io.Copy(partFile, reader)
if e != nil {
return 0, probe.NewError(e)
}

// Save currently copied total into totalWritten.
totalWritten = n + currentOffset

// Close the input reader as well, if possible.
closer, ok := reader.(io.Closer)
if ok {
Expand Down
24 changes: 16 additions & 8 deletions cmd/client-s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,14 +514,22 @@ func (c *s3Client) Get() (io.Reader, map[string][]string, *probe.Error) {

// Copy - copy object
func (c *s3Client) Copy(source string, size int64, progress io.Reader) *probe.Error {
bucket, object := c.url2BucketAndObject()
if bucket == "" {
dstBucket, dstObject := c.url2BucketAndObject()
if dstBucket == "" {
return probe.NewError(BucketNameEmpty{})
}
// Empty copy conditions
copyConds := minio.NewCopyConditions()
e := c.api.CopyObject(bucket, object, source, copyConds)

tokens := splitStr(source, string(c.targetURL.Separator), 3)

// Source object
src := minio.NewSourceInfo(tokens[1], tokens[2], nil)

// Destination object
dst, e := minio.NewDestinationInfo(dstBucket, dstObject, nil, nil)
if e != nil {
return probe.NewError(e)
}
if e = c.api.CopyObject(dst, src); e != nil {
errResponse := minio.ToErrorResponse(e)
if errResponse.Code == "AccessDenied" {
return probe.NewError(PathInsufficientPermission{
Expand All @@ -530,12 +538,12 @@ func (c *s3Client) Copy(source string, size int64, progress io.Reader) *probe.Er
}
if errResponse.Code == "NoSuchBucket" {
return probe.NewError(BucketDoesNotExist{
Bucket: bucket,
Bucket: dstBucket,
})
}
if errResponse.Code == "InvalidBucketName" {
return probe.NewError(BucketInvalid{
Bucket: bucket,
Bucket: dstBucket,
})
}
if errResponse.Code == "NoSuchKey" || errResponse.Code == "InvalidArgument" {
Expand Down Expand Up @@ -563,7 +571,7 @@ func (c *s3Client) Put(reader io.Reader, size int64, metadata map[string][]strin
if bucket == "" {
return 0, probe.NewError(BucketNameEmpty{})
}
n, e := c.api.PutObjectWithMetadata(bucket, object, reader, metadata, progress)
n, e := c.api.PutObjectWithSize(bucket, object, reader, size, metadata, progress)
if e != nil {
errResponse := minio.ToErrorResponse(e)
if errResponse.Code == "UnexpectedEOF" || e == io.EOF {
Expand Down
4 changes: 0 additions & 4 deletions cmd/client-s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,6 @@ func (h objectHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
return
}
if !bytes.Equal(h.data, buffer.Bytes()) {
w.WriteHeader(http.StatusInternalServerError)
return
}
w.Header().Set("ETag", "9af2f8218b150c351ad802c6f3d66abe")
w.WriteHeader(http.StatusOK)
case r.Method == "HEAD":
Expand Down
4 changes: 2 additions & 2 deletions cmd/common-methods.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ func uploadSourceToTargetURL(urls URLs, progress io.Reader) URLs {
targetURL := urls.TargetContent.URL
length := urls.SourceContent.Size

// Optimize for server side copy if object is <= 5GiB and the host is same.
if length <= globalMaximumPutSize && sourceAlias == targetAlias {
// Optimize for server side copy if the host is same.
if sourceAlias == targetAlias {
sourcePath := filepath.ToSlash(sourceURL.Path)
err := copySourceToTargetURL(targetAlias, targetURL.String(), sourcePath, length, progress)
if err != nil {
Expand Down
4 changes: 0 additions & 4 deletions cmd/globals.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,6 @@ const (

// Global error exit status.
globalErrorExitStatus = 1

// Maximum size for a single PUT operation.
globalMaximumPutSize = 5 * 1024 * 1024 * 1024 // 5GiB.

)

var (
Expand Down
20 changes: 20 additions & 0 deletions pkg/hookreader/hookreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,31 @@ package hookreader

import "io"

// hookReader hooks additional reader in the source stream. It is
// useful for making progress bars. Second reader is appropriately
// notified about the exact number of bytes read from the primary
// source on each Read operation.
type hookReader struct {
source io.Reader
hook io.Reader
}

// Seek implements io.Seeker. Seeks source first, and if necessary
// seeks hook if Seek method is appropriately found.
func (hr *hookReader) Seek(offset int64, whence int) (n int64, err error) {
// Verify for source has embedded Seeker, use it.
sourceSeeker, ok := hr.source.(io.Seeker)
if ok {
return sourceSeeker.Seek(offset, whence)
}
// Verify if hook has embedded Seeker, use it.
hookSeeker, ok := hr.hook.(io.Seeker)
if ok {
return hookSeeker.Seek(offset, whence)
}
return n, nil
}

// Read implements io.Reader. Always reads from the source, the return
// value 'n' number of bytes are reported through the hook. Returns
// error for all non io.EOF conditions.
Expand Down
Loading

0 comments on commit e97b3aa

Please sign in to comment.