Skip to content

Commit

Permalink
Merge pull request #145 from fluxcd/continue-outer
Browse files Browse the repository at this point in the history
Unit tests for event forwarding
  • Loading branch information
squaremo authored Feb 23, 2021
2 parents f280418 + d7acd1f commit 22c6082
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 19 deletions.
194 changes: 184 additions & 10 deletions controllers/suite_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2020 The Flux authors
Copyright 2020, 2021 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -17,11 +17,18 @@ limitations under the License.
package controllers

import (
"bytes"
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"path/filepath"
"testing"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -30,7 +37,11 @@ import (
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"

notificationv1 "github.com/fluxcd/notification-controller/api/v1beta1"
"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/recorder"

notifyv1 "github.com/fluxcd/notification-controller/api/v1beta1"
"github.com/fluxcd/notification-controller/internal/server"
// +kubebuilder:scaffold:imports
)

Expand Down Expand Up @@ -64,15 +75,8 @@ var _ = BeforeSuite(func(done Done) {
Expect(err).ToNot(HaveOccurred())
Expect(cfg).ToNot(BeNil())

err = notificationv1.AddToScheme(scheme.Scheme)
Expect(err).NotTo(HaveOccurred())

err = notificationv1.AddToScheme(scheme.Scheme)
Expect(err).NotTo(HaveOccurred())

err = notificationv1.AddToScheme(scheme.Scheme)
err = notifyv1.AddToScheme(scheme.Scheme)
Expect(err).NotTo(HaveOccurred())

// +kubebuilder:scaffold:scheme

k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
Expand All @@ -87,3 +91,173 @@ var _ = AfterSuite(func() {
err := testEnv.Stop()
Expect(err).ToNot(HaveOccurred())
})

var _ = Describe("Event handlers", func() {

var (
namespace = "default"
rcvServer *httptest.Server
providerName = "test-provider"
provider notifyv1.Provider
stopCh chan struct{}
req *http.Request
)

// This sets up the minimal objects so that we can test the
// events handling.
BeforeEach(func() {
ctx := context.Background()

// We're not testing the provider, but this is a way to know
// whether events have been handled.
rcvServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
req = r
w.WriteHeader(200)
}))

provider = notifyv1.Provider{
Spec: notifyv1.ProviderSpec{
Type: "generic",
Address: rcvServer.URL,
},
}
provider.Name = providerName
provider.Namespace = namespace
By("Creating provider")
Expect(k8sClient.Create(ctx, &provider)).To(Succeed())

By("Creating and starting event server")
// TODO let OS assign port number
eventServer := server.NewEventServer("127.0.0.1:56789", logf.Log, k8sClient)
stopCh = make(chan struct{})
go eventServer.ListenAndServe(stopCh)
})

AfterEach(func() {
req = nil
rcvServer.Close()
close(stopCh)
Expect(k8sClient.Delete(context.Background(), &provider)).To(Succeed())
})

// The following test "templates" will create the alert, then
// serialise the event and post it to the event server. They
// differ on what's expected to happen to the event.

var (
alert notifyv1.Alert
event recorder.Event
)

JustBeforeEach(func() {
alert.Name = "test-alert"
alert.Namespace = namespace
Expect(k8sClient.Create(context.Background(), &alert)).To(Succeed())
// the event server won't dispatch to an alert if it has
// not been marked "ready"
meta.SetResourceCondition(&alert, meta.ReadyCondition, metav1.ConditionTrue, meta.ReconciliationSucceededReason, "artificially set to ready")
Expect(k8sClient.Status().Update(context.Background(), &alert)).To(Succeed())
})

AfterEach(func() {
Expect(k8sClient.Delete(context.Background(), &alert)).To(Succeed())
})

testSent := func() {
buf := &bytes.Buffer{}
Expect(json.NewEncoder(buf).Encode(&event)).To(Succeed())
res, err := http.Post("http://localhost:56789/", "application/json", buf)
Expect(err).ToNot(HaveOccurred())
Expect(res.StatusCode).To(Equal(202)) // event_server responds with 202 Accepted
}

testForwarded := func() {
Eventually(func() bool {
return req == nil
}, "2s", "0.1s").Should(BeFalse())
}

testFiltered := func() {
// The event_server does forwarding in a goroutine, after
// responding to the POST of the event. This makes it
// difficult to know whether the provider has filtered the
// event, or just not run the goroutine yet. For now, I'll use
// a timeout (and Consistently so it can fail early)
Consistently(func() bool {
return req == nil
}, "1s", "0.1s").Should(BeTrue())
}

Describe("event forwarding", func() {
BeforeEach(func() {
alert = notifyv1.Alert{}
alert.Spec = notifyv1.AlertSpec{
ProviderRef: meta.LocalObjectReference{
Name: providerName,
},
EventSeverity: "info",
EventSources: []notifyv1.CrossNamespaceObjectReference{
{
Kind: "Bucket",
Name: "hyacinth",
Namespace: "default",
},
},
}
event = recorder.Event{
InvolvedObject: corev1.ObjectReference{
Kind: "Bucket",
Name: "hyacinth",
Namespace: "default",
},
Severity: "info",
Timestamp: metav1.Now(),
Message: "well that happened",
Reason: "event-happened",
ReportingController: "source-controller",
}
})

Context("matching by source", func() {
It("forwards when source is a match", func() {
testSent()
testForwarded()
})
It("drops event when source Kind does not match", func() {
event.InvolvedObject.Kind = "GitRepository"
testSent()
testFiltered()
})
It("drops event when source name does not match", func() {
event.InvolvedObject.Name = "slop"
testSent()
testFiltered()
})
It("drops event when source namespace does not match", func() {
event.InvolvedObject.Namespace = "all-buckets"
testSent()
testFiltered()
})
})

Context("filtering by ExclusionList", func() {
BeforeEach(func() {
alert.Spec.ExclusionList = []string{
"doesnotoccur", // not intended to match
"well",
}
})

It("forwards event that is not matched", func() {
event.Message = "not excluded"
testSent()
testForwarded()
})

It("drops event that is matched by exclusion", func() {
testSent()
testFiltered()
})
})
})
})
10 changes: 3 additions & 7 deletions internal/server/event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,30 +67,26 @@ func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request)

// find matching alerts
alerts := make([]v1beta1.Alert, 0)
each_alert:
for _, alert := range allAlerts.Items {
// skip suspended and not ready alerts
isReady := apimeta.IsStatusConditionTrue(alert.Status.Conditions, meta.ReadyCondition)
if alert.Spec.Suspend || !isReady {
continue
continue each_alert
}

// skip alert if the message matches a regex from the exclusion list
var skip bool
if len(alert.Spec.ExclusionList) > 0 {
for _, exp := range alert.Spec.ExclusionList {
if r, err := regexp.Compile(exp); err == nil {
if r.Match([]byte(event.Message)) {
skip = true
break
continue each_alert
}
} else {
s.logger.Error(err, fmt.Sprintf("failed to compile regex: %s", exp))
}
}
}
if skip {
continue
}

// filter alerts by object and severity
for _, source := range alert.Spec.EventSources {
Expand Down
3 changes: 1 addition & 2 deletions internal/server/event_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ func NewEventServer(port string, logger logr.Logger, kubeClient client.Client) *

// ListenAndServe starts the HTTP server on the specified port
func (s *EventServer) ListenAndServe(stopCh <-chan struct{}) {
mux := http.DefaultServeMux

mux := http.NewServeMux()
mux.HandleFunc("/", s.handleEvent())

srv := &http.Server{
Expand Down
File renamed without changes.

0 comments on commit 22c6082

Please sign in to comment.