Skip to content

Commit

Permalink
support cluster scale up via sealer apply cli (#1818)
Browse files Browse the repository at this point in the history
* support scale up for sealer apply

* support run app for sealer apply
  • Loading branch information
kakaZhou719 authored Nov 1, 2022
1 parent c854d89 commit 72fafa6
Show file tree
Hide file tree
Showing 10 changed files with 283 additions and 122 deletions.
283 changes: 221 additions & 62 deletions cmd/sealer/cmd/cluster/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,30 @@ package cluster
import (
"fmt"
"io/ioutil"
"net"
"path/filepath"

"github.com/pkg/errors"
"github.com/sealerio/sealer/common"
"github.com/sealerio/sealer/pkg/client/k8s"
clusterruntime "github.com/sealerio/sealer/pkg/cluster-runtime"
"github.com/sealerio/sealer/pkg/clusterfile"
imagecommon "github.com/sealerio/sealer/pkg/define/options"
v12 "github.com/sealerio/sealer/pkg/define/image/v1"
"github.com/sealerio/sealer/pkg/define/options"
"github.com/sealerio/sealer/pkg/imagedistributor"
"github.com/sealerio/sealer/pkg/imageengine"
"github.com/sealerio/sealer/pkg/infradriver"
v2 "github.com/sealerio/sealer/types/api/v2"
"github.com/sealerio/sealer/utils/strings"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
corev1 "k8s.io/api/core/v1"
)

var applyClusterFile string

const MasterRoleLabel = "node-role.kubernetes.io/master"

func NewApplyCmd() *cobra.Command {
applyCmd := &cobra.Command{
Use: "apply",
Expand All @@ -44,12 +56,13 @@ will apply the diff change of current Clusterfile and the original one.`,
clusterFileData []byte
err error
)
logrus.Warn("sealer apply command will be deprecated in the future, please use sealer run instead.")

if clusterFile == "" {
if applyClusterFile == "" {
return fmt.Errorf("you must input Clusterfile")
}

clusterFileData, err = ioutil.ReadFile(filepath.Clean(clusterFile))
clusterFileData, err = ioutil.ReadFile(filepath.Clean(applyClusterFile))
if err != nil {
return err
}
Expand All @@ -58,94 +71,240 @@ will apply the diff change of current Clusterfile and the original one.`,
if err != nil {
return err
}

//save desired clusterfile
if err = cf.SaveAll(); err != nil {
return err
}

cluster := cf.GetCluster()
infraDriver, err := infradriver.NewInfraDriver(&cluster)
if err != nil {
return err
}

var (
clusterLaunchCmds = infraDriver.GetClusterLaunchCmds()
clusterHosts = infraDriver.GetHostIPList()
clusterImageName = cluster.Spec.Image
)

clusterHostsPlatform, err := infraDriver.GetHostsPlatform(clusterHosts)
desiredCluster := cf.GetCluster()
infraDriver, err := infradriver.NewInfraDriver(&desiredCluster)
if err != nil {
return err
}

imageEngine, err := imageengine.NewImageEngine(imagecommon.EngineGlobalConfigurations{})
// use image extension to determine apply type:
// scale up cluster, install applications, maybe support upgrade later
imageName := desiredCluster.Spec.Image
imageEngine, err := imageengine.NewImageEngine(options.EngineGlobalConfigurations{})
if err != nil {
return err
}

imageMounter, err := imagedistributor.NewImageMounter(imageEngine, clusterHostsPlatform)
if err != nil {
return err
client := getClusterClient()
if client == nil {
// no k8s client means to init a new cluster.
return createNewCluster(imageName, infraDriver, imageEngine, cf)
}

imageMountInfo, err := imageMounter.Mount(clusterImageName)
extension, err := imageEngine.GetSealerImageExtension(&options.GetImageAnnoOptions{ImageNameOrID: imageName})
if err != nil {
return err
return fmt.Errorf("failed to get cluster image extension: %s", err)
}

defer func() {
err = imageMounter.Umount(imageMountInfo)
if err != nil {
logrus.Errorf("failed to umount cluster image")
}
}()

distributor, err := imagedistributor.NewScpDistributor(imageMountInfo, infraDriver, cf.GetConfigs())
if err != nil {
return err
if extension.Type == v12.AppInstaller {
return installApplication(imageName, []string{}, extension, infraDriver, imageEngine)
}

plugins, err := loadPluginsFromImage(imageMountInfo)
currentCluster, err := GetCurrentCluster(client)
if err != nil {
return err
return errors.Wrap(err, "failed to get current cluster")
}

if cf.GetPlugins() != nil {
plugins = append(plugins, cf.GetPlugins()...)
mj, md := strings.Diff(currentCluster.GetMasterIPList(), desiredCluster.GetMasterIPList())
nj, nd := strings.Diff(currentCluster.GetNodeIPList(), desiredCluster.GetNodeIPList())
if len(mj) == 0 && len(md) == 0 && len(nj) == 0 && len(nd) == 0 {
return nil
}

runtimeConfig := &clusterruntime.RuntimeConfig{
Distributor: distributor,
ImageEngine: imageEngine,
Plugins: plugins,
ClusterLaunchCmds: clusterLaunchCmds,
ClusterImageImage: clusterImageName,
}

if cf.GetKubeadmConfig() != nil {
runtimeConfig.KubeadmConfig = *cf.GetKubeadmConfig()
}

installer, err := clusterruntime.NewInstaller(infraDriver, *runtimeConfig)
if err != nil {
return err
if len(md) > 0 || len(nd) > 0 {
return fmt.Errorf("scale down not supported: %v,%v", md, nd)
}

err = installer.Install()
if err != nil {
return err
}

//save clusterfile
if err = cf.SaveAll(); err != nil {
return err
}
return nil
return scaleUpCluster(imageName, mj, nj, infraDriver, imageEngine, cf)
},
}
applyCmd.Flags().StringVarP(&clusterFile, "Clusterfile", "f", "", "Clusterfile path to apply a Kubernetes cluster")
applyCmd.Flags().BoolVar(&ForceDelete, "force", false, "force to delete the specified cluster if set true")
applyCmd.Flags().StringVarP(&applyClusterFile, "Clusterfile", "f", "", "Clusterfile path to apply a Kubernetes cluster")
return applyCmd
}

func createNewCluster(clusterImageName string, infraDriver infradriver.InfraDriver, imageEngine imageengine.Interface, cf clusterfile.Interface) error {
var (
clusterHosts = infraDriver.GetHostIPList()
clusterLaunchCmds = infraDriver.GetClusterLaunchCmds()
)

clusterHostsPlatform, err := infraDriver.GetHostsPlatform(clusterHosts)
if err != nil {
return err
}

imageMounter, err := imagedistributor.NewImageMounter(imageEngine, clusterHostsPlatform)
if err != nil {
return err
}

imageMountInfo, err := imageMounter.Mount(clusterImageName)
if err != nil {
return err
}

defer func() {
err = imageMounter.Umount(imageMountInfo)
if err != nil {
logrus.Errorf("failed to umount cluster image")
}
}()

distributor, err := imagedistributor.NewScpDistributor(imageMountInfo, infraDriver, cf.GetConfigs())
if err != nil {
return err
}

plugins, err := loadPluginsFromImage(imageMountInfo)
if err != nil {
return err
}

if cf.GetPlugins() != nil {
plugins = append(plugins, cf.GetPlugins()...)
}

runtimeConfig := &clusterruntime.RuntimeConfig{
Distributor: distributor,
ImageEngine: imageEngine,
Plugins: plugins,
ClusterLaunchCmds: clusterLaunchCmds,
ClusterImageImage: clusterImageName,
}

if cf.GetKubeadmConfig() != nil {
runtimeConfig.KubeadmConfig = *cf.GetKubeadmConfig()
}

installer, err := clusterruntime.NewInstaller(infraDriver, *runtimeConfig)
if err != nil {
return err
}

err = installer.Install()
if err != nil {
return err
}

//save clusterfile
if err = cf.SaveAll(); err != nil {
return err
}
return nil
}

func scaleUpCluster(clusterImageName string, scaleUpMasterIPList, scaleUpNodeIPList []net.IP, infraDriver infradriver.InfraDriver, imageEngine imageengine.Interface, cf clusterfile.Interface) error {
var (
newHosts = append(scaleUpMasterIPList, scaleUpNodeIPList...)
)

clusterHostsPlatform, err := infraDriver.GetHostsPlatform(newHosts)
if err != nil {
return err
}

imageMounter, err := imagedistributor.NewImageMounter(imageEngine, clusterHostsPlatform)
if err != nil {
return err
}

imageMountInfo, err := imageMounter.Mount(clusterImageName)
if err != nil {
return err
}
defer func() {
err = imageMounter.Umount(imageMountInfo)
if err != nil {
logrus.Errorf("failed to umount cluster image")
}
}()

distributor, err := imagedistributor.NewScpDistributor(imageMountInfo, infraDriver, cf.GetConfigs())
if err != nil {
return err
}

plugins, err := loadPluginsFromImage(imageMountInfo)
if err != nil {
return err
}

if cf.GetPlugins() != nil {
plugins = append(plugins, cf.GetPlugins()...)
}

runtimeConfig := &clusterruntime.RuntimeConfig{
Distributor: distributor,
Plugins: plugins,
}

if cf.GetKubeadmConfig() != nil {
runtimeConfig.KubeadmConfig = *cf.GetKubeadmConfig()
}

installer, err := clusterruntime.NewInstaller(infraDriver, *runtimeConfig)
if err != nil {
return err
}
_, _, err = installer.ScaleUp(scaleUpMasterIPList, scaleUpNodeIPList)
if err != nil {
return err
}

if err = cf.SaveAll(); err != nil {
return err
}

return nil
}

func GetCurrentCluster(client *k8s.Client) (*v2.Cluster, error) {
nodes, err := client.ListNodes()
if err != nil {
return nil, err
}

cluster := &v2.Cluster{}
var masterIPList []net.IP
var nodeIPList []net.IP

for _, node := range nodes.Items {
addr := getNodeAddress(node)
if addr == nil {
continue
}
if _, ok := node.Labels[MasterRoleLabel]; ok {
masterIPList = append(masterIPList, addr)
continue
}
nodeIPList = append(nodeIPList, addr)
}
cluster.Spec.Hosts = []v2.Host{{IPS: masterIPList, Roles: []string{common.MASTER}}, {IPS: nodeIPList, Roles: []string{common.NODE}}}

return cluster, nil
}

func getNodeAddress(node corev1.Node) net.IP {
if len(node.Status.Addresses) < 1 {
return nil
}
return net.ParseIP(node.Status.Addresses[0].Address)
}

func getClusterClient() *k8s.Client {
client, err := k8s.NewK8sClient()
if client != nil {
return client
}
if err != nil {
logrus.Warnf("try to new k8s client via default kubeconfig failed: %v", err)
}
return nil
}
3 changes: 2 additions & 1 deletion cmd/sealer/cmd/cluster/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func NewJoinCmd() *cobra.Command {
cf clusterfile.Interface
err error
)
logrus.Warn("sealer join command will be deprecated in the future, please use sealer scale-up instead.")

if err = utils.ValidateScaleIPStr(joinFlags.Masters, joinFlags.Nodes); err != nil {
return fmt.Errorf("failed to validate input run args: %v", err)
Expand Down Expand Up @@ -94,7 +95,7 @@ func NewJoinCmd() *cobra.Command {
if err == nil {
return
}
//if there exits an error,rollback the ClusterFile to the default file
//if there exists an error,rollback the ClusterFile to the default file
cf.RollBackClusterFile()
}()

Expand Down
Loading

0 comments on commit 72fafa6

Please sign in to comment.