Skip to content

Commit

Permalink
[issue-562] Serverless workflow pod gets restarted repeatedly after K…
Browse files Browse the repository at this point in the history
…native K_SINK injection
  • Loading branch information
jianrongzhang89 committed Nov 7, 2024
1 parent ae7dde3 commit 83c55ab
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 3 deletions.
8 changes: 8 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ rules:
- list
- update
- watch
- apiGroups:
- serving.knative.dev
resources:
- revisions
verbs:
- delete
- list
- watch
- apiGroups:
- sonataflow.org
resources:
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/profiles/common/knative_eventing.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ package common
import (
"context"

operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
"k8s.io/klog/v2"

operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/knative"
"github.com/apache/incubator-kie-kogito-serverless-operator/log"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down
86 changes: 84 additions & 2 deletions internal/controller/profiles/preview/states_preview.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,31 @@ package preview
import (
"context"
"fmt"
"sort"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/klog/v2"
servingv1 "knative.dev/serving/pkg/apis/serving/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/apache/incubator-kie-kogito-serverless-operator/api"
operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/builder"
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/knative"
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/platform"
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/profiles/common"
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/profiles/common/constants"
"github.com/apache/incubator-kie-kogito-serverless-operator/log"
kubeutil "github.com/apache/incubator-kie-kogito-serverless-operator/utils/kubernetes"
"github.com/apache/incubator-kie-kogito-serverless-operator/workflowproj"
)

const (
// kSink is the name of the environment variable that containsKSink looks for
// in the workflow container of a Knative revision.
kSink = "K_SINK"
// workflowContainer is the name of the container whose environment is inspected
// when checking a revision for the K_SINK variable.
workflowContainer = "workflow"
)

type newBuilderState struct {
Expand Down Expand Up @@ -221,8 +231,8 @@ func (h *deployWithBuildWorkflowState) Do(ctx context.Context, workflow *operato
}

// PostReconcile runs after the reconciliation of the workflow and cleans up the
// outdated Knative revisions, if any. It returns whatever error the cleanup reports.
func (h *deployWithBuildWorkflowState) PostReconcile(ctx context.Context, workflow *operatorapi.SonataFlow) error {
	return h.cleanupOutdatedRevisions(ctx, workflow)
}

// isWorkflowChanged marks the workflow status as unknown to require a new build reconciliation
Expand All @@ -233,3 +243,75 @@ func (h *deployWithBuildWorkflowState) isWorkflowChanged(workflow *operatorapi.S
}
return false
}

// cleanupOutdatedRevisions deletes the Knative revisions of the given workflow whose
// workflow container does not carry the K_SINK environment variable.
//
// It does nothing when the workflow is not a Knative deployment, or when Knative
// Serving or Eventing is reported as unavailable. If knative.CheckKSinkInjected reports
// that K_SINK has not been injected yet, an error is returned.
//
// Revisions are selected by the workflow name/namespace labels and ordered by creation
// timestamp; the most recently created revision is never deleted.
func (h *deployWithBuildWorkflowState) cleanupOutdatedRevisions(ctx context.Context, workflow *operatorapi.SonataFlow) error {
	if !workflow.IsKnativeDeployment() {
		return nil
	}
	avail, err := knative.GetKnativeAvailability(h.Cfg)
	if err != nil {
		return err
	}
	if !avail.Serving || !avail.Eventing {
		return nil
	}
	injected, err := knative.CheckKSinkInjected(workflow.Name, workflow.Namespace)
	if err != nil {
		return err
	}
	if !injected {
		return fmt.Errorf("waiting for Sinkbinding K_SINK injection to complete")
	}
	opts := &client.ListOptions{
		LabelSelector: labels.SelectorFromSet(
			map[string]string{
				workflowproj.LabelWorkflow:          workflow.Name,
				workflowproj.LabelWorkflowNamespace: workflow.Namespace,
			},
		),
		Namespace: workflow.Namespace,
	}
	revisionList := &servingv1.RevisionList{}
	if err := h.C.List(ctx, revisionList, opts); err != nil {
		return err
	}
	// Sort the revisions based on creation timestamp (oldest first).
	sortRevisions(revisionList.Items)
	// Clean up previous revisions that do not have K_SINK injected. The loop stops
	// before the last element so the newest revision is always kept.
	for i := 0; i < len(revisionList.Items)-1; i++ {
		revision := &revisionList.Items[i]
		if containsKSink(revision) {
			continue
		}
		// InfoS takes a constant message followed by key/value pairs; it does not
		// expand printf verbs, so the revision name is passed as a structured field.
		klog.V(log.I).InfoS("Revision does not have K_SINK injected and can be cleaned up.", "revision", revision.Name)
		// A revision that is already gone is the desired outcome, not a failure.
		if err := client.IgnoreNotFound(h.C.Delete(ctx, revision, &client.DeleteOptions{})); err != nil {
			return err
		}
	}
	return nil
}

// containsKSink reports whether the first container named workflowContainer in the
// revision's pod spec declares an environment variable named kSink. Only that first
// matching container is inspected; a revision without such a container yields false.
func containsKSink(revision *servingv1.Revision) bool {
	containers := revision.Spec.PodSpec.Containers
	for idx := range containers {
		if containers[idx].Name != workflowContainer {
			continue
		}
		for _, envVar := range containers[idx].Env {
			if envVar.Name == kSink {
				return true
			}
		}
		// The workflow container was found but has no K_SINK entry.
		return false
	}
	return false
}

// CreationTimestamp implements sort.Interface over a slice of Knative revisions,
// ordering them from the earliest to the latest creation timestamp.
type CreationTimestamp []servingv1.Revision

// Len returns the number of revisions in the slice.
func (c CreationTimestamp) Len() int {
	return len(c)
}

// Less reports whether the revision at index i was created before the one at index j.
func (c CreationTimestamp) Less(i, j int) bool {
	left, right := &c[i].CreationTimestamp, &c[j].CreationTimestamp
	return left.Before(right)
}

// Swap exchanges the revisions at indexes i and j.
func (c CreationTimestamp) Swap(i, j int) {
	c[j], c[i] = c[i], c[j]
}

// sortRevisions orders the given revisions in place by ascending creation timestamp.
func sortRevisions(revisions []servingv1.Revision) {
	byCreation := CreationTimestamp(revisions)
	sort.Sort(byCreation)
}
1 change: 1 addition & 0 deletions internal/controller/sonataflow_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type SonataFlowReconciler struct {
//+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflows/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=sonataflow.org,resources=sonataflows/finalizers,verbs=update
//+kubebuilder:rbac:groups="monitoring.coreos.com",resources=servicemonitors,verbs=get;list;watch;create;update;delete
//+kubebuilder:rbac:groups="serving.knative.dev",resources=revisions,verbs=list;watch;delete

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
Expand Down
8 changes: 8 additions & 0 deletions operator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27746,6 +27746,14 @@ rules:
- list
- update
- watch
- apiGroups:
- serving.knative.dev
resources:
- revisions
verbs:
- delete
- list
- watch
- apiGroups:
- sonataflow.org
resources:
Expand Down

0 comments on commit 83c55ab

Please sign in to comment.