From 876652327922faa83cb1bd7dc9214aaec9e4691c Mon Sep 17 00:00:00 2001 From: stefanprodan Date: Wed, 27 Nov 2019 22:15:04 +0200 Subject: [PATCH] Add initialization phase to Kubernetes router Create Kubernetes services before deployments because Envoy's readiness depends on existing ClusterIPs --- pkg/canary/controller.go | 29 +- pkg/canary/deployment_controller.go | 48 ++- pkg/canary/deployment_controller_test.go | 16 +- pkg/canary/service_controller.go | 15 +- pkg/controller/scheduler.go | 426 ++++++++++++----------- pkg/router/kubernetes.go | 30 +- pkg/router/kubernetes_deployment.go | 45 +-- pkg/router/kubernetes_deployment_test.go | 29 +- pkg/router/kubernetes_noop.go | 4 + 9 files changed, 338 insertions(+), 304 deletions(-) diff --git a/pkg/canary/controller.go b/pkg/canary/controller.go index f6e286c59..afdd7d19c 100644 --- a/pkg/canary/controller.go +++ b/pkg/canary/controller.go @@ -1,21 +1,22 @@ package canary import ( - "github.com/weaveworks/flagger/pkg/apis/flagger/v1alpha3" + flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1alpha3" ) type Controller interface { - IsPrimaryReady(canary *v1alpha3.Canary) (bool, error) - IsCanaryReady(canary *v1alpha3.Canary) (bool, error) - SyncStatus(canary *v1alpha3.Canary, status v1alpha3.CanaryStatus) error - SetStatusFailedChecks(canary *v1alpha3.Canary, val int) error - SetStatusWeight(canary *v1alpha3.Canary, val int) error - SetStatusIterations(canary *v1alpha3.Canary, val int) error - SetStatusPhase(canary *v1alpha3.Canary, phase v1alpha3.CanaryPhase) error - Initialize(canary *v1alpha3.Canary, skipLivenessChecks bool) (label string, ports map[string]int32, err error) - Promote(canary *v1alpha3.Canary) error - HasTargetChanged(canary *v1alpha3.Canary) (bool, error) - HaveDependenciesChanged(canary *v1alpha3.Canary) (bool, error) - Scale(canary *v1alpha3.Canary, replicas int32) error - ScaleFromZero(canary *v1alpha3.Canary) error + IsPrimaryReady(canary *flaggerv1.Canary) (bool, error) + IsCanaryReady(canary *flaggerv1.Canary) (bool, error) + GetMetadata(canary *flaggerv1.Canary) (string, map[string]int32, error) + SyncStatus(canary *flaggerv1.Canary, status flaggerv1.CanaryStatus) error + SetStatusFailedChecks(canary *flaggerv1.Canary, val int) error + SetStatusWeight(canary *flaggerv1.Canary, val int) error + SetStatusIterations(canary *flaggerv1.Canary, val int) error + SetStatusPhase(canary *flaggerv1.Canary, phase flaggerv1.CanaryPhase) error + Initialize(canary *flaggerv1.Canary, skipLivenessChecks bool) error + Promote(canary *flaggerv1.Canary) error + HasTargetChanged(canary *flaggerv1.Canary) (bool, error) + HaveDependenciesChanged(canary *flaggerv1.Canary) (bool, error) + Scale(canary *flaggerv1.Canary, replicas int32) error + ScaleFromZero(canary *flaggerv1.Canary) error } diff --git a/pkg/canary/deployment_controller.go b/pkg/canary/deployment_controller.go index 580d4899f..f730c2145 100644 --- a/pkg/canary/deployment_controller.go +++ b/pkg/canary/deployment_controller.go @@ -31,34 +31,34 @@ type DeploymentController struct { // Initialize creates the primary deployment, hpa, // scales to zero the canary deployment and returns the pod selector label and container ports -func (c *DeploymentController) Initialize(cd *flaggerv1.Canary, skipLivenessChecks bool) (label string, ports map[string]int32, err error) { +func (c *DeploymentController) Initialize(cd *flaggerv1.Canary, skipLivenessChecks bool) (err error) { primaryName := fmt.Sprintf("%s-primary", cd.Spec.TargetRef.Name) - label, ports, err = c.createPrimaryDeployment(cd) + err = c.createPrimaryDeployment(cd) if err != nil { - return "", ports, fmt.Errorf("creating deployment %s.%s failed: %v", primaryName, cd.Namespace, err) + return fmt.Errorf("creating deployment %s.%s failed: %v", primaryName, cd.Namespace, err) } if cd.Status.Phase == "" || cd.Status.Phase == flaggerv1.CanaryPhaseInitializing { if !skipLivenessChecks && !cd.Spec.SkipAnalysis { _, readyErr := c.IsPrimaryReady(cd) if readyErr != nil { - return "", ports, readyErr + return readyErr } } c.logger.With("canary", fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)).Infof("Scaling down %s.%s", cd.Spec.TargetRef.Name, cd.Namespace) if err := c.Scale(cd, 0); err != nil { - return "", ports, err + return err } } if cd.Spec.AutoscalerRef != nil && cd.Spec.AutoscalerRef.Kind == "HorizontalPodAutoscaler" { if err := c.reconcilePrimaryHpa(cd, true); err != nil { - return "", ports, fmt.Errorf("creating HorizontalPodAutoscaler %s.%s failed: %v", primaryName, cd.Namespace, err) + return fmt.Errorf("creating HorizontalPodAutoscaler %s.%s failed: %v", primaryName, cd.Namespace, err) } } - return label, ports, nil + return nil } // Promote copies the pod spec, secrets and config maps from canary to primary @@ -191,9 +191,9 @@ func (c *DeploymentController) ScaleFromZero(cd *flaggerv1.Canary) error { return nil } -func (c *DeploymentController) createPrimaryDeployment(cd *flaggerv1.Canary) (string, map[string]int32, error) { +// GetMetadata returns the pod label selector and svc ports +func (c *DeploymentController) GetMetadata(cd *flaggerv1.Canary) (string, map[string]int32, error) { targetName := cd.Spec.TargetRef.Name - primaryName := fmt.Sprintf("%s-primary", cd.Spec.TargetRef.Name) canaryDep, err := c.kubeClient.AppsV1().Deployments(cd.Namespace).Get(targetName, metav1.GetOptions{}) if err != nil { @@ -218,19 +218,39 @@ func (c *DeploymentController) createPrimaryDeployment(cd *flaggerv1.Canary) (st ports = p } + return label, ports, nil +} +func (c *DeploymentController) createPrimaryDeployment(cd *flaggerv1.Canary) error { + targetName := cd.Spec.TargetRef.Name + primaryName := fmt.Sprintf("%s-primary", cd.Spec.TargetRef.Name) + + canaryDep, err := c.kubeClient.AppsV1().Deployments(cd.Namespace).Get(targetName, metav1.GetOptions{}) + if err != nil { + if errors.IsNotFound(err) { + return fmt.Errorf("deployment %s.%s not found, retrying", targetName, cd.Namespace) + } + return err + } + + label, err := c.getSelectorLabel(canaryDep) + if err != nil { + return fmt.Errorf("invalid label selector! Deployment %s.%s spec.selector.matchLabels must contain selector 'app: %s'", + targetName, cd.Namespace, targetName) + } + primaryDep, err := c.kubeClient.AppsV1().Deployments(cd.Namespace).Get(primaryName, metav1.GetOptions{}) if errors.IsNotFound(err) { // create primary secrets and config maps configRefs, err := c.configTracker.GetTargetConfigs(cd) if err != nil { - return "", nil, err + return err } if err := c.configTracker.CreatePrimaryConfigs(cd, configRefs); err != nil { - return "", nil, err + return err } annotations, err := c.makeAnnotations(canaryDep.Spec.Template.Annotations) if err != nil { - return "", nil, err + return err } replicas := int32(1) @@ -278,13 +298,13 @@ func (c *DeploymentController) createPrimaryDeployment(cd *flaggerv1.Canary) (st _, err = c.kubeClient.AppsV1().Deployments(cd.Namespace).Create(primaryDep) if err != nil { - return "", nil, err + return err } c.logger.With("canary", fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)).Infof("Deployment %s.%s created", primaryDep.GetName(), cd.Namespace) } - return label, ports, nil + return nil } func (c *DeploymentController) reconcilePrimaryHpa(cd *flaggerv1.Canary, init bool) error { diff --git a/pkg/canary/deployment_controller_test.go b/pkg/canary/deployment_controller_test.go index 66914f31f..d143a5b03 100644 --- a/pkg/canary/deployment_controller_test.go +++ b/pkg/canary/deployment_controller_test.go @@ -10,7 +10,7 @@ import ( func TestCanaryDeployer_Sync(t *testing.T) { mocks := SetupMocks() - _, _, err := mocks.deployer.Initialize(mocks.canary, true) + err := mocks.deployer.Initialize(mocks.canary, true) if err != nil { t.Fatal(err.Error()) } @@ -96,7 +96,7 @@ func TestCanaryDeployer_Sync(t *testing.T) { func TestCanaryDeployer_IsNewSpec(t *testing.T) { mocks := SetupMocks() - _, _, err := mocks.deployer.Initialize(mocks.canary, true) + err := mocks.deployer.Initialize(mocks.canary, true) if err != nil { t.Fatal(err.Error()) } @@ -119,7 +119,7 @@ func TestCanaryDeployer_IsNewSpec(t *testing.T) { func TestCanaryDeployer_Promote(t *testing.T) { mocks := SetupMocks() - _, _, err := mocks.deployer.Initialize(mocks.canary, true) + err := mocks.deployer.Initialize(mocks.canary, true) if err != nil { t.Fatal(err.Error()) } @@ -185,7 +185,7 @@ func TestCanaryDeployer_Promote(t *testing.T) { func TestCanaryDeployer_IsReady(t *testing.T) { mocks := SetupMocks() - _, _, err := mocks.deployer.Initialize(mocks.canary, true) + err := mocks.deployer.Initialize(mocks.canary, true) if err != nil { t.Error("Expected primary readiness check to fail") } @@ -203,7 +203,7 @@ func TestCanaryDeployer_IsReady(t *testing.T) { func TestCanaryDeployer_SetFailedChecks(t *testing.T) { mocks := SetupMocks() - _, _, err := mocks.deployer.Initialize(mocks.canary, true) + err := mocks.deployer.Initialize(mocks.canary, true) if err != nil { t.Fatal(err.Error()) } @@ -225,7 +225,7 @@ func TestCanaryDeployer_SetFailedChecks(t *testing.T) { func TestCanaryDeployer_SetState(t *testing.T) { mocks := SetupMocks() - _, _, err := mocks.deployer.Initialize(mocks.canary, true) + err := mocks.deployer.Initialize(mocks.canary, true) if err != nil { t.Fatal(err.Error()) } @@ -247,7 +247,7 @@ func TestCanaryDeployer_SetState(t *testing.T) { func TestCanaryDeployer_SyncStatus(t *testing.T) { mocks := SetupMocks() - _, _, err := mocks.deployer.Initialize(mocks.canary, true) + err := mocks.deployer.Initialize(mocks.canary, true) if err != nil { t.Fatal(err.Error()) } @@ -286,7 +286,7 @@ func TestCanaryDeployer_SyncStatus(t *testing.T) { func TestCanaryDeployer_Scale(t *testing.T) { mocks := SetupMocks() - _, _, err := mocks.deployer.Initialize(mocks.canary, true) + err := mocks.deployer.Initialize(mocks.canary, true) if err != nil { t.Fatal(err.Error()) } diff --git a/pkg/canary/service_controller.go b/pkg/canary/service_controller.go index bd8a505b8..c6f4b015c 100644 --- a/pkg/canary/service_controller.go +++ b/pkg/canary/service_controller.go @@ -42,32 +42,35 @@ func (c *ServiceController) SetStatusPhase(cd *flaggerv1.Canary, phase flaggerv1 return setStatusPhase(c.flaggerClient, cd, phase) } -var _ Controller = &ServiceController{} +// GetMetadata returns the pod label selector and svc ports +func (c *ServiceController) GetMetadata(cd *flaggerv1.Canary) (string, map[string]int32, error) { + return "", nil, nil +} // Initialize creates or updates the primary and canary services to prepare for the canary release process targeted on the K8s service -func (c *ServiceController) Initialize(cd *flaggerv1.Canary, skipLivenessChecks bool) (label string, ports map[string]int32, err error) { +func (c *ServiceController) Initialize(cd *flaggerv1.Canary, skipLivenessChecks bool) (err error) { targetName := cd.Spec.TargetRef.Name primaryName := fmt.Sprintf("%s-primary", targetName) canaryName := fmt.Sprintf("%s-canary", targetName) svc, err := c.kubeClient.CoreV1().Services(cd.Namespace).Get(targetName, metav1.GetOptions{}) if err != nil { - return "", nil, err + return err } // canary svc err = c.reconcileCanaryService(cd, canaryName, svc) if err != nil { - return "", nil, err + return err } // primary svc err = c.reconcilePrimaryService(cd, primaryName, svc) if err != nil { - return "", nil, err + return err } - return "", nil, nil + return nil } func (c *ServiceController) reconcileCanaryService(canary *flaggerv1.Canary, name string, src *corev1.Service) error { diff --git a/pkg/controller/scheduler.go b/pkg/controller/scheduler.go index 25cdabb6c..dcaccf0d7 100644 --- a/pkg/controller/scheduler.go +++ b/pkg/controller/scheduler.go @@ -90,8 +90,6 @@ func (c *Controller) advanceCanary(name string, namespace string, skipLivenessCh return } - primaryName := fmt.Sprintf("%s-primary", cd.Spec.TargetRef.Name) - // override the global provider if one is specified in the canary spec provider := c.meshProvider if cd.Spec.Provider != "" { @@ -100,35 +98,42 @@ func (c *Controller) advanceCanary(name string, namespace string, skipLivenessCh // init controller based on target kind canaryController := c.canaryFactory.Controller(cd.Spec.TargetRef.Kind) + labelSelector, ports, err := canaryController.GetMetadata(cd) + if err != nil { + c.recordEventWarningf(cd, "%v", err) + return + } - // create primary deployment and hpa if needed - // skip primary check for Istio since the deployment will become ready after the ClusterIP are created - skipPrimaryCheck := false - if skipLivenessChecks || strings.Contains(provider, "istio") || strings.Contains(provider, "appmesh") { - skipPrimaryCheck = true + // init Kubernetes router + router := c.routerFactory.KubernetesRouter(cd.Spec.TargetRef.Kind, labelSelector, map[string]string{}, ports) + if err := router.Initialize(cd); err != nil { + c.recordEventWarningf(cd, "%v", err) + return } - labelSelector, ports, err := canaryController.Initialize(cd, skipPrimaryCheck) + + // create primary deployment and hpa + err = canaryController.Initialize(cd, skipLivenessChecks) if err != nil { c.recordEventWarningf(cd, "%v", err) return } - // init routers + // init mesh router meshRouter := c.routerFactory.MeshRouter(provider) - // create or update ClusterIP services - if err := c.routerFactory.KubernetesRouter(cd.Spec.TargetRef.Kind, labelSelector, map[string]string{}, ports).Reconcile(cd); err != nil { + // create or update svc + if err := router.Reconcile(cd); err != nil { c.recordEventWarningf(cd, "%v", err) return } - // create or update virtual service + // create or update mesh routes if err := meshRouter.Reconcile(cd); err != nil { c.recordEventWarningf(cd, "%v", err) return } - // check for deployment spec or configs changes + // check for changes shouldAdvance, err := c.shouldAdvance(cd, canaryController) if err != nil { c.recordEventWarningf(cd, "%v", err) @@ -200,10 +205,6 @@ func (c *Controller) advanceCanary(name string, namespace string, skipLivenessCh return } - defer func() { - c.recorder.SetDuration(cd, time.Since(begin)) - }() - // check canary deployment status var retriable = true if !skipLivenessChecks { @@ -306,6 +307,11 @@ func (c *Controller) advanceCanary(name string, namespace string, skipLivenessCh return } + // record analysis duration + defer func() { + c.recorder.SetDuration(cd, time.Since(begin)) + }() + // check if the canary success rate is above the threshold // skip check if no traffic is routed or mirrored to canary if canaryWeight == 0 && cd.Status.Iterations == 0 && @@ -321,7 +327,7 @@ func (c *Controller) advanceCanary(name string, namespace string, skipLivenessCh return } } else { - if ok := c.analyseCanary(cd); !ok { + if ok := c.runAnalysis(cd); !ok { if err := canaryController.SetStatusFailedChecks(cd, cd.Status.FailedChecks+1); err != nil { c.recordEventWarningf(cd, "%v", err) return @@ -345,247 +351,259 @@ func (c *Controller) advanceCanary(name string, namespace string, skipLivenessCh // strategy: A/B testing if len(cd.Spec.CanaryAnalysis.Match) > 0 && cd.Spec.CanaryAnalysis.Iterations > 0 { - // route traffic to canary and increment iterations - if cd.Spec.CanaryAnalysis.Iterations > cd.Status.Iterations { - if err := meshRouter.SetRoutes(cd, 0, 100, false); err != nil { - c.recordEventWarningf(cd, "%v", err) - return + c.runAB(cd, canaryController, meshRouter, provider) + return + } + + // strategy: Blue/Green + if cd.Spec.CanaryAnalysis.Iterations > 0 { + c.runBlueGreen(cd, canaryController, meshRouter, provider, mirrored) + return + } + + // strategy: Canary progressive traffic increase + if cd.Spec.CanaryAnalysis.StepWeight > 0 { + c.runCanary(cd, canaryController, meshRouter, provider, mirrored, canaryWeight, primaryWeight, maxWeight) + } + +} + +func (c *Controller) runCanary(canary *flaggerv1.Canary, canaryController canary.Controller, meshRouter router.Interface, provider string, mirrored bool, canaryWeight int, primaryWeight int, maxWeight int) { + primaryName := fmt.Sprintf("%s-primary", canary.Spec.TargetRef.Name) + + // increase traffic weight + if canaryWeight < maxWeight { + // If in "mirror" mode, do one step of mirroring before shifting traffic to canary. + // When mirroring, all requests go to primary and canary, but only responses from + // primary go back to the user. + if canary.Spec.CanaryAnalysis.Mirror && canaryWeight == 0 { + if mirrored == false { + mirrored = true + primaryWeight = 100 + canaryWeight = 0 + } else { + mirrored = false + primaryWeight = 100 - canary.Spec.CanaryAnalysis.StepWeight + canaryWeight = canary.Spec.CanaryAnalysis.StepWeight } - c.recorder.SetWeight(cd, 0, 100) + c.logger.With("canary", fmt.Sprintf("%s.%s", canary.Name, canary.Namespace)). + Infof("Running mirror step %d/%d/%t", primaryWeight, canaryWeight, mirrored) + } else { - if err := canaryController.SetStatusIterations(cd, cd.Status.Iterations+1); err != nil { - c.recordEventWarningf(cd, "%v", err) - return + primaryWeight -= canary.Spec.CanaryAnalysis.StepWeight + if primaryWeight < 0 { + primaryWeight = 0 + } + canaryWeight += canary.Spec.CanaryAnalysis.StepWeight + if canaryWeight > 100 { + canaryWeight = 100 } - c.recordEventInfof(cd, "Advance %s.%s canary iteration %v/%v", - cd.Name, cd.Namespace, cd.Status.Iterations+1, cd.Spec.CanaryAnalysis.Iterations) - return } - // check promotion gate - if promote := c.runConfirmPromotionHooks(cd); !promote { + if err := meshRouter.SetRoutes(canary, primaryWeight, canaryWeight, mirrored); err != nil { + c.recordEventWarningf(canary, "%v", err) return } - // promote canary - max iterations reached - if cd.Spec.CanaryAnalysis.Iterations == cd.Status.Iterations { - c.recordEventInfof(cd, "Copying %s.%s template spec to %s.%s", - cd.Spec.TargetRef.Name, cd.Namespace, primaryName, cd.Namespace) - if err := canaryController.Promote(cd); err != nil { - c.recordEventWarningf(cd, "%v", err) - return - } - - // update status phase - if err := canaryController.SetStatusPhase(cd, flaggerv1.CanaryPhasePromoting); err != nil { - c.recordEventWarningf(cd, "%v", err) - return - } + if err := canaryController.SetStatusWeight(canary, canaryWeight); err != nil { + c.recordEventWarningf(canary, "%v", err) return } + c.recorder.SetWeight(canary, primaryWeight, canaryWeight) + c.recordEventInfof(canary, "Advance %s.%s canary weight %v", canary.Name, canary.Namespace, canaryWeight) return } - // strategy: Blue/Green - if cd.Spec.CanaryAnalysis.Iterations > 0 { - // increment iterations - if cd.Spec.CanaryAnalysis.Iterations > cd.Status.Iterations { - // If in "mirror" mode, mirror requests during the entire B/G canary test - if provider != "kubernetes" && - cd.Spec.CanaryAnalysis.Mirror == true && mirrored == false { - if err := meshRouter.SetRoutes(cd, 100, 0, true); err != nil { - c.recordEventWarningf(cd, "%v", err) - } - c.logger.With("canary", fmt.Sprintf("%s.%s", name, namespace)). - Infof("Start traffic mirroring") - } - if err := canaryController.SetStatusIterations(cd, cd.Status.Iterations+1); err != nil { - c.recordEventWarningf(cd, "%v", err) - return - } - c.recordEventInfof(cd, "Advance %s.%s canary iteration %v/%v", - cd.Name, cd.Namespace, cd.Status.Iterations+1, cd.Spec.CanaryAnalysis.Iterations) + // promote canary - max weight reached + if canaryWeight >= maxWeight { + // check promotion gate + if promote := c.runConfirmPromotionHooks(canary); !promote { return } - // check promotion gate - if promote := c.runConfirmPromotionHooks(cd); !promote { + // update primary spec + c.recordEventInfof(canary, "Copying %s.%s template spec to %s.%s", + canary.Spec.TargetRef.Name, canary.Namespace, primaryName, canary.Namespace) + if err := canaryController.Promote(canary); err != nil { + c.recordEventWarningf(canary, "%v", err) return } - // route all traffic to canary - max iterations reached - if cd.Spec.CanaryAnalysis.Iterations == cd.Status.Iterations { - if provider != "kubernetes" { - if cd.Spec.CanaryAnalysis.Mirror { - c.recordEventInfof(cd, "Stop traffic mirroring and route all traffic to canary") - } else { - c.recordEventInfof(cd, "Routing all traffic to canary") - } - if err := meshRouter.SetRoutes(cd, 0, 100, false); err != nil { - c.recordEventWarningf(cd, "%v", err) - return - } - c.recorder.SetWeight(cd, 0, 100) - } - - // increment iterations - if err := canaryController.SetStatusIterations(cd, cd.Status.Iterations+1); err != nil { - c.recordEventWarningf(cd, "%v", err) - return - } + // update status phase + if err := canaryController.SetStatusPhase(canary, flaggerv1.CanaryPhasePromoting); err != nil { + c.recordEventWarningf(canary, "%v", err) return } + } +} - // promote canary - max iterations reached - if cd.Spec.CanaryAnalysis.Iterations < cd.Status.Iterations { - c.recordEventInfof(cd, "Copying %s.%s template spec to %s.%s", - cd.Spec.TargetRef.Name, cd.Namespace, primaryName, cd.Namespace) - if err := canaryController.Promote(cd); err != nil { - c.recordEventWarningf(cd, "%v", err) - return - } +func (c *Controller) runAB(canary *flaggerv1.Canary, canaryController canary.Controller, meshRouter router.Interface, provider string) { + primaryName := fmt.Sprintf("%s-primary", canary.Spec.TargetRef.Name) - // update status phase - if err := canaryController.SetStatusPhase(cd, flaggerv1.CanaryPhasePromoting); err != nil { - c.recordEventWarningf(cd, "%v", err) - return - } + // route traffic to canary and increment iterations + if canary.Spec.CanaryAnalysis.Iterations > canary.Status.Iterations { + if err := meshRouter.SetRoutes(canary, 0, 100, false); err != nil { + c.recordEventWarningf(canary, "%v", err) return } + c.recorder.SetWeight(canary, 0, 100) + if err := canaryController.SetStatusIterations(canary, canary.Status.Iterations+1); err != nil { + c.recordEventWarningf(canary, "%v", err) + return + } + c.recordEventInfof(canary, "Advance %s.%s canary iteration %v/%v", + canary.Name, canary.Namespace, canary.Status.Iterations+1, canary.Spec.CanaryAnalysis.Iterations) return } - // strategy: Canary progressive traffic increase - if cd.Spec.CanaryAnalysis.StepWeight > 0 { - // increase traffic weight - if canaryWeight < maxWeight { - // If in "mirror" mode, do one step of mirroring before shifting traffic to canary. - // When mirroring, all requests go to primary and canary, but only responses from - // primary go back to the user. - if cd.Spec.CanaryAnalysis.Mirror && canaryWeight == 0 { - if mirrored == false { - mirrored = true - primaryWeight = 100 - canaryWeight = 0 - } else { - mirrored = false - primaryWeight = 100 - cd.Spec.CanaryAnalysis.StepWeight - canaryWeight = cd.Spec.CanaryAnalysis.StepWeight - } - c.logger.With("canary", fmt.Sprintf("%s.%s", name, namespace)). - Infof("Running mirror step %d/%d/%t", primaryWeight, canaryWeight, mirrored) - } else { + // check promotion gate + if promote := c.runConfirmPromotionHooks(canary); !promote { + return + } - primaryWeight -= cd.Spec.CanaryAnalysis.StepWeight - if primaryWeight < 0 { - primaryWeight = 0 - } - canaryWeight += cd.Spec.CanaryAnalysis.StepWeight - if canaryWeight > 100 { - canaryWeight = 100 - } - } + // promote canary - max iterations reached + if canary.Spec.CanaryAnalysis.Iterations == canary.Status.Iterations { + c.recordEventInfof(canary, "Copying %s.%s template spec to %s.%s", + canary.Spec.TargetRef.Name, canary.Namespace, primaryName, canary.Namespace) + if err := canaryController.Promote(canary); err != nil { + c.recordEventWarningf(canary, "%v", err) + return + } - if err := meshRouter.SetRoutes(cd, primaryWeight, canaryWeight, mirrored); err != nil { - c.recordEventWarningf(cd, "%v", err) - return - } + // update status phase + if err := canaryController.SetStatusPhase(canary, flaggerv1.CanaryPhasePromoting); err != nil { + c.recordEventWarningf(canary, "%v", err) + return + } + } +} - if err := canaryController.SetStatusWeight(cd, canaryWeight); err != nil { - c.recordEventWarningf(cd, "%v", err) - return - } +func (c *Controller) runBlueGreen(canary *flaggerv1.Canary, canaryController canary.Controller, meshRouter router.Interface, provider string, mirrored bool) { + primaryName := fmt.Sprintf("%s-primary", canary.Spec.TargetRef.Name) - c.recorder.SetWeight(cd, primaryWeight, canaryWeight) - c.recordEventInfof(cd, "Advance %s.%s canary weight %v", cd.Name, cd.Namespace, canaryWeight) + // increment iterations + if canary.Spec.CanaryAnalysis.Iterations > canary.Status.Iterations { + // If in "mirror" mode, mirror requests during the entire B/G canary test + if provider != "kubernetes" && + canary.Spec.CanaryAnalysis.Mirror == true && mirrored == false { + if err := meshRouter.SetRoutes(canary, 100, 0, true); err != nil { + c.recordEventWarningf(canary, "%v", err) + } + c.logger.With("canary", fmt.Sprintf("%s.%s", canary.Name, canary.Namespace)). + Infof("Start traffic mirroring") + } + if err := canaryController.SetStatusIterations(canary, canary.Status.Iterations+1); err != nil { + c.recordEventWarningf(canary, "%v", err) return } + c.recordEventInfof(canary, "Advance %s.%s canary iteration %v/%v", + canary.Name, canary.Namespace, canary.Status.Iterations+1, canary.Spec.CanaryAnalysis.Iterations) + return + } - // promote canary - max weight reached - if canaryWeight >= maxWeight { - // check promotion gate - if promote := c.runConfirmPromotionHooks(cd); !promote { - return - } + // check promotion gate + if promote := c.runConfirmPromotionHooks(canary); !promote { + return + } - // update primary spec - c.recordEventInfof(cd, "Copying %s.%s template spec to %s.%s", - cd.Spec.TargetRef.Name, cd.Namespace, primaryName, cd.Namespace) - if err := canaryController.Promote(cd); err != nil { - c.recordEventWarningf(cd, "%v", err) - return + // route all traffic to canary - max iterations reached + if canary.Spec.CanaryAnalysis.Iterations == canary.Status.Iterations { + if provider != "kubernetes" { + if canary.Spec.CanaryAnalysis.Mirror { + c.recordEventInfof(canary, "Stop traffic mirroring and route all traffic to canary") + } else { + c.recordEventInfof(canary, "Routing all traffic to canary") } - - // update status phase - if err := canaryController.SetStatusPhase(cd, flaggerv1.CanaryPhasePromoting); err != nil { - c.recordEventWarningf(cd, "%v", err) + if err := meshRouter.SetRoutes(canary, 0, 100, false); err != nil { + c.recordEventWarningf(canary, "%v", err) return } + c.recorder.SetWeight(canary, 0, 100) + } + // increment iterations + if err := canaryController.SetStatusIterations(canary, canary.Status.Iterations+1); err != nil { + c.recordEventWarningf(canary, "%v", err) + return + } + return + } + + // promote canary - max iterations reached + if canary.Spec.CanaryAnalysis.Iterations < canary.Status.Iterations { + c.recordEventInfof(canary, "Copying %s.%s template spec to %s.%s", + canary.Spec.TargetRef.Name, canary.Namespace, primaryName, canary.Namespace) + if err := canaryController.Promote(canary); err != nil { + c.recordEventWarningf(canary, "%v", err) return } + // update status phase + if err := canaryController.SetStatusPhase(canary, flaggerv1.CanaryPhasePromoting); err != nil { + c.recordEventWarningf(canary, "%v", err) + return + } } } -func (c *Controller) shouldSkipAnalysis(cd *flaggerv1.Canary, canaryController canary.Controller, meshRouter router.Interface, primaryWeight int, canaryWeight int) bool { - if !cd.Spec.SkipAnalysis { +func (c *Controller) shouldSkipAnalysis(canary *flaggerv1.Canary, canaryController canary.Controller, meshRouter router.Interface, primaryWeight int, canaryWeight int) bool { + if !canary.Spec.SkipAnalysis { return false } // route all traffic to primary primaryWeight = 100 canaryWeight = 0 - if err := meshRouter.SetRoutes(cd, primaryWeight, canaryWeight, false); err != nil { - c.recordEventWarningf(cd, "%v", err) + if err := meshRouter.SetRoutes(canary, primaryWeight, canaryWeight, false); err != nil { + c.recordEventWarningf(canary, "%v", err) return false } - c.recorder.SetWeight(cd, primaryWeight, canaryWeight) + c.recorder.SetWeight(canary, primaryWeight, canaryWeight) // copy spec and configs from canary to primary - c.recordEventInfof(cd, "Copying %s.%s template spec to %s-primary.%s", - cd.Spec.TargetRef.Name, cd.Namespace, cd.Spec.TargetRef.Name, cd.Namespace) - if err := canaryController.Promote(cd); err != nil { - c.recordEventWarningf(cd, "%v", err) + c.recordEventInfof(canary, "Copying %s.%s template spec to %s-primary.%s", + canary.Spec.TargetRef.Name, canary.Namespace, canary.Spec.TargetRef.Name, canary.Namespace) + if err := canaryController.Promote(canary); err != nil { + c.recordEventWarningf(canary, "%v", err) return false } // shutdown canary - if err := canaryController.Scale(cd, 0); err != nil { - c.recordEventWarningf(cd, "%v", err) + if err := canaryController.Scale(canary, 0); err != nil { + c.recordEventWarningf(canary, "%v", err) return false } // update status phase - if err := canaryController.SetStatusPhase(cd, flaggerv1.CanaryPhaseSucceeded); err != nil { - c.recordEventWarningf(cd, "%v", err) + if err := canaryController.SetStatusPhase(canary, flaggerv1.CanaryPhaseSucceeded); err != nil { + c.recordEventWarningf(canary, "%v", err) return false } // notify - c.recorder.SetStatus(cd, flaggerv1.CanaryPhaseSucceeded) - c.recordEventInfof(cd, "Promotion completed! Canary analysis was skipped for %s.%s", - cd.Spec.TargetRef.Name, cd.Namespace) - c.sendNotification(cd, "Canary analysis was skipped, promotion finished.", + c.recorder.SetStatus(canary, flaggerv1.CanaryPhaseSucceeded) + c.recordEventInfof(canary, "Promotion completed! Canary analysis was skipped for %s.%s", + canary.Spec.TargetRef.Name, canary.Namespace) + c.sendNotification(canary, "Canary analysis was skipped, promotion finished.", false, false) return true } -func (c *Controller) shouldAdvance(cd *flaggerv1.Canary, canaryController canary.Controller) (bool, error) { - if cd.Status.LastAppliedSpec == "" || - cd.Status.Phase == flaggerv1.CanaryPhaseInitializing || - cd.Status.Phase == flaggerv1.CanaryPhaseProgressing || - cd.Status.Phase == flaggerv1.CanaryPhaseWaiting || - cd.Status.Phase == flaggerv1.CanaryPhasePromoting || - cd.Status.Phase == flaggerv1.CanaryPhaseFinalising { +func (c *Controller) shouldAdvance(canary *flaggerv1.Canary, canaryController canary.Controller) (bool, error) { + if canary.Status.LastAppliedSpec == "" || + canary.Status.Phase == flaggerv1.CanaryPhaseInitializing || + canary.Status.Phase == flaggerv1.CanaryPhaseProgressing || + canary.Status.Phase == flaggerv1.CanaryPhaseWaiting || + canary.Status.Phase == flaggerv1.CanaryPhasePromoting || + canary.Status.Phase == flaggerv1.CanaryPhaseFinalising { return true, nil } - newTarget, err := canaryController.HasTargetChanged(cd) + newTarget, err := canaryController.HasTargetChanged(canary) if err != nil { return false, err } @@ -593,7 +611,7 @@ func (c *Controller) shouldAdvance(cd *flaggerv1.Canary, canaryController canary return newTarget, nil } - newCfg, err := canaryController.HaveDependenciesChanged(cd) + newCfg, err := canaryController.HaveDependenciesChanged(canary) if err != nil { return false, err } @@ -602,50 +620,50 @@ func (c *Controller) shouldAdvance(cd *flaggerv1.Canary, canaryController canary } -func (c *Controller) checkCanaryStatus(cd *flaggerv1.Canary, canaryController canary.Controller, shouldAdvance bool) bool { - c.recorder.SetStatus(cd, cd.Status.Phase) - if cd.Status.Phase == flaggerv1.CanaryPhaseProgressing || - cd.Status.Phase == flaggerv1.CanaryPhasePromoting || - cd.Status.Phase == flaggerv1.CanaryPhaseFinalising { +func (c *Controller) checkCanaryStatus(canary *flaggerv1.Canary, canaryController canary.Controller, shouldAdvance bool) bool { + c.recorder.SetStatus(canary, canary.Status.Phase) + if canary.Status.Phase == flaggerv1.CanaryPhaseProgressing || + canary.Status.Phase == flaggerv1.CanaryPhasePromoting || + canary.Status.Phase == flaggerv1.CanaryPhaseFinalising { return true } - if cd.Status.Phase == "" || cd.Status.Phase == flaggerv1.CanaryPhaseInitializing { - if err := canaryController.SyncStatus(cd, flaggerv1.CanaryStatus{Phase: flaggerv1.CanaryPhaseInitialized}); err != nil { - c.logger.With("canary", fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)).Errorf("%v", err) + if canary.Status.Phase == "" || canary.Status.Phase == flaggerv1.CanaryPhaseInitializing { + if err := canaryController.SyncStatus(canary, flaggerv1.CanaryStatus{Phase: flaggerv1.CanaryPhaseInitialized}); err != nil { + c.logger.With("canary", fmt.Sprintf("%s.%s", canary.Name, canary.Namespace)).Errorf("%v", err) return false } - c.recorder.SetStatus(cd, flaggerv1.CanaryPhaseInitialized) - c.recordEventInfof(cd, "Initialization done! %s.%s", cd.Name, cd.Namespace) - c.sendNotification(cd, "New deployment detected, initialization completed.", + c.recorder.SetStatus(canary, flaggerv1.CanaryPhaseInitialized) + c.recordEventInfof(canary, "Initialization done! %s.%s", canary.Name, canary.Namespace) + c.sendNotification(canary, "New deployment detected, initialization completed.", true, false) return false } if shouldAdvance { - c.recordEventInfof(cd, "New revision detected! Scaling up %s.%s", cd.Spec.TargetRef.Name, cd.Namespace) - c.sendNotification(cd, "New revision detected, starting canary analysis.", + c.recordEventInfof(canary, "New revision detected! Scaling up %s.%s", canary.Spec.TargetRef.Name, canary.Namespace) + c.sendNotification(canary, "New revision detected, starting canary analysis.", true, false) - if err := canaryController.ScaleFromZero(cd); err != nil { - c.recordEventErrorf(cd, "%v", err) + if err := canaryController.ScaleFromZero(canary); err != nil { + c.recordEventErrorf(canary, "%v", err) return false } - if err := canaryController.SyncStatus(cd, flaggerv1.CanaryStatus{Phase: flaggerv1.CanaryPhaseProgressing}); err != nil { - c.logger.With("canary", fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)).Errorf("%v", err) + if err := canaryController.SyncStatus(canary, flaggerv1.CanaryStatus{Phase: flaggerv1.CanaryPhaseProgressing}); err != nil { + c.logger.With("canary", fmt.Sprintf("%s.%s", canary.Name, canary.Namespace)).Errorf("%v", err) return false } - c.recorder.SetStatus(cd, flaggerv1.CanaryPhaseProgressing) + c.recorder.SetStatus(canary, flaggerv1.CanaryPhaseProgressing) return false } return false } -func (c *Controller) hasCanaryRevisionChanged(cd *flaggerv1.Canary, canaryController canary.Controller) bool { - if cd.Status.Phase == flaggerv1.CanaryPhaseProgressing { - if diff, _ := canaryController.HasTargetChanged(cd); diff { +func (c *Controller) hasCanaryRevisionChanged(canary *flaggerv1.Canary, canaryController canary.Controller) bool { + if canary.Status.Phase == flaggerv1.CanaryPhaseProgressing { + if diff, _ := canaryController.HasTargetChanged(canary); diff { return true } - if diff, _ := canaryController.HaveDependenciesChanged(cd); diff { + if diff, _ := canaryController.HaveDependenciesChanged(canary); diff { return true } } @@ -729,7 +747,7 @@ func (c *Controller) runPostRolloutHooks(canary *flaggerv1.Canary, phase flagger return true } -func (c *Controller) analyseCanary(r *flaggerv1.Canary) bool { +func (c *Controller) runAnalysis(r *flaggerv1.Canary) bool { // run external checks for _, webhook := range r.Spec.CanaryAnalysis.Webhooks { if webhook.Type == "" || webhook.Type == flaggerv1.RolloutHook { diff --git a/pkg/router/kubernetes.go b/pkg/router/kubernetes.go index b4b7a6b6d..a293ed3dc 100644 --- a/pkg/router/kubernetes.go +++ b/pkg/router/kubernetes.go @@ -2,34 +2,12 @@ package router import ( flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1alpha3" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" ) -// KubernetesDeploymentRouter is managing ClusterIP services +// KubernetesRouter manages Kubernetes services type KubernetesRouter interface { - // Reconcile creates or updates K8s services to prepare for the canary release + // Initialize creates or updates the primary and canary services + Initialize(canary *flaggerv1.Canary) error + // Reconcile creates or updates the main service Reconcile(canary *flaggerv1.Canary) error } - -func buildService(canary *flaggerv1.Canary, name string, src *corev1.Service) *corev1.Service { - svc := src.DeepCopy() - svc.ObjectMeta.Name = name - svc.ObjectMeta.Namespace = canary.Namespace - svc.ObjectMeta.OwnerReferences = []metav1.OwnerReference{ - *metav1.NewControllerRef(canary, schema.GroupVersionKind{ - Group: flaggerv1.SchemeGroupVersion.Group, - Version: flaggerv1.SchemeGroupVersion.Version, - Kind: flaggerv1.CanaryKind, - }), - } - _, exists := svc.ObjectMeta.Annotations["kubectl.kubernetes.io/last-applied-configuration"] - if exists { - // Leaving this results in updates from flagger to this svc never succeed due to resourceVersion mismatch: - // Operation cannot be fulfilled on services "mysvc-canary": the object has been modified; please apply your changes to the latest version and try again - delete(svc.ObjectMeta.Annotations, "kubectl.kubernetes.io/last-applied-configuration") - } - - return svc -} diff --git a/pkg/router/kubernetes_deployment.go b/pkg/router/kubernetes_deployment.go index ec50b1ebb..e028f4d61 100644 --- a/pkg/router/kubernetes_deployment.go +++ b/pkg/router/kubernetes_deployment.go @@ -27,26 +27,34 @@ type KubernetesDeploymentRouter struct { ports map[string]int32 } -// Reconcile creates or updates the primary and canary services -func (c *KubernetesDeploymentRouter) Reconcile(canary *flaggerv1.Canary) error { +// Initialize creates the primary and canary services +func (c *KubernetesDeploymentRouter) Initialize(canary *flaggerv1.Canary) error { targetName := canary.Spec.TargetRef.Name primaryName := fmt.Sprintf("%s-primary", targetName) canaryName := fmt.Sprintf("%s-canary", targetName) - // main svc - err := c.reconcileService(canary, targetName, primaryName) + // canary svc + err := c.reconcileService(canary, canaryName, targetName) if err != nil { return err } - // canary svc - err = c.reconcileService(canary, canaryName, targetName) + // primary svc + err = c.reconcileService(canary, primaryName, primaryName) if err != nil { return err } - // primary svc - err = c.reconcileService(canary, primaryName, primaryName) + return nil +} + +// Reconcile creates or updates the main service +func (c *KubernetesDeploymentRouter) Reconcile(canary *flaggerv1.Canary) error { + targetName := canary.Spec.TargetRef.Name + primaryName := fmt.Sprintf("%s-primary", targetName) + + // main svc + err := c.reconcileService(canary, targetName, primaryName) if err != nil { return err } @@ -159,24 +167,3 @@ func (c *KubernetesDeploymentRouter) reconcileService(canary *flaggerv1.Canary, return nil } - -func (c *KubernetesDeploymentRouter) createService(canary *flaggerv1.Canary, name string, src *corev1.Service) error { - svc := buildService(canary, name, src) - - if svc.Spec.Type == "ClusterIP" { - // Reset and let K8s assign the IP. Otherwise we get an error due to the IP is already assigned - svc.Spec.ClusterIP = "" - } - - // Let K8s set this. Otherwise K8s API complains with "resourceVersion should not be set on objects to be created" - svc.ObjectMeta.ResourceVersion = "" - - _, err := c.kubeClient.CoreV1().Services(canary.Namespace).Create(svc) - if err != nil { - return err - } - - c.logger.With("canary", fmt.Sprintf("%s.%s", canary.Name, canary.Namespace)). - Infof("Service %s.%s created", svc.GetName(), canary.Namespace) - return nil -} diff --git a/pkg/router/kubernetes_deployment_test.go b/pkg/router/kubernetes_deployment_test.go index a27fe9184..d02410d51 100644 --- a/pkg/router/kubernetes_deployment_test.go +++ b/pkg/router/kubernetes_deployment_test.go @@ -14,7 +14,12 @@ func TestServiceRouter_Create(t *testing.T) { logger: mocks.logger, } - err := router.Reconcile(mocks.canary) + err := router.Initialize(mocks.canary) + if err != nil { + t.Fatal(err.Error()) + } + + err = router.Reconcile(mocks.canary) if err != nil { t.Fatal(err.Error()) } @@ -54,7 +59,12 @@ func TestServiceRouter_Update(t *testing.T) { logger: mocks.logger, } - err := router.Reconcile(mocks.canary) + err := router.Initialize(mocks.canary) + if err != nil { + t.Fatal(err.Error()) + } + + err = router.Reconcile(mocks.canary) if err != nil { t.Fatal(err.Error()) } @@ -73,6 +83,10 @@ func TestServiceRouter_Update(t *testing.T) { } // apply changes + err = router.Initialize(c) + if err != nil { + t.Fatal(err.Error()) + } err = router.Reconcile(c) if err != nil { t.Fatal(err.Error()) @@ -96,7 +110,12 @@ func TestServiceRouter_Undo(t *testing.T) { logger: mocks.logger, } - err := router.Reconcile(mocks.canary) + err := router.Initialize(mocks.canary) + if err != nil { + t.Fatal(err.Error()) + } + + err = router.Reconcile(mocks.canary) if err != nil { t.Fatal(err.Error()) } @@ -116,6 +135,10 @@ func TestServiceRouter_Undo(t *testing.T) { } // undo changes + err = router.Initialize(mocks.canary) + if err != nil { + t.Fatal(err.Error()) + } err = router.Reconcile(mocks.canary) if err != nil { t.Fatal(err.Error()) diff --git a/pkg/router/kubernetes_noop.go b/pkg/router/kubernetes_noop.go index fa97362a4..4da4a35bc 100644 --- a/pkg/router/kubernetes_noop.go +++ b/pkg/router/kubernetes_noop.go @@ -9,6 +9,10 @@ import ( type KubernetesNoopRouter struct { } +func (c *KubernetesNoopRouter) Initialize(canary *flaggerv1.Canary) error { + return nil +} + func (c *KubernetesNoopRouter) Reconcile(canary *flaggerv1.Canary) error { return nil }