From 16f1fe907bcf956d0bc3f5c681d58cfd88cc6662 Mon Sep 17 00:00:00 2001 From: Kevin Cao Date: Mon, 16 Dec 2024 11:16:32 -0500 Subject: [PATCH] backupccl: create backup compaction iterator For the purposes of backup compaction, a custom iterator is required that behaves similarly to the ReadAsOfIterator, but also surfaces live tombstones point keys. Epic: none Release note: None --- pkg/storage/BUILD.bazel | 2 + pkg/storage/backup_compaction_iterator.go | 157 +++++++++++++ .../backup_compaction_iterator_test.go | 211 ++++++++++++++++++ 3 files changed, 370 insertions(+) create mode 100644 pkg/storage/backup_compaction_iterator.go create mode 100644 pkg/storage/backup_compaction_iterator_test.go diff --git a/pkg/storage/BUILD.bazel b/pkg/storage/BUILD.bazel index 0efbe5e176e7..85439425c821 100644 --- a/pkg/storage/BUILD.bazel +++ b/pkg/storage/BUILD.bazel @@ -5,6 +5,7 @@ go_library( srcs = [ "array_32bit.go", "array_64bit.go", + "backup_compaction_iterator.go", "ballast.go", "batch.go", "col_mvcc.go", @@ -118,6 +119,7 @@ go_test( name = "storage_test", size = "medium", srcs = [ + "backup_compaction_iterator_test.go", "ballast_test.go", "batch_test.go", "bench_cloud_io_test.go", diff --git a/pkg/storage/backup_compaction_iterator.go b/pkg/storage/backup_compaction_iterator.go new file mode 100644 index 000000000000..1df10571466c --- /dev/null +++ b/pkg/storage/backup_compaction_iterator.go @@ -0,0 +1,157 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. + +package storage + +import ( + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/errors" +) + +// BackupCompactionIterator wraps a SimpleMVCCIterator and only surfaces the +// latest valid key of a given MVCC key, including point tombstones, at or below the +// asOfTimestamp, if set. +// +// The iterator assumes that it will not encounter any write intents and that the +// wrapped SimpleMVCCIterator *only* surfaces point keys. +type BackupCompactionIterator struct { + iter SimpleMVCCIterator + + // asOf is the latest timestamp of a key surfaced by the iterator. + asOf hlc.Timestamp + + // valid tracks if the current key is valid. + valid bool + + // err tracks if iterating to the current key returned an error. + err error +} + +var _ SimpleMVCCIterator = &BackupCompactionIterator{} + +// NewBackupCompactionIterator creates a new BackupCompactionIterator. The asOf timestamp cannot be empty. +func NewBackupCompactionIterator( + iter SimpleMVCCIterator, asOf hlc.Timestamp, +) (*BackupCompactionIterator, error) { + if asOf.IsEmpty() { + return nil, errors.New("asOf timestamp cannot be empty") + } + return &BackupCompactionIterator{ + iter: iter, + asOf: asOf, + }, nil +} + +func (f *BackupCompactionIterator) Close() { + f.iter.Close() +} + +// Next is identical to NextKey, as BackupCompactionIterator only surfaces live keys. +func (f *BackupCompactionIterator) Next() { + f.NextKey() +} + +func (f *BackupCompactionIterator) NextKey() { + f.iter.NextKey() + f.advance() +} + +func (f *BackupCompactionIterator) SeekGE(originalKey MVCCKey) { + // See ReadAsOfIterator comment for explanation of this. + synthetic := MVCCKey{Key: originalKey.Key, Timestamp: f.asOf} + f.iter.SeekGE(synthetic) + if f.advance(); f.valid && f.UnsafeKey().Less(originalKey) { + f.NextKey() + } +} + +func (f *BackupCompactionIterator) updateValid() bool { + f.valid, f.err = f.iter.Valid() + return f.valid +} + +// advance moves past keys with timestamps later than f.asOf. +func (f *BackupCompactionIterator) advance() { + for { + if ok := f.updateValid(); !ok { + return + } + if key := f.iter.UnsafeKey(); f.asOf.Less(key.Timestamp) { + f.iter.Next() + continue + } + return + } +} + +func (f *BackupCompactionIterator) UnsafeKey() MVCCKey { + return f.iter.UnsafeKey() +} + +func (f *BackupCompactionIterator) UnsafeValue() ([]byte, error) { + return f.iter.UnsafeValue() +} + +func (f *BackupCompactionIterator) Valid() (bool, error) { + if util.RaceEnabled && f.valid { + if err := f.assertInvariants(); err != nil { + return false, err + } + } + return f.valid, f.err +} + +func (f *BackupCompactionIterator) MVCCValueLenAndIsTombstone() (int, bool, error) { + return f.iter.MVCCValueLenAndIsTombstone() +} + +func (f *BackupCompactionIterator) ValueLen() int { + return f.iter.ValueLen() +} + +func (f *BackupCompactionIterator) HasPointAndRange() (bool, bool) { + hasPoint, hasRange := f.iter.HasPointAndRange() + if hasRange { + panic("unexpected range tombstone") + } + return hasPoint, hasRange +} + +func (f *BackupCompactionIterator) RangeBounds() roachpb.Span { + return roachpb.Span{} +} + +func (f *BackupCompactionIterator) RangeKeys() MVCCRangeKeyStack { + return MVCCRangeKeyStack{} +} + +func (f *BackupCompactionIterator) RangeKeyChanged() bool { + return false +} + +// assertInvariants checks that the iterator is in a valid state, but first assumes that the underlying iterator +// has already been validated and is in a valid state. +func (f *BackupCompactionIterator) assertInvariants() error { + if err := assertSimpleMVCCIteratorInvariants(f); err != nil { + return err + } + + if ok, err := f.iter.Valid(); !ok || err != nil { + errMsg := err.Error() + return errors.AssertionFailedf("invalid underlying iter with err=%s", errMsg) + } + + key := f.UnsafeKey() + if key.Timestamp.IsEmpty() { + return errors.AssertionFailedf("emitted key %s has no timestamp", key) + } + if f.asOf.Less(key.Timestamp) { + return errors.AssertionFailedf("emitted key %s above asOf timestamp %s", key, f.asOf) + } + + return nil +} diff --git a/pkg/storage/backup_compaction_iterator_test.go b/pkg/storage/backup_compaction_iterator_test.go new file mode 100644 index 000000000000..6ab6af3ccbef --- /dev/null +++ b/pkg/storage/backup_compaction_iterator_test.go @@ -0,0 +1,211 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. + +package storage + +import ( + "bytes" + "context" + "fmt" + "testing" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/require" +) + +func TestBackupCompactionIterator(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + pebble, err := Open(context.Background(), InMemory(), + cluster.MakeTestingClusterSettings(), CacheSize(1<<20 /* 1 MiB */)) + require.NoError(t, err) + defer pebble.Close() + + // The test turns each `input` into a batch for the readAsOfIterator, fully + // iterates the iterator, and puts the surfaced keys into a string in the same + // format as `input`. The test then compares the output to 'expectedNextKey'. + // The 'asOf' field represents the wall time of the hlc.Timestamp for the + // readAsOfIterator. + tests := []asOfTest{ + // Ensure vanilla iteration works, surfacing latest values with no AOST. + {input: "a1b1", expectedNextKey: "a1b1", asOf: ""}, + {input: "a2a1", expectedNextKey: "a2", asOf: ""}, + {input: "a1b2b1", expectedNextKey: "a1b2", asOf: ""}, + {input: "a2Xa1", expectedNextKey: "a2X", asOf: ""}, + {input: "a1b2Xb1", expectedNextKey: "a1b2X", asOf: ""}, + + // Ensure vanilla iterations works with provided AOST + {input: "a1b1", expectedNextKey: "a1b1", asOf: "1"}, + {input: "a1b1", expectedNextKey: "a1b1", asOf: "2"}, + {input: "a1b2b1", expectedNextKey: "a1b2", asOf: "2"}, + {input: "a1b1X", expectedNextKey: "a1b1X", asOf: "2"}, + {input: "a1b2Xb1", expectedNextKey: "a1b2X", asOf: "2"}, + + // Skipping keys with AOST. + {input: "a2a1", expectedNextKey: "a1", asOf: "1"}, + {input: "a1b2b1", expectedNextKey: "a1b1", asOf: "1"}, + + // Double skip within keys. + {input: "b3b2b1", expectedNextKey: "b1", asOf: "1"}, + + // Double skip across keys. + {input: "b2c2c1", expectedNextKey: "c1", asOf: "1"}, + + // Skipping tombstones with AOST. + {input: "a1b2Xb1", expectedNextKey: "a1b1", asOf: "1"}, + {input: "a2Xa1b2Xb1c2Xc1", expectedNextKey: "a1b1c1", asOf: "1"}, + + // Skipping under tombstone to land on another tombstone + {input: "a2Xa1b2b1X", expectedNextKey: "a1b1X", asOf: "1"}, + + // Ensure next key captures at most one mvcc key per key after an asOf skip. + {input: "b3c2c1", expectedNextKey: "c2", asOf: "2"}, + + // Ensure clean iteration over double tombstone. + {input: "a1Xb2Xb1c1", expectedNextKey: "a1Xb2Xc1", asOf: ""}, + {input: "a1Xb2Xb1c1", expectedNextKey: "a1Xb1c1", asOf: "1"}, + + // Ensure key before delete tombstone gets read if under AOST. + {input: "b2b1Xc1", expectedNextKey: "b2c1", asOf: ""}, + {input: "b2b1Xc1", expectedNextKey: "b2c1", asOf: "2"}, + } + + for i, test := range tests { + name := fmt.Sprintf("Test %d: %s, AOST %s", i, test.input, test.asOf) + t.Run(name, func(t *testing.T) { + batch := pebble.NewBatch() + defer batch.Close() + populateBatch(t, batch, test.input) + iter, err := batch.NewMVCCIterator(context.Background(), MVCCKeyAndIntentsIterKind, IterOptions{UpperBound: roachpb.KeyMax}) + require.NoError(t, err) + defer iter.Close() + + subtests := []iterSubtest{ + {"Next", test.expectedNextKey, SimpleMVCCIterator.Next}, + {"NextKey", test.expectedNextKey, SimpleMVCCIterator.NextKey}, + } + for _, subtest := range subtests { + t.Run(subtest.name, func(t *testing.T) { + asOf := hlc.Timestamp{} + if test.asOf != "" { + asOf.WallTime = int64(test.asOf[0]) + } else { + asOf = hlc.MaxTimestamp + } + it, err := NewBackupCompactionIterator(iter, asOf) + require.NoError(t, err) + iterateSimpleMVCCIterator(t, it, subtest) + }) + } + }) + } +} + +func TestBackupCompactionIteratorSeek(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + pebble, err := Open(context.Background(), InMemory(), + cluster.MakeTestingClusterSettings(), CacheSize(1<<20 /* 1 MiB */)) + require.NoError(t, err) + defer pebble.Close() + + tests := []struct { + input string + seekKey string + expected string + asOf string + }{ + // Ensure vanilla seek works. + {"a1b1", "a1", "a1", ""}, + + // Ensure seek always returns the latest key AOST of an MVCC key. + {"a2a1b1", "a1", "b1", ""}, + {"a2a1b1", "a1", "b1", "2"}, + {"a2a1b1", "a1", "a1", "1"}, + + // Seeking above all keys will return the latest key AOST of an MVCC key. + {"a2a1b3", "a8", "a2", ""}, + {"a2a1b3", "a8", "a1", "1"}, + {"a2a1b3X", "b8", "b3X", ""}, + {"a2a1b3Xb2", "b8", "b2", "2"}, + + // Ensure out of bounds seek fails gracefully. + {"a1", "b1", "notOK", ""}, + + // Ensure the asOf timestamp moves the iterator during a seek. + {"a2a1", "a2", "a1", "1"}, + {"a2b1", "a2", "b1", "1"}, + + // Ensure seek will return on a tombstone if it is the latest key. + {"a3Xa1b1", "a3", "a3X", ""}, + {"a3Xa1c2Xc1", "b1", "c2X", ""}, + + // Ensure seek will only return the latest key AOST + {"a3Xa2a1b1", "a2", "b1", ""}, + {"a3Xa2a1b1", "a2", "b1", "3"}, + {"a3a2Xa1b1", "a1", "b1", ""}, + {"a3a2Xa1b2Xb1c1", "a1", "b2X", ""}, + + // Ensure we can seek to a key right before a tombstone. + {"a2Xa1b2b1Xc1", "a1", "b2", ""}, + + // Ensure seek on a key above the AOST returns the correct key AOST. + {"a3a2Xa1b3Xb2b1", "b3", "b2", "2"}, + {"a3a2Xa1b3Xb2b1", "b3", "b1", "1"}, + + // Ensure seeking on a key on AOST returns that key + {"a3a2Xa1b3Xb2b1", "a2", "a2X", "2"}, + {"a3a2Xa1b3Xb2b1", "b2", "b2", "2"}, + } + for i, test := range tests { + name := fmt.Sprintf("Test %d: %s, AOST %s", i, test.input, test.asOf) + t.Run(name, func(t *testing.T) { + batch := pebble.NewBatch() + defer batch.Close() + populateBatch(t, batch, test.input) + iter, err := batch.NewMVCCIterator(context.Background(), MVCCKeyAndIntentsIterKind, IterOptions{UpperBound: roachpb.KeyMax}) + require.NoError(t, err) + defer iter.Close() + + asOf := hlc.Timestamp{} + if test.asOf != "" { + asOf.WallTime = int64(test.asOf[0]) + } else { + asOf = hlc.MaxTimestamp + } + it, err := NewBackupCompactionIterator(iter, asOf) + require.NoError(t, err) + var output bytes.Buffer + + seekKey := MVCCKey{ + Key: []byte{test.seekKey[0]}, + Timestamp: hlc.Timestamp{WallTime: int64(test.seekKey[1])}, + } + it.SeekGE(seekKey) + ok, err := it.Valid() + require.NoError(t, err) + if !ok { + if test.expected == "notOK" { + return + } + require.NoError(t, err, "seek not ok") + } + output.Write(it.UnsafeKey().Key) + output.WriteByte(byte(it.UnsafeKey().Timestamp.WallTime)) + v, err := DecodeMVCCValueAndErr(it.UnsafeValue()) + require.NoError(t, err) + if v.IsTombstone() { + output.WriteRune('X') + } + require.Equal(t, test.expected, output.String()) + }) + } +}