Skip to content

Commit

Permalink
fix cloud event flaky unit tests by adding waitgroup
Browse files Browse the repository at this point in the history
This commit adds a waitgroup to the reconcilers so that it can be used when
creating goroutines to send cloud events. waitgroup.Wait() can be called during
unit tests to make sure all goroutines have completed, avoiding the case
where some events have not yet been written to the channel before they are
checked. This change shouldn't influence current behaviour since we don't call
Wait to block the code.

Signed-off-by: Yongxuan Zhang [email protected]
  • Loading branch information
Yongxuanzhang committed Oct 25, 2022
1 parent eb45fb0 commit 61a9321
Show file tree
Hide file tree
Showing 15 changed files with 68 additions and 27 deletions.
7 changes: 4 additions & 3 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"log"
"net/http"
"os"
"sync"

"github.com/tektoncd/pipeline/pkg/apis/pipeline"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
Expand Down Expand Up @@ -104,9 +105,9 @@ func main() {

ctx = filteredinformerfactory.WithSelectors(ctx, v1beta1.ManagedByLabelKey)
sharedmain.MainWithConfig(ctx, ControllerLogKey, cfg,
taskrun.NewController(opts, clock.RealClock{}),
pipelinerun.NewController(opts, clock.RealClock{}),
run.NewController(),
taskrun.NewController(opts, clock.RealClock{}, &sync.WaitGroup{}),
pipelinerun.NewController(opts, clock.RealClock{}, &sync.WaitGroup{}),
run.NewController(&sync.WaitGroup{}),
resolutionrequest.NewController(clock.RealClock{}),
)
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/reconciler/events/cloudevent/cloud_event_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package cloudevent
import (
"context"
"errors"
"sync"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
Expand Down Expand Up @@ -122,7 +123,7 @@ func SendCloudEvents(tr *v1beta1.TaskRun, ceclient CEClient, logger *zap.Sugared
// sdk-go capabilities.
// It accepts a runtime.Object to avoid making objectWithCondition public since
// it's only used within the events/cloudevents packages.
func SendCloudEventWithRetries(ctx context.Context, object runtime.Object) error {
func SendCloudEventWithRetries(ctx context.Context, wg *sync.WaitGroup, object runtime.Object) error {
var (
o objectWithCondition
ok bool
Expand All @@ -144,7 +145,9 @@ func SendCloudEventWithRetries(ctx context.Context, object runtime.Object) error
_, isRun := object.(*v1alpha1.Run)

wasIn := make(chan error)
wg.Add(1)
go func() {
defer wg.Done()
wasIn <- nil
logger.Debugf("Sending cloudevent of type %q", event.Type())
// In case of Run event, check cache if cloudevent is already sent
Expand Down
13 changes: 10 additions & 3 deletions pkg/reconciler/events/cloudevent/cloud_event_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package cloudevent

import (
"context"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -607,9 +608,11 @@ func TestSendCloudEventWithRetries(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctx := setupFakeContext(t, tc.clientBehaviour, true)
if err := SendCloudEventWithRetries(ctx, tc.object); err != nil {
var wg sync.WaitGroup
if err := SendCloudEventWithRetries(ctx, &wg, tc.object); err != nil {
t.Fatalf("Unexpected error sending cloud events: %v", err)
}
wg.Wait()
ceClient := Get(ctx).(FakeClient)
if err := eventstest.CheckEventsUnordered(t, ceClient.Events, tc.name, tc.wantCEvents); err != nil {
t.Fatalf(err.Error())
Expand Down Expand Up @@ -651,7 +654,9 @@ func TestSendCloudEventWithRetriesInvalid(t *testing.T) {
}, true)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
err := SendCloudEventWithRetries(ctx, tc.object)
var wg sync.WaitGroup
err := SendCloudEventWithRetries(ctx, &wg, tc.object)
wg.Wait()
if err == nil {
t.Fatalf("Expected an error sending cloud events for invalid object, got none")
}
Expand All @@ -662,7 +667,9 @@ func TestSendCloudEventWithRetriesInvalid(t *testing.T) {
func TestSendCloudEventWithRetriesNoClient(t *testing.T) {

ctx := setupFakeContext(t, FakeClientBehaviour{}, false)
err := SendCloudEventWithRetries(ctx, &v1beta1.TaskRun{Status: v1beta1.TaskRunStatus{}})
var wg sync.WaitGroup
err := SendCloudEventWithRetries(ctx, &wg, &v1beta1.TaskRun{Status: v1beta1.TaskRunStatus{}})
wg.Wait()
if err == nil {
t.Fatalf("Expected an error sending cloud events with no client in the context, got none")
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/reconciler/events/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package events

import (
"context"
"sync"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/tektoncd/pipeline/pkg/apis/config"
Expand Down Expand Up @@ -47,7 +48,7 @@ const (
//
// k8s events are always sent if afterCondition is different from beforeCondition
// Cloud events are always sent if enabled, i.e. if a sink is available
func Emit(ctx context.Context, beforeCondition *apis.Condition, afterCondition *apis.Condition, object runtime.Object) {
func Emit(ctx context.Context, beforeCondition *apis.Condition, afterCondition *apis.Condition, object runtime.Object, wg *sync.WaitGroup) {
recorder := controller.GetEventRecorder(ctx)
logger := logging.FromContext(ctx)
configs := config.FromContextOrDefaults(ctx)
Expand All @@ -61,7 +62,7 @@ func Emit(ctx context.Context, beforeCondition *apis.Condition, afterCondition *
if sendCloudEvents {
// Only send events if the new condition represents a change
if !equality.Semantic.DeepEqual(beforeCondition, afterCondition) {
err := cloudevent.SendCloudEventWithRetries(ctx, object)
err := cloudevent.SendCloudEventWithRetries(ctx, wg, object)
if err != nil {
logger.Warnf("Failed to emit cloud events %v", err.Error())
}
Expand All @@ -70,7 +71,7 @@ func Emit(ctx context.Context, beforeCondition *apis.Condition, afterCondition *
}

// EmitCloudEvents emits CloudEvents (only) for object
func EmitCloudEvents(ctx context.Context, object runtime.Object) {
func EmitCloudEvents(ctx context.Context, object runtime.Object, wg *sync.WaitGroup) {
logger := logging.FromContext(ctx)
configs := config.FromContextOrDefaults(ctx)
sendCloudEvents := (configs.Defaults.DefaultCloudEventsSink != "")
Expand All @@ -79,7 +80,7 @@ func EmitCloudEvents(ctx context.Context, object runtime.Object) {
}

if sendCloudEvents {
err := cloudevent.SendCloudEventWithRetries(ctx, object)
err := cloudevent.SendCloudEventWithRetries(ctx, wg, object)
if err != nil {
logger.Warnf("Failed to emit cloud events %v", err.Error())
}
Expand Down
9 changes: 7 additions & 2 deletions pkg/reconciler/events/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package events

import (
"errors"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -235,7 +236,9 @@ func TestEmit(t *testing.T) {
ctx = config.ToContext(ctx, cfg)

recorder := controller.GetEventRecorder(ctx).(*record.FakeRecorder)
Emit(ctx, nil, after, object)
var wg sync.WaitGroup
Emit(ctx, nil, after, object, &wg)
wg.Wait()
if err := eventstest.CheckEventsOrdered(t, recorder.Events, tc.name, tc.wantEvents); err != nil {
t.Fatalf(err.Error())
}
Expand Down Expand Up @@ -291,7 +294,9 @@ func TestEmitCloudEvents(t *testing.T) {
ctx = config.ToContext(ctx, cfg)

recorder := controller.GetEventRecorder(ctx).(*record.FakeRecorder)
EmitCloudEvents(ctx, object)
var wg sync.WaitGroup
EmitCloudEvents(ctx, object, &wg)
wg.Wait()
if err := eventstest.CheckEventsOrdered(t, recorder.Events, tc.name, tc.wantEvents); err != nil {
t.Fatalf(err.Error())
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/reconciler/pipelinerun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package pipelinerun

import (
"context"
"sync"

"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline"
Expand All @@ -43,7 +44,7 @@ import (
)

// NewController instantiates a new controller.Impl from knative.dev/pkg/controller
func NewController(opts *pipeline.Options, clock clock.PassiveClock) func(context.Context, configmap.Watcher) *controller.Impl {
func NewController(opts *pipeline.Options, clock clock.PassiveClock, wg *sync.WaitGroup) func(context.Context, configmap.Watcher) *controller.Impl {
return func(ctx context.Context, cmw configmap.Watcher) *controller.Impl {
logger := logging.FromContext(ctx)
kubeclientset := kubeclient.Get(ctx)
Expand All @@ -61,6 +62,7 @@ func NewController(opts *pipeline.Options, clock clock.PassiveClock) func(contex
PipelineClientSet: pipelineclientset,
Images: opts.Images,
Clock: clock,
waitGroup: wg,
pipelineRunLister: pipelineRunInformer.Lister(),
taskRunLister: taskRunInformer.Lister(),
runLister: runInformer.Lister(),
Expand Down
6 changes: 4 additions & 2 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"reflect"
"strconv"
"strings"
"sync"

"k8s.io/apimachinery/pkg/util/sets"

Expand Down Expand Up @@ -131,6 +132,7 @@ type Reconciler struct {
PipelineClientSet clientset.Interface
Images pipeline.Images
Clock clock.PassiveClock
waitGroup *sync.WaitGroup

// listers index properties about resources
pipelineRunLister listers.PipelineRunLister
Expand Down Expand Up @@ -172,7 +174,7 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, pr *v1beta1.PipelineRun)
// We also want to send the "Started" event as soon as possible for anyone who may be waiting
// on the event to perform user-facing initialisations, such as resetting a CI check status
afterCondition := pr.Status.GetCondition(apis.ConditionSucceeded)
events.Emit(ctx, nil, afterCondition, pr)
events.Emit(ctx, nil, afterCondition, pr, c.waitGroup)

// We already sent an event for start, so update `before` with the current status
before = pr.Status.GetCondition(apis.ConditionSucceeded)
Expand Down Expand Up @@ -282,7 +284,7 @@ func (c *Reconciler) finishReconcileUpdateEmitEvents(ctx context.Context, pr *v1
logger := logging.FromContext(ctx)

afterCondition := pr.Status.GetCondition(apis.ConditionSucceeded)
events.Emit(ctx, beforeCondition, afterCondition, pr)
events.Emit(ctx, beforeCondition, afterCondition, pr, c.waitGroup)
_, err := c.updateLabelsAndAnnotations(ctx, pr)
if err != nil {
logger.Warn("Failed to update PipelineRun labels/annotations", zap.Error(err))
Expand Down
7 changes: 5 additions & 2 deletions pkg/reconciler/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"net/http/httptest"
"net/url"
"strconv"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -137,7 +138,8 @@ func initializePipelineRunControllerAssets(t *testing.T, d test.Data, opts pipel
test.EnsureConfigurationConfigMapsExist(&d)
c, informers := test.SeedTestData(t, ctx, d)
configMapWatcher := cminformer.NewInformedWatcher(c.Kube, system.Namespace())
ctl := NewController(&opts, testClock)(ctx, configMapWatcher)
var wg sync.WaitGroup
ctl := NewController(&opts, testClock, &wg)(ctx, configMapWatcher)
if la, ok := ctl.Reconciler.(reconciler.LeaderAware); ok {
if err := la.Promote(reconciler.UniversalBucket(), func(reconciler.Bucket, types.NamespacedName) {}); err != nil {
t.Fatalf("error promoting reconciler leader: %v", err)
Expand All @@ -151,6 +153,7 @@ func initializePipelineRunControllerAssets(t *testing.T, d test.Data, opts pipel
Clients: c,
Controller: ctl,
Informers: informers,
WaitGroup: &wg,
Recorder: controller.GetEventRecorder(ctx).(*record.FakeRecorder),
Ctx: ctx,
}, cancel
Expand Down Expand Up @@ -6549,7 +6552,7 @@ spec:
checkPipelineRunConditionStatusAndReason(t, reconciledRun, corev1.ConditionUnknown, v1beta1.PipelineRunReasonRunning.String())

verifyTaskRunStatusesCount(t, cms[0].Data[embeddedStatusFeatureFlag], reconciledRun.Status, 1)

prt.TestAssets.WaitGroup.Wait()
wantCloudEvents := []string{
`(?s)dev.tekton.event.pipelinerun.started.v1.*test-pipelinerun`,
`(?s)dev.tekton.event.pipelinerun.running.v1.*test-pipelinerun`,
Expand Down
4 changes: 3 additions & 1 deletion pkg/reconciler/run/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package run

import (
"context"
"sync"

"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline"
Expand All @@ -32,7 +33,7 @@ import (

// NewController instantiates a new controller.Impl from knative.dev/pkg/controller
// This is a read-only controller, hence the SkipStatusUpdates set to true
func NewController() func(context.Context, configmap.Watcher) *controller.Impl {
func NewController(wg *sync.WaitGroup) func(context.Context, configmap.Watcher) *controller.Impl {
return func(ctx context.Context, cmw configmap.Watcher) *controller.Impl {
logger := logging.FromContext(ctx)
runInformer := runinformer.Get(ctx)
Expand All @@ -43,6 +44,7 @@ func NewController() func(context.Context, configmap.Watcher) *controller.Impl {
c := &Reconciler{
cloudEventClient: cloudeventclient.Get(ctx),
cacheClient: cacheclient.Get(ctx),
waitGroup: wg,
}
impl := runreconciler.NewImpl(ctx, c, func(impl *controller.Impl) controller.Options {
return controller.Options{
Expand Down
4 changes: 3 additions & 1 deletion pkg/reconciler/run/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package run

import (
"context"
"sync"

lru "github.com/hashicorp/golang-lru"
"github.com/tektoncd/pipeline/pkg/apis/config"
Expand All @@ -36,6 +37,7 @@ import (
type Reconciler struct {
cloudEventClient cloudevent.CEClient
cacheClient *lru.Cache
waitGroup *sync.WaitGroup
}

// Check that our Reconciler implements runreconciler.Interface
Expand Down Expand Up @@ -66,7 +68,7 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, run *v1alpha1.Run) pkgre
condition := runEvents.Status.GetCondition(apis.ConditionSucceeded)
logger.Debugf("Emitting cloudevent for %s, condition: %s", runEvents.Name, condition)

events.EmitCloudEvents(ctx, &runEvents)
events.EmitCloudEvents(ctx, &runEvents, c.waitGroup)
}

return nil
Expand Down
7 changes: 5 additions & 2 deletions pkg/reconciler/run/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package run
import (
"context"
"strings"
"sync"
"testing"

duckv1 "knative.dev/pkg/apis/duck/v1"
Expand Down Expand Up @@ -54,7 +55,8 @@ func initializeRunControllerAssets(t *testing.T, d test.Data) (test.Assets, func
test.EnsureConfigurationConfigMapsExist(&d)
c, informers := test.SeedTestData(t, ctx, d)
configMapWatcher := cminformer.NewInformedWatcher(c.Kube, system.Namespace())
ctl := NewController()(ctx, configMapWatcher)
var wg sync.WaitGroup
ctl := NewController(&wg)(ctx, configMapWatcher)
if err := configMapWatcher.Start(ctx.Done()); err != nil {
t.Fatalf("error starting configmap watcher: %v", err)
}
Expand All @@ -68,6 +70,7 @@ func initializeRunControllerAssets(t *testing.T, d test.Data) (test.Assets, func
Controller: ctl,
Clients: c,
Informers: informers,
WaitGroup: &wg,
Recorder: controller.GetEventRecorder(ctx).(*record.FakeRecorder),
Ctx: ctx,
}, cancel
Expand Down Expand Up @@ -188,7 +191,7 @@ func TestReconcile_CloudEvents(t *testing.T) {
if d := cmp.Diff(&run, urun, ignoreResourceVersion); d != "" {
t.Fatalf("run should not have changed, go %v instead", d)
}

testAssets.WaitGroup.Wait()
ceClient := clients.CloudEvents.(cloudevent.FakeClient)
err = eventstest.CheckEventsUnordered(t, ceClient.Events, tc.name, tc.wantCloudEvents)
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion pkg/reconciler/taskrun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package taskrun

import (
"context"
"sync"

"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline"
Expand All @@ -44,7 +45,7 @@ import (
)

// NewController instantiates a new controller.Impl from knative.dev/pkg/controller
func NewController(opts *pipeline.Options, clock clock.PassiveClock) func(context.Context, configmap.Watcher) *controller.Impl {
func NewController(opts *pipeline.Options, clock clock.PassiveClock, wg *sync.WaitGroup) func(context.Context, configmap.Watcher) *controller.Impl {
return func(ctx context.Context, cmw configmap.Watcher) *controller.Impl {
logger := logging.FromContext(ctx)
kubeclientset := kubeclient.Get(ctx)
Expand Down Expand Up @@ -73,6 +74,7 @@ func NewController(opts *pipeline.Options, clock clock.PassiveClock) func(contex
cloudEventClient: cloudeventclient.Get(ctx),
metrics: taskrunmetrics.Get(ctx),
entrypointCache: entrypointCache,
waitGroup: wg,
podLister: podInformer.Lister(),
pvcHandler: volumeclaim.NewPVCHandler(kubeclientset, logger),
resolutionRequester: resolution.NewCRDRequester(resolutionclient.Get(ctx), resolutionInformer.Lister()),
Expand Down
Loading

0 comments on commit 61a9321

Please sign in to comment.