From 5b0837f7bb05b3c9593676413a634a7c9201da5a Mon Sep 17 00:00:00 2001 From: Jack Ding Date: Tue, 17 Jan 2023 19:26:06 -0500 Subject: [PATCH] Allow configurable timeout for init amq connection Signed-off-by: Jack Ding --- v1/amqp/amqp.go | 15 +++++++++------ v1/amqp/amqp_test.go | 6 ++++-- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/v1/amqp/amqp.go b/v1/amqp/amqp.go index 8687364..fef7fac 100644 --- a/v1/amqp/amqp.go +++ b/v1/amqp/amqp.go @@ -29,9 +29,8 @@ import ( ) var ( - instance *AMQP - retryTimeout = 500 * time.Millisecond - cancelTimeout = 300 * time.Second + instance *AMQP + retryTimeout = 500 * time.Millisecond ) // AMQP exposes amqp api methods @@ -40,11 +39,12 @@ type AMQP struct { } // GetAMQPInstance get event instance -func GetAMQPInstance(amqpHost string, dataIn <-chan *channel.DataChan, dataOut chan<- *channel.DataChan, closeCh <-chan struct{}) (*AMQP, error) { - ctx, cancel := context.WithTimeout(context.Background(), cancelTimeout) +func GetAMQPInstance(amqpHost string, dataIn <-chan *channel.DataChan, dataOut chan<- *channel.DataChan, closeCh <-chan struct{}, amqInitTimeout time.Duration) (*AMQP, error) { + ctx, cancel := context.WithTimeout(context.Background(), amqInitTimeout) defer cancel() var router *amqp1.Router var err error + isFirstFail := true for { select { case <-ctx.Done(): @@ -53,7 +53,10 @@ func GetAMQPInstance(amqpHost string, dataIn <-chan *channel.DataChan, dataOut c } router, err = amqp1.InitServer(amqpHost, dataIn, dataOut, closeCh) if err != nil { - log.Info("retrying connecting to amqp.") + if isFirstFail { + log.Infof("retrying connecting to amqp every %s for %s", retryTimeout, amqInitTimeout) + isFirstFail = false + } time.Sleep(retryTimeout) continue } diff --git a/v1/amqp/amqp_test.go b/v1/amqp/amqp_test.go index e2e095f..5a462f3 100644 --- a/v1/amqp/amqp_test.go +++ b/v1/amqp/amqp_test.go @@ -16,6 +16,7 @@ package amqp_test import ( "testing" + "time" "github.com/redhat-cne/sdk-go/pkg/channel" api "github.com/redhat-cne/sdk-go/v1/amqp" @@ -29,11 +30,12 @@ var ( in = make(chan *channel.DataChan) out = make(chan *channel.DataChan) close = make(chan struct{}) - globalInstance, _ = api.GetAMQPInstance(s, in, out, close) + timeout = 1 * time.Second + globalInstance, _ = api.GetAMQPInstance(s, in, out, close, timeout) ) func TestAPI_GetAPIInstance(t *testing.T) { - localInstance, err := api.GetAMQPInstance(s, in, out, close) + localInstance, err := api.GetAMQPInstance(s, in, out, close, timeout) if err != nil { t.Skipf("ampq.Dial(%#v): %v", localInstance, err) }