Skip to content

Commit

Permalink
feat: allow the string data to reference all data types that can be s…
Browse files Browse the repository at this point in the history
…tringified
  • Loading branch information
donch1989 committed Feb 13, 2024
1 parent 703d4f2 commit 10c0c57
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 77 deletions.
3 changes: 0 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ require (
github.com/mennanov/fieldmask-utils v1.0.0
github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852
github.com/openfga/go-sdk v0.2.3
github.com/osteele/liquid v1.3.0
go.einride.tech/aip v0.60.0
go.opentelemetry.io/contrib/propagators/b3 v1.17.0
go.opentelemetry.io/otel v1.16.0
Expand Down Expand Up @@ -164,7 +163,6 @@ require (
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/olekukonko/tablewriter v0.0.4 // indirect
github.com/opencontainers/image-spec v1.1.0-rc2 // indirect
github.com/osteele/tuesday v1.0.3 // indirect
github.com/otiai10/gosseract/v2 v2.2.4 // indirect
github.com/pborman/uuid v1.2.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
Expand All @@ -189,7 +187,6 @@ require (
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/genproto v0.0.0-20231016165738-49dd2c1f3d0b // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
gorm.io/driver/mysql v1.3.2 // indirect
)
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1560,10 +1560,6 @@ github.com/opencontainers/selinux v1.10.0/go.mod h1:2i0OySw99QjzBBQByd1Gr9gSjvuh
github.com/openfga/go-sdk v0.2.3 h1:VPCouXbUP+vtGREbLu8BIuzFiA1kBDQ6tnunFTxtLzc=
github.com/openfga/go-sdk v0.2.3/go.mod h1:2k8hL4VJ46GXUGbnQ1QOrcZWcP1kKATLPeqVnuLzgIE=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/osteele/liquid v1.3.0 h1:TwZNI5Y0K+v0MF6JDSoEeRGeHugV8OTi7GIXfdA91fY=
github.com/osteele/liquid v1.3.0/go.mod h1:VmzQQHa5v4E0GvGzqccfAfLgMwRk2V+s1QbxYx9dGak=
github.com/osteele/tuesday v1.0.3 h1:SrCmo6sWwSgnvs1bivmXLvD7Ko9+aJvvkmDjB5G4FTU=
github.com/osteele/tuesday v1.0.3/go.mod h1:pREKpE+L03UFuR+hiznj3q7j3qB1rUZ4XfKejwWFF2M=
github.com/otiai10/curr v0.0.0-20150429015615-9b4961190c95/go.mod h1:9qAhocn7zKJG+0mI8eUu6xqkFDYS2kb2saOteoSB3cE=
github.com/otiai10/gosseract/v2 v2.2.4 h1:h/PV+oJqke8q2Ccw9bjpMBWfd7N2vtGDCUcihZj3nRo=
github.com/otiai10/gosseract/v2 v2.2.4/go.mod h1:ahOp/kHojnOMGv1RaUnR0jwY5JVa6BYKhYAS8nbMLSo=
Expand Down
2 changes: 0 additions & 2 deletions pkg/handler/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -829,8 +829,6 @@ func (h *PublicHandler) cloneNamespacePipeline(ctx context.Context, req CloneNam
trace.WithSpanKind(trace.SpanKindServer))
defer span.End()

fmt.Println(req.GetName(), req.GetTarget())

logUUID, _ := uuid.NewV4()

logger, _ := logger.GetZapLogger(ctx)
Expand Down
27 changes: 12 additions & 15 deletions pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -948,13 +948,13 @@ func (s *service) preTriggerPipeline(ctx context.Context, isAdmin bool, ns resou
errors = append(errors, fmt.Sprintf("inputs[%d]: data error", idx))
continue
}
var i interface{}
var i any

Check warning on line 951 in pkg/service/service.go

View check run for this annotation

Codecov / codecov/patch

pkg/service/service.go#L951

Added line #L951 was not covered by tests
if err := json.Unmarshal(b, &i); err != nil {
errors = append(errors, fmt.Sprintf("inputs[%d]: data error", idx))
continue
}

m := i.(map[string]interface{})
m := i.(map[string]any)

Check warning on line 957 in pkg/service/service.go

View check run for this annotation

Codecov / codecov/patch

pkg/service/service.go#L957

Added line #L957 was not covered by tests

for k := range m {
switch s := m[k].(type) {
Expand All @@ -969,19 +969,16 @@ func (s *service) preTriggerPipeline(ctx context.Context, isAdmin bool, ns resou
pipelineInput.Fields[k] = structpb.NewStringValue(fmt.Sprintf("data:%s;base64,%s", mimeType, s))
}
}
case []interface{}:
case []string:

Check warning on line 972 in pkg/service/service.go

View check run for this annotation

Codecov / codecov/patch

pkg/service/service.go#L972

Added line #L972 was not covered by tests
if instillFormatMap[k] != "array:string" {
for idx := range s {
switch item := s[idx].(type) {
case string:
if !strings.HasPrefix(item, "data:") {
b, err := base64.StdEncoding.DecodeString(item)
if err != nil {
return fmt.Errorf("can not decode file %s, %s", instillFormatMap[k], s)
}
mimeType := strings.Split(mimetype.Detect(b).String(), ";")[0]
pipelineInput.Fields[k].GetListValue().GetValues()[idx] = structpb.NewStringValue(fmt.Sprintf("data:%s;base64,%s", mimeType, item))
if !strings.HasPrefix(s[idx], "data:") {
b, err := base64.StdEncoding.DecodeString(s[idx])
if err != nil {
return fmt.Errorf("can not decode file %s, %s", instillFormatMap[k], s)

Check warning on line 978 in pkg/service/service.go

View check run for this annotation

Codecov / codecov/patch

pkg/service/service.go#L975-L978

Added lines #L975 - L978 were not covered by tests
}
mimeType := strings.Split(mimetype.Detect(b).String(), ";")[0]
pipelineInput.Fields[k].GetListValue().GetValues()[idx] = structpb.NewStringValue(fmt.Sprintf("data:%s;base64,%s", mimeType, s[idx]))

Check warning on line 981 in pkg/service/service.go

View check run for this annotation

Codecov / codecov/patch

pkg/service/service.go#L980-L981

Added lines #L980 - L981 were not covered by tests
}

}
Expand Down Expand Up @@ -1427,7 +1424,7 @@ func (s *service) triggerPipeline(
inputBlobRedisKeys = append(inputBlobRedisKeys, inputBlobRedisKey)
defer s.redisClient.Del(context.Background(), inputBlobRedisKey)
}
memo := map[string]interface{}{}
memo := map[string]any{}

Check warning on line 1427 in pkg/service/service.go

View check run for this annotation

Codecov / codecov/patch

pkg/service/service.go#L1427

Added line #L1427 was not covered by tests
memo["number_of_data"] = len(inputBlobRedisKeys)

workflowOptions := client.StartWorkflowOptions{
Expand Down Expand Up @@ -1526,7 +1523,7 @@ func (s *service) triggerAsyncPipeline(
)
inputBlobRedisKeys = append(inputBlobRedisKeys, inputBlobRedisKey)
}
memo := map[string]interface{}{}
memo := map[string]any{}

Check warning on line 1526 in pkg/service/service.go

View check run for this annotation

Codecov / codecov/patch

pkg/service/service.go#L1526

Added line #L1526 was not covered by tests
memo["number_of_data"] = len(inputBlobRedisKeys)

workflowOptions := client.StartWorkflowOptions{
Expand Down Expand Up @@ -1827,7 +1824,7 @@ func (s *service) ListConnectorDefinitions(ctx context.Context, pageSize int32,
if idx == 0 {
typeMap[string(expr.Vars[idx].(protoreflect.Name))] = true
} else {
typeMap[string(expr.Vars[idx].([]interface{})[0].(protoreflect.Name))] = true
typeMap[string(expr.Vars[idx].([]any)[0].(protoreflect.Name))] = true

Check warning on line 1827 in pkg/service/service.go

View check run for this annotation

Codecov / codecov/patch

pkg/service/service.go#L1827

Added line #L1827 was not covered by tests
}

}
Expand Down
104 changes: 51 additions & 53 deletions pkg/utils/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import (

"github.com/instill-ai/pipeline-backend/pkg/datamodel"
"github.com/oliveagle/jsonpath"
"github.com/osteele/liquid"
"github.com/osteele/liquid/render"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
)
Expand Down Expand Up @@ -171,12 +169,12 @@ func (d *dag) TopologicalSort() ([][]*datamodel.Component, error) {
return ans, nil
}

func traverseBinding(bindings interface{}, path string) (interface{}, error) {
func traverseBinding(bindings any, path string) (any, error) {

res, err := jsonpath.JsonPathLookup(bindings, "$."+path)
if err != nil {
// check primitive value
var ret interface{}
var ret any
err := json.Unmarshal([]byte(path), &ret)
if err != nil {
return nil, fmt.Errorf("reference not correct: '%s'", path)
Expand All @@ -188,7 +186,7 @@ func traverseBinding(bindings interface{}, path string) (interface{}, error) {
return res, nil
}
}
func RenderInput(input interface{}, bindings map[string]interface{}) (interface{}, error) {
func RenderInput(input any, bindings map[string]any) (any, error) {

switch input := input.(type) {
case string:
Expand All @@ -204,16 +202,38 @@ func RenderInput(input interface{}, bindings map[string]interface{}) (interface{

}

// TODO: we should retire Liquid instead of changing the delimiters
engine := liquid.NewEngine().Delims("${", "}", "{%", "%}")
out, err := engine.ParseAndRenderString(input, bindings)
if err != nil {
return nil, err
val := ""
for {
startIdx := strings.Index(input, "${")
endIdx := strings.Index(input, "}")
if startIdx == -1 || endIdx == -1 {
val += input
break
}

val += input[:startIdx]
ref := strings.TrimSpace(input[startIdx+2 : endIdx])
v, err := traverseBinding(bindings, ref)
if err != nil {
return nil, err
}

switch v := v.(type) {
case string:
val += input[:startIdx] + v
default:
b, err := json.Marshal(v)
if err != nil {
return nil, err
}
val += input[:startIdx] + string(b)
}
input = input[endIdx+1:]
}
return out, err
return val, nil

case map[string]interface{}:
ret := map[string]interface{}{}
case map[string]any:
ret := map[string]any{}
for k, v := range input {
converted, err := RenderInput(v, bindings)
if err != nil {
Expand All @@ -223,8 +243,8 @@ func RenderInput(input interface{}, bindings map[string]interface{}) (interface{

}
return ret, nil
case []interface{}:
ret := []interface{}{}
case []any:
ret := []any{}
for _, v := range input {
converted, err := RenderInput(v, bindings)
if err != nil {
Expand Down Expand Up @@ -260,7 +280,7 @@ func FindConditionUpstream(expr ast.Expr, upstreams *[]string) {
}
}

func EvalCondition(expr ast.Expr, value map[string]interface{}) (interface{}, error) {
func EvalCondition(expr ast.Expr, value map[string]any) (any, error) {
switch e := (expr).(type) {
case *ast.UnaryExpr:
xRes, err := EvalCondition(e.X, value)
Expand Down Expand Up @@ -453,7 +473,7 @@ func EvalCondition(expr ast.Expr, value map[string]interface{}) (interface{}, er
if err != nil {
return nil, err
}
return v.(map[string]interface{})[e.Sel.String()], nil
return v.(map[string]any)[e.Sel.String()], nil
case *ast.BasicLit:
if e.Kind == token.INT {
return strconv.ParseInt(e.Value, 10, 64)
Expand Down Expand Up @@ -521,14 +541,8 @@ func GenerateDAG(components []*datamodel.Component) (*dag, error) {
graph := NewDAG(components)
for _, component := range components {

// TODO: we should retire Liquid instead of changing the delimiters
engine := liquid.NewEngine().Delims("${", "}", "{%", "%}")
configuration := proto.Clone(component.Configuration)
template, _ := protojson.Marshal(configuration)
out, err := engine.ParseTemplate(template)
if err != nil {
return nil, err
}

condUpstreams := []string{}
if cond := component.Configuration.Fields["condition"].GetStringValue(); cond != "" {
Expand All @@ -551,19 +565,6 @@ func GenerateDAG(components []*datamodel.Component) (*dag, error) {
}
}

for _, node := range out.GetRoot().(*render.SeqNode).Children {
parents := []string{}
switch node := node.(type) {
case *render.ObjectNode:
upstream := strings.Split(node.Args, ".")[0]
parents = append(parents, upstream)
}
for idx := range parents {
if _, ok := componentIDMap[parents[idx]]; ok {
graph.AddEdge(componentIDMap[parents[idx]], component)
}
}
}
parents := FindReferenceParent(string(template))
for idx := range parents {
if upstream, ok := componentIDMap[parents[idx]]; ok {
Expand All @@ -580,7 +581,7 @@ func GenerateDAG(components []*datamodel.Component) (*dag, error) {

// TODO: simplify this
func FindReferenceParent(input string) []string {
var parsed interface{}
var parsed any
err := json.Unmarshal([]byte(input), &parsed)
if err != nil {
return []string{}
Expand All @@ -589,24 +590,21 @@ func FindReferenceParent(input string) []string {
switch parsed := parsed.(type) {
case string:

if strings.HasPrefix(parsed, "${") && strings.HasSuffix(parsed, "}") && strings.Count(parsed, "${") == 1 {

parsed = parsed[2:]
parsed = parsed[:len(parsed)-1]
parsed = strings.TrimSpace(parsed)
var b interface{}
err := json.Unmarshal([]byte(parsed), &b)

// if the json is Unmarshalable, means that it is not a reference
if err == nil {
return []string{}
upstreams := []string{}
for {
startIdx := strings.Index(input, "${")
endIdx := strings.Index(input, "}")
if startIdx == -1 || endIdx == -1 {
break
}
return []string{strings.Split(parsed, ".")[0]}

ref := strings.TrimSpace(input[startIdx+2 : endIdx])
upstreams = append(upstreams, strings.Split(ref, ".")[0])
input = input[endIdx+1:]
}
return []string{}

case map[string]interface{}:
return upstreams

case map[string]any:
parents := []string{}
for _, v := range parsed {
encoded, err := json.Marshal(v)
Expand All @@ -617,7 +615,7 @@ func FindReferenceParent(input string) []string {

}
return parents
case []interface{}:
case []any:
parents := []string{}
for _, v := range parsed {
encoded, err := json.Marshal(v)
Expand Down

0 comments on commit 10c0c57

Please sign in to comment.