Skip to content

Commit

Permalink
feat: added response_to field in emitted events
Browse files Browse the repository at this point in the history
Signed-off-by: Jason Jerome <[email protected]>
  • Loading branch information
DuckBoss committed Aug 29, 2023
1 parent 2d03b8a commit fe7dc7e
Show file tree
Hide file tree
Showing 9 changed files with 55 additions and 25 deletions.
12 changes: 8 additions & 4 deletions cmd/yggctl/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,19 +193,23 @@ func listenAction(ctx *cli.Context) error {
if !ok {
return cli.Exit(fmt.Errorf("cannot cast %T as string", s.Body[2]), 1)
}
responseTo, ok := s.Body[3].(string)
if !ok {
return cli.Exit(fmt.Errorf("cannot cast %T as string", s.Body[3]), 1)
}
var data map[string]string
if len(s.Body) > 3 {
data, ok = s.Body[3].(map[string]string)
if len(s.Body) > 4 {
data, ok = s.Body[4].(map[string]string)
if !ok {
return cli.Exit(fmt.Errorf("cannot cast %T as map[string]string", s.Body[3]), 1)
return cli.Exit(fmt.Errorf("cannot cast %T as map[string]string", s.Body[4]), 1)
}
}
parsedData, err := json.Marshal(data)
if err != nil {
return cli.Exit(fmt.Errorf("unable to parse optional data: %v", data), 1)
}

log.Printf("%v: %v: %v: %v", worker, messageID, ipc.WorkerEventName(name), string(parsedData))
log.Printf("%v: %v: %v: %v: %v", worker, messageID, ipc.WorkerEventName(name), responseTo, string(parsedData))
}
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion cmd/yggd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (c *Client) Connect() error {
// channel, emitting a D-Bus "WorkerEvent" signal for each.
go func() {
for e := range c.dispatcher.WorkerEvents {
args := []interface{}{e.Worker, e.Name, e.MessageID}
args := []interface{}{e.Worker, e.Name, e.MessageID, e.ResponseTo}
switch e.Name {
case ipc.WorkerEventNameWorking:
args = append(args, e.Data)
Expand Down
2 changes: 2 additions & 0 deletions dbus/com.redhat.Yggdrasil1.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
@worker: Name of the worker emitting the event.
@name: Name of the event.
@message_id: The id associated with the worker message.
@response_to: Unique ID of the message this message is in reply to.
@data: Key-value pairs of optional data provided with the event.
Emitted by a worker when certain conditions arise, such as beginning
Expand All @@ -62,6 +63,7 @@
<arg type="s" name="worker" />
<arg type="u" name="name" />
<arg type="s" name="message_id" />
<arg type="s" name="response_to" />
<arg type="a{ss}" name="data" />
</signal>
</interface>
Expand Down
6 changes: 6 additions & 0 deletions internal/work/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,12 @@ func workerEventFromSignal(s *dbus.Signal) (*ipc.WorkerEvent, error) {
}
event.MessageID = messageID
case 2:
responseTo, ok := v.(string)
if !ok {
return nil, newStringTypeConversionError(v)
}
event.ResponseTo = responseTo
case 3:
data, ok := v.(map[string]string)
if !ok {
return nil, newStringMapTypeConversionError(v)
Expand Down
39 changes: 26 additions & 13 deletions internal/work/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,34 @@ func TestWorkerEventFromSignal(t *testing.T) {
{
input: &dbus.Signal{
Name: "com.redhat.Yggdrasil1.Worker1.Event",
Body: []interface{}{uint32(1), "6925055f-167a-45cc-9869-1789ee37883f"},
Body: []interface{}{uint32(1), "6925055f-167a-45cc-9869-1789ee37883f", "123456f-167a-45cc-9869-1789ee37883f"},
},
want: &ipc.WorkerEvent{
Name: ipc.WorkerEventNameBegin,
MessageID: "6925055f-167a-45cc-9869-1789ee37883f",
Name: ipc.WorkerEventNameBegin,
MessageID: "6925055f-167a-45cc-9869-1789ee37883f",
ResponseTo: "123456f-167a-45cc-9869-1789ee37883f",
},
},
{
input: &dbus.Signal{
Name: "com.redhat.Yggdrasil1.Worker1.Event",
Body: []interface{}{uint32(2), "6925055f-167a-45cc-9869-1789ee37883f"},
Body: []interface{}{uint32(2), "6925055f-167a-45cc-9869-1789ee37883f", "123456f-167a-45cc-9869-1789ee37883f"},
},
want: &ipc.WorkerEvent{
Name: ipc.WorkerEventNameEnd,
MessageID: "6925055f-167a-45cc-9869-1789ee37883f",
Name: ipc.WorkerEventNameEnd,
MessageID: "6925055f-167a-45cc-9869-1789ee37883f",
ResponseTo: "123456f-167a-45cc-9869-1789ee37883f",
},
},
{
input: &dbus.Signal{
Name: "com.redhat.Yggdrasil1.Worker1.Event",
Body: []interface{}{uint32(3), "6925055f-167a-45cc-9869-1789ee37883f"},
Body: []interface{}{uint32(3), "6925055f-167a-45cc-9869-1789ee37883f", "123456f-167a-45cc-9869-1789ee37883f"},
},
want: &ipc.WorkerEvent{
Name: ipc.WorkerEventNameWorking,
MessageID: "6925055f-167a-45cc-9869-1789ee37883f",
Name: ipc.WorkerEventNameWorking,
MessageID: "6925055f-167a-45cc-9869-1789ee37883f",
ResponseTo: "123456f-167a-45cc-9869-1789ee37883f",
},
},
{
Expand All @@ -52,13 +55,15 @@ func TestWorkerEventFromSignal(t *testing.T) {
Body: []interface{}{
uint32(3),
"6925055f-167a-45cc-9869-1789ee37883f",
"123456f-167a-45cc-9869-1789ee37883f",
map[string]string{"message": "working message"},
},
},
want: &ipc.WorkerEvent{
Name: ipc.WorkerEventNameWorking,
MessageID: "6925055f-167a-45cc-9869-1789ee37883f",
Data: map[string]string{"message": "working message"},
Name: ipc.WorkerEventNameWorking,
MessageID: "6925055f-167a-45cc-9869-1789ee37883f",
ResponseTo: "123456f-167a-45cc-9869-1789ee37883f",
Data: map[string]string{"message": "working message"},
},
},
{
Expand All @@ -80,7 +85,15 @@ func TestWorkerEventFromSignal(t *testing.T) {
{
input: &dbus.Signal{
Name: "com.redhat.Yggdrasil1.Worker1.Event",
Body: []interface{}{uint32(3), "6925055f-167a-45cc-9869-1789ee37883f", 3},
Body: []interface{}{uint32(1), "6925055f-167a-45cc-9869-1789ee37883f", 3},
},
want: nil,
wantError: newStringTypeConversionError(3),
},
{
input: &dbus.Signal{
Name: "com.redhat.Yggdrasil1.Worker1.Event",
Body: []interface{}{uint32(3), "6925055f-167a-45cc-9869-1789ee37883f", "6925055f-167a-45cc-9869-1789ee37883f", 3},
},
want: nil,
wantError: newStringMapTypeConversionError(3),
Expand Down
2 changes: 2 additions & 0 deletions ipc/com.redhat.Yggdrasil1.Worker1.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
Event:
@name: Name of the event.
@message_id: The id associated with the worker message.
@response_to: Unique ID of the message this message is in reply to.
@data: Key-value pairs of optional data provided with the event.
Emitted by a worker when certain conditions arise, such as beginning
Expand All @@ -67,6 +68,7 @@
<signal name="Event">
<arg type="u" name="name" />
<arg type="s" name="message_id" />
<arg type="s" name="response_to" />
<arg type="a{ss}" name="data" />
</signal>
</interface>
Expand Down
9 changes: 5 additions & 4 deletions ipc/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ func (e WorkerEventName) String() string {
}

type WorkerEvent struct {
Worker string
Name WorkerEventName
MessageID string
Data map[string]string
Worker string
Name WorkerEventName
MessageID string
ResponseTo string
Data map[string]string
}
1 change: 1 addition & 0 deletions worker/echo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func echo(
if err := w.EmitEvent(
ipc.WorkerEventNameWorking,
rcvId,
responseTo,
map[string]string{"message": fmt.Sprintf("echoing %v", data)},
); err != nil {
return fmt.Errorf("cannot call EmitEvent: %w", err)
Expand Down
7 changes: 4 additions & 3 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,11 @@ func (w *Worker) Transmit(addr string, id string, responseTo string, metadata ma
}

// EmitEvent emits a WorkerEvent, worker message id, and key-value pairs of optional data.
func (w *Worker) EmitEvent(event ipc.WorkerEventName, messageID string, data map[string]string) error {
func (w *Worker) EmitEvent(event ipc.WorkerEventName, messageID string, responseTo string, data map[string]string) error {
args := []interface{}{
event,
messageID,
responseTo,
data,
}
log.Debugf("emitting event %v", event)
Expand All @@ -179,15 +180,15 @@ func (w *Worker) dispatch(addr string, id string, responseTo string, metadata ma
log.Tracef("metadata = %#v", metadata)
log.Tracef("data = %v", data)

if err := w.EmitEvent(ipc.WorkerEventNameBegin, id, map[string]string{}); err != nil {
if err := w.EmitEvent(ipc.WorkerEventNameBegin, id, responseTo, map[string]string{}); err != nil {
return dbus.NewError("com.redhat.Yggdrasil1.Worker1.EventError", []interface{}{err.Error()})
}

go func() {
if err := w.rx(w, addr, id, responseTo, metadata, data); err != nil {
log.Errorf("cannot call rx: %v", err)
}
if err := w.EmitEvent(ipc.WorkerEventNameEnd, id, map[string]string{}); err != nil {
if err := w.EmitEvent(ipc.WorkerEventNameEnd, id, responseTo, map[string]string{}); err != nil {
log.Errorf("cannot emit event: %v", err)
}
}()
Expand Down

0 comments on commit fe7dc7e

Please sign in to comment.