Skip to content

Commit

Permalink
receive/multitsdb: do not delete not uploaded blocks
Browse files Browse the repository at this point in the history
If a block hasn't been uploaded yet then tell the TSDB layer not to
delete them. This prevents a nasty race where the TSDB layer can delete
a block before the shipper gets to it. I saw this happen with a very
small block.

Signed-off-by: Giedrius Statkevičius <[email protected]>
  • Loading branch information
GiedriusS committed Feb 28, 2024
1 parent 4c7997d commit 90b8a7b
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 4 deletions.
41 changes: 37 additions & 4 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"go.uber.org/atomic"
"golang.org/x/exp/slices"
Expand Down Expand Up @@ -154,7 +155,37 @@ type tenant struct {
exemplarsTSDB *exemplars.TSDB
ship *shipper.Shipper

mtx *sync.RWMutex
mtx *sync.RWMutex
tsdb *tsdb.DB

// For tests.
blocksToDeleteFn func(db *tsdb.DB) tsdb.BlocksToDeleteFunc
}

func (t *tenant) blocksToDelete(blocks []*tsdb.Block) map[ulid.ULID]struct{} {
if t.tsdb == nil {
return nil
}

blocksToDeleteFn := t.blocksToDeleteFn
if blocksToDeleteFn == nil {
blocksToDeleteFn = tsdb.DefaultBlocksToDelete
}
deletable := blocksToDeleteFn(t.tsdb)(blocks)

if t.ship == nil {
return deletable
}

uploaded := t.ship.UploadedBlocks()

for deletableID := range deletable {
if _, ok := uploaded[deletableID]; !ok {
delete(deletable, deletableID)
}
}

return deletable
}

func newTenant() *tenant {
Expand Down Expand Up @@ -202,14 +233,15 @@ func (t *tenant) shipper() *shipper.Shipper {
func (t *tenant) set(storeTSDB *store.TSDBStore, tenantTSDB *tsdb.DB, ship *shipper.Shipper, exemplarsTSDB *exemplars.TSDB) {
t.readyS.Set(tenantTSDB)
t.mtx.Lock()
t.setComponents(storeTSDB, ship, exemplarsTSDB)
t.setComponents(storeTSDB, ship, exemplarsTSDB, tenantTSDB)
t.mtx.Unlock()
}

func (t *tenant) setComponents(storeTSDB *store.TSDBStore, ship *shipper.Shipper, exemplarsTSDB *exemplars.TSDB) {
func (t *tenant) setComponents(storeTSDB *store.TSDBStore, ship *shipper.Shipper, exemplarsTSDB *exemplars.TSDB, tenantTSDB *tsdb.DB) {
t.storeTSDB = storeTSDB
t.ship = ship
t.exemplarsTSDB = exemplarsTSDB
t.tsdb = tenantTSDB
}

func (t *MultiTSDB) Open() error {
Expand Down Expand Up @@ -422,7 +454,7 @@ func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInst
}

tenantInstance.readyS.set(nil)
tenantInstance.setComponents(nil, nil, nil)
tenantInstance.setComponents(nil, nil, nil, nil)

return true, nil
}
Expand Down Expand Up @@ -574,6 +606,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant

level.Info(logger).Log("msg", "opening TSDB")
opts := *t.tsdbOpts
opts.BlocksToDelete = tenant.blocksToDelete

// NOTE(GiedriusS): always set to false to properly handle OOO samples - OOO samples are written into the WBL
// which gets later converted into a block. Without setting this flag to false, the block would get compacted
Expand Down
58 changes: 58 additions & 0 deletions pkg/receive/multitsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,20 @@ import (
"math"
"os"
"path"
"path/filepath"
"strings"
"testing"
"time"

"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
Expand All @@ -28,6 +31,7 @@ import (
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/exemplars/exemplarspb"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
Expand Down Expand Up @@ -829,3 +833,57 @@ func BenchmarkMultiTSDB(b *testing.B) {
_, _ = a.Append(0, l, int64(i), float64(i))
}
}

func TestMultiTSDBDoesNotDeleteNotUploadedBlocks(t *testing.T) {
tenant := &tenant{}

t.Run("no blocks", func(t *testing.T) {
require.Equal(t, (map[ulid.ULID]struct{})(nil), tenant.blocksToDelete(nil))
})

mockBlockIDs := []ulid.ULID{
ulid.MustNew(1, nil),
ulid.MustNew(2, nil),
}

t.Run("no shipper", func(t *testing.T) {
tenant.blocksToDeleteFn = func(db *tsdb.DB) tsdb.BlocksToDeleteFunc {
return func(_ []*tsdb.Block) map[ulid.ULID]struct{} {
return map[ulid.ULID]struct{}{
mockBlockIDs[0]: {},
mockBlockIDs[1]: {},
}
}
}
tenant.tsdb = &tsdb.DB{}

require.Equal(t, map[ulid.ULID]struct{}{
mockBlockIDs[0]: {},
mockBlockIDs[1]: {},
}, tenant.blocksToDelete(nil))
})

t.Run("some blocks uploaded", func(t *testing.T) {
tenant.blocksToDeleteFn = func(db *tsdb.DB) tsdb.BlocksToDeleteFunc {
return func(_ []*tsdb.Block) map[ulid.ULID]struct{} {
return map[ulid.ULID]struct{}{
mockBlockIDs[0]: {},
mockBlockIDs[1]: {},
}
}
}
tenant.tsdb = &tsdb.DB{}

td := t.TempDir()

require.NoError(t, shipper.WriteMetaFile(log.NewNopLogger(), filepath.Join(td, shipper.DefaultMetaFilename), &shipper.Meta{
Version: shipper.MetaVersion1,
Uploaded: []ulid.ULID{mockBlockIDs[0]},
}))

tenant.ship = shipper.New(log.NewNopLogger(), nil, td, nil, nil, metadata.BucketUploadSource, nil, false, metadata.NoneFunc, "")
require.Equal(t, map[ulid.ULID]struct{}{
mockBlockIDs[0]: {},
}, tenant.blocksToDelete(nil))
})
}
15 changes: 15 additions & 0 deletions pkg/shipper/shipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,21 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) {
return uploaded, nil
}

func (s *Shipper) UploadedBlocks() map[ulid.ULID]struct{} {
meta, err := ReadMetaFile(s.metadataFilePath)
if err != nil {
// NOTE(GiedriusS): Sync() will inform users about any problems.
return nil
}

ret := make(map[ulid.ULID]struct{}, len(meta.Uploaded))
for _, id := range meta.Uploaded {
ret[id] = struct{}{}
}

return ret
}

// sync uploads the block if not exists in remote storage.
// TODO(khyatisoneji): Double check if block does not have deletion-mark.json for some reason, otherwise log it or return error.
func (s *Shipper) upload(ctx context.Context, meta *metadata.Meta) error {
Expand Down

0 comments on commit 90b8a7b

Please sign in to comment.