Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Blocks upload: Don't do cleanUp when meta.json fails to upload. #3626

Merged
merged 3 commits into from
Dec 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re
- [#2732](https://github.com/thanos-io/thanos/pull/2732) Swift: Switched to a new library [ncw/swift](https://github.com/ncw/swift) providing large objects support.
By default, segments will be uploaded to the same container directory `segments/` if the file is bigger than `1GB`.
To change the defaults see [the docs](./docs/storage.md#openstack-swift).

- [#3626](https://github.com/thanos-io/thanos/pull/3626) Failed upload of `meta.json` file doesn't cause block cleanup anymore. This has a potential to generate corrupted blocks under specific conditions. Partial block is left in bucket for later cleanup.


## [v0.17.2](https://github.com/thanos-io/thanos/releases/tag/v0.17.2) - 2020.12.07
Expand Down
6 changes: 5 additions & 1 deletion pkg/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,11 @@ func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir st

// Meta.json always need to be uploaded as a last item. This will allow to assume block directories without meta file to be pending uploads.
if err := bkt.Upload(ctx, path.Join(id.String(), MetaFilename), &metaEncoded); err != nil {
return cleanUp(logger, bkt, id, errors.Wrap(err, "upload meta file"))
// Don't call cleanUp here. Despite getting error, meta.json may have been uploaded in certain cases,
// and even though cleanUp will not see it yet, meta.json may appear in the bucket later.
// (Eg. S3 is known to behave this way when it returns 503 "SlowDown" error).
pstibrany marked this conversation as resolved.
Show resolved Hide resolved
// If meta.json is not uploaded, this will produce partial blocks, but such blocks will be cleaned later.
return errors.Wrap(err, "upload meta file")
}

return nil
Expand Down
67 changes: 67 additions & 0 deletions pkg/block/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"path"
Expand All @@ -17,6 +18,7 @@ import (

"github.com/go-kit/kit/log"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
Expand Down Expand Up @@ -367,3 +369,68 @@ func TestMarkForNoCompact(t *testing.T) {
})
}
}

func TestUploadCleanup(t *testing.T) {
defer testutil.TolerantVerifyLeak(t)

ctx := context.Background()

tmpDir, err := ioutil.TempDir("", "test-block-upload")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }()

bkt := objstore.NewInMemBucket()
b1, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{
{{Name: "a", Value: "1"}},
{{Name: "a", Value: "2"}},
{{Name: "a", Value: "3"}},
{{Name: "a", Value: "4"}},
{{Name: "b", Value: "1"}},
}, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "val1"}}, 124)
testutil.Ok(t, err)

{
errBkt := errBucket{Bucket: bkt, failSuffix: "/index"}

uploadErr := Upload(ctx, log.NewNopLogger(), errBkt, path.Join(tmpDir, b1.String()))
testutil.Assert(t, errors.Is(uploadErr, errUploadFailed))

// If upload of index fails, block is deleted.
testutil.Equals(t, 1, len(bkt.Objects())) // Only debug meta file is present.
testutil.Assert(t, len(bkt.Objects()[path.Join(DebugMetas, fmt.Sprintf("%s.json", b1.String()))]) > 0)
}

{
errBkt := errBucket{Bucket: bkt, failSuffix: "/meta.json"}

uploadErr := Upload(ctx, log.NewNopLogger(), errBkt, path.Join(tmpDir, b1.String()))
testutil.Assert(t, errors.Is(uploadErr, errUploadFailed))

// If upload of meta.json fails, nothing is cleaned up.
testutil.Equals(t, 4, len(bkt.Objects()))
testutil.Assert(t, len(bkt.Objects()[path.Join(b1.String(), ChunksDirname, "000001")]) > 0)
testutil.Assert(t, len(bkt.Objects()[path.Join(b1.String(), IndexFilename)]) > 0)
testutil.Assert(t, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)]) > 0)
testutil.Assert(t, len(bkt.Objects()[path.Join(DebugMetas, fmt.Sprintf("%s.json", b1.String()))]) > 0)
}
}

var errUploadFailed = errors.New("upload failed")

type errBucket struct {
objstore.Bucket

failSuffix string
}

func (eb errBucket) Upload(ctx context.Context, name string, r io.Reader) error {
err := eb.Bucket.Upload(ctx, name, r)
if err != nil {
return err
}

if strings.HasSuffix(name, eb.failSuffix) {
return errUploadFailed
}
return nil
}