diff --git a/client/factory.go b/client/factory.go index 7eb2b839..189cfc1d 100644 --- a/client/factory.go +++ b/client/factory.go @@ -12,6 +12,9 @@ import ( "github.com/go-kit/log/level" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "gopkg.in/yaml.v2" + + "github.com/thanos-io/objstore" "github.com/thanos-io/objstore/providers/azure" "github.com/thanos-io/objstore/providers/bos" "github.com/thanos-io/objstore/providers/cos" @@ -20,9 +23,6 @@ import ( "github.com/thanos-io/objstore/providers/oss" "github.com/thanos-io/objstore/providers/s3" "github.com/thanos-io/objstore/providers/swift" - yaml "gopkg.in/yaml.v2" - - "github.com/thanos-io/objstore" ) type ObjProvider string diff --git a/clientutil/parse.go b/clientutil/parse.go new file mode 100644 index 00000000..759c42d2 --- /dev/null +++ b/clientutil/parse.go @@ -0,0 +1,65 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package clientutil + +import ( + "net/http" + "strconv" + "time" + + "github.com/pkg/errors" +) + +// ParseContentLength returns the content length (in bytes) parsed from the Content-Length +// HTTP header in input. +func ParseContentLength(m http.Header) (int64, error) { + const name = "Content-Length" + + v, ok := m[name] + if !ok { + return 0, errors.Errorf("%s header not found", name) + } + + if len(v) == 0 { + return 0, errors.Errorf("%s header has no values", name) + } + + ret, err := strconv.ParseInt(v[0], 10, 64) + if err != nil { + return 0, errors.Wrapf(err, "convert %s", name) + } + + return ret, nil +} + +// ParseLastModified returns the timestamp parsed from the Last-Modified +// HTTP header in input. +// Passing an second parameter, named f, to specify the time format. +// If f is empty then RFC3339 will be used as default format. +func ParseLastModified(m http.Header, f string) (time.Time, error) { + const ( + name = "Last-Modified" + defaultFormat = time.RFC3339 + ) + + v, ok := m[name] + if !ok { + return time.Time{}, errors.Errorf("%s header not found", name) + } + + if len(v) == 0 { + return time.Time{}, errors.Errorf("%s header has no values", name) + } + + if f == "" { + f = defaultFormat + } + + mod, err := time.Parse(f, v[0]) + if err != nil { + return time.Time{}, errors.Wrapf(err, "parse %s", name) + } + + return mod, nil +} diff --git a/clientutil/parse_test.go b/clientutil/parse_test.go new file mode 100644 index 00000000..149d5fb0 --- /dev/null +++ b/clientutil/parse_test.go @@ -0,0 +1,110 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package clientutil + +import ( + "net/http" + "testing" + "time" + + alioss "github.com/aliyun/aliyun-oss-go-sdk/oss" + "github.com/efficientgo/tools/core/pkg/testutil" +) + +func TestParseLastModified(t *testing.T) { + location, _ := time.LoadLocation("GMT") + tests := map[string]struct { + headerValue string + expectedVal time.Time + expectedErr string + format string + }{ + "no header": { + expectedErr: "Last-Modified header not found", + }, + "empty format string to default RFC3339 format": { + headerValue: "2015-11-06T10:07:11.000Z", + expectedVal: time.Date(2015, time.November, 6, 10, 7, 11, 0, time.UTC), + format: "", + }, + "valid RFC3339 header value": { + headerValue: "2015-11-06T10:07:11.000Z", + expectedVal: time.Date(2015, time.November, 6, 10, 7, 11, 0, time.UTC), + format: time.RFC3339, + }, + "invalid RFC3339 header value": { + headerValue: "invalid", + expectedErr: `parse Last-Modified: parsing time "invalid" as "2006-01-02T15:04:05Z07:00": cannot parse "invalid" as "2006"`, + format: time.RFC3339, + }, + "valid RFC1123 header value": { + headerValue: "Fri, 24 Feb 2012 06:07:48 GMT", + expectedVal: time.Date(2012, time.February, 24, 6, 7, 48, 0, location), + format: time.RFC1123, + }, + "invalid RFC1123 header value": { + headerValue: "invalid", + expectedErr: `parse Last-Modified: parsing time "invalid" as "Mon, 02 Jan 2006 15:04:05 MST": cannot parse "invalid" as "Mon"`, + format: time.RFC1123, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + meta := http.Header{} + if testData.headerValue != "" { + meta.Add(alioss.HTTPHeaderLastModified, testData.headerValue) + } + + actual, err := ParseLastModified(meta, testData.format) + + if testData.expectedErr != "" { + testutil.NotOk(t, err) + testutil.Equals(t, testData.expectedErr, err.Error()) + } else { + testutil.Ok(t, err) + testutil.Assert(t, testData.expectedVal.Equal(actual)) + } + }) + } +} + +func TestParseContentLength(t *testing.T) { + tests := map[string]struct { + headerValue string + expectedVal int64 + expectedErr string + }{ + "no header": { + expectedErr: "Content-Length header not found", + }, + "invalid header value": { + headerValue: "invalid", + expectedErr: `convert Content-Length: strconv.ParseInt: parsing "invalid": invalid syntax`, + }, + "valid header value": { + headerValue: "12345", + expectedVal: 12345, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + meta := http.Header{} + if testData.headerValue != "" { + meta.Add(alioss.HTTPHeaderContentLength, testData.headerValue) + } + + actual, err := ParseContentLength(meta) + + if testData.expectedErr != "" { + testutil.NotOk(t, err) + testutil.Equals(t, testData.expectedErr, err.Error()) + } else { + testutil.Ok(t, err) + testutil.Equals(t, testData.expectedVal, actual) + } + }) + } +} diff --git a/objtesting/foreach.go b/objtesting/foreach.go index be8b79dc..e3cb5731 100644 --- a/objtesting/foreach.go +++ b/objtesting/foreach.go @@ -10,6 +10,7 @@ import ( "testing" "github.com/efficientgo/tools/core/pkg/testutil" + "github.com/thanos-io/objstore" "github.com/thanos-io/objstore/client" "github.com/thanos-io/objstore/providers/azure" diff --git a/providers/azure/azure.go b/providers/azure/azure.go index d58aa92f..b5337bcb 100644 --- a/providers/azure/azure.go +++ b/providers/azure/azure.go @@ -4,10 +4,8 @@ package azure import ( - "bytes" "context" "io" - "io/ioutil" "os" "strings" "testing" @@ -129,6 +127,11 @@ func (conf *Config) validate() error { return nil } +// HTTPConfig exists here only because Cortex depends on it, and we depend on Cortex. +// Deprecated. +// TODO(bwplotka): Remove it, once we remove Cortex cycle dep, or Cortex stops using this. +type HTTPConfig = exthttp.HTTPConfig + // parseConfig unmarshals a buffer into a Config with default values. func parseConfig(conf []byte) (Config, error) { config := DefaultConfig @@ -293,39 +296,15 @@ func (b *Bucket) getBlobReader(ctx context.Context, name string, offset, length if err != nil { return nil, errors.Wrapf(err, "cannot get Azure blob URL, address: %s", name) } - var props *blob.BlobGetPropertiesResponse - props, err = blobURL.GetProperties(ctx, blob.BlobAccessConditions{}, blob.ClientProvidedKeyOptions{}) - if err != nil { - return nil, errors.Wrapf(err, "cannot get properties for container: %s", name) - } - - var size int64 - // If a length is specified and it won't go past the end of the file, - // then set it as the size. - if length > 0 && length <= props.ContentLength()-offset { - size = length - level.Debug(b.logger).Log("msg", "set size to length", "size", size, "length", length, "offset", offset, "name", name) - } else { - size = props.ContentLength() - offset - level.Debug(b.logger).Log("msg", "set size to go to EOF", "contentlength", props.ContentLength(), "size", size, "length", length, "offset", offset, "name", name) - } - - destBuffer := make([]byte, size) - if err := blob.DownloadBlobToBuffer(context.Background(), blobURL.BlobURL, offset, size, - destBuffer, blob.DownloadFromBlobOptions{ - BlockSize: blob.BlobDefaultDownloadBlockSize, - Parallelism: uint16(3), - Progress: nil, - RetryReaderOptionsPerBlock: blob.RetryReaderOptions{ - MaxRetryRequests: b.config.ReaderConfig.MaxRetryRequests, - }, - }, - ); err != nil { - return nil, errors.Wrapf(err, "cannot download blob, address: %s", blobURL.BlobURL) + dl, err := blobURL.Download(ctx, offset, length, blob.BlobAccessConditions{}, false, blob.ClientProvidedKeyOptions{}) + if err != nil { + return nil, errors.Wrapf(err, "cannot download Azure blob, address: %s", name) } - return ioutil.NopCloser(bytes.NewReader(destBuffer)), nil + return dl.Body(blob.RetryReaderOptions{ + MaxRetryRequests: b.config.ReaderConfig.MaxRetryRequests, + }), nil } // Get returns a reader for the given object name. diff --git a/providers/azure/azure_test.go b/providers/azure/azure_test.go index cf913c1c..f02efb56 100644 --- a/providers/azure/azure_test.go +++ b/providers/azure/azure_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/efficientgo/tools/core/pkg/testutil" + "github.com/thanos-io/objstore/exthttp" ) diff --git a/providers/azure/helpers.go b/providers/azure/helpers.go index e22861db..0cdd6a4a 100644 --- a/providers/azure/helpers.go +++ b/providers/azure/helpers.go @@ -17,6 +17,7 @@ import ( "github.com/Azure/go-autorest/autorest/azure/auth" "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/thanos-io/objstore/exthttp" ) diff --git a/providers/cos/cos.go b/providers/cos/cos.go index f02d4537..0c40ab3b 100644 --- a/providers/cos/cos.go +++ b/providers/cos/cos.go @@ -21,9 +21,11 @@ import ( "github.com/pkg/errors" "github.com/prometheus/common/model" "github.com/tencentyun/cos-go-sdk-v5" + "gopkg.in/yaml.v2" + "github.com/thanos-io/objstore" + "github.com/thanos-io/objstore/clientutil" "github.com/thanos-io/objstore/exthttp" - "gopkg.in/yaml.v2" ) // DirDelim is the delimiter used to model a directory structure in an object store bucket. @@ -155,14 +157,14 @@ func (b *Bucket) Attributes(ctx context.Context, name string) (objstore.ObjectAt return objstore.ObjectAttributes{}, err } - size, err := exthttp.ParseContentLength(resp.Header) + size, err := clientutil.ParseContentLength(resp.Header) if err != nil { return objstore.ObjectAttributes{}, err } // tencent cos return Last-Modified header in RFC1123 format. // see api doc for details: https://intl.cloud.tencent.com/document/product/436/7729 - mod, err := exthttp.ParseLastModified(resp.Header, time.RFC1123) + mod, err := clientutil.ParseLastModified(resp.Header, time.RFC1123) if err != nil { return objstore.ObjectAttributes{}, err } diff --git a/providers/cos/cos_test.go b/providers/cos/cos_test.go index ae219347..807cc58e 100644 --- a/providers/cos/cos_test.go +++ b/providers/cos/cos_test.go @@ -9,6 +9,7 @@ import ( "github.com/efficientgo/tools/core/pkg/testutil" "github.com/prometheus/common/model" + "github.com/thanos-io/objstore/exthttp" ) diff --git a/providers/filesystem/filesystem.go b/providers/filesystem/filesystem.go index 13384308..8189168e 100644 --- a/providers/filesystem/filesystem.go +++ b/providers/filesystem/filesystem.go @@ -13,8 +13,9 @@ import ( "github.com/efficientgo/tools/core/pkg/errcapture" "github.com/pkg/errors" - "github.com/thanos-io/objstore" "gopkg.in/yaml.v2" + + "github.com/thanos-io/objstore" ) // Config stores the configuration for storing and accessing blobs in filesystem. diff --git a/providers/gcs/gcs_test.go b/providers/gcs/gcs_test.go index 01372f09..75eaa143 100644 --- a/providers/gcs/gcs_test.go +++ b/providers/gcs/gcs_test.go @@ -27,7 +27,7 @@ func TestBucket_Get_ShouldReturnErrorIfServerTruncateResponse(t *testing.T) { })) defer srv.Close() - testutil.Ok(t, os.Setenv("STORAGE_EMULATOR_HOST", srv.Listener.Addr().String())) + os.Setenv("STORAGE_EMULATOR_HOST", srv.Listener.Addr().String()) cfg := Config{ Bucket: "test-bucket", diff --git a/providers/oss/oss.go b/providers/oss/oss.go index 2e423eac..25d22406 100644 --- a/providers/oss/oss.go +++ b/providers/oss/oss.go @@ -20,9 +20,10 @@ import ( alioss "github.com/aliyun/aliyun-oss-go-sdk/oss" "github.com/go-kit/log" "github.com/pkg/errors" - "github.com/thanos-io/objstore/exthttp" "gopkg.in/yaml.v2" + "github.com/thanos-io/objstore/clientutil" + "github.com/thanos-io/objstore" ) @@ -139,14 +140,14 @@ func (b *Bucket) Attributes(ctx context.Context, name string) (objstore.ObjectAt return objstore.ObjectAttributes{}, err } - size, err := exthttp.ParseContentLength(m) + size, err := clientutil.ParseContentLength(m) if err != nil { return objstore.ObjectAttributes{}, err } // aliyun oss return Last-Modified header in RFC1123 format. // see api doc for details: https://www.alibabacloud.com/help/doc-detail/31985.htm - mod, err := exthttp.ParseLastModified(m, time.RFC1123) + mod, err := clientutil.ParseLastModified(m, time.RFC1123) if err != nil { return objstore.ObjectAttributes{}, err } diff --git a/providers/s3/s3.go b/providers/s3/s3.go index e3dfd581..c1ed5cf6 100644 --- a/providers/s3/s3.go +++ b/providers/s3/s3.go @@ -26,9 +26,10 @@ import ( "github.com/pkg/errors" "github.com/prometheus/common/model" "github.com/prometheus/common/version" + "gopkg.in/yaml.v2" + "github.com/thanos-io/objstore" "github.com/thanos-io/objstore/exthttp" - "gopkg.in/yaml.v2" ) type ctxKey int @@ -112,6 +113,11 @@ var DefaultConfig = Config{ BucketLookupType: AutoLookup, } +// HTTPConfig exists here only because Cortex depends on it, and we depend on Cortex. +// Deprecated. +// TODO(bwplotka): Remove it, once we remove Cortex cycle dep, or Cortex stops using this. +type HTTPConfig = exthttp.HTTPConfig + // Config stores the configuration for s3 bucket. type Config struct { Bucket string `yaml:"bucket"` @@ -480,6 +486,10 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error { PartSize: partSize, ServerSideEncryption: sse, UserMetadata: b.putUserMetadata, + // 4 is what minio-go have as the default. To be certain we do micro benchmark before any changes we + // ensure we pin this number to four. + // TODO(bwplotka): Consider adjusting this number to GOMAXPROCS or to expose this in config if it becomes bottleneck. + NumThreads: 4, }, ); err != nil { return errors.Wrap(err, "upload s3 object") diff --git a/providers/s3/s3_e2e_test.go b/providers/s3/s3_e2e_test.go index d133780b..d58f0c1a 100644 --- a/providers/s3/s3_e2e_test.go +++ b/providers/s3/s3_e2e_test.go @@ -12,16 +12,19 @@ import ( "github.com/efficientgo/e2e" "github.com/efficientgo/tools/core/pkg/testutil" "github.com/go-kit/log" + "github.com/thanos-io/objstore/providers/s3" "github.com/thanos-io/objstore/test/e2e/e2ethanos" ) -// Regression benchmark for https://github.com/thanos-io/thanos/issues/3917. +// Regression benchmark for https://github.com/thanos-io/thanos/issues/3917 and https://github.com/thanos-io/thanos/issues/3967. +// $ export ver=v1 && go test ./pkg/objstore/s3/... -run '^$' -bench '^BenchmarkUpload' -benchtime 5s -count 5 \ +// -memprofile=${ver}.mem.pprof -cpuprofile=${ver}.cpu.pprof | tee ${ver}.txt . func BenchmarkUpload(b *testing.B) { b.ReportAllocs() ctx := context.Background() - e, err := e2e.NewDockerEnvironment("e2e_bench_mino_client") + e, err := e2e.NewDockerEnvironment("e2e_bench_mino_client", e2e.WithLogger(log.NewNopLogger())) testutil.Ok(b, err) b.Cleanup(e2ethanos.CleanScenario(b, e)) @@ -29,8 +32,11 @@ func BenchmarkUpload(b *testing.B) { m := e2ethanos.NewMinio(e, "benchmark", bucket) testutil.Ok(b, e2e.StartAndWaitReady(m)) - bkt, err := s3.NewBucketWithConfig(log.NewNopLogger(), - e2ethanos.NewS3Config(bucket, m.Endpoint("https"), m.Dir()), "test-feed") + bkt, err := s3.NewBucketWithConfig( + log.NewNopLogger(), + e2ethanos.NewS3Config(bucket, m.Endpoint("https"), m.Dir()), + "test-feed", + ) testutil.Ok(b, err) buf := bytes.Buffer{} diff --git a/providers/s3/s3_test.go b/providers/s3/s3_test.go index 47cb5d6b..0ce136bd 100644 --- a/providers/s3/s3_test.go +++ b/providers/s3/s3_test.go @@ -17,6 +17,7 @@ import ( "github.com/efficientgo/tools/core/pkg/testutil" "github.com/go-kit/log" "github.com/minio/minio-go/v7/pkg/encrypt" + "github.com/thanos-io/objstore/exthttp" )