Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved scaling speed of AzDO pipelines #3729

Merged
merged 4 commits into from
Oct 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **General:** Add explicit seccompProfile type to securityContext config ([#3561](https://github.com/kedacore/keda/issues/3561))
- **General:** Add `Min` column to ScaledJob visualization ([#3689](https://github.com/kedacore/keda/issues/3689))
- **Azure AD Pod Identity Authentication:** Improve error messages to emphasize problems around the integration with aad-pod-identity itself ([#3610](https://github.com/kedacore/keda/issues/3610))
- **Azure Pipelines Scaler:** Improved speed of profiling large set of Job Requests from Azure Pipelines ([#3702](https://github.com/kedacore/keda/issues/3702))
- **Prometheus Scaler:** Introduce skipping of certificate check for unsigned certs ([#2310](https://github.com/kedacore/keda/issues/2310))

### Fixes
Expand Down
157 changes: 123 additions & 34 deletions pkg/scalers/azure_pipelines_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/http"
"strconv"
"strings"
"time"

"github.com/go-logr/logr"
v2 "k8s.io/api/autoscaling/v2"
Expand All @@ -21,6 +22,92 @@ const (
defaultTargetPipelinesQueueLength = 1
)

type JobRequests struct {
Count int `json:"count"`
Value []JobRequest `json:"value"`
}

type JobRequest struct {
RequestID int `json:"requestId"`
QueueTime time.Time `json:"queueTime"`
AssignTime time.Time `json:"assignTime,omitempty"`
ReceiveTime time.Time `json:"receiveTime,omitempty"`
LockedUntil time.Time `json:"lockedUntil,omitempty"`
ServiceOwner string `json:"serviceOwner"`
HostID string `json:"hostId"`
Result *string `json:"result"`
ScopeID string `json:"scopeId"`
PlanType string `json:"planType"`
PlanID string `json:"planId"`
JobID string `json:"jobId"`
Demands []string `json:"demands"`
ReservedAgent *struct {
Links struct {
Self struct {
Href string `json:"href"`
} `json:"self"`
Web struct {
Href string `json:"href"`
} `json:"web"`
} `json:"_links"`
ID int `json:"id"`
Name string `json:"name"`
Version string `json:"version"`
OsDescription string `json:"osDescription"`
Enabled bool `json:"enabled"`
Status string `json:"status"`
ProvisioningState string `json:"provisioningState"`
AccessPoint string `json:"accessPoint"`
} `json:"reservedAgent,omitempty"`
Definition struct {
Links struct {
Web struct {
Href string `json:"href"`
} `json:"web"`
Self struct {
Href string `json:"href"`
} `json:"self"`
} `json:"_links"`
ID int `json:"id"`
Name string `json:"name"`
} `json:"definition"`
Owner struct {
Links struct {
Web struct {
Href string `json:"href"`
} `json:"web"`
Self struct {
Href string `json:"href"`
} `json:"self"`
} `json:"_links"`
ID int `json:"id"`
Name string `json:"name"`
} `json:"owner"`
Data struct {
ParallelismTag string `json:"ParallelismTag"`
IsScheduledKey string `json:"IsScheduledKey"`
} `json:"data"`
PoolID int `json:"poolId"`
OrchestrationID string `json:"orchestrationId"`
Priority int `json:"priority"`
MatchedAgents *[]struct {
Links struct {
Self struct {
Href string `json:"href"`
} `json:"self"`
Web struct {
Href string `json:"href"`
} `json:"web"`
} `json:"_links"`
ID int `json:"id"`
Name string `json:"name"`
Version string `json:"version"`
Enabled bool `json:"enabled"`
Status string `json:"status"`
ProvisioningState string `json:"provisioningState"`
} `json:"matchedAgents,omitempty"`
}

type azurePipelinesPoolNameResponse struct {
Value []struct {
ID int `json:"id"`
Expand Down Expand Up @@ -243,47 +330,50 @@ func (s *azurePipelinesScaler) GetAzurePipelinesQueueLength(ctx context.Context)
return -1, err
}

var result map[string]interface{}
err = json.Unmarshal(body, &result)
var jrs JobRequests
err = json.Unmarshal(body, &jrs)
if err != nil {
s.logger.Error(err, "Cannot unmarshal ADO JobRequests API response")
return -1, err
}

var count int64
jobs, ok := result["value"].([]interface{})

if !ok {
return -1, fmt.Errorf("the Azure DevOps REST API result returned no value data despite successful code. url: %s", url)
}

// for each job check if it parent fulfilled, then demand fulfilled, then finally pool fulfilled
for _, value := range jobs {
v := value.(map[string]interface{})
if v["result"] == nil {
if s.metadata.parent == "" && s.metadata.demands == "" {
// no plan defined, just add a count
count++
var count int64
for _, job := range stripDeadJobs(jrs.Value) {
if s.metadata.parent == "" && s.metadata.demands == "" {
// no plan defined, just add a count
count++
} else {
if s.metadata.parent == "" {
// doesn't use parent, switch to demand
if getCanAgentDemandFulfilJob(job, s.metadata) {
count++
}
} else {
if s.metadata.parent == "" {
// doesn't use parent, switch to demand
if getCanAgentDemandFulfilJob(v, s.metadata) {
count++
}
} else {
// does use parent
if getCanAgentParentFulfilJob(v, s.metadata) {
count++
}
// does use parent
if getCanAgentParentFulfilJob(job, s.metadata) {
count++
}
}
}
}

return count, err
}

func stripDeadJobs(jobs []JobRequest) []JobRequest {
var filtered []JobRequest
for _, job := range jobs {
if job.Result == nil {
filtered = append(filtered, job)
}
}
return filtered
}
Eldarrin marked this conversation as resolved.
Show resolved Hide resolved

// Determine if the scaledjob has the right demands to spin up
func getCanAgentDemandFulfilJob(v map[string]interface{}, metadata *azurePipelinesMetadata) bool {
var demandsReq = v["demands"].([]interface{})
func getCanAgentDemandFulfilJob(jr JobRequest, metadata *azurePipelinesMetadata) bool {
var demandsReq = jr.Demands
var demandsAvail = strings.Split(metadata.demands, ",")
var countDemands = 0
for _, dr := range demandsReq {
Expand All @@ -301,16 +391,15 @@ func getCanAgentDemandFulfilJob(v map[string]interface{}, metadata *azurePipelin
}

// Determine if the Job and Parent Agent Template have matching capabilities
func getCanAgentParentFulfilJob(v map[string]interface{}, metadata *azurePipelinesMetadata) bool {
matchedAgents, ok := v["matchedAgents"].([]interface{})
if !ok {
// ADO is already processing
func getCanAgentParentFulfilJob(jr JobRequest, metadata *azurePipelinesMetadata) bool {
matchedAgents := jr.MatchedAgents

if matchedAgents == nil {
return false
}

for _, m := range matchedAgents {
n := m.(map[string]interface{})
if metadata.parent == n["name"].(string) {
for _, m := range *matchedAgents {
if metadata.parent == m.Name {
return true
}
}
Expand Down
32 changes: 21 additions & 11 deletions pkg/scalers/azure_pipelines_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"testing"
)

const loadCount = 1000 // the size of the pretend pool completed of job requests

type parseAzurePipelinesMetadataTestData struct {
testName string
metadata map[string]string
Expand Down Expand Up @@ -40,6 +42,9 @@ var testAzurePipelinesMetadata = []parseAzurePipelinesMetadataTestData{
{"all properly formed", map[string]string{"organizationURLFromEnv": "AZP_URL", "personalAccessTokenFromEnv": "AZP_TOKEN", "poolID": "1", "targetPipelinesQueueLength": "1", "activationTargetPipelinesQueueLength": "A"}, true, testAzurePipelinesResolvedEnv, map[string]string{}},
}

var testJobRequestResponse = `{"count":2,"value":[{"requestId":890659,"queueTime":"2022-09-28T11:19:49.89Z","assignTime":"2022-09-28T11:20:29.5033333Z","receiveTime":"2022-09-28T11:20:32.0530499Z","lockedUntil":"2022-09-28T11:30:32.07Z","serviceOwner":"xxx","hostId":"xxx","scopeId":"xxx","planType":"Build","planId":"xxx","jobId":"xxx","demands":["kubectl","Agent.Version -gtVersion 2.182.1"],"reservedAgent":{"_links":{"self":{"href":"https://dev.azure.com/FOO/_apis/distributedtask/pools/44/agents/11735"},"web":{"href":"https://dev.azure.com/FOO/_settings/agentpools?view=jobs&poolId=44&agentId=11735"}},"id":11735,"name":"kube-scaledjob-5nlph-kzpgf","version":"2.210.1","osDescription":"Linux 5.4.0-1089-azure #94~18.04.1-Ubuntu SMP Fri Aug 5 12:34:50 UTC 2022","enabled":true,"status":"online","provisioningState":"Provisioned","accessPoint":"CodexAccessMapping"},"definition":{"_links":{"web":{"href":"https://dev.azure.com/FOO/1858395a-257e-4efd-bbc5-eb618128452b/_build/definition?definitionId=4869"},"self":{"href":"https://dev.azure.com/FOO/1858395a-257e-4efd-bbc5-eb618128452b/_apis/build/Definitions/4869"}},"id":4869,"name":"base - main"},"owner":{"_links":{"web":{"href":"https://dev.azure.com/FOO/1858395a-257e-4efd-bbc5-eb618128452b/_build/results?buildId=673584"},"self":{"href":"https://dev.azure.com/FOO/1858395a-257e-4efd-bbc5-eb618128452b/_apis/build/Builds/673584"}},"id":673584,"name":"20220928.2"},"data":{"ParallelismTag":"Private","IsScheduledKey":"False"},"poolId":44,"orchestrationId":"5c5c8ec9-786f-4e97-99d4-a29279befba3.build.__default","priority":0},{"requestId":890663,"queueTime":"2022-09-28T11:20:22.4633333Z","serviceOwner":"00025394-6065-48ca-87d9-7f5672854ef7","hostId":"41a18c7d-df5e-4032-a4df-d533b56bd2de","scopeId":"02696e26-a35b-424c-86b8-1f54e1b0b4b7","planType":"Build","planId":"b718cfed-493c-46be-a650-88fe762f75aa","jobId":"15b95994-59ec-5502-695d-0b93722883bd","demands":["dotnet60","java","Agent.Version -gtVersion 2.182.1"],"matchedAgents":[{"_links":{"self":{"href":"https://dev.azure.com/FOO/_apis/distributedtask/pools/44/agents/1755"},"web":{"href":"https://dev.azure.com/FOO/_settings/agentpools?view=jobs&poolId=44&agentId=1755"}},"id":1755,"name":"dotnet60-keda-template","version":"2.210.1","enabled":true,"status":"offline","provisioningState":"Provisioned"},{"_links":{"self":{"href":"https://dev.azure.com/FOO/_apis/distributedtask/pools/44/agents/11732"},"web":{"href":"https://dev.azure.com/FOO/_settings/agentpools?view=jobs&poolId=44&agentId=11732"}},"id":11732,"name":"dotnet60-scaledjob-5dsgc-pkqvm","version":"2.210.1","enabled":true,"status":"online","provisioningState":"Provisioned"},{"_links":{"self":{"href":"https://dev.azure.com/FOO/_apis/distributedtask/pools/44/agents/11733"},"web":{"href":"https://dev.azure.com/FOO/_settings/agentpools?view=jobs&poolId=44&agentId=11733"}},"id":11733,"name":"dotnet60-scaledjob-zgqnp-8h4z4","version":"2.210.1","enabled":true,"status":"online","provisioningState":"Provisioned"},{"_links":{"self":{"href":"https://dev.azure.com/FOO/_apis/distributedtask/pools/44/agents/11734"},"web":{"href":"https://dev.azure.com/FOO/_settings/agentpools?view=jobs&poolId=44&agentId=11734"}},"id":11734,"name":"dotnet60-scaledjob-wr65c-ff2cv","version":"2.210.1","enabled":true,"status":"online","provisioningState":"Provisioned"}],"definition":{"_links":{"web":{"href":"https://FOO.visualstudio.com/02696e26-a35b-424c-86b8-1f54e1b0b4b7/_build/definition?definitionId=3129"},"self":{"href":"https://FOO.visualstudio.com/02696e26-a35b-424c-86b8-1f54e1b0b4b7/_apis/build/Definitions/3129"}},"id":3129,"name":"Other Build CI"},"owner":{"_links":{"web":{"href":"https://FOO.visualstudio.com/02696e26-a35b-424c-86b8-1f54e1b0b4b7/_build/results?buildId=673585"},"self":{"href":"https://FOO.visualstudio.com/02696e26-a35b-424c-86b8-1f54e1b0b4b7/_apis/build/Builds/673585"}},"id":673585,"name":"20220928.11"},"data":{"ParallelismTag":"Private","IsScheduledKey":"False"},"poolId":44,"orchestrationId":"b718cfed-493c-46be-a650-88fe762f75aa.buildtest.build_and_test.__default","priority":0}]}`
var deadJob = `{"requestId":890659,"result":"succeeded","queueTime":"2022-09-28T11:19:49.89Z","assignTime":"2022-09-28T11:20:29.5033333Z","receiveTime":"2022-09-28T11:20:32.0530499Z","lockedUntil":"2022-09-28T11:30:32.07Z","serviceOwner":"xxx","hostId":"xxx","scopeId":"xxx","planType":"Build","planId":"xxx","jobId":"xxx","demands":["kubectl","Agent.Version -gtVersion 2.182.1"],"reservedAgent":{"_links":{"self":{"href":"https://dev.azure.com/FOO/_apis/distributedtask/pools/44/agents/11735"},"web":{"href":"https://dev.azure.com/FOO/_settings/agentpools?view=jobs&poolId=44&agentId=11735"}},"id":11735,"name":"kube-scaledjob-5nlph-kzpgf","version":"2.210.1","osDescription":"Linux 5.4.0-1089-azure #94~18.04.1-Ubuntu SMP Fri Aug 5 12:34:50 UTC 2022","enabled":true,"status":"online","provisioningState":"Provisioned","accessPoint":"CodexAccessMapping"},"definition":{"_links":{"web":{"href":"https://dev.azure.com/FOO/1858395a-257e-4efd-bbc5-eb618128452b/_build/definition?definitionId=4869"},"self":{"href":"https://dev.azure.com/FOO/1858395a-257e-4efd-bbc5-eb618128452b/_apis/build/Definitions/4869"}},"id":4869,"name":"base - main"},"owner":{"_links":{"web":{"href":"https://dev.azure.com/FOO/1858395a-257e-4efd-bbc5-eb618128452b/_build/results?buildId=673584"},"self":{"href":"https://dev.azure.com/FOO/1858395a-257e-4efd-bbc5-eb618128452b/_apis/build/Builds/673584"}},"id":673584,"name":"20220928.2"},"data":{"ParallelismTag":"Private","IsScheduledKey":"False"},"poolId":44,"orchestrationId":"5c5c8ec9-786f-4e97-99d4-a29279befba3.build.__default","priority":0}`

func TestParseAzurePipelinesMetadata(t *testing.T) {
for _, testData := range testAzurePipelinesMetadata {
t.Run(testData.testName, func(t *testing.T) {
Expand Down Expand Up @@ -173,7 +178,7 @@ func getMatchedAgentMetaData(url string) *azurePipelinesMetadata {
meta := azurePipelinesMetadata{}
meta.organizationName = "testOrg"
meta.organizationURL = url
meta.parent = "test-keda-template"
meta.parent = "dotnet60-keda-template"
meta.personalAccessToken = "testPAT"
meta.poolID = 1
meta.targetPipelinesQueueLength = 1
Expand All @@ -182,11 +187,9 @@ func getMatchedAgentMetaData(url string) *azurePipelinesMetadata {
}

func TestAzurePipelinesMatchedAgent(t *testing.T) {
var response = `{"count":1,"value":[{"demands":["Agent.Version -gtVersion 2.144.0"],"matchedAgents":[{"id":1,"name":"test-keda-template"}]}]}`

var apiStub = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(response))
_, _ = w.Write(buildLoadJSON())
}))

meta := getMatchedAgentMetaData(apiStub.URL)
Expand All @@ -210,7 +213,7 @@ func TestAzurePipelinesMatchedAgent(t *testing.T) {
func getDemandJobMetaData(url string) *azurePipelinesMetadata {
meta := getMatchedAgentMetaData(url)
meta.parent = ""
meta.demands = "testDemand,kubernetes"
meta.demands = "dotnet60,java"

return meta
}
Expand All @@ -224,11 +227,9 @@ func getMismatchDemandJobMetaData(url string) *azurePipelinesMetadata {
}

func TestAzurePipelinesMatchedDemandAgent(t *testing.T) {
var response = `{"count":1,"value":[{"demands":["Agent.Version -gtVersion 2.144.0", "testDemand", "kubernetes"],"matchedAgents":[{"id":1,"name":"test-keda-template"}]}]}`

var apiStub = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(response))
_, _ = w.Write(buildLoadJSON())
}))

meta := getDemandJobMetaData(apiStub.URL)
Expand All @@ -250,11 +251,9 @@ func TestAzurePipelinesMatchedDemandAgent(t *testing.T) {
}

func TestAzurePipelinesNonMatchedDemandAgent(t *testing.T) {
var response = `{"count":1,"value":[{"demands":["Agent.Version -gtVersion 2.144.0", "testDemand", "kubernetes"],"matchedAgents":[{"id":1,"name":"test-keda-template"}]}]}`

var apiStub = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(response))
_, _ = w.Write(buildLoadJSON())
}))

meta := getMismatchDemandJobMetaData(apiStub.URL)
Expand All @@ -274,3 +273,14 @@ func TestAzurePipelinesNonMatchedDemandAgent(t *testing.T) {
t.Fail()
}
}

func buildLoadJSON() []byte {
output := testJobRequestResponse[0 : len(testJobRequestResponse)-2]
for i := 1; i < loadCount; i++ {
output = output + "," + deadJob
}

output += "]}"

return []byte(output)
}