Skip to content

Commit

Permalink
support clusterfile add taint
Browse files Browse the repository at this point in the history
  • Loading branch information
Stevent-fei committed Dec 3, 2022
1 parent 48baeed commit d35a665
Show file tree
Hide file tree
Showing 6 changed files with 237 additions and 1 deletion.
31 changes: 31 additions & 0 deletions pkg/cluster-runtime/installer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
package clusterruntime

import (
"context"
"fmt"
"net"
"path/filepath"

"github.com/sealerio/sealer/common"
Expand Down Expand Up @@ -188,6 +190,15 @@ func (i *Installer) Install() error {
return err
}

runtimeDriver, err := kubeRuntimeInstaller.GetCurrentRuntimeDriver()
if err != nil {
return err
}

if err = i.setNodeTaints(all, runtimeDriver); err != nil {
return err
}

appInstaller := NewAppInstaller(i.infraDriver, i.Distributor, extension, i.RegistryConfig)

if err = appInstaller.LaunchClusterImage(master0, cmds); err != nil {
Expand Down Expand Up @@ -226,3 +237,23 @@ func (i *Installer) GetCurrentDriver() (registry.Driver, runtime.Driver, error)

return registryDriver, runtimeDriver, nil
}

func (i *Installer) setNodeTaints(hosts []net.IP, driver runtime.Driver) error {
t := infradriver.NewCreateTaints()
for _, ip := range hosts {
taints := i.infraDriver.GetHostTaint(ip)
for _, taint := range taints {
if err := t.FormatData(ip, taint); err != nil {
return err
}
node, err := t.UpdateNodeTaint(ip, driver)
if err != nil {
return err
}
if err := driver.Update(context.TODO(), &node); err != nil {
return err
}
}
}
return nil
}
4 changes: 4 additions & 0 deletions pkg/cluster-runtime/scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ func (i *Installer) ScaleUp(newMasters, newWorkers []net.IP) (registry.Driver, r
return nil, nil, err
}

if err = i.setNodeTaints(all, runtimeDriver); err != nil {
return nil, nil, err
}

return registryDriver, runtimeDriver, nil
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/infradriver/infradriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import (
// InfraDriver treat the entire cluster as an operating system kernel,
// interface function here is the target system call.
type InfraDriver interface {
// GetHostTaint return host taint
GetHostTaint(host net.IP) []string

GetHostIPList() []net.IP

GetHostIPListByRole(role string) []net.IP
Expand Down
7 changes: 7 additions & 0 deletions pkg/infradriver/ssh_infradriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
type SSHInfraDriver struct {
sshConfigs map[string]ssh.Interface
hosts []net.IP
hostTaint map[string][]string
roleHostsMap map[string][]net.IP
hostEnvMap map[string]map[string]interface{}
clusterEnv map[string]interface{}
Expand Down Expand Up @@ -105,6 +106,7 @@ func NewInfraDriver(cluster *v2.Cluster) (InfraDriver, error) {
roleHostsMap: map[string][]net.IP{},
// todo need to separate env into app render data and sys render data
hostEnvMap: map[string]map[string]interface{}{},
hostTaint: map[string][]string{},
clusterHostAliases: cluster.Spec.HostAliases,
}

Expand Down Expand Up @@ -157,12 +159,17 @@ func NewInfraDriver(cluster *v2.Cluster) (InfraDriver, error) {
for _, host := range cluster.Spec.Hosts {
for _, ip := range host.IPS {
ret.hostEnvMap[ip.String()] = mergeList(ConvertEnv(host.Env), ret.clusterEnv)
ret.hostTaint[ip.String()] = host.Taints
}
}

return ret, err
}

func (d *SSHInfraDriver) GetHostTaint(host net.IP) []string {
return d.hostTaint[host.String()]
}

func (d *SSHInfraDriver) GetHostIPList() []net.IP {
return d.hosts
}
Expand Down
190 changes: 190 additions & 0 deletions pkg/infradriver/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
// Copyright © 2022 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package infradriver

import (
"context"
"fmt"
"net"
"strings"

"github.com/sealerio/sealer/pkg/runtime"
"github.com/sirupsen/logrus"
k8sv1 "k8s.io/api/core/v1"
)

type Taint struct {
IPList []string
TaintList
}

const (
DelSymbol = "-"
EqualSymbol = "="
ColonSymbol = ":"
)

type taintList struct {
DelTaintList []k8sv1.Taint
AddTaintList []k8sv1.Taint
}
type TaintList map[string]*taintList

var TaintEffectValues = []k8sv1.TaintEffect{k8sv1.TaintEffectNoSchedule, k8sv1.TaintEffectNoExecute, k8sv1.TaintEffectPreferNoSchedule}

// FormatData key1=value1:NoSchedule;key1=value1:NoSchedule-;
//key1:NoSchedule;key1:NoSchedule-;
//key1=:NoSchedule-;key1=value1:NoSchedule
func (t *Taint) FormatData(ip net.IP, data string) error {
var key, value, effect string
items := strings.Split(data, "\n")
if t.TaintList == nil {
t.TaintList = make(map[string]*taintList)
}
for _, v := range items {
v = strings.TrimSpace(v)
if strings.HasPrefix(v, "#") || v == "" {
continue
}
if strings.Contains(v, EqualSymbol) && !strings.Contains(v, EqualSymbol+ColonSymbol) {
temps := strings.Split(v, EqualSymbol)
if len(temps) != 2 {
return fmt.Errorf("faild to split taint argument: %s", v)
}
key = temps[0]
taintArgs := strings.Split(temps[1], ColonSymbol)
if len(taintArgs) != 2 {
return fmt.Errorf("error: invalid taint data: %s", v)
}
value, effect = taintArgs[0], taintArgs[1]
effect = strings.TrimSuffix(effect, DelSymbol)
}

if !strings.Contains(v, EqualSymbol) {
temps := strings.Split(v, ColonSymbol)
if len(temps) != 2 {
return fmt.Errorf("faild to split taint argument: %s", v)
}
key, value, effect = temps[0], "", temps[1]
}

if strings.Contains(v, EqualSymbol+ColonSymbol) {
temps := strings.Split(v, EqualSymbol+ColonSymbol)
if len(temps) != 2 {
return fmt.Errorf("faild to split taint argument: %s", v)
}
key, value, effect = temps[0], "", temps[1]
}

//kubectl taint nodes xxx key- : remove all key related taints
if t.TaintList[ip.String()] == nil {
t.TaintList[ip.String()] = &taintList{}
}
if strings.HasSuffix(v, DelSymbol) && !strings.Contains(v, ColonSymbol) && !strings.Contains(v, EqualSymbol) {
t.TaintList[ip.String()].DelTaintList = append(t.TaintList[ip.String()].DelTaintList, k8sv1.Taint{Key: strings.TrimSuffix(v, DelSymbol), Value: "", Effect: ""})
}

effect = strings.TrimSuffix(effect, DelSymbol)
if NotInEffect(k8sv1.TaintEffect(effect), TaintEffectValues) {
return fmt.Errorf("taint effect %s need in %v", v, TaintEffectValues)
}

taint := k8sv1.Taint{
Key: key,
Value: value,
Effect: k8sv1.TaintEffect(effect),
}

if _, ok := t.TaintList[ip.String()]; !ok {
t.TaintList[ip.String()] = &taintList{}
}
if strings.HasSuffix(v, DelSymbol) {
t.TaintList[ip.String()].DelTaintList = append(t.TaintList[ip.String()].DelTaintList, taint)
continue
}
t.TaintList[ip.String()].AddTaintList = append(t.TaintList[ip.String()].AddTaintList, taint)
}
return nil
}

//Remove existing taint
func (t *Taint) removePresenceTaint(taint k8sv1.Taint, ip string) {
for k, v := range t.TaintList[ip].AddTaintList {
if v.Key == taint.Key && v.Value == taint.Value && v.Effect == taint.Effect {
logrus.Infof("taint %s already exist", t.TaintList[ip].AddTaintList[k].String())
t.TaintList[ip].AddTaintList = append(t.TaintList[ip].AddTaintList[:k], t.TaintList[ip].AddTaintList[k+1:]...)
break
}
}
}

func (t *Taint) isDelTaint(taint k8sv1.Taint, ip string) bool {
for _, v := range t.TaintList[ip].DelTaintList {
if v.Key == taint.Key && (v.Effect == taint.Effect || v.Effect == "") {
return true
}
}
return false
}

func NotInEffect(effect k8sv1.TaintEffect, effects []k8sv1.TaintEffect) bool {
for _, e := range effects {
if e == effect {
return false
}
}
return true
}

// UpdateTaints return nil mean's needn't update taint
func (t *Taint) UpdateTaints(taints []k8sv1.Taint, ip string) []k8sv1.Taint {
needDel := false
var updateTaints []k8sv1.Taint
for k, v := range taints {
t.removePresenceTaint(v, ip)
if t.isDelTaint(v, ip) {
needDel = true
continue
}
updateTaints = append(updateTaints, taints[k])
}
if len(t.TaintList[ip].AddTaintList) == 0 && !needDel {
return nil
}
return append(updateTaints, t.TaintList[ip].AddTaintList...)
}

func (t *Taint) UpdateNodeTaint(ip net.IP, driver runtime.Driver) (k8sv1.Node, error) {
var node k8sv1.Node
nodeList := k8sv1.NodeList{}
if err := driver.List(context.TODO(), &nodeList); err != nil {
return k8sv1.Node{}, fmt.Errorf("failed to list cluster nodes: %v", err)
}

for _, node = range nodeList.Items {
updateTaints := t.UpdateTaints(node.Spec.Taints, ip.String())
if updateTaints != nil {
node.Spec.Taints = updateTaints
}
if err := driver.Update(context.TODO(), &node); err != nil {
return k8sv1.Node{}, err
}
}
return node, nil
}

func NewCreateTaints() *Taint {
return &Taint{TaintList: make(map[string]*taintList)}
}
3 changes: 2 additions & 1 deletion types/api/v2/cluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ type Host struct {
//overwrite SSH config
SSH v1.SSH `json:"ssh,omitempty"`
//overwrite env
Env []string `json:"env,omitempty"`
Env []string `json:"env,omitempty"`
Taints []string `json:"taints,omitempty"`
}

// HostAlias holds the mapping between IP and hostnames that will be injected as an entry in the
Expand Down

0 comments on commit d35a665

Please sign in to comment.