Skip to content

Commit

Permalink
feat: minor support for rpc apps
Browse files Browse the repository at this point in the history
  • Loading branch information
w-h-a committed Aug 24, 2024
1 parent 5561e6a commit c9b3e48
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 25 deletions.
59 changes: 47 additions & 12 deletions sidecar/custom/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -48,11 +49,7 @@ func (s *customSidecar) SaveStateToStore(storeId string, state []*store.Record)
}

for _, record := range state {
key := fmt.Sprintf("%s:%s", s.options.Id, record.Key)
if err := st.Write(&store.Record{
Key: key,
Value: record.Value,
}); err != nil {
if err := st.Write(record); err != nil {
return err
}
}
Expand All @@ -66,9 +63,7 @@ func (s *customSidecar) RetrieveStateFromStore(storeId, key string) ([]*store.Re
return nil, nil
}

prefix := fmt.Sprintf("%s:%s", s.options.Id, key)

recs, err := st.Read(prefix, store.ReadWithPrefix())
recs, err := st.Read(key, store.ReadWithPrefix())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -185,11 +180,31 @@ func (s *customSidecar) sendEventToTarget(target string, event *sidecar.Event) e
}

func (s *customSidecar) postEventToApp(event *sidecar.Event) error {
url := fmt.Sprintf("http://%s:%s", s.options.ServiceName, s.options.ServicePort)
var rsp *sidecar.Event
var err error

rsp, err := s.sendEventViaHttp(s.options.ServiceName, s.options.ServiceName, s.options.ServicePort.Port, event.EventName, url, event)
if err != nil {
return err
if s.options.ServicePort.Protocol == "rpc" {
// TODO: refactor
url := fmt.Sprintf("%s:%s", s.options.ServiceName, s.options.ServicePort)

serviceTitle := strings.Title(s.options.ServiceName)

eventTitle := strings.Title(event.EventName)

// TODO: not a great assumption
method := fmt.Sprintf("%s.%s", serviceTitle, eventTitle)

rsp, err = s.sendEventViaRpc(s.options.ServiceName, s.options.ServiceName, s.options.ServicePort.Port, method, url, event)
if err != nil {
return err
}
} else {
url := fmt.Sprintf("http://%s:%s", s.options.ServiceName, s.options.ServicePort)

rsp, err = s.sendEventViaHttp(s.options.ServiceName, s.options.ServiceName, s.options.ServicePort.Port, event.EventName, url, event)
if err != nil {
return err
}
}

if rsp != nil {
Expand Down Expand Up @@ -221,6 +236,26 @@ func (s *customSidecar) sendEventViaHttp(namespace, name, port, endpoint, baseUr
return rsp, nil
}

func (s *customSidecar) sendEventViaRpc(namespace, name, port, method, baseUrl string, event *sidecar.Event) (*sidecar.Event, error) {
p, _ := strconv.Atoi(port)

req := s.options.RpcClient.NewRequest(
client.RequestWithNamespace(namespace),
client.RequestWithName(name),
client.RequestWithPort(p),
client.RequestWithMethod(method),
client.RequestWithUnmarshaledRequest(event),
)

rsp := &sidecar.Event{}

if err := s.options.RpcClient.Call(context.Background(), req, rsp, client.CallWithAddress(baseUrl)); err != nil {
return nil, err
}

return rsp, nil
}

func NewSidecar(opts ...sidecar.SidecarOption) sidecar.Sidecar {
options := sidecar.NewSidecarOptions(opts...)

Expand Down
8 changes: 0 additions & 8 deletions sidecar/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
type SidecarOption func(o *SidecarOptions)

type SidecarOptions struct {
Id string
ServiceName string
HttpPort Port
RpcPort Port
Expand All @@ -28,12 +27,6 @@ type Port struct {
Protocol string
}

func SidecarWithId(id string) SidecarOption {
return func(o *SidecarOptions) {
o.Id = id
}
}

func SidecarWithServiceName(n string) SidecarOption {
return func(o *SidecarOptions) {
o.ServiceName = n
Expand Down Expand Up @@ -84,7 +77,6 @@ func SidecarWithBrokers(b map[string]broker.Broker) SidecarOption {

func NewSidecarOptions(opts ...SidecarOption) SidecarOptions {
options := SidecarOptions{
Id: defaultID,
Stores: map[string]store.Store{},
Brokers: map[string]broker.Broker{},
Context: context.Background(),
Expand Down
5 changes: 0 additions & 5 deletions sidecar/sidecar.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
package sidecar

import (
"github.com/google/uuid"
"github.com/w-h-a/pkg/store"
)

var (
defaultID = uuid.New().String()
)

type Sidecar interface {
Options() SidecarOptions
OnEventPublished(event *Event) error
Expand Down

0 comments on commit c9b3e48

Please sign in to comment.