diff --git a/pkg/kv/kvserver/client_replica_gc_test.go b/pkg/kv/kvserver/client_replica_gc_test.go
index 6526ebde4eb3..2bbca240daf5 100644
--- a/pkg/kv/kvserver/client_replica_gc_test.go
+++ b/pkg/kv/kvserver/client_replica_gc_test.go
@@ -12,7 +12,6 @@ package kvserver_test
 
 import (
 	"context"
-	"io/ioutil"
 	"os"
 	"path/filepath"
 	"strconv"
@@ -24,6 +23,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/storage"
+	"github.com/cockroachdb/cockroach/pkg/storage/fs"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/errors"
@@ -99,6 +99,7 @@ func TestReplicaGCQueueDropReplicaDirect(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
+	eng := mtc.engines[1]
 
 	// Put some bogus sideloaded data on the replica which we're about to
 	// remove. Then, at the end of the test, check that that sideloaded
@@ -111,10 +112,10 @@ func TestReplicaGCQueueDropReplicaDirect(t *testing.T) {
 	if dir == "" {
 		t.Fatal("no sideloaded directory")
 	}
-	if err := os.MkdirAll(dir, 0755); err != nil {
+	if err := eng.MkdirAll(dir); err != nil {
 		t.Fatal(err)
 	}
-	if err := ioutil.WriteFile(filepath.Join(dir, "i1000000.t100000"), []byte("foo"), 0644); err != nil {
+	if err := writeFile(eng, filepath.Join(dir, "i1000000.t100000"), "foo"); err != nil {
 		t.Fatal(err)
 	}
 
@@ -125,9 +126,7 @@ func TestReplicaGCQueueDropReplicaDirect(t *testing.T) {
 		repl1.RaftLock()
 		dir := repl1.SideloadedRaftMuLocked().Dir()
 		repl1.RaftUnlock()
-		_, err := os.Stat(dir)
-
-		if os.IsNotExist(err) {
+		if _, err := eng.Stat(dir); os.IsNotExist(err) {
 			return nil
 		}
 		return errors.Errorf("replica still has sideloaded files despite GC: %v", err)
@@ -147,6 +146,19 @@ func TestReplicaGCQueueDropReplicaDirect(t *testing.T) {
 	})
 }
 
+func writeFile(fs fs.FS, path, contents string) error {
+	f, err := fs.Create(path)
+	if err != nil {
+		return err
+	}
+	_, err = f.Write([]byte(contents))
+	if err != nil {
+		_ = f.Close()
+		return err
+	}
+	return f.Close()
+}
+
 // TestReplicaGCQueueDropReplicaOnScan verifies that the range GC queue
 // removes a range from a store that no longer should have a replica.
 func TestReplicaGCQueueDropReplicaGCOnScan(t *testing.T) {
diff --git a/pkg/kv/kvserver/consistency_queue_test.go b/pkg/kv/kvserver/consistency_queue_test.go
index 1250bd6e3721..37f7e87ae035 100644
--- a/pkg/kv/kvserver/consistency_queue_test.go
+++ b/pkg/kv/kvserver/consistency_queue_test.go
@@ -364,8 +364,12 @@ func TestCheckConsistencyInconsistent(t *testing.T) {
 	assert.Contains(t, resp.Result[0].Detail, `stats`)
 
 	// A death rattle should have been written on s2 (store index 1).
-	b, err := ioutil.ReadFile(base.PreventedStartupFile(mtc.stores[1].Engine().GetAuxiliaryDir()))
+	eng := mtc.stores[1].Engine()
+	f, err := eng.Open(base.PreventedStartupFile(eng.GetAuxiliaryDir()))
 	require.NoError(t, err)
+	b, err := ioutil.ReadAll(f)
+	require.NoError(t, err)
+	require.NoError(t, f.Close())
 	require.NotEmpty(t, b)
 }
 
diff --git a/pkg/kv/kvserver/replica_corruption.go b/pkg/kv/kvserver/replica_corruption.go
index 47044897e237..a234e2cc4351 100644
--- a/pkg/kv/kvserver/replica_corruption.go
+++ b/pkg/kv/kvserver/replica_corruption.go
@@ -13,11 +13,10 @@ package kvserver
 import (
 	"context"
 	"fmt"
-	"io/ioutil"
-	"os"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/storage/fs"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 )
 
@@ -57,7 +56,7 @@ func (r *Replica) setCorruptRaftMuLocked(
 	r.mu.destroyStatus.Set(cErr, destroyReasonRemoved)
 
 	auxDir := r.store.engine.GetAuxiliaryDir()
-	_ = os.MkdirAll(auxDir, 0755)
+	_ = r.store.engine.MkdirAll(auxDir)
 	path := base.PreventedStartupFile(auxDir)
 
 	preventStartupMsg := fmt.Sprintf(`ATTENTION:
@@ -70,10 +69,23 @@ A file preventing this node from restarting was placed at:
 %s
 `, r, path)
 
-	if err := ioutil.WriteFile(path, []byte(preventStartupMsg), 0644); err != nil {
+	if err := writeFile(r.store.engine, path, preventStartupMsg); err != nil {
 		log.Warningf(ctx, "%v", err)
 	}
 
 	log.FatalfDepth(ctx, 1, "replica is corrupted: %s", cErr)
 	return roachpb.NewError(cErr)
 }
+
+func writeFile(fs fs.FS, path, contents string) error {
+	f, err := fs.Create(path)
+	if err != nil {
+		return err
+	}
+	_, err = f.Write([]byte(contents))
+	if err != nil {
+		_ = f.Close()
+		return err
+	}
+	return f.Close()
+}
diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go
index 51082d56082d..6adf3ddae9fe 100644
--- a/pkg/kv/kvserver/replica_proposal.go
+++ b/pkg/kv/kvserver/replica_proposal.go
@@ -13,8 +13,6 @@ package kvserver
 import (
 	"context"
 	"fmt"
-	"io/ioutil"
-	"os"
 	"path/filepath"
 	"strings"
 	"time"
@@ -259,7 +257,7 @@ func (r *Replica) computeChecksumPostApply(ctx context.Context, cc kvserverpb.Co
 			// in a goroutine that's about to end, simply sleep for a few seconds
 			// and then terminate.
 			auxDir := r.store.engine.GetAuxiliaryDir()
-			_ = os.MkdirAll(auxDir, 0755)
+			_ = r.store.engine.MkdirAll(auxDir)
 			path := base.PreventedStartupFile(auxDir)
 
 			preventStartupMsg := fmt.Sprintf(`ATTENTION:
@@ -276,7 +274,7 @@ A file preventing this node from restarting was placed at:
 %s
 `, r, auxDir, path)
 
-			if err := ioutil.WriteFile(path, []byte(preventStartupMsg), 0644); err != nil {
+			if err := writeFile(r.store.engine, path, preventStartupMsg); err != nil {
 				log.Warningf(ctx, "%v", err)
 			}
 
@@ -566,15 +564,15 @@ func addSSTablePreApply(
 
 	// TODO(tschottdorf): remove this once sideloaded storage guarantees its
 	// existence.
-	if err := os.MkdirAll(filepath.Dir(path), 0700); err != nil {
+	if err := eng.MkdirAll(filepath.Dir(path)); err != nil {
 		panic(err)
 	}
-	if _, err := os.Stat(path); err == nil {
+	if _, err := eng.Stat(path); err == nil {
 		// The file we want to ingest exists. This can happen since the
 		// ingestion may apply twice (we ingest before we mark the Raft
 		// command as committed). Just unlink the file (RocksDB created a
 		// hard link); after that we're free to write it again.
-		if err := os.Remove(path); err != nil {
+		if err := eng.Remove(path); err != nil {
 			log.Fatalf(ctx, "while removing existing file during ingestion of %s: %+v", path, err)
 		}
 	}
diff --git a/pkg/kv/kvserver/replica_sideload_disk.go b/pkg/kv/kvserver/replica_sideload_disk.go
index dc2d37fc8034..608a37abc2e4 100644
--- a/pkg/kv/kvserver/replica_sideload_disk.go
+++ b/pkg/kv/kvserver/replica_sideload_disk.go
@@ -61,8 +61,8 @@ func sideloadedPath(baseDir string, rangeID roachpb.RangeID) string {
 	)
 }
 
-func exists(path string) (bool, error) {
-	_, err := os.Stat(path)
+func exists(eng storage.Engine, path string) (bool, error) {
+	_, err := eng.Stat(path)
 	if err == nil {
 		return true, nil
 	}
@@ -96,15 +96,15 @@ func newDiskSideloadStorage(
 	// ns on my laptop, but only around 2.2k ns on the gceworker. Still,
 	// even on the laptop, 50k replicas would only add 1.2s which is also
 	// acceptable given that it'll happen only once.
-	exists, err := exists(path)
+	exists, err := exists(eng, path)
 	if err != nil {
 		return nil, errors.Wrap(err, "checking pre-migration sideloaded directory")
 	}
 	if exists {
-		if err := os.MkdirAll(filepath.Dir(newPath), 0755); err != nil {
+		if err := eng.MkdirAll(filepath.Dir(newPath)); err != nil {
 			return nil, errors.Wrap(err, "creating migrated sideloaded directory")
 		}
-		if err := os.Rename(path, newPath); err != nil {
+		if err := eng.Rename(path, newPath); err != nil {
 			return nil, errors.Wrap(err, "while migrating sideloaded directory")
 		}
 	}
@@ -120,7 +120,7 @@ func newDiskSideloadStorage(
 }
 
 func (ss *diskSideloadStorage) createDir() error {
-	err := os.MkdirAll(ss.dir, 0755)
+	err := ss.eng.MkdirAll(ss.dir)
 	ss.dirCreated = ss.dirCreated || err == nil
 	return err
 }
@@ -177,12 +177,7 @@ func (ss *diskSideloadStorage) Purge(ctx context.Context, index, term uint64) (i
 }
 
 func (ss *diskSideloadStorage) fileSize(filename string) (int64, error) {
-	// TODO(tschottdorf): this should all be done through the env. As written,
-	// the sizes returned here will be wrong if encryption is on. We want the
-	// size of the unencrypted payload.
-	//
-	// See #31913.
-	info, err := os.Stat(filename)
+	info, err := ss.eng.Stat(filename)
 	if err != nil {
 		if os.IsNotExist(err) {
 			return 0, errSideloadedFileNotFound
@@ -208,9 +203,7 @@ func (ss *diskSideloadStorage) purgeFile(ctx context.Context, filename string) (
 
 // Clear implements SideloadStorage.
 func (ss *diskSideloadStorage) Clear(_ context.Context) error {
-	// TODO(jackson): Update this and the rest of `os.` filesystem calls in
-	// this impl to use ss.eng.
-	err := os.RemoveAll(ss.dir)
+	err := ss.eng.RemoveAll(ss.dir)
 	ss.dirCreated = ss.dirCreated && err != nil
 	return err
 }
@@ -243,8 +236,8 @@ func (ss *diskSideloadStorage) TruncateTo(
 	if deletedAll {
 		// The directory may not exist, or it may exist and have been empty.
 		// Not worth trying to figure out which one, just try to delete.
-		err := os.Remove(ss.dir)
-		if !os.IsNotExist(err) {
+		err := ss.eng.RemoveDir(ss.dir)
+		if err != nil && !os.IsNotExist(err) {
 			return bytesFreed, 0, errors.Wrapf(err, "while purging %q", ss.dir)
 		}
 	}
diff --git a/pkg/kv/kvserver/replica_sideload_test.go b/pkg/kv/kvserver/replica_sideload_test.go
index 214ff6942083..d7bde532561b 100644
--- a/pkg/kv/kvserver/replica_sideload_test.go
+++ b/pkg/kv/kvserver/replica_sideload_test.go
@@ -280,7 +280,7 @@ func testSideloadingSideloadedStorage(
 		// First add a file that shouldn't be in the sideloaded storage to ensure
 		// sane behavior when directory can't be removed after full truncate.
 		nonRemovableFile := filepath.Join(ss.(*diskSideloadStorage).dir, "cantremove.xx")
-		f, err := os.Create(nonRemovableFile)
+		f, err := eng.Create(nonRemovableFile)
 		if err != nil {
 			t.Fatalf("could not create non i*.t* file in sideloaded storage: %+v", err)
 		}
@@ -295,7 +295,7 @@ func testSideloadingSideloadedStorage(
 			t.Fatalf("error truncating sideloaded storage: %+v", err)
 		}
 		// Now remove extra file and let truncation proceed to remove directory.
-		err = os.Remove(nonRemovableFile)
+		err = eng.Remove(nonRemovableFile)
 		if err != nil {
 			t.Fatalf("could not remove %s: %+v", nonRemovableFile, err)
 		}
@@ -305,7 +305,7 @@ func testSideloadingSideloadedStorage(
 			t.Fatal(err)
 		}
 		// Ensure directory is removed, now that all files should be gone.
-		_, err = os.Stat(ss.(*diskSideloadStorage).dir)
+		_, err = eng.Stat(ss.(*diskSideloadStorage).dir)
 		if err == nil {
 			t.Fatalf("expected %q to be removed after truncating full range", ss.(*diskSideloadStorage).dir)
 		}
@@ -329,7 +329,7 @@ func testSideloadingSideloadedStorage(
 			t.Fatal(err)
 		}
 		// Ensure directory is removed when all records are removed.
-		_, err = os.Stat(ss.(*diskSideloadStorage).dir)
+		_, err = eng.Stat(ss.(*diskSideloadStorage).dir)
 		if err == nil {
 			t.Fatalf("expected %q to be removed after truncating full range", ss.(*diskSideloadStorage).dir)
 		}
diff --git a/pkg/kv/kvserver/replica_sst_snapshot_storage.go b/pkg/kv/kvserver/replica_sst_snapshot_storage.go
index 356e235e72e9..e636d12c2b15 100644
--- a/pkg/kv/kvserver/replica_sst_snapshot_storage.go
+++ b/pkg/kv/kvserver/replica_sst_snapshot_storage.go
@@ -13,7 +13,6 @@ package kvserver
 import (
 	"context"
 	"fmt"
-	"os"
 	"path/filepath"
 	"strconv"
 
@@ -57,7 +56,7 @@ func (s *SSTSnapshotStorage) NewScratchSpace(
 
 // Clear removes all created directories and SSTs.
 func (s *SSTSnapshotStorage) Clear() error {
-	return os.RemoveAll(s.dir)
+	return s.engine.RemoveAll(s.dir)
 }
 
 // SSTSnapshotStorageScratch keeps track of the SST files incrementally created
@@ -75,12 +74,7 @@ func (s *SSTSnapshotStorageScratch) filename(id int) string {
 }
 
 func (s *SSTSnapshotStorageScratch) createDir() error {
-	// TODO(peter): The directory creation needs to be plumbed through the Engine
-	// interface. Right now, this is creating a directory on disk even when the
-	// Engine has an in-memory filesystem. The only reason everything still works
-	// is because RocksDB MemEnvs allow the creation of files when the parent
-	// directory doesn't exist.
-	err := os.MkdirAll(s.snapDir, 0755)
+	err := s.storage.engine.MkdirAll(s.snapDir)
 	s.dirCreated = s.dirCreated || err == nil
 	return err
 }
@@ -132,7 +126,7 @@ func (s *SSTSnapshotStorageScratch) SSTs() []string {
 
 // Clear removes the directory and SSTs created for a particular snapshot.
 func (s *SSTSnapshotStorageScratch) Clear() error {
-	return os.RemoveAll(s.snapDir)
+	return s.storage.engine.RemoveAll(s.snapDir)
 }
 
 // SSTSnapshotStorageFile is an SST file managed by a
diff --git a/pkg/kv/kvserver/replica_sst_snapshot_storage_test.go b/pkg/kv/kvserver/replica_sst_snapshot_storage_test.go
index cbe3de01a06c..3c52a4506389 100644
--- a/pkg/kv/kvserver/replica_sst_snapshot_storage_test.go
+++ b/pkg/kv/kvserver/replica_sst_snapshot_storage_test.go
@@ -40,7 +40,7 @@ func TestSSTSnapshotStorage(t *testing.T) {
 	scratch := sstSnapshotStorage.NewScratchSpace(testRangeID, testSnapUUID)
 
 	// Check that the storage lazily creates the directories on first write.
-	_, err := os.Stat(scratch.snapDir)
+	_, err := eng.Stat(scratch.snapDir)
 	if !os.IsNotExist(err) {
 		t.Fatalf("expected %s to not exist", scratch.snapDir)
 	}
@@ -56,7 +56,7 @@ func TestSSTSnapshotStorage(t *testing.T) {
 
 	// Check that the storage lazily creates the files on write.
 	for _, fileName := range scratch.SSTs() {
-		_, err := os.Stat(fileName)
+		_, err := eng.Stat(fileName)
 		if !os.IsNotExist(err) {
 			t.Fatalf("expected %s to not exist", fileName)
 		}
@@ -67,10 +67,12 @@ func TestSSTSnapshotStorage(t *testing.T) {
 
 	// After writing to files, check that they have been flushed to disk.
 	for _, fileName := range scratch.SSTs() {
-		require.FileExists(t, fileName)
-		data, err := ioutil.ReadFile(fileName)
+		f, err := eng.Open(fileName)
+		require.NoError(t, err)
+		data, err := ioutil.ReadAll(f)
 		require.NoError(t, err)
 		require.Equal(t, data, []byte("foo"))
+		require.NoError(t, f.Close())
 	}
 
 	// Check that closing is idempotent.
@@ -90,12 +92,12 @@ func TestSSTSnapshotStorage(t *testing.T) {
 
 	// Check that Clear removes the directory.
 	require.NoError(t, scratch.Clear())
-	_, err = os.Stat(scratch.snapDir)
+	_, err = eng.Stat(scratch.snapDir)
 	if !os.IsNotExist(err) {
 		t.Fatalf("expected %s to not exist", scratch.snapDir)
 	}
 	require.NoError(t, sstSnapshotStorage.Clear())
-	_, err = os.Stat(sstSnapshotStorage.dir)
+	_, err = eng.Stat(sstSnapshotStorage.dir)
 	if !os.IsNotExist(err) {
 		t.Fatalf("expected %s to not exist", sstSnapshotStorage.dir)
 	}
diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go
index 7a7bf37d4b6e..6d4ae95a2c23 100644
--- a/pkg/kv/kvserver/replica_test.go
+++ b/pkg/kv/kvserver/replica_test.go
@@ -16,7 +16,6 @@ import (
 	"fmt"
 	"math"
 	"math/rand"
-	"os"
 	"reflect"
 	"regexp"
 	"sort"
@@ -6327,7 +6326,7 @@ func TestReplicaCorruption(t *testing.T) {
 	}
 
 	// Should have laid down marker file to prevent startup.
-	_, err := os.Stat(base.PreventedStartupFile(tc.engine.GetAuxiliaryDir()))
+	_, err := tc.engine.Stat(base.PreventedStartupFile(tc.engine.GetAuxiliaryDir()))
 	require.NoError(t, err)
 
 	// Should have triggered fatal error.
diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go
index 6f69ce1f4be5..8b70c5e02229 100644
--- a/pkg/kv/kvserver/store.go
+++ b/pkg/kv/kvserver/store.go
@@ -15,7 +15,6 @@ import (
 	"context"
 	"fmt"
 	"math"
-	"os"
 	"path/filepath"
 	"runtime"
 	"sort"
@@ -2460,7 +2459,7 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error {
 // is returned.
 func (s *Store) checkpoint(ctx context.Context, tag string) (string, error) {
 	checkpointBase := filepath.Join(s.engine.GetAuxiliaryDir(), "checkpoints")
-	_ = os.MkdirAll(checkpointBase, 0700)
+	_ = s.engine.MkdirAll(checkpointBase)
 
 	checkpointDir := filepath.Join(checkpointBase, tag)
 	if err := s.engine.CreateCheckpoint(checkpointDir); err != nil {
diff --git a/pkg/storage/engine_test.go b/pkg/storage/engine_test.go
index 04749053c886..62f300fc20a2 100644
--- a/pkg/storage/engine_test.go
+++ b/pkg/storage/engine_test.go
@@ -1464,6 +1464,9 @@ func TestFS(t *testing.T) {
 		// Create a/ and assert that it's empty.
 		require.NoError(t, fs.MkdirAll(path("a")))
 		expectLS(path("a"), []string{})
+		if _, err := fs.Stat(path("a/b/c")); !os.IsNotExist(err) {
+			t.Fatal(`fs.Stat("a/b/c") should not exist`)
+		}
 
 		// Create a/b/ and a/b/c/ in a single MkdirAll call.
 		// Then ensure that a duplicate call returns a nil error.
@@ -1472,6 +1475,10 @@ func TestFS(t *testing.T) {
 		expectLS(path("a"), []string{"b"})
 		expectLS(path("a/b"), []string{"c"})
 		expectLS(path("a/b/c"), []string{})
+		finfo, err := fs.Stat(path("a/b/c"))
+		require.NoError(t, err)
+		require.Equal(t, "c", finfo.Name())
+		require.True(t, finfo.IsDir())
 
 		// Create a file at a/b/c/foo.
 		f, err := fs.Create(path("a/b/c/foo"))
@@ -1484,6 +1491,9 @@ func TestFS(t *testing.T) {
 		require.NoError(t, err)
 		require.NoError(t, f.Close())
 		expectLS(path("a/b/c"), []string{"bar", "foo"})
+		finfo, err = fs.Stat(path("a/b/c/bar"))
+		require.NoError(t, err)
+		require.Equal(t, "bar", finfo.Name())
 
 		// RemoveAll a file.
 		require.NoError(t, fs.RemoveAll(path("a/b/c/bar")))
diff --git a/pkg/storage/fs/fs.go b/pkg/storage/fs/fs.go
index 8c7712886079..724b7d23f7ae 100644
--- a/pkg/storage/fs/fs.go
+++ b/pkg/storage/fs/fs.go
@@ -10,7 +10,10 @@
 
 package fs
 
-import "io"
+import (
+	"io"
+	"os"
+)
 
 // File and FS are a partial attempt at offering the Pebble vfs.FS interface. Given the constraints
 // of the RocksDB Env interface we've chosen to only include what is easy to implement. Additionally,
@@ -65,4 +68,7 @@ type FS interface {
 	// List returns a listing of the given directory. The names returned are
 	// relative to the directory.
 	List(name string) ([]string, error)
+
+	// Stat returns a FileInfo describing the named file.
+	Stat(name string) (os.FileInfo, error)
 }
diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go
index 458355b9dc78..82635aa0c919 100644
--- a/pkg/storage/pebble.go
+++ b/pkg/storage/pebble.go
@@ -472,24 +472,9 @@ func NewPebble(ctx context.Context, cfg PebbleConfig) (*Pebble, error) {
 		}
 	}
 
-	var auxDir string
-	if cfg.Dir == "" {
-		// TODO(peter): This is horribly hacky but matches what RocksDB does. For
-		// in-memory instances, we create an on-disk auxiliary directory. This is
-		// necessary because various tests expect the auxiliary directory to
-		// actually exist on disk even though they don't actually write files to
-		// the directory. See SSTSnapshotStorage for one example of this bad
-		// behavior.
-		var err error
-		auxDir, err = ioutil.TempDir(os.TempDir(), "cockroach-auxiliary")
-		if err != nil {
-			return nil, err
-		}
-	} else {
-		auxDir = cfg.Opts.FS.PathJoin(cfg.Dir, base.AuxiliaryDir)
-		if err := cfg.Opts.FS.MkdirAll(auxDir, 0755); err != nil {
-			return nil, err
-		}
+	auxDir := cfg.Opts.FS.PathJoin(cfg.Dir, base.AuxiliaryDir)
+	if err := cfg.Opts.FS.MkdirAll(auxDir, 0755); err != nil {
+		return nil, err
 	}
 
 	fileRegistry, statsHandler, err := ResolveEncryptedEnvOptions(&cfg)
@@ -580,18 +565,6 @@ func (p *Pebble) Close() {
 		return
 	}
 	p.closed = true
-
-	if p.path == "" {
-		// Remove the temporary directory when the engine is in-memory. This
-		// matches the RocksDB behavior.
-		//
-		// TODO(peter): The aux-dir shouldn't be on-disk for in-memory
-		// engines. This is just a wart that needs to be removed.
-		if err := os.RemoveAll(p.auxDir); err != nil {
-			p.logger.Infof("%v", err)
-		}
-	}
-
 	_ = p.db.Close()
 }
 
@@ -986,7 +959,13 @@ func (p *Pebble) Remove(filename string) error {
 
 // RemoveAll implements the Engine interface.
 func (p *Pebble) RemoveAll(dir string) error {
-	return p.fs.RemoveAll(dir)
+	err := p.fs.RemoveAll(dir)
+	// TODO(jackson): remove this once the vfs.MemFS RemoveAll impl is fixed
+	// (before this PR lands).
+	if os.IsNotExist(err) {
+		return nil
+	}
+	return err
 }
 
 // Link implements the FS interface.
@@ -1058,6 +1037,11 @@ func (p *Pebble) List(name string) ([]string, error) {
 	return dirents, err
 }
 
+// Stat implements the FS interface.
+func (p *Pebble) Stat(name string) (os.FileInfo, error) {
+	return p.fs.Stat(name)
+}
+
 // CreateCheckpoint implements the Engine interface.
 func (p *Pebble) CreateCheckpoint(dir string) error {
 	return p.db.Checkpoint(dir)
diff --git a/pkg/storage/rocksdb.go b/pkg/storage/rocksdb.go
index 472058927f7c..21c7d2c4c686 100644
--- a/pkg/storage/rocksdb.go
+++ b/pkg/storage/rocksdb.go
@@ -14,7 +14,7 @@ import (
 	"bytes"
 	"context"
 	"fmt"
-	"io/ioutil"
+	"io"
 	"math"
 	"os"
 	"path/filepath"
@@ -527,14 +527,10 @@ func NewRocksDB(cfg RocksDBConfig, cache RocksDBCache) (*RocksDB, error) {
 	}
 
 	r := &RocksDB{
-		cfg:   cfg,
-		cache: cache.ref(),
+		cfg:    cfg,
+		cache:  cache.ref(),
+		auxDir: filepath.Join(cfg.Dir, base.AuxiliaryDir),
 	}
-
-	if err := r.setAuxiliaryDir(filepath.Join(cfg.Dir, base.AuxiliaryDir)); err != nil {
-		return nil, err
-	}
-
 	if err := r.open(); err != nil {
 		return nil, err
 	}
@@ -565,25 +561,12 @@ func newMemRocksDB(attrs roachpb.Attributes, cache RocksDBCache, maxSize int64)
 			},
 		},
 		// dir: empty dir == "mem" RocksDB instance.
-		cache: cache.ref(),
-	}
-
-	// TODO(peter): This is bizarre. We're creating on on-disk temporary
-	// directory for an in-memory filesystem. The reason this is done is because
-	// various users of the auxiliary directory use the os.* routines (which is
-	// invalid!). This needs to be cleaned up.
-	auxDir, err := ioutil.TempDir(os.TempDir(), "cockroach-auxiliary")
-	if err != nil {
-		return nil, err
-	}
-	if err := r.setAuxiliaryDir(auxDir); err != nil {
-		return nil, err
+		cache:  cache.ref(),
+		auxDir: "cockroach-auxiliary",
 	}
-
 	if err := r.open(); err != nil {
 		return nil, err
 	}
-
 	return r, nil
 }
@@ -643,7 +626,11 @@ func (r *RocksDB) open() error {
 	if r.cfg.MaxOpenFiles != 0 {
 		maxOpenFiles = r.cfg.MaxOpenFiles
 	}
-
+	if r.cfg.Dir != "" {
+		if err := os.MkdirAll(r.cfg.Dir, os.ModePerm); err != nil {
+			return err
+		}
+	}
 	status := C.DBOpen(&r.rdb, goToCSlice([]byte(r.cfg.Dir)),
 		C.DBOptions{
 			cache: r.cache.cache,
@@ -666,6 +653,13 @@ func (r *RocksDB) open() error {
 		}
 	}
 
+	// Create the auxiliary directory if necessary.
+	if !r.cfg.ReadOnly {
+		if err := r.MkdirAll(r.auxDir); err != nil {
+			return err
+		}
+	}
+
 	r.commit.cond.L = &r.commit.Mutex
 	r.syncer.cond.L = &r.syncer.Mutex
 	r.iters.m = make(map[*rocksDBIterator][]byte)
@@ -673,6 +667,7 @@ func (r *RocksDB) open() error {
 
 	// NB: The sync goroutine acts as a check that the RocksDB instance was
 	// properly closed as the goroutine will leak otherwise.
 	go r.syncLoop()
+
 	return nil
 }
@@ -738,10 +733,6 @@ func (r *RocksDB) Close() {
 		if log.V(1) {
 			log.Infof(context.TODO(), "closing in-memory rocksdb instance")
 		}
-		// Remove the temporary directory when the engine is in-memory.
-		if err := os.RemoveAll(r.auxDir); err != nil {
-			log.Warningf(context.TODO(), "%v", err)
-		}
 	} else {
 		log.Infof(context.TODO(), "closing rocksdb instance at %q", r.cfg.Dir)
 	}
@@ -3206,16 +3197,6 @@ func (r *RocksDB) GetAuxiliaryDir() string {
 	return r.auxDir
 }
 
-func (r *RocksDB) setAuxiliaryDir(d string) error {
-	if !r.cfg.ReadOnly {
-		if err := os.MkdirAll(d, 0755); err != nil {
-			return err
-		}
-	}
-	r.auxDir = d
-	return nil
-}
-
 // PreIngestDelay implements the Engine interface.
 func (r *RocksDB) PreIngestDelay(ctx context.Context) {
 	preIngestDelay(ctx, r, r.cfg.Settings)
@@ -3374,6 +3355,9 @@ func MVCCScanDecodeKeyValues(repr [][]byte, fn func(key MVCCKey, rawBytes []byte
 }
 
 func notFoundErrOrDefault(err error) error {
+	if err == nil {
+		return nil
+	}
 	errStr := err.Error()
 	if strings.Contains(errStr, "No such") ||
 		strings.Contains(errStr, "not found") ||
@@ -3456,6 +3440,11 @@ func (f *rocksdbReadableFile) Read(p []byte) (n int, err error) {
 func (f *rocksdbReadableFile) ReadAt(p []byte, off int64) (int, error) {
 	var n C.int
 	err := statusToError(C.DBEnvReadAtFile(f.rdb, f.file, goToCSlice(p), C.int64_t(off), &n))
+	// The io.ReaderAt interface requires implementations to return a non-nil
+	// error if fewer than len(p) bytes are read. Only substitute io.EOF when
+	// the read itself succeeded; a real error must not be masked as EOF.
+	if err == nil && int(n) < len(p) {
+		err = io.EOF
+	}
 	return int(n), err
 }
@@ -3557,7 +3546,7 @@ func (r *RocksDB) MkdirAll(path string) error {
 
 // RemoveDir implements the FS interface.
 func (r *RocksDB) RemoveDir(name string) error {
-	return statusToError(C.DBEnvDeleteDir(r.rdb, goToCSlice([]byte(name))))
+	return notFoundErrOrDefault(statusToError(C.DBEnvDeleteDir(r.rdb, goToCSlice([]byte(name)))))
 }
 
 // List implements the FS interface.
@@ -3594,6 +3583,39 @@ func (r *RocksDB) List(name string) ([]string, error) {
 	return result, err
 }
 
+// Stat implements the FS interface.
+func (r *RocksDB) Stat(name string) (os.FileInfo, error) {
+	// The RocksDB Env doesn't expose a Stat equivalent. If we're using an
+	// on-disk filesystem, circumvent the Env and return the os.Stat results.
+	// The file sizes of encrypted files might be off.
+	if r.cfg.Dir != "" {
+		return os.Stat(name)
+	}
+
+	// Otherwise, construct what we can. We don't know whether the path
+	// names a directory or a file, so try both.
+	if _, listErr := r.List(name); listErr == nil {
+		return rocksDBFileInfo{name: name, isDir: true}, nil
+	}
+	f, err := r.Open(name)
+	if err != nil {
+		return nil, err
+	}
+	return rocksDBFileInfo{name: name, isDir: false}, f.Close()
+}
+
+type rocksDBFileInfo struct {
+	name  string
+	isDir bool
+}
+
+func (fi rocksDBFileInfo) Name() string       { return filepath.Base(fi.name) }
+func (fi rocksDBFileInfo) Size() int64        { return 64 }
+func (fi rocksDBFileInfo) Mode() os.FileMode  { return os.ModePerm }
+func (fi rocksDBFileInfo) ModTime() time.Time { return time.Time{} }
+func (fi rocksDBFileInfo) IsDir() bool        { return fi.isDir }
+func (fi rocksDBFileInfo) Sys() interface{}   { return nil }
+
 // ThreadStacks returns the stacks for all threads. The stacks are raw
 // addresses, and do not contain symbols. Use addr2line (or atos on Darwin) to
 // symbolize.
diff --git a/pkg/storage/tee.go b/pkg/storage/tee.go
index 3f02f962331d..1e83eed05927 100644
--- a/pkg/storage/tee.go
+++ b/pkg/storage/tee.go
@@ -580,6 +580,21 @@ func (t *TeeEngine) List(name string) ([]string, error) {
 	return list1, nil
 }
 
+// Stat implements the FS interface.
+func (t *TeeEngine) Stat(name string) (os.FileInfo, error) {
+	info1, err := t.eng1.Stat(name)
+	name2, ok := t.remapPath(name)
+	if !ok {
+		return info1, err
+	}
+	_, err2 := t.eng2.Stat(name2)
+	if err = fatalOnErrorMismatch(t.ctx, err, err2); err != nil {
+		return nil, err
+	}
+	// TODO(jackson): compare the FileInfos?
+	return info1, err
+}
+
 // CreateCheckpoint implements the Engine interface.
 func (t *TeeEngine) CreateCheckpoint(dir string) error {
 	path1 := filepath.Join(dir, "eng1")