Skip to content

Commit

Permalink
objstore: add ObjectSize() implementation (#1792)
Browse files Browse the repository at this point in the history
* objstore: add initial ObjectSize() implementation

Adds a new method ObjectSize(context.Context, string) method to our
object storage interface which returns the size of the specified object
in uint64 without actually downloading all of it.

This is a pre-requisite to one check that we want to add.

Add a check to our E2E tests to see if it works.

Signed-off-by: Giedrius Statkevičius <[email protected]>

* objstore: cos/oss: fix linter errors

Signed-off-by: Giedrius Statkevičius <[email protected]>

* objstore: return direct err in ObjectSize()

The exact error is needed by the callers so that they could check
`IsObjNotFoundErr` correctly.

Add a test for this to the E2E tests.

Signed-off-by: Giedrius Statkevičius <[email protected]>

* objstore: azure: return err directly as well

Signed-off-by: Giedrius Statkevičius <[email protected]>
  • Loading branch information
GiedriusS authored Nov 30, 2019
1 parent 6cad4ca commit 229f978
Show file tree
Hide file tree
Showing 10 changed files with 130 additions and 0 deletions.
14 changes: 14 additions & 0 deletions pkg/objstore/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,20 @@ func (b *Bucket) GetRange(ctx context.Context, name string, off, length int64) (
return b.getBlobReader(ctx, name, off, length)
}

// ObjectSize returns the size of the specified object.
func (b *Bucket) ObjectSize(ctx context.Context, name string) (uint64, error) {
blobURL, err := getBlobURL(ctx, *b.config, name)
if err != nil {
return 0, errors.Wrapf(err, "cannot get Azure blob URL, blob: %s", name)
}
var props *blob.BlobGetPropertiesResponse
props, err = blobURL.GetProperties(ctx, blob.BlobAccessConditions{})
if err != nil {
return 0, err
}
return uint64(props.ContentLength()), nil
}

// Exists checks if the given object exists.
func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) {
level.Debug(b.logger).Log("msg", "check if blob exists", "blob", name)
Expand Down
20 changes: 20 additions & 0 deletions pkg/objstore/cos/cos.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"net/http"
"os"
"strconv"
"strings"
"testing"

Expand Down Expand Up @@ -88,6 +89,25 @@ func (b *Bucket) Name() string {
return b.name
}

// ObjectSize returns the size of the specified object.
func (b *Bucket) ObjectSize(ctx context.Context, name string) (uint64, error) {
resp, err := b.client.Object.Head(ctx, name, nil)
if err != nil {
return 0, err
}
if v, ok := resp.Header["Content-Length"]; ok {
if len(v) == 0 {
return 0, errors.New("content-length header has no values")
}
ret, err := strconv.ParseUint(v[0], 10, 64)
if err != nil {
return 0, errors.Wrap(err, "convert content-length")
}
return ret, nil
}
return 0, errors.New("content-length header not found")
}

// Upload the contents of the reader as an object into the bucket.
func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
if _, err := b.client.Object.Put(ctx, name, r, nil); err != nil {
Expand Down
10 changes: 10 additions & 0 deletions pkg/objstore/filesystem/filesystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,16 @@ func (r *rangeReaderCloser) Close() error {
return r.f.Close()
}

// ObjectSize returns the size of the specified object.
func (b *Bucket) ObjectSize(_ context.Context, name string) (uint64, error) {
file := filepath.Join(b.rootDir, name)
st, err := os.Stat(file)
if err != nil {
return 0, errors.Wrapf(err, "stat %s", file)
}
return uint64(st.Size()), nil
}

// GetRange returns a new range reader for the given object name and range.
func (b *Bucket) GetRange(_ context.Context, name string, off, length int64) (io.ReadCloser, error) {
if name == "" {
Expand Down
9 changes: 9 additions & 0 deletions pkg/objstore/gcs/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,15 @@ func (b *Bucket) GetRange(ctx context.Context, name string, off, length int64) (
return b.bkt.Object(name).NewRangeReader(ctx, off, length)
}

// ObjectSize returns the size of the specified object.
func (b *Bucket) ObjectSize(ctx context.Context, name string) (uint64, error) {
obj, err := b.bkt.Object(name).Attrs(ctx)
if err != nil {
return 0, err
}
return uint64(obj.Size), nil
}

// Handle returns the underlying GCS bucket handle.
// Used for testing purposes (we return handle, so it is not instrumented).
func (b *Bucket) Handle() *storage.BucketHandle {
Expand Down
11 changes: 11 additions & 0 deletions pkg/objstore/inmem/inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,17 @@ func (b *Bucket) Exists(_ context.Context, name string) (bool, error) {
return ok, nil
}

// ObjectSize returns the size of the specified object.
func (b *Bucket) ObjectSize(_ context.Context, name string) (uint64, error) {
b.mtx.RLock()
file, ok := b.objects[name]
b.mtx.RUnlock()
if !ok {
return 0, errNotFound
}
return uint64(len(file)), nil
}

// Upload writes the file specified in src to into the memory.
func (b *Bucket) Upload(_ context.Context, name string, r io.Reader) error {
b.mtx.Lock()
Expand Down
18 changes: 18 additions & 0 deletions pkg/objstore/objstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ type BucketReader interface {

// IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations.
IsObjNotFoundErr(err error) bool

// ObjectSize returns the size of the specified object.
ObjectSize(ctx context.Context, name string) (uint64, error)
}

// UploadDir uploads all files in srcdir to the bucket with into a top-level directory
Expand Down Expand Up @@ -238,6 +241,21 @@ func (b *metricBucket) Iter(ctx context.Context, dir string, f func(name string)
return err
}

// ObjectSize returns the size of the specified object.
func (b *metricBucket) ObjectSize(ctx context.Context, name string) (uint64, error) {
const op = "objectsize"
b.ops.WithLabelValues(op).Inc()
start := time.Now()

rc, err := b.bkt.ObjectSize(ctx, name)
if err != nil {
b.opsFailures.WithLabelValues(op).Inc()
return 0, err
}
b.opsDuration.WithLabelValues(op).Observe(time.Since(start).Seconds())
return rc, nil
}

func (b *metricBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
const op = "get"
b.ops.WithLabelValues(op).Inc()
Expand Down
9 changes: 9 additions & 0 deletions pkg/objstore/objtesting/acceptance_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ func TestObjStore_AcceptanceTest_e2e(t *testing.T) {
testutil.Ok(t, err)
testutil.Assert(t, !ok, "expected not exits")

_, err = bkt.ObjectSize(ctx, "id1/obj_1.some")
testutil.NotOk(t, err)
testutil.Assert(t, bkt.IsObjNotFoundErr(err), "expected not found error but got %s", err)

// Upload first object.
testutil.Ok(t, bkt.Upload(ctx, "id1/obj_1.some", strings.NewReader("@test-data@")))

Expand All @@ -42,6 +46,11 @@ func TestObjStore_AcceptanceTest_e2e(t *testing.T) {
testutil.Ok(t, err)
testutil.Equals(t, "@test-data@", string(content))

// Check if we can get the correct size.
sz, err := bkt.ObjectSize(ctx, "id1/obj_1.some")
testutil.Ok(t, err)
testutil.Assert(t, sz == 11, "expected size to be equal to 11")

rc2, err := bkt.GetRange(ctx, "id1/obj_1.some", 1, 3)
testutil.Ok(t, err)
defer func() { testutil.Ok(t, rc2.Close()) }()
Expand Down
20 changes: 20 additions & 0 deletions pkg/objstore/oss/oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,26 @@ func (b *Bucket) Delete(ctx context.Context, name string) error {
return nil
}

// ObjectSize returns the size of the specified object.
func (b *Bucket) ObjectSize(ctx context.Context, name string) (uint64, error) {
// https://github.com/aliyun/aliyun-oss-go-sdk/blob/cee409f5b4d75d7ad077cacb7e6f4590a7f2e172/oss/bucket.go#L668
m, err := b.bucket.GetObjectMeta(name)
if err != nil {
return 0, err
}
if v, ok := m["Content-Length"]; ok {
if len(v) == 0 {
return 0, errors.New("content-length header has no values")
}
ret, err := strconv.ParseUint(v[0], 10, 64)
if err != nil {
return 0, errors.Wrap(err, "convert content-length")
}
return ret, nil
}
return 0, errors.New("content-length header not found")
}

// NewBucket returns a new Bucket using the provided oss config values.
func NewBucket(logger log.Logger, conf []byte, component string) (*Bucket, error) {
var config Config
Expand Down
9 changes: 9 additions & 0 deletions pkg/objstore/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,15 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
return nil
}

// ObjectSize returns the size of the specified object.
func (b *Bucket) ObjectSize(ctx context.Context, name string) (uint64, error) {
objInfo, err := b.client.StatObject(b.name, name, minio.StatObjectOptions{})
if err != nil {
return 0, err
}
return uint64(objInfo.Size), nil
}

// Delete removes the object with the given name.
func (b *Bucket) Delete(ctx context.Context, name string) error {
return b.client.RemoveObject(b.name, name)
Expand Down
10 changes: 10 additions & 0 deletions pkg/objstore/swift/swift.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,16 @@ func (c *Container) GetRange(ctx context.Context, name string, off, length int64
return response.Body, response.Err
}

// ObjectSize returns the size of the specified object.
func (c *Container) ObjectSize(ctx context.Context, name string) (uint64, error) {
response := objects.Get(c.client, c.name, name, nil)
headers, err := response.Extract()
if err != nil {
return 0, err
}
return uint64(headers.ContentLength), nil
}

// Exists checks if the given object exists.
func (c *Container) Exists(ctx context.Context, name string) (bool, error) {
err := objects.Get(c.client, c.name, name, nil).Err
Expand Down

0 comments on commit 229f978

Please sign in to comment.