Skip to content

Commit

Permalink
feat: add data association with dst conn
Browse files Browse the repository at this point in the history
  • Loading branch information
pinglin committed Jul 29, 2022
1 parent 5f4392d commit 9233429
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 53 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.10.3
github.com/iancoleman/strcase v0.2.0
github.com/instill-ai/protogen-go v0.2.1-alpha.0.20220718232229-94fd0d8bc0e3
github.com/instill-ai/protogen-go v0.2.1-alpha.0.20220722230100-1acc25ec33ca
github.com/instill-ai/usage-client v0.1.1-alpha
github.com/instill-ai/x v0.1.0-alpha.0.20220706215306-bceeac65f523
github.com/knadh/koanf v1.4.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -694,8 +694,8 @@ github.com/imdario/mergo v0.3.10/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH
github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/instill-ai/protogen-go v0.2.1-alpha.0.20220718232229-94fd0d8bc0e3 h1:+3ifykY8EXAgvJOKn6gVbFDwDTBbgtmlY462R+c5iuM=
github.com/instill-ai/protogen-go v0.2.1-alpha.0.20220718232229-94fd0d8bc0e3/go.mod h1:d9ebEdwMX2Las4OScym45qbQM+xcBQITqvq/8anTVas=
github.com/instill-ai/protogen-go v0.2.1-alpha.0.20220722230100-1acc25ec33ca h1:0LVcRVHuH+S9DOLIqevb2SN+931AnMIwhAES+0UXuho=
github.com/instill-ai/protogen-go v0.2.1-alpha.0.20220722230100-1acc25ec33ca/go.mod h1:d9ebEdwMX2Las4OScym45qbQM+xcBQITqvq/8anTVas=
github.com/instill-ai/usage-client v0.1.1-alpha h1:TG1jH82GHjPqmjogQ4I1d8KFY5cBzrskBt9stoARWnU=
github.com/instill-ai/usage-client v0.1.1-alpha/go.mod h1:Vi+RgL2YNT+hfztD33JzqFl/Y7/SsV+NpWGIjUgig3s=
github.com/instill-ai/x v0.1.0-alpha.0.20220706215306-bceeac65f523 h1:HsZW2VWEnPhxitcyJEGbuQ9vi2LVsSEA8ezPIkp4VQs=
Expand Down
16 changes: 9 additions & 7 deletions integration-test/rest-trigger-async.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ export function CheckTriggerAsyncSingleImageSingleModelInst() {
});

const fd = new FormData();
fd.append("file", http.file(constant.dogImg));
fd.append("file", {data: http.file(constant.dogImg), filename: "dog"});
check(http.request("POST", `${pipelineHost}/v1alpha/pipelines/${reqBody.id}:trigger-multipart`, fd.body(), {
headers: {
"Content-Type": `multipart/form-data; boundary=${fd.boundary}`,
Expand Down Expand Up @@ -145,9 +145,10 @@ export function CheckTriggerAsyncMultiImageSingleModelInst() {
});

const fd = new FormData();
fd.append("file", http.file(constant.dogImg));
fd.append("file", http.file(constant.dogImg));
fd.append("file", http.file(constant.dogImg));
fd.append("file", {data: http.file(constant.dogImg), filename: "dog"});
fd.append("file", {data: http.file(constant.catImg), filename: "cat"});
fd.append("file", {data: http.file(constant.bearImg), filename: "bear"});
fd.append("file", {data: http.file(constant.dogRGBAImg), filename: "dogRGBA"});
check(http.request("POST", `${pipelineHost}/v1alpha/pipelines/${reqBody.id}:trigger-multipart`, fd.body(), {
headers: {
"Content-Type": `multipart/form-data; boundary=${fd.boundary}`,
Expand Down Expand Up @@ -232,9 +233,10 @@ export function CheckTriggerAsyncMultiImageMultiModelInst() {
});

const fd = new FormData();
fd.append("file", http.file(constant.dogImg));
fd.append("file", http.file(constant.dogImg));
fd.append("file", http.file(constant.dogImg));
fd.append("file", {data: http.file(constant.dogImg), filename: "dog"});
fd.append("file", {data: http.file(constant.catImg), filename: "cat"});
fd.append("file", {data: http.file(constant.bearImg), filename: "bear"});
fd.append("file", {data: http.file(constant.dogRGBAImg), filename: "dogRGBA"});
check(http.request("POST", `${pipelineHost}/v1alpha/pipelines/${reqBody.id}:trigger-multipart`, fd.body(), {
headers: {
"Content-Type": `multipart/form-data; boundary=${fd.boundary}`,
Expand Down
18 changes: 9 additions & 9 deletions integration-test/rest-trigger-sync.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ export function CheckTriggerSyncSingleImageSingleModelInst() {
});

const fd = new FormData();
fd.append("file", http.file(constant.dogImg));
fd.append("file", {data: http.file(constant.dogImg), filename: "dog"});
check(http.request("POST", `${pipelineHost}/v1alpha/pipelines/${reqHTTP.id}:trigger-multipart`, fd.body(), {
headers: {
"Content-Type": `multipart/form-data; boundary=${fd.boundary}`,
Expand Down Expand Up @@ -211,10 +211,10 @@ export function CheckTriggerSyncMultiImageSingleModelInst() {
});

const fd = new FormData();
fd.append("file", http.file(constant.dogImg));
fd.append("file", http.file(constant.catImg));
fd.append("file", http.file(constant.bearImg));
fd.append("file", http.file(constant.dogRGBAImg));
fd.append("file", {data: http.file(constant.dogImg), filename: "dog"});
fd.append("file", {data: http.file(constant.catImg), filename: "cat"});
fd.append("file", {data: http.file(constant.bearImg), filename: "bear"});
fd.append("file", {data: http.file(constant.dogRGBAImg), filename: "dogRGBA"});
check(http.request("POST", `${pipelineHost}/v1alpha/pipelines/${reqBody.id}:trigger-multipart`, fd.body(), {
headers: {
"Content-Type": `multipart/form-data; boundary=${fd.boundary}`,
Expand Down Expand Up @@ -305,10 +305,10 @@ export function CheckTriggerSyncMultiImageMultiModelInst() {
});

const fd = new FormData();
fd.append("file", http.file(constant.dogImg));
fd.append("file", http.file(constant.catImg));
fd.append("file", http.file(constant.bearImg));
fd.append("file", http.file(constant.dogRGBAImg));
fd.append("file", {data: http.file(constant.dogImg), filename: "dog"});
fd.append("file", {data: http.file(constant.catImg), filename: "cat"});
fd.append("file", {data: http.file(constant.bearImg), filename: "bear"});
fd.append("file", {data: http.file(constant.dogRGBAImg), filename: "dogRGBA"});
check(http.request("POST", `${pipelineHost}/v1alpha/pipelines/${reqBody.id}:trigger-multipart`, fd.body(), {
headers: {
"Content-Type": `multipart/form-data; boundary=${fd.boundary}`,
Expand Down
12 changes: 12 additions & 0 deletions integration-test/rest.js
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,18 @@ export function setup() {
"POST /v1alpha/destination-connectors response status for creating CSV destination connector 201": (r) => r.status === 201,
})

// Check connector state being updated in 120 secs
let currentTime = new Date().getTime();
let timeoutTime = new Date().getTime() + 120000;
while (timeoutTime > currentTime) {
var res = http.request("GET", `${connectorHost}/v1alpha/destination-connectors/${constant.dstCSVConnID}`)
if (res.json().destination_connector.connector.state === "STATE_CONNECTED") {
break
}
sleep(1)
currentTime = new Date().getTime();
}

});

group("Model Backend API: Deploy a detection model", function () {
Expand Down
30 changes: 12 additions & 18 deletions pkg/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,20 +443,12 @@ func (h *handler) TriggerPipeline(ctx context.Context, req *pipelinePB.TriggerPi
}
}

triggerModelResp, err := h.service.TriggerPipeline(req, dbPipeline)
resp, err := h.service.TriggerPipeline(req, dbPipeline)
if err != nil {
return &pipelinePB.TriggerPipelineResponse{}, err
}

if triggerModelResp == nil {
return &pipelinePB.TriggerPipelineResponse{}, nil
}

resp := pipelinePB.TriggerPipelineResponse{
Output: triggerModelResp.Output,
}

return &resp, nil
return resp, nil

}

Expand Down Expand Up @@ -489,30 +481,32 @@ func (h *handler) TriggerPipelineBinaryFileUpload(stream pipelinePB.PipelineServ
}

// Read chuck
var fileNames []string
var fileLengths []uint64
buf := bytes.Buffer{}
content := bytes.Buffer{}
for {
data, err := stream.Recv()
if len(fileLengths) == 0 {
fileLengths = data.GetFileLengths()
}
if err != nil {
if err == io.EOF {
break
}

return status.Errorf(codes.Internal, "failed unexpectedly while reading chunks from stream: %s", err.Error())
}
if len(fileNames) == 0 {
fileNames = data.GetFileNames()
}
if len(fileLengths) == 0 {
fileLengths = data.GetFileLengths()
}
if data.Content == nil {
continue
}

if _, err := buf.Write(data.Content); err != nil {
if _, err := content.Write(data.Content); err != nil {
return status.Errorf(codes.Internal, "failed unexpectedly while reading chunks from stream: %s", err.Error())
}
}

obj, err := h.service.TriggerPipelineBinaryFileUpload(buf, fileLengths, dbPipeline)
obj, err := h.service.TriggerPipelineBinaryFileUpload(content, fileNames, fileLengths, dbPipeline)
if err != nil {
return err
}
Expand Down
21 changes: 11 additions & 10 deletions pkg/handler/handlercustom.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func HandleTriggerPipelineBinaryFileUpload(w http.ResponseWriter, req *http.Requ
return
}

fileBytes, fileLengths, err := parseImageFormDataInputsToBytes(req)
content, fileNames, fileLengths, err := parseImageFormDataInputsToBytes(req)
if err != nil {
st := sterr.CreateErrorPreconditionFailure(
"[handler] error while reading file from request",
Expand All @@ -103,7 +103,7 @@ func HandleTriggerPipelineBinaryFileUpload(w http.ResponseWriter, req *http.Requ
}

var obj interface{}
if obj, err = service.TriggerPipelineBinaryFileUpload(*bytes.NewBuffer(fileBytes), fileLengths, dbPipeline); err != nil {
if obj, err = service.TriggerPipelineBinaryFileUpload(*bytes.NewBuffer(content), fileNames, fileLengths, dbPipeline); err != nil {
// TODO: return ResourceInfo error
http.Error(w, err.Error(), http.StatusInternalServerError)
return
Expand All @@ -125,38 +125,39 @@ func HandleTriggerPipelineBinaryFileUpload(w http.ResponseWriter, req *http.Requ
}
}

func parseImageFormDataInputsToBytes(req *http.Request) (fileBytes []byte, fileLengths []uint64, err error) {
func parseImageFormDataInputsToBytes(req *http.Request) (content []byte, fileNames []string, fileLengths []uint64, err error) {
inputs := req.MultipartForm.File["file"]
var file multipart.File
for _, content := range inputs {
file, err = content.Open()
for _, input := range inputs {
file, err = input.Open()
defer func() {
err = file.Close()
}()

if err != nil {
return nil, nil, fmt.Errorf("Unable to open file for image")
return nil, nil, nil, fmt.Errorf("Unable to open file for image")
}

buff := new(bytes.Buffer)
numBytes, err := buff.ReadFrom(file)
if err != nil {
return nil, nil, fmt.Errorf("Unable to read content body from image")
return nil, nil, nil, fmt.Errorf("Unable to read content body from image")
}

if numBytes > int64(constant.MaxImageSizeBytes) {
return nil, nil, fmt.Errorf(
return nil, nil, nil, fmt.Errorf(
"Image size must be smaller than %vMB. Got %vMB",
float32(constant.MaxImageSizeBytes)/float32(constant.MB),
float32(numBytes)/float32(constant.MB),
)
}

fileBytes = append(fileBytes, buff.Bytes()...)
content = append(content, buff.Bytes()...)
fileNames = append(fileNames, input.Filename)
fileLengths = append(fileLengths, uint64(buff.Len()))
}

return fileBytes, fileLengths, nil
return content, fileNames, fileLengths, nil
}

func errorResponse(w http.ResponseWriter, s *status.Status) {
Expand Down
24 changes: 18 additions & 6 deletions pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"fmt"
"io"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -36,7 +37,7 @@ type Service interface {
UpdatePipelineState(id string, ownerRscName string, state datamodel.PipelineState) (*datamodel.Pipeline, error)
UpdatePipelineID(id string, ownerRscName string, newID string) (*datamodel.Pipeline, error)
TriggerPipeline(req *pipelinePB.TriggerPipelineRequest, pipeline *datamodel.Pipeline) (*pipelinePB.TriggerPipelineResponse, error)
TriggerPipelineBinaryFileUpload(fileBuf bytes.Buffer, fileLengths []uint64, pipeline *datamodel.Pipeline) (*pipelinePB.TriggerPipelineBinaryFileUploadResponse, error)
TriggerPipelineBinaryFileUpload(fileBuf bytes.Buffer, fileNames []string, fileLengths []uint64, pipeline *datamodel.Pipeline) (*pipelinePB.TriggerPipelineBinaryFileUploadResponse, error)
}

type service struct {
Expand Down Expand Up @@ -405,6 +406,11 @@ func (s *service) TriggerPipeline(req *pipelinePB.TriggerPipelineRequest, dbPipe
// If this is a async trigger, write to the destination connector
case dbPipeline.Mode == datamodel.PipelineMode(pipelinePB.Pipeline_MODE_ASYNC):

var indices []string
for idx := range req.Inputs {
indices = append(indices, fmt.Sprintf("%s-%d", strconv.Itoa(idx), time.Now().UnixNano()))
}

for idx, modelInstRecName := range dbPipeline.Recipe.ModelInstances {

modelInstResp, err := s.modelServiceClient.GetModelInstance(ctx, &modelPB.GetModelInstanceRequest{
Expand All @@ -415,9 +421,13 @@ func (s *service) TriggerPipeline(req *pipelinePB.TriggerPipelineRequest, dbPipe
}

_, err = s.connectorServiceClient.WriteDestinationConnector(ctx, &connectorPB.WriteDestinationConnectorRequest{
Name: dbPipeline.Recipe.Destination,
Task: modelInstResp.Instance.GetTask(),
Data: outputs[idx],
Name: dbPipeline.Recipe.Destination,
Task: modelInstResp.Instance.GetTask(),
SyncMode: connectorPB.SupportedSyncModes_SUPPORTED_SYNC_MODES_FULL_REFRESH,
DestinationSyncMode: connectorPB.SupportedDestinationSyncModes_SUPPORTED_DESTINATION_SYNC_MODES_APPEND,
ModelInstance: modelInstRecName,
Indices: indices,
Data: outputs[idx],
})
if err != nil {
return nil, status.Errorf(codes.Internal, "[connector-backend] Error %s at %dth model instance %s: %v", "WriteDestinationConnector", idx, modelInstRecName, err.Error())
Expand All @@ -430,7 +440,7 @@ func (s *service) TriggerPipeline(req *pipelinePB.TriggerPipelineRequest, dbPipe

}

func (s *service) TriggerPipelineBinaryFileUpload(fileBuf bytes.Buffer, fileLengths []uint64, dbPipeline *datamodel.Pipeline) (*pipelinePB.TriggerPipelineBinaryFileUploadResponse, error) {
func (s *service) TriggerPipelineBinaryFileUpload(fileBuf bytes.Buffer, fileNames []string, fileLengths []uint64, dbPipeline *datamodel.Pipeline) (*pipelinePB.TriggerPipelineBinaryFileUploadResponse, error) {

if dbPipeline.State != datamodel.PipelineState(pipelinePB.Pipeline_STATE_ACTIVE) {
return nil, status.Error(codes.FailedPrecondition, fmt.Sprintf("The pipeline %s is not active", dbPipeline.ID))
Expand Down Expand Up @@ -503,8 +513,8 @@ func (s *service) TriggerPipelineBinaryFileUpload(fileBuf bytes.Buffer, fileLeng
}
}

// Check if this is a direct trigger (i.e., HTTP, gRPC source and destination connectors)
switch {
// Check if this is a sync trigger (i.e., HTTP, gRPC source and destination connectors)
case dbPipeline.Mode == datamodel.PipelineMode(pipelinePB.Pipeline_MODE_SYNC):
return &pipelinePB.TriggerPipelineBinaryFileUploadResponse{
Output: outputs,
Expand All @@ -524,6 +534,8 @@ func (s *service) TriggerPipelineBinaryFileUpload(fileBuf bytes.Buffer, fileLeng
Task: modelInstResp.Instance.GetTask(),
SyncMode: connectorPB.SupportedSyncModes_SUPPORTED_SYNC_MODES_FULL_REFRESH,
DestinationSyncMode: connectorPB.SupportedDestinationSyncModes_SUPPORTED_DESTINATION_SYNC_MODES_APPEND,
ModelInstance: modelInstRecName,
Indices: fileNames,
Data: outputs[idx],
})
if err != nil {
Expand Down

0 comments on commit 9233429

Please sign in to comment.