Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Index receivers using webhook path as key #506

Merged
merged 1 commit into from
May 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions internal/controllers/receiver_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/fluxcd/pkg/runtime/predicates"

apiv1 "github.com/fluxcd/notification-controller/api/v1"
"github.com/fluxcd/notification-controller/internal/server"
)

// ReceiverReconciler reconciles a Receiver object
Expand All @@ -62,6 +63,12 @@ func (r *ReceiverReconciler) SetupWithManager(mgr ctrl.Manager) error {
}

func (r *ReceiverReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, opts ReceiverReconcilerOptions) error {
// This index is used to list Receivers by their webhook path after the receiver server
// gets a request.
if err := mgr.GetFieldIndexer().IndexField(context.Background(), &apiv1.Receiver{},
server.WebhookPathIndexKey, server.IndexReceiverWebhookPath); err != nil {
return err
}
recoverPanic := true
return ctrl.NewControllerManagedBy(mgr).
For(&apiv1.Receiver{}, builder.WithPredicates(
Expand Down
4 changes: 3 additions & 1 deletion internal/controllers/receiver_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,9 @@ func TestReceiverReconciler_EventHandler(t *testing.T) {
timeout := 30 * time.Second
resultR := &apiv1.Receiver{}

receiverServer := server.NewReceiverServer("127.0.0.1:56788", logf.Log, k8sClient)
// Use the client from the manager as the server handler needs to list objects from the cache
// which the "live" k8s client does not have access to.
receiverServer := server.NewReceiverServer("127.0.0.1:56788", logf.Log, testEnv.GetClient())
receiverMdlw := middleware.New(middleware.Config{
Recorder: prommetrics.NewRecorder(prommetrics.Config{
Prefix: "gotk_receiver",
Expand Down
11 changes: 6 additions & 5 deletions internal/server/receiver_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func Test_handlePayload(t *testing.T) {
Conditions: []metav1.Condition{{Type: meta.StalledCondition, Status: metav1.ConditionFalse}},
},
},
expectedResponseCode: http.StatusNotFound,
expectedResponseCode: http.StatusServiceUnavailable,
},
{
name: "suspended receiver ignored",
Expand All @@ -337,7 +337,7 @@ func Test_handlePayload(t *testing.T) {
Conditions: []metav1.Condition{{Type: meta.ReadyCondition, Status: metav1.ConditionTrue}},
},
},
expectedResponseCode: http.StatusNotFound,
expectedResponseCode: http.StatusServiceUnavailable,
},
{
name: "missing apiVersion in resource",
Expand Down Expand Up @@ -372,7 +372,7 @@ func Test_handlePayload(t *testing.T) {
"token": []byte("token"),
},
},
expectedResponseCode: http.StatusBadRequest,
expectedResponseCode: http.StatusInternalServerError,
},
{
name: "resource by name not found",
Expand Down Expand Up @@ -406,7 +406,7 @@ func Test_handlePayload(t *testing.T) {
"token": []byte("token"),
},
},
expectedResponseCode: http.StatusBadRequest,
expectedResponseCode: http.StatusInternalServerError,
},
{
name: "annotating resources by label match",
Expand Down Expand Up @@ -563,7 +563,7 @@ func Test_handlePayload(t *testing.T) {
"token": []byte("token"),
},
},
expectedResponseCode: http.StatusBadRequest,
expectedResponseCode: http.StatusInternalServerError,
},
{
name: "resource matchLabels is ignored if name is not *",
Expand Down Expand Up @@ -641,6 +641,7 @@ func Test_handlePayload(t *testing.T) {
}

builder.WithObjects(tt.resources...)
builder.WithIndex(&apiv1.Receiver{}, WebhookPathIndexKey, IndexReceiverWebhookPath)

if tt.secret != nil {
builder.WithObjects(tt.secret)
Expand Down
74 changes: 46 additions & 28 deletions internal/server/receiver_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"encoding/base64"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
Expand All @@ -43,6 +44,10 @@ import (
apiv1 "github.com/fluxcd/notification-controller/api/v1"
)

var (
WebhookPathIndexKey = ".metadata.webhookPath"
)

// defaultFluxAPIVersions is a map of Flux API kinds to their API versions.
var defaultFluxAPIVersions = map[string]string{
"Bucket": "source.toolkit.fluxcd.io/v1beta2",
Expand All @@ -53,6 +58,16 @@ var defaultFluxAPIVersions = map[string]string{
"ImageRepository": "image.toolkit.fluxcd.io/v1beta2",
}

// IndexReceiverWebhookPath is a client.IndexerFunc that returns the Receiver's
// webhook path, if present in its status.
func IndexReceiverWebhookPath(o client.Object) []string {
receiver := o.(*apiv1.Receiver)
if receiver.Status.WebhookPath != "" {
return []string{receiver.Status.WebhookPath}
}
return nil
}

func (s *ReceiverServer) handlePayload() func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
ctx := context.Background()
Expand All @@ -61,50 +76,53 @@ func (s *ReceiverServer) handlePayload() func(w http.ResponseWriter, r *http.Req
s.logger.Info(fmt.Sprintf("handling request: %s", digest))

var allReceivers apiv1.ReceiverList
err := s.kubeClient.List(ctx, &allReceivers)
err := s.kubeClient.List(ctx, &allReceivers, client.MatchingFields{
WebhookPathIndexKey: r.RequestURI,
}, client.Limit(1))
if err != nil {
s.logger.Error(err, "unable to list receivers")
w.WriteHeader(http.StatusBadRequest)
w.WriteHeader(http.StatusInternalServerError)
return
}

receivers := make([]apiv1.Receiver, 0)
for _, receiver := range allReceivers.Items {
if !receiver.Spec.Suspend &&
conditions.IsReady(&receiver) &&
receiver.Status.WebhookPath == fmt.Sprintf("%s%s", apiv1.ReceiverWebhookPath, digest) {
receivers = append(receivers, receiver)
}
}

if len(receivers) == 0 {
if len(allReceivers.Items) == 0 {
w.WriteHeader(http.StatusNotFound)
return
}

withErrors := false
for _, receiver := range receivers {
logger := s.logger.WithValues(
"reconciler kind", apiv1.ReceiverKind,
"name", receiver.Name,
"namespace", receiver.Namespace)
receiver := allReceivers.Items[0]
logger := s.logger.WithValues(
"reconciler kind", apiv1.ReceiverKind,
"name", receiver.Name,
"namespace", receiver.Namespace)

if err := s.validate(ctx, receiver, r); err != nil {
logger.Error(err, "unable to validate payload")
withErrors = true
continue
if receiver.Spec.Suspend || !conditions.IsReady(&receiver) {
err := errors.New("unable to process request")
if receiver.Spec.Suspend {
logger.Error(err, "receiver is suspended")
} else {
logger.Error(err, "receiver is not ready")
}
w.WriteHeader(http.StatusServiceUnavailable)
return
}

for _, resource := range receiver.Spec.Resources {
if err := s.requestReconciliation(ctx, logger, resource, receiver.Namespace); err != nil {
logger.Error(err, "unable to process resource")
withErrors = true
}
if err := s.validate(ctx, receiver, r); err != nil {
logger.Error(err, "unable to validate payload")
w.WriteHeader(http.StatusBadRequest)
return
}

var withErrors bool
for _, resource := range receiver.Spec.Resources {
if err := s.requestReconciliation(ctx, logger, resource, receiver.Namespace); err != nil {
logger.Error(err, "unable to request reconciliation")
withErrors = true
}
}

aryan9600 marked this conversation as resolved.
Show resolved Hide resolved
if withErrors {
w.WriteHeader(http.StatusBadRequest)
w.WriteHeader(http.StatusInternalServerError)
} else {
w.WriteHeader(http.StatusOK)
}
Expand Down