Skip to content

Commit

Permalink
fix: fix async trigger block issue
Browse files Browse the repository at this point in the history
close #67
  • Loading branch information
pinglin committed Aug 24, 2022
1 parent 1f4bfc8 commit 59f0fb8
Showing 1 changed file with 72 additions and 59 deletions.
131 changes: 72 additions & 59 deletions pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io"
"strings"
"sync"
"time"

"github.com/go-redis/redis/v9"
Expand Down Expand Up @@ -373,83 +374,95 @@ func (s *service) TriggerPipeline(req *pipelinePB.TriggerPipelineRequest, dbPipe
dataMappingIndices = append(dataMappingIndices, ulid.Make().String())
}

wg := sync.WaitGroup{}
wg.Add(1)

var modelInstOutputs []*pipelinePB.ModelInstanceOutput
for idx, modelInstance := range dbPipeline.Recipe.ModelInstances {
go func() {
for idx, modelInstance := range dbPipeline.Recipe.ModelInstances {

// TODO: async call model-backend
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
resp, err := s.modelServiceClient.TriggerModelInstance(ctx, &modelPB.TriggerModelInstanceRequest{
Name: modelInstance,
Inputs: inputs,
})
if err != nil {
return nil, status.Errorf(codes.Internal, "[model-backend] Error %s at %dth model instance %s: %v", "TriggerModel", idx, modelInstance, err.Error())
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
resp, err := s.modelServiceClient.TriggerModelInstance(ctx, &modelPB.TriggerModelInstanceRequest{
Name: modelInstance,
Inputs: inputs,
})
if err != nil {
logger.Error(fmt.Sprintf("[model-backend] Error %s at %dth model instance %s: %v", "TriggerModel", idx, modelInstance, err.Error()))
}

batchOutputs := cvtModelBatchOutputToPipelineBatchOutput(resp.BatchOutputs)
for idx, batchOutput := range batchOutputs {
batchOutput.Index = dataMappingIndices[idx]
}
batchOutputs := cvtModelBatchOutputToPipelineBatchOutput(resp.BatchOutputs)
for idx, batchOutput := range batchOutputs {
batchOutput.Index = dataMappingIndices[idx]
}

modelInstOutputs = append(modelInstOutputs, &pipelinePB.ModelInstanceOutput{
ModelInstance: modelInstance,
Task: resp.Task,
BatchOutputs: batchOutputs,
})
modelInstOutputs = append(modelInstOutputs, &pipelinePB.ModelInstanceOutput{
ModelInstance: modelInstance,
Task: resp.Task,
BatchOutputs: batchOutputs,
})

// Increment trigger image numbers
uid, err := resource.GetPermalinkUID(dbPipeline.Owner)
if err != nil {
return nil, err
}
if strings.HasPrefix(dbPipeline.Owner, "users/") {
s.redisClient.IncrBy(context.Background(), fmt.Sprintf("user:%s:trigger.image.num", uid), int64(len(inputs)))
} else if strings.HasPrefix(dbPipeline.Owner, "orgs/") {
s.redisClient.IncrBy(context.Background(), fmt.Sprintf("org:%s:trigger.image.num", uid), int64(len(inputs)))
// Increment trigger image numbers
uid, err := resource.GetPermalinkUID(dbPipeline.Owner)
if err != nil {
logger.Error(err.Error())
}
if strings.HasPrefix(dbPipeline.Owner, "users/") {
s.redisClient.IncrBy(context.Background(), fmt.Sprintf("user:%s:trigger.image.num", uid), int64(len(inputs)))
} else if strings.HasPrefix(dbPipeline.Owner, "orgs/") {
s.redisClient.IncrBy(context.Background(), fmt.Sprintf("org:%s:trigger.image.num", uid), int64(len(inputs)))
}
}
}
wg.Done()
}()

switch {
// If this is a SYNC trigger (i.e., HTTP, gRPC source and destination connectors), return right away
case dbPipeline.Mode == datamodel.PipelineMode(pipelinePB.Pipeline_MODE_SYNC):
wg.Wait()
return &pipelinePB.TriggerPipelineResponse{
DataMappingIndices: dataMappingIndices,
ModelInstanceOutputs: modelInstOutputs,
}, nil
// If this is a async trigger, write to the destination connector
case dbPipeline.Mode == datamodel.PipelineMode(pipelinePB.Pipeline_MODE_ASYNC):
for idx, modelInstRecName := range dbPipeline.Recipe.ModelInstances {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err = s.connectorServiceClient.WriteDestinationConnector(ctx, &connectorPB.WriteDestinationConnectorRequest{
Name: dbPipeline.Recipe.Destination,
SyncMode: connectorPB.SupportedSyncModes_SUPPORTED_SYNC_MODES_FULL_REFRESH,
DestinationSyncMode: connectorPB.SupportedDestinationSyncModes_SUPPORTED_DESTINATION_SYNC_MODES_APPEND,
Pipeline: fmt.Sprintf("pipelines/%s", dbPipeline.ID),
DataMappingIndices: dataMappingIndices,
ModelInstanceOutputs: modelInstOutputs,
Recipe: func() *pipelinePB.Recipe {
if dbPipeline.Recipe != nil {
b, err := json.Marshal(dbPipeline.Recipe)
if err != nil {
logger.Error(err.Error())
go func() {
wg.Wait()
for idx, modelInstRecName := range dbPipeline.Recipe.ModelInstances {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err = s.connectorServiceClient.WriteDestinationConnector(ctx, &connectorPB.WriteDestinationConnectorRequest{
Name: dbPipeline.Recipe.Destination,
SyncMode: connectorPB.SupportedSyncModes_SUPPORTED_SYNC_MODES_FULL_REFRESH,
DestinationSyncMode: connectorPB.SupportedDestinationSyncModes_SUPPORTED_DESTINATION_SYNC_MODES_APPEND,
Pipeline: fmt.Sprintf("pipelines/%s", dbPipeline.ID),
DataMappingIndices: dataMappingIndices,
ModelInstanceOutputs: modelInstOutputs,
Recipe: func() *pipelinePB.Recipe {
if dbPipeline.Recipe != nil {
b, err := json.Marshal(dbPipeline.Recipe)
if err != nil {
logger.Error(err.Error())
}
pbRecipe := pipelinePB.Recipe{}
err = json.Unmarshal(b, &pbRecipe)
if err != nil {
logger.Error(err.Error())
}
return &pbRecipe
}
pbRecipe := pipelinePB.Recipe{}
err = json.Unmarshal(b, &pbRecipe)
if err != nil {
logger.Error(err.Error())
}
return &pbRecipe
}
return nil
}(),
})
if err != nil {
return nil, status.Errorf(codes.Internal, "[connector-backend] Error %s at %dth model instance %s: %v", "WriteDestinationConnector", idx, modelInstRecName, err.Error())
return nil
}(),
})
if err != nil {
logger.Error(fmt.Sprintf("[connector-backend] Error %s at %dth model instance %s: %v", "WriteDestinationConnector", idx, modelInstRecName, err.Error()))
}
}
}
return &pipelinePB.TriggerPipelineResponse{}, nil
}()
return &pipelinePB.TriggerPipelineResponse{
DataMappingIndices: dataMappingIndices,
ModelInstanceOutputs: nil,
}, nil
}

return nil, status.Errorf(codes.Internal, "something went very wrong - unable to trigger the pipeline")
Expand Down

0 comments on commit 59f0fb8

Please sign in to comment.