From 758b30609f5c8146caadc73c511c7b32787302dd Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Fri, 26 Oct 2018 15:43:57 +0200 Subject: [PATCH] storage: fix Raft log size accounting We were accounting for sideloaded payloads (SSTs) when adding them to the log, but were omitting them during truncations. As a result, the tracked raft log size would permanently skyrocket which in turn would lead to extremely aggressive truncations and resulted in pathological amounts of Raft snapshots. I'm still concerned about this logic as we're now relying on numbers obtained from the file system to match exactly a prior in-mem computation, and there may be other bugs that cause a divergence. But this fixes the blatant obvious one, so it's a step in the right direction. The added tests highlight a likely omission in the sideloaded storage code which doesn't access the file system through the RocksDB env as it seems like it should, filed as #31913. At this point it's unclear whether it fixes the below issues, but at the very least it seems to address a major problem they encountered: Touches #31732. Touches #31740. Touches #30261. Touches #31768. Touches #31745. Release note (bug fix): avoid a performance degradation related to overly aggressive Raft log truncations that could occur during RESTORE or IMPORT operations. --- pkg/storage/raft_log_queue.go | 7 +- pkg/storage/raft_log_queue_test.go | 44 ++++----- pkg/storage/replica_proposal.go | 4 +- pkg/storage/replica_sideload.go | 4 +- pkg/storage/replica_sideload_disk.go | 18 ++-- pkg/storage/replica_sideload_inmem.go | 8 +- pkg/storage/replica_sideload_test.go | 126 ++++++++++++++++++++------ 7 files changed, 147 insertions(+), 64 deletions(-) diff --git a/pkg/storage/raft_log_queue.go b/pkg/storage/raft_log_queue.go index 5e1f785e4fae..e6d8df8a7e91 100644 --- a/pkg/storage/raft_log_queue.go +++ b/pkg/storage/raft_log_queue.go @@ -19,6 +19,8 @@ import ( "sort" "time" + "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" + "github.com/pkg/errors" "go.etcd.io/etcd/raft" @@ -249,11 +251,12 @@ func (rlq *raftLogQueue) process(ctx context.Context, r *Replica, _ config.Syste if shouldTruncate(truncatableIndexes, raftLogSize) { r.mu.Lock() raftLogSize := r.mu.raftLogSize + lastIndex := r.mu.lastIndex r.mu.Unlock() if log.V(1) { - log.Infof(ctx, "truncating raft log %d-%d: size=%d", - oldestIndex-truncatableIndexes, oldestIndex, raftLogSize) + log.Infof(ctx, "truncating raft log entries [%d-%d], resulting in log [%d,%d], reclaiming ~%s", + oldestIndex-truncatableIndexes, oldestIndex-1, oldestIndex, lastIndex, humanizeutil.IBytes(raftLogSize)) } b := &client.Batch{} b.AddRawRequest(&roachpb.TruncateLogRequest{ diff --git a/pkg/storage/raft_log_queue_test.go b/pkg/storage/raft_log_queue_test.go index 38b502fbcbd5..5c7750e094a1 100644 --- a/pkg/storage/raft_log_queue_test.go +++ b/pkg/storage/raft_log_queue_test.go @@ -143,6 +143,28 @@ func TestComputeTruncatableIndex(t *testing.T) { } } +func verifyLogSizeInSync(t *testing.T, r *Replica) { + r.raftMu.Lock() + defer r.raftMu.Unlock() + r.mu.Lock() + raftLogSize := r.mu.raftLogSize + r.mu.Unlock() + start := engine.MakeMVCCMetadataKey(keys.RaftLogKey(r.RangeID, 1)) + end := engine.MakeMVCCMetadataKey(keys.RaftLogKey(r.RangeID, math.MaxUint64)) + + var ms enginepb.MVCCStats + iter := r.store.engine.NewIterator(engine.IterOptions{UpperBound: end.Key}) + defer iter.Close() + ms, err := iter.ComputeStats(start, end, 0 /* nowNanos */) + if err != nil { + t.Fatal(err) + } + actualRaftLogSize := ms.SysBytes + if actualRaftLogSize != raftLogSize { + t.Fatalf("replica claims raft log size %d, but computed %d", raftLogSize, actualRaftLogSize) + } +} + // TestGetTruncatableIndexes verifies that old raft log entries are correctly // removed. func TestGetTruncatableIndexes(t *testing.T) { @@ -228,27 +250,7 @@ func TestGetTruncatableIndexes(t *testing.T) { t.Errorf("expected oldestIndex to increase, instead it changed from %d -> %d", bOldest, cOldest) } - func() { - r.raftMu.Lock() - defer r.raftMu.Unlock() - r.mu.Lock() - raftLogSize := r.mu.raftLogSize - r.mu.Unlock() - start := engine.MakeMVCCMetadataKey(keys.RaftLogKey(r.RangeID, 1)) - end := engine.MakeMVCCMetadataKey(keys.RaftLogKey(r.RangeID, math.MaxUint64)) - - var ms enginepb.MVCCStats - iter := store.engine.NewIterator(engine.IterOptions{UpperBound: end.Key}) - defer iter.Close() - ms, err := iter.ComputeStats(start, end, 0 /* nowNanos */) - if err != nil { - t.Fatal(err) - } - actualRaftLogSize := ms.SysBytes - if actualRaftLogSize != raftLogSize { - t.Fatalf("replica claims raft log size %d, but computed %d", raftLogSize, actualRaftLogSize) - } - }() + verifyLogSizeInSync(t, r) // Again, enable the raft log scanner and and force a truncation. This time // we expect no truncation to occur. diff --git a/pkg/storage/replica_proposal.go b/pkg/storage/replica_proposal.go index 1b5b08449ca8..d3e7147aad5f 100644 --- a/pkg/storage/replica_proposal.go +++ b/pkg/storage/replica_proposal.go @@ -565,10 +565,12 @@ func (r *Replica) handleReplicatedEvalResult( // could rot. { log.Eventf(ctx, "truncating sideloaded storage up to (and including) index %d", newTruncState.Index) - if err := r.raftMu.sideloaded.TruncateTo(ctx, newTruncState.Index+1); err != nil { + if size, err := r.raftMu.sideloaded.TruncateTo(ctx, newTruncState.Index+1); err != nil { // We don't *have* to remove these entries for correctness. Log a // loud error, but keep humming along. log.Errorf(ctx, "while removing sideloaded files during log truncation: %s", err) + } else { + rResult.RaftLogDelta -= size } } } diff --git a/pkg/storage/replica_sideload.go b/pkg/storage/replica_sideload.go index 8b421cfb90e7..5b0b6aa2edbc 100644 --- a/pkg/storage/replica_sideload.go +++ b/pkg/storage/replica_sideload.go @@ -47,8 +47,8 @@ type sideloadStorage interface { // Clear files that may have been written by this sideloadStorage. Clear(context.Context) error // TruncateTo removes all files belonging to an index strictly smaller than - // the given one. - TruncateTo(_ context.Context, index uint64) error + // the given one. Returns the number of bytes freed. + TruncateTo(_ context.Context, index uint64) (int64, error) // Returns an absolute path to the file that Get() would return the contents // of. Does not check whether the file actually exists. Filename(_ context.Context, index, term uint64) (string, error) diff --git a/pkg/storage/replica_sideload_disk.go b/pkg/storage/replica_sideload_disk.go index d86005f06144..31793bb6b90e 100644 --- a/pkg/storage/replica_sideload_disk.go +++ b/pkg/storage/replica_sideload_disk.go @@ -129,12 +129,13 @@ func (ss *diskSideloadStorage) Clear(_ context.Context) error { return err } -func (ss *diskSideloadStorage) TruncateTo(ctx context.Context, index uint64) error { +func (ss *diskSideloadStorage) TruncateTo(ctx context.Context, index uint64) (int64, error) { matches, err := filepath.Glob(filepath.Join(ss.dir, "i*.t*")) if err != nil { - return err + return 0, err } var deleted int + var size int64 for _, match := range matches { base := filepath.Base(match) if len(base) < 1 || base[0] != 'i' { @@ -144,22 +145,27 @@ func (ss *diskSideloadStorage) TruncateTo(ctx context.Context, index uint64) err upToDot := strings.SplitN(base, ".", 2) i, err := strconv.ParseUint(upToDot[0], 10, 64) if err != nil { - return errors.Wrapf(err, "while parsing %q during TruncateTo", match) + return size, errors.Wrapf(err, "while parsing %q during TruncateTo", match) } if i >= index { continue } + var fi os.FileInfo + if fi, err = os.Stat(match); err != nil { + return size, errors.Wrapf(err, "while purging %q", match) + } if err := ss.purgeFile(ctx, match); err != nil { - return errors.Wrapf(err, "while purging %q", match) + return size, errors.Wrapf(err, "while purging %q", match) } deleted++ + size += fi.Size() } if deleted == len(matches) { err = os.Remove(ss.dir) if !os.IsNotExist(err) { - return errors.Wrapf(err, "while purging %q", ss.dir) + return size, errors.Wrapf(err, "while purging %q", ss.dir) } } - return nil + return size, nil } diff --git a/pkg/storage/replica_sideload_inmem.go b/pkg/storage/replica_sideload_inmem.go index e6627786adeb..2a194862835a 100644 --- a/pkg/storage/replica_sideload_inmem.go +++ b/pkg/storage/replica_sideload_inmem.go @@ -99,12 +99,14 @@ func (ss *inMemSideloadStorage) Clear(_ context.Context) error { return nil } -func (ss *inMemSideloadStorage) TruncateTo(_ context.Context, index uint64) error { +func (ss *inMemSideloadStorage) TruncateTo(_ context.Context, index uint64) (int64, error) { // Not efficient, but this storage is for testing purposes only anyway. - for k := range ss.m { + var size int64 + for k, v := range ss.m { if k.index < index { + size += int64(len(v)) delete(ss.m, k) } } - return nil + return size, nil } diff --git a/pkg/storage/replica_sideload_test.go b/pkg/storage/replica_sideload_test.go index 518fb6cec404..269f3f328fe6 100644 --- a/pkg/storage/replica_sideload_test.go +++ b/pkg/storage/replica_sideload_test.go @@ -189,7 +189,8 @@ func testSideloadingSideloadedStorage( { err: nil, fun: func() error { - return ss.TruncateTo(ctx, 123) + _, err := ss.TruncateTo(ctx, 123) + return err }, }, { @@ -260,7 +261,7 @@ func testSideloadingSideloadedStorage( for n := range payloads { // Truncate indexes <= payloads[n] (payloads is sorted in increasing order). - if err := ss.TruncateTo(ctx, payloads[n]); err != nil { + if _, err := ss.TruncateTo(ctx, payloads[n]); err != nil { t.Fatalf("%d: %s", n, err) } // Index payloads[n] and above are still there (truncation is exclusive) @@ -280,16 +281,20 @@ func testSideloadingSideloadedStorage( } } - if !isInMem { + func() { + if isInMem { + return + } // First add a file that shouldn't be in the sideloaded storage to ensure // sane behavior when directory can't be removed after full truncate. nonRemovableFile := filepath.Join(ss.(*diskSideloadStorage).dir, "cantremove.xx") - _, err := os.Create(nonRemovableFile) + f, err := os.Create(nonRemovableFile) if err != nil { t.Fatalf("could not create non i*.t* file in sideloaded storage: %v", err) } + defer f.Close() - err = ss.TruncateTo(ctx, math.MaxUint64) + _, err = ss.TruncateTo(ctx, math.MaxUint64) if err == nil { t.Fatalf("sideloaded directory should not have been removable due to extra file %s", nonRemovableFile) } @@ -304,7 +309,7 @@ func testSideloadingSideloadedStorage( } // Test that directory is removed when filepath.Glob returns 0 matches. - if err := ss.TruncateTo(ctx, math.MaxUint64); err != nil { + if _, err := ss.TruncateTo(ctx, math.MaxUint64); err != nil { t.Fatal(err) } // Ensure directory is removed, now that all files should be gone. @@ -328,7 +333,7 @@ func testSideloadingSideloadedStorage( } } assertCreated(true) - if err := ss.TruncateTo(ctx, math.MaxUint64); err != nil { + if _, err := ss.TruncateTo(ctx, math.MaxUint64); err != nil { t.Fatal(err) } // Ensure directory is removed when all records are removed. @@ -341,7 +346,7 @@ func testSideloadingSideloadedStorage( t.Fatalf("expected %q to be removed: %v", ss.(*diskSideloadStorage).dir, err) } } - } + }() if err := ss.Clear(ctx); err != nil { t.Fatal(err) @@ -350,7 +355,7 @@ func testSideloadingSideloadedStorage( assertCreated(false) // Sanity check that we can call TruncateTo without the directory existing. - if err := ss.TruncateTo(ctx, 1); err != nil { + if _, err := ss.TruncateTo(ctx, 1); err != nil { t.Fatal(err) } @@ -620,10 +625,40 @@ func makeInMemSideloaded(repl *Replica) { // TestRaftSSTableSideloadingProposal runs a straightforward application of an `AddSSTable` command. func TestRaftSSTableSideloadingProposal(t *testing.T) { defer leaktest.AfterTest(t)() + + testutils.RunTrueAndFalse(t, "engineInMem", func(t *testing.T, engineInMem bool) { + testutils.RunTrueAndFalse(t, "mockSideloaded", func(t *testing.T, mockSideloaded bool) { + if engineInMem && !mockSideloaded { + t.Skip("https://github.com/cockroachdb/cockroach/issues/31913") + } + testRaftSSTableSideloadingProposal(t, engineInMem, mockSideloaded) + }) + }) +} + +// TestRaftSSTableSideloadingProposal runs a straightforward application of an `AddSSTable` command. +func testRaftSSTableSideloadingProposal(t *testing.T, engineInMem, mockSideloaded bool) { + defer leaktest.AfterTest(t)() defer SetMockAddSSTable()() - tc := testContext{} + dir, cleanup := testutils.TempDir(t) + defer cleanup() stopper := stop.NewStopper() + tc := testContext{} + if !engineInMem { + cfg := engine.RocksDBConfig{ + Dir: dir, + Settings: cluster.MakeTestingClusterSettings(), + } + var err error + cache := engine.NewRocksDBCache(1 << 20) + defer cache.Release() + tc.engine, err = engine.NewRocksDB(cfg, cache) + if err != nil { + t.Fatal(err) + } + stopper.AddCloser(tc.engine) + } defer stopper.Stop(context.TODO()) tc.Start(t, stopper) @@ -631,11 +666,14 @@ func TestRaftSSTableSideloadingProposal(t *testing.T) { defer cancel() const ( - key = "foo" - val = "bar" + key = "foo" + entrySize = 128 ) + val := strings.Repeat("x", entrySize) - makeInMemSideloaded(tc.repl) + if mockSideloaded { + makeInMemSideloaded(tc.repl) + } ts := hlc.Timestamp{Logical: 1} @@ -664,27 +702,57 @@ func TestRaftSSTableSideloadingProposal(t *testing.T) { } } - tc.repl.raftMu.Lock() - defer tc.repl.raftMu.Unlock() - if ss := tc.repl.raftMu.sideloaded.(*inMemSideloadStorage); len(ss.m) < 1 { - t.Fatal("sideloaded storage is empty") - } + func() { + tc.repl.raftMu.Lock() + defer tc.repl.raftMu.Unlock() + if ss, ok := tc.repl.raftMu.sideloaded.(*inMemSideloadStorage); ok && len(ss.m) < 1 { + t.Fatal("sideloaded storage is empty") + } - if err := testutils.MatchInOrder(tracing.FormatRecordedSpans(collect()), "sideloadable proposal detected", "ingested SSTable"); err != nil { - t.Fatal(err) - } + if err := testutils.MatchInOrder(tracing.FormatRecordedSpans(collect()), "sideloadable proposal detected", "ingested SSTable"); err != nil { + t.Fatal(err) + } - if n := tc.store.metrics.AddSSTableProposals.Count(); n == 0 { - t.Fatalf("expected metric to show at least one AddSSTable proposal, but got %d", n) - } + if n := tc.store.metrics.AddSSTableProposals.Count(); n == 0 { + t.Fatalf("expected metric to show at least one AddSSTable proposal, but got %d", n) + } + + if n := tc.store.metrics.AddSSTableApplications.Count(); n == 0 { + t.Fatalf("expected metric to show at least one AddSSTable application, but got %d", n) + } + // We usually don't see copies because we hardlink and ingest the original SST. However, this + // depends on luck and the file system, so don't try to assert it. We should, however, see + // no more than one. + expMaxCopies := int64(1) + if engineInMem { + // We don't count in-memory env SST writes as copies. + expMaxCopies = 0 + } + if n := tc.store.metrics.AddSSTableApplicationCopies.Count(); n > expMaxCopies { + t.Fatalf("expected metric to show <= %d AddSSTable copies, but got %d", expMaxCopies, n) + } + }() + + // Force a log truncation followed by verification of the tracked raft log size. This exercises a + // former bug in which the raft log size took the sideloaded payload into account when adding + // to the log, but not when truncating. - if n := tc.store.metrics.AddSSTableApplications.Count(); n == 0 { - t.Fatalf("expected metric to show at least one AddSSTable application, but got %d", n) + // Write enough keys to the range to make sure that a truncation will happen. + for i := 0; i < RaftLogQueueStaleThreshold+1; i++ { + key := roachpb.Key(fmt.Sprintf("key%02d", i)) + args := putArgs(key, []byte(fmt.Sprintf("value%02d", i))) + if _, err := client.SendWrapped(context.Background(), tc.store.TestSender(), &args); err != nil { + t.Fatal(err) + } } - // We don't count in-memory env SST writes as copies. - if n := tc.store.metrics.AddSSTableApplicationCopies.Count(); n != 0 { - t.Fatalf("expected metric to show 0 AddSSTable copy, but got %d", n) + + if _, err := tc.store.raftLogQueue.Add(tc.repl, 99.99 /* priority */); err != nil { + t.Fatal(err) } + tc.store.ForceRaftLogScanAndProcess() + // SST is definitely truncated now, so recomputing the Raft log keys should match up with + // the tracked size. + verifyLogSizeInSync(t, tc.repl) } type mockSender struct {