Skip to content

Commit

Permalink
Add initialization phase to Kubernetes router
Browse files Browse the repository at this point in the history
Create Kubernetes services before deployments because Envoy's readiness depends on existing ClusterIPs
  • Loading branch information
stefanprodan committed Nov 27, 2019
1 parent b02a6da commit 8766523
Show file tree
Hide file tree
Showing 9 changed files with 338 additions and 304 deletions.
29 changes: 15 additions & 14 deletions pkg/canary/controller.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
package canary

import (
"github.com/weaveworks/flagger/pkg/apis/flagger/v1alpha3"
flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1alpha3"
)

type Controller interface {
IsPrimaryReady(canary *v1alpha3.Canary) (bool, error)
IsCanaryReady(canary *v1alpha3.Canary) (bool, error)
SyncStatus(canary *v1alpha3.Canary, status v1alpha3.CanaryStatus) error
SetStatusFailedChecks(canary *v1alpha3.Canary, val int) error
SetStatusWeight(canary *v1alpha3.Canary, val int) error
SetStatusIterations(canary *v1alpha3.Canary, val int) error
SetStatusPhase(canary *v1alpha3.Canary, phase v1alpha3.CanaryPhase) error
Initialize(canary *v1alpha3.Canary, skipLivenessChecks bool) (label string, ports map[string]int32, err error)
Promote(canary *v1alpha3.Canary) error
HasTargetChanged(canary *v1alpha3.Canary) (bool, error)
HaveDependenciesChanged(canary *v1alpha3.Canary) (bool, error)
Scale(canary *v1alpha3.Canary, replicas int32) error
ScaleFromZero(canary *v1alpha3.Canary) error
IsPrimaryReady(canary *flaggerv1.Canary) (bool, error)
IsCanaryReady(canary *flaggerv1.Canary) (bool, error)
GetMetadata(canary *flaggerv1.Canary) (string, map[string]int32, error)
SyncStatus(canary *flaggerv1.Canary, status flaggerv1.CanaryStatus) error
SetStatusFailedChecks(canary *flaggerv1.Canary, val int) error
SetStatusWeight(canary *flaggerv1.Canary, val int) error
SetStatusIterations(canary *flaggerv1.Canary, val int) error
SetStatusPhase(canary *flaggerv1.Canary, phase flaggerv1.CanaryPhase) error
Initialize(canary *flaggerv1.Canary, skipLivenessChecks bool) error
Promote(canary *flaggerv1.Canary) error
HasTargetChanged(canary *flaggerv1.Canary) (bool, error)
HaveDependenciesChanged(canary *flaggerv1.Canary) (bool, error)
Scale(canary *flaggerv1.Canary, replicas int32) error
ScaleFromZero(canary *flaggerv1.Canary) error
}
48 changes: 34 additions & 14 deletions pkg/canary/deployment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,34 +31,34 @@ type DeploymentController struct {

// Initialize creates the primary deployment, hpa,
// scales to zero the canary deployment and returns the pod selector label and container ports
func (c *DeploymentController) Initialize(cd *flaggerv1.Canary, skipLivenessChecks bool) (label string, ports map[string]int32, err error) {
func (c *DeploymentController) Initialize(cd *flaggerv1.Canary, skipLivenessChecks bool) (err error) {
primaryName := fmt.Sprintf("%s-primary", cd.Spec.TargetRef.Name)

label, ports, err = c.createPrimaryDeployment(cd)
err = c.createPrimaryDeployment(cd)
if err != nil {
return "", ports, fmt.Errorf("creating deployment %s.%s failed: %v", primaryName, cd.Namespace, err)
return fmt.Errorf("creating deployment %s.%s failed: %v", primaryName, cd.Namespace, err)
}

if cd.Status.Phase == "" || cd.Status.Phase == flaggerv1.CanaryPhaseInitializing {
if !skipLivenessChecks && !cd.Spec.SkipAnalysis {
_, readyErr := c.IsPrimaryReady(cd)
if readyErr != nil {
return "", ports, readyErr
return readyErr
}
}

c.logger.With("canary", fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)).Infof("Scaling down %s.%s", cd.Spec.TargetRef.Name, cd.Namespace)
if err := c.Scale(cd, 0); err != nil {
return "", ports, err
return err
}
}

if cd.Spec.AutoscalerRef != nil && cd.Spec.AutoscalerRef.Kind == "HorizontalPodAutoscaler" {
if err := c.reconcilePrimaryHpa(cd, true); err != nil {
return "", ports, fmt.Errorf("creating HorizontalPodAutoscaler %s.%s failed: %v", primaryName, cd.Namespace, err)
return fmt.Errorf("creating HorizontalPodAutoscaler %s.%s failed: %v", primaryName, cd.Namespace, err)
}
}
return label, ports, nil
return nil
}

// Promote copies the pod spec, secrets and config maps from canary to primary
Expand Down Expand Up @@ -191,9 +191,9 @@ func (c *DeploymentController) ScaleFromZero(cd *flaggerv1.Canary) error {
return nil
}

func (c *DeploymentController) createPrimaryDeployment(cd *flaggerv1.Canary) (string, map[string]int32, error) {
// GetMetadata returns the pod label selector and svc ports
func (c *DeploymentController) GetMetadata(cd *flaggerv1.Canary) (string, map[string]int32, error) {
targetName := cd.Spec.TargetRef.Name
primaryName := fmt.Sprintf("%s-primary", cd.Spec.TargetRef.Name)

canaryDep, err := c.kubeClient.AppsV1().Deployments(cd.Namespace).Get(targetName, metav1.GetOptions{})
if err != nil {
Expand All @@ -218,19 +218,39 @@ func (c *DeploymentController) createPrimaryDeployment(cd *flaggerv1.Canary) (st
ports = p
}

return label, ports, nil
}
func (c *DeploymentController) createPrimaryDeployment(cd *flaggerv1.Canary) error {
targetName := cd.Spec.TargetRef.Name
primaryName := fmt.Sprintf("%s-primary", cd.Spec.TargetRef.Name)

canaryDep, err := c.kubeClient.AppsV1().Deployments(cd.Namespace).Get(targetName, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return fmt.Errorf("deployment %s.%s not found, retrying", targetName, cd.Namespace)
}
return err
}

label, err := c.getSelectorLabel(canaryDep)
if err != nil {
return fmt.Errorf("invalid label selector! Deployment %s.%s spec.selector.matchLabels must contain selector 'app: %s'",
targetName, cd.Namespace, targetName)
}

primaryDep, err := c.kubeClient.AppsV1().Deployments(cd.Namespace).Get(primaryName, metav1.GetOptions{})
if errors.IsNotFound(err) {
// create primary secrets and config maps
configRefs, err := c.configTracker.GetTargetConfigs(cd)
if err != nil {
return "", nil, err
return err
}
if err := c.configTracker.CreatePrimaryConfigs(cd, configRefs); err != nil {
return "", nil, err
return err
}
annotations, err := c.makeAnnotations(canaryDep.Spec.Template.Annotations)
if err != nil {
return "", nil, err
return err
}

replicas := int32(1)
Expand Down Expand Up @@ -278,13 +298,13 @@ func (c *DeploymentController) createPrimaryDeployment(cd *flaggerv1.Canary) (st

_, err = c.kubeClient.AppsV1().Deployments(cd.Namespace).Create(primaryDep)
if err != nil {
return "", nil, err
return err
}

c.logger.With("canary", fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)).Infof("Deployment %s.%s created", primaryDep.GetName(), cd.Namespace)
}

return label, ports, nil
return nil
}

func (c *DeploymentController) reconcilePrimaryHpa(cd *flaggerv1.Canary, init bool) error {
Expand Down
16 changes: 8 additions & 8 deletions pkg/canary/deployment_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

func TestCanaryDeployer_Sync(t *testing.T) {
mocks := SetupMocks()
_, _, err := mocks.deployer.Initialize(mocks.canary, true)
err := mocks.deployer.Initialize(mocks.canary, true)
if err != nil {
t.Fatal(err.Error())
}
Expand Down Expand Up @@ -96,7 +96,7 @@ func TestCanaryDeployer_Sync(t *testing.T) {

func TestCanaryDeployer_IsNewSpec(t *testing.T) {
mocks := SetupMocks()
_, _, err := mocks.deployer.Initialize(mocks.canary, true)
err := mocks.deployer.Initialize(mocks.canary, true)
if err != nil {
t.Fatal(err.Error())
}
Expand All @@ -119,7 +119,7 @@ func TestCanaryDeployer_IsNewSpec(t *testing.T) {

func TestCanaryDeployer_Promote(t *testing.T) {
mocks := SetupMocks()
_, _, err := mocks.deployer.Initialize(mocks.canary, true)
err := mocks.deployer.Initialize(mocks.canary, true)
if err != nil {
t.Fatal(err.Error())
}
Expand Down Expand Up @@ -185,7 +185,7 @@ func TestCanaryDeployer_Promote(t *testing.T) {

func TestCanaryDeployer_IsReady(t *testing.T) {
mocks := SetupMocks()
_, _, err := mocks.deployer.Initialize(mocks.canary, true)
err := mocks.deployer.Initialize(mocks.canary, true)
if err != nil {
t.Error("Expected primary readiness check to fail")
}
Expand All @@ -203,7 +203,7 @@ func TestCanaryDeployer_IsReady(t *testing.T) {

func TestCanaryDeployer_SetFailedChecks(t *testing.T) {
mocks := SetupMocks()
_, _, err := mocks.deployer.Initialize(mocks.canary, true)
err := mocks.deployer.Initialize(mocks.canary, true)
if err != nil {
t.Fatal(err.Error())
}
Expand All @@ -225,7 +225,7 @@ func TestCanaryDeployer_SetFailedChecks(t *testing.T) {

func TestCanaryDeployer_SetState(t *testing.T) {
mocks := SetupMocks()
_, _, err := mocks.deployer.Initialize(mocks.canary, true)
err := mocks.deployer.Initialize(mocks.canary, true)
if err != nil {
t.Fatal(err.Error())
}
Expand All @@ -247,7 +247,7 @@ func TestCanaryDeployer_SetState(t *testing.T) {

func TestCanaryDeployer_SyncStatus(t *testing.T) {
mocks := SetupMocks()
_, _, err := mocks.deployer.Initialize(mocks.canary, true)
err := mocks.deployer.Initialize(mocks.canary, true)
if err != nil {
t.Fatal(err.Error())
}
Expand Down Expand Up @@ -286,7 +286,7 @@ func TestCanaryDeployer_SyncStatus(t *testing.T) {

func TestCanaryDeployer_Scale(t *testing.T) {
mocks := SetupMocks()
_, _, err := mocks.deployer.Initialize(mocks.canary, true)
err := mocks.deployer.Initialize(mocks.canary, true)
if err != nil {
t.Fatal(err.Error())
}
Expand Down
15 changes: 9 additions & 6 deletions pkg/canary/service_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,32 +42,35 @@ func (c *ServiceController) SetStatusPhase(cd *flaggerv1.Canary, phase flaggerv1
return setStatusPhase(c.flaggerClient, cd, phase)
}

var _ Controller = &ServiceController{}
// GetMetadata returns the pod label selector and svc ports
func (c *ServiceController) GetMetadata(cd *flaggerv1.Canary) (string, map[string]int32, error) {
return "", nil, nil
}

// Initialize creates or updates the primary and canary services to prepare for the canary release process targeted on the K8s service
func (c *ServiceController) Initialize(cd *flaggerv1.Canary, skipLivenessChecks bool) (label string, ports map[string]int32, err error) {
func (c *ServiceController) Initialize(cd *flaggerv1.Canary, skipLivenessChecks bool) (err error) {
targetName := cd.Spec.TargetRef.Name
primaryName := fmt.Sprintf("%s-primary", targetName)
canaryName := fmt.Sprintf("%s-canary", targetName)

svc, err := c.kubeClient.CoreV1().Services(cd.Namespace).Get(targetName, metav1.GetOptions{})
if err != nil {
return "", nil, err
return err
}

// canary svc
err = c.reconcileCanaryService(cd, canaryName, svc)
if err != nil {
return "", nil, err
return err
}

// primary svc
err = c.reconcilePrimaryService(cd, primaryName, svc)
if err != nil {
return "", nil, err
return err
}

return "", nil, nil
return nil
}

func (c *ServiceController) reconcileCanaryService(canary *flaggerv1.Canary, name string, src *corev1.Service) error {
Expand Down
Loading

0 comments on commit 8766523

Please sign in to comment.