Skip to content

Commit

Permalink
Fix amq router race condition
Browse files Browse the repository at this point in the history
Add retries when starting AMQP server to deal with the race
condition when AMQP router starts later than cloud-event-proxy
service, for example when a node is rebooted.

Signed-off-by: Jack Ding <[email protected]>
  • Loading branch information
jzding committed Dec 2, 2022
1 parent e226a93 commit a80d340
Showing 1 changed file with 38 additions and 19 deletions.
57 changes: 38 additions & 19 deletions v1/amqp/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,39 +15,58 @@
package amqp

import (
"context"
"sync"
"time"

"github.com/Azure/go-amqp"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/redhat-cne/sdk-go/pkg/channel"
"github.com/redhat-cne/sdk-go/pkg/errorhandler"
amqp1 "github.com/redhat-cne/sdk-go/pkg/protocol/amqp"
log "github.com/sirupsen/logrus"
)

var (
instance *AMQP
once sync.Once
instance *AMQP
retryTimeout = 500 * time.Millisecond
cancelTimeout = 30 * time.Second
)

//AMQP exposes amqp api methods
// AMQP exposes amqp api methods
type AMQP struct {
Router *amqp1.Router
}

//GetAMQPInstance get event instance
// GetAMQPInstance get event instance
func GetAMQPInstance(amqpHost string, dataIn <-chan *channel.DataChan, dataOut chan<- *channel.DataChan, closeCh <-chan struct{}) (*AMQP, error) {
once.Do(func() {
router, err := amqp1.InitServer(amqpHost, dataIn, dataOut, closeCh)
if err == nil {
instance = &AMQP{
Router: router,
}
ctx, cancel := context.WithTimeout(context.Background(), cancelTimeout)
defer cancel()
var router *amqp1.Router
var err error
for {
select {
case <-ctx.Done():
return nil, errorhandler.AMQPConnectionError{}
default:
}
})
router, err = amqp1.InitServer(amqpHost, dataIn, dataOut, closeCh)
if err != nil {
log.Info("retrying connecting to amqp.")
time.Sleep(retryTimeout)
continue
}

instance = &AMQP{
Router: router,
}
break
}
if instance == nil || instance.Router == nil {
return nil, errorhandler.AMQPConnectionError{Desc: "amqp connection error"}
return nil, errorhandler.AMQPConnectionError{}
}

if instance.Router.Client == nil {
client, err := instance.Router.NewClient(amqpHost, []amqp.ConnOption{})
if err != nil {
Expand All @@ -58,12 +77,12 @@ func GetAMQPInstance(amqpHost string, dataIn <-chan *channel.DataChan, dataOut c
return instance, nil
}

//Start start amqp processors
// Start start amqp processors
func (a *AMQP) Start(wg *sync.WaitGroup) {
go instance.Router.QDRRouter(wg)
}

//NewSender - create new sender independent of the framework
// NewSender - create new sender independent of the framework
func NewSender(hostName string, port int, address string) (*amqp1.Protocol, error) {
return amqp1.NewSender(hostName, port, address)
}
Expand All @@ -73,7 +92,7 @@ func NewReceiver(hostName string, port int, address string) (*amqp1.Protocol, er
return amqp1.NewReceiver(hostName, port, address)
}

//DeleteSender send publisher address information on a channel to delete its sender object
// DeleteSender send publisher address information on a channel to delete its sender object
func DeleteSender(inChan chan<- *channel.DataChan, address string) {
// go ahead and create QDR to this address
inChan <- &channel.DataChan{
Expand All @@ -83,7 +102,7 @@ func DeleteSender(inChan chan<- *channel.DataChan, address string) {
}
}

//CreateSender send publisher address information on a channel to create it's sender object
// CreateSender send publisher address information on a channel to create it's sender object
func CreateSender(inChan chan<- *channel.DataChan, address string) {
// go ahead and create QDR to this address
inChan <- &channel.DataChan{
Expand All @@ -93,7 +112,7 @@ func CreateSender(inChan chan<- *channel.DataChan, address string) {
}
}

//DeleteListener send subscription address information on a channel to delete its listener object
// DeleteListener send subscription address information on a channel to delete its listener object
func DeleteListener(inChan chan<- *channel.DataChan, address string) {
// go ahead and create QDR listener to this address
inChan <- &channel.DataChan{
Expand All @@ -103,7 +122,7 @@ func DeleteListener(inChan chan<- *channel.DataChan, address string) {
}
}

//CreateListener send subscription address information on a channel to create its listener object
// CreateListener send subscription address information on a channel to create its listener object
func CreateListener(inChan chan<- *channel.DataChan, address string) {
// go ahead and create QDR listener to this address
inChan <- &channel.DataChan{
Expand All @@ -113,7 +132,7 @@ func CreateListener(inChan chan<- *channel.DataChan, address string) {
}
}

//CreateNewStatusListener send status address information on a channel to create its listener object
// CreateNewStatusListener send status address information on a channel to create its listener object
func CreateNewStatusListener(inChan chan<- *channel.DataChan, address string,
onReceiveOverrideFn func(e cloudevents.Event, dataChan *channel.DataChan) error,
processEventFn func(e interface{}) error) {
Expand Down

0 comments on commit a80d340

Please sign in to comment.