Skip to content

Commit

Permalink
daemon: record daemon mode on each instance
Browse files Browse the repository at this point in the history
Currently the daemon mode is global and is recorded in manager, which
limits extensibility. So record daemon mode in every daemon instance,
so we can extend it later.

Signed-off-by: Jiang Liu <[email protected]>
  • Loading branch information
jiangliu committed May 8, 2023
1 parent e200d58 commit 28132bd
Show file tree
Hide file tree
Showing 10 changed files with 59 additions and 51 deletions.
6 changes: 6 additions & 0 deletions config/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type GlobalConfig struct {
DaemonMode DaemonMode
SocketRoot string
ConfigRoot string
RootMountpoint string
DaemonThreadsNum int
CacheGCPeriod time.Duration
MirrorsConfig MirrorsConfig
Expand All @@ -46,6 +47,10 @@ func GetSnapshotsRootDir() string {
return globalConfig.SnapshotsDir
}

func GetRootMountpoint() string {
return globalConfig.RootMountpoint
}

func GetSocketRoot() string {
return globalConfig.SocketRoot
}
Expand Down Expand Up @@ -111,6 +116,7 @@ func ProcessConfigurations(c *SnapshotterConfig) error {
globalConfig.SnapshotsDir = filepath.Join(c.Root, "snapshots")
globalConfig.ConfigRoot = filepath.Join(c.Root, "config")
globalConfig.SocketRoot = filepath.Join(c.Root, "socket")
globalConfig.RootMountpoint = filepath.Join(c.Root, "mnt")

globalConfig.MirrorsConfig = c.RemoteConfig.MirrorsConfig

Expand Down
1 change: 1 addition & 0 deletions misc/snapshotter/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ sync_remove = false
[cache_manager]
disable = false
gc_period = "24h"
# Directory to host cached files
cache_dir = ""

[image]
Expand Down
8 changes: 8 additions & 0 deletions pkg/daemon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"path"
"path/filepath"

"github.com/containerd/nydus-snapshotter/config"
"github.com/containerd/nydus-snapshotter/internal/constant"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -93,3 +94,10 @@ func WithFsDriver(fsDriver string) NewDaemonOpt {
return nil
}
}

func WithDaemonMode(daemonMode config.DaemonMode) NewDaemonOpt {
return func(d *Daemon) error {
d.States.DaemonMode = daemonMode
return nil
}
}
14 changes: 12 additions & 2 deletions pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@ const (
type NewDaemonOpt func(d *Daemon) error

type States struct {
// Generated by daemons manager as a unique to identify a nydusd
// A unique ID generated by daemon manager to identify the nydusd instance.
ID string
ProcessID int
APISocket string
LogDir string
LogLevel string
LogToStdout bool
DaemonMode config.DaemonMode
FsDriver string
// Host kernel mountpoint, only applies to fuse fs driver. The fscache fs driver
// doesn't need a host kernel mountpoint.
Expand Down Expand Up @@ -210,12 +211,20 @@ func (d *Daemon) WaitUntilState(expected types.DaemonState) error {

return nil
},
retry.Attempts(20), // totally wait for 2 seconds, should be enough
retry.LastErrorOnly(true),
retry.Attempts(20), // totally wait for 2 seconds, should be enough
retry.Delay(100*time.Millisecond),
)
}

func (d *Daemon) IsSharedDaemon() bool {
if d.States.DaemonMode != "" {
return d.States.DaemonMode == config.DaemonModeShared
}

return d.HostMountpoint() == config.GetRootMountpoint()
}

func (d *Daemon) SharedMount(rafs *Rafs) error {
client, err := d.GetClient()
if err != nil {
Expand Down Expand Up @@ -574,6 +583,7 @@ func (d *Daemon) ClearVestige() {
func NewDaemon(opt ...NewDaemonOpt) (*Daemon, error) {
d := &Daemon{}
d.States.ID = newID()
d.States.DaemonMode = config.DaemonModeDedicated
d.Instances = rafsSet{instances: make(map[string]*Rafs)}

for _, o := range opt {
Expand Down
38 changes: 20 additions & 18 deletions pkg/daemon/rafs.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,14 +186,16 @@ func (r *Rafs) FscacheWorkDir() string {
}

func (d *Daemon) UmountAllInstances() error {
d.Instances.Lock()
defer d.Instances.Unlock()
if d.IsSharedDaemon() {
d.Instances.Lock()
defer d.Instances.Unlock()

instances := d.Instances.ListUnlocked()
instances := d.Instances.ListUnlocked()

for _, r := range instances {
if err := d.SharedUmount(r); err != nil {
return errors.Wrapf(err, "umount fs instance %s", r.SnapshotID)
for _, r := range instances {
if err := d.SharedUmount(r); err != nil {
return errors.Wrapf(err, "umount fs instance %s", r.SnapshotID)
}
}
}

Expand All @@ -212,7 +214,7 @@ func (d *Daemon) CloneInstances(src *Daemon) {
}

func (d *Daemon) UmountInstance(r *Rafs) error {
if r.Mountpoint != d.States.Mountpoint {
if d.IsSharedDaemon() {
if err := d.SharedUmount(r); err != nil {
return errors.Wrapf(err, "umount fs instance %s", r.SnapshotID)
}
Expand All @@ -223,20 +225,20 @@ func (d *Daemon) UmountInstance(r *Rafs) error {

// Daemon must be started and reach RUNNING state before call this method
func (d *Daemon) RecoveredMountInstances() error {
d.Instances.Lock()
defer d.Instances.Unlock()
if d.IsSharedDaemon() {
d.Instances.Lock()
defer d.Instances.Unlock()

instances := make([]*Rafs, 0, 16)
for _, r := range d.Instances.ListUnlocked() {
instances = append(instances, r)
}
instances := make([]*Rafs, 0, 16)
for _, r := range d.Instances.ListUnlocked() {
instances = append(instances, r)
}

sort.Slice(instances, func(i, j int) bool {
return instances[i].Seq < instances[j].Seq
})
sort.Slice(instances, func(i, j int) bool {
return instances[i].Seq < instances[j].Seq
})

for _, i := range instances {
if d.HostMountpoint() != i.GetMountpoint() {
for _, i := range instances {
log.L.Infof("Recovered mount instance %s", i.SnapshotID)
if err := d.SharedMount(i); err != nil {
return err
Expand Down
6 changes: 4 additions & 2 deletions pkg/filesystem/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (fs *Filesystem) getSharedDaemon() *daemon.Daemon {

func (fs *Filesystem) decideDaemonMountpoint(rafs *daemon.Rafs) (string, error) {
var m string
if fs.Manager.IsSharedDaemon() {
if config.GetDaemonMode() == config.DaemonModeShared {
if config.GetFsDriver() == config.FsDriverFscache {
return "", nil
}
Expand Down Expand Up @@ -491,7 +491,9 @@ func (fs *Filesystem) createDaemon(mountpoint string, ref int32) (d *daemon.Daem
daemon.WithLogLevel(config.GetLogLevel()),
daemon.WithLogToStdout(config.GetLogToStdout()),
daemon.WithNydusdThreadNum(config.GetDaemonThreadsNumber()),
daemon.WithFsDriver(config.GetFsDriver())}
daemon.WithFsDriver(config.GetFsDriver()),
daemon.WithDaemonMode(config.GetDaemonMode()),
}

if mountpoint != "" {
opts = append(opts, daemon.WithMountpoint(mountpoint))
Expand Down
16 changes: 6 additions & 10 deletions pkg/manager/daemon_adaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,20 +116,19 @@ func (m *Manager) BuildDaemonCommand(d *daemon.Daemon, bin string, upgrade bool)
cmdOpts = append(cmdOpts,
command.WithMode("singleton"),
command.WithFscacheDriver(m.cacheDir))

if nydusdThreadNum != 0 {
cmdOpts = append(cmdOpts, command.WithFscacheThreads(nydusdThreadNum))
}

} else {
cmdOpts = append(cmdOpts, command.WithMode("fuse"))

cmdOpts = append(cmdOpts, command.WithMode("fuse"), command.WithMountpoint(d.HostMountpoint()))
if nydusdThreadNum != 0 {
cmdOpts = append(cmdOpts, command.WithThreadNum(nydusdThreadNum))
}

switch {
case !m.IsSharedDaemon():
case d.IsSharedDaemon():
break
case !d.IsSharedDaemon():
rafs := d.Instances.Head()
if rafs == nil {
return nil, errors.Wrapf(errdefs.ErrNotFound, "daemon %s no rafs instance associated", d.ID())
Expand All @@ -142,12 +141,9 @@ func (m *Manager) BuildDaemonCommand(d *daemon.Daemon, bin string, upgrade bool)
cmdOpts = append(cmdOpts,
command.WithConfig(d.ConfigFile("")),
command.WithBootstrap(bootstrap),
command.WithMountpoint(d.HostMountpoint()))

case m.IsSharedDaemon():
cmdOpts = append(cmdOpts, command.WithMountpoint(d.HostMountpoint()))
)
default:
return nil, errors.Errorf("invalid daemon mode %s ", m.daemonMode)
return nil, errors.Errorf("invalid daemon mode %s ", d.States.DaemonMode)
}
}

Expand Down
5 changes: 1 addition & 4 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ func (s *DaemonStates) Size() int {
type Manager struct {
store Store
NydusdBinaryPath string
daemonMode config.DaemonMode
// Where nydusd stores cache files for fscache driver
cacheDir string
// Daemon states are inserted when creating snapshots and nydusd and
Expand Down Expand Up @@ -148,7 +147,6 @@ type Manager struct {
type Opt struct {
NydusdBinaryPath string
Database *store.Database
DaemonMode config.DaemonMode
CacheDir string
RecoverPolicy config.DaemonRecoverPolicy
// Nydus-snapshotter work directory
Expand Down Expand Up @@ -275,7 +273,6 @@ func NewManager(opt Opt) (*Manager, error) {
mgr := &Manager{
store: s,
NydusdBinaryPath: opt.NydusdBinaryPath,
daemonMode: opt.DaemonMode,
cacheDir: opt.CacheDir,
daemonStates: newDaemonStates(),
monitor: monitor,
Expand Down Expand Up @@ -400,7 +397,7 @@ func (m *Manager) ListDaemons() []*daemon.Daemon {

func (m *Manager) CleanUpDaemonResources(d *daemon.Daemon) {
resource := []string{d.States.ConfigDir, d.States.LogDir}
if !m.IsSharedDaemon() {
if !d.IsSharedDaemon() {
socketDir := path.Dir(d.GetAPISock())
resource = append(resource, socketDir)
}
Expand Down
13 changes: 0 additions & 13 deletions pkg/manager/mode.go

This file was deleted.

3 changes: 1 addition & 2 deletions snapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ func NewSnapshotter(ctx context.Context, cfg *config.SnapshotterConfig) (snapsho
manager, err := manager.NewManager(manager.Opt{
NydusdBinaryPath: cfg.DaemonConfig.NydusdPath,
Database: db,
DaemonMode: config.GetDaemonMode(),
CacheDir: cfg.CacheManagerConfig.CacheDir,
RootDir: cfg.Root,
RecoverPolicy: rp,
Expand Down Expand Up @@ -754,7 +753,7 @@ func (o *snapshotter) remoteMounts(ctx context.Context, s storage.Snapshot, id s
daemon := o.fs.Manager.GetByDaemonID(instance.DaemonID)

var c daemonconfig.DaemonConfig
if o.fs.Manager.IsSharedDaemon() {
if daemon.IsSharedDaemon() {
c, err = daemonconfig.NewDaemonConfig(daemon.States.FsDriver, daemon.ConfigFile(instance.SnapshotID))
if err != nil {
return nil, errors.Wrapf(err, "Failed to load instance configuration %s",
Expand Down

0 comments on commit 28132bd

Please sign in to comment.