From c20c47e682ef14162267089867aa0c5ecd0117ad Mon Sep 17 00:00:00 2001 From: SpiritZhou Date: Thu, 11 Apr 2024 05:43:45 +0800 Subject: [PATCH] Introduce Filter CloudEvents Feature (#5424) * update on spiritzhou/filtercloudevent Signed-off-by: SpiritZhou * Update CHANGELOG.md Co-authored-by: Tom Kerkhove Signed-off-by: SpiritZhou * Update log Signed-off-by: SpiritZhou * add crd validation Signed-off-by: SpiritZhou * update Signed-off-by: SpiritZhou * Fix Signed-off-by: SpiritZhou * Update pkg/eventemitter/eventemitter.go Co-authored-by: Jorge Turrado Ferrero Signed-off-by: SpiritZhou * Update Signed-off-by: SpiritZhou * Fix Signed-off-by: SpiritZhou * Fix Signed-off-by: SpiritZhou * Update Signed-off-by: SpiritZhou * Update Signed-off-by: SpiritZhou --------- Signed-off-by: SpiritZhou Signed-off-by: Jorge Turrado Ferrero Co-authored-by: Tom Kerkhove Co-authored-by: Jorge Turrado Ferrero --- CHANGELOG.md | 2 +- .../eventing/v1alpha1/cloudevent_types.go | 12 +- .../v1alpha1/cloudeventsource_types.go | 16 +- .../v1alpha1/cloudeventsource_webhook.go | 97 +++++++ .../v1alpha1/cloudeventsource_webhook_test.go | 242 ++++++++++++++++++ .../v1alpha1/zz_generated.deepcopy.go | 28 +- cmd/webhooks/main.go | 10 +- .../eventing.keda.sh_cloudeventsources.yaml | 22 ++ config/webhooks/validation_webhooks.yaml | 24 ++ controllers/keda/scaledobject_controller.go | 9 +- pkg/eventemitter/cloudevent_http_handler.go | 2 +- .../cloudevent_http_handler_test.go | 12 +- pkg/eventemitter/eventdata/eventdata.go | 22 +- pkg/eventemitter/eventemitter.go | 55 +++- pkg/eventemitter/eventemitter_test.go | 32 ++- pkg/eventemitter/eventfilter.go | 51 ++++ pkg/mock/mock_eventemitter/mock_interface.go | 2 +- .../cloudevent_source_test.go | 170 +++++++++++- 18 files changed, 754 insertions(+), 54 deletions(-) rename pkg/eventemitter/eventtypes.go => apis/eventing/v1alpha1/cloudevent_types.go (63%) create mode 100644 apis/eventing/v1alpha1/cloudeventsource_webhook.go create mode 100644 apis/eventing/v1alpha1/cloudeventsource_webhook_test.go create mode 100644 pkg/eventemitter/eventfilter.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 944b9f6109d..d9429608c3c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -53,7 +53,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio ### New -- **General**: TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX)) +- **General**: Provide capability to filter CloudEvents ([#3533](https://github.com/kedacore/keda/issues/3533)) - **NATS Scaler**: Add TLS authentication ([#2296](https://github.com/kedacore/keda/issues/2296)) #### Experimental diff --git a/pkg/eventemitter/eventtypes.go b/apis/eventing/v1alpha1/cloudevent_types.go similarity index 63% rename from pkg/eventemitter/eventtypes.go rename to apis/eventing/v1alpha1/cloudevent_types.go index 3d00a96f758..89e14109882 100644 --- a/pkg/eventemitter/eventtypes.go +++ b/apis/eventing/v1alpha1/cloudevent_types.go @@ -14,12 +14,18 @@ See the License for the specific language governing permissions and limitations under the License. */ -package eventemitter +package v1alpha1 + +// CloudEventType contains the list of cloudevent types +// +kubebuilder:validation:Enum=keda.scaledobject.ready.v1;keda.scaledobject.failed.v1 +type CloudEventType string const ( // ScaledObjectReadyType is for event when a new ScaledObject is ready - ScaledObjectReadyType = "keda.scaledobject.ready.v1" + ScaledObjectReadyType CloudEventType = "keda.scaledobject.ready.v1" // ScaledObjectFailedType is for event when creating ScaledObject failed - ScaledObjectFailedType = "keda.scaledobject.failed.v1" + ScaledObjectFailedType CloudEventType = "keda.scaledobject.failed.v1" ) + +var AllEventTypes = []CloudEventType{ScaledObjectFailedType, ScaledObjectReadyType} diff --git a/apis/eventing/v1alpha1/cloudeventsource_types.go b/apis/eventing/v1alpha1/cloudeventsource_types.go index 2ede7dbcd10..8b81700eaad 100644 --- a/apis/eventing/v1alpha1/cloudeventsource_types.go +++ b/apis/eventing/v1alpha1/cloudeventsource_types.go @@ -51,6 +51,9 @@ type CloudEventSourceSpec struct { ClusterName string `json:"clusterName,omitempty"` Destination Destination `json:"destination"` + + // +optional + EventSubscription EventSubscription `json:"eventSubscription,omitempty"` } // CloudEventSourceStatus defines the observed state of CloudEventSource @@ -70,13 +73,22 @@ type CloudEventHTTP struct { URI string `json:"uri"` } +// EventSubscription defines filters for events +type EventSubscription struct { + // +optional + IncludedEventTypes []CloudEventType `json:"includedEventTypes,omitempty"` + + // +optional + ExcludedEventTypes []CloudEventType `json:"excludedEventTypes,omitempty"` +} + func init() { SchemeBuilder.Register(&CloudEventSource{}, &CloudEventSourceList{}) } // GenerateIdentifier returns identifier for the object in for "kind.namespace.name" -func (t *CloudEventSource) GenerateIdentifier() string { - return v1alpha1.GenerateIdentifier("CloudEventSource", t.Namespace, t.Name) +func (ces *CloudEventSource) GenerateIdentifier() string { + return v1alpha1.GenerateIdentifier("CloudEventSource", ces.Namespace, ces.Name) } // GetCloudEventSourceInitializedConditions returns CloudEventSource Conditions initialized to the default -> Status: Unknown diff --git a/apis/eventing/v1alpha1/cloudeventsource_webhook.go b/apis/eventing/v1alpha1/cloudeventsource_webhook.go new file mode 100644 index 00000000000..b520fc4f27f --- /dev/null +++ b/apis/eventing/v1alpha1/cloudeventsource_webhook.go @@ -0,0 +1,97 @@ +/* +Copyright 2024 The KEDA Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "encoding/json" + "fmt" + + "golang.org/x/exp/slices" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + ctrl "sigs.k8s.io/controller-runtime" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/webhook" + "sigs.k8s.io/controller-runtime/pkg/webhook/admission" +) + +var cloudeventsourcelog = logf.Log.WithName("cloudeventsource-validation-webhook") + +func (ces *CloudEventSource) SetupWebhookWithManager(mgr ctrl.Manager) error { + return ctrl.NewWebhookManagedBy(mgr). + For(ces). + Complete() +} + +// +kubebuilder:webhook:path=/validate-eventing-keda-sh-v1alpha1-cloudeventsource,mutating=false,failurePolicy=ignore,sideEffects=None,groups=eventing.keda.sh,resources=cloudeventsources,verbs=create;update,versions=v1alpha1,name=vcloudeventsource.kb.io,admissionReviewVersions=v1 + +var _ webhook.Validator = &CloudEventSource{} + +// ValidateCreate implements webhook.Validator so a webhook will be registered for the type +func (ces *CloudEventSource) ValidateCreate() (admission.Warnings, error) { + val, _ := json.MarshalIndent(ces, "", " ") + cloudeventsourcelog.Info(fmt.Sprintf("validating cloudeventsource creation for %s", string(val))) + return validateSpec(&ces.Spec) +} + +func (ces *CloudEventSource) ValidateUpdate(old runtime.Object) (admission.Warnings, error) { + val, _ := json.MarshalIndent(ces, "", " ") + cloudeventsourcelog.V(1).Info(fmt.Sprintf("validating cloudeventsource update for %s", string(val))) + + oldCes := old.(*CloudEventSource) + if isCloudEventSourceRemovingFinalizer(ces.ObjectMeta, oldCes.ObjectMeta, ces.Spec, oldCes.Spec) { + cloudeventsourcelog.V(1).Info("finalizer removal, skipping validation") + return nil, nil + } + return validateSpec(&ces.Spec) +} + +func (ces *CloudEventSource) ValidateDelete() (admission.Warnings, error) { + return nil, nil +} + +func isCloudEventSourceRemovingFinalizer(om metav1.ObjectMeta, oldOm metav1.ObjectMeta, spec CloudEventSourceSpec, oldSpec CloudEventSourceSpec) bool { + cesSpec, _ := json.MarshalIndent(spec, "", " ") + oldCesSpec, _ := json.MarshalIndent(oldSpec, "", " ") + cesSpecString := string(cesSpec) + oldCesSpecString := string(oldCesSpec) + + return len(om.Finalizers) == 0 && len(oldOm.Finalizers) == 1 && cesSpecString == oldCesSpecString +} + +func validateSpec(spec *CloudEventSourceSpec) (admission.Warnings, error) { + if spec.EventSubscription.ExcludedEventTypes != nil && spec.EventSubscription.IncludedEventTypes != nil { + return nil, fmt.Errorf("setting included types and excluded types at the same time is not supported") + } + + if spec.EventSubscription.ExcludedEventTypes != nil { + for _, excludedEventType := range spec.EventSubscription.ExcludedEventTypes { + if !slices.Contains(AllEventTypes, excludedEventType) { + return nil, fmt.Errorf("excludedEventType: %s in cloudeventsource spec is not supported", excludedEventType) + } + } + } + + if spec.EventSubscription.IncludedEventTypes != nil { + for _, includedEventType := range spec.EventSubscription.IncludedEventTypes { + if !slices.Contains(AllEventTypes, includedEventType) { + return nil, fmt.Errorf("includedEventType: %s in cloudeventsource spec is not supported", includedEventType) + } + } + } + return nil, nil +} diff --git a/apis/eventing/v1alpha1/cloudeventsource_webhook_test.go b/apis/eventing/v1alpha1/cloudeventsource_webhook_test.go new file mode 100644 index 00000000000..425dba8a148 --- /dev/null +++ b/apis/eventing/v1alpha1/cloudeventsource_webhook_test.go @@ -0,0 +1,242 @@ +/* +Copyright 2024 The KEDA Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "context" + "crypto/tls" + "fmt" + "net" + "path/filepath" + "strconv" + "testing" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + admissionv1beta1 "k8s.io/api/admission/v1beta1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/envtest" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + "sigs.k8s.io/controller-runtime/pkg/metrics/server" + "sigs.k8s.io/controller-runtime/pkg/webhook" +) + +var cfg *rest.Config +var k8sClient client.Client +var testEnv *envtest.Environment +var ctx context.Context +var cancel context.CancelFunc + +func TestAPIs(t *testing.T) { + RegisterFailHandler(Fail) + + RunSpecs(t, "Webhook Suite") +} + +var _ = BeforeSuite(func() { + logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))) + + ctx, cancel = context.WithCancel(context.Background()) + + By("bootstrapping test environment") + testEnv = &envtest.Environment{ + CRDDirectoryPaths: []string{filepath.Join("..", "..", "..", "config", "crd", "bases")}, + ErrorIfCRDPathMissing: false, + WebhookInstallOptions: envtest.WebhookInstallOptions{ + Paths: []string{filepath.Join("..", "..", "..", "config", "webhooks")}, + }, + } + var err error + // cfg is defined in this file globally. + done := make(chan interface{}) + go func() { + defer GinkgoRecover() + cfg, err = testEnv.Start() + close(done) + }() + Eventually(done).WithTimeout(time.Minute).Should(BeClosed()) + Expect(err).NotTo(HaveOccurred()) + Expect(cfg).NotTo(BeNil()) + + scheme := runtime.NewScheme() + err = AddToScheme(scheme) + Expect(err).NotTo(HaveOccurred()) + + err = clientgoscheme.AddToScheme(scheme) + Expect(err).NotTo(HaveOccurred()) + + err = admissionv1beta1.AddToScheme(scheme) + Expect(err).NotTo(HaveOccurred()) + + //+kubebuilder:scaffold:scheme + + k8sClient, err = client.New(cfg, client.Options{Scheme: scheme}) + Expect(err).NotTo(HaveOccurred()) + Expect(k8sClient).NotTo(BeNil()) + + // start webhook server using Manager + webhookInstallOptions := &testEnv.WebhookInstallOptions + mgr, err := ctrl.NewManager(cfg, ctrl.Options{ + Scheme: scheme, + WebhookServer: webhook.NewServer(webhook.Options{ + Host: webhookInstallOptions.LocalServingHost, + Port: webhookInstallOptions.LocalServingPort, + CertDir: webhookInstallOptions.LocalServingCertDir, + }), + LeaderElection: false, + Metrics: server.Options{ + BindAddress: "0", + }, + }) + Expect(err).NotTo(HaveOccurred()) + + err = (&CloudEventSource{}).SetupWebhookWithManager(mgr) + Expect(err).NotTo(HaveOccurred()) + + //+kubebuilder:scaffold:webhook + + go func() { + defer GinkgoRecover() + err = mgr.Start(ctx) + Expect(err).NotTo(HaveOccurred()) + }() + + // wait for the webhook server to get ready + dialer := &net.Dialer{Timeout: time.Second} + addrPort := fmt.Sprintf("%s:%d", webhookInstallOptions.LocalServingHost, webhookInstallOptions.LocalServingPort) + Eventually(func() error { + conn, err := tls.DialWithDialer(dialer, "tcp", addrPort, &tls.Config{InsecureSkipVerify: true}) + if err != nil { + return err + } + conn.Close() + return nil + }).Should(Succeed()) + +}) + +var _ = It("validate cloudeventsource when event type is not support", func() { + namespaceName := "nscloudeventnotsupport" + namespace := createNamespace(namespaceName) + err := k8sClient.Create(context.Background(), namespace) + Expect(err).ToNot(HaveOccurred()) + + spec := createCloudEventSourceSpecWithExcludeEventType("keda.scaledobject.ready.v1.test") + ces := createCloudEventSource("nsccesexcludenotsupport", namespaceName, spec) + Eventually(func() error { + return k8sClient.Create(context.Background(), ces) + }).Should(HaveOccurred()) + + spec = createCloudEventSourceSpecWithIncludeEventType("keda.scaledobject.ready.v1.test") + ces = createCloudEventSource("nsccesincludenotsupport", namespaceName, spec) + Eventually(func() error { + return k8sClient.Create(context.Background(), ces) + }).Should(HaveOccurred()) +}) + +var _ = It("validate cloudeventsource when event type is support", func() { + namespaceName := "cloudeventtestns" + namespace := createNamespace(namespaceName) + err := k8sClient.Create(context.Background(), namespace) + Expect(err).ToNot(HaveOccurred()) + + for k, eventType := range AllEventTypes { + spec := createCloudEventSourceSpecWithExcludeEventType(eventType) + ces := createCloudEventSource("cloudeventexclude"+strconv.Itoa(k), namespaceName, spec) + Eventually(func() error { + return k8sClient.Create(context.Background(), ces) + }).ShouldNot(HaveOccurred()) + } + + for k, eventType := range AllEventTypes { + spec := createCloudEventSourceSpecWithIncludeEventType(eventType) + ces := createCloudEventSource("cloudeventinclude"+strconv.Itoa(k), namespaceName, spec) + Eventually(func() error { + return k8sClient.Create(context.Background(), ces) + }).ShouldNot(HaveOccurred()) + } +}) + +var _ = It("validate invalid cloudeventsource which eventtype in both excludetypes and includetypes", func() { + namespaceName := "cloudeventtestnsinvalid" + namespace := createNamespace(namespaceName) + err := k8sClient.Create(context.Background(), namespace) + Expect(err).ToNot(HaveOccurred()) + + spec := createInvalidCloudEventSourceSpe(ScaledObjectReadyType) + ces := createCloudEventSource("invalidcloudevent", namespaceName, spec) + Eventually(func() error { + return k8sClient.Create(context.Background(), ces) + }).Should(HaveOccurred()) +}) + +// -------------------------------------------------------------------------- // +// ----------------------------- HELP FUNCTIONS ----------------------------- // +// -------------------------------------------------------------------------- // + +func createNamespace(name string) *v1.Namespace { + return &v1.Namespace{ + ObjectMeta: metav1.ObjectMeta{Name: name}, + } +} + +func createCloudEventSourceSpecWithExcludeEventType(eventtype CloudEventType) CloudEventSourceSpec { + return CloudEventSourceSpec{ + EventSubscription: EventSubscription{ + ExcludedEventTypes: []CloudEventType{eventtype}, + }, + } +} + +func createCloudEventSourceSpecWithIncludeEventType(eventtype CloudEventType) CloudEventSourceSpec { + return CloudEventSourceSpec{ + EventSubscription: EventSubscription{ + IncludedEventTypes: []CloudEventType{eventtype}, + }, + } +} + +func createInvalidCloudEventSourceSpe(eventtype CloudEventType) CloudEventSourceSpec { + return CloudEventSourceSpec{ + EventSubscription: EventSubscription{ + ExcludedEventTypes: []CloudEventType{eventtype}, + IncludedEventTypes: []CloudEventType{eventtype}, + }, + } +} + +func createCloudEventSource(name string, namespace string, spec CloudEventSourceSpec) *CloudEventSource { + return &CloudEventSource{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "CloudEventSource", + APIVersion: "eventing.keda.sh", + }, + Spec: spec, + } +} diff --git a/apis/eventing/v1alpha1/zz_generated.deepcopy.go b/apis/eventing/v1alpha1/zz_generated.deepcopy.go index 57f0e41b882..a76e4c544df 100644 --- a/apis/eventing/v1alpha1/zz_generated.deepcopy.go +++ b/apis/eventing/v1alpha1/zz_generated.deepcopy.go @@ -22,7 +22,7 @@ package v1alpha1 import ( kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" - runtime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime" ) // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. @@ -103,6 +103,7 @@ func (in *CloudEventSourceList) DeepCopyObject() runtime.Object { func (in *CloudEventSourceSpec) DeepCopyInto(out *CloudEventSourceSpec) { *out = *in in.Destination.DeepCopyInto(&out.Destination) + in.EventSubscription.DeepCopyInto(&out.EventSubscription) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CloudEventSourceSpec. @@ -154,3 +155,28 @@ func (in *Destination) DeepCopy() *Destination { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *EventSubscription) DeepCopyInto(out *EventSubscription) { + *out = *in + if in.IncludedEventTypes != nil { + in, out := &in.IncludedEventTypes, &out.IncludedEventTypes + *out = make([]CloudEventType, len(*in)) + copy(*out, *in) + } + if in.ExcludedEventTypes != nil { + in, out := &in.ExcludedEventTypes, &out.ExcludedEventTypes + *out = make([]CloudEventType, len(*in)) + copy(*out, *in) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new EventSubscription. +func (in *EventSubscription) DeepCopy() *EventSubscription { + if in == nil { + return nil + } + out := new(EventSubscription) + in.DeepCopyInto(out) + return out +} diff --git a/cmd/webhooks/main.go b/cmd/webhooks/main.go index 4f03ba99b63..d9832bdd1b7 100644 --- a/cmd/webhooks/main.go +++ b/cmd/webhooks/main.go @@ -34,6 +34,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/metrics/server" "sigs.k8s.io/controller-runtime/pkg/webhook" + eventingv1alpha1 "github.com/kedacore/keda/v2/apis/eventing/v1alpha1" kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" "github.com/kedacore/keda/v2/pkg/k8s" kedautil "github.com/kedacore/keda/v2/pkg/util" @@ -49,6 +50,7 @@ func init() { utilruntime.Must(clientgoscheme.AddToScheme(scheme)) utilruntime.Must(kedav1alpha1.AddToScheme(scheme)) + utilruntime.Must(eventingv1alpha1.AddToScheme(scheme)) //+kubebuilder:scaffold:scheme } @@ -138,6 +140,10 @@ func setupWebhook(mgr manager.Manager) { setupLog.Error(err, "unable to create webhook", "webhook", "ScaledObject") os.Exit(1) } + if err := (&kedav1alpha1.ScaledJob{}).SetupWebhookWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create webhook", "webhook", "ScaledJob") + os.Exit(1) + } if err := (&kedav1alpha1.TriggerAuthentication{}).SetupWebhookWithManager(mgr); err != nil { setupLog.Error(err, "unable to create webhook", "webhook", "TriggerAuthentication") os.Exit(1) @@ -146,8 +152,8 @@ func setupWebhook(mgr manager.Manager) { setupLog.Error(err, "unable to create webhook", "webhook", "ClusterTriggerAuthentication") os.Exit(1) } - if err := (&kedav1alpha1.ScaledJob{}).SetupWebhookWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create webhook", "webhook", "ScaledJob") + if err := (&eventingv1alpha1.CloudEventSource{}).SetupWebhookWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create webhook", "webhook", "CloudEventSource") os.Exit(1) } } diff --git a/config/crd/bases/eventing.keda.sh_cloudeventsources.yaml b/config/crd/bases/eventing.keda.sh_cloudeventsources.yaml index 2225f2b6ec8..8fd3c5417b5 100644 --- a/config/crd/bases/eventing.keda.sh_cloudeventsources.yaml +++ b/config/crd/bases/eventing.keda.sh_cloudeventsources.yaml @@ -57,6 +57,28 @@ spec: - uri type: object type: object + eventSubscription: + description: EventSubscription defines filters for events + properties: + excludedEventTypes: + items: + description: CloudEventType contains the list of cloudevent + types + enum: + - keda.scaledobject.ready.v1 + - keda.scaledobject.failed.v1 + type: string + type: array + includedEventTypes: + items: + description: CloudEventType contains the list of cloudevent + types + enum: + - keda.scaledobject.ready.v1 + - keda.scaledobject.failed.v1 + type: string + type: array + type: object required: - destination type: object diff --git a/config/webhooks/validation_webhooks.yaml b/config/webhooks/validation_webhooks.yaml index 0e35590bc11..14ff71baef8 100644 --- a/config/webhooks/validation_webhooks.yaml +++ b/config/webhooks/validation_webhooks.yaml @@ -105,3 +105,27 @@ webhooks: - clustertriggerauthentications sideEffects: None timeoutSeconds: 10 +- admissionReviewVersions: + - v1 + clientConfig: + service: + name: keda-admission-webhooks + namespace: keda + path: /validate-eventing-keda-sh-v1alpha1-cloudeventsource + failurePolicy: Ignore + matchPolicy: Equivalent + name: vcloudeventsource.kb.io + namespaceSelector: {} + objectSelector: {} + rules: + - apiGroups: + - eventing.keda.sh + apiVersions: + - v1alpha1 + operations: + - CREATE + - UPDATE + resources: + - cloudeventsources + sideEffects: None + timeoutSeconds: 10 diff --git a/controllers/keda/scaledobject_controller.go b/controllers/keda/scaledobject_controller.go index f634738d3c3..27625f6f10a 100755 --- a/controllers/keda/scaledobject_controller.go +++ b/controllers/keda/scaledobject_controller.go @@ -42,6 +42,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" + eventingv1alpha1 "github.com/kedacore/keda/v2/apis/eventing/v1alpha1" kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" kedacontrollerutil "github.com/kedacore/keda/v2/controllers/keda/util" "github.com/kedacore/keda/v2/pkg/common/message" @@ -183,7 +184,7 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request if !scaledObject.Status.Conditions.AreInitialized() { conditions := kedav1alpha1.GetInitializedConditions() if err := kedastatus.SetStatusConditions(ctx, r.Client, reqLogger, scaledObject, conditions); err != nil { - r.EventEmitter.Emit(scaledObject, req.NamespacedName, corev1.EventTypeWarning, eventemitter.ScaledObjectFailedType, eventreason.ScaledObjectUpdateFailed, err.Error()) + r.EventEmitter.Emit(scaledObject, req.NamespacedName, corev1.EventTypeWarning, eventingv1alpha1.ScaledObjectFailedType, eventreason.ScaledObjectUpdateFailed, err.Error()) return ctrl.Result{}, err } } @@ -195,18 +196,18 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request reqLogger.Error(err, msg) conditions.SetReadyCondition(metav1.ConditionFalse, "ScaledObjectCheckFailed", msg) conditions.SetActiveCondition(metav1.ConditionUnknown, "UnknownState", "ScaledObject check failed") - r.EventEmitter.Emit(scaledObject, req.NamespacedName, corev1.EventTypeWarning, eventemitter.ScaledObjectFailedType, eventreason.ScaledObjectCheckFailed, msg) + r.EventEmitter.Emit(scaledObject, req.NamespacedName, corev1.EventTypeWarning, eventingv1alpha1.ScaledObjectFailedType, eventreason.ScaledObjectCheckFailed, msg) } else { wasReady := conditions.GetReadyCondition() if wasReady.IsFalse() || wasReady.IsUnknown() { - r.EventEmitter.Emit(scaledObject, req.NamespacedName, corev1.EventTypeNormal, eventemitter.ScaledObjectReadyType, eventreason.ScaledObjectReady, message.ScalerReadyMsg) + r.EventEmitter.Emit(scaledObject, req.NamespacedName, corev1.EventTypeNormal, eventingv1alpha1.ScaledObjectReadyType, eventreason.ScaledObjectReady, message.ScalerReadyMsg) } reqLogger.V(1).Info(msg) conditions.SetReadyCondition(metav1.ConditionTrue, kedav1alpha1.ScaledObjectConditionReadySuccessReason, msg) } if err := kedastatus.SetStatusConditions(ctx, r.Client, reqLogger, scaledObject, &conditions); err != nil { - r.EventEmitter.Emit(scaledObject, req.NamespacedName, corev1.EventTypeWarning, eventemitter.ScaledObjectFailedType, eventreason.ScaledObjectUpdateFailed, err.Error()) + r.EventEmitter.Emit(scaledObject, req.NamespacedName, corev1.EventTypeWarning, eventingv1alpha1.ScaledObjectFailedType, eventreason.ScaledObjectUpdateFailed, err.Error()) return ctrl.Result{}, err } diff --git a/pkg/eventemitter/cloudevent_http_handler.go b/pkg/eventemitter/cloudevent_http_handler.go index e02e64861db..fea8b943b7e 100644 --- a/pkg/eventemitter/cloudevent_http_handler.go +++ b/pkg/eventemitter/cloudevent_http_handler.go @@ -93,7 +93,7 @@ func (c *CloudEventHTTPHandler) EmitEvent(eventData eventdata.EventData, failure event := cloudevents.NewEvent() event.SetSource(source) event.SetSubject(subject) - event.SetType(eventData.EventType) + event.SetType(string(eventData.CloudEventType)) if err := event.SetData(cloudevents.ApplicationJSON, EmitData{Reason: eventData.Reason, Message: eventData.Message}); err != nil { c.logger.Error(err, "Failed to set data to CloudEvents receiver") diff --git a/pkg/eventemitter/cloudevent_http_handler_test.go b/pkg/eventemitter/cloudevent_http_handler_test.go index b9ce9c250c2..28fc91aa098 100644 --- a/pkg/eventemitter/cloudevent_http_handler_test.go +++ b/pkg/eventemitter/cloudevent_http_handler_test.go @@ -51,12 +51,12 @@ var testErrCloudeventHTTPHandlerTestData = []parseCloudeventHTTPHandlerTestData{ } var testErrEventData = eventdata.EventData{ - Namespace: "aaa", - ObjectName: "bbb", - EventType: "ccc", - Reason: "ddd", - Message: "eee", - Time: time.Now().UTC(), + Namespace: "aaa", + ObjectName: "bbb", + CloudEventType: "ccc", + Reason: "ddd", + Message: "eee", + Time: time.Now().UTC(), } func TestCorrectCloudeventHTTPHandler(t *testing.T) { diff --git a/pkg/eventemitter/eventdata/eventdata.go b/pkg/eventemitter/eventdata/eventdata.go index e2a571d2035..3536f79fc0e 100644 --- a/pkg/eventemitter/eventdata/eventdata.go +++ b/pkg/eventemitter/eventdata/eventdata.go @@ -18,18 +18,20 @@ package eventdata import ( "time" + + eventingv1alpha1 "github.com/kedacore/keda/v2/apis/eventing/v1alpha1" ) // EventData will save all event info and handler info for retry. type EventData struct { - Namespace string - ObjectName string - ObjectType string - EventType string - Reason string - Message string - Time time.Time - HandlerKey string - RetryTimes int - Err error + Namespace string + ObjectName string + ObjectType string + CloudEventType eventingv1alpha1.CloudEventType + Reason string + Message string + Time time.Time + HandlerKey string + RetryTimes int + Err error } diff --git a/pkg/eventemitter/eventemitter.go b/pkg/eventemitter/eventemitter.go index 90ba7153383..7f473de74a3 100644 --- a/pkg/eventemitter/eventemitter.go +++ b/pkg/eventemitter/eventemitter.go @@ -59,7 +59,9 @@ type EventEmitter struct { recorder record.EventRecorder clusterName string eventHandlersCache map[string]EventDataHandler + eventFilterCache map[string]*EventFilter eventHandlersCacheLock *sync.RWMutex + eventFilterCacheLock *sync.RWMutex eventLoopContexts *sync.Map cloudEventProcessingChan chan eventdata.EventData } @@ -68,7 +70,7 @@ type EventEmitter struct { type EventHandler interface { DeleteCloudEventSource(cloudEventSource *eventingv1alpha1.CloudEventSource) error HandleCloudEventSource(ctx context.Context, cloudEventSource *eventingv1alpha1.CloudEventSource) error - Emit(object runtime.Object, namesapce types.NamespacedName, eventType string, cloudeventType string, reason string, message string) + Emit(object runtime.Object, namesapce types.NamespacedName, eventType string, cloudeventType eventingv1alpha1.CloudEventType, reason string, message string) } // EventDataHandler defines the behavior for different event handlers @@ -97,7 +99,9 @@ func NewEventEmitter(client client.Client, recorder record.EventRecorder, cluste recorder: recorder, clusterName: clusterName, eventHandlersCache: map[string]EventDataHandler{}, + eventFilterCache: map[string]*EventFilter{}, eventHandlersCacheLock: &sync.RWMutex{}, + eventFilterCacheLock: &sync.RWMutex{}, eventLoopContexts: &sync.Map{}, cloudEventProcessingChan: make(chan eventdata.EventData, maxChannelBuffer), } @@ -165,7 +169,9 @@ func (e *EventEmitter) DeleteCloudEventSource(cloudEventSource *eventingv1alpha1 // use in the loop. func (e *EventEmitter) createEventHandlers(ctx context.Context, cloudEventSource *eventingv1alpha1.CloudEventSource) { e.eventHandlersCacheLock.Lock() + e.eventFilterCacheLock.Lock() defer e.eventHandlersCacheLock.Unlock() + defer e.eventFilterCacheLock.Unlock() key := cloudEventSource.GenerateIdentifier() @@ -188,12 +194,17 @@ func (e *EventEmitter) createEventHandlers(ctx context.Context, cloudEventSource } e.eventHandlersCache[eventHandlerKey] = eventHandler } + + // Create EventFilter from CloudEventSource + e.eventFilterCache[key] = NewEventFilter(cloudEventSource.Spec.EventSubscription.IncludedEventTypes, cloudEventSource.Spec.EventSubscription.ExcludedEventTypes) } // clearEventHandlersCache will clear all event handlers that created by the passing CloudEventSource func (e *EventEmitter) clearEventHandlersCache(cloudEventSource *eventingv1alpha1.CloudEventSource) { e.eventHandlersCacheLock.Lock() defer e.eventHandlersCacheLock.Unlock() + e.eventFilterCacheLock.Lock() + defer e.eventFilterCacheLock.Unlock() key := cloudEventSource.GenerateIdentifier() @@ -205,6 +216,8 @@ func (e *EventEmitter) clearEventHandlersCache(cloudEventSource *eventingv1alpha delete(e.eventHandlersCache, key) } } + + delete(e.eventFilterCache, key) } // checkIfEventHandlersExist will check if the event handlers that were created by passing CloudEventSource exist @@ -274,7 +287,7 @@ func (e *EventEmitter) checkEventHandlers(ctx context.Context, cloudEventSource } // Emit is emitting event to both local kubernetes and custom CloudEventSource handler. After emit event to local kubernetes, event will inqueue and waitng for handler's consuming. -func (e *EventEmitter) Emit(object runtime.Object, namesapce types.NamespacedName, eventType, cloudeventType, reason, message string) { +func (e *EventEmitter) Emit(object runtime.Object, namesapce types.NamespacedName, eventType string, cloudeventType eventingv1alpha1.CloudEventType, reason, message string) { e.recorder.Event(object, eventType, reason, message) e.eventHandlersCacheLock.RLock() @@ -286,13 +299,13 @@ func (e *EventEmitter) Emit(object runtime.Object, namesapce types.NamespacedNam objectName, _ := meta.NewAccessor().Name(object) objectType, _ := meta.NewAccessor().Kind(object) eventData := eventdata.EventData{ - Namespace: namesapce.Namespace, - EventType: cloudeventType, - ObjectName: strings.ToLower(objectName), - ObjectType: strings.ToLower(objectType), - Reason: reason, - Message: message, - Time: time.Now().UTC(), + Namespace: namesapce.Namespace, + CloudEventType: cloudeventType, + ObjectName: strings.ToLower(objectName), + ObjectType: strings.ToLower(objectType), + Reason: reason, + Message: message, + Time: time.Now().UTC(), } go e.enqueueEventData(eventData) } @@ -324,6 +337,19 @@ func (e *EventEmitter) emitEventByHandler(eventData eventdata.EventData) { if eventData.HandlerKey == "" { for key, handler := range e.eventHandlersCache { + e.eventFilterCacheLock.RLock() + defer e.eventFilterCacheLock.RUnlock() + // Filter Event + identifierKey := getPrefixIdentifierFromKey(key) + + if e.eventFilterCache[identifierKey] != nil { + isFiltered := e.eventFilterCache[identifierKey].FilterEvent(eventData.CloudEventType) + if isFiltered { + e.log.V(1).Info("Event is filtered", "cloudeventType", eventData.CloudEventType, "event identifier", identifierKey) + return + } + } + eventData.HandlerKey = key if handler.GetActiveStatus() == metav1.ConditionTrue { go handler.EmitEvent(eventData, e.emitErrorHandle) @@ -400,6 +426,16 @@ func newEventHandlerKey(kindNamespaceName string, handlerType string) string { / return fmt.Sprintf("%s.%s", kindNamespaceName, handlerType) } +// getPrefixIdentifierFromKey will return the prefix identifier from the handler key. Handler key is generated by the format of "CloudEventSource.Namespace.Name.HandlerType" and the prefix identifier is "CloudEventSource.Namespace.Name" +func getPrefixIdentifierFromKey(handlerKey string) string { + keys := strings.Split(handlerKey, ".") + if len(keys) >= 3 { + return keys[0] + "." + keys[1] + "." + keys[2] + } + return "" +} + +// getHandlerTypeFromKey will return the handler type from the handler key. Handler key is generated by the format of "CloudEventSource.Namespace.Name.HandlerType" and the handler type is "HandlerType" func getHandlerTypeFromKey(handlerKey string) string { keys := strings.Split(handlerKey, ".") if len(keys) >= 4 { @@ -408,6 +444,7 @@ func getHandlerTypeFromKey(handlerKey string) string { return "" } +// getSourceNameFromKey will return the handler type from the source name. Source name is generated by the format of "CloudEventSource.Namespace.Name.HandlerType" and the source name is "Name" func getSourceNameFromKey(handlerKey string) string { keys := strings.Split(handlerKey, ".") if len(keys) >= 4 { diff --git a/pkg/eventemitter/eventemitter_test.go b/pkg/eventemitter/eventemitter_test.go index 7e93466a6d4..a8ad2ba1986 100644 --- a/pkg/eventemitter/eventemitter_test.go +++ b/pkg/eventemitter/eventemitter_test.go @@ -66,23 +66,27 @@ func TestEventHandler_FailedEmitEvent(t *testing.T) { key := newEventHandlerKey(cloudEventSource.GenerateIdentifier(), cloudEventHandlerTypeHTTP) caches[key] = eventHandler + filtercaches := map[string]*EventFilter{} + eventEmitter := EventEmitter{ client: mockClient, recorder: recorder, clusterName: "cluster-name", eventHandlersCache: caches, eventHandlersCacheLock: &sync.RWMutex{}, + eventFilterCache: filtercaches, + eventFilterCacheLock: &sync.RWMutex{}, eventLoopContexts: &sync.Map{}, cloudEventProcessingChan: make(chan eventdata.EventData, 1), } eventData := eventdata.EventData{ - Namespace: "aaa", - ObjectName: "bbb", - EventType: "ccc", - Reason: "ddd", - Message: "eee", - Time: time.Now().UTC(), + Namespace: "aaa", + ObjectName: "bbb", + CloudEventType: "ccc", + Reason: "ddd", + Message: "eee", + Time: time.Now().UTC(), } mockClient.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() @@ -128,23 +132,27 @@ func TestEventHandler_DirectCall(t *testing.T) { key := newEventHandlerKey(cloudEventSource.GenerateIdentifier(), cloudEventHandlerTypeHTTP) caches[key] = eventHandler + filtercaches := map[string]*EventFilter{} + eventEmitter := EventEmitter{ client: mockClient, recorder: recorder, clusterName: "cluster-name", eventHandlersCache: caches, eventHandlersCacheLock: &sync.RWMutex{}, + eventFilterCache: filtercaches, + eventFilterCacheLock: &sync.RWMutex{}, eventLoopContexts: &sync.Map{}, cloudEventProcessingChan: make(chan eventdata.EventData, 1), } eventData := eventdata.EventData{ - Namespace: "aaa", - ObjectName: "bbb", - EventType: "ccc", - Reason: "ddd", - Message: "eee", - Time: time.Now().UTC(), + Namespace: "aaa", + ObjectName: "bbb", + CloudEventType: "ccc", + Reason: "ddd", + Message: "eee", + Time: time.Now().UTC(), } mockClient.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() diff --git a/pkg/eventemitter/eventfilter.go b/pkg/eventemitter/eventfilter.go new file mode 100644 index 00000000000..11af3af1ac8 --- /dev/null +++ b/pkg/eventemitter/eventfilter.go @@ -0,0 +1,51 @@ +/* +Copyright 2024 The KEDA Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eventemitter + +import ( + "golang.org/x/exp/slices" + + eventingv1alpha1 "github.com/kedacore/keda/v2/apis/eventing/v1alpha1" +) + +// EventFilter defines the behavior for different event handlers +type EventFilter struct { + IncludedEventTypes []eventingv1alpha1.CloudEventType + + ExcludedEventTypes []eventingv1alpha1.CloudEventType +} + +// NewEventFilter creates a new EventFilter +func NewEventFilter(includedEventTypes []eventingv1alpha1.CloudEventType, excludedEventTypes []eventingv1alpha1.CloudEventType) *EventFilter { + return &EventFilter{ + IncludedEventTypes: includedEventTypes, + ExcludedEventTypes: excludedEventTypes, + } +} + +// FilterEvent returns true if the event is filtered and should not be handled +func (e *EventFilter) FilterEvent(eventType eventingv1alpha1.CloudEventType) bool { + if len(e.IncludedEventTypes) > 0 { + return !slices.Contains(e.IncludedEventTypes, eventType) + } + + if len(e.ExcludedEventTypes) > 0 { + return slices.Contains(e.ExcludedEventTypes, eventType) + } + + return false +} diff --git a/pkg/mock/mock_eventemitter/mock_interface.go b/pkg/mock/mock_eventemitter/mock_interface.go index 63bc19082c4..d3346ea50bf 100644 --- a/pkg/mock/mock_eventemitter/mock_interface.go +++ b/pkg/mock/mock_eventemitter/mock_interface.go @@ -59,7 +59,7 @@ func (mr *MockEventHandlerMockRecorder) DeleteCloudEventSource(cloudEventSource } // Emit mocks base method. -func (m *MockEventHandler) Emit(object runtime.Object, namesapce types.NamespacedName, eventType, cloudeventType, reason, message string) { +func (m *MockEventHandler) Emit(object runtime.Object, namesapce types.NamespacedName, eventType string, cloudeventType v1alpha1.CloudEventType, reason, message string) { m.ctrl.T.Helper() m.ctrl.Call(m, "Emit", object, namesapce, eventType, cloudeventType, reason, message) } diff --git a/tests/internals/cloudevent_source/cloudevent_source_test.go b/tests/internals/cloudevent_source/cloudevent_source_test.go index 01080cc4cd0..e56579207a3 100644 --- a/tests/internals/cloudevent_source/cloudevent_source_test.go +++ b/tests/internals/cloudevent_source/cloudevent_source_test.go @@ -29,12 +29,15 @@ var ( scaledObjectName = fmt.Sprintf("%s-so", testName) clientName = fmt.Sprintf("%s-client", testName) cloudeventSourceName = fmt.Sprintf("%s-ce", testName) + cloudeventSourceErrName = fmt.Sprintf("%s-ce-err", testName) + cloudeventSourceErrName2 = fmt.Sprintf("%s-ce-err2", testName) cloudEventHTTPReceiverName = fmt.Sprintf("%s-cloudevent-http-receiver", testName) cloudEventHTTPServiceName = fmt.Sprintf("%s-cloudevent-http-service", testName) cloudEventHTTPServiceURL = fmt.Sprintf("http://%s.%s.svc.cluster.local:8899", cloudEventHTTPServiceName, namespace) clusterName = "test-cluster" expectedSubject = fmt.Sprintf("/%s/%s/scaledobject/%s", clusterName, namespace, scaledObjectName) expectedSource = fmt.Sprintf("/%s/keda/keda", clusterName) + lastCloudEventTime = time.Now() ) type templateData struct { @@ -42,6 +45,8 @@ type templateData struct { ScaledObject string ClientName string CloudEventSourceName string + CloudeventSourceErrName string + CloudeventSourceErrName2 string CloudEventHTTPReceiverName string CloudEventHTTPServiceName string CloudEventHTTPServiceURL string @@ -62,6 +67,38 @@ const ( uri: {{.CloudEventHTTPServiceURL}} ` + cloudEventSourceWithExcludeTemplate = ` + apiVersion: eventing.keda.sh/v1alpha1 + kind: CloudEventSource + metadata: + name: {{.CloudEventSourceName}} + namespace: {{.TestNamespace}} + spec: + clusterName: {{.ClusterName}} + destination: + http: + uri: {{.CloudEventHTTPServiceURL}} + eventSubscription: + excludedEventTypes: + - keda.scaledobject.failed.v1 + ` + + cloudEventSourceWithIncludeTemplate = ` + apiVersion: eventing.keda.sh/v1alpha1 + kind: CloudEventSource + metadata: + name: {{.CloudEventSourceName}} + namespace: {{.TestNamespace}} + spec: + clusterName: {{.ClusterName}} + destination: + http: + uri: {{.CloudEventHTTPServiceURL}} + eventSubscription: + includedEventTypes: + - keda.scaledobject.failed.v1 + ` + cloudEventHTTPServiceTemplate = ` apiVersion: v1 kind: Service @@ -139,6 +176,40 @@ spec: - sh - -c - "exec tail -f /dev/null"` + + cloudEventSourceWithErrTypeTemplate = ` + apiVersion: eventing.keda.sh/v1alpha1 + kind: CloudEventSource + metadata: + name: {{.CloudeventSourceErrName}} + namespace: {{.TestNamespace}} + spec: + clusterName: {{.ClusterName}} + destination: + http: + uri: {{.CloudEventHTTPServiceURL}} + eventSubscription: + includedEventTypes: + - keda.scaledobject.failed.v2 + ` + + cloudEventSourceWithErrTypeTemplate2 = ` + apiVersion: eventing.keda.sh/v1alpha1 + kind: CloudEventSource + metadata: + name: {{.CloudeventSourceErrName2}} + namespace: {{.TestNamespace}} + spec: + clusterName: {{.ClusterName}} + destination: + http: + uri: {{.CloudEventHTTPServiceURL}} + eventSubscription: + includedEventTypes: + - keda.scaledobject.failed.v1 + excludedEventTypes: + - keda.scaledobject.failed.v1 + ` ) func TestScaledObjectGeneral(t *testing.T) { @@ -149,10 +220,12 @@ func TestScaledObjectGeneral(t *testing.T) { data, templates := getTemplateData() CreateKubernetesResources(t, kc, namespace, data, templates) - time.Sleep(15 * time.Second) assert.True(t, WaitForAllPodRunningInNamespace(t, kc, namespace, 5, 20), "all pods should be running") testErrEventSourceEmitValue(t, kc, data) + testErrEventSourceExcludeValue(t, kc, data) + testErrEventSourceIncludeValue(t, kc, data) + testErrEventSourceCreation(t, kc, data) DeleteKubernetesResources(t, namespace, data, templates) } @@ -163,7 +236,9 @@ func testErrEventSourceEmitValue(t *testing.T, _ *kubernetes.Clientset, data tem KubectlApplyWithTemplate(t, data, "scaledObjectErrTemplate", scaledObjectErrTemplate) // wait 15 seconds to ensure event propagation - time.Sleep(15 * time.Second) + time.Sleep(5 * time.Second) + KubectlDeleteWithTemplate(t, data, "scaledObjectErrTemplate", scaledObjectErrTemplate) + time.Sleep(10 * time.Second) out, outErr, err := ExecCommandOnSpecificPod(t, clientName, namespace, fmt.Sprintf("curl -X GET %s/getCloudEvent/%s", cloudEventHTTPServiceURL, "ScaledObjectCheckFailed")) assert.NotEmpty(t, out) @@ -188,9 +263,98 @@ func testErrEventSourceEmitValue(t *testing.T, _ *kubernetes.Clientset, data tem assert.Equal(t, cloudEvent.Type(), "keda.scaledobject.failed.v1") assert.Equal(t, cloudEvent.Source(), expectedSource) assert.Equal(t, cloudEvent.DataContentType(), "application/json") + + if lastCloudEventTime.Before(cloudEvent.Time()) { + lastCloudEventTime = cloudEvent.Time() + } + } + } + assert.NotEmpty(t, foundEvents) +} + +// tests error events not emitted by +func testErrEventSourceExcludeValue(t *testing.T, _ *kubernetes.Clientset, data templateData) { + t.Log("--- test emitting eventsource about scaledobject err with exclude filter---") + + KubectlDeleteWithTemplate(t, data, "cloudEventSourceTemplate", cloudEventSourceTemplate) + KubectlApplyWithTemplate(t, data, "cloudEventSourceWithExcludeTemplate", cloudEventSourceWithExcludeTemplate) + KubectlApplyWithTemplate(t, data, "scaledObjectErrTemplate", scaledObjectErrTemplate) + + // wait 15 seconds to ensure event propagation + time.Sleep(15 * time.Second) + + out, outErr, err := ExecCommandOnSpecificPod(t, clientName, namespace, fmt.Sprintf("curl -X GET %s/getCloudEvent/%s", cloudEventHTTPServiceURL, "ScaledObjectCheckFailed")) + assert.NotEmpty(t, out) + assert.Empty(t, outErr) + assert.NoError(t, err, "dont expect error requesting ") + + cloudEvents := []cloudevents.Event{} + err = json.Unmarshal([]byte(out), &cloudEvents) + + assert.NoError(t, err, "dont expect error unmarshaling the cloudEvents") + + for _, cloudEvent := range cloudEvents { + assert.Condition(t, func() bool { + if cloudEvent.Subject() == expectedSubject && + cloudEvent.Time().After(lastCloudEventTime) && + cloudEvent.Type() == "keda.scaledobject.failed.v1" { + return false + } + return true + }, "get filtered event") + } + + KubectlDeleteWithTemplate(t, data, "cloudEventSourceWithExcludeTemplate", cloudEventSourceWithExcludeTemplate) + KubectlApplyWithTemplate(t, data, "cloudEventSourceTemplate", cloudEventSourceTemplate) +} + +// tests error events in include filter +func testErrEventSourceIncludeValue(t *testing.T, _ *kubernetes.Clientset, data templateData) { + t.Log("--- test emitting eventsource about scaledobject err with include filter---") + + KubectlDeleteWithTemplate(t, data, "cloudEventSourceTemplate", cloudEventSourceTemplate) + KubectlApplyWithTemplate(t, data, "cloudEventSourceWithIncludeTemplate", cloudEventSourceWithIncludeTemplate) + KubectlApplyWithTemplate(t, data, "scaledObjectErrTemplate", scaledObjectErrTemplate) + + // wait 15 seconds to ensure event propagation + time.Sleep(15 * time.Second) + + out, outErr, err := ExecCommandOnSpecificPod(t, clientName, namespace, fmt.Sprintf("curl -X GET %s/getCloudEvent/%s", cloudEventHTTPServiceURL, "ScaledObjectCheckFailed")) + assert.NotEmpty(t, out) + assert.Empty(t, outErr) + assert.NoError(t, err, "dont expect error requesting ") + + cloudEvents := []cloudevents.Event{} + err = json.Unmarshal([]byte(out), &cloudEvents) + + assert.NoError(t, err, "dont expect error unmarshaling the cloudEvents") + + foundEvents := []cloudevents.Event{} + for _, cloudEvent := range cloudEvents { + if cloudEvent.Subject() == expectedSubject && + cloudEvent.Time().After(lastCloudEventTime) && + cloudEvent.Type() == "keda.scaledobject.failed.v1" { + foundEvents = append(foundEvents, cloudEvent) } } assert.NotEmpty(t, foundEvents) + KubectlDeleteWithTemplate(t, data, "cloudEventSourceWithIncludeTemplate", cloudEventSourceWithIncludeTemplate) + KubectlApplyWithTemplate(t, data, "cloudEventSourceTemplate", cloudEventSourceTemplate) +} + +// tests error event type when creation +func testErrEventSourceCreation(t *testing.T, _ *kubernetes.Clientset, data templateData) { + t.Log("--- test emitting eventsource about scaledobject err with include filter---") + + KubectlDeleteWithTemplate(t, data, "cloudEventSourceTemplate", cloudEventSourceTemplate) + + err := KubectlApplyWithErrors(t, data, "cloudEventSourceWithErrTypeTemplate", cloudEventSourceWithErrTypeTemplate) + assert.ErrorContains(t, err, `The CloudEventSource "eventsource-test-ce-err" is invalid:`) + + err = KubectlApplyWithErrors(t, data, "cloudEventSourceWithErrTypeTemplate2", cloudEventSourceWithErrTypeTemplate2) + assert.ErrorContains(t, err, `setting included types and excluded types at the same time is not supported`) + + KubectlApplyWithTemplate(t, data, "cloudEventSourceTemplate", cloudEventSourceTemplate) } // help function to load template data @@ -200,6 +364,8 @@ func getTemplateData() (templateData, []Template) { ScaledObject: scaledObjectName, ClientName: clientName, CloudEventSourceName: cloudeventSourceName, + CloudeventSourceErrName: cloudeventSourceErrName, + CloudeventSourceErrName2: cloudeventSourceErrName2, CloudEventHTTPReceiverName: cloudEventHTTPReceiverName, CloudEventHTTPServiceName: cloudEventHTTPServiceName, CloudEventHTTPServiceURL: cloudEventHTTPServiceURL,