From fe7dc7e8190576ded75a0d9a9bae094fcbd0a155 Mon Sep 17 00:00:00 2001 From: Jason Jerome Date: Mon, 28 Aug 2023 08:18:26 -0400 Subject: [PATCH] feat: added response_to field in emitted events Signed-off-by: Jason Jerome --- cmd/yggctl/actions.go | 12 ++++++--- cmd/yggd/client.go | 2 +- dbus/com.redhat.Yggdrasil1.xml | 2 ++ internal/work/dispatcher.go | 6 +++++ internal/work/dispatcher_test.go | 39 ++++++++++++++++++--------- ipc/com.redhat.Yggdrasil1.Worker1.xml | 2 ++ ipc/interfaces.go | 9 ++++--- worker/echo/main.go | 1 + worker/worker.go | 7 ++--- 9 files changed, 55 insertions(+), 25 deletions(-) diff --git a/cmd/yggctl/actions.go b/cmd/yggctl/actions.go index 1d29ab7a..4f81efff 100644 --- a/cmd/yggctl/actions.go +++ b/cmd/yggctl/actions.go @@ -193,11 +193,15 @@ func listenAction(ctx *cli.Context) error { if !ok { return cli.Exit(fmt.Errorf("cannot cast %T as string", s.Body[2]), 1) } + responseTo, ok := s.Body[3].(string) + if !ok { + return cli.Exit(fmt.Errorf("cannot cast %T as string", s.Body[3]), 1) + } var data map[string]string - if len(s.Body) > 3 { - data, ok = s.Body[3].(map[string]string) + if len(s.Body) > 4 { + data, ok = s.Body[4].(map[string]string) if !ok { - return cli.Exit(fmt.Errorf("cannot cast %T as map[string]string", s.Body[3]), 1) + return cli.Exit(fmt.Errorf("cannot cast %T as map[string]string", s.Body[4]), 1) } } parsedData, err := json.Marshal(data) @@ -205,7 +209,7 @@ func listenAction(ctx *cli.Context) error { return cli.Exit(fmt.Errorf("unable to parse optional data: %v", data), 1) } - log.Printf("%v: %v: %v: %v", worker, messageID, ipc.WorkerEventName(name), string(parsedData)) + log.Printf("%v: %v: %v: %v: %v", worker, messageID, ipc.WorkerEventName(name), responseTo, string(parsedData)) } } return nil diff --git a/cmd/yggd/client.go b/cmd/yggd/client.go index c3323b6c..9d01114d 100644 --- a/cmd/yggd/client.go +++ b/cmd/yggd/client.go @@ -177,7 +177,7 @@ func (c *Client) Connect() error { // channel, emitting a D-Bus "WorkerEvent" signal for each. go func() { for e := range c.dispatcher.WorkerEvents { - args := []interface{}{e.Worker, e.Name, e.MessageID} + args := []interface{}{e.Worker, e.Name, e.MessageID, e.ResponseTo} switch e.Name { case ipc.WorkerEventNameWorking: args = append(args, e.Data) diff --git a/dbus/com.redhat.Yggdrasil1.xml b/dbus/com.redhat.Yggdrasil1.xml index 4ee81e0b..9605216e 100644 --- a/dbus/com.redhat.Yggdrasil1.xml +++ b/dbus/com.redhat.Yggdrasil1.xml @@ -40,6 +40,7 @@ @worker: Name of the worker emitting the event. @name: Name of the event. @message_id: The id associated with the worker message. + @response_to: Unique ID of the message this message is in reply to. @data: Key-value pairs of optional data provided with the event. Emitted by a worker when certain conditions arise, such as beginning @@ -62,6 +63,7 @@ + diff --git a/internal/work/dispatcher.go b/internal/work/dispatcher.go index cb3cd664..0308e197 100644 --- a/internal/work/dispatcher.go +++ b/internal/work/dispatcher.go @@ -396,6 +396,12 @@ func workerEventFromSignal(s *dbus.Signal) (*ipc.WorkerEvent, error) { } event.MessageID = messageID case 2: + responseTo, ok := v.(string) + if !ok { + return nil, newStringTypeConversionError(v) + } + event.ResponseTo = responseTo + case 3: data, ok := v.(map[string]string) if !ok { return nil, newStringMapTypeConversionError(v) diff --git a/internal/work/dispatcher_test.go b/internal/work/dispatcher_test.go index db5269f4..1f222756 100644 --- a/internal/work/dispatcher_test.go +++ b/internal/work/dispatcher_test.go @@ -19,31 +19,34 @@ func TestWorkerEventFromSignal(t *testing.T) { { input: &dbus.Signal{ Name: "com.redhat.Yggdrasil1.Worker1.Event", - Body: []interface{}{uint32(1), "6925055f-167a-45cc-9869-1789ee37883f"}, + Body: []interface{}{uint32(1), "6925055f-167a-45cc-9869-1789ee37883f", "123456f-167a-45cc-9869-1789ee37883f"}, }, want: &ipc.WorkerEvent{ - Name: ipc.WorkerEventNameBegin, - MessageID: "6925055f-167a-45cc-9869-1789ee37883f", + Name: ipc.WorkerEventNameBegin, + MessageID: "6925055f-167a-45cc-9869-1789ee37883f", + ResponseTo: "123456f-167a-45cc-9869-1789ee37883f", }, }, { input: &dbus.Signal{ Name: "com.redhat.Yggdrasil1.Worker1.Event", - Body: []interface{}{uint32(2), "6925055f-167a-45cc-9869-1789ee37883f"}, + Body: []interface{}{uint32(2), "6925055f-167a-45cc-9869-1789ee37883f", "123456f-167a-45cc-9869-1789ee37883f"}, }, want: &ipc.WorkerEvent{ - Name: ipc.WorkerEventNameEnd, - MessageID: "6925055f-167a-45cc-9869-1789ee37883f", + Name: ipc.WorkerEventNameEnd, + MessageID: "6925055f-167a-45cc-9869-1789ee37883f", + ResponseTo: "123456f-167a-45cc-9869-1789ee37883f", }, }, { input: &dbus.Signal{ Name: "com.redhat.Yggdrasil1.Worker1.Event", - Body: []interface{}{uint32(3), "6925055f-167a-45cc-9869-1789ee37883f"}, + Body: []interface{}{uint32(3), "6925055f-167a-45cc-9869-1789ee37883f", "123456f-167a-45cc-9869-1789ee37883f"}, }, want: &ipc.WorkerEvent{ - Name: ipc.WorkerEventNameWorking, - MessageID: "6925055f-167a-45cc-9869-1789ee37883f", + Name: ipc.WorkerEventNameWorking, + MessageID: "6925055f-167a-45cc-9869-1789ee37883f", + ResponseTo: "123456f-167a-45cc-9869-1789ee37883f", }, }, { @@ -52,13 +55,15 @@ func TestWorkerEventFromSignal(t *testing.T) { Body: []interface{}{ uint32(3), "6925055f-167a-45cc-9869-1789ee37883f", + "123456f-167a-45cc-9869-1789ee37883f", map[string]string{"message": "working message"}, }, }, want: &ipc.WorkerEvent{ - Name: ipc.WorkerEventNameWorking, - MessageID: "6925055f-167a-45cc-9869-1789ee37883f", - Data: map[string]string{"message": "working message"}, + Name: ipc.WorkerEventNameWorking, + MessageID: "6925055f-167a-45cc-9869-1789ee37883f", + ResponseTo: "123456f-167a-45cc-9869-1789ee37883f", + Data: map[string]string{"message": "working message"}, }, }, { @@ -80,7 +85,15 @@ func TestWorkerEventFromSignal(t *testing.T) { { input: &dbus.Signal{ Name: "com.redhat.Yggdrasil1.Worker1.Event", - Body: []interface{}{uint32(3), "6925055f-167a-45cc-9869-1789ee37883f", 3}, + Body: []interface{}{uint32(1), "6925055f-167a-45cc-9869-1789ee37883f", 3}, + }, + want: nil, + wantError: newStringTypeConversionError(3), + }, + { + input: &dbus.Signal{ + Name: "com.redhat.Yggdrasil1.Worker1.Event", + Body: []interface{}{uint32(3), "6925055f-167a-45cc-9869-1789ee37883f", "6925055f-167a-45cc-9869-1789ee37883f", 3}, }, want: nil, wantError: newStringMapTypeConversionError(3), diff --git a/ipc/com.redhat.Yggdrasil1.Worker1.xml b/ipc/com.redhat.Yggdrasil1.Worker1.xml index 064d4f69..e82326e2 100644 --- a/ipc/com.redhat.Yggdrasil1.Worker1.xml +++ b/ipc/com.redhat.Yggdrasil1.Worker1.xml @@ -46,6 +46,7 @@ Event: @name: Name of the event. @message_id: The id associated with the worker message. + @response_to: Unique ID of the message this message is in reply to. @data: Key-value pairs of optional data provided with the event. Emitted by a worker when certain conditions arise, such as beginning @@ -67,6 +68,7 @@ + diff --git a/ipc/interfaces.go b/ipc/interfaces.go index 898ae550..6e9350ad 100644 --- a/ipc/interfaces.go +++ b/ipc/interfaces.go @@ -54,8 +54,9 @@ func (e WorkerEventName) String() string { } type WorkerEvent struct { - Worker string - Name WorkerEventName - MessageID string - Data map[string]string + Worker string + Name WorkerEventName + MessageID string + ResponseTo string + Data map[string]string } diff --git a/worker/echo/main.go b/worker/echo/main.go index 7dc3ae1f..68455e9b 100644 --- a/worker/echo/main.go +++ b/worker/echo/main.go @@ -33,6 +33,7 @@ func echo( if err := w.EmitEvent( ipc.WorkerEventNameWorking, rcvId, + responseTo, map[string]string{"message": fmt.Sprintf("echoing %v", data)}, ); err != nil { return fmt.Errorf("cannot call EmitEvent: %w", err) diff --git a/worker/worker.go b/worker/worker.go index 03ddc833..d6d8415c 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -159,10 +159,11 @@ func (w *Worker) Transmit(addr string, id string, responseTo string, metadata ma } // EmitEvent emits a WorkerEvent, worker message id, and key-value pairs of optional data. -func (w *Worker) EmitEvent(event ipc.WorkerEventName, messageID string, data map[string]string) error { +func (w *Worker) EmitEvent(event ipc.WorkerEventName, messageID string, responseTo string, data map[string]string) error { args := []interface{}{ event, messageID, + responseTo, data, } log.Debugf("emitting event %v", event) @@ -179,7 +180,7 @@ func (w *Worker) dispatch(addr string, id string, responseTo string, metadata ma log.Tracef("metadata = %#v", metadata) log.Tracef("data = %v", data) - if err := w.EmitEvent(ipc.WorkerEventNameBegin, id, map[string]string{}); err != nil { + if err := w.EmitEvent(ipc.WorkerEventNameBegin, id, responseTo, map[string]string{}); err != nil { return dbus.NewError("com.redhat.Yggdrasil1.Worker1.EventError", []interface{}{err.Error()}) } @@ -187,7 +188,7 @@ func (w *Worker) dispatch(addr string, id string, responseTo string, metadata ma if err := w.rx(w, addr, id, responseTo, metadata, data); err != nil { log.Errorf("cannot call rx: %v", err) } - if err := w.EmitEvent(ipc.WorkerEventNameEnd, id, map[string]string{}); err != nil { + if err := w.EmitEvent(ipc.WorkerEventNameEnd, id, responseTo, map[string]string{}); err != nil { log.Errorf("cannot emit event: %v", err) } }()