Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: unify storage/fs.FS and pebble/vfs.FS #98776

Merged
merged 1 commit into from
Mar 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/ccl/cliccl/ear_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"crypto/rand"
"fmt"
"path/filepath"
"sort"
"strings"
"testing"

Expand Down Expand Up @@ -56,6 +57,7 @@ func TestDecrypt(t *testing.T) {
// Find a manifest file to check.
files, err := p.List(dir)
require.NoError(t, err)
sort.Strings(files)
var manifestPath string
for _, basename := range files {
if strings.HasPrefix(basename, "MANIFEST-") {
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ go_library(
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_pebble//:pebble",
"@com_github_cockroachdb_pebble//objstorage",
"@com_github_cockroachdb_pebble//vfs",
"@com_github_cockroachdb_redact//:redact",
"@com_github_gogo_protobuf//proto",
"@com_github_google_btree//:btree",
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/client_replica_gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package kvserver_test

import (
"context"
"os"
"path/filepath"
"testing"
"time"
Expand Down Expand Up @@ -112,7 +113,7 @@ func TestReplicaGCQueueDropReplicaDirect(t *testing.T) {
if dir == "" {
t.Fatal("no sideloaded directory")
}
if err := eng.MkdirAll(dir); err != nil {
if err := eng.MkdirAll(dir, os.ModePerm); err != nil {
t.Fatal(err)
}
if err := fs.WriteFile(eng, filepath.Join(dir, "i1000000.t100000"), []byte("foo")); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/kvserverbase/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ go_library(
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/storage/fs",
"//pkg/util/errorutil",
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
Expand All @@ -30,6 +29,7 @@ go_library(
"//pkg/util/timeutil",
"//pkg/util/tracing/tracingpb",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_pebble//vfs",
"@com_github_cockroachdb_redact//:redact",
"@org_golang_x_time//rate",
],
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/kvserverbase/syncing_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ import (

"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/fs"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/vfs"
"golang.org/x/time/rate"
)

Expand Down Expand Up @@ -82,7 +82,7 @@ func WriteFileSyncing(
ctx context.Context,
filename string,
data []byte,
fs fs.FS,
fs vfs.FS,
perm os.FileMode,
settings *cluster.Settings,
limiter *rate.Limiter,
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/logstore/sideload_disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package logstore
import (
"context"
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
Expand Down Expand Up @@ -75,7 +76,7 @@ func NewDiskSideloadStorage(
}

func (ss *DiskSideloadStorage) createDir() error {
err := ss.eng.MkdirAll(ss.dir)
err := ss.eng.MkdirAll(ss.dir, os.ModePerm)
ss.dirCreated = ss.dirCreated || err == nil
return err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/replica_consistency.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"crypto/sha512"
"encoding/binary"
"fmt"
"os"
"sync"
"time"

Expand Down Expand Up @@ -741,7 +742,7 @@ func (r *Replica) computeChecksumPostApply(
// certain of completing the check. Since we're already in a goroutine
// that's about to end, just sleep for a few seconds and then terminate.
auxDir := r.store.TODOEngine().GetAuxiliaryDir()
_ = r.store.TODOEngine().MkdirAll(auxDir)
_ = r.store.TODOEngine().MkdirAll(auxDir, os.ModePerm)
path := base.PreventedStartupFile(auxDir)

const attentionFmt = `ATTENTION:
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/replica_corruption.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package kvserver
import (
"context"
"fmt"
"os"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
Expand Down Expand Up @@ -50,7 +51,7 @@ func (r *Replica) setCorruptRaftMuLocked(
r.mu.destroyStatus.Set(cErr, destroyReasonRemoved)

auxDir := r.store.TODOEngine().GetAuxiliaryDir()
_ = r.store.TODOEngine().MkdirAll(auxDir)
_ = r.store.TODOEngine().MkdirAll(auxDir, os.ModePerm)
path := base.PreventedStartupFile(auxDir)

preventStartupMsg := fmt.Sprintf(`ATTENTION:
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package kvserver

import (
"context"
"os"
"path/filepath"
"time"
"unsafe"
Expand Down Expand Up @@ -561,7 +562,7 @@ func addSSTablePreApply(

// TODO(tschottdorf): remove this once sideloaded storage guarantees its
// existence.
if err := eng.MkdirAll(filepath.Dir(ingestPath)); err != nil {
if err := eng.MkdirAll(filepath.Dir(ingestPath), os.ModePerm); err != nil {
panic(err)
}
if _, err := eng.Stat(ingestPath); err == nil {
Expand Down
8 changes: 5 additions & 3 deletions pkg/kv/kvserver/replica_sst_snapshot_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package kvserver
import (
"context"
"fmt"
"os"
"path/filepath"
"strconv"

Expand All @@ -24,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/objstorage"
"github.com/cockroachdb/pebble/vfs"
"golang.org/x/time/rate"
)

Expand Down Expand Up @@ -113,7 +115,7 @@ func (s *SSTSnapshotStorageScratch) filename(id int) string {
}

func (s *SSTSnapshotStorageScratch) createDir() error {
err := s.storage.engine.MkdirAll(s.snapDir)
err := s.storage.engine.MkdirAll(s.snapDir, os.ModePerm)
s.dirCreated = s.dirCreated || err == nil
return err
}
Expand Down Expand Up @@ -182,7 +184,7 @@ func (s *SSTSnapshotStorageScratch) Close() error {
type SSTSnapshotStorageFile struct {
scratch *SSTSnapshotStorageScratch
created bool
file fs.File
file vfs.File
filename string
ctx context.Context
bytesPerSync int64
Expand All @@ -207,7 +209,7 @@ func (f *SSTSnapshotStorageFile) ensureFile() error {
}
var err error
if f.bytesPerSync > 0 {
f.file, err = f.scratch.storage.engine.CreateWithSync(f.filename, int(f.bytesPerSync))
f.file, err = fs.CreateWithSync(f.scratch.storage.engine, f.filename, int(f.bytesPerSync))
} else {
f.file, err = f.scratch.storage.engine.Create(f.filename)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"context"
"fmt"
"math"
"os"
"path/filepath"
"runtime"
"sort"
Expand Down Expand Up @@ -3083,7 +3084,7 @@ func (s *Store) checkpointSpans(desc *roachpb.RangeDescriptor) []roachpb.Span {
// the provided key spans. If spans is empty, it includes the entire store.
func (s *Store) checkpoint(tag string, spans []roachpb.Span) (string, error) {
checkpointBase := s.checkpointsDir()
_ = s.TODOEngine().MkdirAll(checkpointBase)
_ = s.TODOEngine().MkdirAll(checkpointBase, os.ModePerm)
// Create the checkpoint in a "pending" directory first. If we fail midway, it
// should be clear that the directory contains an incomplete checkpoint.
pendingDir := filepath.Join(checkpointBase, tag+"_pending")
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/colcontainer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_library(
"//pkg/util/mon",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_pebble//vfs",
"@com_github_golang_snappy//:snappy",
"@com_github_marusama_semaphore//:semaphore",
],
Expand All @@ -46,14 +47,14 @@ go_test(
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/types",
"//pkg/storage/fs",
"//pkg/testutils/colcontainerutils",
"//pkg/testutils/skip",
"//pkg/util/humanizeutil",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/randutil",
"@com_github_cockroachdb_pebble//vfs",
"@com_github_marusama_semaphore//:semaphore",
"@com_github_stretchr_testify//require",
],
Expand Down
14 changes: 8 additions & 6 deletions pkg/sql/colcontainer/diskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"bytes"
"context"
"io"
"os"
"path/filepath"
"strconv"

Expand All @@ -26,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/vfs"
"github.com/golang/snappy"
)

Expand Down Expand Up @@ -181,15 +183,15 @@ type diskQueue struct {
// written before a compress and flush.
writeBufferLimit int
writeFileIdx int
writeFile fs.File
writeFile vfs.File
deserializerState struct {
*colserde.FileDeserializer
curBatch int
}
// readFileIdx is an index into the current file in files the deserializer is
// reading from.
readFileIdx int
readFile fs.File
readFile vfs.File
scratchDecompressedReadBytes []byte

diskAcc *mon.BoundAccount
Expand Down Expand Up @@ -298,7 +300,7 @@ func GetPatherFunc(f func(ctx context.Context) string) GetPather {
// DiskQueueCfg is a struct holding the configuration options for a DiskQueue.
type DiskQueueCfg struct {
// FS is the filesystem interface to use.
FS fs.FS
FS vfs.FS
// GetPather returns where the temporary directory that will contain this
// DiskQueue's files has been created. The directory name will be a UUID.
// Note that the directory is created lazily on the first call to GetPath.
Expand Down Expand Up @@ -411,7 +413,7 @@ func newDiskQueue(
if d.cfg.CacheMode != DiskQueueCacheModeIntertwinedCalls {
d.writeBufferLimit = d.cfg.BufferSizeBytes / 2
}
if err := cfg.FS.MkdirAll(filepath.Join(cfg.GetPather.GetPath(ctx), d.dirName)); err != nil {
if err := cfg.FS.MkdirAll(filepath.Join(cfg.GetPather.GetPath(ctx), d.dirName), os.ModePerm); err != nil {
return nil, err
}
// rotateFile will create a new file to write to.
Expand Down Expand Up @@ -492,7 +494,7 @@ func (d *diskQueue) Close(ctx context.Context) error {
// to write to.
func (d *diskQueue) rotateFile(ctx context.Context) error {
fName := filepath.Join(d.cfg.GetPather.GetPath(ctx), d.dirName, strconv.Itoa(d.seqNo))
f, err := d.cfg.FS.CreateWithSync(fName, bytesPerSync)
f, err := fs.CreateWithSync(d.cfg.FS, fName, bytesPerSync)
if err != nil {
return err
}
Expand Down Expand Up @@ -527,7 +529,7 @@ func (d *diskQueue) rotateFile(ctx context.Context) error {
return nil
}

func (d *diskQueue) resetWriters(f fs.File) error {
func (d *diskQueue) resetWriters(f vfs.File) error {
d.writer.reset(f)
return d.serializer.Reset(d.writer)
}
Expand Down
19 changes: 5 additions & 14 deletions pkg/sql/colcontainer/partitionedqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/colcontainer"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/storage/fs"
"github.com/cockroachdb/cockroach/pkg/testutils/colcontainerutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/pebble/vfs"
"github.com/marusama/semaphore"
"github.com/stretchr/testify/require"
)

type fdCountingFSFile struct {
fs.File
vfs.File
onCloseCb func()
}

Expand All @@ -41,7 +41,7 @@ func (f *fdCountingFSFile) Close() error {
}

type fdCountingFS struct {
fs.FS
vfs.FS
writeFDs int
readFDs int
}
Expand All @@ -58,7 +58,7 @@ func (f *fdCountingFS) assertOpenFDs(
require.Equal(t, expectedReadFDs, f.readFDs)
}

func (f *fdCountingFS) Create(name string) (fs.File, error) {
func (f *fdCountingFS) Create(name string) (vfs.File, error) {
file, err := f.FS.Create(name)
if err != nil {
return nil, err
Expand All @@ -67,16 +67,7 @@ func (f *fdCountingFS) Create(name string) (fs.File, error) {
return &fdCountingFSFile{File: file, onCloseCb: func() { f.writeFDs-- }}, nil
}

func (f *fdCountingFS) CreateWithSync(name string, bytesPerSync int) (fs.File, error) {
file, err := f.FS.CreateWithSync(name, bytesPerSync)
if err != nil {
return nil, err
}
f.writeFDs++
return &fdCountingFSFile{File: file, onCloseCb: func() { f.writeFDs-- }}, nil
}

func (f *fdCountingFS) Open(name string) (fs.File, error) {
func (f *fdCountingFS) Open(name string, opts ...vfs.OpenOption) (vfs.File, error) {
file, err := f.FS.Open(name)
if err != nil {
return nil, err
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/colflow/vectorized_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package colflow

import (
"context"
"os"
"path/filepath"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -326,7 +327,7 @@ func (f *vectorizedFlow) GetPath(ctx context.Context) string {
tempDirName := f.GetID().String()
f.tempStorage.path = filepath.Join(f.Cfg.TempStoragePath, tempDirName)
log.VEventf(ctx, 1, "flow %s spilled to disk, stack trace: %s", f.ID, util.GetSmallTrace(2))
if err := f.Cfg.TempFS.MkdirAll(f.tempStorage.path); err != nil {
if err := f.Cfg.TempFS.MkdirAll(f.tempStorage.path, os.ModePerm); err != nil {
colexecerror.InternalError(errors.Wrap(err, "unable to create temporary storage directory"))
}
// We have just created the temporary directory which will be used for all
Expand Down
5 changes: 3 additions & 2 deletions pkg/sql/colflow/vectorized_flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package colflow

import (
"context"
"os"
"path/filepath"
"sync"
"testing"
Expand Down Expand Up @@ -364,12 +365,12 @@ func TestVectorizedFlowTempDirectory(t *testing.T) {
errCh := make(chan error)
go func() {
createTempDir(ctx)
errCh <- ngn.MkdirAll(filepath.Join(vf.GetPath(ctx), "async"))
errCh <- ngn.MkdirAll(filepath.Join(vf.GetPath(ctx), "async"), os.ModePerm)
}()
createTempDir(ctx)
// Both goroutines should be able to create their subdirectories within the
// flow's temporary directory.
require.NoError(t, ngn.MkdirAll(filepath.Join(vf.GetPath(ctx), "main_goroutine")))
require.NoError(t, ngn.MkdirAll(filepath.Join(vf.GetPath(ctx), "main_goroutine"), os.ModePerm))
require.NoError(t, <-errCh)
vf.Cleanup(ctx)
checkDirs(t, 0)
Expand Down
Loading