Skip to content

Commit

Permalink
Refactor cloudinstancegroupmember in a more independent cloud instanc…
Browse files Browse the repository at this point in the history
…e representation

Apply suggestions from code review

Co-authored-by: John Gardiner Myers <[email protected]>
  • Loading branch information
Ole Markus With and johngmyers committed Aug 30, 2020
1 parent 4d7632a commit 0ec7168
Show file tree
Hide file tree
Showing 21 changed files with 214 additions and 126 deletions.
4 changes: 2 additions & 2 deletions cmd/kops/delete_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,12 +267,12 @@ func RunDeleteInstance(ctx context.Context, f *util.Factory, out io.Writer, opti
return d.UpdateSingleInstance(ctx, cloudMember, options.Surge)
}

func deleteNodeMatch(cloudMember *cloudinstances.CloudInstanceGroupMember, options *deleteInstanceOptions) bool {
func deleteNodeMatch(cloudMember *cloudinstances.CloudInstance, options *deleteInstanceOptions) bool {
return cloudMember.ID == options.InstanceID ||
(!options.CloudOnly && cloudMember.Node != nil && cloudMember.Node.Name == options.InstanceID)
}

func findDeletionNode(groups map[string]*cloudinstances.CloudInstanceGroup, options *deleteInstanceOptions) *cloudinstances.CloudInstanceGroupMember {
func findDeletionNode(groups map[string]*cloudinstances.CloudInstanceGroup, options *deleteInstanceOptions) *cloudinstances.CloudInstance {
for _, group := range groups {
for _, r := range group.Ready {
if deleteNodeMatch(r, options) {
Expand Down
5 changes: 4 additions & 1 deletion pkg/cloudinstances/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "go_default_library",
srcs = ["cloud_instance_group.go"],
srcs = [
"cloud_instance.go",
"cloud_instance_group.go",
],
importpath = "k8s.io/kops/pkg/cloudinstances",
visibility = ["//visibility:public"],
deps = [
Expand Down
46 changes: 46 additions & 0 deletions pkg/cloudinstances/cloud_instance.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cloudinstances

import v1 "k8s.io/api/core/v1"

// CloudInstanceStatusDetached means the instance needs update and has been detached.
const CloudInstanceStatusDetached = "Detached"

// CloudInstanceStatusNeedsUpdate means the instance has joined the cluster, is not detached, and needs to be updated.
const CloudInstanceStatusNeedsUpdate = "NeedsUpdate"

// CloudInstanceStatusReady means the instance has joined the cluster, is not detached, and is up to date.
const CloudInstanceStatusUpToDate = "UpToDate"

// CloudInstance describes an instance in a CloudInstanceGroup group.
type CloudInstance struct {
// ID is a unique identifier for the instance, meaningful to the cloud
ID string
// Node is the associated k8s instance, if it is known
Node *v1.Node
// CloudInstanceGroup is the managing CloudInstanceGroup
CloudInstanceGroup *CloudInstanceGroup
// Status indicates if the instance has joined the cluster and if it needs any updates.
Status string
// Roles are the roles the instance have.
Roles []string
// MachineType is the hardware resource class of the instance.
MachineType string
// Private IP is the private ip address of the instance.
PrivateIP string
}
50 changes: 9 additions & 41 deletions pkg/cloudinstances/cloud_instance_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ type CloudInstanceGroup struct {
// HumanName is a user-friendly name for the group
HumanName string
InstanceGroup *kopsapi.InstanceGroup
Ready []*CloudInstanceGroupMember
NeedUpdate []*CloudInstanceGroupMember
Ready []*CloudInstance
NeedUpdate []*CloudInstance
MinSize int
TargetSize int
MaxSize int
Expand All @@ -40,63 +40,31 @@ type CloudInstanceGroup struct {
Raw interface{}
}

// CloudInstanceGroupMember describes an instance in a CloudInstanceGroup group.
type CloudInstanceGroupMember struct {
// ID is a unique identifier for the instance, meaningful to the cloud
ID string
// Node is the associated k8s instance, if it is known
Node *v1.Node
// CloudInstanceGroup is the managing CloudInstanceGroup
CloudInstanceGroup *CloudInstanceGroup
// Detached is whether fi.Cloud.DetachInstance has been successfully called on the instance.
Detached bool
}

// NewCloudInstanceGroupMember creates a new CloudInstanceGroupMember
func (c *CloudInstanceGroup) NewCloudInstanceGroupMember(instanceId string, newGroupName string, currentGroupName string, nodeMap map[string]*v1.Node) error {
// NewCloudInstance creates a new CloudInstance
func (c *CloudInstanceGroup) NewCloudInstance(instanceId string, status string, nodeMap map[string]*v1.Node) (*CloudInstance, error) {
if instanceId == "" {
return fmt.Errorf("instance id for cloud instance member cannot be empty")
return nil, fmt.Errorf("instance id for cloud instance member cannot be empty")
}
cm := &CloudInstanceGroupMember{
cm := &CloudInstance{
ID: instanceId,
CloudInstanceGroup: c,
}
node := nodeMap[instanceId]
if node != nil {
cm.Node = node
} else {
klog.V(8).Infof("unable to find node for instance: %s", instanceId)
}

if newGroupName == currentGroupName {
if status == CloudInstanceStatusUpToDate {
c.Ready = append(c.Ready, cm)
} else {
c.NeedUpdate = append(c.NeedUpdate, cm)
}

return nil
}
cm.Status = status

// NewDetachedCloudInstanceGroupMember creates a new CloudInstanceGroupMember for a detached instance
func (c *CloudInstanceGroup) NewDetachedCloudInstanceGroupMember(instanceId string, nodeMap map[string]*v1.Node) error {
if instanceId == "" {
return fmt.Errorf("instance id for cloud instance member cannot be empty")
}
cm := &CloudInstanceGroupMember{
ID: instanceId,
CloudInstanceGroup: c,
Detached: true,
}
node := nodeMap[instanceId]
if node != nil {
cm.Node = node
} else {
klog.V(8).Infof("unable to find node for instance: %s", instanceId)
}

c.NeedUpdate = append(c.NeedUpdate, cm)

return nil
return cm, nil
}

// Status returns a human-readable Status indicating whether an update is needed
Expand Down
24 changes: 12 additions & 12 deletions pkg/instancegroups/instancegroups.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (c *RollingUpdateCluster) rollingUpdateInstanceGroup(ctx context.Context, c
if maxSurge > 0 && !c.CloudOnly {
for numSurge := 1; numSurge <= maxSurge; numSurge++ {
u := update[len(update)-numSurge]
if !u.Detached {
if u.Status != cloudinstances.CloudInstanceStatusDetached {
if err := c.detachInstance(u); err != nil {
return err
}
Expand Down Expand Up @@ -161,7 +161,7 @@ func (c *RollingUpdateCluster) rollingUpdateInstanceGroup(ctx context.Context, c
terminateChan := make(chan error, maxConcurrency)

for uIdx, u := range update {
go func(m *cloudinstances.CloudInstanceGroupMember) {
go func(m *cloudinstances.CloudInstance) {
terminateChan <- c.drainTerminateAndWait(ctx, m, sleepAfterTerminate)
}(u)
runningDrains++
Expand Down Expand Up @@ -233,15 +233,15 @@ func (c *RollingUpdateCluster) rollingUpdateInstanceGroup(ctx context.Context, c
return nil
}

func prioritizeUpdate(update []*cloudinstances.CloudInstanceGroupMember) []*cloudinstances.CloudInstanceGroupMember {
func prioritizeUpdate(update []*cloudinstances.CloudInstance) []*cloudinstances.CloudInstance {
// The priorities are, in order:
// attached before detached
// TODO unhealthy before healthy
// NeedUpdate before Ready (preserve original order)
result := make([]*cloudinstances.CloudInstanceGroupMember, 0, len(update))
var detached []*cloudinstances.CloudInstanceGroupMember
result := make([]*cloudinstances.CloudInstance, 0, len(update))
var detached []*cloudinstances.CloudInstance
for _, u := range update {
if u.Detached {
if u.Status == cloudinstances.CloudInstanceStatusDetached {
detached = append(detached, u)
} else {
result = append(result, u)
Expand All @@ -260,7 +260,7 @@ func waitForPendingBeforeReturningError(runningDrains int, terminateChan chan er
return err
}

func (c *RollingUpdateCluster) taintAllNeedUpdate(ctx context.Context, group *cloudinstances.CloudInstanceGroup, update []*cloudinstances.CloudInstanceGroupMember) error {
func (c *RollingUpdateCluster) taintAllNeedUpdate(ctx context.Context, group *cloudinstances.CloudInstanceGroup, update []*cloudinstances.CloudInstance) error {
var toTaint []*corev1.Node
for _, u := range update {
if u.Node != nil && !u.Node.Spec.Unschedulable {
Expand Down Expand Up @@ -321,7 +321,7 @@ func (c *RollingUpdateCluster) patchTaint(ctx context.Context, node *corev1.Node
return err
}

func (c *RollingUpdateCluster) drainTerminateAndWait(ctx context.Context, u *cloudinstances.CloudInstanceGroupMember, sleepAfterTerminate time.Duration) error {
func (c *RollingUpdateCluster) drainTerminateAndWait(ctx context.Context, u *cloudinstances.CloudInstance, sleepAfterTerminate time.Duration) error {
instanceID := u.ID

nodeName := ""
Expand Down Expand Up @@ -454,7 +454,7 @@ func (c *RollingUpdateCluster) validateClusterWithTimeout(validateCount int) err
}

// detachInstance detaches a Cloud Instance
func (c *RollingUpdateCluster) detachInstance(u *cloudinstances.CloudInstanceGroupMember) error {
func (c *RollingUpdateCluster) detachInstance(u *cloudinstances.CloudInstance) error {
id := u.ID
nodeName := ""
if u.Node != nil {
Expand All @@ -477,7 +477,7 @@ func (c *RollingUpdateCluster) detachInstance(u *cloudinstances.CloudInstanceGro
}

// deleteInstance deletes an Cloud Instance.
func (c *RollingUpdateCluster) deleteInstance(u *cloudinstances.CloudInstanceGroupMember) error {
func (c *RollingUpdateCluster) deleteInstance(u *cloudinstances.CloudInstance) error {
id := u.ID
nodeName := ""
if u.Node != nil {
Expand All @@ -500,7 +500,7 @@ func (c *RollingUpdateCluster) deleteInstance(u *cloudinstances.CloudInstanceGro
}

// drainNode drains a K8s node.
func (c *RollingUpdateCluster) drainNode(u *cloudinstances.CloudInstanceGroupMember) error {
func (c *RollingUpdateCluster) drainNode(u *cloudinstances.CloudInstance) error {
if c.K8sClient == nil {
return fmt.Errorf("K8sClient not set")
}
Expand Down Expand Up @@ -566,7 +566,7 @@ func (c *RollingUpdateCluster) deleteNode(ctx context.Context, node *corev1.Node
}

// UpdateSingeInstance performs a rolling update on a single instance
func (c *RollingUpdateCluster) UpdateSingleInstance(ctx context.Context, cloudMember *cloudinstances.CloudInstanceGroupMember, detach bool) error {
func (c *RollingUpdateCluster) UpdateSingleInstance(ctx context.Context, cloudMember *cloudinstances.CloudInstance, detach bool) error {
if detach {
if cloudMember.CloudInstanceGroup.InstanceGroup.IsMaster() {
klog.Warning("cannot detach master instances. Assuming --surge=false")
Expand Down
3 changes: 2 additions & 1 deletion pkg/instancegroups/rollingupdate.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type RollingUpdateCluster struct {
func (c *RollingUpdateCluster) AdjustNeedUpdate(groups map[string]*cloudinstances.CloudInstanceGroup, cluster *api.Cluster, instanceGroups *api.InstanceGroupList) error {
for _, group := range groups {
if group.Ready != nil {
var newReady []*cloudinstances.CloudInstanceGroupMember
var newReady []*cloudinstances.CloudInstance
for _, member := range group.Ready {
makeNotReady := false
if member.Node != nil && member.Node.Annotations != nil {
Expand All @@ -89,6 +89,7 @@ func (c *RollingUpdateCluster) AdjustNeedUpdate(groups map[string]*cloudinstance

if makeNotReady {
group.NeedUpdate = append(group.NeedUpdate, member)
member.Status = cloudinstances.CloudInstanceStatusNeedsUpdate
} else {
newReady = append(newReady, member)
}
Expand Down
13 changes: 9 additions & 4 deletions pkg/instancegroups/rollingupdate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func makeGroup(groups map[string]*cloudinstances.CloudInstanceGroup, k8sClient k
}
_ = fakeClient.Tracker().Add(node)
}
member := cloudinstances.CloudInstanceGroupMember{
member := cloudinstances.CloudInstance{
ID: id,
Node: node,
CloudInstanceGroup: groups[name],
Expand Down Expand Up @@ -1371,7 +1371,7 @@ func TestRollingUpdateMaxSurgeAllNeedUpdateOneAlreadyDetached(t *testing.T) {
groups := make(map[string]*cloudinstances.CloudInstanceGroup)
makeGroup(groups, c.K8sClient, cloud, "node-1", kopsapi.InstanceGroupRoleNode, 4, 4)
alreadyDetachedTest.detached[groups["node-1"].NeedUpdate[3].ID] = true
groups["node-1"].NeedUpdate[3].Detached = true
groups["node-1"].NeedUpdate[3].Status = cloudinstances.CloudInstanceStatusDetached
err := c.RollingUpdate(ctx, groups, cluster, &kopsapi.InstanceGroupList{})
assert.NoError(t, err, "rolling update")

Expand All @@ -1397,8 +1397,13 @@ func TestRollingUpdateMaxSurgeAllNeedUpdateMaxAlreadyDetached(t *testing.T) {

groups := make(map[string]*cloudinstances.CloudInstanceGroup)
makeGroup(groups, c.K8sClient, cloud, "node-1", kopsapi.InstanceGroupRoleNode, 7, 7)
groups["node-1"].NeedUpdate[1].Detached = true
groups["node-1"].NeedUpdate[3].Detached = true
groups["node-1"].NeedUpdate[0].Status = cloudinstances.CloudInstanceStatusNeedsUpdate
groups["node-1"].NeedUpdate[1].Status = cloudinstances.CloudInstanceStatusDetached
groups["node-1"].NeedUpdate[2].Status = cloudinstances.CloudInstanceStatusNeedsUpdate
groups["node-1"].NeedUpdate[3].Status = cloudinstances.CloudInstanceStatusDetached
groups["node-1"].NeedUpdate[4].Status = cloudinstances.CloudInstanceStatusNeedsUpdate
groups["node-1"].NeedUpdate[5].Status = cloudinstances.CloudInstanceStatusNeedsUpdate
groups["node-1"].NeedUpdate[6].Status = cloudinstances.CloudInstanceStatusNeedsUpdate
// TODO verify those are the last two instances terminated

err := c.RollingUpdate(ctx, groups, cluster, &kopsapi.InstanceGroupList{})
Expand Down
8 changes: 4 additions & 4 deletions pkg/resources/digitalocean/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,13 @@ func (c *Cloud) DeleteGroup(g *cloudinstances.CloudInstanceGroup) error {
}

// DeleteInstance is not implemented yet, is func needs to delete a DO instance.
func (c *Cloud) DeleteInstance(i *cloudinstances.CloudInstanceGroupMember) error {
func (c *Cloud) DeleteInstance(i *cloudinstances.CloudInstance) error {
klog.V(8).Info("digitalocean cloud provider DeleteInstance not implemented yet")
return fmt.Errorf("digital ocean cloud provider does not support deleting cloud instances at this time")
}

// DetachInstance is not implemented yet. It needs to cause a cloud instance to no longer be counted against the group's size limits.
func (c *Cloud) DetachInstance(i *cloudinstances.CloudInstanceGroupMember) error {
func (c *Cloud) DetachInstance(i *cloudinstances.CloudInstance) error {
klog.V(8).Info("digitalocean cloud provider DetachInstance not implemented yet")
return fmt.Errorf("digital ocean cloud provider does not support surging")
}
Expand Down Expand Up @@ -407,8 +407,8 @@ func buildCloudInstanceGroup(c *Cloud, ig *kops.InstanceGroup, g DOInstanceGroup

for _, member := range g.Members {

// TODO use a hash of the godo.DropletCreateRequest fields for second and third parameters.
err := cg.NewCloudInstanceGroupMember(member, g.GroupType, g.GroupType, nodeMap)
// TODO use a hash of the godo.DropletCreateRequest fields to calculate the second parameter.
_, err := cg.NewCloudInstance(member, cloudinstances.CloudInstanceStatusUpToDate, nodeMap)
if err != nil {
return nil, fmt.Errorf("error creating cloud instance group member: %v", err)
}
Expand Down
18 changes: 11 additions & 7 deletions pkg/resources/spotinst/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func DeleteInstanceGroup(cloud Cloud, group *cloudinstances.CloudInstanceGroup)
}

// DeleteInstance removes an instance from its instance group.
func DeleteInstance(cloud Cloud, instance *cloudinstances.CloudInstanceGroupMember) error {
func DeleteInstance(cloud Cloud, instance *cloudinstances.CloudInstance) error {
klog.V(2).Infof("Detaching instance %q from instance group: %q",
instance.ID, instance.CloudInstanceGroup.HumanName)

Expand Down Expand Up @@ -194,7 +194,7 @@ func DeleteInstance(cloud Cloud, instance *cloudinstances.CloudInstanceGroupMemb
}

// DetachInstance is not implemented yet. It needs to cause a cloud instance to no longer be counted against the group's size limits.
func DetachInstance(cloud Cloud, instance *cloudinstances.CloudInstanceGroupMember) error {
func DetachInstance(cloud Cloud, instance *cloudinstances.CloudInstance) error {
return fmt.Errorf("spotinst does not support surging")
}

Expand Down Expand Up @@ -299,7 +299,7 @@ func buildCloudInstanceGroupFromInstanceGroup(cloud Cloud, ig *kops.InstanceGrou
}

// Register all instances as group members.
if err := registerCloudInstanceGroupMembers(instanceGroup, nodeMap,
if err := registerCloudInstances(instanceGroup, nodeMap,
instances, group.Name(), group.UpdatedAt()); err != nil {
return nil, err
}
Expand Down Expand Up @@ -332,15 +332,15 @@ func buildCloudInstanceGroupFromLaunchSpec(cloud Cloud, ig *kops.InstanceGroup,
}

// Register all instances as group members.
if err := registerCloudInstanceGroupMembers(instanceGroup, nodeMap,
if err := registerCloudInstances(instanceGroup, nodeMap,
instances, spec.Name(), spec.UpdatedAt()); err != nil {
return nil, err
}

return instanceGroup, nil
}

func registerCloudInstanceGroupMembers(instanceGroup *cloudinstances.CloudInstanceGroup, nodeMap map[string]*v1.Node,
func registerCloudInstances(instanceGroup *cloudinstances.CloudInstanceGroup, nodeMap map[string]*v1.Node,
instances []Instance, currentInstanceGroupName string, instanceGroupUpdatedAt time.Time) error {

// The instance registration below registers all active instances with
Expand Down Expand Up @@ -378,8 +378,12 @@ func registerCloudInstanceGroupMembers(instanceGroup *cloudinstances.CloudInstan
instance.Id(), instance.CreatedAt().Format(time.RFC3339),
currentInstanceGroupName, instanceGroupUpdatedAt.Format(time.RFC3339))

if err := instanceGroup.NewCloudInstanceGroupMember(
instance.Id(), newInstanceGroupName, currentInstanceGroupName, nodeMap); err != nil {
status := cloudinstances.CloudInstanceStatusUpToDate
if newInstanceGroupName != currentInstanceGroupName {
status = cloudinstances.CloudInstanceStatusNeedsUpdate
}
if _, err := instanceGroup.NewCloudInstance(
instance.Id(), status, nodeMap); err != nil {
return fmt.Errorf("error creating cloud instance group member: %v", err)
}
}
Expand Down
Loading

0 comments on commit 0ec7168

Please sign in to comment.