Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mount: restart mount process gracefully #4392

Merged
merged 27 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
215 changes: 78 additions & 137 deletions cmd/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,21 @@ package cmd

import (
"bufio"
"encoding/json"
"fmt"
"log"
"net"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"path"
"path/filepath"
"runtime"
"sort"
"strconv"
"strings"
"syscall"
"time"

"github.com/juicedata/juicefs/pkg/object"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand Down Expand Up @@ -81,29 +77,10 @@ $ juicefs mount redis://localhost /mnt/jfs -d --read-only

# Disable metadata backup
$ juicefs mount redis://localhost /mnt/jfs --backup-meta 0`,
Flags: expandFlags(mount_flags(), clientFlags(1.0), shareInfoFlags()),
Flags: expandFlags(mountFlags(), clientFlags(1.0), shareInfoFlags()),
}
}

func installHandler(mp string) {
// Go will catch all the signals
signal.Ignore(syscall.SIGPIPE)
signalChan := make(chan os.Signal, 10)
signal.Notify(signalChan, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP)
go func() {
for {
sig := <-signalChan
logger.Infof("Received signal %s, exiting...", sig.String())
go func() { _ = doUmount(mp, true) }()
go func() {
time.Sleep(time.Second * 3)
logger.Warnf("Umount not finished after 3 seconds, force exit")
os.Exit(1)
}()
}
}()
}

func exposeMetrics(c *cli.Context, registerer prometheus.Registerer, registry *prometheus.Registry) string {
var ip, port string
//default set
Expand Down Expand Up @@ -207,7 +184,7 @@ func updateFormat(c *cli.Context) func(*meta.Format) {
}
}

func daemonRun(c *cli.Context, addr string, vfsConf *vfs.Config, m meta.Meta) {
func daemonRun(c *cli.Context, addr string, vfsConf *vfs.Config) {
if runtime.GOOS != "windows" {
if cd := c.String("cache-dir"); cd != "memory" {
ds := utils.SplitDir(cd)
Expand Down Expand Up @@ -238,7 +215,7 @@ func daemonRun(c *cli.Context, addr string, vfsConf *vfs.Config, m meta.Meta) {
_ = expandPathForEmbedded(addr)
// The default log to syslog is only in daemon mode.
utils.InitLoggers(!c.Bool("no-syslog"))
err := makeDaemon(c, vfsConf.Format.Name, vfsConf.Meta.MountPoint, m)
err := makeDaemon(c, vfsConf)
if err != nil {
logger.Fatalf("Failed to make daemon: %s", err)
}
Expand Down Expand Up @@ -276,6 +253,8 @@ func getVfsConf(c *cli.Context, metaConf *meta.Config, format *meta.Format, chun
BackupMeta: duration(c.String("backup-meta")),
Port: &vfs.Port{DebugAgent: debugAgent, PyroscopeAddr: c.String("pyroscope")},
PrefixInternal: c.Bool("prefix-internal"),
Pid: os.Getpid(),
PPid: os.Getppid(),
}
skip_check := os.Getenv("SKIP_BACKUP_META_CHECK") == "true"
if !skip_check && cfg.BackupMeta > 0 && cfg.BackupMeta < time.Minute*5 {
Expand All @@ -293,31 +272,6 @@ func registerMetaMsg(m meta.Meta, store chunk.ChunkStore, chunkConf *chunk.Confi
})
}

func configEqual(a, b *vfs.Config) bool {
if a == nil || b == nil {
return a == b
}

ac, bc := *a, *b
ac.Meta, ac.Chunk, ac.Port, ac.Format.SecretKey, ac.AttrTimeout, ac.DirEntryTimeout, ac.EntryTimeout = nil, nil, nil, "", 0, 0, 0
bc.Meta, bc.Chunk, bc.Port, bc.Format.SecretKey, bc.AttrTimeout, bc.DirEntryTimeout, bc.EntryTimeout = nil, nil, nil, "", 0, 0, 0
eq := ac == bc

if a.Meta == nil || b.Meta == nil {
eq = eq && a.Meta == b.Meta
} else {
eq = eq && *a.Meta == *b.Meta
}

if a.Chunk == nil || b.Chunk == nil {
eq = eq && a.Chunk == b.Chunk
} else {
eq = eq && *a.Chunk == *b.Chunk
}

return eq
}

func readConfig(mp string) ([]byte, error) {
contents, err := os.ReadFile(filepath.Join(mp, ".jfs.config"))
if os.IsNotExist(err) {
Expand All @@ -326,56 +280,6 @@ func readConfig(mp string) ([]byte, error) {
return contents, err
}

func prepareMp(newCfg *vfs.Config, mp string) (ignore bool) {
fi, err := os.Stat(mp)
if err != nil {
if strings.Contains(mp, ":") {
// Windows path, users should inspect mount point by themselves
return
}
if err := os.MkdirAll(mp, 0777); err != nil {
if os.IsExist(err) {
// a broken mount point, umount it and continue to mount
_ = doUmount(mp, true)
return
}
logger.Fatalf("create %s: %s", mp, err)
}
return
}
if fi.Size() == 0 {
// a broken mount point, umount it and continue to mount
_ = doUmount(mp, true)
return
}

ino, _ := utils.GetFileInode(mp)
if ino != uint64(meta.RootInode) {
// not a mount point, just mount it
return
}

contents, err := readConfig(mp)
if err != nil {
// failed to read juicefs config, continue to mount
return
}

originConfig := vfs.Config{}
if err = json.Unmarshal(contents, &originConfig); err != nil {
// not a valid juicefs config, continue to mount
return
}

if !configEqual(newCfg, &originConfig) {
// not the same juicefs, continue to mount
return
}

logger.Warnf("%s is already mounted by the same juicefs, ignored", mp)
return true
}

func getMetaConf(c *cli.Context, mp string, readOnly bool) *meta.Config {
conf := meta.DefaultConf()
conf.Retries = c.Int("io-retries")
Expand All @@ -389,6 +293,7 @@ func getMetaConf(c *cli.Context, mp string, readOnly bool) *meta.Config {
conf.MountPoint = mp
conf.Subdir = c.String("subdir")
conf.SkipDirMtime = duration(c.String("skip-dir-mtime"))
conf.Sid, _ = strconv.ParseUint(os.Getenv("_JFS_META_SID"), 10, 64)

atimeMode := c.String("atime-mode")
if atimeMode != meta.RelAtime && atimeMode != meta.StrictAtime && atimeMode != meta.NoAtime {
Expand Down Expand Up @@ -626,17 +531,83 @@ func mount(c *cli.Context) error {
addr := c.Args().Get(0)
mp := c.Args().Get(1)

// __DAEMON_STAGE env is set by the godaemon.MakeDaemon function
supervisor := os.Getenv("JFS_SUPERVISOR")
inFirstProcess := supervisor == "test" || supervisor == "" && os.Getenv("__DAEMON_STAGE") == ""
if inFirstProcess {
var err error
err = utils.WithTimeout(func() error {
mp, err = filepath.Abs(mp)
return err
}, time.Second*3)
if err != nil {
logger.Fatalf("abs %s: %s", mp, err)
}
if mp == "/" {
logger.Fatalf("should not mount on the root directory")
}
prepareMp(mp)
if c.Bool("update-fstab") && !calledViaMount(os.Args) && !insideContainer() {
if os.Getuid() != 0 {
logger.Warnf("--update-fstab should be used with root")
} else {
var e1, e2 error
if e1 = tryToInstallMountExec(); e1 != nil {
logger.Warnf("failed to create /sbin/mount.juicefs: %s", e1)
}
if e2 = updateFstab(c); e2 != nil {
logger.Warnf("failed to update fstab: %s", e2)
}
if e1 == nil && e2 == nil {
logger.Infof("Successfully updated fstab, now you can mount with `mount %s`", mp)
}
}
}
}

metaConf := getMetaConf(c, mp, c.Bool("read-only") || utils.StringContains(strings.Split(c.String("o"), ","), "ro"))
metaConf.CaseInsensi = strings.HasSuffix(mp, ":") && runtime.GOOS == "windows"
metaCli := meta.NewClient(addr, metaConf)
format, err := metaCli.Load(true)
if err != nil {
return err
}

chunkConf := getChunkConf(c, format)
vfsConf := getVfsConf(c, metaConf, format, chunkConf)
setFuseOption(c, format, vfsConf)

if os.Getenv("JFS_SUPERVISOR") == "" {
// close the database connection that is not in the final stage
if err = metaCli.Shutdown(); err != nil {
logger.Errorf("[pid=%d] meta shutdown: %s", os.Getpid(), err)
}
var foreground bool
if runtime.GOOS == "windows" || !c.Bool("background") || os.Getenv("JFS_FOREGROUND") != "" {
foreground = true
} else if c.Bool("background") || os.Getenv("__DAEMON_STAGE") != "" {
foreground = false
} else {
foreground = os.Getppid() == 1 && !insideContainer()
}
if foreground {
go checkMountpoint(format.Name, mp, c.String("log"), false)
} else {
daemonRun(c, addr, vfsConf)
}
os.Setenv("JFS_SUPERVISOR", strconv.Itoa(os.Getppid()))
return launchMount(mp, vfsConf)
}
logger.Infof("JuiceFS version %s", version.Version())

if commPath := os.Getenv("_FUSE_FD_COMM"); commPath != "" {
vfsConf.CommPath = commPath
vfsConf.StatePath = fmt.Sprintf("/tmp/state%d.json", os.Getppid())
}

if st := metaCli.Chroot(meta.Background, metaConf.Subdir); st != 0 {
return st
}

// Wrap the default registry, all prometheus.MustRegister() calls should be afterwards
registerer, registry := wrapRegister(c, mp, format.Name)

Expand All @@ -646,42 +617,9 @@ func mount(c *cli.Context) error {
}
logger.Infof("Data use %s", blob)

if c.Bool("update-fstab") && !calledViaMount(os.Args) && !insideContainer() {
if os.Getuid() != 0 {
logger.Warnf("--update-fstab should be used with root")
} else {
var e1, e2 error
if e1 = tryToInstallMountExec(); e1 != nil {
logger.Warnf("failed to create /sbin/mount.juicefs: %s", e1)
}
if e2 = updateFstab(c); e2 != nil {
logger.Warnf("failed to update fstab: %s", e2)
}
if e1 == nil && e2 == nil {
logger.Infof("Successfully updated fstab, now you can mount with `mount %s`", mp)
}
}
}

chunkConf := getChunkConf(c, format)
store := chunk.NewCachedStore(blob, *chunkConf, registerer)
registerMetaMsg(metaCli, store, chunkConf)

vfsConf := getVfsConf(c, metaConf, format, chunkConf)
ignore := prepareMp(vfsConf, mp)
if !c.Bool("force") && ignore {
return nil
}

if c.Bool("background") && os.Getenv("JFS_FOREGROUND") == "" {
daemonRun(c, addr, vfsConf, metaCli)
} else {
if c.IsSet("log") {
logger.Warnf("--log flag is ignored in foreground mode, the log output will be Stderr")
}
go checkMountpoint(vfsConf.Format.Name, mp, c.String("log"), false)
}

removePassword(addr)
err = metaCli.NewSession(true)
if err != nil {
Expand All @@ -692,11 +630,14 @@ func mount(c *cli.Context) error {
updateFormat(c)(fmt)
store.UpdateLimit(fmt.UploadLimit, fmt.DownloadLimit)
})
installHandler(mp)
v := vfs.NewVFS(vfsConf, metaCli, store, registerer, registry)
installHandler(mp, v)
v.UpdateFormat = updateFormat(c)
initBackgroundTasks(c, vfsConf, metaConf, metaCli, blob, registerer, registry)
mount_main(v, c)
mountMain(v, c)
if err := v.FlushAll(""); err != nil {
logger.Errorf("flush all delayed data: %s", err)
}
err = metaCli.CloseSession()
logger.Infof("The juicefs mount process exit successfully, mountpoint: %s", metaConf.MountPoint)
return err
Expand Down
37 changes: 3 additions & 34 deletions cmd/mount_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package cmd
import (
"context"
"fmt"
"github.com/juicedata/juicefs/pkg/version"
"github.com/stretchr/testify/assert"
"io"
"net/http"
"net/url"
Expand All @@ -32,11 +34,9 @@ import (
"github.com/agiledragon/gomonkey/v2"
"github.com/juicedata/juicefs/pkg/meta"
"github.com/juicedata/juicefs/pkg/utils"
"github.com/juicedata/juicefs/pkg/version"
"github.com/juicedata/juicefs/pkg/vfs"
"github.com/redis/go-redis/v9"
. "github.com/smartystreets/goconvey/convey"
"github.com/stretchr/testify/assert"
"github.com/urfave/cli/v2"
)

Expand Down Expand Up @@ -124,6 +124,7 @@ func mountTemp(t *testing.T, bucket *string, extraFormatOpts []string, extraMoun
// must do reset, otherwise will panic
ResetHttp()

os.Setenv("JFS_SUPERVISOR", "test")
mountArgs := []string{"", "mount", "--enable-xattr", testMeta, testMountPoint, "--attr-cache", "0", "--entry-cache", "0", "--dir-entry-cache", "0", "--no-usage-report"}
if extraMountOpts != nil {
mountArgs = append(mountArgs, extraMountOpts...)
Expand Down Expand Up @@ -204,38 +205,6 @@ func TestUmount(t *testing.T) {
}
}

func Test_configEqual(t *testing.T) {
cases := []struct {
a, b *vfs.Config
equal bool
}{
{
a: nil, b: nil, equal: true,
},
{
a: &vfs.Config{}, b: nil, equal: false,
},
{
a: nil, b: &vfs.Config{}, equal: false,
},
{
a: &vfs.Config{}, b: &vfs.Config{}, equal: true,
},
{
a: &vfs.Config{Format: meta.Format{SecretKey: "1"}}, b: &vfs.Config{Format: meta.Format{SecretKey: "2"}}, equal: true,
},
{
a: &vfs.Config{Port: &vfs.Port{}}, b: &vfs.Config{}, equal: true,
},
}

for _, c := range cases {
if configEqual(c.a, c.b) != c.equal {
t.Errorf("configEqual(%v, %v) should be %v", c.a, c.b, c.equal)
}
}
}

func tryMountTemp(t *testing.T, bucket *string, extraFormatOpts []string, extraMountOpts []string) error {
_ = resetTestMeta()
testDir := t.TempDir()
Expand Down
Loading
Loading