From 59f0fb8f8102804ff432c565979e1cf337631bb8 Mon Sep 17 00:00:00 2001 From: Ping-Lin Chang Date: Thu, 25 Aug 2022 00:28:03 +0100 Subject: [PATCH] fix: fix async trigger block issue close #67 --- pkg/service/service.go | 131 ++++++++++++++++++++++------------------- 1 file changed, 72 insertions(+), 59 deletions(-) diff --git a/pkg/service/service.go b/pkg/service/service.go index 05d9fbb0b..59b915740 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "strings" + "sync" "time" "github.com/go-redis/redis/v9" @@ -373,83 +374,95 @@ func (s *service) TriggerPipeline(req *pipelinePB.TriggerPipelineRequest, dbPipe dataMappingIndices = append(dataMappingIndices, ulid.Make().String()) } + wg := sync.WaitGroup{} + wg.Add(1) + var modelInstOutputs []*pipelinePB.ModelInstanceOutput - for idx, modelInstance := range dbPipeline.Recipe.ModelInstances { + go func() { + for idx, modelInstance := range dbPipeline.Recipe.ModelInstances { - // TODO: async call model-backend - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - resp, err := s.modelServiceClient.TriggerModelInstance(ctx, &modelPB.TriggerModelInstanceRequest{ - Name: modelInstance, - Inputs: inputs, - }) - if err != nil { - return nil, status.Errorf(codes.Internal, "[model-backend] Error %s at %dth model instance %s: %v", "TriggerModel", idx, modelInstance, err.Error()) - } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + resp, err := s.modelServiceClient.TriggerModelInstance(ctx, &modelPB.TriggerModelInstanceRequest{ + Name: modelInstance, + Inputs: inputs, + }) + if err != nil { + logger.Error(fmt.Sprintf("[model-backend] Error %s at %dth model instance %s: %v", "TriggerModel", idx, modelInstance, err.Error())) + } - batchOutputs := cvtModelBatchOutputToPipelineBatchOutput(resp.BatchOutputs) - for idx, batchOutput := range batchOutputs { - batchOutput.Index = dataMappingIndices[idx] - } + batchOutputs := cvtModelBatchOutputToPipelineBatchOutput(resp.BatchOutputs) + for idx, batchOutput := range batchOutputs { + batchOutput.Index = dataMappingIndices[idx] + } - modelInstOutputs = append(modelInstOutputs, &pipelinePB.ModelInstanceOutput{ - ModelInstance: modelInstance, - Task: resp.Task, - BatchOutputs: batchOutputs, - }) + modelInstOutputs = append(modelInstOutputs, &pipelinePB.ModelInstanceOutput{ + ModelInstance: modelInstance, + Task: resp.Task, + BatchOutputs: batchOutputs, + }) - // Increment trigger image numbers - uid, err := resource.GetPermalinkUID(dbPipeline.Owner) - if err != nil { - return nil, err - } - if strings.HasPrefix(dbPipeline.Owner, "users/") { - s.redisClient.IncrBy(context.Background(), fmt.Sprintf("user:%s:trigger.image.num", uid), int64(len(inputs))) - } else if strings.HasPrefix(dbPipeline.Owner, "orgs/") { - s.redisClient.IncrBy(context.Background(), fmt.Sprintf("org:%s:trigger.image.num", uid), int64(len(inputs))) + // Increment trigger image numbers + uid, err := resource.GetPermalinkUID(dbPipeline.Owner) + if err != nil { + logger.Error(err.Error()) + } + if strings.HasPrefix(dbPipeline.Owner, "users/") { + s.redisClient.IncrBy(context.Background(), fmt.Sprintf("user:%s:trigger.image.num", uid), int64(len(inputs))) + } else if strings.HasPrefix(dbPipeline.Owner, "orgs/") { + s.redisClient.IncrBy(context.Background(), fmt.Sprintf("org:%s:trigger.image.num", uid), int64(len(inputs))) + } } - } + wg.Done() + }() switch { // If this is a SYNC trigger (i.e., HTTP, gRPC source and destination connectors), return right away case dbPipeline.Mode == datamodel.PipelineMode(pipelinePB.Pipeline_MODE_SYNC): + wg.Wait() return &pipelinePB.TriggerPipelineResponse{ DataMappingIndices: dataMappingIndices, ModelInstanceOutputs: modelInstOutputs, }, nil // If this is a async trigger, write to the destination connector case dbPipeline.Mode == datamodel.PipelineMode(pipelinePB.Pipeline_MODE_ASYNC): - for idx, modelInstRecName := range dbPipeline.Recipe.ModelInstances { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - _, err = s.connectorServiceClient.WriteDestinationConnector(ctx, &connectorPB.WriteDestinationConnectorRequest{ - Name: dbPipeline.Recipe.Destination, - SyncMode: connectorPB.SupportedSyncModes_SUPPORTED_SYNC_MODES_FULL_REFRESH, - DestinationSyncMode: connectorPB.SupportedDestinationSyncModes_SUPPORTED_DESTINATION_SYNC_MODES_APPEND, - Pipeline: fmt.Sprintf("pipelines/%s", dbPipeline.ID), - DataMappingIndices: dataMappingIndices, - ModelInstanceOutputs: modelInstOutputs, - Recipe: func() *pipelinePB.Recipe { - if dbPipeline.Recipe != nil { - b, err := json.Marshal(dbPipeline.Recipe) - if err != nil { - logger.Error(err.Error()) + go func() { + wg.Wait() + for idx, modelInstRecName := range dbPipeline.Recipe.ModelInstances { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _, err = s.connectorServiceClient.WriteDestinationConnector(ctx, &connectorPB.WriteDestinationConnectorRequest{ + Name: dbPipeline.Recipe.Destination, + SyncMode: connectorPB.SupportedSyncModes_SUPPORTED_SYNC_MODES_FULL_REFRESH, + DestinationSyncMode: connectorPB.SupportedDestinationSyncModes_SUPPORTED_DESTINATION_SYNC_MODES_APPEND, + Pipeline: fmt.Sprintf("pipelines/%s", dbPipeline.ID), + DataMappingIndices: dataMappingIndices, + ModelInstanceOutputs: modelInstOutputs, + Recipe: func() *pipelinePB.Recipe { + if dbPipeline.Recipe != nil { + b, err := json.Marshal(dbPipeline.Recipe) + if err != nil { + logger.Error(err.Error()) + } + pbRecipe := pipelinePB.Recipe{} + err = json.Unmarshal(b, &pbRecipe) + if err != nil { + logger.Error(err.Error()) + } + return &pbRecipe } - pbRecipe := pipelinePB.Recipe{} - err = json.Unmarshal(b, &pbRecipe) - if err != nil { - logger.Error(err.Error()) - } - return &pbRecipe - } - return nil - }(), - }) - if err != nil { - return nil, status.Errorf(codes.Internal, "[connector-backend] Error %s at %dth model instance %s: %v", "WriteDestinationConnector", idx, modelInstRecName, err.Error()) + return nil + }(), + }) + if err != nil { + logger.Error(fmt.Sprintf("[connector-backend] Error %s at %dth model instance %s: %v", "WriteDestinationConnector", idx, modelInstRecName, err.Error())) + } } - } - return &pipelinePB.TriggerPipelineResponse{}, nil + }() + return &pipelinePB.TriggerPipelineResponse{ + DataMappingIndices: dataMappingIndices, + ModelInstanceOutputs: nil, + }, nil } return nil, status.Errorf(codes.Internal, "something went very wrong - unable to trigger the pipeline")