Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide CloudEvents around the management of ScaledObjects resources #5953

Merged
merged 4 commits into from
Jul 10, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -62,6 +62,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **General**: Add --ca-dir flag to KEDA operator to specify directories with CA certificates for scalers to authenticate TLS connections (defaults to /custom/ca) ([#5860](https://github.com/kedacore/keda/issues/5860))
- **General**: Declarative parsing of scaler config ([#5037](https://github.com/kedacore/keda/issues/5037)|[#5797](https://github.com/kedacore/keda/issues/5797))
- **General**: Introduce new Splunk Scaler ([#5904](https://github.com/kedacore/keda/issues/5904))
- **General**: Provide CloudEvents around the management of ScaledObjects resources ([#3522](https://github.com/kedacore/keda/issues/3522))
- **General**: Remove deprecated Kustomize commonLabels ([#5888](https://github.com/kedacore/keda/pull/5888))
- **General**: Support for Kubernetes v1.30 ([#5828](https://github.com/kedacore/keda/issues/5828))

3 changes: 3 additions & 0 deletions apis/eventing/v1alpha1/cloudevent_types.go
Original file line number Diff line number Diff line change
@@ -26,6 +26,9 @@ const (

// ScaledObjectFailedType is for event when creating ScaledObject failed
ScaledObjectFailedType CloudEventType = "keda.scaledobject.failed.v1"

// ScaledObjectFailedType is for event when removed ScaledObject
ScaledObjectRemovedType CloudEventType = "keda.scaledobject.removed.v1"
)

var AllEventTypes = []CloudEventType{ScaledObjectFailedType, ScaledObjectReadyType}
1 change: 0 additions & 1 deletion cmd/operator/main.go
Original file line number Diff line number Diff line change
@@ -223,7 +223,6 @@ func main() {
if err = (&kedacontrollers.ScaledObjectReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: eventRecorder,
ScaleClient: scaleClient,
ScaleHandler: scaledHandler,
EventEmitter: eventEmitter,
20 changes: 9 additions & 11 deletions controllers/keda/scaledobject_controller.go
Original file line number Diff line number Diff line change
@@ -34,7 +34,6 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/scale"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -70,7 +69,6 @@ import (
type ScaledObjectReconciler struct {
Client client.Client
Scheme *runtime.Scheme
Recorder record.EventRecorder
ScaleClient scale.ScalesGetter
ScaleHandler scaling.ScaleHandler
EventEmitter eventemitter.EventHandler
@@ -119,8 +117,8 @@ func (r *ScaledObjectReconciler) SetupWithManager(mgr ctrl.Manager, options cont
if r.Scheme == nil {
return fmt.Errorf("ScaledObjectReconciler.Scheme is not initialized")
}
if r.Recorder == nil {
return fmt.Errorf("ScaledObjectReconciler.Recorder is not initialized")
if r.EventEmitter == nil {
return fmt.Errorf("ScaledObjectReconciler.EventEmitter is not initialized")
}
// Start controller
return ctrl.NewControllerManagedBy(mgr).
@@ -184,7 +182,7 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request
if !scaledObject.Status.Conditions.AreInitialized() {
conditions := kedav1alpha1.GetInitializedConditions()
if err := kedastatus.SetStatusConditions(ctx, r.Client, reqLogger, scaledObject, conditions); err != nil {
r.EventEmitter.Emit(scaledObject, req.NamespacedName, corev1.EventTypeWarning, eventingv1alpha1.ScaledObjectFailedType, eventreason.ScaledObjectUpdateFailed, err.Error())
r.EventEmitter.Emit(scaledObject, req.NamespacedName.Namespace, corev1.EventTypeWarning, eventingv1alpha1.ScaledObjectFailedType, eventreason.ScaledObjectUpdateFailed, err.Error())
return ctrl.Result{}, err
}
}
@@ -196,18 +194,18 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request
reqLogger.Error(err, msg)
conditions.SetReadyCondition(metav1.ConditionFalse, "ScaledObjectCheckFailed", msg)
conditions.SetActiveCondition(metav1.ConditionUnknown, "UnknownState", "ScaledObject check failed")
r.EventEmitter.Emit(scaledObject, req.NamespacedName, corev1.EventTypeWarning, eventingv1alpha1.ScaledObjectFailedType, eventreason.ScaledObjectCheckFailed, msg)
r.EventEmitter.Emit(scaledObject, req.NamespacedName.Namespace, corev1.EventTypeWarning, eventingv1alpha1.ScaledObjectFailedType, eventreason.ScaledObjectCheckFailed, msg)
} else {
wasReady := conditions.GetReadyCondition()
if wasReady.IsFalse() || wasReady.IsUnknown() {
r.EventEmitter.Emit(scaledObject, req.NamespacedName, corev1.EventTypeNormal, eventingv1alpha1.ScaledObjectReadyType, eventreason.ScaledObjectReady, message.ScalerReadyMsg)
r.EventEmitter.Emit(scaledObject, req.NamespacedName.Namespace, corev1.EventTypeNormal, eventingv1alpha1.ScaledObjectReadyType, eventreason.ScaledObjectReady, message.ScalerReadyMsg)
}
reqLogger.V(1).Info(msg)
conditions.SetReadyCondition(metav1.ConditionTrue, kedav1alpha1.ScaledObjectConditionReadySuccessReason, msg)
}

if err := kedastatus.SetStatusConditions(ctx, r.Client, reqLogger, scaledObject, &conditions); err != nil {
r.EventEmitter.Emit(scaledObject, req.NamespacedName, corev1.EventTypeWarning, eventingv1alpha1.ScaledObjectFailedType, eventreason.ScaledObjectUpdateFailed, err.Error())
r.EventEmitter.Emit(scaledObject, req.NamespacedName.Namespace, corev1.EventTypeWarning, eventingv1alpha1.ScaledObjectFailedType, eventreason.ScaledObjectUpdateFailed, err.Error())
return ctrl.Result{}, err
}

@@ -359,7 +357,7 @@ func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(ctx context.Conte
if err != nil {
msg := "Failed to parse Group, Version, Kind, Resource"
logger.Error(err, msg, "apiVersion", scaledObject.Spec.ScaleTargetRef.APIVersion, "kind", scaledObject.Spec.ScaleTargetRef.Kind)
r.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.ScaledObjectUpdateFailed, msg)
r.EventEmitter.Emit(scaledObject, scaledObject.Namespace, corev1.EventTypeWarning, eventingv1alpha1.ScaledObjectFailedType, eventreason.ScaledObjectUpdateFailed, err.Error())
return gvkr, err
}
gvkString := gvkr.GVKString()
@@ -396,12 +394,12 @@ func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(ctx context.Conte
if err := r.Client.Get(ctx, client.ObjectKey{Namespace: scaledObject.Namespace, Name: scaledObject.Spec.ScaleTargetRef.Name}, unstruct); err != nil {
// resource doesn't exist
logger.Error(err, message.ScaleTargetNotFoundMsg, "resource", gvkString, "name", scaledObject.Spec.ScaleTargetRef.Name)
r.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.ScaledObjectCheckFailed, message.ScaleTargetNotFoundMsg)
r.EventEmitter.Emit(scaledObject, scaledObject.Namespace, corev1.EventTypeWarning, eventingv1alpha1.ScaledObjectFailedType, eventreason.ScaledObjectCheckFailed, message.ScaleTargetNotFoundMsg)
return gvkr, err
}
// resource exist but doesn't expose /scale subresource
logger.Error(errScale, message.ScaleTargetNoSubresourceMsg, "resource", gvkString, "name", scaledObject.Spec.ScaleTargetRef.Name)
r.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.ScaledObjectCheckFailed, message.ScaleTargetNoSubresourceMsg)
r.EventEmitter.Emit(scaledObject, scaledObject.Namespace, corev1.EventTypeWarning, eventingv1alpha1.ScaledObjectFailedType, eventreason.ScaledObjectCheckFailed, message.ScaleTargetNoSubresourceMsg)
return gvkr, errScale
}
isScalableCache.Store(gr.String(), true)
4 changes: 3 additions & 1 deletion controllers/keda/scaledobject_finalizer.go
Original file line number Diff line number Diff line change
@@ -24,8 +24,10 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

eventingv1alpha1 "github.com/kedacore/keda/v2/apis/eventing/v1alpha1"
kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/controllers/keda/util"
"github.com/kedacore/keda/v2/pkg/common/message"
"github.com/kedacore/keda/v2/pkg/eventreason"
)

@@ -86,7 +88,7 @@ func (r *ScaledObjectReconciler) finalizeScaledObject(ctx context.Context, logge
}

logger.Info("Successfully finalized ScaledObject")
r.Recorder.Event(scaledObject, corev1.EventTypeNormal, eventreason.ScaledObjectDeleted, "ScaledObject was deleted")
r.EventEmitter.Emit(scaledObject, scaledObject.Namespace, corev1.EventTypeWarning, eventingv1alpha1.ScaledObjectRemovedType, eventreason.ScaledObjectDeleted, message.ScaledObjectRemoved)
return nil
}

1 change: 0 additions & 1 deletion controllers/keda/suite_test.go
Original file line number Diff line number Diff line change
@@ -94,7 +94,6 @@ var _ = BeforeSuite(func() {
err = (&ScaledObjectReconciler{
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
Recorder: k8sManager.GetEventRecorderFor("keda-operator"),
ScaleHandler: scaling.NewScaleHandler(k8sManager.GetClient(), scaleClient, k8sManager.GetScheme(), time.Duration(10), k8sManager.GetEventRecorderFor("keda-operator"), nil),
ScaleClient: scaleClient,
EventEmitter: eventemitter.NewEventEmitter(k8sManager.GetClient(), k8sManager.GetEventRecorderFor("keda-operator"), "kubernetes-default", nil),
2 changes: 2 additions & 0 deletions pkg/common/message/message.go
Original file line number Diff line number Diff line change
@@ -28,4 +28,6 @@ const (
ScaleTargetNotFoundMsg = "Target resource doesn't exist"

ScaleTargetNoSubresourceMsg = "Target resource doesn't expose /scale subresource"

ScaledObjectRemoved = "ScaledObject was deleted"
)
6 changes: 3 additions & 3 deletions pkg/eventemitter/eventemitter.go
Original file line number Diff line number Diff line change
@@ -73,7 +73,7 @@ type EventEmitter struct {
type EventHandler interface {
DeleteCloudEventSource(cloudEventSource *eventingv1alpha1.CloudEventSource) error
HandleCloudEventSource(ctx context.Context, cloudEventSource *eventingv1alpha1.CloudEventSource) error
Emit(object runtime.Object, namesapce types.NamespacedName, eventType string, cloudeventType eventingv1alpha1.CloudEventType, reason string, message string)
Emit(object runtime.Object, namesapce string, eventType string, cloudeventType eventingv1alpha1.CloudEventType, reason string, message string)
}

// EventDataHandler defines the behavior for different event handlers
@@ -325,7 +325,7 @@ func (e *EventEmitter) checkEventHandlers(ctx context.Context, cloudEventSource
}

// Emit is emitting event to both local kubernetes and custom CloudEventSource handler. After emit event to local kubernetes, event will inqueue and waitng for handler's consuming.
func (e *EventEmitter) Emit(object runtime.Object, namesapce types.NamespacedName, eventType string, cloudeventType eventingv1alpha1.CloudEventType, reason, message string) {
func (e *EventEmitter) Emit(object runtime.Object, namesapce string, eventType string, cloudeventType eventingv1alpha1.CloudEventType, reason, message string) {
e.recorder.Event(object, eventType, reason, message)

e.eventHandlersCacheLock.RLock()
@@ -337,7 +337,7 @@ func (e *EventEmitter) Emit(object runtime.Object, namesapce types.NamespacedNam
objectName, _ := meta.NewAccessor().Name(object)
objectType, _ := meta.NewAccessor().Kind(object)
eventData := eventdata.EventData{
Namespace: namesapce.Namespace,
Namespace: namesapce,
CloudEventType: cloudeventType,
ObjectName: strings.ToLower(objectName),
ObjectType: strings.ToLower(objectType),
107 changes: 106 additions & 1 deletion tests/internals/cloudevent_source/cloudevent_source_test.go
Original file line number Diff line number Diff line change
@@ -27,6 +27,7 @@ var _ = godotenv.Load("../../.env")
var (
namespace = fmt.Sprintf("%s-ns", testName)
scaledObjectName = fmt.Sprintf("%s-so", testName)
deploymentName = fmt.Sprintf("%s-d", testName)
clientName = fmt.Sprintf("%s-client", testName)
cloudeventSourceName = fmt.Sprintf("%s-ce", testName)
cloudeventSourceErrName = fmt.Sprintf("%s-ce-err", testName)
@@ -43,6 +44,7 @@ var (
type templateData struct {
TestNamespace string
ScaledObject string
DeploymentName string
ClientName string
CloudEventSourceName string
CloudeventSourceErrName string
@@ -210,6 +212,56 @@ spec:
excludedEventTypes:
- keda.scaledobject.failed.v1
`

deploymentTemplate = `
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{.DeploymentName}}
namespace: {{.TestNamespace}}
labels:
deploy: {{.DeploymentName}}
spec:
replicas: 1
selector:
matchLabels:
pod: {{.DeploymentName}}
template:
metadata:
labels:
pod: {{.DeploymentName}}
spec:
containers:
- name: nginx
image: 'nginxinc/nginx-unprivileged'
`

scaledObjectTemplate = `
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: {{.ScaledObject}}
namespace: {{.TestNamespace}}
spec:
scaleTargetRef:
name: {{.DeploymentName}}
pollingInterval: 5
cooldownPeriod: 5
minReplicaCount: 1
maxReplicaCount: 10
advanced:
horizontalPodAutoscalerConfig:
behavior:
scaleDown:
stabilizationWindowSeconds: 15
triggers:
- type: cron
metadata:
timezone: Etc/UTC
start: 3 * * * *
end: 5 * * * *
desiredReplicas: '4'
`
)

func TestScaledObjectGeneral(t *testing.T) {
@@ -223,6 +275,7 @@ func TestScaledObjectGeneral(t *testing.T) {
assert.True(t, WaitForAllPodRunningInNamespace(t, kc, namespace, 5, 20), "all pods should be running")

testErrEventSourceEmitValue(t, kc, data)
testEventSourceEmitValue(t, kc, data)
testErrEventSourceExcludeValue(t, kc, data)
testErrEventSourceIncludeValue(t, kc, data)
testErrEventSourceCreation(t, kc, data)
@@ -258,8 +311,16 @@ func testErrEventSourceEmitValue(t *testing.T, _ *kubernetes.Clientset, data tem
foundEvents = append(foundEvents, cloudEvent)
data := map[string]string{}
err := cloudEvent.DataAs(&data)
t.Log("--- test emitting eventsource about scaledobject err---", "message", data["message"])

assert.NoError(t, err)
assert.Equal(t, data["message"], "ScaledObject doesn't have correct scaleTargetRef specification")
assert.Condition(t, func() bool {
if data["message"] == "ScaledObject doesn't have correct scaleTargetRef specification" || data["message"] == "Target resource doesn't exist" {
return true
}
return false
}, "get filtered event")

assert.Equal(t, cloudEvent.Type(), "keda.scaledobject.failed.v1")
assert.Equal(t, cloudEvent.Source(), expectedSource)
assert.Equal(t, cloudEvent.DataContentType(), "application/json")
@@ -272,6 +333,49 @@ func testErrEventSourceEmitValue(t *testing.T, _ *kubernetes.Clientset, data tem
assert.NotEmpty(t, foundEvents)
}

func testEventSourceEmitValue(t *testing.T, _ *kubernetes.Clientset, data templateData) {
t.Log("--- test emitting eventsource about scaledobject removed---")
KubectlApplyWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate)
KubectlApplyWithTemplate(t, data, "deploymentTemplate", deploymentTemplate)

// wait 15 seconds to ensure event propagation
time.Sleep(5 * time.Second)
KubectlDeleteWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate)
time.Sleep(10 * time.Second)

out, outErr, err := ExecCommandOnSpecificPod(t, clientName, namespace, fmt.Sprintf("curl -X GET %s/getCloudEvent/%s", cloudEventHTTPServiceURL, "ScaledObjectDeleted"))
assert.NotEmpty(t, out)
assert.Empty(t, outErr)
assert.NoError(t, err, "dont expect error requesting ")

cloudEvents := []cloudevents.Event{}
err = json.Unmarshal([]byte(out), &cloudEvents)

assert.NoError(t, err, "dont expect error unmarshaling the cloudEvents")
assert.Greater(t, len(cloudEvents), 0, "cloudEvents should have at least 1 item")

foundEvents := []cloudevents.Event{}

for _, cloudEvent := range cloudEvents {
if cloudEvent.Subject() == expectedSubject {
foundEvents = append(foundEvents, cloudEvent)
data := map[string]string{}
err := cloudEvent.DataAs(&data)

assert.NoError(t, err)
assert.Equal(t, data["message"], "ScaledObject was deleted")
assert.Equal(t, cloudEvent.Type(), "keda.scaledobject.removed.v1")
assert.Equal(t, cloudEvent.Source(), expectedSource)
assert.Equal(t, cloudEvent.DataContentType(), "application/json")

if lastCloudEventTime.Before(cloudEvent.Time()) {
lastCloudEventTime = cloudEvent.Time()
}
}
}
assert.NotEmpty(t, foundEvents)
}

// tests error events not emitted by
func testErrEventSourceExcludeValue(t *testing.T, _ *kubernetes.Clientset, data templateData) {
t.Log("--- test emitting eventsource about scaledobject err with exclude filter---")
@@ -362,6 +466,7 @@ func getTemplateData() (templateData, []Template) {
return templateData{
TestNamespace: namespace,
ScaledObject: scaledObjectName,
DeploymentName: deploymentName,
ClientName: clientName,
CloudEventSourceName: cloudeventSourceName,
CloudeventSourceErrName: cloudeventSourceErrName,