Skip to content

Commit

Permalink
kvserver: use engine's filesystem
Browse files Browse the repository at this point in the history
Update filesystem access to always go through the storage engine's
filesystem interface, which ensures correctness for in-mem and encrypted
filesystems.

Also, add a Stat function to the storage/fs.FS interface. The RocksDB
implementation is still a hack, because the RocksDB Env doesn't expose
sufficient information to implement it. For on-disk RocksDB engines,
this implementation circumvents the Env, performing a direct os.Stat
of the filesystem. For in-memory RocksDB engines, it provides a mocked
os.FileInfo implementation.

Fixes cockroachdb#42034.
Related to cockroachdb#31913.

Release note: None
  • Loading branch information
jbowens committed May 29, 2020
1 parent 3725997 commit d007eac
Show file tree
Hide file tree
Showing 15 changed files with 179 additions and 129 deletions.
24 changes: 18 additions & 6 deletions pkg/kv/kvserver/client_replica_gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ package kvserver_test

import (
"context"
"io/ioutil"
"os"
"path/filepath"
"strconv"
Expand All @@ -24,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/fs"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -99,6 +99,7 @@ func TestReplicaGCQueueDropReplicaDirect(t *testing.T) {
if err != nil {
t.Fatal(err)
}
eng := mtc.engines[1]

// Put some bogus sideloaded data on the replica which we're about to
// remove. Then, at the end of the test, check that that sideloaded
Expand All @@ -111,10 +112,10 @@ func TestReplicaGCQueueDropReplicaDirect(t *testing.T) {
if dir == "" {
t.Fatal("no sideloaded directory")
}
if err := os.MkdirAll(dir, 0755); err != nil {
if err := eng.MkdirAll(dir); err != nil {
t.Fatal(err)
}
if err := ioutil.WriteFile(filepath.Join(dir, "i1000000.t100000"), []byte("foo"), 0644); err != nil {
if err := writeFile(eng, filepath.Join(dir, "i1000000.t100000"), "foo"); err != nil {
t.Fatal(err)
}

Expand All @@ -125,9 +126,7 @@ func TestReplicaGCQueueDropReplicaDirect(t *testing.T) {
repl1.RaftLock()
dir := repl1.SideloadedRaftMuLocked().Dir()
repl1.RaftUnlock()
_, err := os.Stat(dir)

if os.IsNotExist(err) {
if _, err := eng.Stat(dir); os.IsNotExist(err) {
return nil
}
return errors.Errorf("replica still has sideloaded files despite GC: %v", err)
Expand All @@ -147,6 +146,19 @@ func TestReplicaGCQueueDropReplicaDirect(t *testing.T) {
})
}

// writeFile creates the file at path through the supplied filesystem and
// writes contents to it. The file is always closed before returning. If the
// write fails, the write error is returned and any error from closing is
// dropped; otherwise the result of closing the file is returned.
func writeFile(fs fs.FS, path, contents string) error {
	file, createErr := fs.Create(path)
	if createErr != nil {
		return createErr
	}
	if _, writeErr := file.Write([]byte(contents)); writeErr != nil {
		// Report the write failure; the close error is deliberately ignored.
		_ = file.Close()
		return writeErr
	}
	return file.Close()
}

// TestReplicaGCQueueDropReplicaGCOnScan verifies that the range GC queue
// removes a range from a store that no longer should have a replica.
func TestReplicaGCQueueDropReplicaGCOnScan(t *testing.T) {
Expand Down
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/consistency_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,12 @@ func TestCheckConsistencyInconsistent(t *testing.T) {
assert.Contains(t, resp.Result[0].Detail, `stats`)

// A death rattle should have been written on s2 (store index 1).
b, err := ioutil.ReadFile(base.PreventedStartupFile(mtc.stores[1].Engine().GetAuxiliaryDir()))
eng := mtc.stores[1].Engine()
f, err := eng.Open(base.PreventedStartupFile(eng.GetAuxiliaryDir()))
require.NoError(t, err)
b, err := ioutil.ReadAll(f)
require.NoError(t, err)
require.NoError(t, f.Close())
require.NotEmpty(t, b)
}

Expand Down
20 changes: 16 additions & 4 deletions pkg/kv/kvserver/replica_corruption.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@ package kvserver
import (
"context"
"fmt"
"io/ioutil"
"os"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/fs"
"github.com/cockroachdb/cockroach/pkg/util/log"
)

Expand Down Expand Up @@ -57,7 +56,7 @@ func (r *Replica) setCorruptRaftMuLocked(
r.mu.destroyStatus.Set(cErr, destroyReasonRemoved)

auxDir := r.store.engine.GetAuxiliaryDir()
_ = os.MkdirAll(auxDir, 0755)
_ = r.store.engine.MkdirAll(auxDir)
path := base.PreventedStartupFile(auxDir)

preventStartupMsg := fmt.Sprintf(`ATTENTION:
Expand All @@ -70,10 +69,23 @@ A file preventing this node from restarting was placed at:
%s
`, r, path)

if err := ioutil.WriteFile(path, []byte(preventStartupMsg), 0644); err != nil {
if err := writeFile(r.store.engine, path, preventStartupMsg); err != nil {
log.Warningf(ctx, "%v", err)
}

log.FatalfDepth(ctx, 1, "replica is corrupted: %s", cErr)
return roachpb.NewError(cErr)
}

// writeFile writes contents to a file at path that it creates via the given
// filesystem, rather than going to the OS filesystem directly. The file is
// closed on every path once it has been created: after a failed write the
// write error wins and the close error is discarded; after a successful
// write the close error (if any) is returned.
func writeFile(fs fs.FS, path, contents string) error {
	out, err := fs.Create(path)
	if err != nil {
		return err
	}
	if _, err = out.Write([]byte(contents)); err == nil {
		return out.Close()
	}
	// The write failed: still release the handle, but surface the write error.
	_ = out.Close()
	return err
}
12 changes: 5 additions & 7 deletions pkg/kv/kvserver/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ package kvserver
import (
"context"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"
"time"
Expand Down Expand Up @@ -259,7 +257,7 @@ func (r *Replica) computeChecksumPostApply(ctx context.Context, cc kvserverpb.Co
// in a goroutine that's about to end, simply sleep for a few seconds
// and then terminate.
auxDir := r.store.engine.GetAuxiliaryDir()
_ = os.MkdirAll(auxDir, 0755)
_ = r.store.engine.MkdirAll(auxDir)
path := base.PreventedStartupFile(auxDir)

preventStartupMsg := fmt.Sprintf(`ATTENTION:
Expand All @@ -276,7 +274,7 @@ A file preventing this node from restarting was placed at:
%s
`, r, auxDir, path)

if err := ioutil.WriteFile(path, []byte(preventStartupMsg), 0644); err != nil {
if err := writeFile(r.store.engine, path, preventStartupMsg); err != nil {
log.Warningf(ctx, "%v", err)
}

Expand Down Expand Up @@ -566,15 +564,15 @@ func addSSTablePreApply(

// TODO(tschottdorf): remove this once sideloaded storage guarantees its
// existence.
if err := os.MkdirAll(filepath.Dir(path), 0700); err != nil {
if err := eng.MkdirAll(filepath.Dir(path)); err != nil {
panic(err)
}
if _, err := os.Stat(path); err == nil {
if _, err := eng.Stat(path); err == nil {
// The file we want to ingest exists. This can happen since the
// ingestion may apply twice (we ingest before we mark the Raft
// command as committed). Just unlink the file (RocksDB created a
// hard link); after that we're free to write it again.
if err := os.Remove(path); err != nil {
if err := eng.Remove(path); err != nil {
log.Fatalf(ctx, "while removing existing file during ingestion of %s: %+v", path, err)
}
}
Expand Down
27 changes: 10 additions & 17 deletions pkg/kv/kvserver/replica_sideload_disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ func sideloadedPath(baseDir string, rangeID roachpb.RangeID) string {
)
}

func exists(path string) (bool, error) {
_, err := os.Stat(path)
func exists(eng storage.Engine, path string) (bool, error) {
_, err := eng.Stat(path)
if err == nil {
return true, nil
}
Expand Down Expand Up @@ -96,15 +96,15 @@ func newDiskSideloadStorage(
// ns on my laptop, but only around 2.2k ns on the gceworker. Still,
// even on the laptop, 50k replicas would only add 1.2s which is also
// acceptable given that it'll happen only once.
exists, err := exists(path)
exists, err := exists(eng, path)
if err != nil {
return nil, errors.Wrap(err, "checking pre-migration sideloaded directory")
}
if exists {
if err := os.MkdirAll(filepath.Dir(newPath), 0755); err != nil {
if err := eng.MkdirAll(filepath.Dir(newPath)); err != nil {
return nil, errors.Wrap(err, "creating migrated sideloaded directory")
}
if err := os.Rename(path, newPath); err != nil {
if err := eng.Rename(path, newPath); err != nil {
return nil, errors.Wrap(err, "while migrating sideloaded directory")
}
}
Expand All @@ -120,7 +120,7 @@ func newDiskSideloadStorage(
}

func (ss *diskSideloadStorage) createDir() error {
err := os.MkdirAll(ss.dir, 0755)
err := ss.eng.MkdirAll(ss.dir)
ss.dirCreated = ss.dirCreated || err == nil
return err
}
Expand Down Expand Up @@ -177,12 +177,7 @@ func (ss *diskSideloadStorage) Purge(ctx context.Context, index, term uint64) (i
}

func (ss *diskSideloadStorage) fileSize(filename string) (int64, error) {
// TODO(tschottdorf): this should all be done through the env. As written,
// the sizes returned here will be wrong if encryption is on. We want the
// size of the unencrypted payload.
//
// See #31913.
info, err := os.Stat(filename)
info, err := ss.eng.Stat(filename)
if err != nil {
if os.IsNotExist(err) {
return 0, errSideloadedFileNotFound
Expand All @@ -208,9 +203,7 @@ func (ss *diskSideloadStorage) purgeFile(ctx context.Context, filename string) (

// Clear implements SideloadStorage.
func (ss *diskSideloadStorage) Clear(_ context.Context) error {
// TODO(jackson): Update this and the rest of `os.` filesystem calls in
// this impl to use ss.eng.
err := os.RemoveAll(ss.dir)
err := ss.eng.RemoveAll(ss.dir)
ss.dirCreated = ss.dirCreated && err != nil
return err
}
Expand Down Expand Up @@ -243,8 +236,8 @@ func (ss *diskSideloadStorage) TruncateTo(
if deletedAll {
// The directory may not exist, or it may exist and have been empty.
// Not worth trying to figure out which one, just try to delete.
err := os.Remove(ss.dir)
if !os.IsNotExist(err) {
err := ss.eng.RemoveDir(ss.dir)
if err != nil && !os.IsNotExist(err) {
return bytesFreed, 0, errors.Wrapf(err, "while purging %q", ss.dir)
}
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/replica_sideload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func testSideloadingSideloadedStorage(
// First add a file that shouldn't be in the sideloaded storage to ensure
// sane behavior when directory can't be removed after full truncate.
nonRemovableFile := filepath.Join(ss.(*diskSideloadStorage).dir, "cantremove.xx")
f, err := os.Create(nonRemovableFile)
f, err := eng.Create(nonRemovableFile)
if err != nil {
t.Fatalf("could not create non i*.t* file in sideloaded storage: %+v", err)
}
Expand All @@ -295,7 +295,7 @@ func testSideloadingSideloadedStorage(
t.Fatalf("error truncating sideloaded storage: %+v", err)
}
// Now remove extra file and let truncation proceed to remove directory.
err = os.Remove(nonRemovableFile)
err = eng.Remove(nonRemovableFile)
if err != nil {
t.Fatalf("could not remove %s: %+v", nonRemovableFile, err)
}
Expand All @@ -305,7 +305,7 @@ func testSideloadingSideloadedStorage(
t.Fatal(err)
}
// Ensure directory is removed, now that all files should be gone.
_, err = os.Stat(ss.(*diskSideloadStorage).dir)
_, err = eng.Stat(ss.(*diskSideloadStorage).dir)
if err == nil {
t.Fatalf("expected %q to be removed after truncating full range", ss.(*diskSideloadStorage).dir)
}
Expand All @@ -329,7 +329,7 @@ func testSideloadingSideloadedStorage(
t.Fatal(err)
}
// Ensure directory is removed when all records are removed.
_, err = os.Stat(ss.(*diskSideloadStorage).dir)
_, err = eng.Stat(ss.(*diskSideloadStorage).dir)
if err == nil {
t.Fatalf("expected %q to be removed after truncating full range", ss.(*diskSideloadStorage).dir)
}
Expand Down
12 changes: 3 additions & 9 deletions pkg/kv/kvserver/replica_sst_snapshot_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package kvserver
import (
"context"
"fmt"
"os"
"path/filepath"
"strconv"

Expand Down Expand Up @@ -57,7 +56,7 @@ func (s *SSTSnapshotStorage) NewScratchSpace(

// Clear removes all created directories and SSTs.
func (s *SSTSnapshotStorage) Clear() error {
return os.RemoveAll(s.dir)
return s.engine.RemoveAll(s.dir)
}

// SSTSnapshotStorageScratch keeps track of the SST files incrementally created
Expand All @@ -75,12 +74,7 @@ func (s *SSTSnapshotStorageScratch) filename(id int) string {
}

func (s *SSTSnapshotStorageScratch) createDir() error {
// TODO(peter): The directory creation needs to be plumbed through the Engine
// interface. Right now, this is creating a directory on disk even when the
// Engine has an in-memory filesystem. The only reason everything still works
// is because RocksDB MemEnvs allow the creation of files when the parent
// directory doesn't exist.
err := os.MkdirAll(s.snapDir, 0755)
err := s.storage.engine.MkdirAll(s.snapDir)
s.dirCreated = s.dirCreated || err == nil
return err
}
Expand Down Expand Up @@ -132,7 +126,7 @@ func (s *SSTSnapshotStorageScratch) SSTs() []string {

// Clear removes the directory and SSTs created for a particular snapshot.
func (s *SSTSnapshotStorageScratch) Clear() error {
return os.RemoveAll(s.snapDir)
return s.storage.engine.RemoveAll(s.snapDir)
}

// SSTSnapshotStorageFile is an SST file managed by a
Expand Down
14 changes: 8 additions & 6 deletions pkg/kv/kvserver/replica_sst_snapshot_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestSSTSnapshotStorage(t *testing.T) {
scratch := sstSnapshotStorage.NewScratchSpace(testRangeID, testSnapUUID)

// Check that the storage lazily creates the directories on first write.
_, err := os.Stat(scratch.snapDir)
_, err := eng.Stat(scratch.snapDir)
if !os.IsNotExist(err) {
t.Fatalf("expected %s to not exist", scratch.snapDir)
}
Expand All @@ -56,7 +56,7 @@ func TestSSTSnapshotStorage(t *testing.T) {

// Check that the storage lazily creates the files on write.
for _, fileName := range scratch.SSTs() {
_, err := os.Stat(fileName)
_, err := eng.Stat(fileName)
if !os.IsNotExist(err) {
t.Fatalf("expected %s to not exist", fileName)
}
Expand All @@ -67,10 +67,12 @@ func TestSSTSnapshotStorage(t *testing.T) {

// After writing to files, check that they have been flushed to disk.
for _, fileName := range scratch.SSTs() {
require.FileExists(t, fileName)
data, err := ioutil.ReadFile(fileName)
f, err := eng.Open(fileName)
require.NoError(t, err)
data, err := ioutil.ReadAll(f)
require.NoError(t, err)
require.Equal(t, data, []byte("foo"))
require.NoError(t, f.Close())
}

// Check that closing is idempotent.
Expand All @@ -90,12 +92,12 @@ func TestSSTSnapshotStorage(t *testing.T) {

// Check that Clear removes the directory.
require.NoError(t, scratch.Clear())
_, err = os.Stat(scratch.snapDir)
_, err = eng.Stat(scratch.snapDir)
if !os.IsNotExist(err) {
t.Fatalf("expected %s to not exist", scratch.snapDir)
}
require.NoError(t, sstSnapshotStorage.Clear())
_, err = os.Stat(sstSnapshotStorage.dir)
_, err = eng.Stat(sstSnapshotStorage.dir)
if !os.IsNotExist(err) {
t.Fatalf("expected %s to not exist", sstSnapshotStorage.dir)
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"fmt"
"math"
"math/rand"
"os"
"reflect"
"regexp"
"sort"
Expand Down Expand Up @@ -6327,7 +6326,7 @@ func TestReplicaCorruption(t *testing.T) {
}

// Should have laid down marker file to prevent startup.
_, err := os.Stat(base.PreventedStartupFile(tc.engine.GetAuxiliaryDir()))
_, err := tc.engine.Stat(base.PreventedStartupFile(tc.engine.GetAuxiliaryDir()))
require.NoError(t, err)

// Should have triggered fatal error.
Expand Down
Loading

0 comments on commit d007eac

Please sign in to comment.