Skip to content

Commit

Permalink
storage: fix Raft log size accounting
Browse files Browse the repository at this point in the history
We were accounting for sideloaded payloads (SSTs) when adding them to
the log, but were omitting them during truncations. As a result, the
tracked raft log size would permanently skyrocket which in turn would
lead to extremely aggressive truncations and resulted in pathological
amounts of Raft snapshots.

I'm still concerned about this logic as we're now relying on numbers
obtained from the file system to match exactly a prior in-mem
computation, and there may be other bugs that cause a divergence. But
this fixes the blatant obvious one, so it's a step in the right
direction.

The added tests highlight a likely omission in the sideloaded storage
code which doesn't access the file system through the RocksDB env as it
seems like it should, filed as cockroachdb#31913.

At this point it's unclear whether it fixes the below issues, but at the
very least it seems to address a major problem they encountered:

Touches cockroachdb#31732.
Touches cockroachdb#31740.
Touches cockroachdb#30261.
Touches cockroachdb#31768.
Touches cockroachdb#31745.

Release note (bug fix): avoid a performance degradation related to
overly aggressive Raft log truncations that could occur during RESTORE
or IMPORT operations.
  • Loading branch information
tbg committed Nov 16, 2018
1 parent 4523f98 commit 758b306
Show file tree
Hide file tree
Showing 7 changed files with 147 additions and 64 deletions.
7 changes: 5 additions & 2 deletions pkg/storage/raft_log_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"sort"
"time"

"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"

"github.com/pkg/errors"
"go.etcd.io/etcd/raft"

Expand Down Expand Up @@ -249,11 +251,12 @@ func (rlq *raftLogQueue) process(ctx context.Context, r *Replica, _ config.Syste
if shouldTruncate(truncatableIndexes, raftLogSize) {
r.mu.Lock()
raftLogSize := r.mu.raftLogSize
lastIndex := r.mu.lastIndex
r.mu.Unlock()

if log.V(1) {
log.Infof(ctx, "truncating raft log %d-%d: size=%d",
oldestIndex-truncatableIndexes, oldestIndex, raftLogSize)
log.Infof(ctx, "truncating raft log entries [%d-%d], resulting in log [%d,%d], reclaiming ~%s",
oldestIndex-truncatableIndexes, oldestIndex-1, oldestIndex, lastIndex, humanizeutil.IBytes(raftLogSize))
}
b := &client.Batch{}
b.AddRawRequest(&roachpb.TruncateLogRequest{
Expand Down
44 changes: 23 additions & 21 deletions pkg/storage/raft_log_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,28 @@ func TestComputeTruncatableIndex(t *testing.T) {
}
}

func verifyLogSizeInSync(t *testing.T, r *Replica) {
r.raftMu.Lock()
defer r.raftMu.Unlock()
r.mu.Lock()
raftLogSize := r.mu.raftLogSize
r.mu.Unlock()
start := engine.MakeMVCCMetadataKey(keys.RaftLogKey(r.RangeID, 1))
end := engine.MakeMVCCMetadataKey(keys.RaftLogKey(r.RangeID, math.MaxUint64))

var ms enginepb.MVCCStats
iter := r.store.engine.NewIterator(engine.IterOptions{UpperBound: end.Key})
defer iter.Close()
ms, err := iter.ComputeStats(start, end, 0 /* nowNanos */)
if err != nil {
t.Fatal(err)
}
actualRaftLogSize := ms.SysBytes
if actualRaftLogSize != raftLogSize {
t.Fatalf("replica claims raft log size %d, but computed %d", raftLogSize, actualRaftLogSize)
}
}

// TestGetTruncatableIndexes verifies that old raft log entries are correctly
// removed.
func TestGetTruncatableIndexes(t *testing.T) {
Expand Down Expand Up @@ -228,27 +250,7 @@ func TestGetTruncatableIndexes(t *testing.T) {
t.Errorf("expected oldestIndex to increase, instead it changed from %d -> %d", bOldest, cOldest)
}

func() {
r.raftMu.Lock()
defer r.raftMu.Unlock()
r.mu.Lock()
raftLogSize := r.mu.raftLogSize
r.mu.Unlock()
start := engine.MakeMVCCMetadataKey(keys.RaftLogKey(r.RangeID, 1))
end := engine.MakeMVCCMetadataKey(keys.RaftLogKey(r.RangeID, math.MaxUint64))

var ms enginepb.MVCCStats
iter := store.engine.NewIterator(engine.IterOptions{UpperBound: end.Key})
defer iter.Close()
ms, err := iter.ComputeStats(start, end, 0 /* nowNanos */)
if err != nil {
t.Fatal(err)
}
actualRaftLogSize := ms.SysBytes
if actualRaftLogSize != raftLogSize {
t.Fatalf("replica claims raft log size %d, but computed %d", raftLogSize, actualRaftLogSize)
}
}()
verifyLogSizeInSync(t, r)

// Again, enable the raft log scanner and and force a truncation. This time
// we expect no truncation to occur.
Expand Down
4 changes: 3 additions & 1 deletion pkg/storage/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,10 +565,12 @@ func (r *Replica) handleReplicatedEvalResult(
// could rot.
{
log.Eventf(ctx, "truncating sideloaded storage up to (and including) index %d", newTruncState.Index)
if err := r.raftMu.sideloaded.TruncateTo(ctx, newTruncState.Index+1); err != nil {
if size, err := r.raftMu.sideloaded.TruncateTo(ctx, newTruncState.Index+1); err != nil {
// We don't *have* to remove these entries for correctness. Log a
// loud error, but keep humming along.
log.Errorf(ctx, "while removing sideloaded files during log truncation: %s", err)
} else {
rResult.RaftLogDelta -= size
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/replica_sideload.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ type sideloadStorage interface {
// Clear files that may have been written by this sideloadStorage.
Clear(context.Context) error
// TruncateTo removes all files belonging to an index strictly smaller than
// the given one.
TruncateTo(_ context.Context, index uint64) error
// the given one. Returns the number of bytes freed.
TruncateTo(_ context.Context, index uint64) (int64, error)
// Returns an absolute path to the file that Get() would return the contents
// of. Does not check whether the file actually exists.
Filename(_ context.Context, index, term uint64) (string, error)
Expand Down
18 changes: 12 additions & 6 deletions pkg/storage/replica_sideload_disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,13 @@ func (ss *diskSideloadStorage) Clear(_ context.Context) error {
return err
}

func (ss *diskSideloadStorage) TruncateTo(ctx context.Context, index uint64) error {
func (ss *diskSideloadStorage) TruncateTo(ctx context.Context, index uint64) (int64, error) {
matches, err := filepath.Glob(filepath.Join(ss.dir, "i*.t*"))
if err != nil {
return err
return 0, err
}
var deleted int
var size int64
for _, match := range matches {
base := filepath.Base(match)
if len(base) < 1 || base[0] != 'i' {
Expand All @@ -144,22 +145,27 @@ func (ss *diskSideloadStorage) TruncateTo(ctx context.Context, index uint64) err
upToDot := strings.SplitN(base, ".", 2)
i, err := strconv.ParseUint(upToDot[0], 10, 64)
if err != nil {
return errors.Wrapf(err, "while parsing %q during TruncateTo", match)
return size, errors.Wrapf(err, "while parsing %q during TruncateTo", match)
}
if i >= index {
continue
}
var fi os.FileInfo
if fi, err = os.Stat(match); err != nil {
return size, errors.Wrapf(err, "while purging %q", match)
}
if err := ss.purgeFile(ctx, match); err != nil {
return errors.Wrapf(err, "while purging %q", match)
return size, errors.Wrapf(err, "while purging %q", match)
}
deleted++
size += fi.Size()
}

if deleted == len(matches) {
err = os.Remove(ss.dir)
if !os.IsNotExist(err) {
return errors.Wrapf(err, "while purging %q", ss.dir)
return size, errors.Wrapf(err, "while purging %q", ss.dir)
}
}
return nil
return size, nil
}
8 changes: 5 additions & 3 deletions pkg/storage/replica_sideload_inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,14 @@ func (ss *inMemSideloadStorage) Clear(_ context.Context) error {
return nil
}

func (ss *inMemSideloadStorage) TruncateTo(_ context.Context, index uint64) error {
func (ss *inMemSideloadStorage) TruncateTo(_ context.Context, index uint64) (int64, error) {
// Not efficient, but this storage is for testing purposes only anyway.
for k := range ss.m {
var size int64
for k, v := range ss.m {
if k.index < index {
size += int64(len(v))
delete(ss.m, k)
}
}
return nil
return size, nil
}
126 changes: 97 additions & 29 deletions pkg/storage/replica_sideload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,8 @@ func testSideloadingSideloadedStorage(
{
err: nil,
fun: func() error {
return ss.TruncateTo(ctx, 123)
_, err := ss.TruncateTo(ctx, 123)
return err
},
},
{
Expand Down Expand Up @@ -260,7 +261,7 @@ func testSideloadingSideloadedStorage(

for n := range payloads {
// Truncate indexes <= payloads[n] (payloads is sorted in increasing order).
if err := ss.TruncateTo(ctx, payloads[n]); err != nil {
if _, err := ss.TruncateTo(ctx, payloads[n]); err != nil {
t.Fatalf("%d: %s", n, err)
}
// Index payloads[n] and above are still there (truncation is exclusive)
Expand All @@ -280,16 +281,20 @@ func testSideloadingSideloadedStorage(
}
}

if !isInMem {
func() {
if isInMem {
return
}
// First add a file that shouldn't be in the sideloaded storage to ensure
// sane behavior when directory can't be removed after full truncate.
nonRemovableFile := filepath.Join(ss.(*diskSideloadStorage).dir, "cantremove.xx")
_, err := os.Create(nonRemovableFile)
f, err := os.Create(nonRemovableFile)
if err != nil {
t.Fatalf("could not create non i*.t* file in sideloaded storage: %v", err)
}
defer f.Close()

err = ss.TruncateTo(ctx, math.MaxUint64)
_, err = ss.TruncateTo(ctx, math.MaxUint64)
if err == nil {
t.Fatalf("sideloaded directory should not have been removable due to extra file %s", nonRemovableFile)
}
Expand All @@ -304,7 +309,7 @@ func testSideloadingSideloadedStorage(
}

// Test that directory is removed when filepath.Glob returns 0 matches.
if err := ss.TruncateTo(ctx, math.MaxUint64); err != nil {
if _, err := ss.TruncateTo(ctx, math.MaxUint64); err != nil {
t.Fatal(err)
}
// Ensure directory is removed, now that all files should be gone.
Expand All @@ -328,7 +333,7 @@ func testSideloadingSideloadedStorage(
}
}
assertCreated(true)
if err := ss.TruncateTo(ctx, math.MaxUint64); err != nil {
if _, err := ss.TruncateTo(ctx, math.MaxUint64); err != nil {
t.Fatal(err)
}
// Ensure directory is removed when all records are removed.
Expand All @@ -341,7 +346,7 @@ func testSideloadingSideloadedStorage(
t.Fatalf("expected %q to be removed: %v", ss.(*diskSideloadStorage).dir, err)
}
}
}
}()

if err := ss.Clear(ctx); err != nil {
t.Fatal(err)
Expand All @@ -350,7 +355,7 @@ func testSideloadingSideloadedStorage(
assertCreated(false)

// Sanity check that we can call TruncateTo without the directory existing.
if err := ss.TruncateTo(ctx, 1); err != nil {
if _, err := ss.TruncateTo(ctx, 1); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -620,22 +625,55 @@ func makeInMemSideloaded(repl *Replica) {
// TestRaftSSTableSideloadingProposal runs a straightforward application of an `AddSSTable` command.
func TestRaftSSTableSideloadingProposal(t *testing.T) {
defer leaktest.AfterTest(t)()

testutils.RunTrueAndFalse(t, "engineInMem", func(t *testing.T, engineInMem bool) {
testutils.RunTrueAndFalse(t, "mockSideloaded", func(t *testing.T, mockSideloaded bool) {
if engineInMem && !mockSideloaded {
t.Skip("https://github.com/cockroachdb/cockroach/issues/31913")
}
testRaftSSTableSideloadingProposal(t, engineInMem, mockSideloaded)
})
})
}

// TestRaftSSTableSideloadingProposal runs a straightforward application of an `AddSSTable` command.
func testRaftSSTableSideloadingProposal(t *testing.T, engineInMem, mockSideloaded bool) {
defer leaktest.AfterTest(t)()
defer SetMockAddSSTable()()

tc := testContext{}
dir, cleanup := testutils.TempDir(t)
defer cleanup()
stopper := stop.NewStopper()
tc := testContext{}
if !engineInMem {
cfg := engine.RocksDBConfig{
Dir: dir,
Settings: cluster.MakeTestingClusterSettings(),
}
var err error
cache := engine.NewRocksDBCache(1 << 20)
defer cache.Release()
tc.engine, err = engine.NewRocksDB(cfg, cache)
if err != nil {
t.Fatal(err)
}
stopper.AddCloser(tc.engine)
}
defer stopper.Stop(context.TODO())
tc.Start(t, stopper)

ctx, collect, cancel := tracing.ContextWithRecordingSpan(context.Background(), "test-recording")
defer cancel()

const (
key = "foo"
val = "bar"
key = "foo"
entrySize = 128
)
val := strings.Repeat("x", entrySize)

makeInMemSideloaded(tc.repl)
if mockSideloaded {
makeInMemSideloaded(tc.repl)
}

ts := hlc.Timestamp{Logical: 1}

Expand Down Expand Up @@ -664,27 +702,57 @@ func TestRaftSSTableSideloadingProposal(t *testing.T) {
}
}

tc.repl.raftMu.Lock()
defer tc.repl.raftMu.Unlock()
if ss := tc.repl.raftMu.sideloaded.(*inMemSideloadStorage); len(ss.m) < 1 {
t.Fatal("sideloaded storage is empty")
}
func() {
tc.repl.raftMu.Lock()
defer tc.repl.raftMu.Unlock()
if ss, ok := tc.repl.raftMu.sideloaded.(*inMemSideloadStorage); ok && len(ss.m) < 1 {
t.Fatal("sideloaded storage is empty")
}

if err := testutils.MatchInOrder(tracing.FormatRecordedSpans(collect()), "sideloadable proposal detected", "ingested SSTable"); err != nil {
t.Fatal(err)
}
if err := testutils.MatchInOrder(tracing.FormatRecordedSpans(collect()), "sideloadable proposal detected", "ingested SSTable"); err != nil {
t.Fatal(err)
}

if n := tc.store.metrics.AddSSTableProposals.Count(); n == 0 {
t.Fatalf("expected metric to show at least one AddSSTable proposal, but got %d", n)
}
if n := tc.store.metrics.AddSSTableProposals.Count(); n == 0 {
t.Fatalf("expected metric to show at least one AddSSTable proposal, but got %d", n)
}

if n := tc.store.metrics.AddSSTableApplications.Count(); n == 0 {
t.Fatalf("expected metric to show at least one AddSSTable application, but got %d", n)
}
// We usually don't see copies because we hardlink and ingest the original SST. However, this
// depends on luck and the file system, so don't try to assert it. We should, however, see
// no more than one.
expMaxCopies := int64(1)
if engineInMem {
// We don't count in-memory env SST writes as copies.
expMaxCopies = 0
}
if n := tc.store.metrics.AddSSTableApplicationCopies.Count(); n > expMaxCopies {
t.Fatalf("expected metric to show <= %d AddSSTable copies, but got %d", expMaxCopies, n)
}
}()

// Force a log truncation followed by verification of the tracked raft log size. This exercises a
// former bug in which the raft log size took the sideloaded payload into account when adding
// to the log, but not when truncating.

if n := tc.store.metrics.AddSSTableApplications.Count(); n == 0 {
t.Fatalf("expected metric to show at least one AddSSTable application, but got %d", n)
// Write enough keys to the range to make sure that a truncation will happen.
for i := 0; i < RaftLogQueueStaleThreshold+1; i++ {
key := roachpb.Key(fmt.Sprintf("key%02d", i))
args := putArgs(key, []byte(fmt.Sprintf("value%02d", i)))
if _, err := client.SendWrapped(context.Background(), tc.store.TestSender(), &args); err != nil {
t.Fatal(err)
}
}
// We don't count in-memory env SST writes as copies.
if n := tc.store.metrics.AddSSTableApplicationCopies.Count(); n != 0 {
t.Fatalf("expected metric to show 0 AddSSTable copy, but got %d", n)

if _, err := tc.store.raftLogQueue.Add(tc.repl, 99.99 /* priority */); err != nil {
t.Fatal(err)
}
tc.store.ForceRaftLogScanAndProcess()
// SST is definitely truncated now, so recomputing the Raft log keys should match up with
// the tracked size.
verifyLogSizeInSync(t, tc.repl)
}

type mockSender struct {
Expand Down

0 comments on commit 758b306

Please sign in to comment.