From 6fdf28f378d036806328f1ace527fa4657df4f21 Mon Sep 17 00:00:00 2001 From: Prajith P Date: Sat, 5 Aug 2023 23:50:17 +0530 Subject: [PATCH 1/4] add scaler for temporal Signed-off-by: Prajith P Signed-off-by: Prajith P --- pkg/scalers/temporal_scaler.go | 172 ++++++++++++ pkg/scalers/temporal_scaler_test.go | 73 +++++ pkg/scaling/scalers_builder.go | 2 + tests/scalers/temporal/temporal_test.go | 338 ++++++++++++++++++++++++ 4 files changed, 585 insertions(+) create mode 100644 pkg/scalers/temporal_scaler.go create mode 100644 pkg/scalers/temporal_scaler_test.go create mode 100644 tests/scalers/temporal/temporal_test.go diff --git a/pkg/scalers/temporal_scaler.go b/pkg/scalers/temporal_scaler.go new file mode 100644 index 00000000000..4587fb83941 --- /dev/null +++ b/pkg/scalers/temporal_scaler.go @@ -0,0 +1,172 @@ +package scalers + +import ( + "context" + "errors" + "fmt" + "strconv" + "time" + + kedautil "github.com/kedacore/keda/v2/pkg/util" + tclfilter "go.temporal.io/api/filter/v1" + workflowservice "go.temporal.io/api/workflowservice/v1" + sdk "go.temporal.io/sdk/client" + "google.golang.org/grpc" + v2 "k8s.io/api/autoscaling/v2" + "k8s.io/metrics/pkg/apis/external_metrics" +) + +const ( + defaultTargetWorkflowLength = 5 + defaultActivationTargetWorkflowLength = 0 +) + +type temporalWorkflowScaler struct { + metricType v2.MetricTargetType + metadata *temporalWorkflowMetadata + tcl sdk.Client +} + +type temporalWorkflowMetadata struct { + activationTargetWorkflowLength int64 + endpoint string + namespace string + workflowName string + scalerIndex int + targetQueueSize int64 + metricName string +} + +// NewTemporalWorkflowScaler creates a new instance of temporalWorkflowScaler. 
+func NewTemporalWorkflowScaler(config *ScalerConfig) (Scaler, error) { + metricType, err := GetMetricTargetType(config) + if err != nil { + return nil, fmt.Errorf("failed to get scaler metric type: %w", err) + } + + meta, err := parseTemporalMetadata(config) + if err != nil { + return nil, fmt.Errorf("failed to parse Temporal metadata: %w", err) + } + + c, err := sdk.Dial(sdk.Options{ + HostPort: meta.endpoint, + ConnectionOptions: sdk.ConnectionOptions{ + DialOptions: []grpc.DialOption{ + grpc.WithTimeout(time.Duration(10) * time.Second), + }, + }, + }) + + if err != nil { + return nil, fmt.Errorf("failed to create Temporal client: %w", err) + } + + return &temporalWorkflowScaler{ + metricType: metricType, + metadata: meta, + tcl: c, + }, nil +} + +// Close closes the Temporal client connection. +func (s *temporalWorkflowScaler) Close(context.Context) error { + if s.tcl != nil { + s.tcl.Close() + } + return nil +} + +// GetMetricSpecForScaling returns the metric specification for scaling. +func (s *temporalWorkflowScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { + externalMetric := &v2.ExternalMetricSource{ + Metric: v2.MetricIdentifier{ + Name: s.metadata.metricName, + }, + Target: GetMetricTarget(s.metricType, s.metadata.targetQueueSize), + } + metricSpec := v2.MetricSpec{ + External: externalMetric, + Type: externalMetricType, + } + return []v2.MetricSpec{metricSpec} +} + +// GetMetricsAndActivity returns metrics and activity for the scaler. 
+func (s *temporalWorkflowScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) { + queueSize, err := s.getQueueSize(ctx) + if err != nil { + return nil, false, fmt.Errorf("failed to get Temporal queue size: %w", err) + } + + metric := GenerateMetricInMili(metricName, float64(queueSize)) + + return []external_metrics.ExternalMetricValue{metric}, queueSize > s.metadata.activationTargetWorkflowLength, nil +} + +// getQueueSize returns the queue size of open workflows. +func (s *temporalWorkflowScaler) getQueueSize(ctx context.Context) (int64, error) { + listOpenWorkflowExecutionsRequest := &workflowservice.ListOpenWorkflowExecutionsRequest{ + Namespace: s.metadata.namespace, + Filters: &workflowservice.ListOpenWorkflowExecutionsRequest_TypeFilter{ + TypeFilter: &tclfilter.WorkflowTypeFilter{ + Name: s.metadata.workflowName, + }, + }, + } + ws, err := s.tcl.ListOpenWorkflow(ctx, listOpenWorkflowExecutionsRequest) + if err != nil { + return 0, fmt.Errorf("failed to get workflows: %w", err) + } + + queueLength := int64(len(ws.Executions)) + return queueLength, nil +} + +// parseTemporalMetadata parses the Temporal metadata from the ScalerConfig. 
+func parseTemporalMetadata(config *ScalerConfig) (*temporalWorkflowMetadata, error) { + meta := &temporalWorkflowMetadata{} + meta.activationTargetWorkflowLength = defaultActivationTargetWorkflowLength + meta.targetQueueSize = defaultTargetWorkflowLength + + if config.TriggerMetadata["endpoint"] == "" { + return nil, errors.New("no Temporal gRPC endpoint provided") + } + meta.endpoint = config.TriggerMetadata["endpoint"] + + if config.TriggerMetadata["namespace"] == "" { + meta.namespace = "default" + } else { + meta.namespace = config.TriggerMetadata["namespace"] + } + + if config.TriggerMetadata["workflowName"] == "" { + return nil, errors.New("no workflow name provided") + } + meta.workflowName = config.TriggerMetadata["workflowName"] + + if size, ok := config.TriggerMetadata["targetQueueSize"]; ok { + queueSize, err := strconv.ParseInt(size, 10, 64) + if err != nil { + return nil, fmt.Errorf("invalid targetQueueSize - must be an integer") + } + meta.targetQueueSize = queueSize + } + + if size, ok := config.TriggerMetadata["activationTargetQueueSize"]; ok { + activationTargetQueueSize, err := strconv.ParseInt(size, 10, 64) + if err != nil { + return nil, fmt.Errorf("invalid activationTargetQueueSize - must be an integer") + } + meta.activationTargetWorkflowLength = activationTargetQueueSize + } + + meta.metricName = GenerateMetricNameWithIndex( + config.ScalerIndex, kedautil.NormalizeString( + fmt.Sprintf("temporal-%s-%s", meta.namespace, meta.workflowName), + ), + ) + meta.scalerIndex = config.ScalerIndex + + return meta, nil +} diff --git a/pkg/scalers/temporal_scaler_test.go b/pkg/scalers/temporal_scaler_test.go new file mode 100644 index 00000000000..adb74724424 --- /dev/null +++ b/pkg/scalers/temporal_scaler_test.go @@ -0,0 +1,73 @@ +package scalers + +import ( + "context" + "testing" +) + +var ( + temporalEndpoint = "localhost:7233" + temporalNamespace = "v2" + temporalWorkflowName = "SayHello" +) + +type parseTemporalMetadataTestData struct { + metadata 
map[string]string + isError bool +} + +type temporalMetricIdentifier struct { + metadataTestData *parseTemporalMetadataTestData + scalerIndex int + name string +} + +var testTemporalMetadata = []parseTemporalMetadataTestData{ + // nothing passed + {map[string]string{}, true}, + // Missing workflow, should fail + {map[string]string{"endpoint": temporalEndpoint, "namespace": temporalNamespace}, true}, + // Missing namespace, should succeed + {map[string]string{"endpoint": temporalEndpoint, "workflowName": temporalWorkflowName}, false}, + // Missing endpoint, should fail + {map[string]string{"workflowName": temporalWorkflowName, "namespace": temporalNamespace}, true}, + // All good. + {map[string]string{"endpoint": temporalEndpoint, "workflowName": temporalWorkflowName, "namespace": temporalNamespace}, false}, + // All good + activationLagThreshold + {map[string]string{"endpoint": temporalEndpoint, "workflowName": temporalWorkflowName, "namespace": temporalNamespace, "activationTargetQueueSize": "10"}, false}, +} + +var temporalMetricIdentifiers = []temporalMetricIdentifier{ + {&testTemporalMetadata[4], 0, "s0-temporal-v2-SayHello"}, + {&testTemporalMetadata[4], 1, "s1-temporal-v2-SayHello"}, +} + +func TestTemporalParseMetadata(t *testing.T) { + for _, testData := range testTemporalMetadata { + _, err := parseTemporalMetadata(&ScalerConfig{TriggerMetadata: testData.metadata}) + if err != nil && !testData.isError { + t.Error("Expected success but got error", err) + } else if testData.isError && err == nil { + t.Error("Expected error but got success") + } + } +} + +func TestTemporalGetMetricSpecForScaling(t *testing.T) { + for _, testData := range temporalMetricIdentifiers { + ctx := context.Background() + meta, err := parseTemporalMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ScalerIndex: testData.scalerIndex}) + if err != nil { + t.Fatal("Could not parse metadata:", err) + } + mockTemporalScaler := temporalWorkflowScaler{ + metadata: 
meta, + } + + metricSpec := mockTemporalScaler.GetMetricSpecForScaling(ctx) + metricName := metricSpec[0].External.Metric.Name + if metricName != testData.name { + t.Error("Wrong External metric source name:", metricName) + } + } +} diff --git a/pkg/scaling/scalers_builder.go b/pkg/scaling/scalers_builder.go index 7a4a53b24e5..d9760135fab 100644 --- a/pkg/scaling/scalers_builder.go +++ b/pkg/scaling/scalers_builder.go @@ -229,6 +229,8 @@ func buildScaler(ctx context.Context, client client.Client, triggerType string, return scalers.NewSolrScaler(config) case "stan": return scalers.NewStanScaler(config) + case "temporal": + return scalers.NewTemporalWorkflowScaler(config) default: return nil, fmt.Errorf("no scaler found for type: %s", triggerType) } diff --git a/tests/scalers/temporal/temporal_test.go b/tests/scalers/temporal/temporal_test.go new file mode 100644 index 00000000000..02033e0e22e --- /dev/null +++ b/tests/scalers/temporal/temporal_test.go @@ -0,0 +1,338 @@ +//go:build e2e +// +build e2e + +package temporal_test + +import ( + "fmt" + "testing" + + "github.com/joho/godotenv" + . 
"github.com/kedacore/keda/v2/tests/helper" + "github.com/stretchr/testify/assert" + "k8s.io/client-go/kubernetes" +) + +var _ = godotenv.Load("../../.env") + +const ( + testName = "temporal-test" +) + +var ( + testNamespace = fmt.Sprintf("%s-ns", testName) + PgDeploymentName = fmt.Sprintf("postgresql-%s", testName) + + TemporalDeploymentName = fmt.Sprintf("temporal-%s", testName) + + scaledObjectName = fmt.Sprintf("%s-so", testName) + deploymentName = fmt.Sprintf("%s", testName) + pgUsername = "test-user" + pgRootPassword = "some-test-password" +) + +type templateData struct { + DeploymentName string + TestNamespace string + PgDeploymentName string + TemporalDeploymentName string + ScaledObjectName string + PgRootUserName string + PgRootPassword string + ItemsToWrite int +} + +const ( + pgServiceTemplate = ` +apiVersion: v1 +kind: Service +metadata: + name: {{.PgDeploymentName}} + namespace: {{.TestNamespace}} +spec: + selector: + app: postgresql + ports: + - name: psql + protocol: TCP + port: 5432 + targetPort: 5432 +` + pgDeploymentTemplate = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{.PgDeploymentName}} + namespace: {{.TestNamespace}} + labels: + app: postgresql +spec: + replicas: 1 + strategy: + type: Recreate + selector: + matchLabels: + app: postgresql + template: + metadata: + labels: + app: postgresql + spec: + containers: + - name: psql + image: bitnami/postgresql:13 + env: + - name: POSTGRESQL_USERNAME + value: {{.PgRootUserName}} + - name: POSTGRESQL_PASSWORD + value: {{.PgRootPassword}} + ports: + - containerPort: 5432 + livenessProbe: + tcpSocket: + port: 5432 + initialDelaySeconds: 5 + timeoutSeconds: 5 + readinessProbe: + tcpSocket: + port: 5432 + initialDelaySeconds: 5 + timeoutSeconds: 5 +` + temporalServiceTemplate = ` +apiVersion: v1 +kind: Service +metadata: + name: {{.TemporalDeploymentName}} + namespace: {{.TestNamespace}} +spec: + type: ClusterIP + ports: + - port: 7233 + protocol: TCP + targetPort: 7233 + selector: + app: 
temporal +` + temporalDeploymentTemplate = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{.TemporalDeploymentName}} + namespace: {{.TestNamespace}} +spec: + replicas: 1 + selector: + matchLabels: + app: temporal + template: + metadata: + labels: + app: temporal + spec: + containers: + - env: + - name: AUTO_SETUP + value: "true" + - name: DB + value: postgresql + - name: DB_PORT + value: "5432" + - name: POSTGRES_USER + value: {{.PgRootUserName}} + - name: POSTGRES_PWD + value: {{.PgRootPassword}} + - name: POSTGRES_SEEDS + value: {{.PgDeploymentName}}.{{.TestNamespace}}.svc.cluster.local + image: temporalio/auto-setup:1.20.1 + imagePullPolicy: IfNotPresent + livenessProbe: + exec: + command: + - /bin/sh + - -ec + - test $(ps -ef | grep -v grep | grep temporal-server | wc -l) -eq 1 + failureThreshold: 3 + initialDelaySeconds: 5 + periodSeconds: 30 + successThreshold: 1 + timeoutSeconds: 1 + name: temporal + ports: + - containerPort: 7233 + protocol: TCP + readinessProbe: + exec: + command: + - /bin/sh + - -ec + - test $(ps -ef | grep -v grep | grep temporal-server | wc -l) -eq 1 + failureThreshold: 3 + initialDelaySeconds: 5 + periodSeconds: 30 + successThreshold: 1 + timeoutSeconds: 1 +` + scaledObjectTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} +spec: + scaleTargetRef: + name: {{.DeploymentName}} + pollingInterval: 5 + cooldownPeriod: 10 + minReplicaCount: 0 + maxReplicaCount: 1 + advanced: + horizontalPodAutoscalerConfig: + behavior: + scaleDown: + stabilizationWindowSeconds: 10 + triggers: + - type: temporal + metadata: + namespace: default + workflowName: SayHello + targetQueueSize: "2" + activationTargetQueueSize: "3" + endpoint: {{.TemporalDeploymentName}}.{{.TestNamespace}}.svc.cluster.local:7233 +` + deploymentTemplate = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{.DeploymentName}} + namespace: {{.TestNamespace}} + labels: + app: 
{{.DeploymentName}} +spec: + replicas: 0 + selector: + matchLabels: + app: {{.DeploymentName}} + template: + metadata: + labels: + app: {{.DeploymentName}} + spec: + containers: + - name: nginx + image: nginxinc/nginx-unprivileged + ports: + - containerPort: 80 +` + jobWorkerTemplate = ` +apiVersion: batch/v1 +kind: Job +metadata: + name: worker + namespace: {{.TestNamespace}} +spec: + template: + spec: + containers: + - name: worker + image: "prajithp/temporal-sample:1.0.0" + imagePullPolicy: Always + env: + - name: TEMPORAL_ADDR + value: {{.TemporalDeploymentName}}.{{.TestNamespace}}.svc.cluster.local:7233 + - name: TEMPORAL_NAMESPACE + value: default + - name: MODE + value: WORKER + restartPolicy: OnFailure + backoffLimit: 4 +` + + jobWorkeFlowTemplate = ` +apiVersion: batch/v1 +kind: Job +metadata: + name: workerflow + namespace: {{.TestNamespace}} +spec: + template: + spec: + containers: + - name: workerflow + image: "prajithp/temporal-sample:1.0.0" + imagePullPolicy: Always + env: + - name: TEMPORAL_ADDR + value: {{.TemporalDeploymentName}}.{{.TestNamespace}}.svc.cluster.local:7233 + restartPolicy: OnFailure + backoffLimit: 4 +` +) + +func getTemplateData() (templateData, []Template) { + return templateData{ + TestNamespace: testNamespace, + TemporalDeploymentName: TemporalDeploymentName, + PgDeploymentName: PgDeploymentName, + ScaledObjectName: scaledObjectName, + PgRootUserName: pgUsername, + PgRootPassword: pgRootPassword, + DeploymentName: deploymentName, + }, []Template{ + {Name: "scaledObjectTemplate", Config: scaledObjectTemplate}, + {Name: "deploymentTemplate", Config: deploymentTemplate}, + } +} + +func TestTemporalScaler(t *testing.T) { + kc := GetKubernetesClient(t) + data, templates := getTemplateData() + CreateNamespace(t, kc, testNamespace) + + KubectlApplyWithTemplate(t, data, "pgDeploymentTemplate", pgDeploymentTemplate) + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, PgDeploymentName, testNamespace, 1, 30, 2), "postgresql is not in 
a ready state") + KubectlApplyWithTemplate(t, data, "pgServiceTemplate", pgServiceTemplate) + + KubectlApplyWithTemplate(t, data, "temporalServiceTemplate", temporalServiceTemplate) + KubectlApplyWithTemplate(t, data, "temporalDeploymentTemplate", temporalDeploymentTemplate) + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, TemporalDeploymentName, testNamespace, 1, 30, 4), "temporal is not in a ready state") + KubectlApplyWithTemplate(t, data, "temporalServiceTemplate", temporalServiceTemplate) + + KubectlApplyMultipleWithTemplate(t, data, templates) + + testActivation(t, kc, data) + testScaleOut(t, kc, data) + testScaleIn(t, kc, data) + DeleteKubernetesResources(t, testNamespace, data, templates) +} + +func testActivation(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing activation ---") + + KubectlApplyWithTemplate(t, data, "jobWorkFlowActivation", jobWorkeFlowTemplate) + assert.True(t, WaitForJobCount(t, kc, testNamespace, 1, 60, 3), "job count in namespace should be 1") + + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 180) + KubectlDeleteWithTemplate(t, data, "jobWorkFlowActivation", jobWorkeFlowTemplate) +} + +func testScaleOut(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing scale out ---") + + KubectlApplyWithTemplate(t, data, "jobWorkFlow", jobWorkeFlowTemplate) + assert.True(t, WaitForJobCount(t, kc, testNamespace, 1, 60, 3), "job count in namespace should be 1") + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 3), + "replica count should be %d after 3 minutes", 1) +} + +func testScaleIn(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing scale in ---") + + KubectlApplyWithTemplate(t, data, "jobWorker", jobWorkerTemplate) + // workflow is already waiting for response from worker + assert.True(t, WaitForJobCount(t, kc, testNamespace, 2, 60, 3), "job count in 
namespace should be 2") + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, 5), + "replica count should be %d after 5 minutes", 0) +} From c00d519397a683cf42e3d14fb6f4049aa55e32a5 Mon Sep 17 00:00:00 2001 From: Prajith P Date: Sun, 6 Aug 2023 00:01:57 +0530 Subject: [PATCH 2/4] update CHANGELOG.md Signed-off-by: Prajith P Signed-off-by: Prajith P --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2f3c5f251fb..5248b81b023 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,7 +49,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio ### New -- TODO ([#XXX](https://github.com/kedacore/keda/issue/XXX)) +- **General**: Add Temporal Scaler `temporal` ([#4724](https://github.com/kedacore/keda/issues/4724)) ### Improvements From bca8a3e7c82c87857ad8ed97e97ae6c3f2b46736 Mon Sep 17 00:00:00 2001 From: Prajith P Date: Wed, 9 Aug 2023 15:29:12 +0530 Subject: [PATCH 3/4] use pagination to get number of open workflows Signed-off-by: Prajith P --- pkg/scalers/temporal_scaler.go | 39 ++++++++++++++++++++++++---------- 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/pkg/scalers/temporal_scaler.go b/pkg/scalers/temporal_scaler.go index 4587fb83941..b1a5ab3939e 100644 --- a/pkg/scalers/temporal_scaler.go +++ b/pkg/scalers/temporal_scaler.go @@ -106,20 +106,37 @@ func (s *temporalWorkflowScaler) GetMetricsAndActivity(ctx context.Context, metr // getQueueSize returns the queue size of open workflows. 
func (s *temporalWorkflowScaler) getQueueSize(ctx context.Context) (int64, error) { - listOpenWorkflowExecutionsRequest := &workflowservice.ListOpenWorkflowExecutionsRequest{ - Namespace: s.metadata.namespace, - Filters: &workflowservice.ListOpenWorkflowExecutionsRequest_TypeFilter{ - TypeFilter: &tclfilter.WorkflowTypeFilter{ - Name: s.metadata.workflowName, + + var executionIds = make([]string, 0) + var nextPageToken []byte + + for { + listOpenWorkflowExecutionsRequest := &workflowservice.ListOpenWorkflowExecutionsRequest{ + Namespace: s.metadata.namespace, + MaximumPageSize: 1000, + NextPageToken: nextPageToken, + Filters: &workflowservice.ListOpenWorkflowExecutionsRequest_TypeFilter{ + TypeFilter: &tclfilter.WorkflowTypeFilter{ + Name: s.metadata.workflowName, + }, }, - }, - } - ws, err := s.tcl.ListOpenWorkflow(ctx, listOpenWorkflowExecutionsRequest) - if err != nil { - return 0, fmt.Errorf("failed to get workflows: %w", err) + } + ws, err := s.tcl.ListOpenWorkflow(ctx, listOpenWorkflowExecutionsRequest) + if err != nil { + return 0, fmt.Errorf("failed to get workflows: %w", err) + } + + for _, execution := range ws.Executions { + executionId := execution.Execution.WorkflowId + "__" + execution.Execution.RunId + executionIds = append(executionIds, executionId) + } + + if nextPageToken = ws.NextPageToken; len(nextPageToken) == 0 { + break + } } - queueLength := int64(len(ws.Executions)) + queueLength := int64(len(executionIds)) return queueLength, nil } From 673d9f10bfd7a8278732f2bed8445d45db4ab263 Mon Sep 17 00:00:00 2001 From: Prajith P Date: Thu, 10 Aug 2023 17:42:48 +0530 Subject: [PATCH 4/4] Refined Queue Sizing: Incorporating Activity Names in Evaluation Signed-off-by: Prajith P --- pkg/scalers/temporal_scaler.go | 94 ++++++++++++++++++++-- pkg/scalers/temporal_scaler_test.go | 102 +++++++++++++++++++++++- tests/scalers/temporal/temporal_test.go | 1 + 3 files changed, 188 insertions(+), 9 deletions(-) diff --git a/pkg/scalers/temporal_scaler.go 
b/pkg/scalers/temporal_scaler.go index b1a5ab3939e..238ad582655 100644 --- a/pkg/scalers/temporal_scaler.go +++ b/pkg/scalers/temporal_scaler.go @@ -5,8 +5,11 @@ import ( "errors" "fmt" "strconv" + "strings" + "sync" "time" + "github.com/go-logr/logr" kedautil "github.com/kedacore/keda/v2/pkg/util" tclfilter "go.temporal.io/api/filter/v1" workflowservice "go.temporal.io/api/workflowservice/v1" @@ -19,12 +22,19 @@ import ( const ( defaultTargetWorkflowLength = 5 defaultActivationTargetWorkflowLength = 0 + temporalClientTimeOut = 30 ) +type executionInfo struct { + workflowId string + runId string +} + type temporalWorkflowScaler struct { metricType v2.MetricTargetType metadata *temporalWorkflowMetadata tcl sdk.Client + logger logr.Logger } type temporalWorkflowMetadata struct { @@ -32,6 +42,7 @@ type temporalWorkflowMetadata struct { endpoint string namespace string workflowName string + activities []string scalerIndex int targetQueueSize int64 metricName string @@ -49,11 +60,13 @@ func NewTemporalWorkflowScaler(config *ScalerConfig) (Scaler, error) { return nil, fmt.Errorf("failed to parse Temporal metadata: %w", err) } + logger := InitializeLogger(config, "temporal_workflow_scaler") + c, err := sdk.Dial(sdk.Options{ HostPort: meta.endpoint, ConnectionOptions: sdk.ConnectionOptions{ DialOptions: []grpc.DialOption{ - grpc.WithTimeout(time.Duration(10) * time.Second), + grpc.WithTimeout(time.Duration(temporalClientTimeOut) * time.Second), }, }, }) @@ -66,6 +79,7 @@ func NewTemporalWorkflowScaler(config *ScalerConfig) (Scaler, error) { metricType: metricType, metadata: meta, tcl: c, + logger: logger, }, nil } @@ -107,9 +121,8 @@ func (s *temporalWorkflowScaler) GetMetricsAndActivity(ctx context.Context, metr // getQueueSize returns the queue size of open workflows. 
func (s *temporalWorkflowScaler) getQueueSize(ctx context.Context) (int64, error) { - var executionIds = make([]string, 0) + var executions []executionInfo var nextPageToken []byte - for { listOpenWorkflowExecutionsRequest := &workflowservice.ListOpenWorkflowExecutionsRequest{ Namespace: s.metadata.namespace, @@ -126,9 +139,12 @@ func (s *temporalWorkflowScaler) getQueueSize(ctx context.Context) (int64, error return 0, fmt.Errorf("failed to get workflows: %w", err) } - for _, execution := range ws.Executions { - executionId := execution.Execution.WorkflowId + "__" + execution.Execution.RunId - executionIds = append(executionIds, executionId) + for _, exec := range ws.GetExecutions() { + execution := executionInfo{ + workflowId: exec.Execution.GetWorkflowId(), + runId: exec.Execution.RunId, + } + executions = append(executions, execution) } if nextPageToken = ws.NextPageToken; len(nextPageToken) == 0 { @@ -136,10 +152,70 @@ func (s *temporalWorkflowScaler) getQueueSize(ctx context.Context) (int64, error } } - queueLength := int64(len(executionIds)) + pendingCh := make(chan string, len(executions)) + var wg sync.WaitGroup + + for _, execInfo := range executions { + wg.Add(1) + go func(e executionInfo) { + defer wg.Done() + + workflowId := e.workflowId + runId := e.runId + + if !s.isActivityRunning(ctx, workflowId, runId) { + executionId := workflowId + "__" + runId + pendingCh <- executionId + } + + }(execInfo) + } + wg.Wait() + close(pendingCh) + + var queueLength int64 + for range pendingCh { + queueLength++ + } return queueLength, nil } +// isActivityRunning checks if there are running activities associated with a specific workflow execution. 
+func (s *temporalWorkflowScaler) isActivityRunning(ctx context.Context, workflowId, runId string) bool { + resp, err := s.tcl.DescribeWorkflowExecution(ctx, workflowId, runId) + if err != nil { + s.logger.Error(err, "error describing workflow execution", "workflowId", workflowId, "runId", runId) + return false + } + + // If there is no activityName and there are running activities, return true. + if len(s.metadata.activities) == 0 && len(resp.GetPendingActivities()) > 0 { + return true + } + + // Store the IDs of matching pending activities in a set so that duplicates are ignored. + runningActivities := make(map[string]struct{}) + for _, pendingActivity := range resp.GetPendingActivities() { + activityName := pendingActivity.ActivityType.GetName() + if s.hasMatchingActivityName(activityName) { + runningActivities[pendingActivity.ActivityId] = struct{}{} + } + } + + // Return true if there are any running activities, otherwise false. + return len(runningActivities) > 0 +} + +// hasMatchingActivityName checks if the provided activity name matches any of the defined activity names in the metadata. +func (s *temporalWorkflowScaler) hasMatchingActivityName(activityName string) bool { + for _, activity := range s.metadata.activities { + if activityName == activity { + return true + } + } + return false +} + // parseTemporalMetadata parses the Temporal metadata from the ScalerConfig. 
func parseTemporalMetadata(config *ScalerConfig) (*temporalWorkflowMetadata, error) { meta := &temporalWorkflowMetadata{} @@ -162,6 +238,10 @@ func parseTemporalMetadata(config *ScalerConfig) (*temporalWorkflowMetadata, err } meta.workflowName = config.TriggerMetadata["workflowName"] + if activities := config.TriggerMetadata["activityName"]; activities != "" { + meta.activities = strings.Split(activities, ",") + } + if size, ok := config.TriggerMetadata["targetQueueSize"]; ok { queueSize, err := strconv.ParseInt(size, 10, 64) if err != nil { diff --git a/pkg/scalers/temporal_scaler_test.go b/pkg/scalers/temporal_scaler_test.go index adb74724424..00c8e0194b2 100644 --- a/pkg/scalers/temporal_scaler_test.go +++ b/pkg/scalers/temporal_scaler_test.go @@ -3,12 +3,15 @@ package scalers import ( "context" "testing" + + "github.com/stretchr/testify/assert" ) var ( temporalEndpoint = "localhost:7233" temporalNamespace = "v2" temporalWorkflowName = "SayHello" + activityName = "say_hello" ) type parseTemporalMetadataTestData struct { @@ -32,9 +35,9 @@ var testTemporalMetadata = []parseTemporalMetadataTestData{ // Missing endpoint, should fail {map[string]string{"workflowName": temporalWorkflowName, "namespace": temporalNamespace}, true}, // All good. 
- {map[string]string{"endpoint": temporalEndpoint, "workflowName": temporalWorkflowName, "namespace": temporalNamespace}, false}, + {map[string]string{"endpoint": temporalEndpoint, "activityName": activityName, "workflowName": temporalWorkflowName, "namespace": temporalNamespace}, false}, // All good + activationLagThreshold - {map[string]string{"endpoint": temporalEndpoint, "workflowName": temporalWorkflowName, "namespace": temporalNamespace, "activationTargetQueueSize": "10"}, false}, + {map[string]string{"endpoint": temporalEndpoint, "activityName": activityName, "workflowName": temporalWorkflowName, "namespace": temporalNamespace, "activationTargetQueueSize": "10"}, false}, } var temporalMetricIdentifiers = []temporalMetricIdentifier{ @@ -71,3 +74,98 @@ func TestTemporalGetMetricSpecForScaling(t *testing.T) { } } } + +func TestParseTemporalMetadata(t *testing.T) { + cases := []struct { + name string + metadata map[string]string + wantMeta *temporalWorkflowMetadata + wantErr bool + }{ + { + name: "empty metadata", + wantMeta: nil, + wantErr: true, + }, + { + name: "empty workflowName", + metadata: map[string]string{ + "endpoint": "test:7233", + "namespace": "default", + "activityName": "test123", + }, + wantMeta: nil, + wantErr: true, + }, + { + name: "multiple activityName", + metadata: map[string]string{ + "endpoint": "test:7233", + "namespace": "default", + "activityName": "test123,test", + "workflowName": "testxx", + }, + wantMeta: &temporalWorkflowMetadata{ + endpoint: "test:7233", + namespace: "default", + activities: []string{"test123", "test"}, + workflowName: "testxx", + targetQueueSize: 5, + metricName: "s0-temporal-default-testxx", + }, + wantErr: false, + }, + { + name: "empty activityName", + metadata: map[string]string{ + "endpoint": "test:7233", + "namespace": "default", + "workflowName": "testxx", + }, + wantMeta: &temporalWorkflowMetadata{ + endpoint: "test:7233", + namespace: "default", + activities: nil, + workflowName: "testxx", + 
targetQueueSize: 5, + metricName: "s0-temporal-default-testxx", + }, + wantErr: false, + }, + { + name: "activationTargetQueueSize should not be 0", + metadata: map[string]string{ + "endpoint": "test:7233", + "namespace": "default", + "workflowName": "testxx", + "activationTargetQueueSize": "12", + }, + wantMeta: &temporalWorkflowMetadata{ + endpoint: "test:7233", + namespace: "default", + activities: nil, + workflowName: "testxx", + targetQueueSize: 5, + metricName: "s0-temporal-default-testxx", + activationTargetWorkflowLength: 12, + }, + wantErr: false, + }, + } + + for _, testCase := range cases { + c := testCase + t.Run(c.name, func(t *testing.T) { + config := &ScalerConfig{ + TriggerMetadata: c.metadata, + } + meta, err := parseTemporalMetadata(config) + if c.wantErr == true && err != nil { + t.Log("Expected error, got err") + } else { + assert.NoError(t, err) + } + assert.Equal(t, c.wantMeta, meta) + }) + } +} diff --git a/tests/scalers/temporal/temporal_test.go b/tests/scalers/temporal/temporal_test.go index 02033e0e22e..34c320a07c8 100644 --- a/tests/scalers/temporal/temporal_test.go +++ b/tests/scalers/temporal/temporal_test.go @@ -196,6 +196,7 @@ spec: metadata: namespace: default workflowName: SayHello + activityName: say_hello targetQueueSize: "2" activationTargetQueueSize: "3" endpoint: {{.TemporalDeploymentName}}.{{.TestNamespace}}.svc.cluster.local:7233