Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ping status fix #50

Merged
merged 1 commit into from
Sep 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions pkg/channel/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,34 @@ package channel

import (
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/google/uuid"
)

// DataChan ...
type DataChan struct {
ID string
Address string
Data *cloudevents.Event
Status Status
ID string
ClientID uuid.UUID
Address string
Data *cloudevents.Event
Status Status
//Type defines type of data (Notification,Metric,Status)
Type Type
// OnReceiveFn to do on OnReceive
OnReceiveFn func(e cloudevents.Event)
// OnReceiveOverrideFn Optional for event, but override for status pings.This is an override function on receiving msg by amqp listener,
// if not set then the data is sent to out channel and processed by side car default method
// if not set then the data is sent to out channel and processed by side-car default method
OnReceiveOverrideFn func(e cloudevents.Event, dataChan *DataChan) error
// ProcessEventFn Optional, this allows to customize message handler thar was received at the out channel
ProcessEventFn func(e interface{}) error
}

// CreateCloudEvents ...
func (d *DataChan) CreateCloudEvents(dataType string) (*cloudevents.Event, error) {
ce := cloudevents.NewEvent(cloudevents.VersionV03)
jzding marked this conversation as resolved.
Show resolved Hide resolved
ce.SetDataContentType(cloudevents.ApplicationJSON)
ce.SetSpecVersion(cloudevents.VersionV03)
ce.SetType(dataType)
ce.SetSource(d.Address)
ce.SetID(d.ClientID.String())
return &ce, nil
}
131 changes: 69 additions & 62 deletions pkg/protocol/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
)

var (
cancelTimeout = 100 * time.Millisecond
cancelTimeout = 500 * time.Millisecond
retryTimeout = 500 * time.Millisecond
RequestReadHeaderTimeout = 2 * time.Second
)
Expand Down Expand Up @@ -113,7 +113,7 @@ func (h *Server) Start(wg *sync.WaitGroup) error {
Address: e.Source(),
Data: &e,
Status: status,
Type: eventType, // could be new event of new subscriber (sender)
Type: eventType,
ProcessEventFn: h.processEventFn,
}
var obj subscriber.Subscriber
Expand All @@ -123,7 +123,6 @@ func (h *Server) Start(wg *sync.WaitGroup) error {
log.Errorf("failied to parse subscription %s", err)
} else {
out.Address = obj.GetEndPointURI()

if obj.Action == channel.NEW {
if _, ok := h.Sender[obj.ClientID]; !ok { // we have a sender object
log.Infof("(1)subscriber not found for the following address %s by %s, will attempt to create", e.Source(), obj.GetEndPointURI())
Expand All @@ -132,14 +131,21 @@ func (h *Server) Start(wg *sync.WaitGroup) error {
localmetrics.UpdateSenderCreatedCount(obj.GetEndPointURI(), localmetrics.FAILED, 1)
out.Status = channel.FAILED
} else {
_, _ = h.subscriberAPI.CreateSubscription(obj.ClientID, obj)
localmetrics.UpdateSenderCreatedCount(obj.GetEndPointURI(), localmetrics.ACTIVE, 1)
out.Status = channel.SUCCESS
if _, err := h.subscriberAPI.CreateSubscription(obj.ClientID, obj); err != nil {
localmetrics.UpdateSenderCreatedCount(obj.GetEndPointURI(), localmetrics.ACTIVE, 1)
out.Status = channel.SUCCESS
} else {
localmetrics.UpdateSenderCreatedCount(obj.GetEndPointURI(), localmetrics.ACTIVE, 1)
out.Status = channel.FAILED
}
}
} else {
log.Infof("(1)subscriber already found for %s, by %s will update again\n", e.Source(), obj.GetEndPointURI())
log.Infof("sender already present,updating %s", obj.ClientID.String())
out.Status = channel.SUCCESS
_, _ = h.subscriberAPI.CreateSubscription(obj.ClientID, obj)
if _, err := h.subscriberAPI.CreateSubscription(obj.ClientID, obj); err != nil {
log.Errorf("failed creating subscriber %s", err)
out.Status = channel.FAILED
}
}
} else {
if _, ok := h.Sender[obj.ClientID]; !ok {
Expand All @@ -157,26 +163,22 @@ func (h *Server) Start(wg *sync.WaitGroup) error {
return err
}
statusHandler, err := cloudevents.NewHTTPReceiveHandler(ctx, p, func(e cloudevents.Event) {
clientID, _ := uuid.Parse(e.ID())
out := channel.DataChan{
Address: e.Source(),
Data: &e,
Status: channel.NEW,
Type: channel.STATUS, // could be new event of new subscriber (sender)
Address: e.Source(),
ClientID: clientID,
Status: channel.NEW,
Type: channel.STATUS, // could be new event of new subscriber (sender)
}

if h.statusReceiveOverrideFn != nil {
out.Status = channel.SUCCESS
localmetrics.UpdateStatusCheckCount(out.Address, localmetrics.SUCCESS, 1)
if err := h.statusReceiveOverrideFn(e, &out); err != nil {
out.Status = channel.FAILED
localmetrics.UpdateStatusCheckCount(out.Address, localmetrics.FAILED, 1)
} else {
localmetrics.UpdateStatusCheckCount(out.Address, localmetrics.SUCCESS, 1)
out.Status = channel.SUCCESS
}
} else {
out.Status = channel.FAILED
localmetrics.UpdateStatusCheckCount(out.Address, localmetrics.FAILED, 1)
}
h.DataOut <- &out
})
Expand Down Expand Up @@ -362,6 +364,7 @@ func (h *Server) HTTPProcessor(wg *sync.WaitGroup) {
subs.Action = d.Status
ce, _ := subs.CreateCloudEvents()
ce.SetSubject(d.Status.String())
ce.SetSource(d.Address)

if len(h.Publishers) > 0 {
for _, pubURL := range h.Publishers {
Expand All @@ -384,19 +387,8 @@ func (h *Server) HTTPProcessor(wg *sync.WaitGroup) {
}
} else if d.Type == channel.EVENT && d.Status == channel.NEW {
// Post the events to the address specified
log.Infof("fetch all urls to send events for %s", d.Address)
eventSubscribers := h.subscriberAPI.GetClientIDAddressByResource(d.Address)
if len(eventSubscribers) == 0 {
log.Errorf("no subscriber found for resource %s", d.Address)
log.Errorf("event not publsihed to empty subscribers, clients need to register %s", d.Address)
log.Infof("event to log %s", d.Data.String())
d.Status = channel.FAILED
localmetrics.UpdateEventCreatedCount(d.Address, localmetrics.FAILED, -1)
h.DataOut <- d
} else {
for clientID, endPointURI := range eventSubscribers {
log.Infof("event to log %s", d.Data.String())
log.Infof("Loop and Post events %s, who have subscribed to %s", d.Address, endPointURI) // this address is event address
if d.ClientID != uuid.Nil {
if url := h.subscriberAPI.GetSubscriberURLByResourceAndClientID(d.ClientID, d.Address); url != nil {
data := &channel.DataChan{
ID: d.ID,
Address: d.Address,
Expand All @@ -407,7 +399,36 @@ func (h *Server) HTTPProcessor(wg *sync.WaitGroup) {
OnReceiveOverrideFn: d.OnReceiveOverrideFn,
ProcessEventFn: d.ProcessEventFn,
}
h.SendTo(wg, clientID, endPointURI.String(), data.Data, d.Type)
h.SendTo(wg, d.ClientID, *url, data.Data, d.Type)
log.Infof("status ping: queued event status for client %s for resource %s", d.ClientID.String(), d.Address)
} else {
log.Errorf("status ping: failed to find subscription for client %s", d.ClientID.String())
}
} else {
log.Infof("fetch all urls to send events for %s", d.Address)
eventSubscribers := h.subscriberAPI.GetClientIDAddressByResource(d.Address)
if len(eventSubscribers) == 0 {
log.Infof("no subscriber found for resource %s", d.Address)
log.Infof("skipping event publishing, clients need to register %s", d.Address)
log.Debugf("event to log %s", d.Data.String())
d.Status = channel.FAILED
localmetrics.UpdateEventCreatedCount(d.Address, localmetrics.FAILED, -1)
h.DataOut <- d
} else {
for clientID, endPointURI := range eventSubscribers {
log.Infof("post events %s to subscriber %s", d.Address, endPointURI) // this address is event address
data := &channel.DataChan{
ID: d.ID,
Address: d.Address,
Data: d.Data,
Status: d.Status,
Type: d.Type,
OnReceiveFn: d.OnReceiveFn,
OnReceiveOverrideFn: d.OnReceiveOverrideFn,
ProcessEventFn: d.ProcessEventFn,
}
h.SendTo(wg, clientID, endPointURI.String(), data.Data, d.Type)
}
}
}
} else if d.Type == channel.STATUS { //Ping for status
Expand All @@ -416,34 +437,20 @@ func (h *Server) HTTPProcessor(wg *sync.WaitGroup) {
// d.Address is resource address
// if its empty then Get all address and ID and create subscription object
//else Get only sub you are interested
//TODO: change to Get status for all events
// current implementation expects to have a resource address
// Post it to the address that has been specified : to target URL
subs := subscriber.New(h.clientID)
//Self URL
_ = subs.SetEndPointURI(h.ServiceName)
obj := pubsub.PubSub{}
if d.Address != "" {
obj.Resource = d.Address
}
subs.AddSubscription(obj)
subs.Action = d.Status
ce, _ := subs.CreateCloudEvents()
ce.SetSubject(d.Status.String())
ce.SetType(channel.STATUS.String())
if len(h.Publishers) > 0 {
for _, pubURL := range h.Publishers {
// this is called form consumer, so sender object registered at consumer side
if err := Post(fmt.Sprintf("%s%s", pubURL.String(), STATUS), *ce); err != nil {
log.Infof("error sending events status ping to %s for %s", pubURL.String(), d.Address) // this address is event address
d.Status = channel.FAILED
h.DataOut <- d
} else {
log.Infof("successfully sent status ping to %s for %s", pubURL.String(), d.Address)
d.Status = channel.SUCCESS
h.DataOut <- d
d.ClientID = h.ClientID()
if ce, err := d.CreateCloudEvents(channel.STATUS.String()); err == nil {
if len(h.Publishers) > 0 { //TODO: support ping to targeted publishers
for _, pubURL := range h.Publishers {
// this is called form consumer, so sender object registered at consumer side
if err := Post(fmt.Sprintf("%s%s", pubURL.String(), STATUS), *ce); err != nil {
log.Infof("error sending events status ping to %s for %s", fmt.Sprintf("%s%s", pubURL.String(), STATUS), d.Address) // this address is event address
} else {
log.Infof("successfully sent status ping to %s for %s", fmt.Sprintf("%s%s", pubURL.String(), STATUS), d.Address)
}
}
}
} else {
log.Errorf("error creating cloud events for status")
}
}
case <-h.CloseCh:
Expand Down Expand Up @@ -473,8 +480,8 @@ func (h *Server) SendTo(wg *sync.WaitGroup, clientID uuid.UUID, clientAddress st
if sender == nil {
localmetrics.UpdateEventCreatedCount(clientAddress, localmetrics.FAILED, 1)
}
if error := sender.Send(*e); error != nil {
log.Errorf("failed to send(TO): %s result %v ", clientAddress, error)
if err := sender.Send(*e); err != nil {
log.Errorf("failed to send(TO): %s result %v ", clientAddress, err)
if eventType == channel.EVENT {
localmetrics.UpdateEventCreatedCount(clientAddress, localmetrics.FAILED, 1)
} else if eventType == channel.STATUS {
Expand All @@ -496,10 +503,10 @@ func (h *Server) SendTo(wg *sync.WaitGroup, clientID uuid.UUID, clientAddress st
Type: channel.SUBSCRIBER,
}
}

log.Errorf("connection lost addressing %s", clientAddress)
} else {
localmetrics.UpdateEventCreatedCount(clientAddress, localmetrics.SUCCESS, 1)

h.DataOut <- &channel.DataChan{
Address: clientAddress,
Data: e,
Expand Down Expand Up @@ -586,7 +593,7 @@ func (h *Server) NewSender(clientID uuid.UUID, address string) error {
log.Errorf("failed to create http client for %s: %s", s, err2.Error())
return err2
}
log.Infof("Registering subscriber to %s", targetURL)
log.Infof("Registering subscriber endpoint %s", targetURL)
l[s].Protocol = protocol
l[s].Client = client
// this could be changed to use client ID
Expand Down Expand Up @@ -623,7 +630,7 @@ func (c *Protocol) Send(e cloudevents.Event) error {
log.Infof("sent with status code %d::%v", httpResult.StatusCode, result)
return nil
}
log.Printf("Sent with status code %d, result: %v", httpResult.StatusCode, result)
log.Infof("Sent with status code %d, result: %v", httpResult.StatusCode, result)
return fmt.Errorf(httpResult.Format, httpResult.Args...)
}
log.Printf("Send did not return an HTTP response: %s", result)
Expand Down
53 changes: 32 additions & 21 deletions pkg/protocol/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ func createClient(t *testing.T, clientS *ceHttp.Server, closeCh chan struct{}, w
Type: channel.STATUS,
}
}

<-closeCh
}
func TestSubscribeCreated(t *testing.T) {
Expand Down Expand Up @@ -257,18 +256,22 @@ func TestSendSuccessStatus(t *testing.T) {
time.Sleep(500 * time.Millisecond)
assert.Equal(t, 1, len(server.Sender))
// read what client put in out channel
d := <-clientOutChannel
assert.Equal(t, channel.SUBSCRIBER, d.Type)
assert.Equal(t, channel.SUCCESS, d.Status)
// read Status
d = <-clientOutChannel
assert.Equal(t, channel.EVENT, d.Type)
assert.Equal(t, channel.NEW, d.Status)
select {
case d := <-clientOutChannel:
assert.Equal(t, channel.SUBSCRIBER, d.Type)
assert.Equal(t, channel.SUCCESS, d.Status)
case <-time.After(1 * time.Second):
log.Infof("timeout reading out channel ")
}

select {
case d := <-clientOutChannel:
assert.Equal(t, channel.EVENT, d.Type)
assert.Equal(t, channel.NEW, d.Status)
case <-time.After(1 * time.Second):
log.Infof("timeout reading out channel")
}
<-out
// read event
log.Info("waiting for event channel from the client when it received the event")
d = <-clientOutChannel // a client needs to break out or else it will be holding it forever
assert.Equal(t, channel.STATUS, d.Type)
dd := cneevent.Data{}
err = json.Unmarshal(e.Data(), &dd)
assert.Nil(t, err)
Expand Down Expand Up @@ -325,8 +328,10 @@ func TestPing(t *testing.T) {
in := make(chan *channel.DataChan)
out := make(chan *channel.DataChan)
closeCh := make(chan struct{})

server, err := ceHttp.InitServer(serverAddress.String(), hostPort, storePath, in, out, closeCh, nil, nil)
onStatusReceiveOverrideFn := func(e event.Event, d *channel.DataChan) error {
return nil
}
server, err := ceHttp.InitServer(serverAddress.String(), hostPort, storePath, in, out, closeCh, onStatusReceiveOverrideFn, nil)
assert.Nil(t, err)

wg := sync.WaitGroup{}
Expand All @@ -338,15 +343,21 @@ func TestPing(t *testing.T) {
time.Sleep(2 * time.Second)
err = server.NewSender(serverClientID, serverAddress.String())
assert.Nil(t, err)

// send event
// send status ping
in <- &channel.DataChan{
Address: subscriptionOne.Resource,
Status: channel.NEW,
Type: channel.STATUS,
Address: subscriptionOne.Resource,
ClientID: serverClientID,
Status: channel.NEW,
Type: channel.STATUS,
}
select {
case d := <-out:
assert.Equal(t, channel.STATUS, d.Type)
assert.Equal(t, channel.SUCCESS, d.Status)
assert.Equal(t, serverClientID, server.ClientID())
case <-time.After(1 * time.Second):
log.Infof("timeout reading out channel ")
}
d := <-out // a client needs to break out or else it will be holding it forever
assert.Equal(t, channel.STATUS, d.Type)

close(closeCh)
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/subscriber/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ func (s *Subscriber) IncFailCount() {
}
}

// ResetFailCount ...
func (s *Subscriber) ResetFailCount() {
s.failedCount = 0
}

func (s *Subscriber) FailedCount() int {
return s.failedCount
}
Expand Down
2 changes: 1 addition & 1 deletion v1/amqp/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func CreateListener(inChan chan<- *channel.DataChan, address string) {
}
}

//CreateNewStatusListener send status address information on a channel to create it's listener object
//CreateNewStatusListener send status address information on a channel to create its listener object
func CreateNewStatusListener(inChan chan<- *channel.DataChan, address string,
onReceiveOverrideFn func(e cloudevents.Event, dataChan *channel.DataChan) error,
processEventFn func(e interface{}) error) {
Expand Down
Loading