Skip to content

Commit

Permalink
optimize cluster runtime
Browse files Browse the repository at this point in the history
update reviews: upload container image to registry by driver when install app
  • Loading branch information
kakzhou719 committed Oct 14, 2022
1 parent 9d4f590 commit 00f75ac
Show file tree
Hide file tree
Showing 7 changed files with 311 additions and 228 deletions.
249 changes: 31 additions & 218 deletions pkg/cluster-runtime/installer.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,25 +102,6 @@ func NewInstaller(infraDriver infradriver.InfraDriver, runtimeConfig RuntimeConf
return installer, nil
}

func getWorkerIPList(infraDriver infradriver.InfraDriver) []net.IP {
masters := make(map[string]bool)
for _, master := range infraDriver.GetHostIPListByRole(common.MASTER) {
masters[master.String()] = true
}
all := infraDriver.GetHostIPList()
workers := make([]net.IP, len(all)-len(masters))

index := 0
for _, ip := range all {
if !masters[ip.String()] {
workers[index] = ip
index++
}
}

return workers
}

func (i *Installer) Install() error {
var (
masters = i.infraDriver.GetHostIPListByRole(common.MASTER)
Expand Down Expand Up @@ -162,15 +143,10 @@ func (i *Installer) Install() error {
return fmt.Errorf("failed to install application: %s", err)
}
} else {
err = i.installKubeCluster(all)
err = i.installKubeCluster(master0, all, cmds)
if err != nil {
return fmt.Errorf("failed to install cluster: %s", err)
}

err = i.installApp(master0, cmds)
if err != nil {
return fmt.Errorf("failed to install application: %s", err)
}
}

if err = i.runClusterHook(master0, PostInstallCluster); err != nil {
Expand All @@ -184,11 +160,7 @@ func (i *Installer) Install() error {
return nil
}

func (i *Installer) installKubeCluster(all []net.IP) error {
if err := i.containerRuntimeInstaller.InstallOn(all); err != nil {
return err
}

func (i *Installer) installApp(master0 net.IP, cmds []string) error {
crInfo, err := i.containerRuntimeInstaller.GetInfo()
if err != nil {
return err
Expand All @@ -199,74 +171,25 @@ func (i *Installer) installKubeCluster(all []net.IP) error {
return err
}

if err = registryConfigurator.Reconcile(all); err != nil {
return err
}

registryDriver, err := registryConfigurator.GetDriver()
if err != nil {
return err
}

kubeRuntimeInstaller, err := kubernetes.NewKubeadmRuntime(i.KubeadmConfig, i.infraDriver, crInfo, registryDriver.GetInfo())
err = registryDriver.UploadContainerImages2Registry()
if err != nil {
return err
}

if err = kubeRuntimeInstaller.Install(); err != nil {
if err = i.launchClusterImage(master0, cmds); err != nil {
return err
}

return nil
}

func (i *Installer) installApp(master0 net.IP, cmds []string) error {
var (
clusterRootfs = i.infraDriver.GetClusterRootfsPath()
ex = shell.NewLex('\\')
)

for _, value := range cmds {
if value == "" {
continue
}
cmdline, err := ex.ProcessWordWithMap(value, map[string]string{})
if err != nil {
return fmt.Errorf("failed to render launch cmd: %v", err)
}

if err = i.infraDriver.CmdAsync(master0, fmt.Sprintf(common.CdAndExecCmd, clusterRootfs, cmdline)); err != nil {
return err
}
}

return nil
}

func (i *Installer) UnInstall() error {
master0 := i.infraDriver.GetHostIPListByRole(common.MASTER)[0]
masters := i.infraDriver.GetHostIPListByRole(common.MASTER)
workers := getWorkerIPList(i.infraDriver)
all := append(masters, workers...)

if err := confirmDeleteHosts(fmt.Sprintf("%s/%s", common.MASTER, common.NODE), all); err != nil {
return err
}

if err := i.runClusterHook(master0, PreUnInstallCluster); err != nil {
return err
}

if err := i.runHostHook(PreCleanHost, all); err != nil {
return err
}

kubeRuntimeInstaller, err := kubernetes.NewKubeadmRuntime(i.KubeadmConfig, i.infraDriver, containerruntime.Info{}, registry.Info{})
if err != nil {
return err
}

if err = kubeRuntimeInstaller.Reset(); err != nil {
func (i *Installer) installKubeCluster(master0 net.IP, all []net.IP, cmds []string) error {
if err := i.containerRuntimeInstaller.InstallOn(all); err != nil {
return err
}

Expand All @@ -280,160 +203,66 @@ func (i *Installer) UnInstall() error {
return err
}

if err = registryConfigurator.UninstallFrom(all); err != nil {
return err
}

if err = registryConfigurator.Clean(); err != nil {
if err = registryConfigurator.Reconcile(all); err != nil {
return err
}

if err = i.containerRuntimeInstaller.UnInstallFrom(all); err != nil {
registryDriver, err := registryConfigurator.GetDriver()
if err != nil {
return err
}

if err = i.runHostHook(PostCleanHost, all); err != nil {
kubeRuntimeInstaller, err := kubernetes.NewKubeadmRuntime(i.KubeadmConfig, i.infraDriver, crInfo, registryDriver.GetInfo())
if err != nil {
return err
}

if err = i.runClusterHook(master0, PostUnInstallCluster); err != nil {
if err = kubeRuntimeInstaller.Install(); err != nil {
return err
}

if err = i.Distributor.Restore(i.infraDriver.GetClusterBasePath(), all); err != nil {
if err = i.launchClusterImage(master0, cmds); err != nil {
return err
}

return nil
}

func (i *Installer) GetCurrentDriver() (registry.Driver, runtime.Driver, error) {
crInfo, err := i.containerRuntimeInstaller.GetInfo()
if err != nil {
return nil, nil, err
}

// TODO, init here or in constructor?
registryConfigurator, err := registry.NewConfigurator(i.RegistryConfig, crInfo, i.infraDriver, i.Distributor)
if err != nil {
return nil, nil, err
}

registryDriver, err := registryConfigurator.GetDriver()
if err != nil {
return nil, nil, err
}

kubeRuntimeInstaller, err := kubernetes.NewKubeadmRuntime(i.KubeadmConfig, i.infraDriver, crInfo, registryDriver.GetInfo())
if err != nil {
return nil, nil, err
}

runtimeDriver, err := kubeRuntimeInstaller.GetCurrentRuntimeDriver()
if err != nil {
return nil, nil, err
}

return registryDriver, runtimeDriver, nil
}

func (i *Installer) ScaleUp(newMasters, newWorkers []net.IP) (registry.Driver, runtime.Driver, error) {
master0 := i.infraDriver.GetHostIPListByRole(common.MASTER)[0]
all := append(newMasters, newWorkers...)

// distribute rootfs
if err := i.Distributor.DistributeRootfs(all, i.infraDriver.GetClusterRootfsPath()); err != nil {
return nil, nil, err
}

if err := i.runClusterHook(master0, PreScaleUpCluster); err != nil {
return nil, nil, err
}

if err := i.runHostHook(PreInitHost, all); err != nil {
return nil, nil, err
}

if err := i.containerRuntimeInstaller.InstallOn(all); err != nil {
return nil, nil, err
}

crInfo, err := i.containerRuntimeInstaller.GetInfo()
if err != nil {
return nil, nil, err
}

registryConfigurator, err := registry.NewConfigurator(i.RegistryConfig, crInfo, i.infraDriver, i.Distributor)
if err != nil {
return nil, nil, err
}

if err := registryConfigurator.Reconcile(all); err != nil {
return nil, nil, err
}

registryDriver, err := registryConfigurator.GetDriver()
if err != nil {
return nil, nil, err
}

kubeRuntimeInstaller, err := kubernetes.NewKubeadmRuntime(i.KubeadmConfig, i.infraDriver, crInfo, registryDriver.GetInfo())
if err != nil {
return nil, nil, err
}

if err := kubeRuntimeInstaller.ScaleUp(newMasters, newWorkers); err != nil {
return nil, nil, err
}

runtimeDriver, err := kubeRuntimeInstaller.GetCurrentRuntimeDriver()
if err != nil {
return nil, nil, err
}

if err := i.runHostHook(PostInitHost, all); err != nil {
return nil, nil, err
}

if err := i.runClusterHook(master0, PostScaleUpCluster); err != nil {
return nil, nil, err
}

return registryDriver, runtimeDriver, nil
}
func (i *Installer) launchClusterImage(master0 net.IP, cmds []string) error {
var (
clusterRootfs = i.infraDriver.GetClusterRootfsPath()
ex = shell.NewLex('\\')
)

func (i *Installer) ScaleDown(mastersToDelete, workersToDelete []net.IP) (registry.Driver, runtime.Driver, error) {
if len(workersToDelete) > 0 {
if err := confirmDeleteHosts(common.NODE, workersToDelete); err != nil {
return nil, nil, err
for _, value := range cmds {
if value == "" {
continue
}
cmdline, err := ex.ProcessWordWithMap(value, map[string]string{})
if err != nil {
return fmt.Errorf("failed to render launch cmd: %v", err)
}
}

if len(mastersToDelete) > 0 {
if err := confirmDeleteHosts(common.MASTER, mastersToDelete); err != nil {
return nil, nil, err
if err = i.infraDriver.CmdAsync(master0, fmt.Sprintf(common.CdAndExecCmd, clusterRootfs, cmdline)); err != nil {
return err
}
}

all := append(mastersToDelete, workersToDelete...)
if err := i.runHostHook(PreCleanHost, all); err != nil {
return nil, nil, err
}
return nil
}

func (i *Installer) GetCurrentDriver() (registry.Driver, runtime.Driver, error) {
crInfo, err := i.containerRuntimeInstaller.GetInfo()
if err != nil {
return nil, nil, err
}

// TODO, init here or in constructor?
registryConfigurator, err := registry.NewConfigurator(i.RegistryConfig, crInfo, i.infraDriver, i.Distributor)
if err != nil {
return nil, nil, err
}

if err = registryConfigurator.UninstallFrom(all); err != nil {
return nil, nil, err
}

registryDriver, err := registryConfigurator.GetDriver()
if err != nil {
return nil, nil, err
Expand All @@ -444,26 +273,10 @@ func (i *Installer) ScaleDown(mastersToDelete, workersToDelete []net.IP) (regist
return nil, nil, err
}

if err = kubeRuntimeInstaller.ScaleDown(mastersToDelete, workersToDelete); err != nil {
return nil, nil, err
}

runtimeDriver, err := kubeRuntimeInstaller.GetCurrentRuntimeDriver()
if err != nil {
return nil, nil, err
}

if err = i.containerRuntimeInstaller.UnInstallFrom(all); err != nil {
return nil, nil, err
}

if err = i.runHostHook(PostCleanHost, all); err != nil {
return nil, nil, err
}

if err = i.Distributor.Restore(i.infraDriver.GetClusterBasePath(), all); err != nil {
return nil, nil, err
}

return registryDriver, runtimeDriver, nil
}
Loading

0 comments on commit 00f75ac

Please sign in to comment.