diff --git a/internal/controllers/receiver_controller.go b/internal/controllers/receiver_controller.go index 85abeb71b..e57272dd4 100644 --- a/internal/controllers/receiver_controller.go +++ b/internal/controllers/receiver_controller.go @@ -41,6 +41,7 @@ import ( "github.com/fluxcd/pkg/runtime/predicates" apiv1 "github.com/fluxcd/notification-controller/api/v1" + "github.com/fluxcd/notification-controller/internal/server" ) // ReceiverReconciler reconciles a Receiver object @@ -62,6 +63,12 @@ func (r *ReceiverReconciler) SetupWithManager(mgr ctrl.Manager) error { } func (r *ReceiverReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, opts ReceiverReconcilerOptions) error { + // This index is used to list Receivers by their webhook path after the receiver server + // gets a request. + if err := mgr.GetFieldIndexer().IndexField(context.Background(), &apiv1.Receiver{}, + server.WebhookPathIndexKey, server.IndexReceiverWebhookPath); err != nil { + return err + } recoverPanic := true return ctrl.NewControllerManagedBy(mgr). For(&apiv1.Receiver{}, builder.WithPredicates( diff --git a/internal/controllers/receiver_controller_test.go b/internal/controllers/receiver_controller_test.go index f7c9696b6..5e84da96e 100644 --- a/internal/controllers/receiver_controller_test.go +++ b/internal/controllers/receiver_controller_test.go @@ -198,7 +198,9 @@ func TestReceiverReconciler_EventHandler(t *testing.T) { timeout := 30 * time.Second resultR := &apiv1.Receiver{} - receiverServer := server.NewReceiverServer("127.0.0.1:56788", logf.Log, k8sClient) + // Use the client from the manager as the server handler needs to list objects from the cache + // which the "live" k8s client does not have access to. + receiverServer := server.NewReceiverServer("127.0.0.1:56788", logf.Log, testEnv.GetClient()) receiverMdlw := middleware.New(middleware.Config{ Recorder: prommetrics.NewRecorder(prommetrics.Config{ Prefix: "gotk_receiver", diff --git a/internal/server/receiver_handler_test.go b/internal/server/receiver_handler_test.go index de3d00381..6e60e6662 100644 --- a/internal/server/receiver_handler_test.go +++ b/internal/server/receiver_handler_test.go @@ -321,7 +321,7 @@ func Test_handlePayload(t *testing.T) { Conditions: []metav1.Condition{{Type: meta.StalledCondition, Status: metav1.ConditionFalse}}, }, }, - expectedResponseCode: http.StatusNotFound, + expectedResponseCode: http.StatusServiceUnavailable, }, { name: "suspended receiver ignored", @@ -337,7 +337,7 @@ func Test_handlePayload(t *testing.T) { Conditions: []metav1.Condition{{Type: meta.ReadyCondition, Status: metav1.ConditionTrue}}, }, }, - expectedResponseCode: http.StatusNotFound, + expectedResponseCode: http.StatusServiceUnavailable, }, { name: "missing apiVersion in resource", @@ -372,7 +372,7 @@ func Test_handlePayload(t *testing.T) { "token": []byte("token"), }, }, - expectedResponseCode: http.StatusBadRequest, + expectedResponseCode: http.StatusInternalServerError, }, { name: "resource by name not found", @@ -406,7 +406,7 @@ func Test_handlePayload(t *testing.T) { "token": []byte("token"), }, }, - expectedResponseCode: http.StatusBadRequest, + expectedResponseCode: http.StatusInternalServerError, }, { name: "annotating resources by label match", @@ -563,7 +563,7 @@ func Test_handlePayload(t *testing.T) { "token": []byte("token"), }, }, - expectedResponseCode: http.StatusBadRequest, + expectedResponseCode: http.StatusInternalServerError, }, { name: "resource matchLabels is ignored if name is not *", @@ -641,6 +641,7 @@ func Test_handlePayload(t *testing.T) { } builder.WithObjects(tt.resources...) + builder.WithIndex(&apiv1.Receiver{}, WebhookPathIndexKey, IndexReceiverWebhookPath) if tt.secret != nil { builder.WithObjects(tt.secret) diff --git a/internal/server/receiver_handlers.go b/internal/server/receiver_handlers.go index 7632fbde9..cc589ba9f 100644 --- a/internal/server/receiver_handlers.go +++ b/internal/server/receiver_handlers.go @@ -23,6 +23,7 @@ import ( "encoding/base64" "encoding/hex" "encoding/json" + "errors" "fmt" "io" "net/http" @@ -43,6 +44,10 @@ import ( apiv1 "github.com/fluxcd/notification-controller/api/v1" ) +var ( + WebhookPathIndexKey = ".metadata.webhookPath" +) + // defaultFluxAPIVersions is a map of Flux API kinds to their API versions. var defaultFluxAPIVersions = map[string]string{ "Bucket": "source.toolkit.fluxcd.io/v1beta2", @@ -53,6 +58,16 @@ var defaultFluxAPIVersions = map[string]string{ "ImageRepository": "image.toolkit.fluxcd.io/v1beta2", } +// IndexReceiverWebhookPath is a client.IndexerFunc that returns the Receiver's +// webhook path, if present in its status. +func IndexReceiverWebhookPath(o client.Object) []string { + receiver := o.(*apiv1.Receiver) + if receiver.Status.WebhookPath != "" { + return []string{receiver.Status.WebhookPath} + } + return nil +} + func (s *ReceiverServer) handlePayload() func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { ctx := context.Background() @@ -61,50 +76,53 @@ func (s *ReceiverServer) handlePayload() func(w http.ResponseWriter, r *http.Req s.logger.Info(fmt.Sprintf("handling request: %s", digest)) var allReceivers apiv1.ReceiverList - err := s.kubeClient.List(ctx, &allReceivers) + err := s.kubeClient.List(ctx, &allReceivers, client.MatchingFields{ + WebhookPathIndexKey: r.RequestURI, + }, client.Limit(1)) if err != nil { s.logger.Error(err, "unable to list receivers") - w.WriteHeader(http.StatusBadRequest) + w.WriteHeader(http.StatusInternalServerError) return } - receivers := make([]apiv1.Receiver, 0) - for _, receiver := range allReceivers.Items { - if !receiver.Spec.Suspend && - conditions.IsReady(&receiver) && - receiver.Status.WebhookPath == fmt.Sprintf("%s%s", apiv1.ReceiverWebhookPath, digest) { - receivers = append(receivers, receiver) - } - } - - if len(receivers) == 0 { + if len(allReceivers.Items) == 0 { w.WriteHeader(http.StatusNotFound) return } - withErrors := false - for _, receiver := range receivers { - logger := s.logger.WithValues( - "reconciler kind", apiv1.ReceiverKind, - "name", receiver.Name, - "namespace", receiver.Namespace) + receiver := allReceivers.Items[0] + logger := s.logger.WithValues( + "reconciler kind", apiv1.ReceiverKind, + "name", receiver.Name, + "namespace", receiver.Namespace) - if err := s.validate(ctx, receiver, r); err != nil { - logger.Error(err, "unable to validate payload") - withErrors = true - continue + if receiver.Spec.Suspend || !conditions.IsReady(&receiver) { + err := errors.New("unable to process request") + if receiver.Spec.Suspend { + logger.Error(err, "receiver is suspended") + } else { + logger.Error(err, "receiver is not ready") } + w.WriteHeader(http.StatusServiceUnavailable) + return + } - for _, resource := range receiver.Spec.Resources { - if err := s.requestReconciliation(ctx, logger, resource, receiver.Namespace); err != nil { - logger.Error(err, "unable to process resource") - withErrors = true - } + if err := s.validate(ctx, receiver, r); err != nil { + logger.Error(err, "unable to validate payload") + w.WriteHeader(http.StatusBadRequest) + return + } + + var withErrors bool + for _, resource := range receiver.Spec.Resources { + if err := s.requestReconciliation(ctx, logger, resource, receiver.Namespace); err != nil { + logger.Error(err, "unable to request reconciliation") + withErrors = true } } if withErrors { - w.WriteHeader(http.StatusBadRequest) + w.WriteHeader(http.StatusInternalServerError) } else { w.WriteHeader(http.StatusOK) }