From 4f4a81946028e25e104d347d5abcbe2a8164ab1b Mon Sep 17 00:00:00 2001 From: Aneesh Puttur Date: Fri, 30 Sep 2022 16:40:39 -0400 Subject: [PATCH] Implment GurrentState API as per O-ran Spec (#51) calling currentstate via api with endpoint /resource/currentse Signed-off-by: Aneesh Puttur Signed-off-by: Aneesh Puttur --- pkg/channel/data.go | 10 +++ pkg/protocol/http/http.go | 157 ++++++++++++++++++++++----------- pkg/protocol/http/http_test.go | 132 +++++++++++---------------- 3 files changed, 170 insertions(+), 129 deletions(-) diff --git a/pkg/channel/data.go b/pkg/channel/data.go index 3a95975..d3186cb 100644 --- a/pkg/channel/data.go +++ b/pkg/channel/data.go @@ -35,6 +35,16 @@ type DataChan struct { OnReceiveOverrideFn func(e cloudevents.Event, dataChan *DataChan) error // ProcessEventFn Optional, this allows to customize message handler thar was received at the out channel ProcessEventFn func(e interface{}) error + // Only for status lost if marshalled to json + StatusChan chan<- *StatusChan + ReturnAddress *string +} + +// StatusChan channel used for writing status data out here +type StatusChan struct { + ID string + ClientID uuid.UUID + Data *cloudevents.Event } // CreateCloudEvents ... diff --git a/pkg/protocol/http/http.go b/pkg/protocol/http/http.go index 1f711d6..708fa1f 100644 --- a/pkg/protocol/http/http.go +++ b/pkg/protocol/http/http.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "io" "net/http" "strings" "sync" @@ -46,7 +47,6 @@ const ( DEFAULT ServiceResourcePath = "" HEALTH ServiceResourcePath = "/health" EVENT ServiceResourcePath = "/event" - STATUS ServiceResourcePath = "/status" SUBSCRIPTION ServiceResourcePath = "/subscription" ) @@ -94,6 +94,7 @@ func InitServer(serviceName string, port int, storePath string, dataIn <-chan *c return uuid.NewMD5(namespace, url) }(serviceName), } + log.Infof(" registering publishing http service for client id %s", server.clientID.String()) return &server, nil } @@ -162,30 +163,6 @@ func (h *Server) Start(wg *sync.WaitGroup) error { log.Errorf("failed to create subscription handler: %s", err.Error()) return err } - statusHandler, err := cloudevents.NewHTTPReceiveHandler(ctx, p, func(e cloudevents.Event) { - clientID, _ := uuid.Parse(e.ID()) - out := channel.DataChan{ - Address: e.Source(), - ClientID: clientID, - Status: channel.NEW, - Type: channel.STATUS, // could be new event of new subscriber (sender) - } - - if h.statusReceiveOverrideFn != nil { - if err := h.statusReceiveOverrideFn(e, &out); err != nil { - out.Status = channel.FAILED - localmetrics.UpdateStatusCheckCount(out.Address, localmetrics.FAILED, 1) - } else { - localmetrics.UpdateStatusCheckCount(out.Address, localmetrics.SUCCESS, 1) - out.Status = channel.SUCCESS - } - } - h.DataOut <- &out - }) - if err != nil { - log.Errorf("failed to create status handler: %s", err.Error()) - return err - } eventHandler, err := cloudevents.NewHTTPReceiveHandler(ctx, p, func(e cloudevents.Event) { out := channel.DataChan{ Address: e.Source(), // cloud event source is bus address @@ -203,13 +180,51 @@ func (h *Server) Start(wg *sync.WaitGroup) error { r := mux.NewRouter() - r.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { + r.HandleFunc("/{resourceAddress:.*}/{clientID:.*}/CurrentState", func(w http.ResponseWriter, req *http.Request) { + params := mux.Vars(req) + clientID := params["clientID"] + resource := params["resourceAddress"] + clientUUID, parseError := uuid.Parse(clientID) + + if parseError != nil || (resource == "" && clientID == "") { + _ = json.NewEncoder(w).Encode(map[string]bool{"ok": false}) + } + out := channel.DataChan{ + Address: resource, + ClientID: clientUUID, + Status: channel.NEW, + Type: channel.STATUS, // could be new event of new subscriber (sender) + } + e, _ := out.CreateCloudEvents("CurrentState") + e.SetSource(resource) + // statusReceiveOverrideFn must return value for + if h.statusReceiveOverrideFn != nil { + if statusErr := h.statusReceiveOverrideFn(*e, &out); statusErr != nil { + out.Status = channel.FAILED + //out.Data here has the event to be published send it back + localmetrics.UpdateStatusCheckCount(out.Address, localmetrics.FAILED, 1) + _ = json.NewEncoder(w).Encode(map[string]string{"message": statusErr.Error()}) + } else if out.Data != nil { + localmetrics.UpdateStatusCheckCount(out.Address, localmetrics.SUCCESS, 1) + out.Status = channel.SUCCESS + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(*out.Data) + } else { + out.Status = channel.FAILED + _ = json.NewEncoder(w).Encode(map[string]string{"message": "resource not found"}) + } + } else { + out.Status = channel.FAILED + _ = json.NewEncoder(w).Encode(map[string]string{"message": "onReceive function not defined"}) + } + }).Methods(http.MethodGet) + + r.HandleFunc("/health", func(w http.ResponseWriter, req *http.Request) { _ = json.NewEncoder(w).Encode(map[string]bool{"ok": true}) }) r.Handle("/event", eventHandler) r.Handle("/subscription", subscriptionHandler) - r.Handle("/status", statusHandler) err = r.Walk(func(route *mux.Route, router *mux.Router, ancestors []*mux.Route) error { pathTemplate, err := route.GetPathTemplate() @@ -367,7 +382,7 @@ func (h *Server) HTTPProcessor(wg *sync.WaitGroup) { ce.SetSource(d.Address) if len(h.Publishers) > 0 { - for _, pubURL := range h.Publishers { + for _, pubURL := range h.Publishers { // if you call if err := Post(fmt.Sprintf("%s/subscription", pubURL.String()), *ce); err != nil { log.Errorf("(1)error creating: %v at %s with data %s=%s", err, pubURL.String(), ce.String(), ce.Data()) localmetrics.UpdateSenderCreatedCount(d.Address, localmetrics.ACTIVE, -1) @@ -437,20 +452,48 @@ func (h *Server) HTTPProcessor(wg *sync.WaitGroup) { // d.Address is resource address // if its empty then Get all address and ID and create subscription object //else Get only sub you are interested - d.ClientID = h.ClientID() - if ce, err := d.CreateCloudEvents(channel.STATUS.String()); err == nil { - if len(h.Publishers) > 0 { //TODO: support ping to targeted publishers - for _, pubURL := range h.Publishers { - // this is called form consumer, so sender object registered at consumer side - if err := Post(fmt.Sprintf("%s%s", pubURL.String(), STATUS), *ce); err != nil { - log.Infof("error sending events status ping to %s for %s", fmt.Sprintf("%s%s", pubURL.String(), STATUS), d.Address) // this address is event address + sendToStatusChannel := func(d *channel.DataChan, e *cloudevents.Event, cID uuid.UUID) { + if d.StatusChan == nil { + return + } + defer func() { + if r := recover(); r != nil { + log.Infof("close channel: recovered in f %s", r) + } + }() + select { + case d.StatusChan <- &channel.StatusChan{ + ClientID: cID, + Data: e, + }: + case <-time.After(1 * time.Second): + log.Info("timed out sending current state back to calling channel") + } + } + d.ClientID = h.clientID + if len(h.Publishers) > 0 { //TODO: support ping to targeted publishers + for _, pubURL := range h.Publishers { + stateURL := fmt.Sprintf("%s%s/%s/%s", pubURL.String(), d.Address, d.ClientID, "CurrentState") + // this is called form consumer, so sender object registered at consumer side + log.Infof("current state call :reaching out to %s", stateURL) + res, state, resErr := GetByte(stateURL) + log.Infof("response %s", string(res)) + log.Infof("state %d", state) + log.Infof("resErr %s", resErr) + if resErr == nil && state == http.StatusOK { + var cloudEvent cloudevents.Event + if err := json.Unmarshal(res, &cloudEvent); err != nil { + sendToStatusChannel(d, nil, d.ClientID) + log.Infof("failed to send status ping to %s for %s", stateURL, d.Address) } else { - log.Infof("successfully sent status ping to %s for %s", fmt.Sprintf("%s%s", pubURL.String(), STATUS), d.Address) + sendToStatusChannel(d, &cloudEvent, d.ClientID) + log.Infof("success, status sent to %s for %s", stateURL, d.Address) } + } else { + sendToStatusChannel(d, nil, d.ClientID) + log.Infof("failed to send status ping to %s for %s", stateURL, d.Address) } } - } else { - log.Errorf("error creating cloud events for status") } } case <-h.CloseCh: @@ -484,8 +527,6 @@ func (h *Server) SendTo(wg *sync.WaitGroup, clientID uuid.UUID, clientAddress st log.Errorf("failed to send(TO): %s result %v ", clientAddress, err) if eventType == channel.EVENT { localmetrics.UpdateEventCreatedCount(clientAddress, localmetrics.FAILED, 1) - } else if eventType == channel.STATUS { - localmetrics.UpdateStatusCheckCount(clientAddress, localmetrics.FAILED, 1) } h.DataOut <- &channel.DataChan{ Address: clientAddress, @@ -514,16 +555,12 @@ func (h *Server) SendTo(wg *sync.WaitGroup, clientID uuid.UUID, clientAddress st Type: eventType, } } - }(h, clientAddress, eventType, e, wg, func(sender map[ServiceResourcePath]*Protocol, eventType channel.Type) *Protocol { - path := EVENT - if eventType == channel.STATUS { - path = STATUS - } - if s, ok := sender[path]; ok { + }(h, clientAddress, eventType, e, wg, func(sender map[ServiceResourcePath]*Protocol) *Protocol { + if s, ok := sender[EVENT]; ok { return s } return nil - }(sender, eventType)) + }(sender)) } } @@ -578,7 +615,7 @@ func (h *Server) DeleteSender(key uuid.UUID) { func (h *Server) NewSender(clientID uuid.UUID, address string) error { l := map[ServiceResourcePath]*Protocol{} h.SetSender(clientID, l) - for _, s := range []ServiceResourcePath{DEFAULT, HEALTH, EVENT, STATUS} { + for _, s := range []ServiceResourcePath{DEFAULT, HEALTH, EVENT} { l[s] = &Protocol{} //server.NewClient(host, []httpP.Option{}) targetURL := fmt.Sprintf("%s%s", address, s) @@ -643,17 +680,37 @@ func Get(url string) (int, error) { // using variable url is security hole. Do we need to fix this response, errResp := http.Get(url) if errResp != nil { - log.Warnf("return health check of the rest service for error %v", errResp) + log.Warnf("return rest service error %v", errResp) return http.StatusBadRequest, errResp } if response != nil && response.StatusCode == http.StatusOK { response.Body.Close() - log.Info("rest service returned healthy status") return http.StatusOK, nil } return http.StatusInternalServerError, nil } +// GetByte ... getter method +func GetByte(url string) ([]byte, int, error) { + log.Infof("health check %s ", url) + // using variable url is security hole. Do we need to fix this + response, errResp := http.Get(url) + if errResp != nil { + log.Warnf("return rest service error %v", errResp) + return []byte{}, http.StatusBadRequest, errResp + } + defer response.Body.Close() + + if response.StatusCode == http.StatusOK { + bodyBytes, err := io.ReadAll(response.Body) + if err != nil { + return []byte{}, http.StatusBadRequest, err + } + return bodyBytes, http.StatusOK, nil + } + return []byte{}, http.StatusInternalServerError, nil +} + // Post ... This is used for internal posting from sidecar to rest api or // used for lazy calls func Post(address string, e cloudevents.Event) error { diff --git a/pkg/protocol/http/http_test.go b/pkg/protocol/http/http_test.go index 468b672..58899b8 100644 --- a/pkg/protocol/http/http_test.go +++ b/pkg/protocol/http/http_test.go @@ -3,6 +3,7 @@ package http_test import ( "encoding/json" "fmt" + "io/ioutil" "net/http" "net/url" "os" @@ -59,6 +60,25 @@ var ( _type = string(ptp.PtpStateChange) ) +// CloudEvents return cloud events objects +func CloudEvents() cloudevents.Event { + e := cloudevents.Event{ + Context: cloudevents.EventContextV1{ + Type: _type, + Source: ceSource, + ID: "full-event", + Time: &ceTimestamp, + DataSchema: &ceSchema, + Subject: strptr("topic"), + }.AsV1(), + } + cne := CloudNativeEvents() + + _ = e.SetData(cloudevents.ApplicationJSON, cne.Data) + + return e +} + // CloudNativeEvents generates cloud events for testing func CloudNativeEvents() cneevent.Event { data := cneevent.Data{} @@ -84,37 +104,8 @@ func CloudNativeEvents() cneevent.Event { return cne } -//CloudEvents return cloud events objects -func CloudEvents() cloudevents.Event { - data := cneevent.Data{} - value := cneevent.DataValue{ - Resource: subscriptionOne.Resource, - DataType: cneevent.NOTIFICATION, - ValueType: cneevent.ENUMERATION, - Value: ptp.ACQUIRING_SYNC, - } - data.SetVersion("1.0") //nolint:errcheck - data.AppendValues(value) //nolint:errcheck - - e := cloudevents.Event{ - Context: cloudevents.EventContextV1{ - Type: _type, - Source: ceSource, - ID: "full-event", - Time: &ceTimestamp, - DataSchema: &ceSchema, - Subject: strptr("topic"), - }.AsV1(), - } - cne := CloudNativeEvents() - - _ = e.SetData(cloudevents.ApplicationJSON, cne.Data) - - return e -} - // client registers with server and ask for status , also receive any event that was generated -func createClient(t *testing.T, clientS *ceHttp.Server, closeCh chan struct{}, withStatus bool, clientOutChannel chan *channel.DataChan) { +func createClient(t *testing.T, clientS *ceHttp.Server, closeCh chan struct{}, clientOutChannel chan *channel.DataChan) { in := make(chan *channel.DataChan, 10) var err error assert.Nil(t, clientS) @@ -135,14 +126,7 @@ func createClient(t *testing.T, clientS *ceHttp.Server, closeCh chan struct{}, w Type: channel.SUBSCRIBER, } time.Sleep(250 * time.Millisecond) - if withStatus { - // ping for status, this will send the status check ping to the address - in <- &channel.DataChan{ - Address: subscriptionOne.Resource, - Status: channel.NEW, - Type: channel.STATUS, - } - } + <-closeCh } func TestSubscribeCreated(t *testing.T) { @@ -159,7 +143,7 @@ func TestSubscribeCreated(t *testing.T) { assert.Nil(t, err) server.HTTPProcessor(&wg) var clientS *ceHttp.Server - go createClient(t, clientS, closeCh, false, eventChannel) + go createClient(t, clientS, closeCh, eventChannel) time.Sleep(500 * time.Millisecond) <-out assert.Equal(t, 1, len(server.Sender)) @@ -187,7 +171,7 @@ func TestSendEvent(t *testing.T) { server.HTTPProcessor(&wg) time.Sleep(500 * time.Millisecond) var clientS *ceHttp.Server - go createClient(t, clientS, closeCh, false, clientOutChannel) + go createClient(t, clientS, closeCh, clientOutChannel) // read what server has in outChannel <-out time.Sleep(500 * time.Millisecond) @@ -222,13 +206,12 @@ func TestSendEvent(t *testing.T) { close(closeCh) } -func TestSendSuccessStatus(t *testing.T) { +func TestSendSuccess(t *testing.T) { //time.Sleep(250 * time.Millisecond) //closeClient := make(chan struct{}) //createClient(clientAddress, closeClient) - e := CloudEvents() in := make(chan *channel.DataChan) out := make(chan *channel.DataChan) clientOutChannel := make(chan *channel.DataChan) @@ -244,38 +227,17 @@ func TestSendSuccessStatus(t *testing.T) { }, nil) assert.Nil(t, err) wg := sync.WaitGroup{} - // Start the server and channel proceesor + // Start the server and channel processor err = server.Start(&wg) assert.Nil(t, err) server.HTTPProcessor(&wg) // create a sender var clientS *ceHttp.Server - go createClient(t, clientS, closeCh, true, clientOutChannel) - <-out + go createClient(t, clientS, closeCh, clientOutChannel) time.Sleep(500 * time.Millisecond) - assert.Equal(t, 1, len(server.Sender)) - // read what client put in out channel - select { - case d := <-clientOutChannel: - assert.Equal(t, channel.SUBSCRIBER, d.Type) - assert.Equal(t, channel.SUCCESS, d.Status) - case <-time.After(1 * time.Second): - log.Infof("timeout reading out channel ") - } - - select { - case d := <-clientOutChannel: - assert.Equal(t, channel.EVENT, d.Type) - assert.Equal(t, channel.NEW, d.Status) - case <-time.After(1 * time.Second): - log.Infof("timeout reading out channel") - } <-out - dd := cneevent.Data{} - err = json.Unmarshal(e.Data(), &dd) - assert.Nil(t, err) - assert.Equal(t, dd.Version, "1.0") + assert.Equal(t, 1, len(server.Sender)) close(closeCh) //waitTimeout(&wg, timeout) } @@ -324,11 +286,13 @@ func TestSender(t *testing.T) { close(closeCh) } -func TestPing(t *testing.T) { +func TestStatus(t *testing.T) { in := make(chan *channel.DataChan) out := make(chan *channel.DataChan) closeCh := make(chan struct{}) onStatusReceiveOverrideFn := func(e event.Event, d *channel.DataChan) error { + ce := CloudEvents() + d.Data = &ce return nil } server, err := ceHttp.InitServer(serverAddress.String(), hostPort, storePath, in, out, closeCh, onStatusReceiveOverrideFn, nil) @@ -344,21 +308,31 @@ func TestPing(t *testing.T) { err = server.NewSender(serverClientID, serverAddress.String()) assert.Nil(t, err) // send status ping - in <- &channel.DataChan{ - Address: subscriptionOne.Resource, - ClientID: serverClientID, - Status: channel.NEW, - Type: channel.STATUS, + hClient := &http.Client{ + Transport: &http.Transport{ + MaxIdleConnsPerHost: 20, + }, + Timeout: 10 * time.Second, } - select { - case d := <-out: - assert.Equal(t, channel.STATUS, d.Type) - assert.Equal(t, channel.SUCCESS, d.Status) - assert.Equal(t, serverClientID, server.ClientID()) - case <-time.After(1 * time.Second): - log.Infof("timeout reading out channel ") + requestURL := fmt.Sprintf("%s/%s/%s/CurrentState", serverAddress.String(), subscriptionOne.Resource, clientClientID) + log.Printf(requestURL) + req, err := http.NewRequest("GET", requestURL, nil) + assert.Nil(t, err) + req.Header.Set("Content-Type", "application/json") + resp, err := hClient.Do(req) + assert.Nil(t, err) + defer resp.Body.Close() + bodyBytes, err := ioutil.ReadAll(resp.Body) + assert.Nil(t, err) + ce := cloudevents.Event{} + err = json.Unmarshal(bodyBytes, &ce) + log.Info(string(bodyBytes)) + if e, ok := err.(*json.SyntaxError); ok { + log.Infof("syntax error at byte offset %d", e.Offset) } + assert.Nil(t, err) + close(closeCh) }