Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: extend snapshot copy to filesystems that cannot link #22703

Merged
merged 3 commits into from
Oct 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions cmd/influxd/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"time"

"github.com/influxdata/influxdb/cmd/influxd/backup_util"
errors2 "github.com/influxdata/influxdb/pkg/errors"
"github.com/influxdata/influxdb/services/snapshotter"
"github.com/influxdata/influxdb/tcp"
gzip "github.com/klauspost/pgzip"
Expand Down Expand Up @@ -390,7 +391,7 @@ func (cmd *Command) backupResponsePaths(response *snapshotter.Response) error {

// backupMetastore will backup the whole metastore on the host to the backup path
// if useDB is non-empty, it will backup metadata only for the named database.
func (cmd *Command) backupMetastore() error {
func (cmd *Command) backupMetastore() (retErr error) {
metastoreArchivePath, err := cmd.nextPath(filepath.Join(cmd.path, backup_util.Metafile))
if err != nil {
return err
Expand All @@ -402,12 +403,12 @@ func (cmd *Command) backupMetastore() error {
Type: snapshotter.RequestMetastoreBackup,
}

err = cmd.downloadAndVerify(req, metastoreArchivePath, func(file string) error {
err = cmd.downloadAndVerify(req, metastoreArchivePath, func(file string) (rErr error) {
f, err := os.Open(file)
if err != nil {
return err
}
defer f.Close()
defer errors2.Capture(&rErr, f.Close)()

var magicByte [8]byte
n, err := io.ReadFull(f, magicByte[:])
Expand Down Expand Up @@ -438,7 +439,7 @@ func (cmd *Command) backupMetastore() error {

if cmd.portable {
metaBytes, err := backup_util.GetMetaBytes(metastoreArchivePath)
defer os.Remove(metastoreArchivePath)
defer errors2.Capture(&retErr, func() error { return os.Remove(metastoreArchivePath) })()
if err != nil {
return err
}
Expand Down Expand Up @@ -510,13 +511,13 @@ func (cmd *Command) downloadAndVerify(req *snapshotter.Request, path string, val
}

// download downloads a snapshot of either the metastore or a shard from a host to a given path.
func (cmd *Command) download(req *snapshotter.Request, path string) error {
func (cmd *Command) download(req *snapshotter.Request, path string) (retErr error) {
// Create local file to write to.
f, err := os.Create(path)
if err != nil {
return fmt.Errorf("open temp file: %s", err)
}
defer f.Close()
defer errors2.Capture(&retErr, f.Close)()

min := 2 * time.Second
for i := 0; i < 10; i++ {
Expand All @@ -526,7 +527,7 @@ func (cmd *Command) download(req *snapshotter.Request, path string) error {
if err != nil {
return err
}
defer conn.Close()
defer errors2.Capture(&retErr, conn.Close)()

_, err = conn.Write([]byte{byte(req.Type)})
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion cmd/influxd/backup_util/backup_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"sync/atomic"

internal "github.com/influxdata/influxdb/cmd/influxd/backup_util/internal"
errors2 "github.com/influxdata/influxdb/pkg/errors"
"github.com/influxdata/influxdb/services/snapshotter"
"google.golang.org/protobuf/proto"
)
Expand Down Expand Up @@ -55,11 +56,12 @@ func (ep *PortablePacker) UnmarshalBinary(data []byte) error {
return nil
}

func GetMetaBytes(fname string) ([]byte, error) {
func GetMetaBytes(fname string) (_ []byte, retErr error) {
f, err := os.Open(fname)
if err != nil {
return []byte{}, err
}
defer errors2.Capture(&retErr, f.Close)()

var buf bytes.Buffer
if _, err := io.Copy(&buf, f); err != nil {
Expand Down
17 changes: 0 additions & 17 deletions tsdb/engine/tsm1/copy_or_link_unix.go

This file was deleted.

46 changes: 0 additions & 46 deletions tsdb/engine/tsm1/copy_or_link_windows.go

This file was deleted.

86 changes: 84 additions & 2 deletions tsdb/engine/tsm1/file_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"io"
"io/ioutil"
"math"
"os"
Expand All @@ -15,6 +16,7 @@ import (
"strings"
"sync"
"sync/atomic"
"syscall"
"time"

"github.com/influxdata/influxdb/models"
Expand Down Expand Up @@ -193,6 +195,8 @@ type FileStore struct {
parseFileName ParseFileNameFunc

obs tsdb.FileStoreObserver

copyFiles bool
}

// FileStat holds information about a TSM file on disk.
Expand Down Expand Up @@ -244,6 +248,7 @@ func NewFileStore(dir string) *FileStore {
},
obs: noFileStoreObserver{},
parseFileName: DefaultParseFileName,
copyFiles: runtime.GOOS == "windows",
}
fs.purger.fileStore = fs
return fs
Expand Down Expand Up @@ -1072,19 +1077,96 @@ func (f *FileStore) locations(key []byte, t int64, ascending bool) []*location {
func (f *FileStore) MakeSnapshotLinks(destPath string, files []TSMFile) (returnErr error) {
for _, tsmf := range files {
newpath := filepath.Join(destPath, filepath.Base(tsmf.Path()))
if err := copyOrLink(tsmf.Path(), newpath); err != nil {
err := f.copyOrLink(tsmf.Path(), newpath)
if err != nil {
return err
}
if tf := tsmf.TombstoneStats(); tf.TombstoneExists {
newpath := filepath.Join(destPath, filepath.Base(tf.Path))
if err := copyOrLink(tf.Path, newpath); err != nil {
err := f.copyOrLink(tf.Path, newpath)
if err != nil {
return err
}
}
}
return nil
}

func (f *FileStore) copyOrLink(oldpath string, newpath string) error {
if f.copyFiles {
f.logger.Info("copying backup snapshots", zap.String("OldPath", oldpath), zap.String("NewPath", newpath))
if err := f.copyNotLink(oldpath, newpath); err != nil {
return err
}
} else {
f.logger.Info("linking backup snapshots", zap.String("OldPath", oldpath), zap.String("NewPath", newpath))
if err := f.linkNotCopy(oldpath, newpath); err != nil {
return err
}
}
return nil
}

// copyNotLink - use file copies instead of hard links for 2 scenarios:
// Windows does not permit deleting a file with open file handles
// Azure does not support hard links in its default file system
func (f *FileStore) copyNotLink(oldPath, newPath string) (returnErr error) {
Copy link
Contributor

@lesam lesam Oct 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

function comment should follow godoc format (I don't think the extra whitespace line is correct?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's the sort of thing I expect go fmt to catch, but clearly not the case....

rfd, err := os.Open(oldPath)
if err != nil {
return fmt.Errorf("error opening file for backup %s: %q", oldPath, err)
} else {
defer func() {
if e := rfd.Close(); returnErr == nil && e != nil {
returnErr = fmt.Errorf("error closing source file for backup %s: %w", oldPath, e)
}
}()
}
fi, err := rfd.Stat()
if err != nil {
return fmt.Errorf("error collecting statistics from file for backup %s: %w", oldPath, err)
}
wfd, err := os.OpenFile(newPath, os.O_RDWR|os.O_CREATE, fi.Mode())
if err != nil {
return fmt.Errorf("error creating temporary file for backup %s: %w", newPath, err)
} else {
defer func() {
if e := wfd.Close(); returnErr == nil && e != nil {
returnErr = fmt.Errorf("error closing temporary file for backup %s: %w", newPath, e)
}
}()
}
if _, err := io.Copy(wfd, rfd); err != nil {
return fmt.Errorf("unable to copy file for backup from %s to %s: %w", oldPath, newPath, err)
}
if err := os.Chtimes(newPath, fi.ModTime(), fi.ModTime()); err != nil {
return fmt.Errorf("unable to set modification time on temporary backup file %s: %w", newPath, err)
}
return nil
}

// linkNotCopy - use hard links for backup snapshots
func (f *FileStore) linkNotCopy(oldPath, newPath string) error {
if err := os.Link(oldPath, newPath); err != nil {
if errors.Is(err, syscall.ENOTSUP) {
if fi, e := os.Stat(oldPath); e == nil && !fi.IsDir() {
f.logger.Info("file system does not support hard links, switching to copies for backup", zap.String("OldPath", oldPath), zap.String("NewPath", newPath))
// Force future snapshots to copy
f.copyFiles = true
return f.copyNotLink(oldPath, newPath)
} else if e != nil {
// Stat failed
return fmt.Errorf("error creating hard link for backup, cannot determine if %s is a file or directory: %w", oldPath, e)
} else {
return fmt.Errorf("error creating hard link for backup - %s is a directory, not a file: %q", oldPath, err)
}
} else {
return fmt.Errorf("error creating hard link for backup from %s to %s: %w", oldPath, newPath, err)
}
} else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the else? Just return nil at the end of the function would be clearer I think.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because I am an old C coder with damaged neurons.

return nil
}
}

// CreateSnapshot creates hardlinks for all tsm and tombstone files
// in the path provided.
func (f *FileStore) CreateSnapshot() (string, error) {
Expand Down