Skip to content

Commit

Permalink
fix: complete trigger async binary file route
Browse files Browse the repository at this point in the history
  • Loading branch information
pinglin committed Jul 17, 2022
1 parent 3a86f3a commit 5003e5c
Showing 1 changed file with 24 additions and 6 deletions.
30 changes: 24 additions & 6 deletions pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,13 +443,13 @@ func (s *service) TriggerPipelineBinaryFileUpload(fileBuf bytes.Buffer, fileLeng

dbPipeline.Owner = ownerPermalink

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

var outputs []*structpb.Struct
for idx, modelInst := range dbPipeline.Recipe.ModelInstances {

// TODO: async call model-backend
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

stream, err := s.modelServiceClient.TriggerModelInstanceBinaryFileUpload(ctx)
defer func() {
_ = stream.CloseSend()
Expand Down Expand Up @@ -512,8 +512,26 @@ func (s *service) TriggerPipelineBinaryFileUpload(fileBuf bytes.Buffer, fileLeng
Output: outputs,
}, nil
case dbPipeline.Mode == datamodel.PipelineMode(pipelinePB.Pipeline_MODE_ASYNC):
return nil, nil
}
for idx, modelInstRecName := range dbPipeline.Recipe.ModelInstances {

modelInstResp, err := s.modelServiceClient.GetModelInstance(ctx, &modelPB.GetModelInstanceRequest{
Name: modelInstRecName,
})
if err != nil {
return nil, status.Errorf(codes.Internal, "[model-backend] Error %s at %dth model instance %s: %v", "GetModelInstance", idx, modelInstRecName, err.Error())
}

return nil, nil
_, err = s.connectorServiceClient.WriteDestinationConnector(ctx, &connectorPB.WriteDestinationConnectorRequest{
Name: dbPipeline.Recipe.Destination,
Task: modelInstResp.Instance.GetTask(),
Data: outputs[idx],
})
if err != nil {
return nil, status.Errorf(codes.Internal, "[connector-backend] Error %s at %dth model instance %s: %v", "WriteDestinationConnector", idx, modelInstRecName, err.Error())
}
}
return &pipelinePB.TriggerPipelineBinaryFileUploadResponse{}, nil
default:
return &pipelinePB.TriggerPipelineBinaryFileUploadResponse{}, nil
}
}

0 comments on commit 5003e5c

Please sign in to comment.