mc: Vendorize minio-go; this brings in streaming support. #2203

Merged (1 commit) on Jul 6, 2017
cmd/client-fs.go (72 changes: 16 additions, 56 deletions)
@@ -236,7 +236,7 @@ func (f *fsClient) put(reader io.Reader, size int64, metadata map[string][]strin
}

// Get stat to get the current size.
partSt, e := partFile.Stat()
partSt, e := os.Stat(objectPartPath)
if e != nil {
err := f.toClientError(e, objectPartPath)
return 0, err.Trace(objectPartPath)
@@ -246,73 +246,33 @@ func (f *fsClient) put(reader io.Reader, size int64, metadata map[string][]strin
// Current file offset.
var currentOffset = partSt.Size()

// Use ReadAt() capability when reader implements it, but also avoid it in two cases:
// *) reader represents a standard input/output stream since they return illegal seek error when ReadAt() is invoked
// *) we know in advance that reader will provide zero length data
if readerAt, ok := reader.(io.ReaderAt); ok && !isStdIO(reader) && size > 0 {
// Notify the progress bar if any till current size.
if progress != nil {
if _, e = io.CopyN(ioutil.Discard, progress, currentOffset); e != nil {
if !isStdIO(reader) && size > 0 {
reader = hookreader.NewHook(reader, progress)
if seeker, ok := reader.(io.Seeker); ok {
if _, e = seeker.Seek(currentOffset, 0); e != nil {
return 0, probe.NewError(e)
}
}

// Allocate buffer of 10MiB once.
readAtBuffer := make([]byte, 10*1024*1024)

// Loop through all offsets on incoming io.ReaderAt and write
// to the destination.
for currentOffset < size {
readAtSize, re := readerAt.ReadAt(readAtBuffer, currentOffset)
if re != nil && re != io.EOF {
// For any errors other than io.EOF, we return error
// and breakout.
err := f.toClientError(re, objectPartPath)
return 0, err.Trace(objectPartPath)
}
writtenSize, we := partFile.Write(readAtBuffer[:readAtSize])
if we != nil {
err := f.toClientError(we, objectPartPath)
return 0, err.Trace(objectPartPath)
}
// read size and subsequent write differ, a possible
// corruption return here.
if readAtSize != writtenSize {
// Unexpected write (less data was written than expected).
return 0, probe.NewError(UnexpectedShortWrite{
InputSize: readAtSize,
WriteSize: writtenSize,
})
}
// Notify the progress bar if any for written size.
if progress != nil {
if _, e = io.CopyN(ioutil.Discard, progress, int64(writtenSize)); e != nil {
return totalWritten, probe.NewError(e)
}
}
currentOffset += int64(writtenSize)
// Once we see io.EOF we break out of the loop.
if re == io.EOF {
break
// Discard bytes until currentOffset.
if _, e = io.CopyN(ioutil.Discard, progress, currentOffset); e != nil {
return 0, probe.NewError(e)
}
}
// Save currently copied total into totalWritten.
totalWritten = currentOffset
} else {
reader = hookreader.NewHook(reader, progress)
// Discard bytes until currentOffset.
if _, e = io.CopyN(ioutil.Discard, reader, currentOffset); e != nil {
return 0, probe.NewError(e)
}
var n int64
n, e = io.Copy(partFile, reader)
if e != nil {
return 0, probe.NewError(e)
}
// Save currently copied total into totalWritten.
totalWritten = n + currentOffset
}

n, e := io.Copy(partFile, reader)
if e != nil {
return 0, probe.NewError(e)
}

// Save currently copied total into totalWritten.
totalWritten = n + currentOffset

// Close the input reader as well, if possible.
closer, ok := reader.(io.Closer)
if ok {
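To make the rewritten put() path easier to follow, here is a small, self-contained sketch of the same resume-by-seek idea using only the standard library: stat the partial file, seek the source to the existing offset, and stream the remainder with io.Copy. The file names and the resumeCopy helper are illustrative assumptions, not mc code, and the probe/hookreader error handling is deliberately left out.

```go
package main

import (
	"fmt"
	"io"
	"log"
	"os"
)

// resumeCopy appends the unread tail of src to the partial file at partPath.
// It assumes src is seekable (for example a local file); non-seekable sources
// such as stdin would instead need their already-copied prefix discarded, as
// the else branch in the diff does with io.CopyN.
func resumeCopy(src io.ReadSeeker, partPath string) (int64, error) {
	partFile, err := os.OpenFile(partPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
	if err != nil {
		return 0, err
	}
	defer partFile.Close()

	// The current size of the partial file is the offset to resume from.
	st, err := partFile.Stat()
	if err != nil {
		return 0, err
	}
	currentOffset := st.Size()

	// Skip the bytes that a previous attempt already wrote.
	if _, err := src.Seek(currentOffset, io.SeekStart); err != nil {
		return 0, err
	}

	// Stream the remainder; the total is the resumed offset plus new bytes.
	n, err := io.Copy(partFile, src)
	return currentOffset + n, err
}

func main() {
	// Hypothetical file names, for illustration only.
	src, err := os.Open("source.dat")
	if err != nil {
		log.Fatal(err)
	}
	defer src.Close()

	total, err := resumeCopy(src, "source.dat.part.minio")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("bytes in part file:", total)
}
```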
cmd/client-s3.go (24 changes: 16 additions, 8 deletions)
@@ -501,14 +501,22 @@ func (c *s3Client) Get() (io.Reader, *probe.Error) {

// Copy - copy object
func (c *s3Client) Copy(source string, size int64, progress io.Reader) *probe.Error {
bucket, object := c.url2BucketAndObject()
if bucket == "" {
dstBucket, dstObject := c.url2BucketAndObject()
if dstBucket == "" {
return probe.NewError(BucketNameEmpty{})
}
// Empty copy conditions
copyConds := minio.NewCopyConditions()
e := c.api.CopyObject(bucket, object, source, copyConds)

tokens := splitStr(source, string(c.targetURL.Separator), 3)

// Source object
src := minio.NewSourceInfo(tokens[1], tokens[2], nil)

// Destination object
dst, e := minio.NewDestinationInfo(dstBucket, dstObject, nil, nil)
if e != nil {
return probe.NewError(e)
}
if e = c.api.CopyObject(dst, src); e != nil {
errResponse := minio.ToErrorResponse(e)
if errResponse.Code == "AccessDenied" {
return probe.NewError(PathInsufficientPermission{
@@ -517,12 +525,12 @@ func (c *s3Client) Copy(source string, size int64, progress io.Reader) *probe.Er
}
if errResponse.Code == "NoSuchBucket" {
return probe.NewError(BucketDoesNotExist{
Bucket: bucket,
Bucket: dstBucket,
})
}
if errResponse.Code == "InvalidBucketName" {
return probe.NewError(BucketInvalid{
Bucket: bucket,
Bucket: dstBucket,
})
}
if errResponse.Code == "NoSuchKey" || errResponse.Code == "InvalidArgument" {
@@ -550,7 +558,7 @@ func (c *s3Client) Put(reader io.Reader, size int64, metadata map[string][]strin
if bucket == "" {
return 0, probe.NewError(BucketNameEmpty{})
}
n, e := c.api.PutObjectWithMetadata(bucket, object, reader, metadata, progress)
n, e := c.api.PutObjectWithSize(bucket, object, reader, size, metadata, progress)
if e != nil {
errResponse := minio.ToErrorResponse(e)
if errResponse.Code == "UnexpectedEOF" || e == io.EOF {
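For the streaming half of this change, below is a rough sketch of an upload through PutObjectWithSize as it is called above. Only the argument order (bucket, object, reader, size, metadata, progress) comes from the diff; the client constructor, endpoint, credentials and file name are placeholder assumptions, and the call is only expected to match the minio-go copy vendored by this PR rather than any particular released version.

```go
package main

import (
	"log"
	"os"

	minio "github.com/minio/minio-go"
)

func main() {
	// Placeholder endpoint and credentials; not taken from this PR.
	client, err := minio.New("play.minio.io:9000", "ACCESS-KEY", "SECRET-KEY", true)
	if err != nil {
		log.Fatal(err)
	}

	// Hypothetical local file to upload.
	f, err := os.Open("payload.bin")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	st, err := f.Stat()
	if err != nil {
		log.Fatal(err)
	}

	// Same argument order as the call in cmd/client-s3.go above:
	// bucket, object, reader, size, metadata, progress (nil here).
	metadata := map[string][]string{"Content-Type": {"application/octet-stream"}}
	n, err := client.PutObjectWithSize("my-bucket", "my-object", f, st.Size(), metadata, nil)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("uploaded %d bytes", n)
}
```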
cmd/client-s3_test.go (4 changes: 0 additions, 4 deletions)
@@ -90,10 +90,6 @@ func (h objectHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
return
}
if !bytes.Equal(h.data, buffer.Bytes()) {
w.WriteHeader(http.StatusInternalServerError)
return
}
w.Header().Set("ETag", "9af2f8218b150c351ad802c6f3d66abe")
w.WriteHeader(http.StatusOK)
case r.Method == "HEAD":
cmd/common-methods.go (4 changes: 2 additions, 2 deletions)
@@ -144,8 +144,8 @@ func uploadSourceToTargetURL(urls URLs, progress io.Reader) URLs {
targetURL := urls.TargetContent.URL
length := urls.SourceContent.Size

// Optimize for server side copy if object is <= 5GiB and the host is same.
if length <= globalMaximumPutSize && sourceAlias == targetAlias {
Member: Removing length <= globalMaximumPutSize because minio-go already does that?

Member Author: Now minio-go supports copyObject up to 5TiB. // cc @donatello

// Optimize for server side copy if the host is same.
if sourceAlias == targetAlias {
sourcePath := filepath.ToSlash(sourceURL.Path)
err := copySourceToTargetURL(targetAlias, targetURL.String(), sourcePath, length, progress)
if err != nil {
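As the review exchange above notes, the client-side size guard can go because the vendored minio-go now handles large server-side copies itself. A rough sketch of driving that copy API directly follows; NewSourceInfo, NewDestinationInfo and CopyObject mirror the calls visible in the cmd/client-s3.go diff, while the client constructor, endpoint, credentials and bucket names are placeholder assumptions rather than mc code.

```go
package main

import (
	"log"

	minio "github.com/minio/minio-go"
)

func main() {
	// Placeholder endpoint and credentials; not taken from this PR.
	client, err := minio.New("play.minio.io:9000", "ACCESS-KEY", "SECRET-KEY", true)
	if err != nil {
		log.Fatal(err)
	}

	// Source object to copy from (nil: no SSE-C decryption material).
	src := minio.NewSourceInfo("src-bucket", "src-object", nil)

	// Destination object (nil encryption material, nil extra user metadata).
	dst, err := minio.NewDestinationInfo("dst-bucket", "dst-object", nil, nil)
	if err != nil {
		log.Fatal(err)
	}

	// Server-side copy; per the review comment above, this minio-go handles
	// objects up to 5TiB, so mc no longer needs its own size guard.
	if err := client.CopyObject(dst, src); err != nil {
		log.Fatal(err)
	}
}
```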
cmd/globals.go (4 changes: 0 additions, 4 deletions)
@@ -48,10 +48,6 @@ const (

// Global error exit status.
globalErrorExitStatus = 1

// Maximum size for a single PUT operation.
globalMaximumPutSize = 5 * 1024 * 1024 * 1024 // 5GiB.

)

var (
pkg/hookreader/hookreader.go (20 changes: 20 additions, 0 deletions)
@@ -22,11 +22,31 @@ package hookreader

import "io"

// hookReader hooks additional reader in the source stream. It is
// useful for making progress bars. Second reader is appropriately
// notified about the exact number of bytes read from the primary
// source on each Read operation.
type hookReader struct {
source io.Reader
hook io.Reader
}

// Seek implements io.Seeker. Seeks source first, and if necessary
// seeks hook if Seek method is appropriately found.
func (hr *hookReader) Seek(offset int64, whence int) (n int64, err error) {
// Verify for source has embedded Seeker, use it.
sourceSeeker, ok := hr.source.(io.Seeker)
if ok {
return sourceSeeker.Seek(offset, whence)
}
// Verify if hook has embedded Seeker, use it.
hookSeeker, ok := hr.hook.(io.Seeker)
if ok {
return hookSeeker.Seek(offset, whence)
}
return n, nil
}

// Read implements io.Reader. Always reads from the source, the return
// value 'n' number of bytes are reported through the hook. Returns
// error for all non io.EOF conditions.
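Since the Read body is collapsed in this view, the sketch below illustrates the hook-reader pattern the doc comments describe: every Read is served from the source, and the number of bytes read is mirrored into a second reader such as a progress bar. The byteCounter type and this particular Read body are illustrative assumptions, not the vendored implementation.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"io/ioutil"
)

// byteCounter is a stand-in for a progress bar: its Read only records how
// many bytes it was shown.
type byteCounter struct{ total int64 }

func (c *byteCounter) Read(p []byte) (int, error) {
	c.total += int64(len(p))
	return len(p), nil
}

// hookReader mirrors the pattern from pkg/hookreader: reads come from source,
// and the number of bytes read is reported to hook on every Read.
type hookReader struct {
	source io.Reader
	hook   io.Reader
}

func (hr *hookReader) Read(b []byte) (int, error) {
	n, err := hr.source.Read(b)
	if n > 0 && hr.hook != nil {
		// Report exactly the bytes that were read from the source.
		if _, herr := hr.hook.Read(b[:n]); herr != nil && herr != io.EOF {
			return n, herr
		}
	}
	return n, err
}

func main() {
	src := bytes.NewReader(make([]byte, 1<<20)) // 1 MiB of zeroes
	progress := &byteCounter{}
	r := &hookReader{source: src, hook: progress}

	if _, err := io.Copy(ioutil.Discard, r); err != nil {
		fmt.Println("copy failed:", err)
		return
	}
	fmt.Println("bytes reported to hook:", progress.total)
}
```

Running this prints 1048576, the full size of the source, showing the hook sees exactly what the consumer read.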