Skip to content

Commit

Permalink
index receivers using webhook path as key
Browse files Browse the repository at this point in the history
Use `.status.webhookPath` as a key to index Receivers. Use this key
while listing Receivers during the handling of a payload.

Signed-off-by: Sanskar Jaiswal <[email protected]>
  • Loading branch information
aryan9600 committed Apr 24, 2023
1 parent 18c49ad commit d7595e9
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 4 deletions.
7 changes: 7 additions & 0 deletions internal/controllers/receiver_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/fluxcd/pkg/runtime/predicates"

apiv1 "github.com/fluxcd/notification-controller/api/v1"
"github.com/fluxcd/notification-controller/internal/server"
)

// ReceiverReconciler reconciles a Receiver object
Expand All @@ -62,6 +63,12 @@ func (r *ReceiverReconciler) SetupWithManager(mgr ctrl.Manager) error {
}

func (r *ReceiverReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, opts ReceiverReconcilerOptions) error {
// This index is used to list Receivers by their webhook path after the receiver server
// gets a request.
if err := mgr.GetFieldIndexer().IndexField(context.Background(), &apiv1.Receiver{},
server.WebhookPathIndexKey, server.IndexReceiverWebhookPath); err != nil {
return err
}
recoverPanic := true
return ctrl.NewControllerManagedBy(mgr).
For(&apiv1.Receiver{}, builder.WithPredicates(
Expand Down
4 changes: 3 additions & 1 deletion internal/controllers/receiver_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,9 @@ func TestReceiverReconciler_EventHandler(t *testing.T) {
timeout := 30 * time.Second
resultR := &apiv1.Receiver{}

receiverServer := server.NewReceiverServer("127.0.0.1:56788", logf.Log, k8sClient)
// Use the client from the manager as the server handler needs to list objects from the cache
// which the "live" k8s client does not have access to.
receiverServer := server.NewReceiverServer("127.0.0.1:56788", logf.Log, testEnv.GetClient())
receiverMdlw := middleware.New(middleware.Config{
Recorder: prommetrics.NewRecorder(prommetrics.Config{
Prefix: "gotk_receiver",
Expand Down
1 change: 1 addition & 0 deletions internal/server/receiver_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,7 @@ func Test_handlePayload(t *testing.T) {
}

builder.WithObjects(tt.resources...)
builder.WithIndex(&apiv1.Receiver{}, WebhookPathIndexKey, IndexReceiverWebhookPath)

if tt.secret != nil {
builder.WithObjects(tt.secret)
Expand Down
21 changes: 18 additions & 3 deletions internal/server/receiver_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ import (
apiv1 "github.com/fluxcd/notification-controller/api/v1"
)

var (
WebhookPathIndexKey = ".metadata.webhookPath"
)

// defaultFluxAPIVersions is a map of Flux API kinds to their API versions.
var defaultFluxAPIVersions = map[string]string{
"Bucket": "source.toolkit.fluxcd.io/v1beta2",
Expand All @@ -53,6 +57,16 @@ var defaultFluxAPIVersions = map[string]string{
"ImageRepository": "image.toolkit.fluxcd.io/v1beta2",
}

// IndexReceiverWebhookPath is a client.IndexerFunc that returns the Receiver's
// webhook path, if present in its status.
func IndexReceiverWebhookPath(o client.Object) []string {
receiver := o.(*apiv1.Receiver)
if receiver.Status.WebhookPath != "" {
return []string{receiver.Status.WebhookPath}
}
return nil
}

func (s *ReceiverServer) handlePayload() func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
ctx := context.Background()
Expand All @@ -61,7 +75,9 @@ func (s *ReceiverServer) handlePayload() func(w http.ResponseWriter, r *http.Req
s.logger.Info(fmt.Sprintf("handling request: %s", digest))

var allReceivers apiv1.ReceiverList
err := s.kubeClient.List(ctx, &allReceivers)
err := s.kubeClient.List(ctx, &allReceivers, client.MatchingFields{
WebhookPathIndexKey: fmt.Sprintf("%s%s", apiv1.ReceiverWebhookPath, digest),
})
if err != nil {
s.logger.Error(err, "unable to list receivers")
w.WriteHeader(http.StatusBadRequest)
Expand All @@ -71,8 +87,7 @@ func (s *ReceiverServer) handlePayload() func(w http.ResponseWriter, r *http.Req
receivers := make([]apiv1.Receiver, 0)
for _, receiver := range allReceivers.Items {
if !receiver.Spec.Suspend &&
conditions.IsReady(&receiver) &&
receiver.Status.WebhookPath == fmt.Sprintf("%s%s", apiv1.ReceiverWebhookPath, digest) {
conditions.IsReady(&receiver) {
receivers = append(receivers, receiver)
}
}
Expand Down

0 comments on commit d7595e9

Please sign in to comment.