Skip to content

Commit

Permalink
fix: Fix Azure EventsHub issue (#846)
Browse files Browse the repository at this point in the history
  • Loading branch information
VaibhavPage authored Aug 26, 2020
1 parent db9a7f3 commit 7643008
Showing 1 changed file with 12 additions and 7 deletions.
19 changes: 12 additions & 7 deletions eventsources/sources/azureeventshub/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt
}

handler := func(c context.Context, event *eventhub.Event) error {
log.Info("received an event from eventshub...")

eventData := &events.AzureEventsHubEventData{
Id: event.ID,
Body: event.Data,
Expand All @@ -98,6 +100,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt
return errors.Wrapf(err, "failed to marshal the event data for event source %s and message id %s", el.GetEventName(), event.ID)
}

log.Info("dispatching the event to eventbus...")
err = dispatch(eventBytes)
if err != nil {
log.Error("failed to dispatch Azure EventHub event", zap.Error(err))
Expand All @@ -113,23 +116,25 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt
return errors.Wrapf(err, "failed to get the hub runtime information for %s", el.GetEventName())
}

var listenerHandles []*eventhub.ListenerHandle
if runtimeInfo == nil {
return errors.Wrapf(err, "runtime information is not available for %s", el.GetEventName())
}

if runtimeInfo.PartitionIDs == nil {
return errors.Wrapf(err, "no partition ids are available for %s", el.GetEventName())
}

log.Info("handling the partitions...")
for _, partitionID := range runtimeInfo.PartitionIDs {
listenerHandle, err := hub.Receive(ctx, partitionID, handler, eventhub.ReceiveWithLatestOffset())
if err != nil {
if _, err := hub.Receive(ctx, partitionID, handler, eventhub.ReceiveWithLatestOffset()); err != nil {
return errors.Wrapf(err, "failed to receive events from partition %s", partitionID)
}
listenerHandles = append(listenerHandles, listenerHandle)
}

<-ctx.Done()
log.Info("stopping listener handlers")

for _, handler := range listenerHandles {
handler.Close(ctx)
}
hub.Close(context.Background())

return nil
}

0 comments on commit 7643008

Please sign in to comment.