Skip to content

Commit

Permalink
error event response is only published to the local websocket (#59)
Browse files Browse the repository at this point in the history
  • Loading branch information
adnaan authored Mar 15, 2024
1 parent e03a30e commit 9ccfeaa
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 42 deletions.
12 changes: 7 additions & 5 deletions examples/chirper/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ func insertChirp(ctx fir.RouteContext, db *bolthold.Store) (*Chirp, error) {
}

type queryReq struct {
Order string `json:"order" schema:"order"`
Search string `json:"search" schema:"search"`
Offset int `json:"offset" schema:"offset"`
Limit int `json:"limit" schema:"limit"`
Order string `json:"order"`
Search string `json:"search"`
Offset int `json:"offset"`
Limit int `json:"limit"`
}

func loadChirps(db *bolthold.Store) fir.OnEventFunc {
Expand All @@ -46,7 +46,9 @@ func loadChirps(db *bolthold.Store) fir.OnEventFunc {
return err
}
var chirps []Chirp
if err := db.Find(&chirps, &bolthold.Query{}); err != nil {
q := &bolthold.Query{}
q = q.SortBy("CreatedAt").Reverse()
if err := db.Find(&chirps, q); err != nil {
return err
}
return ctx.Data(map[string]any{"chirps": chirps})
Expand Down
52 changes: 29 additions & 23 deletions route.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,18 +238,22 @@ func writeAndPublishEvents(ctx RouteContext) eventPublisher {
if err != nil {
logger.Debugf("error publishing patch: %v", err)
}
events := renderDOMEvents(ctx, pubsubEvent)

eventsData, err := json.Marshal(events)
if err != nil {
logger.Errorf("error marshaling patch: %v", err)
return err
}
ctx.response.Write(eventsData)
return nil
return writeEventHTTP(ctx, pubsubEvent)
}
}

func writeEventHTTP(ctx RouteContext, event pubsub.Event) error {
events := renderDOMEvents(ctx, event)
eventsData, err := json.Marshal(events)
if err != nil {
logger.Errorf("error marshaling patch: %v", err)
return err
}
ctx.response.Write(eventsData)
return nil
}

// set route template concurrency safe
func (rt *route) setTemplate(t *template.Template) {
rt.template = t
Expand Down Expand Up @@ -340,7 +344,11 @@ func (rt *route) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

handleOnEventResult(onEventFunc(eventCtx), eventCtx, writeAndPublishEvents(eventCtx))
// error event is not published
errorEvent := handleOnEventResult(onEventFunc(eventCtx), eventCtx, writeAndPublishEvents(eventCtx))
if errorEvent != nil {
writeEventHTTP(eventCtx, *errorEvent)
}

} else {
// postForm
Expand Down Expand Up @@ -412,7 +420,7 @@ func (rt *route) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}

func handleOnEventResult(err error, ctx RouteContext, publish eventPublisher) {
func handleOnEventResult(err error, ctx RouteContext, publish eventPublisher) *pubsub.Event {
target := ""
if ctx.event.Target != nil {
target = *ctx.event.Target
Expand All @@ -425,7 +433,7 @@ func handleOnEventResult(err error, ctx RouteContext, publish eventPublisher) {
ElementKey: ctx.event.ElementKey,
SessionID: ctx.event.SessionID,
})
return
return nil
}

switch errVal := err.(type) {
Expand All @@ -434,31 +442,29 @@ func handleOnEventResult(err error, ctx RouteContext, publish eventPublisher) {
ctx.event.ID: firErrors.User(errVal.Err).Error(),
"onevent": firErrors.User(errVal.Err).Error(),
}
publish(pubsub.Event{
return &pubsub.Event{
ID: &ctx.event.ID,
State: eventstate.Error,
Target: &target,
ElementKey: ctx.event.ElementKey,
Detail: &dom.Detail{Data: errs},
SessionID: ctx.event.SessionID,
})
return
}
case *firErrors.Fields:
fieldErrorsData := *errVal
fieldErrors := make(map[string]any)
for field, err := range fieldErrorsData {
fieldErrors[field] = err.Error()
}
errs := map[string]any{ctx.event.ID: fieldErrors}
publish(pubsub.Event{
return &pubsub.Event{
ID: &ctx.event.ID,
State: eventstate.Error,
Target: &target,
ElementKey: ctx.event.ElementKey,
Detail: &dom.Detail{Data: errs},
SessionID: ctx.event.SessionID,
})
return
}
case *routeData:
publish(pubsub.Event{
ID: &ctx.event.ID,
Expand All @@ -468,7 +474,7 @@ func handleOnEventResult(err error, ctx RouteContext, publish eventPublisher) {
Detail: &dom.Detail{Data: *errVal},
SessionID: ctx.event.SessionID,
})
return
return nil

case *routeDataWithState:
publish(pubsub.Event{
Expand All @@ -479,7 +485,7 @@ func handleOnEventResult(err error, ctx RouteContext, publish eventPublisher) {
Detail: &dom.Detail{Data: *errVal.routeData, State: *errVal.stateData},
SessionID: ctx.event.SessionID,
})
return
return nil
case *stateData:
publish(pubsub.Event{
ID: &ctx.event.ID,
Expand All @@ -489,21 +495,21 @@ func handleOnEventResult(err error, ctx RouteContext, publish eventPublisher) {
Detail: &dom.Detail{State: *errVal},
SessionID: ctx.event.SessionID,
})
return
return nil
default:
errs := map[string]any{
ctx.event.ID: firErrors.User(err).Error(),
"onevent": firErrors.User(err).Error(),
}
publish(pubsub.Event{

return &pubsub.Event{
ID: &ctx.event.ID,
State: eventstate.Error,
Target: &target,
ElementKey: ctx.event.ElementKey,
Detail: &dom.Detail{Data: errs},
SessionID: ctx.event.SessionID,
})
return
}
}
}

Expand Down
38 changes: 24 additions & 14 deletions websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func onWebsocket(w http.ResponseWriter, r *http.Request, cntrl *controller) {
route: route,
}

go renderAndWriteEvent(send, *routeChannel, routeCtx, pubsubEvent)
go renderAndWriteEventWS(send, *routeChannel, routeCtx, pubsubEvent)
}
}()

Expand Down Expand Up @@ -175,8 +175,11 @@ func onWebsocket(w http.ResponseWriter, r *http.Request, cntrl *controller) {

// update request context with user
eventCtx.request = eventCtx.request.WithContext(context.WithValue(context.Background(), UserKey, user))

handleOnEventResult(onEventFunc(eventCtx), eventCtx, publishEvents(ctx, eventCtx, *route.channelFunc(eventCtx.request, route.id)))
channel := *route.channelFunc(eventCtx.request, route.id)
errorEvent := handleOnEventResult(onEventFunc(eventCtx), eventCtx, publishEvents(ctx, eventCtx, channel))
if errorEvent != nil {
renderAndWriteEventWS(send, channel, eventCtx, *errorEvent)
}
}
}()

Expand Down Expand Up @@ -345,22 +348,29 @@ loop:
route: eventRoute,
}

// withEventLogger := logger.Logger().
// With(
// "route_id", eventRoute.id,
// "event_id", event.ID,
// "session_id", eventSessionID,
// "element_key", event.ElementKey,
// )
// withEventLogger.Info("received user event")
withEventLogger := logger.Logger().
With(
"route_id", eventRoute.id,
"event_id", event.ID,
"session_id", eventSessionID,
"element_key", event.ElementKey,
)
withEventLogger.Debug("received user event")
onEventFunc, ok := eventRoute.onEvents[strings.ToLower(event.ID)]
if !ok {
logger.Errorf("err: event %v, event.id not found", event)
continue
}

// handle user events
go handleOnEventResult(onEventFunc(eventCtx), eventCtx, publishEvents(ctx, eventCtx, *eventRoute.channelFunc(eventCtx.request, eventRoute.id)))
go func() {
channel := *eventRoute.channelFunc(eventCtx.request, eventRoute.id)
// errors are only sent to current local connection and not published
errorEvent := handleOnEventResult(onEventFunc(eventCtx), eventCtx, publishEvents(ctx, eventCtx, channel))
if errorEvent != nil {
renderAndWriteEventWS(send, channel, eventCtx, *errorEvent)
}
}()

}

close(writePumpDone)
Expand Down Expand Up @@ -391,7 +401,7 @@ loop:

}

func renderAndWriteEvent(send chan []byte, channel string, ctx RouteContext, pubsubEvent pubsub.Event) error {
func renderAndWriteEventWS(send chan []byte, channel string, ctx RouteContext, pubsubEvent pubsub.Event) error {
events := renderDOMEvents(ctx, pubsubEvent)
eventsData, err := json.Marshal(events)
if err != nil {
Expand Down

0 comments on commit 9ccfeaa

Please sign in to comment.