From a80d34081bc66137b40fb7c2ee990cd83fa6faaf Mon Sep 17 00:00:00 2001 From: Jack Ding Date: Wed, 30 Nov 2022 21:07:24 -0500 Subject: [PATCH] Fix amq router race condition Add retries when starting AMQP server to deal with the race condition when AMQP router starts later than cloud-event-proxy service, for example when a node is rebooted. Signed-off-by: Jack Ding --- v1/amqp/amqp.go | 57 ++++++++++++++++++++++++++++++++----------------- 1 file changed, 38 insertions(+), 19 deletions(-) diff --git a/v1/amqp/amqp.go b/v1/amqp/amqp.go index 6b3205f..bec55f7 100644 --- a/v1/amqp/amqp.go +++ b/v1/amqp/amqp.go @@ -15,7 +15,9 @@ package amqp import ( + "context" "sync" + "time" "github.com/Azure/go-amqp" @@ -23,31 +25,48 @@ import ( "github.com/redhat-cne/sdk-go/pkg/channel" "github.com/redhat-cne/sdk-go/pkg/errorhandler" amqp1 "github.com/redhat-cne/sdk-go/pkg/protocol/amqp" + log "github.com/sirupsen/logrus" ) var ( - instance *AMQP - once sync.Once + instance *AMQP + retryTimeout = 500 * time.Millisecond + cancelTimeout = 30 * time.Second ) -//AMQP exposes amqp api methods +// AMQP exposes amqp api methods type AMQP struct { Router *amqp1.Router } -//GetAMQPInstance get event instance +// GetAMQPInstance get event instance func GetAMQPInstance(amqpHost string, dataIn <-chan *channel.DataChan, dataOut chan<- *channel.DataChan, closeCh <-chan struct{}) (*AMQP, error) { - once.Do(func() { - router, err := amqp1.InitServer(amqpHost, dataIn, dataOut, closeCh) - if err == nil { - instance = &AMQP{ - Router: router, - } + ctx, cancel := context.WithTimeout(context.Background(), cancelTimeout) + defer cancel() + var router *amqp1.Router + var err error + for { + select { + case <-ctx.Done(): + return nil, errorhandler.AMQPConnectionError{} + default: } - }) + router, err = amqp1.InitServer(amqpHost, dataIn, dataOut, closeCh) + if err != nil { + log.Info("retrying connecting to amqp.") + time.Sleep(retryTimeout) + continue + } + + instance = &AMQP{ + Router: router, + } + break 
+ } if instance == nil || instance.Router == nil { - return nil, errorhandler.AMQPConnectionError{Desc: "amqp connection error"} + return nil, errorhandler.AMQPConnectionError{} } + if instance.Router.Client == nil { client, err := instance.Router.NewClient(amqpHost, []amqp.ConnOption{}) if err != nil { @@ -58,12 +77,12 @@ func GetAMQPInstance(amqpHost string, dataIn <-chan *channel.DataChan, dataOut c return instance, nil } -//Start start amqp processors +// Start start amqp processors func (a *AMQP) Start(wg *sync.WaitGroup) { go instance.Router.QDRRouter(wg) } -//NewSender - create new sender independent of the framework +// NewSender - create new sender independent of the framework func NewSender(hostName string, port int, address string) (*amqp1.Protocol, error) { return amqp1.NewSender(hostName, port, address) } @@ -73,7 +92,7 @@ func NewReceiver(hostName string, port int, address string) (*amqp1.Protocol, er return amqp1.NewReceiver(hostName, port, address) } -//DeleteSender send publisher address information on a channel to delete its sender object +// DeleteSender send publisher address information on a channel to delete its sender object func DeleteSender(inChan chan<- *channel.DataChan, address string) { // go ahead and create QDR to this address inChan <- &channel.DataChan{ @@ -83,7 +102,7 @@ func DeleteSender(inChan chan<- *channel.DataChan, address string) { } } -//CreateSender send publisher address information on a channel to create it's sender object +// CreateSender send publisher address information on a channel to create its sender object func CreateSender(inChan chan<- *channel.DataChan, address string) { // go ahead and create QDR to this address inChan <- &channel.DataChan{ @@ -93,7 +112,7 @@ func CreateSender(inChan chan<- *channel.DataChan, address string) { } } -//DeleteListener send subscription address information on a channel to delete
its listener object func DeleteListener(inChan chan<- *channel.DataChan, address string) { // go ahead and create QDR listener to this address inChan <- &channel.DataChan{ @@ -103,7 +122,7 @@ func DeleteListener(inChan chan<- *channel.DataChan, address string) { } } -//CreateListener send subscription address information on a channel to create its listener object +// CreateListener send subscription address information on a channel to create its listener object func CreateListener(inChan chan<- *channel.DataChan, address string) { // go ahead and create QDR listener to this address inChan <- &channel.DataChan{ @@ -113,7 +132,7 @@ func CreateListener(inChan chan<- *channel.DataChan, address string) { } } -//CreateNewStatusListener send status address information on a channel to create its listener object +// CreateNewStatusListener send status address information on a channel to create its listener object func CreateNewStatusListener(inChan chan<- *channel.DataChan, address string, onReceiveOverrideFn func(e cloudevents.Event, dataChan *channel.DataChan) error, processEventFn func(e interface{}) error) {