diff --git a/CHANGELOG.md b/CHANGELOG.md index d438abf95d..5e0a971dab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,7 +28,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re - [#2732](https://github.com/thanos-io/thanos/pull/2732) Swift: Switched to a new library [ncw/swift](https://github.com/ncw/swift) providing large objects support. By default, segments will be uploaded to the same container directory `segments/` if the file is bigger than `1GB`. To change the defaults see [the docs](./docs/storage.md#openstack-swift). - +- [#3626](https://github.com/thanos-io/thanos/pull/3626) Failed upload of `meta.json` file doesn't cause block cleanup anymore. This has a potential to generate corrupted blocks under specific conditions. Partial block is left in bucket for later cleanup. ## [v0.17.2](https://github.com/thanos-io/thanos/releases/tag/v0.17.2) - 2020.12.07 diff --git a/pkg/block/block.go b/pkg/block/block.go index 33993bd446..f95e176f51 100644 --- a/pkg/block/block.go +++ b/pkg/block/block.go @@ -117,7 +117,11 @@ func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir st // Meta.json always need to be uploaded as a last item. This will allow to assume block directories without meta file to be pending uploads. if err := bkt.Upload(ctx, path.Join(id.String(), MetaFilename), &metaEncoded); err != nil { - return cleanUp(logger, bkt, id, errors.Wrap(err, "upload meta file")) + // Don't call cleanUp here. Despite getting error, meta.json may have been uploaded in certain cases, + // and even though cleanUp will not see it yet, meta.json may appear in the bucket later. + // (Eg. S3 is known to behave this way when it returns 503 "SlowDown" error). + // If meta.json is not uploaded, this will produce partial blocks, but such blocks will be cleaned later. + return errors.Wrap(err, "upload meta file") } return nil diff --git a/pkg/block/block_test.go b/pkg/block/block_test.go index ee473f0b09..076099e9cb 100644 --- a/pkg/block/block_test.go +++ b/pkg/block/block_test.go @@ -8,6 +8,7 @@ import ( "context" "encoding/json" "fmt" + "io" "io/ioutil" "os" "path" @@ -17,6 +18,7 @@ import ( "github.com/go-kit/kit/log" "github.com/oklog/ulid" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" promtest "github.com/prometheus/client_golang/prometheus/testutil" @@ -367,3 +369,68 @@ func TestMarkForNoCompact(t *testing.T) { }) } } + +func TestUploadCleanup(t *testing.T) { + defer testutil.TolerantVerifyLeak(t) + + ctx := context.Background() + + tmpDir, err := ioutil.TempDir("", "test-block-upload") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() + + bkt := objstore.NewInMemBucket() + b1, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{ + {{Name: "a", Value: "1"}}, + {{Name: "a", Value: "2"}}, + {{Name: "a", Value: "3"}}, + {{Name: "a", Value: "4"}}, + {{Name: "b", Value: "1"}}, + }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "val1"}}, 124) + testutil.Ok(t, err) + + { + errBkt := errBucket{Bucket: bkt, failSuffix: "/index"} + + uploadErr := Upload(ctx, log.NewNopLogger(), errBkt, path.Join(tmpDir, b1.String())) + testutil.Assert(t, errors.Is(uploadErr, errUploadFailed)) + + // If upload of index fails, block is deleted. + testutil.Equals(t, 1, len(bkt.Objects())) // Only debug meta file is present. + testutil.Assert(t, len(bkt.Objects()[path.Join(DebugMetas, fmt.Sprintf("%s.json", b1.String()))]) > 0) + } + + { + errBkt := errBucket{Bucket: bkt, failSuffix: "/meta.json"} + + uploadErr := Upload(ctx, log.NewNopLogger(), errBkt, path.Join(tmpDir, b1.String())) + testutil.Assert(t, errors.Is(uploadErr, errUploadFailed)) + + // If upload of meta.json fails, nothing is cleaned up. + testutil.Equals(t, 4, len(bkt.Objects())) + testutil.Assert(t, len(bkt.Objects()[path.Join(b1.String(), ChunksDirname, "000001")]) > 0) + testutil.Assert(t, len(bkt.Objects()[path.Join(b1.String(), IndexFilename)]) > 0) + testutil.Assert(t, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)]) > 0) + testutil.Assert(t, len(bkt.Objects()[path.Join(DebugMetas, fmt.Sprintf("%s.json", b1.String()))]) > 0) + } +} + +var errUploadFailed = errors.New("upload failed") + +type errBucket struct { + objstore.Bucket + + failSuffix string +} + +func (eb errBucket) Upload(ctx context.Context, name string, r io.Reader) error { + err := eb.Bucket.Upload(ctx, name, r) + if err != nil { + return err + } + + if strings.HasSuffix(name, eb.failSuffix) { + return errUploadFailed + } + return nil +}