From 42bedd003f22613201c2ab1d177647d4bb71447b Mon Sep 17 00:00:00 2001 From: Aneesh Puttur Date: Fri, 2 Sep 2022 17:02:44 -0400 Subject: [PATCH] Ping status fix passing clinetID to know which client the event to be sent to .this fix is for http only Signed-off-by: Aneesh Puttur --- pkg/channel/data.go | 23 ++++-- pkg/protocol/http/http.go | 131 +++++++++++++++++---------------- pkg/protocol/http/http_test.go | 53 +++++++------ pkg/subscriber/subscriber.go | 5 ++ v1/amqp/amqp.go | 2 +- v1/http/http.go | 10 +++ v1/subscriber/subscriber.go | 21 +++++- 7 files changed, 154 insertions(+), 91 deletions(-) diff --git a/pkg/channel/data.go b/pkg/channel/data.go index d6c83a5..3a95975 100644 --- a/pkg/channel/data.go +++ b/pkg/channel/data.go @@ -16,21 +16,34 @@ package channel import ( cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/google/uuid" ) // DataChan ... type DataChan struct { - ID string - Address string - Data *cloudevents.Event - Status Status + ID string + ClientID uuid.UUID + Address string + Data *cloudevents.Event + Status Status //Type defines type of data (Notification,Metric,Status) Type Type // OnReceiveFn to do on OnReceive OnReceiveFn func(e cloudevents.Event) // OnReceiveOverrideFn Optional for event, but override for status pings.This is an override function on receiving msg by amqp listener, - // if not set then the data is sent to out channel and processed by side car default method + // if not set then the data is sent to out channel and processed by side-car default method OnReceiveOverrideFn func(e cloudevents.Event, dataChan *DataChan) error // ProcessEventFn Optional, this allows to customize message handler thar was received at the out channel ProcessEventFn func(e interface{}) error } + +// CreateCloudEvents ... +func (d *DataChan) CreateCloudEvents(dataType string) (*cloudevents.Event, error) { + ce := cloudevents.NewEvent(cloudevents.VersionV03) + ce.SetDataContentType(cloudevents.ApplicationJSON) + ce.SetSpecVersion(cloudevents.VersionV03) + ce.SetType(dataType) + ce.SetSource(d.Address) + ce.SetID(d.ClientID.String()) + return &ce, nil +} diff --git a/pkg/protocol/http/http.go b/pkg/protocol/http/http.go index ea3f1b9..1f711d6 100644 --- a/pkg/protocol/http/http.go +++ b/pkg/protocol/http/http.go @@ -30,7 +30,7 @@ import ( ) var ( - cancelTimeout = 100 * time.Millisecond + cancelTimeout = 500 * time.Millisecond retryTimeout = 500 * time.Millisecond RequestReadHeaderTimeout = 2 * time.Second ) @@ -113,7 +113,7 @@ func (h *Server) Start(wg *sync.WaitGroup) error { Address: e.Source(), Data: &e, Status: status, - Type: eventType, // could be new event of new subscriber (sender) + Type: eventType, ProcessEventFn: h.processEventFn, } var obj subscriber.Subscriber @@ -123,7 +123,6 @@ func (h *Server) Start(wg *sync.WaitGroup) error { log.Errorf("failied to parse subscription %s", err) } else { out.Address = obj.GetEndPointURI() - if obj.Action == channel.NEW { if _, ok := h.Sender[obj.ClientID]; !ok { // we have a sender object log.Infof("(1)subscriber not found for the following address %s by %s, will attempt to create", e.Source(), obj.GetEndPointURI()) @@ -132,14 +131,21 @@ func (h *Server) Start(wg *sync.WaitGroup) error { localmetrics.UpdateSenderCreatedCount(obj.GetEndPointURI(), localmetrics.FAILED, 1) out.Status = channel.FAILED } else { - _, _ = h.subscriberAPI.CreateSubscription(obj.ClientID, obj) - localmetrics.UpdateSenderCreatedCount(obj.GetEndPointURI(), localmetrics.ACTIVE, 1) - out.Status = channel.SUCCESS + if _, err := h.subscriberAPI.CreateSubscription(obj.ClientID, obj); err != nil { + localmetrics.UpdateSenderCreatedCount(obj.GetEndPointURI(), localmetrics.ACTIVE, 1) + out.Status = channel.SUCCESS + } else { + localmetrics.UpdateSenderCreatedCount(obj.GetEndPointURI(), localmetrics.ACTIVE, 1) + out.Status = channel.FAILED + } } } else { - log.Infof("(1)subscriber already found for %s, by %s will update again\n", e.Source(), obj.GetEndPointURI()) + log.Infof("sender already present,updating %s", obj.ClientID.String()) out.Status = channel.SUCCESS - _, _ = h.subscriberAPI.CreateSubscription(obj.ClientID, obj) + if _, err := h.subscriberAPI.CreateSubscription(obj.ClientID, obj); err != nil { + log.Errorf("failed creating subscriber %s", err) + out.Status = channel.FAILED + } } } else { if _, ok := h.Sender[obj.ClientID]; !ok { @@ -157,16 +163,15 @@ func (h *Server) Start(wg *sync.WaitGroup) error { return err } statusHandler, err := cloudevents.NewHTTPReceiveHandler(ctx, p, func(e cloudevents.Event) { + clientID, _ := uuid.Parse(e.ID()) out := channel.DataChan{ - Address: e.Source(), - Data: &e, - Status: channel.NEW, - Type: channel.STATUS, // could be new event of new subscriber (sender) + Address: e.Source(), + ClientID: clientID, + Status: channel.NEW, + Type: channel.STATUS, // could be new event of new subscriber (sender) } if h.statusReceiveOverrideFn != nil { - out.Status = channel.SUCCESS - localmetrics.UpdateStatusCheckCount(out.Address, localmetrics.SUCCESS, 1) if err := h.statusReceiveOverrideFn(e, &out); err != nil { out.Status = channel.FAILED localmetrics.UpdateStatusCheckCount(out.Address, localmetrics.FAILED, 1) @@ -174,9 +179,6 @@ func (h *Server) Start(wg *sync.WaitGroup) error { localmetrics.UpdateStatusCheckCount(out.Address, localmetrics.SUCCESS, 1) out.Status = channel.SUCCESS } - } else { - out.Status = channel.FAILED - localmetrics.UpdateStatusCheckCount(out.Address, localmetrics.FAILED, 1) } h.DataOut <- &out }) @@ -362,6 +364,7 @@ func (h *Server) HTTPProcessor(wg *sync.WaitGroup) { subs.Action = d.Status ce, _ := subs.CreateCloudEvents() ce.SetSubject(d.Status.String()) + ce.SetSource(d.Address) if len(h.Publishers) > 0 { for _, pubURL := range h.Publishers { @@ -384,19 +387,8 @@ func (h *Server) HTTPProcessor(wg *sync.WaitGroup) { } } else if d.Type == channel.EVENT && d.Status == channel.NEW { // Post the events to the address specified - log.Infof("fetch all urls to send events for %s", d.Address) - eventSubscribers := h.subscriberAPI.GetClientIDAddressByResource(d.Address) - if len(eventSubscribers) == 0 { - log.Errorf("no subscriber found for resource %s", d.Address) - log.Errorf("event not publsihed to empty subscribers, clients need to register %s", d.Address) - log.Infof("event to log %s", d.Data.String()) - d.Status = channel.FAILED - localmetrics.UpdateEventCreatedCount(d.Address, localmetrics.FAILED, -1) - h.DataOut <- d - } else { - for clientID, endPointURI := range eventSubscribers { - log.Infof("event to log %s", d.Data.String()) - log.Infof("Loop and Post events %s, who have subscribed to %s", d.Address, endPointURI) // this address is event address + if d.ClientID != uuid.Nil { + if url := h.subscriberAPI.GetSubscriberURLByResourceAndClientID(d.ClientID, d.Address); url != nil { data := &channel.DataChan{ ID: d.ID, Address: d.Address, @@ -407,7 +399,36 @@ func (h *Server) HTTPProcessor(wg *sync.WaitGroup) { OnReceiveOverrideFn: d.OnReceiveOverrideFn, ProcessEventFn: d.ProcessEventFn, } - h.SendTo(wg, clientID, endPointURI.String(), data.Data, d.Type) + h.SendTo(wg, d.ClientID, *url, data.Data, d.Type) + log.Infof("status ping: queued event status for client %s for resource %s", d.ClientID.String(), d.Address) + } else { + log.Errorf("status ping: failed to find subscription for client %s", d.ClientID.String()) + } + } else { + log.Infof("fetch all urls to send events for %s", d.Address) + eventSubscribers := h.subscriberAPI.GetClientIDAddressByResource(d.Address) + if len(eventSubscribers) == 0 { + log.Infof("no subscriber found for resource %s", d.Address) + log.Infof("skipping event publishing, clients need to register %s", d.Address) + log.Debugf("event to log %s", d.Data.String()) + d.Status = channel.FAILED + localmetrics.UpdateEventCreatedCount(d.Address, localmetrics.FAILED, -1) + h.DataOut <- d + } else { + for clientID, endPointURI := range eventSubscribers { + log.Infof("post events %s to subscriber %s", d.Address, endPointURI) // this address is event address + data := &channel.DataChan{ + ID: d.ID, + Address: d.Address, + Data: d.Data, + Status: d.Status, + Type: d.Type, + OnReceiveFn: d.OnReceiveFn, + OnReceiveOverrideFn: d.OnReceiveOverrideFn, + ProcessEventFn: d.ProcessEventFn, + } + h.SendTo(wg, clientID, endPointURI.String(), data.Data, d.Type) + } } } } else if d.Type == channel.STATUS { //Ping for status @@ -416,34 +437,20 @@ func (h *Server) HTTPProcessor(wg *sync.WaitGroup) { // d.Address is resource address // if its empty then Get all address and ID and create subscription object //else Get only sub you are interested - //TODO: change to Get status for all events - // current implementation expects to have a resource address - // Post it to the address that has been specified : to target URL - subs := subscriber.New(h.clientID) - //Self URL - _ = subs.SetEndPointURI(h.ServiceName) - obj := pubsub.PubSub{} - if d.Address != "" { - obj.Resource = d.Address - } - subs.AddSubscription(obj) - subs.Action = d.Status - ce, _ := subs.CreateCloudEvents() - ce.SetSubject(d.Status.String()) - ce.SetType(channel.STATUS.String()) - if len(h.Publishers) > 0 { - for _, pubURL := range h.Publishers { - // this is called form consumer, so sender object registered at consumer side - if err := Post(fmt.Sprintf("%s%s", pubURL.String(), STATUS), *ce); err != nil { - log.Infof("error sending events status ping to %s for %s", pubURL.String(), d.Address) // this address is event address - d.Status = channel.FAILED - h.DataOut <- d - } else { - log.Infof("successfully sent status ping to %s for %s", pubURL.String(), d.Address) - d.Status = channel.SUCCESS - h.DataOut <- d + d.ClientID = h.ClientID() + if ce, err := d.CreateCloudEvents(channel.STATUS.String()); err == nil { + if len(h.Publishers) > 0 { //TODO: support ping to targeted publishers + for _, pubURL := range h.Publishers { + // this is called form consumer, so sender object registered at consumer side + if err := Post(fmt.Sprintf("%s%s", pubURL.String(), STATUS), *ce); err != nil { + log.Infof("error sending events status ping to %s for %s", fmt.Sprintf("%s%s", pubURL.String(), STATUS), d.Address) // this address is event address + } else { + log.Infof("successfully sent status ping to %s for %s", fmt.Sprintf("%s%s", pubURL.String(), STATUS), d.Address) + } } } + } else { + log.Errorf("error creating cloud events for status") } } case <-h.CloseCh: @@ -473,8 +480,8 @@ func (h *Server) SendTo(wg *sync.WaitGroup, clientID uuid.UUID, clientAddress st if sender == nil { localmetrics.UpdateEventCreatedCount(clientAddress, localmetrics.FAILED, 1) } - if error := sender.Send(*e); error != nil { - log.Errorf("failed to send(TO): %s result %v ", clientAddress, error) + if err := sender.Send(*e); err != nil { + log.Errorf("failed to send(TO): %s result %v ", clientAddress, err) if eventType == channel.EVENT { localmetrics.UpdateEventCreatedCount(clientAddress, localmetrics.FAILED, 1) } else if eventType == channel.STATUS { @@ -496,10 +503,10 @@ func (h *Server) SendTo(wg *sync.WaitGroup, clientID uuid.UUID, clientAddress st Type: channel.SUBSCRIBER, } } - log.Errorf("connection lost addressing %s", clientAddress) } else { localmetrics.UpdateEventCreatedCount(clientAddress, localmetrics.SUCCESS, 1) + h.DataOut <- &channel.DataChan{ Address: clientAddress, Data: e, @@ -586,7 +593,7 @@ func (h *Server) NewSender(clientID uuid.UUID, address string) error { log.Errorf("failed to create http client for %s: %s", s, err2.Error()) return err2 } - log.Infof("Registering subscriber to %s", targetURL) + log.Infof("Registering subscriber endpoint %s", targetURL) l[s].Protocol = protocol l[s].Client = client // this could be changed to use client ID @@ -623,7 +630,7 @@ func (c *Protocol) Send(e cloudevents.Event) error { log.Infof("sent with status code %d::%v", httpResult.StatusCode, result) return nil } - log.Printf("Sent with status code %d, result: %v", httpResult.StatusCode, result) + log.Infof("Sent with status code %d, result: %v", httpResult.StatusCode, result) return fmt.Errorf(httpResult.Format, httpResult.Args...) } log.Printf("Send did not return an HTTP response: %s", result) diff --git a/pkg/protocol/http/http_test.go b/pkg/protocol/http/http_test.go index aab7cae..468b672 100644 --- a/pkg/protocol/http/http_test.go +++ b/pkg/protocol/http/http_test.go @@ -143,7 +143,6 @@ func createClient(t *testing.T, clientS *ceHttp.Server, closeCh chan struct{}, w Type: channel.STATUS, } } - <-closeCh } func TestSubscribeCreated(t *testing.T) { @@ -257,18 +256,22 @@ func TestSendSuccessStatus(t *testing.T) { time.Sleep(500 * time.Millisecond) assert.Equal(t, 1, len(server.Sender)) // read what client put in out channel - d := <-clientOutChannel - assert.Equal(t, channel.SUBSCRIBER, d.Type) - assert.Equal(t, channel.SUCCESS, d.Status) - // read Status - d = <-clientOutChannel - assert.Equal(t, channel.EVENT, d.Type) - assert.Equal(t, channel.NEW, d.Status) + select { + case d := <-clientOutChannel: + assert.Equal(t, channel.SUBSCRIBER, d.Type) + assert.Equal(t, channel.SUCCESS, d.Status) + case <-time.After(1 * time.Second): + log.Infof("timeout reading out channel ") + } + + select { + case d := <-clientOutChannel: + assert.Equal(t, channel.EVENT, d.Type) + assert.Equal(t, channel.NEW, d.Status) + case <-time.After(1 * time.Second): + log.Infof("timeout reading out channel") + } <-out - // read event - log.Info("waiting for event channel from the client when it received the event") - d = <-clientOutChannel // a client needs to break out or else it will be holding it forever - assert.Equal(t, channel.STATUS, d.Type) dd := cneevent.Data{} err = json.Unmarshal(e.Data(), &dd) assert.Nil(t, err) @@ -325,8 +328,10 @@ func TestPing(t *testing.T) { in := make(chan *channel.DataChan) out := make(chan *channel.DataChan) closeCh := make(chan struct{}) - - server, err := ceHttp.InitServer(serverAddress.String(), hostPort, storePath, in, out, closeCh, nil, nil) + onStatusReceiveOverrideFn := func(e event.Event, d *channel.DataChan) error { + return nil + } + server, err := ceHttp.InitServer(serverAddress.String(), hostPort, storePath, in, out, closeCh, onStatusReceiveOverrideFn, nil) assert.Nil(t, err) wg := sync.WaitGroup{} @@ -338,15 +343,21 @@ func TestPing(t *testing.T) { time.Sleep(2 * time.Second) err = server.NewSender(serverClientID, serverAddress.String()) assert.Nil(t, err) - - // send event + // send status ping in <- &channel.DataChan{ - Address: subscriptionOne.Resource, - Status: channel.NEW, - Type: channel.STATUS, + Address: subscriptionOne.Resource, + ClientID: serverClientID, + Status: channel.NEW, + Type: channel.STATUS, + } + select { + case d := <-out: + assert.Equal(t, channel.STATUS, d.Type) + assert.Equal(t, channel.SUCCESS, d.Status) + assert.Equal(t, serverClientID, server.ClientID()) + case <-time.After(1 * time.Second): + log.Infof("timeout reading out channel ") } - d := <-out // a client needs to break out or else it will be holding it forever - assert.Equal(t, channel.STATUS, d.Type) close(closeCh) } diff --git a/pkg/subscriber/subscriber.go b/pkg/subscriber/subscriber.go index 44b22f0..6739bad 100644 --- a/pkg/subscriber/subscriber.go +++ b/pkg/subscriber/subscriber.go @@ -67,6 +67,11 @@ func (s *Subscriber) IncFailCount() { } } +// ResetFailCount ... +func (s *Subscriber) ResetFailCount() { + s.failedCount = 0 +} + func (s *Subscriber) FailedCount() int { return s.failedCount } diff --git a/v1/amqp/amqp.go b/v1/amqp/amqp.go index 9d2d430..6b3205f 100644 --- a/v1/amqp/amqp.go +++ b/v1/amqp/amqp.go @@ -113,7 +113,7 @@ func CreateListener(inChan chan<- *channel.DataChan, address string) { } } -//CreateNewStatusListener send status address information on a channel to create it's listener object +//CreateNewStatusListener send status address information on a channel to create its listener object func CreateNewStatusListener(inChan chan<- *channel.DataChan, address string, onReceiveOverrideFn func(e cloudevents.Event, dataChan *channel.DataChan) error, processEventFn func(e interface{}) error) { diff --git a/v1/http/http.go b/v1/http/http.go index 643ea81..8d46414 100644 --- a/v1/http/http.go +++ b/v1/http/http.go @@ -22,6 +22,8 @@ import ( "sync/atomic" "time" + "github.com/google/uuid" + "github.com/redhat-cne/sdk-go/pkg/types" cloudevents "github.com/cloudevents/sdk-go/v2" @@ -74,6 +76,14 @@ func (h *HTTP) Start(wg *sync.WaitGroup) { } } +// ClientID return clientID that was set by publisher/subscriber service +func (h *HTTP) ClientID() uuid.UUID { + if h.server != nil { + return h.server.ClientID() + } + return uuid.Nil +} + // Shutdown ... func (h *HTTP) Shutdown() { log.Warn("Shutting down http transport service") diff --git a/v1/subscriber/subscriber.go b/v1/subscriber/subscriber.go index 9491b73..c38bd33 100644 --- a/v1/subscriber/subscriber.go +++ b/v1/subscriber/subscriber.go @@ -83,9 +83,9 @@ func (p *API) ReloadStore() { } } } - log.Infof("following subscribers reloaded") + log.Infof("%d registered clients reloaded", len(p.subscriberStore.Store)) for k, v := range p.subscriberStore.Store { - log.Infof("Registered clients %s : %s", k, v.String()) + log.Infof("registered clients %s : %s", k, v.String()) } } @@ -149,6 +149,7 @@ func (p *API) CreateSubscription(clientID uuid.UUID, sub subscriber.Subscriber) if subscriptionClient, ok = p.HasClient(clientID); !ok { subscriptionClient = subscriber.New(clientID) } + subscriptionClient.ResetFailCount() _ = subscriptionClient.SetEndPointURI(sub.GetEndPointURI()) subscriptionClient.SetStatus(subscriber.Active) pubStore := subscriptionClient.GetSubStore() @@ -211,6 +212,22 @@ func (p *API) GetSubscription(clientID uuid.UUID, subID string) (sub pubsub.PubS return } +// GetSubscriberURLByResourceAndClientID get subscription information by client id/resource +func (p *API) GetSubscriberURLByResourceAndClientID(clientID uuid.UUID, resource string) (url *string) { + for _, subscriber := range p.subscriberStore.Store { + if subscriber.ClientID == clientID { + for _, sub := range subscriber.SubStore.Store { + if sub.GetResource() == resource { + return func(s string) *string { + return &s + }(subscriber.GetEndPointURI()) + } + } + } + } + return nil +} + // GetSubscriberURLByResource get subscriptionOne information func (p *API) GetSubscriberURLByResource(resource string) (urls []string) { for _, subscriber := range p.subscriberStore.Store {