Skip to content

Commit

Permalink
Sync with upstream Thanos
Browse files Browse the repository at this point in the history
Signed-off-by: Kemal Akkoyun <[email protected]>
  • Loading branch information
kakkoyun committed Jul 15, 2022
1 parent 2f10012 commit c047963
Showing 15 changed files with 227 additions and 48 deletions.
6 changes: 3 additions & 3 deletions client/factory.go
Original file line number Diff line number Diff line change
@@ -12,6 +12,9 @@ import (
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"gopkg.in/yaml.v2"

"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/providers/azure"
"github.com/thanos-io/objstore/providers/bos"
"github.com/thanos-io/objstore/providers/cos"
@@ -20,9 +23,6 @@ import (
"github.com/thanos-io/objstore/providers/oss"
"github.com/thanos-io/objstore/providers/s3"
"github.com/thanos-io/objstore/providers/swift"
yaml "gopkg.in/yaml.v2"

"github.com/thanos-io/objstore"
)

type ObjProvider string
65 changes: 65 additions & 0 deletions clientutil/parse.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package clientutil

import (
"net/http"
"strconv"
"time"

"github.com/pkg/errors"
)

// ParseContentLength returns the content length (in bytes) parsed from the Content-Length
// HTTP header in input.
func ParseContentLength(m http.Header) (int64, error) {
const name = "Content-Length"

v, ok := m[name]
if !ok {
return 0, errors.Errorf("%s header not found", name)
}

if len(v) == 0 {
return 0, errors.Errorf("%s header has no values", name)
}

ret, err := strconv.ParseInt(v[0], 10, 64)
if err != nil {
return 0, errors.Wrapf(err, "convert %s", name)
}

return ret, nil
}

// ParseLastModified returns the timestamp parsed from the Last-Modified
// HTTP header in input.
// Passing an second parameter, named f, to specify the time format.
// If f is empty then RFC3339 will be used as default format.
func ParseLastModified(m http.Header, f string) (time.Time, error) {
const (
name = "Last-Modified"
defaultFormat = time.RFC3339
)

v, ok := m[name]
if !ok {
return time.Time{}, errors.Errorf("%s header not found", name)
}

if len(v) == 0 {
return time.Time{}, errors.Errorf("%s header has no values", name)
}

if f == "" {
f = defaultFormat
}

mod, err := time.Parse(f, v[0])
if err != nil {
return time.Time{}, errors.Wrapf(err, "parse %s", name)
}

return mod, nil
}
110 changes: 110 additions & 0 deletions clientutil/parse_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package clientutil

import (
"net/http"
"testing"
"time"

alioss "github.com/aliyun/aliyun-oss-go-sdk/oss"
"github.com/efficientgo/tools/core/pkg/testutil"
)

func TestParseLastModified(t *testing.T) {
location, _ := time.LoadLocation("GMT")
tests := map[string]struct {
headerValue string
expectedVal time.Time
expectedErr string
format string
}{
"no header": {
expectedErr: "Last-Modified header not found",
},
"empty format string to default RFC3339 format": {
headerValue: "2015-11-06T10:07:11.000Z",
expectedVal: time.Date(2015, time.November, 6, 10, 7, 11, 0, time.UTC),
format: "",
},
"valid RFC3339 header value": {
headerValue: "2015-11-06T10:07:11.000Z",
expectedVal: time.Date(2015, time.November, 6, 10, 7, 11, 0, time.UTC),
format: time.RFC3339,
},
"invalid RFC3339 header value": {
headerValue: "invalid",
expectedErr: `parse Last-Modified: parsing time "invalid" as "2006-01-02T15:04:05Z07:00": cannot parse "invalid" as "2006"`,
format: time.RFC3339,
},
"valid RFC1123 header value": {
headerValue: "Fri, 24 Feb 2012 06:07:48 GMT",
expectedVal: time.Date(2012, time.February, 24, 6, 7, 48, 0, location),
format: time.RFC1123,
},
"invalid RFC1123 header value": {
headerValue: "invalid",
expectedErr: `parse Last-Modified: parsing time "invalid" as "Mon, 02 Jan 2006 15:04:05 MST": cannot parse "invalid" as "Mon"`,
format: time.RFC1123,
},
}

for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
meta := http.Header{}
if testData.headerValue != "" {
meta.Add(alioss.HTTPHeaderLastModified, testData.headerValue)
}

actual, err := ParseLastModified(meta, testData.format)

if testData.expectedErr != "" {
testutil.NotOk(t, err)
testutil.Equals(t, testData.expectedErr, err.Error())
} else {
testutil.Ok(t, err)
testutil.Assert(t, testData.expectedVal.Equal(actual))
}
})
}
}

func TestParseContentLength(t *testing.T) {
tests := map[string]struct {
headerValue string
expectedVal int64
expectedErr string
}{
"no header": {
expectedErr: "Content-Length header not found",
},
"invalid header value": {
headerValue: "invalid",
expectedErr: `convert Content-Length: strconv.ParseInt: parsing "invalid": invalid syntax`,
},
"valid header value": {
headerValue: "12345",
expectedVal: 12345,
},
}

for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
meta := http.Header{}
if testData.headerValue != "" {
meta.Add(alioss.HTTPHeaderContentLength, testData.headerValue)
}

actual, err := ParseContentLength(meta)

if testData.expectedErr != "" {
testutil.NotOk(t, err)
testutil.Equals(t, testData.expectedErr, err.Error())
} else {
testutil.Ok(t, err)
testutil.Equals(t, testData.expectedVal, actual)
}
})
}
}
1 change: 1 addition & 0 deletions objtesting/foreach.go
Original file line number Diff line number Diff line change
@@ -10,6 +10,7 @@ import (
"testing"

"github.com/efficientgo/tools/core/pkg/testutil"

"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/client"
"github.com/thanos-io/objstore/providers/azure"
43 changes: 11 additions & 32 deletions providers/azure/azure.go
Original file line number Diff line number Diff line change
@@ -4,10 +4,8 @@
package azure

import (
"bytes"
"context"
"io"
"io/ioutil"
"os"
"strings"
"testing"
@@ -129,6 +127,11 @@ func (conf *Config) validate() error {
return nil
}

// HTTPConfig exists here only because Cortex depends on it, and we depend on Cortex.
// Deprecated.
// TODO(bwplotka): Remove it, once we remove Cortex cycle dep, or Cortex stops using this.
type HTTPConfig = exthttp.HTTPConfig

// parseConfig unmarshals a buffer into a Config with default values.
func parseConfig(conf []byte) (Config, error) {
config := DefaultConfig
@@ -293,39 +296,15 @@ func (b *Bucket) getBlobReader(ctx context.Context, name string, offset, length
if err != nil {
return nil, errors.Wrapf(err, "cannot get Azure blob URL, address: %s", name)
}
var props *blob.BlobGetPropertiesResponse
props, err = blobURL.GetProperties(ctx, blob.BlobAccessConditions{}, blob.ClientProvidedKeyOptions{})
if err != nil {
return nil, errors.Wrapf(err, "cannot get properties for container: %s", name)
}

var size int64
// If a length is specified and it won't go past the end of the file,
// then set it as the size.
if length > 0 && length <= props.ContentLength()-offset {
size = length
level.Debug(b.logger).Log("msg", "set size to length", "size", size, "length", length, "offset", offset, "name", name)
} else {
size = props.ContentLength() - offset
level.Debug(b.logger).Log("msg", "set size to go to EOF", "contentlength", props.ContentLength(), "size", size, "length", length, "offset", offset, "name", name)
}

destBuffer := make([]byte, size)

if err := blob.DownloadBlobToBuffer(context.Background(), blobURL.BlobURL, offset, size,
destBuffer, blob.DownloadFromBlobOptions{
BlockSize: blob.BlobDefaultDownloadBlockSize,
Parallelism: uint16(3),
Progress: nil,
RetryReaderOptionsPerBlock: blob.RetryReaderOptions{
MaxRetryRequests: b.config.ReaderConfig.MaxRetryRequests,
},
},
); err != nil {
return nil, errors.Wrapf(err, "cannot download blob, address: %s", blobURL.BlobURL)
dl, err := blobURL.Download(ctx, offset, length, blob.BlobAccessConditions{}, false, blob.ClientProvidedKeyOptions{})
if err != nil {
return nil, errors.Wrapf(err, "cannot download Azure blob, address: %s", name)
}

return ioutil.NopCloser(bytes.NewReader(destBuffer)), nil
return dl.Body(blob.RetryReaderOptions{
MaxRetryRequests: b.config.ReaderConfig.MaxRetryRequests,
}), nil
}

// Get returns a reader for the given object name.
1 change: 1 addition & 0 deletions providers/azure/azure_test.go
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@ import (
"time"

"github.com/efficientgo/tools/core/pkg/testutil"

"github.com/thanos-io/objstore/exthttp"
)

1 change: 1 addition & 0 deletions providers/azure/helpers.go
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@ import (
"github.com/Azure/go-autorest/autorest/azure/auth"
"github.com/go-kit/log"
"github.com/go-kit/log/level"

"github.com/thanos-io/objstore/exthttp"
)

8 changes: 5 additions & 3 deletions providers/cos/cos.go
Original file line number Diff line number Diff line change
@@ -21,9 +21,11 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/tencentyun/cos-go-sdk-v5"
"gopkg.in/yaml.v2"

"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/clientutil"
"github.com/thanos-io/objstore/exthttp"
"gopkg.in/yaml.v2"
)

// DirDelim is the delimiter used to model a directory structure in an object store bucket.
@@ -155,14 +157,14 @@ func (b *Bucket) Attributes(ctx context.Context, name string) (objstore.ObjectAt
return objstore.ObjectAttributes{}, err
}

size, err := exthttp.ParseContentLength(resp.Header)
size, err := clientutil.ParseContentLength(resp.Header)
if err != nil {
return objstore.ObjectAttributes{}, err
}

// tencent cos return Last-Modified header in RFC1123 format.
// see api doc for details: https://intl.cloud.tencent.com/document/product/436/7729
mod, err := exthttp.ParseLastModified(resp.Header, time.RFC1123)
mod, err := clientutil.ParseLastModified(resp.Header, time.RFC1123)
if err != nil {
return objstore.ObjectAttributes{}, err
}
1 change: 1 addition & 0 deletions providers/cos/cos_test.go
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@ import (

"github.com/efficientgo/tools/core/pkg/testutil"
"github.com/prometheus/common/model"

"github.com/thanos-io/objstore/exthttp"
)

3 changes: 2 additions & 1 deletion providers/filesystem/filesystem.go
Original file line number Diff line number Diff line change
@@ -13,8 +13,9 @@ import (

"github.com/efficientgo/tools/core/pkg/errcapture"
"github.com/pkg/errors"
"github.com/thanos-io/objstore"
"gopkg.in/yaml.v2"

"github.com/thanos-io/objstore"
)

// Config stores the configuration for storing and accessing blobs in filesystem.
2 changes: 1 addition & 1 deletion providers/gcs/gcs_test.go
Original file line number Diff line number Diff line change
@@ -27,7 +27,7 @@ func TestBucket_Get_ShouldReturnErrorIfServerTruncateResponse(t *testing.T) {
}))
defer srv.Close()

testutil.Ok(t, os.Setenv("STORAGE_EMULATOR_HOST", srv.Listener.Addr().String()))
os.Setenv("STORAGE_EMULATOR_HOST", srv.Listener.Addr().String())

cfg := Config{
Bucket: "test-bucket",
7 changes: 4 additions & 3 deletions providers/oss/oss.go
Original file line number Diff line number Diff line change
@@ -20,9 +20,10 @@ import (
alioss "github.com/aliyun/aliyun-oss-go-sdk/oss"
"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/thanos-io/objstore/exthttp"
"gopkg.in/yaml.v2"

"github.com/thanos-io/objstore/clientutil"

"github.com/thanos-io/objstore"
)

@@ -139,14 +140,14 @@ func (b *Bucket) Attributes(ctx context.Context, name string) (objstore.ObjectAt
return objstore.ObjectAttributes{}, err
}

size, err := exthttp.ParseContentLength(m)
size, err := clientutil.ParseContentLength(m)
if err != nil {
return objstore.ObjectAttributes{}, err
}

// aliyun oss return Last-Modified header in RFC1123 format.
// see api doc for details: https://www.alibabacloud.com/help/doc-detail/31985.htm
mod, err := exthttp.ParseLastModified(m, time.RFC1123)
mod, err := clientutil.ParseLastModified(m, time.RFC1123)
if err != nil {
return objstore.ObjectAttributes{}, err
}
Loading

0 comments on commit c047963

Please sign in to comment.