diff --git a/.github/workflows/nightly-e2e.yml b/.github/workflows/nightly-e2e.yml
index 32b813bf0ab..77b39152d60 100644
--- a/.github/workflows/nightly-e2e.yml
+++ b/.github/workflows/nightly-e2e.yml
@@ -23,4 +23,9 @@ jobs:
           AZURE_SP_TENANT: ${{ secrets.AZURE_SP_TENANT }}
           TEST_STORAGE_CONNECTION_STRING: ${{ secrets.TEST_STORAGE_CONNECTION_STRING }}
           TEST_LOG_ANALYTICS_WORKSPACE_ID: ${{ secrets.TEST_LOG_ANALYTICS_WORKSPACE_ID }}
+          AZURE_DEVOPS_ORGANIZATION_URL: ${{ secrets.AZURE_DEVOPS_ORGANIZATION_URL }}
+          AZURE_DEVOPS_PAT: ${{ secrets.AZURE_DEVOPS_PAT }}
+          AZURE_DEVOPS_PROJECT: ${{ secrets.AZURE_DEVOPS_PROJECT }}
+          AZURE_DEVOPS_BUILD_DEFINITON_ID: ${{ secrets.AZURE_DEVOPS_BUILD_DEFINITON_ID }}
+          AZURE_DEVOPS_POOL_NAME: ${{ secrets.AZURE_DEVOPS_POOL_NAME }}
         run: make e2e-test
diff --git a/CHANGELOG.md b/CHANGELOG.md
index c41def229b1..d0904880fe3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -18,6 +18,7 @@
 
 ## Unreleased
 
+- Add Azure Pipelines Scaler ([#1706](https://github.com/kedacore/keda/pull/1706))
 - Add OpenStack Metrics Scaler ([#1382](https://github.com/kedacore/keda/issues/1382))
 - Fixed goroutine leaks in usage of timers ([#1704](https://github.com/kedacore/keda/pull/1704))
 
diff --git a/pkg/scalers/azure_pipelines_scaler.go b/pkg/scalers/azure_pipelines_scaler.go
new file mode 100644
index 00000000000..74f4395a567
--- /dev/null
+++ b/pkg/scalers/azure_pipelines_scaler.go
@@ -0,0 +1,195 @@
+package scalers
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"strconv"
+	"strings"
+
+	kedautil "github.com/kedacore/keda/v2/pkg/util"
+	v2beta2 "k8s.io/api/autoscaling/v2beta2"
+	"k8s.io/apimachinery/pkg/api/resource"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/metrics/pkg/apis/external_metrics"
+	logf "sigs.k8s.io/controller-runtime/pkg/log"
+)
+
+const (
+	defaultTargetPipelinesQueueLength = 1
+)
+
+type azurePipelinesScaler struct {
+	metadata   *azurePipelinesMetadata
+	httpClient *http.Client
+}
+
+type azurePipelinesMetadata struct {
+	organizationURL            string
+	organizationName           string
+	personalAccessToken        string
+	poolID                     string
+	targetPipelinesQueueLength int
+}
+
+var azurePipelinesLog = logf.Log.WithName("azure_pipelines_scaler")
+
+// NewAzurePipelinesScaler creates a new AzurePipelinesScaler
+func NewAzurePipelinesScaler(config *ScalerConfig) (Scaler, error) {
+	meta, err := parseAzurePipelinesMetadata(config)
+	if err != nil {
+		return nil, fmt.Errorf("error parsing azure pipelines metadata: %s", err)
+	}
+
+	httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout)
+
+	return &azurePipelinesScaler{
+		metadata:   meta,
+		httpClient: httpClient,
+	}, nil
+}
+
+func parseAzurePipelinesMetadata(config *ScalerConfig) (*azurePipelinesMetadata, error) {
+	meta := azurePipelinesMetadata{}
+	meta.targetPipelinesQueueLength = defaultTargetPipelinesQueueLength
+
+	if val, ok := config.TriggerMetadata["targetPipelinesQueueLength"]; ok {
+		queueLength, err := strconv.Atoi(val)
+		if err != nil {
+			return nil, fmt.Errorf("error parsing azure pipelines metadata targetPipelinesQueueLength: %s", err.Error())
+		}
+
+		meta.targetPipelinesQueueLength = queueLength
+	}
+
+	if val, ok := config.AuthParams["organizationURL"]; ok && val != "" {
+		// Found the organizationURL in a parameter from TriggerAuthentication
+		meta.organizationURL = val
+	} else if val, ok := config.TriggerMetadata["organizationURLFromEnv"]; ok && val != "" {
+		meta.organizationURL = config.ResolvedEnv[val]
+	} else {
+		return nil, fmt.Errorf("no organizationURL given")
+	}
+
+	if val := meta.organizationURL[strings.LastIndex(meta.organizationURL, "/")+1:]; val != "" {
+		meta.organizationName = val
+	} else {
+		return nil, fmt.Errorf("failed to extract organization name from organizationURL")
+	}
+
+	if val, ok := config.AuthParams["personalAccessToken"]; ok && val != "" {
+		// Found the personalAccessToken in a parameter from TriggerAuthentication
+		meta.personalAccessToken = val
+	} else if val, ok := config.TriggerMetadata["personalAccessTokenFromEnv"]; ok && val != "" {
+		meta.personalAccessToken = config.ResolvedEnv[val]
+	} else {
+		return nil, fmt.Errorf("no personalAccessToken given")
+	}
+
+	if val, ok := config.TriggerMetadata["poolID"]; ok && val != "" {
+		meta.poolID = val
+	} else {
+		return nil, fmt.Errorf("no poolID given")
+	}
+
+	return &meta, nil
+}
+
+func (s *azurePipelinesScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
+	queuelen, err := s.GetAzurePipelinesQueueLength(ctx)
+
+	if err != nil {
+		azurePipelinesLog.Error(err, "error getting pipelines queue length")
+		return []external_metrics.ExternalMetricValue{}, err
+	}
+
+	metric := external_metrics.ExternalMetricValue{
+		MetricName: metricName,
+		Value:      *resource.NewQuantity(int64(queuelen), resource.DecimalSI),
+		Timestamp:  metav1.Now(),
+	}
+
+	return append([]external_metrics.ExternalMetricValue{}, metric), nil
+}
+
+func (s *azurePipelinesScaler) GetAzurePipelinesQueueLength(ctx context.Context) (int, error) {
+	url := fmt.Sprintf("%s/_apis/distributedtask/pools/%s/jobrequests", s.metadata.organizationURL, s.metadata.poolID)
+	req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
+	if err != nil {
+		return -1, err
+	}
+
+	req.SetBasicAuth("PAT", s.metadata.personalAccessToken)
+
+	r, err := s.httpClient.Do(req)
+	if err != nil {
+		return -1, err
+	}
+	// Ensure the body is closed on every path so the connection can be reused.
+	defer r.Body.Close()
+
+	b, err := ioutil.ReadAll(r.Body)
+	if err != nil {
+		return -1, err
+	}
+
+	if !(r.StatusCode >= 200 && r.StatusCode <= 299) {
+		return -1, fmt.Errorf("azure Devops REST api returned error. status: %d response: %s", r.StatusCode, string(b))
+	}
+
+	var result map[string]interface{}
+	err = json.Unmarshal(b, &result)
+	if err != nil {
+		return -1, err
+	}
+
+	count := 0
+	jobs, ok := result["value"].([]interface{})
+
+	if !ok {
+		return -1, fmt.Errorf("api result returned no value data")
+	}
+
+	for _, value := range jobs {
+		v := value.(map[string]interface{})
+		// A job request without a result is still queued or running.
+		if v["result"] == nil {
+			count++
+		}
+	}
+
+	return count, nil
+}
+
+func (s *azurePipelinesScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec {
+	targetPipelinesQueueLengthQty := resource.NewQuantity(int64(s.metadata.targetPipelinesQueueLength), resource.DecimalSI)
+	externalMetric := &v2beta2.ExternalMetricSource{
+		Metric: v2beta2.MetricIdentifier{
+			Name: kedautil.NormalizeString(fmt.Sprintf("%s-%s-%s", "azure-pipelines-queue", s.metadata.organizationName, s.metadata.poolID)),
+		},
+		Target: v2beta2.MetricTarget{
+			Type:         v2beta2.AverageValueMetricType,
+			AverageValue: targetPipelinesQueueLengthQty,
+		},
+	}
+	metricSpec := v2beta2.MetricSpec{External: externalMetric, Type: externalMetricType}
+	return []v2beta2.MetricSpec{metricSpec}
+}
+
+func (s *azurePipelinesScaler) IsActive(ctx context.Context) (bool, error) {
+	queuelen, err := s.GetAzurePipelinesQueueLength(ctx)
+
+	if err != nil {
+		azurePipelinesLog.Error(err, "error getting pipelines queue length")
+		return false, err
+	}
+
+	return queuelen > 0, nil
+}
+
+func (s *azurePipelinesScaler) Close() error {
+	return nil
+}
diff --git a/pkg/scalers/azure_pipelines_scaler_test.go b/pkg/scalers/azure_pipelines_scaler_test.go
new file mode 100644
index 00000000000..8a5d1cbb4a0
--- /dev/null
+++ b/pkg/scalers/azure_pipelines_scaler_test.go
@@ -0,0 +1,73 @@
+package scalers
+
+import (
+	"net/http"
+	"testing"
+)
+
+type parseAzurePipelinesMetadataTestData struct {
+	metadata    map[string]string
+	isError     bool
+	resolvedEnv map[string]string
+	authParams  map[string]string
+}
+
+type azurePipelinesMetricIdentifier struct {
+	metadataTestData *parseAzurePipelinesMetadataTestData
+	name             string
+}
+
+var testAzurePipelinesResolvedEnv = map[string]string{
+	"AZP_URL":   "https://dev.azure.com/sample",
+	"AZP_TOKEN": "sample",
+}
+
+var testAzurePipelinesMetadata = []parseAzurePipelinesMetadataTestData{
+	// empty
+	{map[string]string{}, true, testAzurePipelinesResolvedEnv, map[string]string{}},
+	// all properly formed
+	{map[string]string{"organizationURLFromEnv": "AZP_URL", "personalAccessTokenFromEnv": "AZP_TOKEN", "poolID": "1", "targetPipelinesQueueLength": "1"}, false, testAzurePipelinesResolvedEnv, map[string]string{}},
+	// using triggerAuthentication
+	{map[string]string{"poolID": "1", "targetPipelinesQueueLength": "1"}, false, testAzurePipelinesResolvedEnv, map[string]string{"organizationURL": "https://dev.azure.com/sample", "personalAccessToken": "sample"}},
+	// missing organizationURL
+	{map[string]string{"organizationURLFromEnv": "", "personalAccessTokenFromEnv": "sample", "poolID": "1", "targetPipelinesQueueLength": "1"}, true, testAzurePipelinesResolvedEnv, map[string]string{}},
+	// missing personalAccessToken
+	{map[string]string{"organizationURLFromEnv": "AZP_URL", "poolID": "1", "targetPipelinesQueueLength": "1"}, true, testAzurePipelinesResolvedEnv, map[string]string{}},
+	// missing poolID
+	{map[string]string{"organizationURLFromEnv": "AZP_URL", "personalAccessTokenFromEnv": "AZP_TOKEN", "poolID": "", "targetPipelinesQueueLength": "1"}, true, testAzurePipelinesResolvedEnv, map[string]string{}},
+}
+
+var azurePipelinesMetricIdentifiers = []azurePipelinesMetricIdentifier{
+	{&testAzurePipelinesMetadata[1], "azure-pipelines-queue-sample-1"},
+}
+
+func TestParseAzurePipelinesMetadata(t *testing.T) {
+	for _, testData := range testAzurePipelinesMetadata {
+		_, err := parseAzurePipelinesMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: testData.resolvedEnv, AuthParams: testData.authParams})
+		if err != nil && !testData.isError {
+			t.Error("Expected success but got error", err)
+		}
+		if testData.isError && err == nil {
+			t.Error("Expected error but got success")
+		}
+	}
+}
+
+func TestAzurePipelinesGetMetricSpecForScaling(t *testing.T) {
+	for _, testData := range azurePipelinesMetricIdentifiers {
+		meta, err := parseAzurePipelinesMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testData.metadataTestData.resolvedEnv, AuthParams: testData.metadataTestData.authParams})
+		if err != nil {
+			t.Fatal("Could not parse metadata:", err)
+		}
+		mockAzurePipelinesScaler := azurePipelinesScaler{
+			metadata:   meta,
+			httpClient: http.DefaultClient,
+		}
+
+		metricSpec := mockAzurePipelinesScaler.GetMetricSpecForScaling()
+		metricName := metricSpec[0].External.Metric.Name
+		if metricName != testData.name {
+			t.Error("Wrong External metric source name:", metricName)
+		}
+	}
+}
diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go
index df542a44c89..991110db0a7 100644
--- a/pkg/scaling/scale_handler.go
+++ b/pkg/scaling/scale_handler.go
@@ -464,6 +464,8 @@ func buildScaler(triggerType string, config *scalers.ScalerConfig) (scalers.Scal
 		return scalers.NewAzureLogAnalyticsScaler(config)
 	case "azure-monitor":
 		return scalers.NewAzureMonitorScaler(config)
+	case "azure-pipelines":
+		return scalers.NewAzurePipelinesScaler(config)
 	case "azure-queue":
 		return scalers.NewAzureQueueScaler(config)
 	case "azure-servicebus":
diff --git a/tests/.env b/tests/.env
index b9f38097be6..b90026a761d 100644
--- a/tests/.env
+++ b/tests/.env
@@ -11,3 +11,8 @@ OPENSTACK_PASSWORD=
 OPENSTACK_PROJECT_ID=
 # OPENSTACK_SWIFT_URL=
 # OPENSTACK_REGION_NAME=
+AZURE_DEVOPS_ORGANIZATION_URL=
+AZURE_DEVOPS_PAT=
+AZURE_DEVOPS_PROJECT=
+AZURE_DEVOPS_BUILD_DEFINITON_ID=
+AZURE_DEVOPS_POOL_NAME=
diff --git a/tests/scalers/azure-pipelines.test.ts b/tests/scalers/azure-pipelines.test.ts
new file mode 100644
index 00000000000..44e6e4a5f7e
--- /dev/null
+++ b/tests/scalers/azure-pipelines.test.ts
@@ -0,0 +1,159 @@
+import * as azdev from "azure-devops-node-api";
+import * as ba from "azure-devops-node-api/BuildApi";
+import * as ta from "azure-devops-node-api/TaskAgentApiBase";
+import * as ti from "azure-devops-node-api/interfaces/TaskAgentInterfaces";
+import * as fs from 'fs'
+import * as sh from 'shelljs'
+import * as tmp from 'tmp'
+import test from 'ava'
+
+const defaultNamespace = 'azure-pipelines-test'
+const organizationURL = process.env['AZURE_DEVOPS_ORGANIZATION_URL']
+const personalAccessToken = process.env['AZURE_DEVOPS_PAT']
+const projectName = process.env['AZURE_DEVOPS_PROJECT']
+const buildDefinitionID = process.env['AZURE_DEVOPS_BUILD_DEFINITON_ID']
+const poolName = process.env['AZURE_DEVOPS_POOL_NAME']
+
+test.before(async t => {
+  if (!organizationURL || !personalAccessToken || !projectName || !buildDefinitionID || !poolName) {
+    t.fail('AZURE_DEVOPS_ORGANIZATION_URL, AZURE_DEVOPS_PAT, AZURE_DEVOPS_PROJECT, AZURE_DEVOPS_BUILD_DEFINITON_ID and AZURE_DEVOPS_POOL_NAME environment variables are required for azure pipelines tests')
+  }
+
+  let authHandler = azdev.getPersonalAccessTokenHandler(personalAccessToken);
+  let connection = new azdev.WebApi(organizationURL, authHandler);
+
+  let taskAgent: ta.ITaskAgentApiBase = await connection.getTaskAgentApi();
+  let agentPool: ti.TaskAgentPool[] = await taskAgent.getAgentPools(poolName)
+  let poolID: number = agentPool[0].id
+
+  if(!poolID) {
+    t.fail("failed to convert poolName to poolID")
+  }
+
+  sh.config.silent = true
+  const base64Token = Buffer.from(personalAccessToken).toString('base64')
+  const tmpFile = tmp.fileSync()
+  fs.writeFileSync(tmpFile.name, deployYaml.replace('{{AZP_TOKEN_BASE64}}', base64Token).replace('{{AZP_URL}}', organizationURL).replace('{{AZP_POOL}}', poolName).replace('{{AZP_POOL_ID}}', poolID.toString()))
+  sh.exec(`kubectl create namespace ${defaultNamespace}`)
+  t.is(0, sh.exec(`kubectl apply -f ${tmpFile.name} --namespace ${defaultNamespace}`).code, 'creating a deployment should work.')
+})
+
+test.serial('Deployment should have 1 replicas on start', t => {
+  sh.exec('sleep 5s')
+  let replicaCount = sh.exec(`kubectl get deployment.apps/test-deployment --namespace ${defaultNamespace} -o jsonpath="{.spec.replicas}"`).stdout
+  t.is(replicaCount, '1', 'replica count should start out as 1')
+})
+
+test.serial('Deployment should scale to 3 replicas after queueing 3 jobs', async t => {
+  let authHandler = azdev.getPersonalAccessTokenHandler(personalAccessToken);
+  let connection = new azdev.WebApi(organizationURL, authHandler);
+  let build: ba.IBuildApi = await connection.getBuildApi();
+  var definitionID = parseInt(buildDefinitionID)
+
+  // wait for the first agent to be registered in the agent pool
+  await new Promise(resolve => setTimeout(resolve, 20 * 1000));
+
+  for(let i = 0; i < 3; i++) {
+    await build.queueBuild(null, projectName, null, null, null, definitionID)
+  }
+
+  var replicaCount = sh.exec(`kubectl get deployment.apps/test-deployment --namespace ${defaultNamespace} -o jsonpath="{.spec.replicas}"`).stdout
+
+  for (let i = 0; i < 10 && replicaCount !== '3'; i++) {
+    replicaCount = sh.exec(`kubectl get deployment.apps/test-deployment --namespace ${defaultNamespace} -o jsonpath="{.spec.replicas}"`).stdout
+    if (replicaCount !== '3') {
+      await new Promise(resolve => setTimeout(resolve, 5000));
+    }
+  }
+
+  t.is(replicaCount, '3', 'replica count should be 3 after starting 3 jobs')
+})
+
+test.serial('Deployment should scale to 1 replica after finishing 3 jobs', async t => {
+  // wait 10 minutes for the jobs to finish and scale down
+  await new Promise(resolve => setTimeout(resolve, 10 * 60 * 1000));
+
+  var replicaCount = sh.exec(`kubectl get deployment.apps/test-deployment --namespace ${defaultNamespace} -o jsonpath="{.spec.replicas}"`).stdout
+
+  for (let i = 0; i < 20 && replicaCount !== '1'; i++) {
+    replicaCount = sh.exec(`kubectl get deployment.apps/test-deployment --namespace ${defaultNamespace} -o jsonpath="{.spec.replicas}"`).stdout
+    if (replicaCount !== '1') {
+      await new Promise(resolve => setTimeout(resolve, 5000));
+    }
+  }
+
+  t.is(replicaCount, '1', 'replica count should be 1 after 10 minutes')
+})
+
+test.after.always('clean up azure-pipelines deployment', t => {
+  const resources = [
+    'scaledobject.keda.sh/azure-pipelines-scaledobject',
+    'secret/test-secrets',
+    'deployment.apps/test-deployment',
+  ]
+
+  for (const resource of resources) {
+    sh.exec(`kubectl delete ${resource} --namespace ${defaultNamespace}`)
+  }
+  sh.exec(`kubectl delete namespace ${defaultNamespace}`)
+})
+
+const deployYaml = `apiVersion: v1
+kind: Secret
+metadata:
+  name: test-secrets
+data:
+  personalAccessToken: {{AZP_TOKEN_BASE64}}
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: test-deployment
+  labels:
+    app: azdevops-agent
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: azdevops-agent
+  template:
+    metadata:
+      labels:
+        app: azdevops-agent
+    spec:
+      containers:
+      - name: azdevops-agent
+        image: ghcr.io/kedacore/tests-azure-pipelines-agent:b3a02cc
+        env:
+          - name: AZP_URL
+            value: {{AZP_URL}}
+          - name: AZP_TOKEN
+            valueFrom:
+              secretKeyRef:
+                name: test-secrets
+                key: personalAccessToken
+          - name: AZP_POOL
+            value: {{AZP_POOL}}
+        volumeMounts:
+        - mountPath: /var/run/docker.sock
+          name: docker-volume
+      volumes:
+      - name: docker-volume
+        hostPath:
+          path: /var/run/docker.sock
+---
+apiVersion: keda.sh/v1alpha1
+kind: ScaledObject
+metadata:
+  name: azure-pipelines-scaledobject
+spec:
+  scaleTargetRef:
+    name: test-deployment
+  minReplicaCount: 1
+  maxReplicaCount: 3
+  triggers:
+  - type: azure-pipelines
+    metadata:
+      organizationURLFromEnv: "AZP_URL"
+      personalAccessTokenFromEnv: "AZP_TOKEN"
+      poolID: "{{AZP_POOL_ID}}"`