diff --git a/Makefile b/Makefile index 70cbe2d7fb..f57a2cbfc8 100644 --- a/Makefile +++ b/Makefile @@ -148,7 +148,6 @@ codegen: ./hack/update-openapigen.sh $(MAKE) swagger ./hack/update-api-docs.sh - ./hack/update-mocks.sh rm -rf ./vendor go mod tidy $(MAKE) manifests diff --git a/eventbus/driver/mocks/Driver.go b/eventbus/driver/mocks/Driver.go deleted file mode 100644 index c1e032955d..0000000000 --- a/eventbus/driver/mocks/Driver.go +++ /dev/null @@ -1,68 +0,0 @@ -// Code generated by mockery v1.0.0. DO NOT EDIT. - -package mocks - -import ( - context "context" - - driver "github.com/argoproj/argo-events/eventbus/driver" - event "github.com/cloudevents/sdk-go/v2/event" - - mock "github.com/stretchr/testify/mock" -) - -// Driver is an autogenerated mock type for the Driver type -type Driver struct { - mock.Mock -} - -// Connect provides a mock function with given fields: -func (_m *Driver) Connect() (driver.Connection, error) { - ret := _m.Called() - - var r0 driver.Connection - if rf, ok := ret.Get(0).(func() driver.Connection); ok { - r0 = rf() - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(driver.Connection) - } - } - - var r1 error - if rf, ok := ret.Get(1).(func() error); ok { - r1 = rf() - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// Publish provides a mock function with given fields: conn, message -func (_m *Driver) Publish(conn driver.Connection, message []byte) error { - ret := _m.Called(conn, message) - - var r0 error - if rf, ok := ret.Get(0).(func(driver.Connection, []byte) error); ok { - r0 = rf(conn, message) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// SubscribeEventSources provides a mock function with given fields: ctx, conn, closeCh, dependencyExpr, dependencies, filter, action -func (_m *Driver) SubscribeEventSources(ctx context.Context, conn driver.Connection, closeCh <-chan struct{}, dependencyExpr string, dependencies []driver.Dependency, filter func(string, event.Event) bool, action func(map[string]event.Event)) error { - ret := _m.Called(ctx, conn, closeCh, dependencyExpr, dependencies, filter, action) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, driver.Connection, <-chan struct{}, string, []driver.Dependency, func(string, event.Event) bool, func(map[string]event.Event)) error); ok { - r0 = rf(ctx, conn, closeCh, dependencyExpr, dependencies, filter, action) - } else { - r0 = ret.Error(0) - } - - return r0 -} diff --git a/eventbus/driver/nats.go b/eventbus/driver/nats.go index 2f42b851fb..0e1146616b 100644 --- a/eventbus/driver/nats.go +++ b/eventbus/driver/nats.go @@ -80,7 +80,7 @@ func (n *natsStreaming) Connect() (Connection, error) { nats.NoReconnect(), nats.DisconnectErrHandler(func(nc *nats.Conn, err error) { conn.natsConnected = false - log.Error("NATS connection los", zap.Error(err)) + log.Error("NATS connection lost", zap.Error(err)) }), nats.ReconnectHandler(func(nnc *nats.Conn) { conn.natsConnected = true diff --git a/eventsources/eventing.go b/eventsources/eventing.go index 4cbd419f04..6fcb283c6b 100644 --- a/eventsources/eventing.go +++ b/eventsources/eventing.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "sync" "time" "github.com/argoproj/argo-events/common/logging" @@ -249,9 +250,7 @@ func (e *EventSourceAdaptor) Start(ctx context.Context, stopCh <-chan struct{}) logger := logging.FromContext(ctx).Desugar() logger.Info("Starting event source server...") servers := GetEventingServers(e.eventSource) - cctx, cancel := context.WithCancel(ctx) - defer cancel() - driver, err := eventbus.GetDriver(cctx, *e.eventBusConfig, e.eventBusSubject, e.hostname) + driver, err := eventbus.GetDriver(ctx, *e.eventBusConfig, e.eventBusSubject, e.hostname) if err != nil { logger.Error("failed to get eventbus driver", zap.Error(err)) return err @@ -263,8 +262,13 @@ func (e *EventSourceAdaptor) Start(ctx context.Context, stopCh <-chan struct{}) } defer e.eventBusConn.Close() + cctx, cancel := context.WithCancel(ctx) + wg := &sync.WaitGroup{} + // Daemon to reconnect + wg.Add(1) go func() { + defer wg.Done() logger.Info("starting eventbus connection daemon...") ticker := time.NewTicker(5 * time.Second) for { @@ -296,7 +300,9 @@ func (e *EventSourceAdaptor) Start(ctx context.Context, stopCh <-chan struct{}) // Continue starting other event services instead of failing all of them continue } + wg.Add(1) go func(s EventingServer) { + defer wg.Done() err := s.StartListening(cctx, func(data []byte) error { event := cloudevents.NewEvent() event.SetID(fmt.Sprintf("%x", uuid.New())) @@ -327,5 +333,7 @@ func (e *EventSourceAdaptor) Start(ctx context.Context, stopCh <-chan struct{}) logger.Info("Eventing server started.") <-stopCh logger.Info("Shutting down...") + cancel() + wg.Wait() return nil } diff --git a/sensors/listener.go b/sensors/listener.go index fcd329effe..b3f5ce6529 100644 --- a/sensors/listener.go +++ b/sensors/listener.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "strings" + "sync" "time" "github.com/Knetic/govaluate" @@ -65,9 +66,11 @@ func (sensorCtx *SensorContext) ListenEvents(ctx context.Context, stopCh <-chan } cctx, cancel := context.WithCancel(ctx) - defer cancel() + wg := &sync.WaitGroup{} for k, v := range triggerMapping { + wg.Add(1) go func(depExpression string, triggers []v1alpha1.Trigger) { + defer wg.Done() // Calculate dependencies of each group of triggers. de := strings.ReplaceAll(depExpression, "-", "\\-") expr, err := govaluate.NewEvaluableExpression(de) @@ -139,8 +142,11 @@ func (sensorCtx *SensorContext) ListenEvents(ctx context.Context, stopCh <-chan } } + wg1 := &sync.WaitGroup{} closeSubCh := make(chan struct{}) + wg1.Add(1) go func() { + defer wg1.Done() logger.Sugar().Infof("started to subscribe events for triggers %s with client %s", fmt.Sprintf("[%s]", strings.Join(triggerNames, " ")), clientID) err = ebDriver.SubscribeEventSources(cctx, conn, closeSubCh, depExpression, deps, filterFunc, actionFunc) if err != nil { @@ -156,6 +162,7 @@ func (sensorCtx *SensorContext) ListenEvents(ctx context.Context, stopCh <-chan case <-cctx.Done(): logger.Sugar().Infof("exiting eventbus connection daemon for client %s...", clientID) ticker.Stop() + wg1.Wait() return case <-ticker.C: if conn == nil || conn.IsClosed() { @@ -168,7 +175,9 @@ func (sensorCtx *SensorContext) ListenEvents(ctx context.Context, stopCh <-chan logger.Info("reconnected to NATS streaming server.", zap.Any("clientID", clientID)) closeSubCh <- struct{}{} time.Sleep(2 * time.Second) + wg1.Add(1) go func() { + defer wg1.Done() logger.Sugar().Infof("started to re-subscribe events for triggers %s with client %s", fmt.Sprintf("[%s]", strings.Join(triggerNames, " ")), clientID) err = ebDriver.SubscribeEventSources(cctx, conn, closeSubCh, depExpression, deps, filterFunc, actionFunc) if err != nil { @@ -184,6 +193,8 @@ func (sensorCtx *SensorContext) ListenEvents(ctx context.Context, stopCh <-chan logger.Info("Sensor started.") <-stopCh logger.Info("Shutting down...") + cancel() + wg.Wait() return nil }