Skip to content

Commit

Permalink
feat: add filter for list pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
pinglin committed Jun 20, 2022
1 parent 243e7a1 commit ffe8856
Show file tree
Hide file tree
Showing 11 changed files with 452 additions and 31 deletions.
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.10.3
github.com/iancoleman/strcase v0.2.0
github.com/instill-ai/protogen-go v0.1.5-alpha.0.20220615161406-12d8be2b3938
github.com/instill-ai/protogen-go v0.1.5-alpha.0.20220620010454-ca08bcdfc4dd
github.com/instill-ai/usage-client v0.0.0-20220607201439-d646c37f5b02
github.com/instill-ai/x v0.1.0-alpha.0.20220604235252-39fcffc82edb
github.com/knadh/koanf v1.4.1
github.com/mennanov/fieldmask-utils v0.5.0
github.com/rs/cors v1.8.2
github.com/stretchr/testify v1.7.2
go.einride.tech/aip v0.54.1
go.temporal.io/sdk v1.15.0
go.uber.org/zap v1.21.0
golang.org/x/net v0.0.0-20220614195744-fb05da6f9022
Expand Down Expand Up @@ -67,7 +68,7 @@ require (
golang.org/x/sys v0.0.0-20220614162138-6c1b26c55098 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20220609170525-579cf78fd858 // indirect
google.golang.org/genproto v0.0.0-20220614165028-45ed7f3ff16e // indirect
google.golang.org/genproto v0.0.0-20220614165028-45ed7f3ff16e
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
8 changes: 6 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -694,8 +694,8 @@ github.com/imdario/mergo v0.3.10/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH
github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/instill-ai/protogen-go v0.1.5-alpha.0.20220615161406-12d8be2b3938 h1:LR/Zk/EyqXID8BKm+AnrPvd4L4qqCRD/9sLIWvL3KF0=
github.com/instill-ai/protogen-go v0.1.5-alpha.0.20220615161406-12d8be2b3938/go.mod h1:d9ebEdwMX2Las4OScym45qbQM+xcBQITqvq/8anTVas=
github.com/instill-ai/protogen-go v0.1.5-alpha.0.20220620010454-ca08bcdfc4dd h1:m6XdTfydrZwaQHqLIDMn0/hGp2C6e8HxRoO6YclrL+U=
github.com/instill-ai/protogen-go v0.1.5-alpha.0.20220620010454-ca08bcdfc4dd/go.mod h1:d9ebEdwMX2Las4OScym45qbQM+xcBQITqvq/8anTVas=
github.com/instill-ai/usage-client v0.0.0-20220607201439-d646c37f5b02 h1:7dhRYHERy+NbvESpaQ0NPOo3CiiDpvLsARR90Ftkiqw=
github.com/instill-ai/usage-client v0.0.0-20220607201439-d646c37f5b02/go.mod h1:saH0H46iHHMxBx+znN3CoE4IOylbTlpQUPj0Do06yKo=
github.com/instill-ai/x v0.1.0-alpha.0.20220604235252-39fcffc82edb h1:70AJVfr463jWkgPQ1w281zsQ1LK/tOW5INTNc+yOBsI=
Expand Down Expand Up @@ -1188,6 +1188,8 @@ github.com/yvasiyarov/gorelic v0.0.0-20141212073537-a9bba5b9ab50/go.mod h1:NUSPS
github.com/yvasiyarov/newrelic_platform_go v0.0.0-20140908184405-b21fdbd4370f/go.mod h1:GlGEuHIJweS1mbCqG+7vt2nvWLzLLnRHbXz5JKd/Qbg=
github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q=
gitlab.com/nyarla/go-crypt v0.0.0-20160106005555-d9a5dc2b789b/go.mod h1:T3BPAOm2cqquPa0MKWeNkmOM5RQsRhkrwMWonFMN7fE=
go.einride.tech/aip v0.54.1 h1:srys7sFWPixEqyOu0gWuZAC86p4UAnWJIQcA01Ys3R4=
go.einride.tech/aip v0.54.1/go.mod h1:tUzBlpbLzt0LbL2GcO7RHQyHdnVFK25KvfZ638MTbgk=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
Expand Down Expand Up @@ -1901,9 +1903,11 @@ gorm.io/gorm v1.21.4/go.mod h1:0HFTzE/SqkGTzK6TlDPPQbAYCluiVvhzoA1+aVyzenw=
gorm.io/gorm v1.23.4/go.mod h1:l2lP/RyAtc1ynaTjFksBde/O8v9oOGIApu2/xRitmZk=
gorm.io/gorm v1.23.6 h1:KFLdNgri4ExFFGTRGGFWON2P1ZN28+9SJRN8voOoYe0=
gorm.io/gorm v1.23.6/go.mod h1:l2lP/RyAtc1ynaTjFksBde/O8v9oOGIApu2/xRitmZk=
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk=
gotest.tools/v3 v3.0.3/go.mod h1:Z7Lb0S5l+klDB31fvDQX8ss/FlKDxtlFlw3Oa8Ymbl8=
gotest.tools/v3 v3.1.0 h1:rVV8Tcg/8jHUkPUorwjaMTtemIMVXfIPKiOqnhEhakk=
gotest.tools/v3 v3.1.0/go.mod h1:fHy7eyTmJFO5bQbUsEGQ1v4m2J3Jz9eWL54TP2/ZuYQ=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
Expand Down
37 changes: 37 additions & 0 deletions integration-test/rest-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,43 @@ export function CheckList() {
[`GET /v1alpha/pipelines?page_size=100&page_token=${resFirst100.json().next_page_token} response next_page_token is empty`]: (r) => r.json().next_page_token == "",
});

// Filtering
check(http.request("GET", `${pipelineHost}/v1alpha/pipelines?filter=mode=MODE_SYNC`, null, {headers: {"Content-Type": "application/json",}}), {
[`GET /v1alpha/pipelines?filter=mode=MODE_SYNC response 200`]: (r) => r.status == 200,
[`GET /v1alpha/pipelines?filter=mode=MODE_SYNC response pipelines.length > 0`]: (r) => r.json().pipelines.length > 0,
});

check(http.request("GET", `${pipelineHost}/v1alpha/pipelines?filter=mode=MODE_SYNC%20AND%20state=STATE_ACTIVE`, null, {headers: {"Content-Type": "application/json",}}), {
[`GET /v1alpha/pipelines?filter=mode=MODE_SYNC%20AND%20state=STATE_ACTIVE response 200`]: (r) => r.status == 200,
[`GET /v1alpha/pipelines?filter=mode=MODE_SYNC%20AND%20state=STATE_ACTIVE response pipelines.length > 0`]: (r) => r.json().pipelines.length > 0,
});

check(http.request("GET", `${pipelineHost}/v1alpha/pipelines?filter=state=STATE_ACTIVE%20AND%20create_time>timestamp%28%222000-06-19T23:31:08.657Z%22%29`, null, {headers: {"Content-Type": "application/json",}}), {
[`GET /v1alpha/pipelines?filter=state=STATE_ACTIVE%20AND%20create_time%20>%20timestamp%28%222000-06-19T23:31:08.657Z%22%29 response 200`]: (r) => r.status == 200,
[`GET /v1alpha/pipelines?filter=state=STATE_ACTIVE%20AND%20create_time%20>%20timestamp%28%222000-06-19T23:31:08.657Z%22%29 response pipelines.length > 0`]: (r) => r.json().pipelines.length > 0,
});

// Get UUID for foreign resources
var srcConnUid = http.get(`${connectorHost}/v1alpha/source-connectors/source-http`, {}, {headers: {"Content-Type": "application/json"},}).json().source_connector.uid
var srcConnPermalink = `source-connectors/${srcConnUid}`

var dstConnUid = http.get(`${connectorHost}/v1alpha/destination-connectors/destination-http`, {}, {headers: {"Content-Type": "application/json"},}).json().destination_connector.uid
var dstConnPermalink = `destination-connectors/${dstConnUid}`

var modelUid = http.get(`${modelHost}/v1alpha/models/${constant.model_id}`, {}, {headers: {"Content-Type": "application/json"},}).json().model.uid
var modelInstUid = http.get(`${modelHost}/v1alpha/models/${constant.model_id}/instances/latest`, {}, {headers: {"Content-Type": "application/json"},}).json().instance.uid
var modelInstPermalink = `models/${modelUid}/instances/${modelInstUid}`

check(http.request("GET", `${pipelineHost}/v1alpha/pipelines?filter=mode=MODE_SYNC%20AND%20recipe.source=%22${srcConnPermalink}%22`, null, {headers: {"Content-Type": "application/json",}}), {
[`GET /v1alpha/pipelines?filter=mode=MODE_SYNC%20AND%20recipe.source=%22${srcConnPermalink}%22 response 200`]: (r) => r.status == 200,
[`GET /v1alpha/pipelines?filter=mode=MODE_SYNC%20AND%20recipe.source=%22${srcConnPermalink}%22 response pipelines.length > 0`]: (r) => r.json().pipelines.length > 0,
});

check(http.request("GET", `${pipelineHost}/v1alpha/pipelines?filter=mode=MODE_SYNC%20AND%20recipe.source=%22${srcConnPermalink}%22%20AND%20recipe.model_instances:%22${modelInstPermalink}%22`, null, {headers: {"Content-Type": "application/json",}}), {
[`GET /v1alpha/pipelines?filter=mode=MODE_SYNC%20AND%20recipe.source=%22${srcConnPermalink}%22%20AND%20recipe.model_instances:%22${modelInstPermalink}%22 response 200`]: (r) => r.status == 200,
[`GET /v1alpha/pipelines?filter=mode=MODE_SYNC%20AND%20recipe.source=%22${srcConnPermalink}%22%20AND%20recipe.model_instances:%22${modelInstPermalink}%22 response pipelines.length > 0`]: (r) => r.json().pipelines.length > 0,
});

// Delete the pipelines
for (const reqBody of reqBodies) {
check(http.request(
Expand Down
29 changes: 21 additions & 8 deletions integration-test/rest.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ export let options = {
export function setup() {

group("Connector Backend API: Create a http source connector", function () {
check(http.request("POST", `${connectorHost}/v1alpha/source-connectors`,

var res = http.request("POST", `${connectorHost}/v1alpha/source-connectors`,
JSON.stringify({
"id": "source-http",
"source_connector_definition": "source-connector-definitions/source-http",
Expand All @@ -34,13 +35,16 @@ export function setup() {
}
}), {
headers: { "Content-Type": "application/json" },
}), {
})
check(res, {
"POST /v1alpha/source-connectors response status for creating directness HTTP source connector 201": (r) => r.status === 201,
})

});

group("Connector Backend API: Create a http destination connector", function () {
check(http.request("POST", `${connectorHost}/v1alpha/destination-connectors`,

var res = http.request("POST", `${connectorHost}/v1alpha/destination-connectors`,
JSON.stringify({
"id": "destination-http",
"destination_connector_definition": "destination-connector-definitions/destination-http",
Expand All @@ -49,13 +53,17 @@ export function setup() {
}
}), {
headers: { "Content-Type": "application/json" },
}), {
})

check(res, {
"POST /v1alpha/destination-connectors response status for creating directness HTTP destination connector 201": (r) => r.status === 201,
})

});

group("Connector Backend API: Create a CSV destination connector", function () {
check(http.request("POST", `${connectorHost}/v1alpha/destination-connectors`,

var res = http.request("POST", `${connectorHost}/v1alpha/destination-connectors`,
JSON.stringify({
"id": constant.dstCSVConnID,
"destination_connector_definition": "destination-connector-definitions/destination-csv",
Expand All @@ -66,9 +74,12 @@ export function setup() {
}
}), {
headers: { "Content-Type": "application/json" },
}), {
})

check(res, {
"POST /v1alpha/destination-connectors response status for creating CSV destination connector 201": (r) => r.status === 201,
})

});

group("Model Backend API: Deploy a detection model", function () {
Expand All @@ -86,11 +97,13 @@ export function setup() {
"POST /v1alpha/models:multipart task det response status": (r) => r.status === 201
});

check(http.post(`${modelHost}/v1alpha/models/${constant.model_id}/instances/latest:deploy`, {}, {
var res = http.post(`${modelHost}/v1alpha/models/${constant.model_id}/instances/latest:deploy`, {}, {
headers: {
"Content-Type": "application/json"
},
}), {
})

check(res, {
[`POST /v1alpha/models/${constant.model_id}/instances/latest:deploy online task det response status`]: (r) => r.status === 200
});

Expand Down
16 changes: 8 additions & 8 deletions pkg/datamodel/datamodel.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,28 +47,28 @@ type Pipeline struct {
type PipelineMode pipelinePB.Pipeline_Mode

// Scan function for custom GORM type PipelineMode
func (c *PipelineMode) Scan(value interface{}) error {
*c = PipelineMode(pipelinePB.Pipeline_Mode_value[value.(string)])
func (p *PipelineMode) Scan(value interface{}) error {
*p = PipelineMode(pipelinePB.Pipeline_Mode_value[value.(string)])
return nil
}

// Value function for custom GORM type PipelineMode
func (c PipelineMode) Value() (driver.Value, error) {
return pipelinePB.Pipeline_Mode(c).String(), nil
func (p PipelineMode) Value() (driver.Value, error) {
return pipelinePB.Pipeline_Mode(p).String(), nil
}

// PipelineState is an alias type for Protobuf enum Pipeline.State
type PipelineState pipelinePB.Pipeline_State

// Scan function for custom GORM type PipelineState
func (c *PipelineState) Scan(value interface{}) error {
*c = PipelineState(pipelinePB.Pipeline_State_value[value.(string)])
func (p *PipelineState) Scan(value interface{}) error {
*p = PipelineState(pipelinePB.Pipeline_State_value[value.(string)])
return nil
}

// Value function for custom GORM type PipelineState
func (c PipelineState) Value() (driver.Value, error) {
return pipelinePB.Pipeline_State(c).String(), nil
func (p PipelineState) Value() (driver.Value, error) {
return pipelinePB.Pipeline_State(p).String(), nil
}

// Recipe is the data model of the pipeline recipe
Expand Down
27 changes: 26 additions & 1 deletion pkg/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/gofrs/uuid"
"github.com/gogo/status"
"github.com/iancoleman/strcase"
"go.einride.tech/aip/filtering"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
Expand Down Expand Up @@ -113,7 +114,31 @@ func (h *handler) ListPipeline(ctx context.Context, req *pipelinePB.ListPipeline
return &pipelinePB.ListPipelineResponse{}, err
}

dbPipelines, totalSize, nextPageToken, err := h.service.ListPipeline(owner, req.GetPageSize(), req.GetPageToken(), isBasicView)
var mode pipelinePB.Pipeline_Mode
var state pipelinePB.Pipeline_State
declarations, err := filtering.NewDeclarations([]filtering.DeclarationOption{
filtering.DeclareStandardFunctions(),
filtering.DeclareFunction("time.now", filtering.NewFunctionOverload("time.now", filtering.TypeTimestamp)),
filtering.DeclareIdent("uid", filtering.TypeString),
filtering.DeclareIdent("id", filtering.TypeString),
filtering.DeclareIdent("description", filtering.TypeString),
filtering.DeclareIdent("recipe", filtering.TypeMap(filtering.TypeString, filtering.TypeString)),
filtering.DeclareEnumIdent("mode", mode.Type()),
filtering.DeclareEnumIdent("state", state.Type()),
filtering.DeclareIdent("owner", filtering.TypeString),
filtering.DeclareIdent("create_time", filtering.TypeTimestamp),
filtering.DeclareIdent("update_time", filtering.TypeTimestamp),
}...)
if err != nil {
return &pipelinePB.ListPipelineResponse{}, err
}

filter, err := filtering.ParseFilter(req, declarations)
if err != nil {
return &pipelinePB.ListPipelineResponse{}, err
}

dbPipelines, totalSize, nextPageToken, err := h.service.ListPipeline(owner, req.GetPageSize(), req.GetPageToken(), isBasicView, filter)
if err != nil {
return &pipelinePB.ListPipelineResponse{}, err
}
Expand Down
19 changes: 17 additions & 2 deletions pkg/repository/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import (

"github.com/gofrs/uuid"
"github.com/jackc/pgconn"
"go.einride.tech/aip/filtering"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"gorm.io/gorm"
"gorm.io/gorm/clause"

"github.com/instill-ai/pipeline-backend/pkg/datamodel"
"github.com/instill-ai/x/paginate"
Expand All @@ -23,7 +25,7 @@ const MaxPageSize = 100
// Repository interface
type Repository interface {
CreatePipeline(pipeline *datamodel.Pipeline) error
ListPipeline(owner string, pageSize int64, pageToken string, isBasicView bool) ([]datamodel.Pipeline, int64, string, error)
ListPipeline(owner string, pageSize int64, pageToken string, isBasicView bool, filter filtering.Filter) ([]datamodel.Pipeline, int64, string, error)
GetPipelineByID(id string, owner string, isBasicView bool) (*datamodel.Pipeline, error)
GetPipelineByUID(uid uuid.UUID, owner string, isBasicView bool) (*datamodel.Pipeline, error)
UpdatePipeline(id string, owner string, pipeline *datamodel.Pipeline) error
Expand Down Expand Up @@ -55,7 +57,7 @@ func (r *repository) CreatePipeline(pipeline *datamodel.Pipeline) error {
return nil
}

func (r *repository) ListPipeline(owner string, pageSize int64, pageToken string, isBasicView bool) (pipelines []datamodel.Pipeline, totalSize int64, nextPageToken string, err error) {
func (r *repository) ListPipeline(owner string, pageSize int64, pageToken string, isBasicView bool, filter filtering.Filter) (pipelines []datamodel.Pipeline, totalSize int64, nextPageToken string, err error) {

if result := r.db.Model(&datamodel.Pipeline{}).Where("owner = ?", owner).Count(&totalSize); result.Error != nil {
return nil, 0, "", status.Errorf(codes.Internal, result.Error.Error())
Expand Down Expand Up @@ -83,6 +85,12 @@ func (r *repository) ListPipeline(owner string, pageSize int64, pageToken string
queryBuilder.Omit("pipeline.recipe")
}

if expr, err := r.transpileFilter(filter); err != nil {
return nil, 0, "", status.Errorf(codes.Internal, err.Error())
} else if expr != nil {
queryBuilder.Clauses(expr)
}

var createTime time.Time
rows, err := queryBuilder.Rows()
if err != nil {
Expand Down Expand Up @@ -189,3 +197,10 @@ func (r *repository) UpdatePipelineState(id string, owner string, state datamode
}
return nil
}

// TranspileFilter transpiles a parsed AIP filter expression to GORM DB clauses
func (r *repository) transpileFilter(filter filtering.Filter) (*clause.Expr, error) {
return (&Transpiler{
filter: filter,
}).Transpile()
}
Loading

0 comments on commit ffe8856

Please sign in to comment.