Skip to content

Commit

Permalink
Implment GurrentState API as per O-ran Spec (#51)
Browse files Browse the repository at this point in the history
calling currentstate via api with endpoint /resource/currentse

Signed-off-by: Aneesh Puttur <[email protected]>

Signed-off-by: Aneesh Puttur <[email protected]>
  • Loading branch information
aneeshkp authored Sep 30, 2022
1 parent 02b0318 commit 4f4a819
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 129 deletions.
10 changes: 10 additions & 0 deletions pkg/channel/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ type DataChan struct {
OnReceiveOverrideFn func(e cloudevents.Event, dataChan *DataChan) error
// ProcessEventFn Optional, this allows to customize message handler thar was received at the out channel
ProcessEventFn func(e interface{}) error
// Only for status lost if marshalled to json
StatusChan chan<- *StatusChan
ReturnAddress *string
}

// StatusChan channel used for writing status data out here
type StatusChan struct {
ID string
ClientID uuid.UUID
Data *cloudevents.Event
}

// CreateCloudEvents ...
Expand Down
157 changes: 107 additions & 50 deletions pkg/protocol/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"sync"
Expand Down Expand Up @@ -46,7 +47,6 @@ const (
DEFAULT ServiceResourcePath = ""
HEALTH ServiceResourcePath = "/health"
EVENT ServiceResourcePath = "/event"
STATUS ServiceResourcePath = "/status"
SUBSCRIPTION ServiceResourcePath = "/subscription"
)

Expand Down Expand Up @@ -94,6 +94,7 @@ func InitServer(serviceName string, port int, storePath string, dataIn <-chan *c
return uuid.NewMD5(namespace, url)
}(serviceName),
}
log.Infof(" registering publishing http service for client id %s", server.clientID.String())
return &server, nil
}

Expand Down Expand Up @@ -162,30 +163,6 @@ func (h *Server) Start(wg *sync.WaitGroup) error {
log.Errorf("failed to create subscription handler: %s", err.Error())
return err
}
statusHandler, err := cloudevents.NewHTTPReceiveHandler(ctx, p, func(e cloudevents.Event) {
clientID, _ := uuid.Parse(e.ID())
out := channel.DataChan{
Address: e.Source(),
ClientID: clientID,
Status: channel.NEW,
Type: channel.STATUS, // could be new event of new subscriber (sender)
}

if h.statusReceiveOverrideFn != nil {
if err := h.statusReceiveOverrideFn(e, &out); err != nil {
out.Status = channel.FAILED
localmetrics.UpdateStatusCheckCount(out.Address, localmetrics.FAILED, 1)
} else {
localmetrics.UpdateStatusCheckCount(out.Address, localmetrics.SUCCESS, 1)
out.Status = channel.SUCCESS
}
}
h.DataOut <- &out
})
if err != nil {
log.Errorf("failed to create status handler: %s", err.Error())
return err
}
eventHandler, err := cloudevents.NewHTTPReceiveHandler(ctx, p, func(e cloudevents.Event) {
out := channel.DataChan{
Address: e.Source(), // cloud event source is bus address
Expand All @@ -203,13 +180,51 @@ func (h *Server) Start(wg *sync.WaitGroup) error {

r := mux.NewRouter()

r.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
r.HandleFunc("/{resourceAddress:.*}/{clientID:.*}/CurrentState", func(w http.ResponseWriter, req *http.Request) {
params := mux.Vars(req)
clientID := params["clientID"]
resource := params["resourceAddress"]
clientUUID, parseError := uuid.Parse(clientID)

if parseError != nil || (resource == "" && clientID == "") {
_ = json.NewEncoder(w).Encode(map[string]bool{"ok": false})
}
out := channel.DataChan{
Address: resource,
ClientID: clientUUID,
Status: channel.NEW,
Type: channel.STATUS, // could be new event of new subscriber (sender)
}
e, _ := out.CreateCloudEvents("CurrentState")
e.SetSource(resource)
// statusReceiveOverrideFn must return value for
if h.statusReceiveOverrideFn != nil {
if statusErr := h.statusReceiveOverrideFn(*e, &out); statusErr != nil {
out.Status = channel.FAILED
//out.Data here has the event to be published send it back
localmetrics.UpdateStatusCheckCount(out.Address, localmetrics.FAILED, 1)
_ = json.NewEncoder(w).Encode(map[string]string{"message": statusErr.Error()})
} else if out.Data != nil {
localmetrics.UpdateStatusCheckCount(out.Address, localmetrics.SUCCESS, 1)
out.Status = channel.SUCCESS
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(*out.Data)
} else {
out.Status = channel.FAILED
_ = json.NewEncoder(w).Encode(map[string]string{"message": "resource not found"})
}
} else {
out.Status = channel.FAILED
_ = json.NewEncoder(w).Encode(map[string]string{"message": "onReceive function not defined"})
}
}).Methods(http.MethodGet)

r.HandleFunc("/health", func(w http.ResponseWriter, req *http.Request) {
_ = json.NewEncoder(w).Encode(map[string]bool{"ok": true})
})

r.Handle("/event", eventHandler)
r.Handle("/subscription", subscriptionHandler)
r.Handle("/status", statusHandler)

err = r.Walk(func(route *mux.Route, router *mux.Router, ancestors []*mux.Route) error {
pathTemplate, err := route.GetPathTemplate()
Expand Down Expand Up @@ -367,7 +382,7 @@ func (h *Server) HTTPProcessor(wg *sync.WaitGroup) {
ce.SetSource(d.Address)

if len(h.Publishers) > 0 {
for _, pubURL := range h.Publishers {
for _, pubURL := range h.Publishers { // if you call
if err := Post(fmt.Sprintf("%s/subscription", pubURL.String()), *ce); err != nil {
log.Errorf("(1)error creating: %v at %s with data %s=%s", err, pubURL.String(), ce.String(), ce.Data())
localmetrics.UpdateSenderCreatedCount(d.Address, localmetrics.ACTIVE, -1)
Expand Down Expand Up @@ -437,20 +452,48 @@ func (h *Server) HTTPProcessor(wg *sync.WaitGroup) {
// d.Address is resource address
// if its empty then Get all address and ID and create subscription object
//else Get only sub you are interested
d.ClientID = h.ClientID()
if ce, err := d.CreateCloudEvents(channel.STATUS.String()); err == nil {
if len(h.Publishers) > 0 { //TODO: support ping to targeted publishers
for _, pubURL := range h.Publishers {
// this is called form consumer, so sender object registered at consumer side
if err := Post(fmt.Sprintf("%s%s", pubURL.String(), STATUS), *ce); err != nil {
log.Infof("error sending events status ping to %s for %s", fmt.Sprintf("%s%s", pubURL.String(), STATUS), d.Address) // this address is event address
sendToStatusChannel := func(d *channel.DataChan, e *cloudevents.Event, cID uuid.UUID) {
if d.StatusChan == nil {
return
}
defer func() {
if r := recover(); r != nil {
log.Infof("close channel: recovered in f %s", r)
}
}()
select {
case d.StatusChan <- &channel.StatusChan{
ClientID: cID,
Data: e,
}:
case <-time.After(1 * time.Second):
log.Info("timed out sending current state back to calling channel")
}
}
d.ClientID = h.clientID
if len(h.Publishers) > 0 { //TODO: support ping to targeted publishers
for _, pubURL := range h.Publishers {
stateURL := fmt.Sprintf("%s%s/%s/%s", pubURL.String(), d.Address, d.ClientID, "CurrentState")
// this is called form consumer, so sender object registered at consumer side
log.Infof("current state call :reaching out to %s", stateURL)
res, state, resErr := GetByte(stateURL)
log.Infof("response %s", string(res))
log.Infof("state %d", state)
log.Infof("resErr %s", resErr)
if resErr == nil && state == http.StatusOK {
var cloudEvent cloudevents.Event
if err := json.Unmarshal(res, &cloudEvent); err != nil {
sendToStatusChannel(d, nil, d.ClientID)
log.Infof("failed to send status ping to %s for %s", stateURL, d.Address)
} else {
log.Infof("successfully sent status ping to %s for %s", fmt.Sprintf("%s%s", pubURL.String(), STATUS), d.Address)
sendToStatusChannel(d, &cloudEvent, d.ClientID)
log.Infof("success, status sent to %s for %s", stateURL, d.Address)
}
} else {
sendToStatusChannel(d, nil, d.ClientID)
log.Infof("failed to send status ping to %s for %s", stateURL, d.Address)
}
}
} else {
log.Errorf("error creating cloud events for status")
}
}
case <-h.CloseCh:
Expand Down Expand Up @@ -484,8 +527,6 @@ func (h *Server) SendTo(wg *sync.WaitGroup, clientID uuid.UUID, clientAddress st
log.Errorf("failed to send(TO): %s result %v ", clientAddress, err)
if eventType == channel.EVENT {
localmetrics.UpdateEventCreatedCount(clientAddress, localmetrics.FAILED, 1)
} else if eventType == channel.STATUS {
localmetrics.UpdateStatusCheckCount(clientAddress, localmetrics.FAILED, 1)
}
h.DataOut <- &channel.DataChan{
Address: clientAddress,
Expand Down Expand Up @@ -514,16 +555,12 @@ func (h *Server) SendTo(wg *sync.WaitGroup, clientID uuid.UUID, clientAddress st
Type: eventType,
}
}
}(h, clientAddress, eventType, e, wg, func(sender map[ServiceResourcePath]*Protocol, eventType channel.Type) *Protocol {
path := EVENT
if eventType == channel.STATUS {
path = STATUS
}
if s, ok := sender[path]; ok {
}(h, clientAddress, eventType, e, wg, func(sender map[ServiceResourcePath]*Protocol) *Protocol {
if s, ok := sender[EVENT]; ok {
return s
}
return nil
}(sender, eventType))
}(sender))
}
}

Expand Down Expand Up @@ -578,7 +615,7 @@ func (h *Server) DeleteSender(key uuid.UUID) {
func (h *Server) NewSender(clientID uuid.UUID, address string) error {
l := map[ServiceResourcePath]*Protocol{}
h.SetSender(clientID, l)
for _, s := range []ServiceResourcePath{DEFAULT, HEALTH, EVENT, STATUS} {
for _, s := range []ServiceResourcePath{DEFAULT, HEALTH, EVENT} {
l[s] = &Protocol{}
//server.NewClient(host, []httpP.Option{})
targetURL := fmt.Sprintf("%s%s", address, s)
Expand Down Expand Up @@ -643,17 +680,37 @@ func Get(url string) (int, error) {
// using variable url is security hole. Do we need to fix this
response, errResp := http.Get(url)
if errResp != nil {
log.Warnf("return health check of the rest service for error %v", errResp)
log.Warnf("return rest service error %v", errResp)
return http.StatusBadRequest, errResp
}
if response != nil && response.StatusCode == http.StatusOK {
response.Body.Close()
log.Info("rest service returned healthy status")
return http.StatusOK, nil
}
return http.StatusInternalServerError, nil
}

// GetByte ... getter method
func GetByte(url string) ([]byte, int, error) {
log.Infof("health check %s ", url)
// using variable url is security hole. Do we need to fix this
response, errResp := http.Get(url)
if errResp != nil {
log.Warnf("return rest service error %v", errResp)
return []byte{}, http.StatusBadRequest, errResp
}
defer response.Body.Close()

if response.StatusCode == http.StatusOK {
bodyBytes, err := io.ReadAll(response.Body)
if err != nil {
return []byte{}, http.StatusBadRequest, err
}
return bodyBytes, http.StatusOK, nil
}
return []byte{}, http.StatusInternalServerError, nil
}

// Post ... This is used for internal posting from sidecar to rest api or
// used for lazy calls
func Post(address string, e cloudevents.Event) error {
Expand Down
Loading

0 comments on commit 4f4a819

Please sign in to comment.