Skip to content

Commit

Permalink
api, core: Add new Core API {Get,Stat}Object with pre-conditions.
Browse files Browse the repository at this point in the history
Implements a new API to provide a way to set headers
for GetObject(), StatObject() request such as to

 - read partial data starting at offsets.
 - read only if etag matches.
 - read only if modtime matches.
 - read only if etag doesn't match.
 - read only if modtime doesn't match.

Fixes #669
  • Loading branch information
harshavardhana authored and minio-trusted committed Apr 28, 2017
1 parent e988fc9 commit 3aa4ee3
Show file tree
Hide file tree
Showing 6 changed files with 324 additions and 18 deletions.
7 changes: 6 additions & 1 deletion api-get-object-file.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,13 @@ func (c Client) FGetObject(bucketName, objectName, filePath string) error {
return err
}

// Initialize get object request headers to set the
// appropriate range offsets to read from.
reqHeaders := NewGetReqHeaders()
reqHeaders.SetRange(st.Size(), 0)

// Seek to current position for incoming reader.
objectReader, objectStat, err := c.getObject(bucketName, objectName, st.Size(), 0)
objectReader, objectStat, err := c.getObject(bucketName, objectName, reqHeaders)
if err != nil {
return err
}
Expand Down
35 changes: 19 additions & 16 deletions api-get-object.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Minio Go Library for Amazon S3 Compatible Cloud Storage (C) 2015, 2016 Minio, Inc.
* Minio Go Library for Amazon S3 Compatible Cloud Storage (C) 2015, 2016, 2017 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -97,16 +97,19 @@ func (c Client) GetObject(bucketName, objectName string) (*Object, error) {
if req.isFirstReq {
// First request is a Read/ReadAt.
if req.isReadOp {
reqHeaders := NewGetReqHeaders()
// Differentiate between wanting the whole object and just a range.
if req.isReadAt {
// If this is a ReadAt request only get the specified range.
// Range is set with respect to the offset and length of the buffer requested.
// Do not set objectInfo from the first readAt request because it will not get
// the whole object.
httpReader, _, err = c.getObject(bucketName, objectName, req.Offset, int64(len(req.Buffer)))
reqHeaders.SetRange(req.Offset, req.Offset+int64(len(req.Buffer))-1)
httpReader, _, err = c.getObject(bucketName, objectName, reqHeaders)
} else {
reqHeaders.SetRange(req.Offset, 0)
// First request is a Read request.
httpReader, objectInfo, err = c.getObject(bucketName, objectName, req.Offset, 0)
httpReader, objectInfo, err = c.getObject(bucketName, objectName, reqHeaders)
}
if err != nil {
resCh <- getResponse{
Expand Down Expand Up @@ -166,16 +169,20 @@ func (c Client) GetObject(bucketName, objectName string) (*Object, error) {
// new ones when they haven't been already.
// All readAt requests are new requests.
if req.DidOffsetChange || !req.beenRead {
reqHeaders := NewGetReqHeaders()
if httpReader != nil {
// Close previously opened http reader.
httpReader.Close()
}
// If this request is a readAt only get the specified range.
if req.isReadAt {
// Range is set with respect to the offset and length of the buffer requested.
httpReader, _, err = c.getObject(bucketName, objectName, req.Offset, int64(len(req.Buffer)))
reqHeaders.SetRange(req.Offset, req.Offset+int64(len(req.Buffer))-1)
httpReader, _, err = c.getObject(bucketName, objectName, reqHeaders)
} else {
httpReader, objectInfo, err = c.getObject(bucketName, objectName, req.Offset, 0)
// Range is set with respect to the offset.
reqHeaders.SetRange(req.Offset, 0)
httpReader, objectInfo, err = c.getObject(bucketName, objectName, reqHeaders)
}
if err != nil {
resCh <- getResponse{
Expand Down Expand Up @@ -230,8 +237,8 @@ type getResponse struct {
objectInfo ObjectInfo // Used for the first request.
}

// Object represents an open object. It implements Read, ReadAt,
// Seeker, Close for a HTTP stream.
// Object represents an open object. It implements
// Reader, ReaderAt, Seeker, Closer for a HTTP stream.
type Object struct {
// Mutex.
mutex *sync.Mutex
Expand Down Expand Up @@ -594,7 +601,7 @@ func newObject(reqCh chan<- getRequest, resCh <-chan getResponse, doneCh chan<-
//
// For more information about the HTTP Range header.
// go to http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35.
func (c Client) getObject(bucketName, objectName string, offset, length int64) (io.ReadCloser, ObjectInfo, error) {
func (c Client) getObject(bucketName, objectName string, reqHeaders RequestHeaders) (io.ReadCloser, ObjectInfo, error) {
// Validate input arguments.
if err := isValidBucketName(bucketName); err != nil {
return nil, ObjectInfo{}, err
Expand All @@ -603,15 +610,10 @@ func (c Client) getObject(bucketName, objectName string, offset, length int64) (
return nil, ObjectInfo{}, err
}

// Set all the necessary reqHeaders.
customHeader := make(http.Header)
// Set ranges if length and offset are valid.
// See https://tools.ietf.org/html/rfc7233#section-3.1 for reference.
if length > 0 && offset >= 0 {
customHeader.Set("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+length-1))
} else if offset > 0 && length == 0 {
customHeader.Set("Range", fmt.Sprintf("bytes=%d-", offset))
} else if length < 0 && offset == 0 {
customHeader.Set("Range", fmt.Sprintf("bytes=%d", length))
for key, value := range reqHeaders.Header {
customHeader[key] = value
}

// Execute GET on objectName.
Expand Down Expand Up @@ -646,6 +648,7 @@ func (c Client) getObject(bucketName, objectName string, offset, length int64) (
Region: resp.Header.Get("x-amz-bucket-region"),
}
}

// Get content-type.
contentType := strings.TrimSpace(resp.Header.Get("Content-Type"))
if contentType == "" {
Expand Down
21 changes: 21 additions & 0 deletions api-stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,31 @@ func (c Client) StatObject(bucketName, objectName string) (ObjectInfo, error) {
if err := isValidObjectName(objectName); err != nil {
return ObjectInfo{}, err
}
reqHeaders := NewHeadReqHeaders()
return c.statObject(bucketName, objectName, reqHeaders)
}

// Lower level API for statObject supporting pre-conditions and range headers.
func (c Client) statObject(bucketName, objectName string, reqHeaders RequestHeaders) (ObjectInfo, error) {
// Input validation.
if err := isValidBucketName(bucketName); err != nil {
return ObjectInfo{}, err
}
if err := isValidObjectName(objectName); err != nil {
return ObjectInfo{}, err
}

customHeader := make(http.Header)
for k, v := range reqHeaders.Header {
customHeader[k] = v
}

// Execute HEAD on objectName.
resp, err := c.executeMethod("HEAD", requestMetadata{
bucketName: bucketName,
objectName: objectName,
contentSHA256Bytes: emptySHA256,
customHeader: customHeader,
})
defer closeResponse(resp)
if err != nil {
Expand Down Expand Up @@ -124,6 +143,7 @@ func (c Client) StatObject(bucketName, objectName string) (ObjectInfo, error) {
}
}
}

// Parse Last-Modified has http time format.
date, err := time.Parse(http.TimeFormat, resp.Header.Get("Last-Modified"))
if err != nil {
Expand All @@ -137,6 +157,7 @@ func (c Client) StatObject(bucketName, objectName string) (ObjectInfo, error) {
Region: resp.Header.Get("x-amz-bucket-region"),
}
}

// Fetch content type if any present.
contentType := strings.TrimSpace(resp.Header.Get("Content-Type"))
if contentType == "" {
Expand Down
13 changes: 13 additions & 0 deletions core.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,16 @@ func (c Core) GetBucketPolicy(bucket string) (policy.BucketAccessPolicy, error)
func (c Core) PutBucketPolicy(bucket string, bucketPolicy policy.BucketAccessPolicy) error {
return c.putBucketPolicy(bucket, bucketPolicy)
}

// GetObject is a lower level API implemented to support reading
// partial objects and also downloading objects with special conditions
// matching etag, modtime etc.
func (c Core) GetObject(bucketName, objectName string, reqHeaders RequestHeaders) (io.ReadCloser, ObjectInfo, error) {
return c.getObject(bucketName, objectName, reqHeaders)
}

// StatObject is a lower level API implemented to support special
// conditions matching etag, modtime on a request.
func (c Core) StatObject(bucketName, objectName string, reqHeaders RequestHeaders) (ObjectInfo, error) {
return c.statObject(bucketName, objectName, reqHeaders)
}
161 changes: 160 additions & 1 deletion core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,165 @@ import (
"time"
)

// Tests for Core GetObject() function.
func TestGetObjectCore(t *testing.T) {
if testing.Short() {
t.Skip("skipping functional tests for the short runs")
}

// Seed random based on current time.
rand.Seed(time.Now().Unix())

// Instantiate new minio core client object.
c, err := NewCore(
os.Getenv("S3_ADDRESS"),
os.Getenv("ACCESS_KEY"),
os.Getenv("SECRET_KEY"),
mustParseBool(os.Getenv("S3_SECURE")),
)
if err != nil {
t.Fatal("Error:", err)
}

// Enable tracing, write to stderr.
// c.TraceOn(os.Stderr)

// Set user agent.
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")

// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), "minio-go-test")

// Make a new bucket.
err = c.MakeBucket(bucketName, "us-east-1")
if err != nil {
t.Fatal("Error:", err, bucketName)
}

// Generate data more than 32K
buf := bytes.Repeat([]byte("3"), rand.Intn(1<<20)+32*1024)

// Save the data
objectName := randString(60, rand.NewSource(time.Now().UnixNano()), "")
n, err := c.Client.PutObject(bucketName, objectName, bytes.NewReader(buf), "binary/octet-stream")
if err != nil {
t.Fatal("Error:", err, bucketName, objectName)
}

if n != int64(len(buf)) {
t.Fatalf("Error: number of bytes does not match, want %v, got %v\n", len(buf), n)
}

reqHeaders := NewGetReqHeaders()

offset := int64(2048)

// read directly
buf1 := make([]byte, 512)
buf2 := make([]byte, 512)
buf3 := make([]byte, n)

reqHeaders.SetRange(offset, offset+int64(len(buf1))-1)
reader, objectInfo, err := c.GetObject(bucketName, objectName, reqHeaders)
if err != nil {
t.Fatal(err)
}
m, err := io.ReadFull(reader, buf1)
if err != nil {
reader.Close()
t.Fatal(err)
}
reader.Close()

if objectInfo.Size != int64(m) {
t.Fatalf("Error: GetObject read shorter bytes before reaching EOF, want %v, got %v\n", objectInfo.Size, m)
}
if !bytes.Equal(buf1, buf[offset:offset+512]) {
t.Fatal("Error: Incorrect read between two GetObject from same offset.")
}
offset += 512

reqHeaders.SetRange(offset, offset+int64(len(buf2))-1)
reader, objectInfo, err = c.GetObject(bucketName, objectName, reqHeaders)
if err != nil {
t.Fatal(err)
}

m, err = io.ReadFull(reader, buf2)
if err != nil {
reader.Close()
t.Fatal(err)
}
reader.Close()

if objectInfo.Size != int64(m) {
t.Fatalf("Error: GetObject read shorter bytes before reaching EOF, want %v, got %v\n", objectInfo.Size, m)
}
if !bytes.Equal(buf2, buf[offset:offset+512]) {
t.Fatal("Error: Incorrect read between two GetObject from same offset.")
}

reqHeaders.SetRange(0, int64(len(buf3)))
reader, objectInfo, err = c.GetObject(bucketName, objectName, reqHeaders)
if err != nil {
t.Fatal(err)
}

m, err = io.ReadFull(reader, buf3)
if err != nil {
reader.Close()
t.Fatal(err)
}
reader.Close()

if objectInfo.Size != int64(m) {
t.Fatalf("Error: GetObject read shorter bytes before reaching EOF, want %v, got %v\n", objectInfo.Size, m)
}
if !bytes.Equal(buf3, buf) {
t.Fatal("Error: Incorrect data read in GetObject, than what was previously upoaded.")
}

reqHeaders = NewGetReqHeaders()
reqHeaders.SetMatchETag("etag")
_, _, err = c.GetObject(bucketName, objectName, reqHeaders)
if err == nil {
t.Fatal("Unexpected GetObject should fail with mismatching etags")
}
if errResp := ToErrorResponse(err); errResp.Code != "PreconditionFailed" {
t.Fatalf("Expected \"PreconditionFailed\" as code, got %s instead", errResp.Code)
}

reqHeaders = NewGetReqHeaders()
reqHeaders.SetMatchETagExcept("etag")
reader, objectInfo, err = c.GetObject(bucketName, objectName, reqHeaders)
if err != nil {
t.Fatal(err)
}

m, err = io.ReadFull(reader, buf3)
if err != nil {
reader.Close()
t.Fatal(err)
}
reader.Close()

if objectInfo.Size != int64(m) {
t.Fatalf("Error: GetObject read shorter bytes before reaching EOF, want %v, got %v\n", objectInfo.Size, m)
}
if !bytes.Equal(buf3, buf) {
t.Fatal("Error: Incorrect data read in GetObject, than what was previously upoaded.")
}

err = c.RemoveObject(bucketName, objectName)
if err != nil {
t.Fatal("Error: ", err)
}
err = c.RemoveBucket(bucketName)
if err != nil {
t.Fatal("Error:", err)
}
}

// Tests get bucket policy core API.
func TestGetBucketPolicy(t *testing.T) {
if testing.Short() {
Expand Down Expand Up @@ -164,7 +323,7 @@ func TestCorePutObject(t *testing.T) {
}

// Read the data back
r, err := c.GetObject(bucketName, objectName)
r, err := c.Client.GetObject(bucketName, objectName)
if err != nil {
t.Fatal("Error:", err, bucketName, objectName)
}
Expand Down
Loading

0 comments on commit 3aa4ee3

Please sign in to comment.