Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Maintain daemon mode information for each daemon instance #466

Merged
merged 3 commits into from
May 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,27 @@ func init() {
type DaemonMode string

const (
// One nydusd, one rafs instance
// Spawn a dedicated nydusd for each RAFS instance.
DaemonModeMultiple DaemonMode = DaemonMode(constant.DaemonModeMultiple)
// One nydusd serves multiple rafs instances
// Spawn a dedicated nydusd for each RAFS instance.
DaemonModeDedicated DaemonMode = DaemonMode(constant.DaemonModeDedicated)
// Share a global nydusd to serve all RAFS instances.
DaemonModeShared DaemonMode = DaemonMode(constant.DaemonModeShared)
// No nydusd daemon is needed to be started. Snapshotter does not start any nydusd
// and only interacts with containerd with mount slice to pass necessary configuration
// to container runtime
// Do not spawn nydusd for RAFS instances.
//
// For tarfs and rund, there's no need to create nydusd to serve RAFS instances,
// the snapshotter just returns mount slices with additional information for runC/runD
// to manage those snapshots.
DaemonModeNone DaemonMode = DaemonMode(constant.DaemonModeNone)
DaemonModeInvalid DaemonMode = DaemonMode(constant.DaemonModeInvalid)
)

func parseDaemonMode(m string) (DaemonMode, error) {
switch m {
case string(DaemonModeMultiple):
return DaemonModeMultiple, nil
return DaemonModeDedicated, nil
case string(DaemonModeDedicated):
return DaemonModeDedicated, nil
case string(DaemonModeShared):
return DaemonModeShared, nil
case string(DaemonModeNone):
Expand Down
2 changes: 1 addition & 1 deletion config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestLoadSnapshotterTOMLConfig(t *testing.T) {
Version: 1,
Root: "/var/lib/containerd-nydus",
Address: "/run/containerd-nydus/containerd-nydus-grpc.sock",
DaemonMode: "multiple",
DaemonMode: "dedicated",
Experimental: Experimental{
EnableStargz: false,
EnableReferrerDetect: false,
Expand Down
6 changes: 6 additions & 0 deletions config/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type GlobalConfig struct {
DaemonMode DaemonMode
SocketRoot string
ConfigRoot string
RootMountpoint string
DaemonThreadsNum int
CacheGCPeriod time.Duration
MirrorsConfig MirrorsConfig
Expand All @@ -46,6 +47,10 @@ func GetSnapshotsRootDir() string {
return globalConfig.SnapshotsDir
}

func GetRootMountpoint() string {
return globalConfig.RootMountpoint
}

func GetSocketRoot() string {
return globalConfig.SocketRoot
}
Expand Down Expand Up @@ -111,6 +116,7 @@ func ProcessConfigurations(c *SnapshotterConfig) error {
globalConfig.SnapshotsDir = filepath.Join(c.Root, "snapshots")
globalConfig.ConfigRoot = filepath.Join(c.Root, "config")
globalConfig.SocketRoot = filepath.Join(c.Root, "socket")
globalConfig.RootMountpoint = filepath.Join(c.Root, "mnt")

globalConfig.MirrorsConfig = c.RemoteConfig.MirrorsConfig

Expand Down
9 changes: 5 additions & 4 deletions internal/constant/values.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@
package constant

const (
DaemonModeMultiple string = "multiple"
DaemonModeShared string = "shared"
DaemonModeNone string = "none"
DaemonModeInvalid string = ""
DaemonModeMultiple string = "multiple"
DaemonModeDedicated string = "dedicated"
DaemonModeShared string = "shared"
DaemonModeNone string = "none"
DaemonModeInvalid string = ""
)

const (
Expand Down
2 changes: 1 addition & 1 deletion internal/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func buildFlags(args *Args) []cli.Flag {
},
&cli.StringFlag{
Name: "daemon-mode",
Usage: "nydusd daemon working mode, possible values: \"multiple\", \"shared\" or \"none\"",
Usage: "nydusd daemon working mode, possible values: \"dedicated\", \"multiple\", \"shared\" or \"none\". \"multiple\" is an alias of \"dedicated\" and will be deprecated in v1.0",
Destination: &args.DaemonMode,
DefaultText: constant.DaemonModeMultiple,
},
Expand Down
3 changes: 2 additions & 1 deletion misc/snapshotter/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ version = 1
root = "/var/lib/containerd-nydus"
# The snapshotter's GRPC server socket, containerd will connect to plugin on this socket
address = "/run/containerd-nydus/containerd-nydus-grpc.sock"
daemon_mode = "multiple"
daemon_mode = "dedicated"
# Whether snapshotter should try to clean up resources when it is closed
cleanup_on_close = false

Expand Down Expand Up @@ -78,6 +78,7 @@ sync_remove = false
[cache_manager]
disable = false
gc_period = "24h"
# Directory to host cached files
cache_dir = ""

[image]
Expand Down
8 changes: 8 additions & 0 deletions pkg/daemon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"path"
"path/filepath"

"github.com/containerd/nydus-snapshotter/config"
"github.com/containerd/nydus-snapshotter/internal/constant"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -93,3 +94,10 @@ func WithFsDriver(fsDriver string) NewDaemonOpt {
return nil
}
}

func WithDaemonMode(daemonMode config.DaemonMode) NewDaemonOpt {
return func(d *Daemon) error {
d.States.DaemonMode = daemonMode
return nil
}
}
14 changes: 12 additions & 2 deletions pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@ const (
type NewDaemonOpt func(d *Daemon) error

type States struct {
// Generated by daemons manager as a unique to identify a nydusd
// A unique ID generated by daemon manager to identify the nydusd instance.
ID string
ProcessID int
APISocket string
LogDir string
LogLevel string
LogToStdout bool
DaemonMode config.DaemonMode
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to handle the compatibility if the newer nydus-snapshotter is loading old records in DB without DamonMode ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we need to migrate the database, that's on the todo list.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently it works, but may get broken in future easily. So the safe way is to convert records in databases. Rafs will face the same situation in coming PRs, so let's delay support of data migration.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Um. I tend to tolerate the DB compatibility within this PR rather than make it a future work.
For the compatibility, I suppose it wouldn't be very complicated. Nydus-sanpshotter already has an experience on how to migrate different record formats in function tryTranslateRecords(). We can simply assign a DaemonMode by comparing daemon Mountpoint and HostRootMount and rewrite the daemon's record of state

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a patch to migrate the database.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks

FsDriver string
// Host kernel mountpoint, only applies to fuse fs driver. The fscache fs driver
// doesn't need a host kernel mountpoint.
Expand Down Expand Up @@ -210,12 +211,20 @@ func (d *Daemon) WaitUntilState(expected types.DaemonState) error {

return nil
},
retry.Attempts(20), // totally wait for 2 seconds, should be enough
retry.LastErrorOnly(true),
retry.Attempts(20), // totally wait for 2 seconds, should be enough
retry.Delay(100*time.Millisecond),
)
}

func (d *Daemon) IsSharedDaemon() bool {
if d.States.DaemonMode != "" {
return d.States.DaemonMode == config.DaemonModeShared
}

return d.HostMountpoint() == config.GetRootMountpoint()
}

func (d *Daemon) SharedMount(rafs *Rafs) error {
client, err := d.GetClient()
if err != nil {
Expand Down Expand Up @@ -574,6 +583,7 @@ func (d *Daemon) ClearVestige() {
func NewDaemon(opt ...NewDaemonOpt) (*Daemon, error) {
d := &Daemon{}
d.States.ID = newID()
d.States.DaemonMode = config.DaemonModeDedicated
d.Instances = rafsSet{instances: make(map[string]*Rafs)}

for _, o := range opt {
Expand Down
38 changes: 20 additions & 18 deletions pkg/daemon/rafs.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,14 +186,16 @@ func (r *Rafs) FscacheWorkDir() string {
}

func (d *Daemon) UmountAllInstances() error {
d.Instances.Lock()
defer d.Instances.Unlock()
if d.IsSharedDaemon() {
d.Instances.Lock()
defer d.Instances.Unlock()

instances := d.Instances.ListUnlocked()
instances := d.Instances.ListUnlocked()

for _, r := range instances {
if err := d.SharedUmount(r); err != nil {
return errors.Wrapf(err, "umount fs instance %s", r.SnapshotID)
for _, r := range instances {
if err := d.SharedUmount(r); err != nil {
return errors.Wrapf(err, "umount fs instance %s", r.SnapshotID)
}
}
}

Expand All @@ -212,7 +214,7 @@ func (d *Daemon) CloneInstances(src *Daemon) {
}

func (d *Daemon) UmountInstance(r *Rafs) error {
if r.Mountpoint != d.States.Mountpoint {
if d.IsSharedDaemon() {
if err := d.SharedUmount(r); err != nil {
return errors.Wrapf(err, "umount fs instance %s", r.SnapshotID)
}
Expand All @@ -223,20 +225,20 @@ func (d *Daemon) UmountInstance(r *Rafs) error {

// Daemon must be started and reach RUNNING state before call this method
func (d *Daemon) RecoveredMountInstances() error {
d.Instances.Lock()
defer d.Instances.Unlock()
if d.IsSharedDaemon() {
d.Instances.Lock()
defer d.Instances.Unlock()

instances := make([]*Rafs, 0, 16)
for _, r := range d.Instances.ListUnlocked() {
instances = append(instances, r)
}
instances := make([]*Rafs, 0, 16)
for _, r := range d.Instances.ListUnlocked() {
instances = append(instances, r)
}

sort.Slice(instances, func(i, j int) bool {
return instances[i].Seq < instances[j].Seq
})
sort.Slice(instances, func(i, j int) bool {
return instances[i].Seq < instances[j].Seq
})

for _, i := range instances {
if d.HostMountpoint() != i.GetMountpoint() {
for _, i := range instances {
log.L.Infof("Recovered mount instance %s", i.SnapshotID)
if err := d.SharedMount(i); err != nil {
return err
Expand Down
6 changes: 4 additions & 2 deletions pkg/filesystem/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (fs *Filesystem) getSharedDaemon() *daemon.Daemon {

func (fs *Filesystem) decideDaemonMountpoint(rafs *daemon.Rafs) (string, error) {
var m string
if fs.Manager.IsSharedDaemon() {
if config.GetDaemonMode() == config.DaemonModeShared {
if config.GetFsDriver() == config.FsDriverFscache {
return "", nil
}
Expand Down Expand Up @@ -491,7 +491,9 @@ func (fs *Filesystem) createDaemon(mountpoint string, ref int32) (d *daemon.Daem
daemon.WithLogLevel(config.GetLogLevel()),
daemon.WithLogToStdout(config.GetLogToStdout()),
daemon.WithNydusdThreadNum(config.GetDaemonThreadsNumber()),
daemon.WithFsDriver(config.GetFsDriver())}
daemon.WithFsDriver(config.GetFsDriver()),
daemon.WithDaemonMode(config.GetDaemonMode()),
}

if mountpoint != "" {
opts = append(opts, daemon.WithMountpoint(mountpoint))
Expand Down
16 changes: 6 additions & 10 deletions pkg/manager/daemon_adaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,20 +116,19 @@ func (m *Manager) BuildDaemonCommand(d *daemon.Daemon, bin string, upgrade bool)
cmdOpts = append(cmdOpts,
command.WithMode("singleton"),
command.WithFscacheDriver(m.cacheDir))

if nydusdThreadNum != 0 {
cmdOpts = append(cmdOpts, command.WithFscacheThreads(nydusdThreadNum))
}

} else {
cmdOpts = append(cmdOpts, command.WithMode("fuse"))

cmdOpts = append(cmdOpts, command.WithMode("fuse"), command.WithMountpoint(d.HostMountpoint()))
if nydusdThreadNum != 0 {
cmdOpts = append(cmdOpts, command.WithThreadNum(nydusdThreadNum))
}

switch {
case !m.IsSharedDaemon():
case d.IsSharedDaemon():
break
case !d.IsSharedDaemon():
rafs := d.Instances.Head()
if rafs == nil {
return nil, errors.Wrapf(errdefs.ErrNotFound, "daemon %s no rafs instance associated", d.ID())
Expand All @@ -142,12 +141,9 @@ func (m *Manager) BuildDaemonCommand(d *daemon.Daemon, bin string, upgrade bool)
cmdOpts = append(cmdOpts,
command.WithConfig(d.ConfigFile("")),
command.WithBootstrap(bootstrap),
command.WithMountpoint(d.HostMountpoint()))

case m.IsSharedDaemon():
cmdOpts = append(cmdOpts, command.WithMountpoint(d.HostMountpoint()))
)
default:
return nil, errors.Errorf("invalid daemon mode %s ", m.daemonMode)
return nil, errors.Errorf("invalid daemon mode %s ", d.States.DaemonMode)
}
}

Expand Down
5 changes: 1 addition & 4 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ func (s *DaemonStates) Size() int {
type Manager struct {
store Store
NydusdBinaryPath string
daemonMode config.DaemonMode
// Where nydusd stores cache files for fscache driver
cacheDir string
// Daemon states are inserted when creating snapshots and nydusd and
Expand Down Expand Up @@ -148,7 +147,6 @@ type Manager struct {
type Opt struct {
NydusdBinaryPath string
Database *store.Database
DaemonMode config.DaemonMode
CacheDir string
RecoverPolicy config.DaemonRecoverPolicy
// Nydus-snapshotter work directory
Expand Down Expand Up @@ -275,7 +273,6 @@ func NewManager(opt Opt) (*Manager, error) {
mgr := &Manager{
store: s,
NydusdBinaryPath: opt.NydusdBinaryPath,
daemonMode: opt.DaemonMode,
cacheDir: opt.CacheDir,
daemonStates: newDaemonStates(),
monitor: monitor,
Expand Down Expand Up @@ -400,7 +397,7 @@ func (m *Manager) ListDaemons() []*daemon.Daemon {

func (m *Manager) CleanUpDaemonResources(d *daemon.Daemon) {
resource := []string{d.States.ConfigDir, d.States.LogDir}
if !m.IsSharedDaemon() {
if !d.IsSharedDaemon() {
socketDir := path.Dir(d.GetAPISock())
resource = append(resource, socketDir)
}
Expand Down
13 changes: 0 additions & 13 deletions pkg/manager/mode.go

This file was deleted.

14 changes: 14 additions & 0 deletions pkg/store/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (

var (
v1RootBucket = []byte("v1")
versionKey = []byte("version")
daemonsBucket = []byte("daemons") // Contains daemon info <daemon_id>=<daemon>

// Usually representing rafs instances which are attached to daemons or not.
Expand Down Expand Up @@ -137,6 +138,7 @@ func getObject(bucket *bolt.Bucket, key string, obj interface{}) error {

func (db *Database) initDatabase() error {
var notV1 = false
var version string
err := db.db.Update(func(tx *bolt.Tx) error {

bk := tx.Bucket(v1RootBucket)
Expand All @@ -158,6 +160,12 @@ func (db *Database) initDatabase() error {
return errors.Wrapf(err, "bucket %s", instancesBucket)
}

if val := bk.Get(versionKey); val == nil {
version = "v1.0"
} else {
version = string(val)
}

return nil
})
if err != nil {
Expand All @@ -170,6 +178,12 @@ func (db *Database) initDatabase() error {
}
}

if version == "v1.0" {
if err := db.tryUpgradeRecords(version); err != nil && !errors.Is(err, errdefs.ErrNotFound) {
return errors.Wrapf(err, "convert old database")
}
}

return nil
}

Expand Down
Loading