Skip to content

Commit

Permalink
Merge branch 'main' into rynowak/rad-install-tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rynowak authored Jun 16, 2023
2 parents 8c6176c + bf8dc0b commit 9f82cd7
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 99 deletions.
26 changes: 0 additions & 26 deletions pkg/corerp/config/timeout.go

This file was deleted.

4 changes: 0 additions & 4 deletions pkg/corerp/frontend/handler/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ import (
"github.com/project-radius/radius/pkg/corerp/datamodel"
converter "github.com/project-radius/radius/pkg/corerp/datamodel/converter"

corerp_config "github.com/project-radius/radius/pkg/corerp/config"

app_ctrl "github.com/project-radius/radius/pkg/corerp/frontend/controller/applications"
ctr_ctrl "github.com/project-radius/radius/pkg/corerp/frontend/controller/containers"
env_ctrl "github.com/project-radius/radius/pkg/corerp/frontend/controller/environments"
Expand Down Expand Up @@ -312,7 +310,6 @@ func AddRoutes(ctx context.Context, router *mux.Router, pathBase string, isARM b
rp_frontend.PrepareRadiusResource[*datamodel.ContainerResource],
ctr_ctrl.ValidateAndMutateRequest,
},
AsyncOperationTimeout: corerp_config.AsyncCreateOrUpdateContainerTimeout,
},
)
},
Expand All @@ -330,7 +327,6 @@ func AddRoutes(ctx context.Context, router *mux.Router, pathBase string, isARM b
rp_frontend.PrepareRadiusResource[*datamodel.ContainerResource],
ctr_ctrl.ValidateAndMutateRequest,
},
AsyncOperationTimeout: corerp_config.AsyncCreateOrUpdateContainerTimeout,
},
)
},
Expand Down
71 changes: 38 additions & 33 deletions pkg/corerp/handlers/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"strings"
"time"

corerp_config "github.com/project-radius/radius/pkg/corerp/config"
"github.com/project-radius/radius/pkg/kubernetes"
"github.com/project-radius/radius/pkg/kubeutil"
"github.com/project-radius/radius/pkg/resourcemodel"
Expand All @@ -33,6 +32,7 @@ import (

v1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
Expand All @@ -42,17 +42,19 @@ import (
)

const (
// MaxDeploymentTimeout is the max timeout for waiting for a deployment to be ready.
// Deployment duration should not reach this timeout, since the async operation worker will time out the context before MaxDeploymentTimeout.
MaxDeploymentTimeout = time.Minute * time.Duration(10)
// DefaultCacheResyncInterval is the interval for resyncing the informer.
DefaultCacheResyncInterval = time.Second * time.Duration(10)
DefaultCacheResyncInterval = time.Second * time.Duration(30)
)

// NewKubernetesHandler creates Kubernetes Resource handler instance.
func NewKubernetesHandler(client client.Client, clientSet k8s.Interface) ResourceHandler {
return &kubernetesHandler{
client: client,
clientSet: clientSet,

deploymentTimeOut: corerp_config.AsyncCreateOrUpdateContainerTimeout,
client: client,
clientSet: clientSet,
deploymentTimeOut: MaxDeploymentTimeout,
cacheResyncInterval: DefaultCacheResyncInterval,
}
}
Expand Down Expand Up @@ -126,40 +128,44 @@ func (handler *kubernetesHandler) waitUntilDeploymentIsReady(ctx context.Context
logger := ucplog.FromContextOrDiscard(ctx)

doneCh := make(chan bool, 1)
statusCh := make(chan string, 1)
errCh := make(chan error, 1)

ctx, cancel := context.WithTimeout(ctx, handler.deploymentTimeOut)
// This ensures that the informer is stopped when this function is returned.
defer cancel()

err := handler.startDeploymentInformer(ctx, item, doneCh, statusCh)
err := handler.startDeploymentInformer(ctx, item, doneCh, errCh)
if err != nil {
logger.Error(err, "failed to start deployment informer")
return err
}

// Get the latest status for the deployment.
lastStatus := fmt.Sprintf("unknown status, name: %s, namespace %s", item.GetName(), item.GetNamespace())
select {
case <-ctx.Done():
// Get the final deployment status
dep, err := handler.clientSet.AppsV1().Deployments(item.GetNamespace()).Get(ctx, item.GetName(), metav1.GetOptions{})
if err != nil {
return fmt.Errorf("deployment timed out, name: %s, namespace %s, error occured while fetching latest status: %w", item.GetName(), item.GetNamespace(), err)
}

for {
select {
case <-ctx.Done():
// TODO: Deployment doesn't describe the detail of POD failures, so we should get the errors from POD - https://github.com/project-radius/radius/issues/5686
err := fmt.Errorf("deployment has timed out with the status: %s", lastStatus)
logger.Error(err, "Kubernetes handler failed")
return err
// Now get the latest available observation of the deployment's current state.
// Note: there is a possible race here — by the time the latest status is fetched, the deployment might have succeeded.
status := v1.DeploymentCondition{}
if len(dep.Status.Conditions) > 0 {
status = dep.Status.Conditions[len(dep.Status.Conditions)-1]
}
return fmt.Errorf("deployment timed out, name: %s, namespace %s, status: %s, reason: %s", item.GetName(), item.GetNamespace(), status.Message, status.Reason)

case <-doneCh:
logger.Info(fmt.Sprintf("Marking deployment %s in namespace %s as complete", item.GetName(), item.GetNamespace()))
return nil
case <-doneCh:
logger.Info(fmt.Sprintf("Marking deployment %s in namespace %s as complete", item.GetName(), item.GetNamespace()))
return nil

case status := <-statusCh:
lastStatus = status
}
case err := <-errCh:
return err
}
}

func (handler *kubernetesHandler) startDeploymentInformer(ctx context.Context, item client.Object, doneCh chan<- bool, statusCh chan<- string) error {
func (handler *kubernetesHandler) startDeploymentInformer(ctx context.Context, item client.Object, doneCh chan<- bool, errCh chan<- error) error {
informers := informers.NewSharedInformerFactoryWithOptions(handler.clientSet, handler.cacheResyncInterval, informers.WithNamespace(item.GetNamespace()))
deploymentInformer := informers.Apps().V1().Deployments().Informer()
handlers := cache.ResourceEventHandlerFuncs{
Expand All @@ -169,7 +175,7 @@ func (handler *kubernetesHandler) startDeploymentInformer(ctx context.Context, i
if obj.Name != item.GetName() {
return
}
handler.checkDeploymentStatus(ctx, obj, doneCh, statusCh)
handler.checkDeploymentStatus(ctx, obj, doneCh)
},
UpdateFunc: func(old_obj, new_obj any) {
old := old_obj.(*v1.Deployment)
Expand All @@ -181,7 +187,7 @@ func (handler *kubernetesHandler) startDeploymentInformer(ctx context.Context, i
}

if old.ResourceVersion != new.ResourceVersion {
handler.checkDeploymentStatus(ctx, new, doneCh, statusCh)
handler.checkDeploymentStatus(ctx, new, doneCh)
}
},
}
Expand All @@ -198,21 +204,20 @@ func (handler *kubernetesHandler) startDeploymentInformer(ctx context.Context, i
return nil
}

func (handler *kubernetesHandler) checkDeploymentStatus(ctx context.Context, obj *v1.Deployment, doneCh chan<- bool, statusCh chan<- string) {
func (handler *kubernetesHandler) checkDeploymentStatus(ctx context.Context, obj *v1.Deployment, doneCh chan<- bool) {
logger := ucplog.FromContextOrDiscard(ctx)
cond := obj.Status.Conditions
for _, c := range cond {
for _, c := range obj.Status.Conditions {
// check for complete deployment condition
// Reference https://kubernetes.io/docs/concepts/workloads/controllers/deployment/#complete-deployment
if c.Type == v1.DeploymentProgressing {
if c.Type == v1.DeploymentProgressing && c.Status == corev1.ConditionTrue && strings.EqualFold(c.Reason, "NewReplicaSetAvailable") {
logger.Info(fmt.Sprintf("Deployment status for deployment: %s in namespace: %s is: %s - %s, Reason: %s", obj.Name, obj.Namespace, c.Type, c.Status, c.Reason))

// ObservedGeneration should be updated to latest generation to avoid stale replicas
if c.Status == corev1.ConditionTrue && strings.EqualFold(c.Reason, "NewReplicaSetAvailable") && obj.Status.ObservedGeneration >= obj.Generation {
if obj.Status.ObservedGeneration >= obj.Generation {
logger.Info(fmt.Sprintf("Deployment %s in namespace %s is ready. Observed generation: %d, Generation: %d", obj.Name, obj.Namespace, obj.Status.ObservedGeneration, obj.Generation))
doneCh <- true
return
}
// Update the current status of deployment when deployment is progressing. If deployment is failed, the last status will be returned to the caller.
statusCh <- fmt.Sprintf("%s (%s), name: %s, namespace: %s", c.Message, c.Reason, obj.GetName(), obj.GetNamespace())
}
}
}
Expand Down
50 changes: 14 additions & 36 deletions pkg/corerp/handlers/kubernetes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,26 +306,7 @@ func TestWaitUntilDeploymentIsReady_NewResource(t *testing.T) {
}

func TestWaitUntilDeploymentIsReady_Timeout(t *testing.T) {
tests := []struct {
name string
contextTimeout time.Duration
deploymentTimeout time.Duration
expectedError string
}{
{
name: "context timeout",
contextTimeout: time.Duration(1) * time.Second,
deploymentTimeout: time.Duration(5) * time.Minute,
expectedError: "deployment has timed out with the status: Deadline is exceeded (ProgressDeadlineExceeded), name: test-deployment, namespace: test-namespace",
},
{
name: "deployment timeout",
contextTimeout: time.Duration(5) * time.Minute,
deploymentTimeout: time.Duration(1) * time.Second,
expectedError: "deployment has timed out with the status: Deadline is exceeded (ProgressDeadlineExceeded), name: test-deployment, namespace: test-namespace",
},
}

ctx := context.TODO()
// Create first deployment that will be watched
deployment := &v1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -337,27 +318,24 @@ func TestWaitUntilDeploymentIsReady_Timeout(t *testing.T) {
{
Type: v1.DeploymentProgressing,
Status: corev1.ConditionFalse,
Reason: "ProgressDeadlineExceeded",
Message: "Deadline is exceeded",
Reason: "NewReplicaSetAvailable",
Message: "Deployment has minimum availability",
},
},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), tc.contextTimeout)
handler := kubernetesHandler{
clientSet: fake.NewSimpleClientset(deployment),
deploymentTimeOut: tc.deploymentTimeout,
cacheResyncInterval: time.Duration(10) * time.Second,
}
err := handler.waitUntilDeploymentIsReady(ctx, deployment)
require.Error(t, err)
require.Equal(t, tc.expectedError, err.Error())
cancel()
})
deploymentClient := fake.NewSimpleClientset(deployment)

handler := kubernetesHandler{
clientSet: deploymentClient,
deploymentTimeOut: time.Duration(1) * time.Second,
cacheResyncInterval: time.Duration(10) * time.Second,
}

err := handler.waitUntilDeploymentIsReady(ctx, deployment)
require.Error(t, err)
require.Equal(t, "deployment timed out, name: test-deployment, namespace test-namespace, status: Deployment has minimum availability, reason: NewReplicaSetAvailable", err.Error())
}

func TestWaitUntilDeploymentIsReady_DifferentResourceName(t *testing.T) {
Expand Down Expand Up @@ -397,5 +375,5 @@ func TestWaitUntilDeploymentIsReady_DifferentResourceName(t *testing.T) {

// It must be timed out because the name of the deployment does not match.
require.Error(t, err)
require.Equal(t, "deployment has timed out with the status: unknown status, name: not-matched-deployment, namespace test-namespace", err.Error())
require.Equal(t, "deployment timed out, name: not-matched-deployment, namespace test-namespace, error occured while fetching latest status: deployments.apps \"not-matched-deployment\" not found", err.Error())
}

0 comments on commit 9f82cd7

Please sign in to comment.