fix: fix multi-region connection problem for Instill Model connector (#439)

Because

- The Instill Model connector can't work properly in internal mode: pods in
different regional clusters can't connect to each other, so we decided to use
the external IP and bypass the internal user authorization.

This commit

- Fixes the multi-region connection problem by forwarding the caller's
authorization header to the connector, so requests can go through the Instill
Model backend's public endpoint.
donch1989 authored Mar 29, 2024
1 parent 828bfb5 commit a02add6
Showing 6 changed files with 66 additions and 13 deletions.
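
Mechanically, the fix has two halves (see the pkg/service/service.go and pkg/worker/workflow.go diffs below): before the Temporal workflow is dispatched, the service serializes the caller's authorization header into the new WorkflowMeta struct and stores it in Redis under async_pipeline_metadata:<pipeline-trigger-id>, with the maximum workflow timeout as TTL; inside ConnectorActivity, the worker reads that key back and injects the value into the connector configuration as header_authorization. The sketch below is a minimal standalone approximation of that round trip, not the repository's code: the go-redis/v9 client, the local Redis address, and the demo trigger ID and token are assumptions for illustration.

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"github.com/redis/go-redis/v9"
	"google.golang.org/protobuf/types/known/structpb"
)

// WorkflowMeta mirrors the struct added in pkg/worker/workflow.go: request-scoped
// data that must survive the hop from the API service into the Temporal worker.
type WorkflowMeta struct {
	HeaderAuthorization string `json:"header_authorization"`
}

// storeWorkflowMeta plays the producer role (triggerPipeline / triggerAsyncPipeline):
// serialize the metadata and park it in Redis for at most the workflow's max timeout.
func storeWorkflowMeta(ctx context.Context, rdb *redis.Client, triggerID, authHeader string, ttl time.Duration) (string, error) {
	b, err := json.Marshal(WorkflowMeta{HeaderAuthorization: authHeader})
	if err != nil {
		return "", err
	}
	key := fmt.Sprintf("async_pipeline_metadata:%s", triggerID)
	if err := rdb.Set(ctx, key, b, ttl).Err(); err != nil {
		return "", err
	}
	return key, nil
}

// injectAuthorization plays the consumer role (ConnectorActivity): read the key
// back and copy the original authorization header into the connector configuration.
func injectAuthorization(ctx context.Context, rdb *redis.Client, metadataRedisKey string, cfg *structpb.Struct) error {
	b, err := rdb.Get(ctx, metadataRedisKey).Bytes()
	if err != nil {
		return err
	}
	var meta WorkflowMeta
	if err := json.Unmarshal(b, &meta); err != nil {
		return err
	}
	cfg.Fields["header_authorization"] = structpb.NewStringValue(meta.HeaderAuthorization)
	return nil
}

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"}) // assumed local Redis

	// Producer side: stash the caller's header before starting the workflow.
	key, err := storeWorkflowMeta(ctx, rdb, "00000000-demo-trigger", "Bearer <token>", 10*time.Minute)
	if err != nil {
		panic(err)
	}
	defer rdb.Del(ctx, key)

	// Consumer side: the worker activity rebuilds the connector config from the key.
	cfg := &structpb.Struct{Fields: map[string]*structpb.Value{}}
	if err := injectAuthorization(ctx, rdb, key, cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.Fields["header_authorization"].GetStringValue())
}

One apparent benefit of passing only the Redis key through the workflow request is that the credential stays out of Temporal's persisted workflow history and expires on its own once the workflow's time budget elapses.
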
5 changes: 4 additions & 1 deletion Makefile
@@ -17,12 +17,15 @@ dev: ## Run dev container
@docker run -d --rm \
-e DOCKER_HOST=tcp://${SOCAT_HOST}:${SOCAT_PORT} \
-v $(PWD):/${SERVICE_NAME} \
-v $(PWD)/../go.work:/go.work \
-v $(PWD)/../go.work.sum:/go.work.sum \
-v $(PWD)/../component:/component \
-v vdp:/vdp \
-v airbyte:/airbyte \
-p ${SERVICE_PORT}:${SERVICE_PORT} \
--network instill-network \
--name ${SERVICE_NAME} \
instill/${SERVICE_NAME}:dev >/dev/null 2>&1
instill/${SERVICE_NAME}:dev

.PHONY: logs
logs: ## Tail container logs with -n 10
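The three new volume mounts assume the service is developed inside a Go workspace next to a local checkout of the component repository, so the dev container can resolve github.com/instill-ai/component from the sibling directory rather than only from the version pinned in go.mod. The workspace file itself is not part of this commit; a hypothetical go.work matching the mounted paths (the service at /pipeline-backend, assuming SERVICE_NAME is pipeline-backend, and the component checkout at /component) might look like the following, with the Go version as a placeholder:

go 1.21

use (
	./pipeline-backend
	./component
)
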
2 changes: 1 addition & 1 deletion go.mod
@@ -16,7 +16,7 @@ require (
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1
github.com/iancoleman/strcase v0.2.0
github.com/influxdata/influxdb-client-go/v2 v2.12.3
github.com/instill-ai/component v0.14.0-beta
github.com/instill-ai/component v0.14.0-beta.0.20240329171945-591a6a20233f
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20240308151517-4b0523c184d1
github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a
github.com/instill-ai/x v0.4.0-alpha
4 changes: 2 additions & 2 deletions go.sum
@@ -1191,8 +1191,8 @@ github.com/influxdata/influxdb-client-go/v2 v2.12.3 h1:28nRlNMRIV4QbtIUvxhWqaxn0
github.com/influxdata/influxdb-client-go/v2 v2.12.3/go.mod h1:IrrLUbCjjfkmRuaCiGQg4m2GbkaeJDcuWoxiWdQEbA0=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
github.com/instill-ai/component v0.14.0-beta h1:zk3NHDOtyZoIE6/Y3hZX1RaQe/fcxTFYciT9NGbFHA4=
github.com/instill-ai/component v0.14.0-beta/go.mod h1:sP9gwuSTsEjHGKgur3fDyA33MSpyBmE90aS04HfcZt8=
github.com/instill-ai/component v0.14.0-beta.0.20240329171945-591a6a20233f h1:Hrf4yF/tG0XXOU92xtoNqRxLB8MIl/IpIDEey7gZMGw=
github.com/instill-ai/component v0.14.0-beta.0.20240329171945-591a6a20233f/go.mod h1:sP9gwuSTsEjHGKgur3fDyA33MSpyBmE90aS04HfcZt8=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20240308151517-4b0523c184d1 h1:8bhIcJZcUMKvZas2L0uyaVt/V+Tzw0OSR8GtdcFflMo=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20240308151517-4b0523c184d1/go.mod h1:jhEL0SauySMoPLVvx105DWyThju9sYTbsXIySVCArmM=
github.com/instill-ai/usage-client v0.2.4-alpha.0.20240123081026-6c78d9a5197a h1:gmy8BcCFDZQan40c/D3f62DwTYtlCwi0VrSax+pKffw=
2 changes: 2 additions & 0 deletions pkg/service/convert.go
@@ -318,6 +318,7 @@ func (s *service) includeConnectorComponentDetail(ctx context.Context, comp *pip
str := structpb.Struct{}
_ = str.UnmarshalJSON(conn.Configuration)
// TODO: optimize this
str.Fields["header_authorization"] = structpb.NewStringValue(resource.GetRequestSingleHeader(ctx, "authorization"))
str.Fields["instill_user_uid"] = structpb.NewStringValue(resource.GetRequestSingleHeader(ctx, constant.HeaderUserUIDKey))
str.Fields["instill_model_backend"] = structpb.NewStringValue(fmt.Sprintf("%s:%d", config.Config.ModelBackend.Host, config.Config.ModelBackend.PublicPort))
str.Fields["instill_mgmt_backend"] = structpb.NewStringValue(fmt.Sprintf("%s:%d", config.Config.MgmtBackend.Host, config.Config.MgmtBackend.PublicPort))
@@ -1199,6 +1200,7 @@ func (s *service) convertDatamodelToProto(
str := proto.Clone(pbConnector.Configuration).(*structpb.Struct)
// TODO: optimize this
if str.Fields != nil {
str.Fields["header_authorization"] = structpb.NewStringValue(resource.GetRequestSingleHeader(ctx, "authorization"))
str.Fields["instill_user_uid"] = structpb.NewStringValue(uuid.FromStringOrNil(strings.Split(dbConnector.Owner, "/")[1]).String())
str.Fields["instill_model_backend"] = structpb.NewStringValue(fmt.Sprintf("%s:%d", config.Config.ModelBackend.Host, config.Config.ModelBackend.PublicPort))
str.Fields["instill_mgmt_backend"] = structpb.NewStringValue(fmt.Sprintf("%s:%d", config.Config.MgmtBackend.Host, config.Config.MgmtBackend.PublicPort))
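Both hunks above read the header through resource.GetRequestSingleHeader(ctx, "authorization"), whose implementation is not part of this diff. Assuming the backend receives gRPC requests (it depends on grpc-gateway), a functionally similar lookup over the incoming request metadata could look like the sketch below; the helper name and behavior are assumptions for illustration, not the repository's actual implementation.

package main

import (
	"context"
	"fmt"

	"google.golang.org/grpc/metadata"
)

// getRequestSingleHeader is a hypothetical stand-in for the helper used in the
// diff: it returns the first value of a header carried in the gRPC incoming
// metadata, or an empty string if the header is absent.
func getRequestSingleHeader(ctx context.Context, key string) string {
	md, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		return ""
	}
	if vals := md.Get(key); len(vals) > 0 {
		return vals[0]
	}
	return ""
}

func main() {
	// Simulate an incoming request context carrying an authorization header.
	ctx := metadata.NewIncomingContext(context.Background(),
		metadata.Pairs("authorization", "Bearer <token>"))
	fmt.Println(getRequestSingleHeader(ctx, "authorization")) // prints: Bearer <token>
}
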
31 changes: 31 additions & 0 deletions pkg/service/service.go
@@ -1333,6 +1333,21 @@ func (s *service) triggerPipeline(
inputBlobRedisKeys = append(inputBlobRedisKeys, inputBlobRedisKey)
defer s.redisClient.Del(ctx, inputBlobRedisKey)
}

m := worker.WorkflowMeta{
HeaderAuthorization: resource.GetRequestSingleHeader(ctx, "authorization"),
}
b, _ := json.Marshal(m)
metadataRedisKey := fmt.Sprintf("async_pipeline_metadata:%s", pipelineTriggerID)

s.redisClient.Set(
ctx,
metadataRedisKey,
b,
time.Duration(config.Config.Server.Workflow.MaxWorkflowTimeout)*time.Second,
)
defer s.redisClient.Del(ctx, metadataRedisKey)

memo := map[string]any{}
memo["number_of_data"] = len(inputBlobRedisKeys)

@@ -1363,6 +1378,7 @@
UserUID: userUID,
ReturnTraces: returnTraces,
Mode: mgmtPB.Mode_MODE_SYNC,
MetadataRedisKey: metadataRedisKey,
})
if err != nil {
logger.Error(fmt.Sprintf("unable to execute workflow: %s", err.Error()))
@@ -1433,6 +1449,20 @@ func (s *service) triggerAsyncPipeline(
)
inputBlobRedisKeys = append(inputBlobRedisKeys, inputBlobRedisKey)
}

m := worker.WorkflowMeta{
HeaderAuthorization: resource.GetRequestSingleHeader(ctx, "authorization"),
}
b, _ := json.Marshal(m)
metadataRedisKey := fmt.Sprintf("async_pipeline_metadata:%s", pipelineTriggerID)

s.redisClient.Set(
ctx,
metadataRedisKey,
b,
time.Duration(config.Config.Server.Workflow.MaxWorkflowTimeout)*time.Second,
)

memo := map[string]any{}
memo["number_of_data"] = len(inputBlobRedisKeys)

@@ -1462,6 +1492,7 @@
UserUID: userUID,
ReturnTraces: returnTraces,
Mode: mgmtPB.Mode_MODE_ASYNC,
MetadataRedisKey: metadataRedisKey,
})
if err != nil {
logger.Error(fmt.Sprintf("unable to execute workflow: %s", err.Error()))
35 changes: 26 additions & 9 deletions pkg/worker/workflow.go
@@ -43,6 +43,7 @@ type TriggerPipelineWorkflowRequest struct {
UserUID uuid.UUID
ReturnTraces bool
Mode mgmtPB.Mode
MetadataRedisKey string
}

type TriggerPipelineWorkflowResponse struct {
@@ -58,6 +59,7 @@ type ExecuteDAGActivityRequest struct {
MemoryBlobRedisKeys []string
OwnerPermalink string
UserUID uuid.UUID
MetadataRedisKey string
}
type ExecuteDAGActivityResponse struct {
MemoryBlobRedisKeys []string
@@ -102,6 +104,7 @@ type ExecuteConnectorActivityRequest struct {
PipelineMetadata PipelineMetadataStruct
Task string
UserUID uuid.UUID
MetadataRedisKey string
}

// ExecuteConnectorActivityRequest represents the parameters for TriggerActivity
@@ -112,6 +115,7 @@ type ExecuteOperatorActivityRequest struct {
PipelineMetadata PipelineMetadataStruct
Task string
UserUID uuid.UUID
MetadataRedisKey string
}

type ExecuteActivityResponse struct {
@@ -122,6 +126,10 @@ type PipelineMetadataStruct struct {
UserUID string
}

type WorkflowMeta struct {
HeaderAuthorization string `json:"header_authorization"`
}

var tracer = otel.Tracer("pipeline-backend.temporal.tracer")

func (w *worker) GetMemoryBlob(ctx context.Context, redisKeys []string) ([]ItemMemory, error) {
@@ -306,6 +314,7 @@ func (w *worker) TriggerPipelineWorkflow(ctx workflow.Context, param *TriggerPip
PipelineRecipe: param.PipelineRecipe,
OwnerPermalink: param.OwnerPermalink,
UserUID: param.UserUID,
MetadataRedisKey: param.MetadataRedisKey,
}).Get(ctx, &dagResult); err != nil {
w.writeErrorDataPoint(sCtx, err, span, startTime, &dataPoint)
return nil, err
@@ -657,8 +666,9 @@ func (w *worker) DAGActivity(ctx context.Context, param *ExecuteDAGActivityReque
PipelineMetadata: PipelineMetadataStruct{
UserUID: param.UserUID.String(),
},
Task: task,
UserUID: param.UserUID,
Task: task,
UserUID: param.UserUID,
MetadataRedisKey: param.MetadataRedisKey,
})
if err != nil {
return err
@@ -708,8 +718,9 @@ func (w *worker) DAGActivity(ctx context.Context, param *ExecuteDAGActivityReque
PipelineMetadata: PipelineMetadataStruct{
UserUID: param.UserUID.String(),
},
Task: task,
UserUID: param.UserUID,
Task: task,
UserUID: param.UserUID,
MetadataRedisKey: param.MetadataRedisKey,
})
if err != nil {
return err
@@ -856,16 +867,22 @@ func (w *worker) ConnectorActivity(ctx context.Context, param *ExecuteConnectorA
logger.Fatal(err.Error())
}
// TODO: optimize this
m, err := w.redisClient.Get(ctx, param.MetadataRedisKey).Bytes()
if err != nil {
return nil
}
workflowMeta := WorkflowMeta{}
err = json.Unmarshal(m, &workflowMeta)
if err != nil {
return nil
}

str.Fields["header_authorization"] = structpb.NewStringValue(workflowMeta.HeaderAuthorization)
str.Fields["instill_user_uid"] = structpb.NewStringValue(param.PipelineMetadata.UserUID)
str.Fields["instill_model_backend"] = structpb.NewStringValue(fmt.Sprintf("%s:%d", config.Config.ModelBackend.Host, config.Config.ModelBackend.PublicPort))
str.Fields["instill_mgmt_backend"] = structpb.NewStringValue(fmt.Sprintf("%s:%d", config.Config.MgmtBackend.Host, config.Config.MgmtBackend.PublicPort))
return &str
}
str := structpb.Struct{Fields: make(map[string]*structpb.Value)}
// TODO: optimize this
str.Fields["instill_user_uid"] = structpb.NewStringValue(param.PipelineMetadata.UserUID)
str.Fields["instill_model_backend"] = structpb.NewStringValue(fmt.Sprintf("%s:%d", config.Config.ModelBackend.Host, config.Config.ModelBackend.PublicPort))
str.Fields["instill_mgmt_backend"] = structpb.NewStringValue(fmt.Sprintf("%s:%d", config.Config.MgmtBackend.Host, config.Config.MgmtBackend.PublicPort))
return nil
}()

Expand Down
