From c9b3e48a82917b3623cbe9983c5bbb4113723caa Mon Sep 17 00:00:00 2001 From: w-h-a Date: Sat, 24 Aug 2024 15:31:40 -0700 Subject: [PATCH] feat: minor support for rpc apps --- sidecar/custom/sidecar.go | 59 +++++++++++++++++++++++++++++++-------- sidecar/options.go | 8 ------ sidecar/sidecar.go | 5 ---- 3 files changed, 47 insertions(+), 25 deletions(-) diff --git a/sidecar/custom/sidecar.go b/sidecar/custom/sidecar.go index d914c0d..cbeda4f 100644 --- a/sidecar/custom/sidecar.go +++ b/sidecar/custom/sidecar.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "strconv" + "strings" "sync" "time" @@ -48,11 +49,7 @@ func (s *customSidecar) SaveStateToStore(storeId string, state []*store.Record) } for _, record := range state { - key := fmt.Sprintf("%s:%s", s.options.Id, record.Key) - if err := st.Write(&store.Record{ - Key: key, - Value: record.Value, - }); err != nil { + if err := st.Write(record); err != nil { return err } } @@ -66,9 +63,7 @@ func (s *customSidecar) RetrieveStateFromStore(storeId, key string) ([]*store.Re return nil, nil } - prefix := fmt.Sprintf("%s:%s", s.options.Id, key) - - recs, err := st.Read(prefix, store.ReadWithPrefix()) + recs, err := st.Read(key, store.ReadWithPrefix()) if err != nil { return nil, err } @@ -185,11 +180,31 @@ func (s *customSidecar) sendEventToTarget(target string, event *sidecar.Event) e } func (s *customSidecar) postEventToApp(event *sidecar.Event) error { - url := fmt.Sprintf("http://%s:%s", s.options.ServiceName, s.options.ServicePort) + var rsp *sidecar.Event + var err error - rsp, err := s.sendEventViaHttp(s.options.ServiceName, s.options.ServiceName, s.options.ServicePort.Port, event.EventName, url, event) - if err != nil { - return err + if s.options.ServicePort.Protocol == "rpc" { + // TODO: refactor + url := fmt.Sprintf("%s:%s", s.options.ServiceName, s.options.ServicePort) + + serviceTitle := strings.Title(s.options.ServiceName) + + eventTitle := strings.Title(event.EventName) + + // TODO: not a great assumption + method := fmt.Sprintf("%s.%s", serviceTitle, eventTitle) + + rsp, err = s.sendEventViaRpc(s.options.ServiceName, s.options.ServiceName, s.options.ServicePort.Port, method, url, event) + if err != nil { + return err + } + } else { + url := fmt.Sprintf("http://%s:%s", s.options.ServiceName, s.options.ServicePort) + + rsp, err = s.sendEventViaHttp(s.options.ServiceName, s.options.ServiceName, s.options.ServicePort.Port, event.EventName, url, event) + if err != nil { + return err + } } if rsp != nil { @@ -221,6 +236,26 @@ func (s *customSidecar) sendEventViaHttp(namespace, name, port, endpoint, baseUr return rsp, nil } +func (s *customSidecar) sendEventViaRpc(namespace, name, port, method, baseUrl string, event *sidecar.Event) (*sidecar.Event, error) { + p, _ := strconv.Atoi(port) + + req := s.options.RpcClient.NewRequest( + client.RequestWithNamespace(namespace), + client.RequestWithName(name), + client.RequestWithPort(p), + client.RequestWithMethod(method), + client.RequestWithUnmarshaledRequest(event), + ) + + rsp := &sidecar.Event{} + + if err := s.options.RpcClient.Call(context.Background(), req, rsp, client.CallWithAddress(baseUrl)); err != nil { + return nil, err + } + + return rsp, nil +} + func NewSidecar(opts ...sidecar.SidecarOption) sidecar.Sidecar { options := sidecar.NewSidecarOptions(opts...) diff --git a/sidecar/options.go b/sidecar/options.go index 4cb95fc..c0a884c 100644 --- a/sidecar/options.go +++ b/sidecar/options.go @@ -11,7 +11,6 @@ import ( type SidecarOption func(o *SidecarOptions) type SidecarOptions struct { - Id string ServiceName string HttpPort Port RpcPort Port @@ -28,12 +27,6 @@ type Port struct { Protocol string } -func SidecarWithId(id string) SidecarOption { - return func(o *SidecarOptions) { - o.Id = id - } -} - func SidecarWithServiceName(n string) SidecarOption { return func(o *SidecarOptions) { o.ServiceName = n @@ -84,7 +77,6 @@ func SidecarWithBrokers(b map[string]broker.Broker) SidecarOption { func NewSidecarOptions(opts ...SidecarOption) SidecarOptions { options := SidecarOptions{ - Id: defaultID, Stores: map[string]store.Store{}, Brokers: map[string]broker.Broker{}, Context: context.Background(), diff --git a/sidecar/sidecar.go b/sidecar/sidecar.go index 3951494..e1bce83 100644 --- a/sidecar/sidecar.go +++ b/sidecar/sidecar.go @@ -1,14 +1,9 @@ package sidecar import ( - "github.com/google/uuid" "github.com/w-h-a/pkg/store" ) -var ( - defaultID = uuid.New().String() -) - type Sidecar interface { Options() SidecarOptions OnEventPublished(event *Event) error