Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement currentState API as per O-ran Spec (#51) #53

Merged
merged 1 commit into from
Sep 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pkg/channel/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ type DataChan struct {
OnReceiveOverrideFn func(e cloudevents.Event, dataChan *DataChan) error
// ProcessEventFn Optional, this allows to customize message handler thar was received at the out channel
ProcessEventFn func(e interface{}) error
// Only for status lost if marshalled to json
StatusChan chan<- *StatusChan
ReturnAddress *string
}

// StatusChan channel used for writing status data out here
type StatusChan struct {
ID string
ClientID uuid.UUID
Data *cloudevents.Event
}

// CreateCloudEvents ...
Expand Down
157 changes: 107 additions & 50 deletions pkg/protocol/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"sync"
Expand Down Expand Up @@ -46,7 +47,6 @@ const (
DEFAULT ServiceResourcePath = ""
HEALTH ServiceResourcePath = "/health"
EVENT ServiceResourcePath = "/event"
STATUS ServiceResourcePath = "/status"
SUBSCRIPTION ServiceResourcePath = "/subscription"
)

Expand Down Expand Up @@ -94,6 +94,7 @@ func InitServer(serviceName string, port int, storePath string, dataIn <-chan *c
return uuid.NewMD5(namespace, url)
}(serviceName),
}
log.Infof(" registering publishing http service for client id %s", server.clientID.String())
return &server, nil
}

Expand Down Expand Up @@ -162,30 +163,6 @@ func (h *Server) Start(wg *sync.WaitGroup) error {
log.Errorf("failed to create subscription handler: %s", err.Error())
return err
}
statusHandler, err := cloudevents.NewHTTPReceiveHandler(ctx, p, func(e cloudevents.Event) {
clientID, _ := uuid.Parse(e.ID())
out := channel.DataChan{
Address: e.Source(),
ClientID: clientID,
Status: channel.NEW,
Type: channel.STATUS, // could be new event of new subscriber (sender)
}

if h.statusReceiveOverrideFn != nil {
if err := h.statusReceiveOverrideFn(e, &out); err != nil {
out.Status = channel.FAILED
localmetrics.UpdateStatusCheckCount(out.Address, localmetrics.FAILED, 1)
} else {
localmetrics.UpdateStatusCheckCount(out.Address, localmetrics.SUCCESS, 1)
out.Status = channel.SUCCESS
}
}
h.DataOut <- &out
})
if err != nil {
log.Errorf("failed to create status handler: %s", err.Error())
return err
}
eventHandler, err := cloudevents.NewHTTPReceiveHandler(ctx, p, func(e cloudevents.Event) {
out := channel.DataChan{
Address: e.Source(), // cloud event source is bus address
Expand All @@ -203,13 +180,51 @@ func (h *Server) Start(wg *sync.WaitGroup) error {

r := mux.NewRouter()

r.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
r.HandleFunc("/{resourceAddress:.*}/{clientID:.*}/CurrentState", func(w http.ResponseWriter, req *http.Request) {
params := mux.Vars(req)
clientID := params["clientID"]
resource := params["resourceAddress"]
clientUUID, parseError := uuid.Parse(clientID)

if parseError != nil || (resource == "" && clientID == "") {
_ = json.NewEncoder(w).Encode(map[string]bool{"ok": false})
}
out := channel.DataChan{
Address: resource,
ClientID: clientUUID,
Status: channel.NEW,
Type: channel.STATUS, // could be new event of new subscriber (sender)
}
e, _ := out.CreateCloudEvents("CurrentState")
e.SetSource(resource)
// statusReceiveOverrideFn must return value for
if h.statusReceiveOverrideFn != nil {
if statusErr := h.statusReceiveOverrideFn(*e, &out); statusErr != nil {
out.Status = channel.FAILED
//out.Data here has the event to be published send it back
localmetrics.UpdateStatusCheckCount(out.Address, localmetrics.FAILED, 1)
_ = json.NewEncoder(w).Encode(map[string]string{"message": statusErr.Error()})
} else if out.Data != nil {
localmetrics.UpdateStatusCheckCount(out.Address, localmetrics.SUCCESS, 1)
out.Status = channel.SUCCESS
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(*out.Data)
} else {
out.Status = channel.FAILED
_ = json.NewEncoder(w).Encode(map[string]string{"message": "resource not found"})
}
} else {
out.Status = channel.FAILED
_ = json.NewEncoder(w).Encode(map[string]string{"message": "onReceive function not defined"})
}
}).Methods(http.MethodGet)

r.HandleFunc("/health", func(w http.ResponseWriter, req *http.Request) {
_ = json.NewEncoder(w).Encode(map[string]bool{"ok": true})
})

r.Handle("/event", eventHandler)
r.Handle("/subscription", subscriptionHandler)
r.Handle("/status", statusHandler)

err = r.Walk(func(route *mux.Route, router *mux.Router, ancestors []*mux.Route) error {
pathTemplate, err := route.GetPathTemplate()
Expand Down Expand Up @@ -367,7 +382,7 @@ func (h *Server) HTTPProcessor(wg *sync.WaitGroup) {
ce.SetSource(d.Address)

if len(h.Publishers) > 0 {
for _, pubURL := range h.Publishers {
for _, pubURL := range h.Publishers { // if you call
if err := Post(fmt.Sprintf("%s/subscription", pubURL.String()), *ce); err != nil {
log.Errorf("(1)error creating: %v at %s with data %s=%s", err, pubURL.String(), ce.String(), ce.Data())
localmetrics.UpdateSenderCreatedCount(d.Address, localmetrics.ACTIVE, -1)
Expand Down Expand Up @@ -437,20 +452,48 @@ func (h *Server) HTTPProcessor(wg *sync.WaitGroup) {
// d.Address is resource address
// if its empty then Get all address and ID and create subscription object
//else Get only sub you are interested
d.ClientID = h.ClientID()
if ce, err := d.CreateCloudEvents(channel.STATUS.String()); err == nil {
if len(h.Publishers) > 0 { //TODO: support ping to targeted publishers
for _, pubURL := range h.Publishers {
// this is called form consumer, so sender object registered at consumer side
if err := Post(fmt.Sprintf("%s%s", pubURL.String(), STATUS), *ce); err != nil {
log.Infof("error sending events status ping to %s for %s", fmt.Sprintf("%s%s", pubURL.String(), STATUS), d.Address) // this address is event address
sendToStatusChannel := func(d *channel.DataChan, e *cloudevents.Event, cID uuid.UUID) {
if d.StatusChan == nil {
return
}
defer func() {
if r := recover(); r != nil {
log.Infof("close channel: recovered in f %s", r)
}
}()
select {
case d.StatusChan <- &channel.StatusChan{
ClientID: cID,
Data: e,
}:
case <-time.After(1 * time.Second):
log.Info("timed out sending current state back to calling channel")
}
}
d.ClientID = h.clientID
if len(h.Publishers) > 0 { //TODO: support ping to targeted publishers
for _, pubURL := range h.Publishers {
stateURL := fmt.Sprintf("%s%s/%s/%s", pubURL.String(), d.Address, d.ClientID, "CurrentState")
// this is called form consumer, so sender object registered at consumer side
log.Infof("current state call :reaching out to %s", stateURL)
res, state, resErr := GetByte(stateURL)
log.Infof("response %s", string(res))
log.Infof("state %d", state)
log.Infof("resErr %s", resErr)
if resErr == nil && state == http.StatusOK {
var cloudEvent cloudevents.Event
if err := json.Unmarshal(res, &cloudEvent); err != nil {
sendToStatusChannel(d, nil, d.ClientID)
log.Infof("failed to send status ping to %s for %s", stateURL, d.Address)
} else {
log.Infof("successfully sent status ping to %s for %s", fmt.Sprintf("%s%s", pubURL.String(), STATUS), d.Address)
sendToStatusChannel(d, &cloudEvent, d.ClientID)
log.Infof("success, status sent to %s for %s", stateURL, d.Address)
}
} else {
sendToStatusChannel(d, nil, d.ClientID)
log.Infof("failed to send status ping to %s for %s", stateURL, d.Address)
}
}
} else {
log.Errorf("error creating cloud events for status")
}
}
case <-h.CloseCh:
Expand Down Expand Up @@ -484,8 +527,6 @@ func (h *Server) SendTo(wg *sync.WaitGroup, clientID uuid.UUID, clientAddress st
log.Errorf("failed to send(TO): %s result %v ", clientAddress, err)
if eventType == channel.EVENT {
localmetrics.UpdateEventCreatedCount(clientAddress, localmetrics.FAILED, 1)
} else if eventType == channel.STATUS {
localmetrics.UpdateStatusCheckCount(clientAddress, localmetrics.FAILED, 1)
}
h.DataOut <- &channel.DataChan{
Address: clientAddress,
Expand Down Expand Up @@ -514,16 +555,12 @@ func (h *Server) SendTo(wg *sync.WaitGroup, clientID uuid.UUID, clientAddress st
Type: eventType,
}
}
}(h, clientAddress, eventType, e, wg, func(sender map[ServiceResourcePath]*Protocol, eventType channel.Type) *Protocol {
path := EVENT
if eventType == channel.STATUS {
path = STATUS
}
if s, ok := sender[path]; ok {
}(h, clientAddress, eventType, e, wg, func(sender map[ServiceResourcePath]*Protocol) *Protocol {
if s, ok := sender[EVENT]; ok {
return s
}
return nil
}(sender, eventType))
}(sender))
}
}

Expand Down Expand Up @@ -578,7 +615,7 @@ func (h *Server) DeleteSender(key uuid.UUID) {
func (h *Server) NewSender(clientID uuid.UUID, address string) error {
l := map[ServiceResourcePath]*Protocol{}
h.SetSender(clientID, l)
for _, s := range []ServiceResourcePath{DEFAULT, HEALTH, EVENT, STATUS} {
for _, s := range []ServiceResourcePath{DEFAULT, HEALTH, EVENT} {
l[s] = &Protocol{}
//server.NewClient(host, []httpP.Option{})
targetURL := fmt.Sprintf("%s%s", address, s)
Expand Down Expand Up @@ -643,17 +680,37 @@ func Get(url string) (int, error) {
// using variable url is security hole. Do we need to fix this
response, errResp := http.Get(url)
if errResp != nil {
log.Warnf("return health check of the rest service for error %v", errResp)
log.Warnf("return rest service error %v", errResp)
return http.StatusBadRequest, errResp
}
if response != nil && response.StatusCode == http.StatusOK {
response.Body.Close()
log.Info("rest service returned healthy status")
return http.StatusOK, nil
}
return http.StatusInternalServerError, nil
}

// GetByte ... getter method
func GetByte(url string) ([]byte, int, error) {
log.Infof("health check %s ", url)
// using variable url is security hole. Do we need to fix this
response, errResp := http.Get(url)
if errResp != nil {
log.Warnf("return rest service error %v", errResp)
return []byte{}, http.StatusBadRequest, errResp
}
defer response.Body.Close()

if response.StatusCode == http.StatusOK {
bodyBytes, err := io.ReadAll(response.Body)
if err != nil {
return []byte{}, http.StatusBadRequest, err
}
return bodyBytes, http.StatusOK, nil
}
return []byte{}, http.StatusInternalServerError, nil
}

// Post ... This is used for internal posting from sidecar to rest api or
// used for lazy calls
func Post(address string, e cloudevents.Event) error {
Expand Down
Loading