Skip to content

Commit

Permalink
fix: sidecar
Browse files Browse the repository at this point in the history
  • Loading branch information
w-h-a committed Sep 20, 2024
1 parent 150b67d commit 21d93df
Showing 1 changed file with 10 additions and 25 deletions.
35 changes: 10 additions & 25 deletions sidecar/custom/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (c *customSidecar) ListStateFromStore(storeId string) ([]*store.Record, err
st, ok := c.options.Stores[storeId]
if !ok {
log.Warnf("store %s was not found", storeId)
return nil, nil
return nil, sidecar.ErrComponentNotFound
}

// TODO: limit + offset
Expand All @@ -79,7 +79,7 @@ func (s *customSidecar) SingleStateFromStore(storeId, key string) ([]*store.Reco
st, ok := s.options.Stores[storeId]
if !ok {
log.Warnf("store %s was not found", storeId)
return nil, nil
return nil, sidecar.ErrComponentNotFound
}

recs, err := st.Read(key)
Expand All @@ -94,7 +94,7 @@ func (s *customSidecar) RemoveStateFromStore(storeId, key string) error {
st, ok := s.options.Stores[storeId]
if !ok {
log.Warnf("store %s was not found", storeId)
return nil
return sidecar.ErrComponentNotFound
}

if err := st.Delete(key); err != nil {
Expand All @@ -106,11 +106,16 @@ func (s *customSidecar) RemoveStateFromStore(storeId, key string) error {

func (s *customSidecar) WriteEventToBroker(event *sidecar.Event) error {
if len(event.To) == 0 {
log.Warnf("event %#+event has no address", event)
return nil
}

if err := s.actOnEventFromService(event); err != nil {
return err
if len(event.Concurrent) > 0 {
s.sendEventToTargetsConcurrently(event)
} else {
if err := s.sendEventToTargetsSequentially(event); err != nil {
return err
}
}

return nil
Expand Down Expand Up @@ -182,22 +187,6 @@ func (s *customSidecar) String() string {
return "custom"
}

func (s *customSidecar) actOnEventFromService(event *sidecar.Event) error {
if len(event.To) == 0 {
return nil
}

if len(event.Concurrent) > 0 {
s.sendEventToTargetsConcurrently(event)
} else {
if err := s.sendEventToTargetsSequentially(event); err != nil {
return err
}
}

return nil
}

func (s *customSidecar) sendEventToTargetsConcurrently(event *sidecar.Event) {
for _, target := range event.To {
go func() {
Expand Down Expand Up @@ -255,10 +244,6 @@ func (s *customSidecar) sendEventToService(event *sidecar.Event) error {
return err
}

if err := s.actOnEventFromService(rsp); err != nil {
return err
}

return nil
}

Expand Down

0 comments on commit 21d93df

Please sign in to comment.