Skip to content

Commit

Permalink
fix: Use WaitGroup to start eventsoures and sensors (#819)
Browse files Browse the repository at this point in the history
  • Loading branch information
whynowy authored Aug 7, 2020
1 parent 6149a49 commit 6471850
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 74 deletions.
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ codegen:
./hack/update-openapigen.sh
$(MAKE) swagger
./hack/update-api-docs.sh
./hack/update-mocks.sh
rm -rf ./vendor
go mod tidy
$(MAKE) manifests
Expand Down
68 changes: 0 additions & 68 deletions eventbus/driver/mocks/Driver.go

This file was deleted.

2 changes: 1 addition & 1 deletion eventbus/driver/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (n *natsStreaming) Connect() (Connection, error) {
nats.NoReconnect(),
nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
conn.natsConnected = false
log.Error("NATS connection los", zap.Error(err))
log.Error("NATS connection lost", zap.Error(err))
}),
nats.ReconnectHandler(func(nnc *nats.Conn) {
conn.natsConnected = true
Expand Down
14 changes: 11 additions & 3 deletions eventsources/eventing.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"sync"
"time"

"github.com/argoproj/argo-events/common/logging"
Expand Down Expand Up @@ -249,9 +250,7 @@ func (e *EventSourceAdaptor) Start(ctx context.Context, stopCh <-chan struct{})
logger := logging.FromContext(ctx).Desugar()
logger.Info("Starting event source server...")
servers := GetEventingServers(e.eventSource)
cctx, cancel := context.WithCancel(ctx)
defer cancel()
driver, err := eventbus.GetDriver(cctx, *e.eventBusConfig, e.eventBusSubject, e.hostname)
driver, err := eventbus.GetDriver(ctx, *e.eventBusConfig, e.eventBusSubject, e.hostname)
if err != nil {
logger.Error("failed to get eventbus driver", zap.Error(err))
return err
Expand All @@ -263,8 +262,13 @@ func (e *EventSourceAdaptor) Start(ctx context.Context, stopCh <-chan struct{})
}
defer e.eventBusConn.Close()

cctx, cancel := context.WithCancel(ctx)
wg := &sync.WaitGroup{}

// Daemon to reconnect
wg.Add(1)
go func() {
defer wg.Done()
logger.Info("starting eventbus connection daemon...")
ticker := time.NewTicker(5 * time.Second)
for {
Expand Down Expand Up @@ -296,7 +300,9 @@ func (e *EventSourceAdaptor) Start(ctx context.Context, stopCh <-chan struct{})
// Continue starting other event services instead of failing all of them
continue
}
wg.Add(1)
go func(s EventingServer) {
defer wg.Done()
err := s.StartListening(cctx, func(data []byte) error {
event := cloudevents.NewEvent()
event.SetID(fmt.Sprintf("%x", uuid.New()))
Expand Down Expand Up @@ -327,5 +333,7 @@ func (e *EventSourceAdaptor) Start(ctx context.Context, stopCh <-chan struct{})
logger.Info("Eventing server started.")
<-stopCh
logger.Info("Shutting down...")
cancel()
wg.Wait()
return nil
}
13 changes: 12 additions & 1 deletion sensors/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"strings"
"sync"
"time"

"github.com/Knetic/govaluate"
Expand Down Expand Up @@ -65,9 +66,11 @@ func (sensorCtx *SensorContext) ListenEvents(ctx context.Context, stopCh <-chan
}

cctx, cancel := context.WithCancel(ctx)
defer cancel()
wg := &sync.WaitGroup{}
for k, v := range triggerMapping {
wg.Add(1)
go func(depExpression string, triggers []v1alpha1.Trigger) {
defer wg.Done()
// Calculate dependencies of each group of triggers.
de := strings.ReplaceAll(depExpression, "-", "\\-")
expr, err := govaluate.NewEvaluableExpression(de)
Expand Down Expand Up @@ -139,8 +142,11 @@ func (sensorCtx *SensorContext) ListenEvents(ctx context.Context, stopCh <-chan
}
}

wg1 := &sync.WaitGroup{}
closeSubCh := make(chan struct{})
wg1.Add(1)
go func() {
defer wg1.Done()
logger.Sugar().Infof("started to subscribe events for triggers %s with client %s", fmt.Sprintf("[%s]", strings.Join(triggerNames, " ")), clientID)
err = ebDriver.SubscribeEventSources(cctx, conn, closeSubCh, depExpression, deps, filterFunc, actionFunc)
if err != nil {
Expand All @@ -156,6 +162,7 @@ func (sensorCtx *SensorContext) ListenEvents(ctx context.Context, stopCh <-chan
case <-cctx.Done():
logger.Sugar().Infof("exiting eventbus connection daemon for client %s...", clientID)
ticker.Stop()
wg1.Wait()
return
case <-ticker.C:
if conn == nil || conn.IsClosed() {
Expand All @@ -168,7 +175,9 @@ func (sensorCtx *SensorContext) ListenEvents(ctx context.Context, stopCh <-chan
logger.Info("reconnected to NATS streaming server.", zap.Any("clientID", clientID))
closeSubCh <- struct{}{}
time.Sleep(2 * time.Second)
wg1.Add(1)
go func() {
defer wg1.Done()
logger.Sugar().Infof("started to re-subscribe events for triggers %s with client %s", fmt.Sprintf("[%s]", strings.Join(triggerNames, " ")), clientID)
err = ebDriver.SubscribeEventSources(cctx, conn, closeSubCh, depExpression, deps, filterFunc, actionFunc)
if err != nil {
Expand All @@ -184,6 +193,8 @@ func (sensorCtx *SensorContext) ListenEvents(ctx context.Context, stopCh <-chan
logger.Info("Sensor started.")
<-stopCh
logger.Info("Shutting down...")
cancel()
wg.Wait()
return nil
}

Expand Down

0 comments on commit 6471850

Please sign in to comment.