Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix revision discarded on event rate limiting key calculation #517

Merged
merged 1 commit into from
May 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 2 additions & 43 deletions internal/server/event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@ package server
import (
"context"
"crypto/x509"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"regexp"
"strings"
Expand All @@ -45,30 +43,13 @@ import (

func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
r.Context()
body, err := io.ReadAll(r.Body)
if err != nil {
s.logger.Error(err, "reading the request body failed")
w.WriteHeader(http.StatusBadRequest)
return
}
defer r.Body.Close()

event := &eventv1.Event{}
err = json.Unmarshal(body, event)
if err != nil {
s.logger.Error(err, "decoding the request body failed")
w.WriteHeader(http.StatusBadRequest)
return
}

cleanupMetadata(event)
event := r.Context().Value(eventContextKey{}).(*eventv1.Event)

ctx, cancel := context.WithTimeout(r.Context(), 15*time.Second)
defer cancel()

var allAlerts apiv1beta2.AlertList
err = s.kubeClient.List(ctx, &allAlerts)
err := s.kubeClient.List(ctx, &allAlerts)
if err != nil {
s.logger.Error(err, "listing alerts failed")
w.WriteHeader(http.StatusBadRequest)
Expand Down Expand Up @@ -342,28 +323,6 @@ func (s *EventServer) eventMatchesAlert(ctx context.Context, event *eventv1.Even
return false
}

// cleanupMetadata removes metadata entries which are not used for alerting
func cleanupMetadata(event *eventv1.Event) {
group := event.InvolvedObject.GetObjectKind().GroupVersionKind().Group
excludeList := []string{
fmt.Sprintf("%s/%s", group, eventv1.MetaChecksumKey),
fmt.Sprintf("%s/%s", group, eventv1.MetaDigestKey),
}

meta := make(map[string]string)
if event.Metadata != nil && len(event.Metadata) > 0 {
// Filter other meta based on group prefix, while filtering out excludes
for key, val := range event.Metadata {
if strings.HasPrefix(key, group) && !inList(excludeList, key) {
newKey := strings.TrimPrefix(key, fmt.Sprintf("%s/", group))
meta[newKey] = val
}
}
}

event.Metadata = meta
}

func inList(l []string, i string) bool {
for _, v := range l {
if strings.EqualFold(v, i) {
Expand Down
104 changes: 74 additions & 30 deletions internal/server/event_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import (
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
)

type eventContextKey struct{}

// EventServer handles event POST requests
type EventServer struct {
port string
Expand All @@ -63,8 +65,16 @@ func (s *EventServer) ListenAndServe(stopCh <-chan struct{}, mdlw middleware.Mid
s.logger.Error(err, "Event server crashed")
os.Exit(1)
}
var handler http.Handler = http.HandlerFunc(s.handleEvent())
for _, middleware := range []func(http.Handler) http.Handler{
limitMiddleware.Handle,
s.logRateLimitMiddleware,
s.cleanupMetadataMiddleware,
} {
handler = middleware(handler)
}
mux := http.NewServeMux()
mux.Handle("/", s.logRateLimitMiddleware(limitMiddleware.Handle(http.HandlerFunc(s.handleEvent()))))
mux.Handle("/", handler)
h := std.Handler("", mdlw, mux)
srv := &http.Server{
Addr: s.port,
Expand All @@ -90,6 +100,59 @@ func (s *EventServer) ListenAndServe(stopCh <-chan struct{}, mdlw middleware.Mid
}
}

// cleanupMetadataMiddleware cleans up the metadata using cleanupMetadata() and
// adds the cleaned event in the request context which can then be queried and
// used directly by the other http handlers.
func (s *EventServer) cleanupMetadataMiddleware(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
if err != nil {
s.logger.Error(err, "reading the request body failed")
w.WriteHeader(http.StatusBadRequest)
return
}
r.Body.Close()
r.Body = io.NopCloser(bytes.NewBuffer(body))

event := &eventv1.Event{}
err = json.Unmarshal(body, event)
if err != nil {
s.logger.Error(err, "decoding the request body failed")
w.WriteHeader(http.StatusBadRequest)
return
}

cleanupMetadata(event)

ctxWithEvent := context.WithValue(r.Context(), eventContextKey{}, event)
reqWithEvent := r.WithContext(ctxWithEvent)

h.ServeHTTP(w, reqWithEvent)
})
}

// cleanupMetadata removes metadata entries which are not used for alerting.
func cleanupMetadata(event *eventv1.Event) {
group := event.InvolvedObject.GetObjectKind().GroupVersionKind().Group
excludeList := []string{
fmt.Sprintf("%s/%s", group, eventv1.MetaChecksumKey),
fmt.Sprintf("%s/%s", group, eventv1.MetaDigestKey),
}

meta := make(map[string]string)
if event.Metadata != nil && len(event.Metadata) > 0 {
// Filter other meta based on group prefix, while filtering out excludes
for key, val := range event.Metadata {
if strings.HasPrefix(key, group) && !inList(excludeList, key) {
newKey := strings.TrimPrefix(key, fmt.Sprintf("%s/", group))
meta[newKey] = val
}
}
}

event.Metadata = meta
}

type statusRecorder struct {
http.ResponseWriter
Status int
Expand All @@ -109,23 +172,7 @@ func (s *EventServer) logRateLimitMiddleware(h http.Handler) http.Handler {
h.ServeHTTP(recorder, r)

if recorder.Status == http.StatusTooManyRequests {
body, err := io.ReadAll(r.Body)
if err != nil {
s.logger.Error(err, "reading the request body failed")
w.WriteHeader(http.StatusBadRequest)
return
}

event := &eventv1.Event{}
err = json.Unmarshal(body, event)
if err != nil {
s.logger.Error(err, "decoding the request body failed")
w.WriteHeader(http.StatusBadRequest)
return
}

r.Body = io.NopCloser(bytes.NewBuffer(body))

event := r.Context().Value(eventContextKey{}).(*eventv1.Event)
s.logger.V(1).Info("Discarding event, rate limiting duplicate events",
"reconciler kind", event.InvolvedObject.Kind,
"name", event.InvolvedObject.Name,
Expand All @@ -135,24 +182,21 @@ func (s *EventServer) logRateLimitMiddleware(h http.Handler) http.Handler {
}

func eventKeyFunc(r *http.Request) (string, error) {
body, err := io.ReadAll(r.Body)
if err != nil {
return "", err
event := r.Context().Value(eventContextKey{}).(*eventv1.Event)

comps := []string{
"event",
event.InvolvedObject.Name,
event.InvolvedObject.Namespace,
event.InvolvedObject.Kind,
event.Message,
}

event := &eventv1.Event{}
err = json.Unmarshal(body, event)
if err != nil {
return "", err
}

r.Body = io.NopCloser(bytes.NewBuffer(body))

comps := []string{"event", event.InvolvedObject.Name, event.InvolvedObject.Namespace, event.InvolvedObject.Kind, event.Message}
revString, ok := event.Metadata[eventv1.MetaRevisionKey]
if ok {
comps = append(comps, revString)
}

val := strings.Join(comps, "/")
digest := sha256.Sum256([]byte(val))
return fmt.Sprintf("%x", digest), nil
Expand Down
55 changes: 51 additions & 4 deletions internal/server/event_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package server

import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
Expand Down Expand Up @@ -53,6 +54,7 @@ func TestEventKeyFunc(t *testing.T) {
severity string
message string
rateLimit bool
metadata map[string]string
}{
{
involvedObject: corev1.ObjectReference{
Expand Down Expand Up @@ -120,21 +122,66 @@ func TestEventKeyFunc(t *testing.T) {
message: "Health check passed",
rateLimit: true,
},
{
involvedObject: corev1.ObjectReference{
APIVersion: "kustomize.toolkit.fluxcd.io/v1beta1",
Kind: "Kustomization",
Name: "4",
Namespace: "4",
},
severity: eventv1.EventSeverityInfo,
message: "Health check passed",
metadata: map[string]string{
fmt.Sprintf("%s/%s", "kustomize.toolkit.fluxcd.io", eventv1.MetaRevisionKey): "rev1",
},
rateLimit: false,
},
{
involvedObject: corev1.ObjectReference{
APIVersion: "kustomize.toolkit.fluxcd.io/v1beta1",
Kind: "Kustomization",
Name: "4",
Namespace: "4",
},
severity: eventv1.EventSeverityInfo,
message: "Health check passed",
metadata: map[string]string{
fmt.Sprintf("%s/%s", "kustomize.toolkit.fluxcd.io", eventv1.MetaRevisionKey): "rev1",
},
rateLimit: true,
},
{
involvedObject: corev1.ObjectReference{
APIVersion: "kustomize.toolkit.fluxcd.io/v1beta1",
Kind: "Kustomization",
Name: "4",
Namespace: "4",
},
severity: eventv1.EventSeverityInfo,
message: "Health check passed",
metadata: map[string]string{
fmt.Sprintf("%s/%s", "kustomize.toolkit.fluxcd.io", eventv1.MetaRevisionKey): "rev2",
},
rateLimit: false,
},
}
for i, tt := range tests {
t.Run(fmt.Sprintf("%v", i), func(t *testing.T) {
event := eventv1.Event{
event := &eventv1.Event{
InvolvedObject: tt.involvedObject,
Severity: tt.severity,
Message: tt.message,
Metadata: tt.metadata,
}
cleanupMetadata(event)
eventData, err := json.Marshal(event)
g.Expect(err).ShouldNot(gomega.HaveOccurred())

req := httptest.NewRequest("POST", "/", bytes.NewBuffer(eventData))
g.Expect(err).ShouldNot(gomega.HaveOccurred())
res := httptest.NewRecorder()
handler.ServeHTTP(res, req)
req := httptest.NewRequest("POST", "/", bytes.NewBuffer(eventData))
ctxWithEvent := context.WithValue(req.Context(), eventContextKey{}, event)
reqWithEvent := req.WithContext(ctxWithEvent)
handler.ServeHTTP(res, reqWithEvent)

if tt.rateLimit {
g.Expect(res.Code).Should(gomega.Equal(429))
Expand Down