Skip to content

Commit

Permalink
Fix delete subription and msg responses
Browse files Browse the repository at this point in the history
Signed-off-by: Aneesh Puttur <[email protected]>
  • Loading branch information
aneeshkp committed Feb 28, 2023
1 parent 881cc84 commit 7dbf075
Show file tree
Hide file tree
Showing 10 changed files with 390 additions and 125 deletions.
7 changes: 7 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,11 @@ test:
gha:
go test ./... --tags=unittests -coverprofile=cover.out

fmt: ## Go fmt your code
hack/gofmt.sh

fmt-code: ## Run go fmt against code.
go fmt ./...

vet: ## Run go vet against code.
go vet ./...
80 changes: 40 additions & 40 deletions docs/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,75 +28,75 @@ These metrics describe the status of the cloud native events, publisher and subs

All these metrics are prefixed with `cne_`

| Name | Description | Type |
|-------------------------------------------------------|----------------------------------------------------------|---------|
| cne_amqp_events_received | Metric to get number of events received by the transport. | Gauge |
| cne_amqp_events_published | Metric to get number of events published by the transport. | Gauge |
| cne_amqp_connection_reset | Metric to get number of connection resets. | Gauge |
| cne_amqp_sender | Metric to get number of sender created. | Gauge |
| cne_amqp_receiver | Metric to get number of receiver created. | Gauge |
| cne_amqp_status_check_published | Metric to get number of status check published by the transport | Gauge |
| Name | Description | Type |
|-----------------------------|----------------------------------------------------------|---------|
| cne_transport_events_received | Metric to get number of events received by the transport. | Gauge |
| cne_transport_events_published | Metric to get number of events published by the transport. | Gauge |
| cne_transport_connection_reset | Metric to get number of connection resets. | Gauge |
| cne_transport_sender | Metric to get number of sender created. | Gauge |
| cne_transport_receiver | Metric to get number of receiver created. | Gauge |
| cne_transport_status_check_published | Metric to get number of status check published by the transport | Gauge |

`cne_amqp_events_received` - The number of events received by the amqp protocol, and their status by address.
`cne_transport_events_received` - The number of events received by the transport protocol, and their status by address.

Example
```json
# HELP cne_amqp_events_received Metric to get number of events received by the transport
# TYPE cne_amqp_events_received gauge
cne_amqp_events_received{address="/news-service/finance",status="success"} 8
cne_amqp_events_received{address="/news-service/sports",status="success"} 8
# HELP cne_transport_events_received Metric to get number of events received by the transport
# TYPE cne_transport_events_received gauge
cne_transport_events_received{address="/news-service/finance",status="success"} 8
cne_transport_events_received{address="/news-service/sports",status="success"} 8
```

`cne_amqp_events_published` - This metrics indicates number of events that were published via amqp , grouped by address and status.
`cne_transport_events_published` - This metrics indicates number of events that were published via transport , grouped by address and status.

Example
```json
# HELP cne_amqp_events_published Metric to get number of events published by the transport
# TYPE cne_amqp_events_published gauge
cne_amqp_events_published{address="/news-service/finance",status="connection reset"} 1
cne_amqp_events_published{address="/news-service/finance",status="success"} 8
cne_amqp_events_published{address="/news-service/sports",status="connection reset"} 1
cne_amqp_events_published{address="/news-service/sports",status="success"} 8
# HELP cne_transport_events_published Metric to get number of events published by the transport
# TYPE cne_transport_events_published gauge
cne_transport_events_published{address="/news-service/finance",status="connection reset"} 1
cne_transport_events_published{address="/news-service/finance",status="success"} 8
cne_transport_events_published{address="/news-service/sports",status="connection reset"} 1
cne_transport_events_published{address="/news-service/sports",status="success"} 8
```

`cne_amqp_connection_reset` - This metrics indicates number of types amqp connection was reset
`cne_transport_connection_reset` - This metrics indicates number of types transport connection was reset

Example
```json
# HELP cne_amqp_connection_reset Metric to get number of connection resets
# TYPE cne_amqp_connection_reset gauge
cne_amqp_connection_reset 1
# HELP cne_transport_connection_reset Metric to get number of connection resets
# TYPE cne_transport_connection_reset gauge
cne_transport_connection_reset 1
```

`cne_amqp_sender` - This metrics indicates number of amqp sender objects were created , grouped by address and status.
`cne_transport_sender` - This metrics indicates number of transport sender objects were created , grouped by address and status.

Example
```json
# HELP cne_amqp_sender Metric to get number of sender active
# TYPE cne_amqp_sender gauge
cne_amqp_sender{address="/news-service/finance",status="active"} 1
cne_amqp_sender{address="/news-service/sports",status="active"} 1
# HELP cne_transport_sender Metric to get number of sender active
# TYPE cne_transport_sender gauge
cne_transport_sender{address="/news-service/finance",status="active"} 1
cne_transport_sender{address="/news-service/sports",status="active"} 1
```

`cne_amqp_receiver` - This metrics indicates number of amqp receiver objects were created, grouped by address and status.
`cne_transport_receiver` - This metrics indicates number of transport receiver objects were created, grouped by address and status.

Example
```json
# HELP cne_amqp_receiver Metric to get number of receiver active
# TYPE cne_amqp_receiver gauge
cne_amqp_receiver{address="/news-service/finance",status="active"} 1
cne_amqp_receiver{address="/news-service/sports",status="active"} 1
# HELP cne_transport_receiver Metric to get number of receiver active
# TYPE cne_transport_receiver gauge
cne_transport_receiver{address="/news-service/finance",status="active"} 1
cne_transport_receiver{address="/news-service/sports",status="active"} 1
```

`cne_amqp_status_check_published` - This metrics indicates status check that were published via amqp , grouped by address and status.
`cne_transport_status_check_published` - This metrics indicates status check that were published via transport , grouped by address and status.

Example
```json
# HELP cne_amqp_status_check_published Metric to get number of status check published by the transport
# TYPE cne_amqp_status_check_published gauge
cne_amqp_status_check_published{address="/news-service/finance/status",status="failed"} 1
cne_amqp_status_check_published{address="/news-service/sports/status",status="connection reset"} 1
cne_amqp_status_check_published{address="/news-service/sports/status",status="success"} 1
# HELP cne_transport_status_check_published Metric to get number of status check published by the transport
# TYPE cne_transport_status_check_published gauge
cne_transport_status_check_published{address="/news-service/finance/status",status="failed"} 1
cne_transport_status_check_published{address="/news-service/sports/status",status="connection reset"} 1
cne_transport_status_check_published{address="/news-service/sports/status",status="success"} 1
```


Expand Down
62 changes: 31 additions & 31 deletions pkg/localmetrics/localmetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,89 +34,89 @@ const (

var (

//amqpEventReceivedCount ... Total no of events received by the transport
amqpEventReceivedCount = prometheus.NewGaugeVec(
//transportEventReceivedCount ... Total no of events received by the transport
transportEventReceivedCount = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "cne_amqp_events_received",
Help: "Metric to get number of events received by the transport",
Name: "cne_transport_events_received",
Help: "Metric to get number of events received by the transport",
}, []string{"address", "status"})
//amqpEventPublishedCount ... Total no of events published by the transport
amqpEventPublishedCount = prometheus.NewGaugeVec(
//transportEventPublishedCount ... Total no of events published by the transport
transportEventPublishedCount = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "cne_amqp_events_published",
Name: "cne_transport_events_published",
Help: "Metric to get number of events published by the transport",
}, []string{"address", "status"})

//amqpConnectionResetCount ... Total no of connection resets
amqpConnectionResetCount = prometheus.NewGaugeVec(
//transportConnectionResetCount ... Total no of connection resets
transportConnectionResetCount = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "cne_amqp_connection_reset",
Name: "cne_transport_connection_reset",
Help: "Metric to get number of connection resets",
}, []string{})

//amqpSenderCount ... Total no of events published by the transport
amqpSenderCount = prometheus.NewGaugeVec(
//transportSenderCount ... Total no of events published by the transport
transportSenderCount = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "cne_amqp_sender",
Name: "cne_transport_sender",
Help: "Metric to get number of sender created",
}, []string{"address", "status"})

//amqpReceiverCount ... Total no of events published by the transport
amqpReceiverCount = prometheus.NewGaugeVec(
//transportReceiverCount ... Total no of events published by the transport
transportReceiverCount = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "cne_amqp_receiver",
Name: "cne_transport_receiver",
Help: "Metric to get number of receiver created",
}, []string{"address", "status"})

//amqpStatusCheckCount ... Total no of status check received by the transport
amqpStatusCheckCount = prometheus.NewGaugeVec(
//transportStatusCheckCount ... Total no of status check received by the transport
transportStatusCheckCount = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "cne_amqp_status_check_published",
Name: "cne_transport_status_check_published",
Help: "Metric to get number of status check published by the transport",
}, []string{"address", "status"})
)

// RegisterMetrics ...
func RegisterMetrics() {
prometheus.MustRegister(amqpEventReceivedCount)
prometheus.MustRegister(amqpEventPublishedCount)
prometheus.MustRegister(amqpConnectionResetCount)
prometheus.MustRegister(amqpSenderCount)
prometheus.MustRegister(amqpReceiverCount)
prometheus.MustRegister(amqpStatusCheckCount)
prometheus.MustRegister(transportEventReceivedCount)
prometheus.MustRegister(transportEventPublishedCount)
prometheus.MustRegister(transportConnectionResetCount)
prometheus.MustRegister(transportSenderCount)
prometheus.MustRegister(transportReceiverCount)
prometheus.MustRegister(transportStatusCheckCount)
}

// UpdateTransportConnectionResetCount ...
func UpdateTransportConnectionResetCount(val int) {
amqpConnectionResetCount.With(prometheus.Labels{}).Add(float64(val))
transportConnectionResetCount.With(prometheus.Labels{}).Add(float64(val))
}

// UpdateEventReceivedCount ...
func UpdateEventReceivedCount(address string, status MetricStatus, val int) {
amqpEventReceivedCount.With(
transportEventReceivedCount.With(
prometheus.Labels{"address": address, "status": string(status)}).Add(float64(val))
}

// UpdateEventCreatedCount ...
func UpdateEventCreatedCount(address string, status MetricStatus, val int) {
amqpEventPublishedCount.With(
transportEventPublishedCount.With(
prometheus.Labels{"address": address, "status": string(status)}).Add(float64(val))
}

// UpdateStatusCheckCount ...
func UpdateStatusCheckCount(address string, status MetricStatus, val int) {
amqpEventPublishedCount.With(
transportEventPublishedCount.With(
prometheus.Labels{"address": address, "status": string(status)}).Add(float64(val))
}

// UpdateSenderCreatedCount ...
func UpdateSenderCreatedCount(address string, status MetricStatus, val int) {
amqpSenderCount.With(
transportSenderCount.With(
prometheus.Labels{"address": address, "status": string(status)}).Add(float64(val))
}

// UpdateReceiverCreatedCount ...
func UpdateReceiverCreatedCount(address string, status MetricStatus, val int) {
amqpReceiverCount.With(
transportReceiverCount.With(
prometheus.Labels{"address": address, "status": string(status)}).Add(float64(val))
}
67 changes: 37 additions & 30 deletions pkg/protocol/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package http
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strings"
"sync"
"syscall"
"time"

"github.com/redhat-cne/sdk-go/pkg/errorhandler"
Expand All @@ -31,7 +33,7 @@ import (
)

var (
cancelTimeout = 500 * time.Millisecond
cancelTimeout = 2000 * time.Millisecond
retryTimeout = 500 * time.Millisecond
RequestReadHeaderTimeout = 2 * time.Second
)
Expand Down Expand Up @@ -153,6 +155,7 @@ func (h *Server) Start(wg *sync.WaitGroup) error {
log.Infof("deleting subscribers")
_ = h.subscriberAPI.DeleteClient(obj.ClientID)
h.DeleteSender(obj.ClientID)
out.Status = channel.SUCCESS
localmetrics.UpdateSenderCreatedCount(obj.GetEndPointURI(), localmetrics.ACTIVE, -1)
}
}
Expand Down Expand Up @@ -369,6 +372,16 @@ func (h *Server) HTTPProcessor(wg *sync.WaitGroup) {
select {
case d := <-h.DataIn: //skips publisher object processing
if d.Type == channel.SUBSCRIBER { // Listener means subscriber aka sender
if d.Status == channel.DELETE {
log.Infof("Deleting client %s", d.ClientID)
if dErr := h.subscriberAPI.DeleteClient(d.ClientID); dErr == nil {
h.DeleteSender(d.ClientID)
localmetrics.UpdateSenderCreatedCount(d.Address, localmetrics.ACTIVE, -1)
} else {
log.Errorf("Failed to delete subscriber %s", d.Address)
}
continue
}
// Post it to the address that has been specified : to target URL
subs := subscriber.New(h.clientID)
//Self URL
Expand Down Expand Up @@ -520,9 +533,14 @@ func (h *Server) SendTo(wg *sync.WaitGroup, clientID uuid.UUID, clientAddress, r
log.Infof("event genrated %s", e.String())
return
}

wg.Add(1)
go func(h *Server, clientAddress, resourceAddress string, eventType channel.Type, e *cloudevents.Event, wg *sync.WaitGroup, sender *Protocol) {
defer wg.Done()
if h.subscriberAPI.SubscriberMarkedForDelete(clientID) {
log.Infof("not posting event, subscriber %s is marked for delete due to inactivity ", clientAddress)
return
}
if sender == nil {
localmetrics.UpdateEventCreatedCount(clientAddress, localmetrics.FAILED, 1)
return
Expand All @@ -532,22 +550,25 @@ func (h *Server) SendTo(wg *sync.WaitGroup, clientID uuid.UUID, clientAddress, r
if eventType == channel.EVENT {
localmetrics.UpdateEventCreatedCount(clientAddress, localmetrics.FAILED, 1)
}
// has subscriber failed to connect for n times delete the subscribers
if h.subscriberAPI.IncFailCountToFail(clientID) {
h.DataOut <- &channel.DataChan{
ClientID: clientID,
Address: clientAddress,
Data: e,
Status: channel.DELETE,
Type: channel.SUBSCRIBER,
}
} else {
log.Errorf("client %s not responding, waiting %d times before marking to delete subscriber",
clientAddress, h.subscriberAPI.FailCountThreshold()-h.subscriberAPI.GetFailCount(clientID))
}
h.DataOut <- &channel.DataChan{
Address: resourceAddress,
Data: e,
Status: channel.FAILED,
Type: eventType,
}
// has subscriber failed to connect for n times delete the subscribers
if h.subscriberAPI.IncFailCountToFail(clientID) {
log.Errorf("client %s not responding, deleting subscription ", clientAddress)
h.DataOut <- &channel.DataChan{
Address: clientAddress,
Data: e,
Status: channel.DELETE,
Type: channel.SUBSCRIBER,
}
}
log.Errorf("connection lost addressing %s", clientAddress)
} else {
localmetrics.UpdateEventCreatedCount(clientAddress, localmetrics.SUCCESS, 1)
Expand Down Expand Up @@ -671,7 +692,7 @@ func (c *Protocol) Send(e cloudevents.Event) error {
e.SetDataContentType(cloudevents.ApplicationJSON)
ctx := cloudevents.ContextWithTarget(sendCtx, c.Protocol.Target.String())
result := c.Client.Send(ctx, e)
if cloudevents.IsUndelivered(result) {
if cloudevents.IsUndelivered(result) || errors.Is(result, syscall.ECONNREFUSED) {
log.Errorf("failed to send to address %s with %s", c.Protocol.Target.String(), result)
return fmt.Errorf("failed to send to address %s with error %s", c.Protocol.Target.String(), result.Error())
} else if !cloudevents.IsACK(result) {
Expand All @@ -688,8 +709,7 @@ func (c *Protocol) Send(e cloudevents.Event) error {
log.Infof("Sent with status code %d, result: %v", httpResult.StatusCode, result)
return fmt.Errorf(httpResult.Format, httpResult.Args...)
}
log.Printf("Send did not return an HTTP response: %s", result)
return fmt.Errorf("send did not return an HTTP response: %s", result)
return nil
}

// Get ... getter method
Expand Down Expand Up @@ -748,23 +768,10 @@ func Post(address string, e cloudevents.Event) error {
e.SetDataContentType(cloudevents.ApplicationJSON)
ctx := cloudevents.ContextWithTarget(sendCtx, address)
result := c.Send(ctx, e)
if cloudevents.IsUndelivered(result) {
// With current implementation of cloudevents we cannot get ack on delivered of not
if cloudevents.IsUndelivered(result) || errors.Is(result, syscall.ECONNREFUSED) {
log.Errorf("failed to send to address %s with %s", address, result)
return result
} else if !cloudevents.IsACK(result) {
log.Printf("sent: not accepted : %t", cloudevents.IsACK(result))
return result
}
var httpResult *cehttp.Result

if cloudevents.ResultAs(result, &httpResult) {
if httpResult.StatusCode == http.StatusOK {
log.Infof("sent with status code %d::%v", httpResult.StatusCode, result)
return nil
}
log.Printf("Sent with status code %d, result: %v", httpResult.StatusCode, result)
return fmt.Errorf(httpResult.Format, httpResult.Args...)
}
log.Printf("Send did not return an HTTP response: %s", result)
return fmt.Errorf("send did not return an HTTP response: %s", result)
return nil
}
Loading

0 comments on commit 7dbf075

Please sign in to comment.