Skip to content

Commit

Permalink
fix trigger joins (#340)
Browse files Browse the repository at this point in the history
* fix trigger joins

* remove unused imports
  • Loading branch information
ukclivecox authored Jul 5, 2022
1 parent d7bb6f8 commit 57f7e70
Show file tree
Hide file tree
Showing 11 changed files with 643 additions and 23 deletions.
14 changes: 8 additions & 6 deletions operator/pkg/cli/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,8 @@ func showKafkaOutputMsg(e *kafka.Message, tensor string) error {
res := &v2_dataplane.ModelInferResponse{}
err := proto.Unmarshal(e.Value, res)
if err != nil {
return err
fmt.Printf("%s:%s\n", string(e.Key), string(e.Value))
return nil
}
err = updateResponseFromRawContents(res)
if err != nil {
Expand All @@ -261,12 +262,12 @@ func showKafkaOutputMsg(e *kafka.Message, tensor string) error {
if tensor != "" {
for _, output := range res.Outputs {
if output.Name == tensor {
printProto(output)
printProtoWithKey(e.Key, output)
}
}

} else {
printProto(res)
printProtoWithKey(e.Key, res)
}
return nil
}
Expand All @@ -275,7 +276,8 @@ func showKafkaInputMsg(e *kafka.Message, tensor string) error {
req := &v2_dataplane.ModelInferRequest{}
err := proto.Unmarshal(e.Value, req)
if err != nil {
return err
fmt.Printf("%s:%s\n", string(e.Key), string(e.Value))
return nil
}
err = updateRequestFromRawContents(req)
if err != nil {
Expand All @@ -284,12 +286,12 @@ func showKafkaInputMsg(e *kafka.Message, tensor string) error {
if tensor != "" {
for _, input := range req.Inputs {
if input.Name == tensor {
printProto(input)
printProtoWithKey(e.Key, input)
}
}

} else {
printProto(req)
printProtoWithKey(e.Key, req)
}
return nil
}
9 changes: 9 additions & 0 deletions operator/pkg/cli/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,15 @@ func printProto(msg proto.Message) {
}
}

func printProtoWithKey(key []byte, msg proto.Message) {
resJson, err := protojson.Marshal(msg)
if err != nil {
fmt.Printf("Failed to print proto: %s", err.Error())
} else {
fmt.Printf("%s:%s\n", string(key), string(resJson))
}
}

func unMarshallYamlStrict(data []byte, msg interface{}) error {
jsonData, err := yaml.YAMLToJSON(data)
if err != nil {
Expand Down
10 changes: 10 additions & 0 deletions samples/models/id1_node.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
apiVersion: mlops.seldon.io/v1alpha1
kind: Model
metadata:
name: id1_node
namespace: seldon-mesh
spec:
storageUri: "gs://seldon-models/triton/id_node"
requirements:
- triton
- python
10 changes: 10 additions & 0 deletions samples/models/id2_node.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
apiVersion: mlops.seldon.io/v1alpha1
kind: Model
metadata:
name: id2_node
namespace: seldon-mesh
spec:
storageUri: "gs://seldon-models/triton/id_node"
requirements:
- triton
- python
10 changes: 10 additions & 0 deletions samples/models/join_node.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
apiVersion: mlops.seldon.io/v1alpha1
kind: Model
metadata:
name: join_node
namespace: seldon-mesh
spec:
storageUri: "gs://seldon-models/triton/join_node"
requirements:
- triton
- python
18 changes: 18 additions & 0 deletions samples/pipelines/triggers_join_inputs.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
apiVersion: mlops.seldon.io/v1alpha1
kind: Pipeline
metadata:
name: triggers_join_inputs
namespace: seldon-mesh
spec:
steps:
- name: join_node
inputs:
- triggers_join_inputs.inputs.INPUT1
- triggers_join_inputs.inputs.INPUT2
triggers:
- triggers_join_inputs.inputs.TRIGGER1
- triggers_join_inputs.inputs.TRIGGER2
triggersJoinType: any
output:
steps:
- join_node
28 changes: 28 additions & 0 deletions samples/pipelines/triggers_join_internal.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
apiVersion: mlops.seldon.io/v1alpha1
kind: Pipeline
metadata:
name: triggers_join_internal
namespace: seldon-mesh
spec:
steps:
- name: id1_node
inputs:
- triggers_join_internal.inputs.TRIGGER1
tensorMap:
triggers_join_internal.inputs.TRIGGER1: INPUT1
- name: id2_node
inputs:
- triggers_join_internal.inputs.TRIGGER2
tensorMap:
triggers_join_internal.inputs.TRIGGER2: INPUT1
- name: join_node
inputs:
- triggers_join_internal.inputs.INPUT1
- triggers_join_internal.inputs.INPUT2
triggers:
- id1_node.outputs.OUTPUT1
- id2_node.outputs.OUTPUT1
triggersJoinType: any
output:
steps:
- join_node
Loading

0 comments on commit 57f7e70

Please sign in to comment.