Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

entity event handler test fixes #2026

Merged
merged 3 commits into from
Dec 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions internal/controlplane/handlers_githubwebhooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (s *UnitTestSuite) TestHandleWebHookPing() {
srv := newDefaultServer(t, mockStore)
defer srv.evt.Close()

pq := testqueue.NewPassthroughQueue()
pq := testqueue.NewPassthroughQueue(t)
queued := pq.GetQueue()

srv.evt.Register(engine.ExecuteEntityEventTopic, pq.Pass)
Expand Down Expand Up @@ -144,7 +144,7 @@ func (s *UnitTestSuite) TestHandleWebHookUnexistentRepository() {
srv := newDefaultServer(t, mockStore)
defer srv.evt.Close()

pq := testqueue.NewPassthroughQueue()
pq := testqueue.NewPassthroughQueue(t)
queued := pq.GetQueue()

srv.evt.Register(engine.ExecuteEntityEventTopic, pq.Pass)
Expand Down Expand Up @@ -210,7 +210,7 @@ func (s *UnitTestSuite) TestHandleWebHookRepository() {
srv := newDefaultServer(t, mockStore)
defer srv.evt.Close()

pq := testqueue.NewPassthroughQueue()
pq := testqueue.NewPassthroughQueue(t)
queued := pq.GetQueue()

srv.evt.Register(engine.ExecuteEntityEventTopic, pq.Pass)
Expand Down Expand Up @@ -327,7 +327,7 @@ func (s *UnitTestSuite) TestHandleWebHookUnexistentRepoPackage() {
srv := newDefaultServer(t, mockStore)
defer srv.evt.Close()

pq := testqueue.NewPassthroughQueue()
pq := testqueue.NewPassthroughQueue(t)
queued := pq.GetQueue()

srv.evt.Register(engine.ExecuteEntityEventTopic, pq.Pass)
Expand Down
16 changes: 8 additions & 8 deletions internal/engine/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,15 +268,16 @@ default allow = true`,
})
require.NoError(t, err, "failed to setup eventer")

pq := testqueue.NewPassthroughQueue(t)
queued := pq.GetQueue()

go func() {
t.Log("Running eventer")
evt.Register(engine.FlushEntityEventTopic, pq.Pass)
err := evt.Run(context.Background())
require.NoError(t, err, "failed to run eventer")
}()

pq := testqueue.NewPassthroughQueue()
queued := pq.GetQueue()

testTimeout := 5 * time.Second
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
Expand All @@ -286,8 +287,6 @@ default allow = true`,
}, evt)
require.NoError(t, err, "expected no error")

evt.Register(engine.FlushEntityEventTopic, pq.Pass)

eiw := engine.NewEntityInfoWrapper().
WithProvider(providerName).
WithProjectID(projectID).
Expand All @@ -301,20 +300,21 @@ default allow = true`,
msg, err := eiw.BuildMessage()
require.NoError(t, err, "expected no error")

t.Log("waiting for eventer to start")
<-evt.Running()

// Run in the background
go func() {
t.Log("Running entity event handler")
require.NoError(t, e.HandleEntityEvent(msg), "expected no error")
}()

t.Log("waiting for eventer to start")
<-evt.Running()

// expect flush
t.Log("waiting for flush")
require.NotNil(t, <-queued, "expected message")

require.NoError(t, evt.Close(), "expected no error")

t.Log("waiting for executor to finish")
e.Wait()
}
13 changes: 11 additions & 2 deletions internal/util/testqueue/passthroughqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,26 @@
// Package testqueue contains queue utilities for testing
package testqueue

import "github.com/ThreeDotsLabs/watermill/message"
import (
"testing"

"github.com/ThreeDotsLabs/watermill/message"
)

// PassthroughQueue is a queue that passes messages through.
// It's only useful for testing.
type PassthroughQueue struct {
ch chan *message.Message
t *testing.T
}

// NewPassthroughQueue creates a new PassthroughQueue
func NewPassthroughQueue() *PassthroughQueue {
func NewPassthroughQueue(t *testing.T) *PassthroughQueue {
t.Helper()

return &PassthroughQueue{
ch: make(chan *message.Message),
t: t,
}
}

Expand All @@ -38,6 +46,7 @@ func (q *PassthroughQueue) GetQueue() <-chan *message.Message {

// Pass passes a message through the queue
func (q *PassthroughQueue) Pass(msg *message.Message) error {
q.t.Logf("Passing message through queue: %s", msg.UUID)
rdimitrov marked this conversation as resolved.
Show resolved Hide resolved
q.ch <- msg
return nil
}
Loading