Skip to content

Commit

Permalink
Fix revision discarded on event rate limiting key calculation
Browse files Browse the repository at this point in the history
Signed-off-by: Matheus Pimenta <[email protected]>
  • Loading branch information
matheuscscp committed May 11, 2023
1 parent ed5b2fe commit f95559e
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 77 deletions.
45 changes: 2 additions & 43 deletions internal/server/event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@ package server
import (
"context"
"crypto/x509"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"regexp"
"strings"
Expand All @@ -45,30 +43,13 @@ import (

func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
r.Context()
body, err := io.ReadAll(r.Body)
if err != nil {
s.logger.Error(err, "reading the request body failed")
w.WriteHeader(http.StatusBadRequest)
return
}
defer r.Body.Close()

event := &eventv1.Event{}
err = json.Unmarshal(body, event)
if err != nil {
s.logger.Error(err, "decoding the request body failed")
w.WriteHeader(http.StatusBadRequest)
return
}

cleanupMetadata(event)
event := r.Context().Value(eventContextKey{}).(*eventv1.Event)

ctx, cancel := context.WithTimeout(r.Context(), 15*time.Second)
defer cancel()

var allAlerts apiv1beta2.AlertList
err = s.kubeClient.List(ctx, &allAlerts)
err := s.kubeClient.List(ctx, &allAlerts)
if err != nil {
s.logger.Error(err, "listing alerts failed")
w.WriteHeader(http.StatusBadRequest)
Expand Down Expand Up @@ -342,28 +323,6 @@ func (s *EventServer) eventMatchesAlert(ctx context.Context, event *eventv1.Even
return false
}

// cleanupMetadata removes metadata entries which are not used for alerting
func cleanupMetadata(event *eventv1.Event) {
group := event.InvolvedObject.GetObjectKind().GroupVersionKind().Group
excludeList := []string{
fmt.Sprintf("%s/%s", group, eventv1.MetaChecksumKey),
fmt.Sprintf("%s/%s", group, eventv1.MetaDigestKey),
}

meta := make(map[string]string)
if event.Metadata != nil && len(event.Metadata) > 0 {
// Filter other meta based on group prefix, while filtering out excludes
for key, val := range event.Metadata {
if strings.HasPrefix(key, group) && !inList(excludeList, key) {
newKey := strings.TrimPrefix(key, fmt.Sprintf("%s/", group))
meta[newKey] = val
}
}
}

event.Metadata = meta
}

func inList(l []string, i string) bool {
for _, v := range l {
if strings.EqualFold(v, i) {
Expand Down
104 changes: 74 additions & 30 deletions internal/server/event_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import (
eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
)

type eventContextKey struct{}

// EventServer handles event POST requests
type EventServer struct {
port string
Expand All @@ -63,8 +65,16 @@ func (s *EventServer) ListenAndServe(stopCh <-chan struct{}, mdlw middleware.Mid
s.logger.Error(err, "Event server crashed")
os.Exit(1)
}
var handler http.Handler = http.HandlerFunc(s.handleEvent())
for _, middleware := range []func(http.Handler) http.Handler{
limitMiddleware.Handle,
s.logRateLimitMiddleware,
s.cleanupMetadataMiddleware,
} {
handler = middleware(handler)
}
mux := http.NewServeMux()
mux.Handle("/", s.logRateLimitMiddleware(limitMiddleware.Handle(http.HandlerFunc(s.handleEvent()))))
mux.Handle("/", handler)
h := std.Handler("", mdlw, mux)
srv := &http.Server{
Addr: s.port,
Expand All @@ -90,6 +100,59 @@ func (s *EventServer) ListenAndServe(stopCh <-chan struct{}, mdlw middleware.Mid
}
}

// cleanupMetadataMiddleware cleans up the metadata using cleanupMetadata() and
// adds the cleaned event in the request context which can then be queried and
// used directly by the other http handlers.
func (s *EventServer) cleanupMetadataMiddleware(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
if err != nil {
s.logger.Error(err, "reading the request body failed")
w.WriteHeader(http.StatusBadRequest)
return
}
r.Body.Close()
r.Body = io.NopCloser(bytes.NewBuffer(body))

event := &eventv1.Event{}
err = json.Unmarshal(body, event)
if err != nil {
s.logger.Error(err, "decoding the request body failed")
w.WriteHeader(http.StatusBadRequest)
return
}

cleanupMetadata(event)

ctxWithEvent := context.WithValue(r.Context(), eventContextKey{}, event)
reqWithEvent := r.WithContext(ctxWithEvent)

h.ServeHTTP(w, reqWithEvent)
})
}

// cleanupMetadata removes metadata entries which are not used for alerting.
func cleanupMetadata(event *eventv1.Event) {
group := event.InvolvedObject.GetObjectKind().GroupVersionKind().Group
excludeList := []string{
fmt.Sprintf("%s/%s", group, eventv1.MetaChecksumKey),
fmt.Sprintf("%s/%s", group, eventv1.MetaDigestKey),
}

meta := make(map[string]string)
if event.Metadata != nil && len(event.Metadata) > 0 {
// Filter other meta based on group prefix, while filtering out excludes
for key, val := range event.Metadata {
if strings.HasPrefix(key, group) && !inList(excludeList, key) {
newKey := strings.TrimPrefix(key, fmt.Sprintf("%s/", group))
meta[newKey] = val
}
}
}

event.Metadata = meta
}

type statusRecorder struct {
http.ResponseWriter
Status int
Expand All @@ -109,23 +172,7 @@ func (s *EventServer) logRateLimitMiddleware(h http.Handler) http.Handler {
h.ServeHTTP(recorder, r)

if recorder.Status == http.StatusTooManyRequests {
body, err := io.ReadAll(r.Body)
if err != nil {
s.logger.Error(err, "reading the request body failed")
w.WriteHeader(http.StatusBadRequest)
return
}

event := &eventv1.Event{}
err = json.Unmarshal(body, event)
if err != nil {
s.logger.Error(err, "decoding the request body failed")
w.WriteHeader(http.StatusBadRequest)
return
}

r.Body = io.NopCloser(bytes.NewBuffer(body))

event := r.Context().Value(eventContextKey{}).(*eventv1.Event)
s.logger.V(1).Info("Discarding event, rate limiting duplicate events",
"reconciler kind", event.InvolvedObject.Kind,
"name", event.InvolvedObject.Name,
Expand All @@ -135,24 +182,21 @@ func (s *EventServer) logRateLimitMiddleware(h http.Handler) http.Handler {
}

func eventKeyFunc(r *http.Request) (string, error) {
body, err := io.ReadAll(r.Body)
if err != nil {
return "", err
event := r.Context().Value(eventContextKey{}).(*eventv1.Event)

comps := []string{
"event",
event.InvolvedObject.Name,
event.InvolvedObject.Namespace,
event.InvolvedObject.Kind,
event.Message,
}

event := &eventv1.Event{}
err = json.Unmarshal(body, event)
if err != nil {
return "", err
}

r.Body = io.NopCloser(bytes.NewBuffer(body))

comps := []string{"event", event.InvolvedObject.Name, event.InvolvedObject.Namespace, event.InvolvedObject.Kind, event.Message}
revString, ok := event.Metadata[eventv1.MetaRevisionKey]
if ok {
comps = append(comps, revString)
}

val := strings.Join(comps, "/")
digest := sha256.Sum256([]byte(val))
return fmt.Sprintf("%x", digest), nil
Expand Down
11 changes: 7 additions & 4 deletions internal/server/event_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package server

import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
Expand Down Expand Up @@ -123,18 +124,20 @@ func TestEventKeyFunc(t *testing.T) {
}
for i, tt := range tests {
t.Run(fmt.Sprintf("%v", i), func(t *testing.T) {
event := eventv1.Event{
event := &eventv1.Event{
InvolvedObject: tt.involvedObject,
Severity: tt.severity,
Message: tt.message,
}
cleanupMetadata(event)
eventData, err := json.Marshal(event)
g.Expect(err).ShouldNot(gomega.HaveOccurred())

req := httptest.NewRequest("POST", "/", bytes.NewBuffer(eventData))
g.Expect(err).ShouldNot(gomega.HaveOccurred())
res := httptest.NewRecorder()
handler.ServeHTTP(res, req)
req := httptest.NewRequest("POST", "/", bytes.NewBuffer(eventData))
ctxWithEvent := context.WithValue(req.Context(), eventContextKey{}, event)
reqWithEvent := req.WithContext(ctxWithEvent)
handler.ServeHTTP(res, reqWithEvent)

if tt.rateLimit {
g.Expect(res.Code).Should(gomega.Equal(429))
Expand Down

0 comments on commit f95559e

Please sign in to comment.