Skip to content

Commit

Permalink
feat: Retry starting event server and quick fail. Closes #926 (#927)
Browse files Browse the repository at this point in the history
* feat: Retry starting event server and quick fail. Closes #926

* update
  • Loading branch information
whynowy authored Nov 4, 2020
1 parent c446bad commit 6efb0e0
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 45 deletions.
4 changes: 2 additions & 2 deletions common/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ import (
// DefaultRetry is a default retry backoff settings when retrying API calls
var DefaultRetry = wait.Backoff{
Steps: 5,
Duration: 10 * time.Millisecond,
Duration: 1 * time.Second,
Factor: 1.0,
Jitter: 0.1,
Jitter: 1,
}

// IsRetryableKubeAPIError returns if the error is a retryable kubernetes error
Expand Down
112 changes: 70 additions & 42 deletions eventsources/eventing.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@ import (
"sync"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/google/uuid"
"github.com/pkg/errors"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/util/wait"

"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/common/logging"
"github.com/argoproj/argo-events/eventbus"
eventbusdriver "github.com/argoproj/argo-events/eventbus/driver"
Expand Down Expand Up @@ -38,11 +45,6 @@ import (
apicommon "github.com/argoproj/argo-events/pkg/apis/common"
eventbusv1alpha1 "github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1"
"github.com/argoproj/argo-events/pkg/apis/eventsource/v1alpha1"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/google/uuid"
"github.com/pkg/errors"
"go.uber.org/zap"
)

// EventingServer is the server API for Eventing service.
Expand Down Expand Up @@ -265,20 +267,22 @@ func (e *EventSourceAdaptor) Start(ctx context.Context, stopCh <-chan struct{})
logger.Error("failed to get eventbus driver", zap.Error(err))
return err
}
e.eventBusConn, err = driver.Connect()
if err != nil {
if err = common.Connect(&common.DefaultRetry, func() error {
e.eventBusConn, err = driver.Connect()
return err
}); err != nil {
logger.Error("failed to connect to eventbus", zap.Error(err))
return err
}
defer e.eventBusConn.Close()

cctx, cancel := context.WithCancel(ctx)
wg := &sync.WaitGroup{}
connWG := &sync.WaitGroup{}

// Daemon to reconnect
wg.Add(1)
connWG.Add(1)
go func() {
defer wg.Done()
defer connWG.Done()
logger.Info("starting eventbus connection daemon...")
ticker := time.NewTicker(5 * time.Second)
for {
Expand All @@ -300,6 +304,7 @@ func (e *EventSourceAdaptor) Start(ctx context.Context, stopCh <-chan struct{})
}
}()

wg := &sync.WaitGroup{}
for _, ss := range servers {
for _, server := range ss {
// Validation has been done in eventsource-controller, it's harmless to do it again here.
Expand All @@ -313,44 +318,67 @@ func (e *EventSourceAdaptor) Start(ctx context.Context, stopCh <-chan struct{})
wg.Add(1)
go func(s EventingServer) {
defer wg.Done()
err := s.StartListening(cctx, func(data []byte) error {
event := cloudevents.NewEvent()
event.SetID(fmt.Sprintf("%x", uuid.New()))
event.SetType(string(s.GetEventSourceType()))
event.SetSource(s.GetEventSourceName())
event.SetSubject(s.GetEventName())
event.SetTime(time.Now())
err := event.SetData(cloudevents.ApplicationJSON, data)
if err != nil {
return err
}
eventBody, err := json.Marshal(event)
if err != nil {
return err
}
if e.eventBusConn == nil || e.eventBusConn.IsClosed() {
return errors.New("failed to publish event, eventbus connection closed")
}
if err = driver.Publish(e.eventBusConn, eventBody); err != nil {
logger.Error("failed to publish an event", zap.Error(err), zap.Any(logging.LabelEventName,
if err = common.Connect(&wait.Backoff{
Steps: 10,
Duration: 1 * time.Second,
Factor: 1,
Jitter: 30,
}, func() error {
return s.StartListening(cctx, func(data []byte) error {
event := cloudevents.NewEvent()
event.SetID(fmt.Sprintf("%x", uuid.New()))
event.SetType(string(s.GetEventSourceType()))
event.SetSource(s.GetEventSourceName())
event.SetSubject(s.GetEventName())
event.SetTime(time.Now())
err := event.SetData(cloudevents.ApplicationJSON, data)
if err != nil {
return err
}
eventBody, err := json.Marshal(event)
if err != nil {
return err
}
if e.eventBusConn == nil || e.eventBusConn.IsClosed() {
return errors.New("failed to publish event, eventbus connection closed")
}
if err = driver.Publish(e.eventBusConn, eventBody); err != nil {
logger.Error("failed to publish an event", zap.Error(err), zap.Any(logging.LabelEventName,
s.GetEventName()), zap.Any(logging.LabelEventSourceType, s.GetEventSourceType()))
return err
}
logger.Info("succeeded to publish an event", zap.Error(err), zap.Any(logging.LabelEventName,
s.GetEventName()), zap.Any(logging.LabelEventSourceType, s.GetEventSourceType()))
return err
}
logger.Info("succeeded to publish an event", zap.Error(err), zap.Any(logging.LabelEventName,
s.GetEventName()), zap.Any(logging.LabelEventSourceType, s.GetEventSourceType()))
return nil
})
if err != nil {
return nil
})
}); err != nil {
logger.Error("failed to start listening eventsource", zap.Any(logging.LabelEventSourceType,
s.GetEventSourceType()), zap.Any(logging.LabelEventName, s.GetEventName()), zap.Error(err))
}
}(server)
}
}
logger.Info("Eventing server started.")
<-stopCh
logger.Info("Shutting down...")
cancel()
wg.Wait()
return nil

eventServersWGDone := make(chan bool)
go func() {
wg.Wait()
close(eventServersWGDone)
}()

for {
select {
case <-stopCh:
logger.Info("Shutting down...")
cancel()
<-eventServersWGDone
connWG.Wait()
return nil
case <-eventServersWGDone:
logger.Error("Erroring out, no active event server running")
cancel()
connWG.Wait()
return errors.New("no active event server running")
}
}
}
4 changes: 3 additions & 1 deletion sensors/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,10 @@ func (sensorCtx *SensorContext) ListenEvents(ctx context.Context, stopCh <-chan
func (sensorCtx *SensorContext) triggerActions(ctx context.Context, events map[string]cloudevents.Event, triggers []v1alpha1.Trigger) error {
log := logging.FromContext(ctx)
eventsMapping := make(map[string]*v1alpha1.Event)
depNames := make([]string, 0, len(events))
for k, v := range events {
eventsMapping[k] = convertEvent(v)
depNames = append(depNames, k)
}
for _, trigger := range triggers {
if err := sensortriggers.ApplyTemplateParameters(eventsMapping, &trigger); err != nil {
Expand Down Expand Up @@ -262,7 +264,7 @@ func (sensorCtx *SensorContext) triggerActions(ctx context.Context, events map[s
if err := triggerImpl.ApplyPolicy(newObj); err != nil {
return err
}
log.Infow("successfully processed the trigger", "triggerName", trigger.Template.Name)
log.Infow("successfully processed the trigger", zap.String("triggerName", trigger.Template.Name), zap.Any("triggeredBy", depNames))
}
return nil
}
Expand Down

0 comments on commit 6efb0e0

Please sign in to comment.