Skip to content

Commit

Permalink
feat: added response_to field in emitted events
Browse files Browse the repository at this point in the history
Signed-off-by: Jason Jerome <[email protected]>
  • Loading branch information
DuckBoss committed Jul 18, 2023
1 parent 5269a31 commit 61fc821
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 18 deletions.
12 changes: 8 additions & 4 deletions cmd/yggctl/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,14 +169,18 @@ func listenAction(ctx *cli.Context) error {
if !ok {
return cli.Exit(fmt.Errorf("cannot cast %T as string", s.Body[2]), 1)
}
responseTo, ok := s.Body[3].(string)
if !ok {
return cli.Exit(fmt.Errorf("cannot cast %T as string", s.Body[3]), 1)
}
var message string
if len(s.Body) > 3 {
message, ok = s.Body[3].(string)
if len(s.Body) > 4 {
message, ok = s.Body[4].(string)
if !ok {
return cli.Exit(fmt.Errorf("cannot cast %T as string", s.Body[3]), 1)
return cli.Exit(fmt.Errorf("cannot cast %T as string", s.Body[4]), 1)
}
}
log.Printf("%v: %v: %v: %v", worker, messageID, ipc.WorkerEventName(name), message)
log.Printf("%v: %v: %v: %v: %v", worker, messageID, responseTo, ipc.WorkerEventName(name), message)

}
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/yggd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (c *Client) Connect() error {
// channel, emitting a D-Bus "WorkerEvent" signal for each.
go func() {
for e := range c.dispatcher.WorkerEvents {
args := []interface{}{e.Worker, e.Name, e.MessageID}
args := []interface{}{e.Worker, e.Name, e.MessageID, e.ResponseTo}
switch e.Name {
case ipc.WorkerEventNameWorking:
args = append(args, e.Message)
Expand Down
2 changes: 2 additions & 0 deletions dbus/com.redhat.Yggdrasil1.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
@worker: Name of the worker emitting the event.
@name: Name of the event.
@message_id: The id associated with the worker message.
@response_to: The id of the message this event is in reply to, if any.
@message: Optional message included with the event.
Emitted by a worker when certain conditions arise, such as beginning
Expand All @@ -62,6 +63,7 @@
<arg type="s" name="worker" />
<arg type="u" name="name" />
<arg type="s" name="message_id" />
<arg type="s" name="response_to" />
<arg type="s" name="message" />
</signal>
</interface>
Expand Down
12 changes: 11 additions & 1 deletion internal/work/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,19 @@ func (d *Dispatcher) Connect() error {

eventMessageData, ok := s.Body[2].(map[string]string)
if !ok {
log.Errorf("cannot convert %T to string map", s.Body[2])
log.Errorf("cannot convert %T to string array", s.Body[2])
continue
}

eventOptionalResponseTo, ok := eventMessageData["responseTo"]
if ok {
event.ResponseTo = eventOptionalResponseTo
}

eventOptionalMessage, ok := eventMessageData["message"]
if ok {
event.Message = eventOptionalMessage
}
event.Message = eventMessageData["message"]

d.WorkerEvents <- event
Expand Down
6 changes: 4 additions & 2 deletions ipc/com.redhat.Yggdrasil1.Worker1.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@
Event:
@name: Name of the event.
@message_id: The id associated with the worker message.
@data: Key-value pairs of optional data provided with the event.
@response_to: The id of the message this event is in reply to, if any.
@data: Optional key-value pairs of data containing the message and responseTo
fields with the event.
Emitted by a worker when certain conditions arise, such as beginning
or ending work.
Expand All @@ -67,7 +69,7 @@
<signal name="Event">
<arg type="u" name="name" />
<arg type="s" name="message_id" />
<arg type="s" name="message" />
<arg type="a{ss}" name="data" />
</signal>
</interface>

Expand Down
9 changes: 5 additions & 4 deletions ipc/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ func (e WorkerEventName) String() string {
}

type WorkerEvent struct {
Worker string
Name WorkerEventName
MessageID string
Message string
Worker string
Name WorkerEventName
MessageID string
ResponseTo string
Message string
}
2 changes: 1 addition & 1 deletion worker/echo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var sleepTime time.Duration
// com.redhat.Yggdrasil1.Dispatcher1.Transmit method, returning the
// metadata, data, and the message id it received.
func echo(w *worker.Worker, addr string, id string, responseTo string, metadata map[string]string, data []byte) error {
if err := w.EmitEvent(ipc.WorkerEventNameWorking, id, fmt.Sprintf("echoing %v", data)); err != nil {
if err := w.EmitEvent(ipc.WorkerEventNameWorking, id, responseTo, fmt.Sprintf("echoing %v", data)); err != nil {
return fmt.Errorf("cannot call EmitEvent: %w", err)
}

Expand Down
11 changes: 6 additions & 5 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,12 @@ func (w *Worker) Transmit(addr string, id string, responseTo string, metadata ma
return
}

// EmitEvent emits a WorkerEvent, worker message id, and key-value pairs of optional data.
func (w *Worker) EmitEvent(event ipc.WorkerEventName, messageID string, message string) error {
// EmitEvent emits a WorkerEvent, worker message id, optional response message id, and an optional message.
func (w *Worker) EmitEvent(event ipc.WorkerEventName, messageID string, responseTo string, message string) error {
args := []interface{}{event, messageID}
args = append(args, map[string]string{
"message": message,
"responseTo": responseTo,
"message": message,
})
log.Debugf("emitting event %v", event)
return w.conn.Emit(dbus.ObjectPath(path.Join("/com/redhat/Yggdrasil1/Worker1", w.directive)), "com.redhat.Yggdrasil1.Worker1.Event", args...)
Expand All @@ -178,15 +179,15 @@ func (w *Worker) dispatch(addr string, id string, responseTo string, metadata ma
log.Tracef("metadata = %#v", metadata)
log.Tracef("data = %v", data)

if err := w.EmitEvent(ipc.WorkerEventNameBegin, id, ""); err != nil {
if err := w.EmitEvent(ipc.WorkerEventNameBegin, id, responseTo, ""); err != nil {
return dbus.NewError("com.redhat.Yggdrasil1.Worker1.EventError", []interface{}{err.Error()})
}

go func() {
if err := w.rx(w, addr, id, responseTo, metadata, data); err != nil {
log.Errorf("cannot call rx: %v", err)
}
if err := w.EmitEvent(ipc.WorkerEventNameEnd, id, ""); err != nil {
if err := w.EmitEvent(ipc.WorkerEventNameEnd, id, responseTo, ""); err != nil {
log.Errorf("cannot emit event: %v", err)
}
}()
Expand Down

0 comments on commit 61fc821

Please sign in to comment.