Skip to content

Commit

Permalink
fix(component): fix task bug (#275)
Browse files Browse the repository at this point in the history
Because

- the wrong task might cause 500 error

This commit

- fix task bug
- adopt latest changes of connector-backend
  • Loading branch information
donch1989 authored Oct 11, 2023
1 parent 5e698f9 commit 5400080
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 26 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ require (
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0
github.com/iancoleman/strcase v0.2.0
github.com/influxdata/influxdb-client-go/v2 v2.12.3
github.com/instill-ai/component v0.5.0-alpha.0.20231009140320-4a7e8ba84a93
github.com/instill-ai/operator v0.1.0-alpha.0.20231009162341-2f6e7b8f673a
github.com/instill-ai/component v0.5.0-alpha.0.20231011164605-745a8446e5bd
github.com/instill-ai/operator v0.1.0-alpha.0.20231011183640-10bb1721655a
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20231010040039-43df18289361
github.com/instill-ai/usage-client v0.2.4-alpha.0.20230814155646-874e57a1e4b0
github.com/instill-ai/x v0.3.0-alpha
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1109,10 +1109,10 @@ github.com/influxdata/influxdb-client-go/v2 v2.12.3 h1:28nRlNMRIV4QbtIUvxhWqaxn0
github.com/influxdata/influxdb-client-go/v2 v2.12.3/go.mod h1:IrrLUbCjjfkmRuaCiGQg4m2GbkaeJDcuWoxiWdQEbA0=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
github.com/instill-ai/component v0.5.0-alpha.0.20231009140320-4a7e8ba84a93 h1:sjrAMMWzfU+kNqROQSX21+LT4ANKURuSp7hiDnDZGBA=
github.com/instill-ai/component v0.5.0-alpha.0.20231009140320-4a7e8ba84a93/go.mod h1:eiGvlNBPqac2oWv83KKIlXJRI7VykcSQkQ9amK4KQVs=
github.com/instill-ai/operator v0.1.0-alpha.0.20231009162341-2f6e7b8f673a h1:D2exMPXlYLTFegk3t7h6BInhk444vvtAst1SwW2ZE34=
github.com/instill-ai/operator v0.1.0-alpha.0.20231009162341-2f6e7b8f673a/go.mod h1:EIcM9mNzK2Xir4npx3KQ6txzaNo7UMPAffE173uzBII=
github.com/instill-ai/component v0.5.0-alpha.0.20231011164605-745a8446e5bd h1:XD0JN8VzdhcgaHVhgnFCKRZIqJB20U8nC+UMmKdGttY=
github.com/instill-ai/component v0.5.0-alpha.0.20231011164605-745a8446e5bd/go.mod h1:eiGvlNBPqac2oWv83KKIlXJRI7VykcSQkQ9amK4KQVs=
github.com/instill-ai/operator v0.1.0-alpha.0.20231011183640-10bb1721655a h1:ExwDVqfXPo0AyYGoargFxV6R0cHHfe75cOBggN1/nlA=
github.com/instill-ai/operator v0.1.0-alpha.0.20231011183640-10bb1721655a/go.mod h1:HGXS7ur7j704soJm7PcwFOx/6+2ijFKVGUIxiTMuP5Y=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20231010040039-43df18289361 h1:sI4K3ybWFndoT3REMLhhzJ0WXmHiAuhidIXbjMDfxas=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20231010040039-43df18289361/go.mod h1:z/L84htamlJ4QOR4jtJOaa+y3Hihu7WEqOipW0LEkmc=
github.com/instill-ai/usage-client v0.2.4-alpha.0.20230814155646-874e57a1e4b0 h1:9QoCxaktvqGJYGjN8KhkWsv1DVfwbt5G1d/Ycx1kJxo=
Expand Down
16 changes: 12 additions & 4 deletions integration-test/const.js
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ export const simpleRecipe = {
definition_name: "connector-definitions/airbyte-destination-csv",
configuration: {
input: {
text: "{ start.input }"
data: {
text: "{ start.input }"
}
}
}
},
Expand All @@ -127,7 +129,9 @@ export const simpleRecipe = {
definition_name: "connector-definitions/airbyte-destination-csv",
configuration: {
input: {
text: "{ start.input }"
data: {
text: "{ start.input }"
}
}
}
},
Expand Down Expand Up @@ -205,7 +209,9 @@ export const simpleRecipeDupId = {
definition_name: "connector-definitions/airbyte-destination-csv",
configuration: {
input: {
text: "{ start.input }"
data: {
text: "{ start.input }"
}
}
}
},
Expand All @@ -215,7 +221,9 @@ export const simpleRecipeDupId = {
definition_name: "connector-definitions/airbyte-destination-csv",
configuration: {
input: {
text: "{ start.input }"
data: {
text: "{ start.input }"
}
}
}
},
Expand Down
25 changes: 23 additions & 2 deletions pkg/service/openapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,11 +314,21 @@ func (s *service) GenerateOpenApiSpec(startCompOrigin *pipelinePB.Component, end

var walk *structpb.Value
if strings.HasPrefix(comp.DefinitionName, "connector-definitions") {
task := "default"
task := ""
if parsedTask, ok := comp.GetConfiguration().Fields["task"]; ok {
task = parsedTask.GetStringValue()
}
if task == "" {
keys := make([]string, 0, len(comp.GetConnectorDefinition().Spec.OpenapiSpecifications.GetFields()))
if len(keys) != 1 {
return nil, fmt.Errorf("must specify a task")
}
task = keys[0]
}

if _, ok := comp.GetConnectorDefinition().Spec.OpenapiSpecifications.GetFields()[task]; !ok {
return nil, fmt.Errorf("generate OpenAPI spec error")
}
walk = comp.GetConnectorDefinition().Spec.OpenapiSpecifications.GetFields()[task]

splits := strings.Split(str, ".")
Expand All @@ -342,10 +352,21 @@ func (s *service) GenerateOpenApiSpec(startCompOrigin *pipelinePB.Component, end

} else if utils.IsOperatorDefinition(comp.DefinitionName) {

task := "default"
task := ""
if parsedTask, ok := comp.GetConfiguration().Fields["task"]; ok {
task = parsedTask.GetStringValue()
}
if task == "" {
keys := make([]string, 0, len(comp.GetOperatorDefinition().Spec.OpenapiSpecifications.GetFields()))
if len(keys) != 1 {
return nil, fmt.Errorf("must specify a task")
}
task = keys[0]
}

if _, ok := comp.GetOperatorDefinition().Spec.OpenapiSpecifications.GetFields()[task]; !ok {
return nil, fmt.Errorf("generate OpenAPI spec error")
}

walk = comp.GetOperatorDefinition().Spec.OpenapiSpecifications.GetFields()[task]

Expand Down
41 changes: 28 additions & 13 deletions pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,11 +629,16 @@ func (s *service) preTriggerPipeline(recipe *datamodel.Recipe, pipelineInputs []
case "integer_array":
vals := []interface{}{}
for _, val := range val.GetListValue().AsSlice() {
n, err := strconv.ParseInt(val.(string), 10, 64)
if err != nil {
return err
switch val := val.(type) {
case string:
n, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return err
}
vals = append(vals, n)
default:
vals = append(vals, val)
}
vals = append(vals, n)
}
structVal, err := structpb.NewList(vals)
if err != nil {
Expand All @@ -644,11 +649,16 @@ func (s *service) preTriggerPipeline(recipe *datamodel.Recipe, pipelineInputs []
case "number_array":
vals := []interface{}{}
for _, val := range val.GetListValue().AsSlice() {
n, err := strconv.ParseFloat(val.(string), 64)
if err != nil {
return err
switch val := val.(type) {
case string:
n, err := strconv.ParseFloat(val, 64)
if err != nil {
return err
}
vals = append(vals, n)
default:
vals = append(vals, val)
}
vals = append(vals, n)
}
structVal, err := structpb.NewList(vals)
if err != nil {
Expand All @@ -658,11 +668,16 @@ func (s *service) preTriggerPipeline(recipe *datamodel.Recipe, pipelineInputs []
case "boolean_array":
vals := []interface{}{}
for _, val := range val.GetListValue().AsSlice() {
n, err := strconv.ParseBool(val.(string))
if err != nil {
return err
switch val := val.(type) {
case string:
n, err := strconv.ParseBool(val)
if err != nil {
return err
}
vals = append(vals, n)
default:
vals = append(vals, val)
}
vals = append(vals, n)
}
structVal, err := structpb.NewList(vals)
if err != nil {
Expand Down Expand Up @@ -1161,7 +1176,7 @@ func (s *service) triggerPipeline(
compInputs = append(compInputs, compInput)
}

task := "default"
task := ""
if comp.Configuration.Fields["task"] != nil {
task = comp.Configuration.Fields["task"].GetStringValue()
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/worker/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func (w *worker) TriggerAsyncPipelineWorkflow(ctx workflow.Context, param *Trigg
compInputs = append(compInputs, compInput)
}

task := "default"
task := ""
if comp.Configuration.Fields["task"] != nil {
task = comp.Configuration.Fields["task"].GetStringValue()
}
Expand Down

0 comments on commit 5400080

Please sign in to comment.