Skip to content

Commit

Permalink
Support poolName or poolID validation in Azure Pipelines Scaler (#2370)
Browse files Browse the repository at this point in the history

Signed-off-by: Jorge Turrado <[email protected]>

Co-authored-by: Zbynek Roubalik <[email protected]>
  • Loading branch information
Jorge Turrado Ferrero and zroubalik authored Jan 3, 2022
1 parent 9c6d7f9 commit 71d6c90
Show file tree
Hide file tree
Showing 5 changed files with 274 additions and 71 deletions.
7 changes: 1 addition & 6 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,19 @@

### New


- Add New Relic Scaler ([#2387](https://github.com/kedacore/keda/pull/2387))

- Add ActiveMQ Scaler ([#2305](https://github.com/kedacore/keda/pull/2305))

- TODO ([#XXX](https://github.com/kedacore/keda/pull/XXX))

### Improvements

- Azure Pipelines Scaler: support `poolName` or `poolID` validation ([#2370](https://github.com/kedacore/keda/pull/2370))
- Graphite Scaler: use the latest datapoint returned, not the earliest ([#2365](https://github.com/kedacore/keda/pull/2365))
- Kubernetes Workload Scaler: ignore terminated pods ([#2384](https://github.com/kedacore/keda/pull/2384))
- Azure EventHub Scaler: don't expose connection string in metricName ([#2404](https://github.com/kedacore/keda/pull/2404))
- `keda-operator` Cluster Role: add `list` and `watch` access to service accounts ([#2406](https://github.com/kedacore/keda/pull/2406), [#2410](https://github.com/kedacore/keda/pull/2410))
- Kafka Scaler: concurrently query brokers for consumer and producer offsets ([#2405](https://github.com/kedacore/keda/pull/2405))
- Delete the cache entry when a ScaledObject is deleted ([#2408](https://github.com/kedacore/keda/pull/2408))

- TODO ([#XXX](https://github.com/kedacore/keda/pull/XXX))

### Breaking Changes

- TODO ([#XXX](https://github.com/kedacore/keda/pull/XXX))
Expand Down
130 changes: 101 additions & 29 deletions pkg/scalers/azure_pipelines_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@ const (
defaultTargetPipelinesQueueLength = 1
)

// azurePipelinesPoolNameResponse models the Azure DevOps "list agent pools by
// name" REST response; only the pool IDs inside the `value` array are consumed.
type azurePipelinesPoolNameResponse struct {
	Value []struct {
		ID int `json:"id"`
	} `json:"value"`
}

// azurePipelinesPoolIDResponse models the Azure DevOps "get agent pool by ID"
// REST response; only the numeric pool ID is consumed.
type azurePipelinesPoolIDResponse struct {
	ID int `json:"id"`
}

type azurePipelinesScaler struct {
metadata *azurePipelinesMetadata
httpClient *http.Client
Expand All @@ -31,29 +41,29 @@ type azurePipelinesMetadata struct {
organizationURL string
organizationName string
personalAccessToken string
poolID string
poolID int
targetPipelinesQueueLength int
scalerIndex int
}

var azurePipelinesLog = logf.Log.WithName("azure_pipelines_scaler")

// NewAzurePipelinesScaler creates a new AzurePipelinesScaler
func NewAzurePipelinesScaler(config *ScalerConfig) (Scaler, error) {
meta, err := parseAzurePipelinesMetadata(config)
func NewAzurePipelinesScaler(ctx context.Context, config *ScalerConfig) (Scaler, error) {
httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, false)

meta, err := parseAzurePipelinesMetadata(ctx, config, httpClient)
if err != nil {
return nil, fmt.Errorf("error parsing azure Pipelines metadata: %s", err)
}

httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, false)

return &azurePipelinesScaler{
metadata: meta,
httpClient: httpClient,
}, nil
}

func parseAzurePipelinesMetadata(config *ScalerConfig) (*azurePipelinesMetadata, error) {
func parseAzurePipelinesMetadata(ctx context.Context, config *ScalerConfig, httpClient *http.Client) (*azurePipelinesMetadata, error) {
meta := azurePipelinesMetadata{}
meta.targetPipelinesQueueLength = defaultTargetPipelinesQueueLength

Expand Down Expand Up @@ -90,60 +100,122 @@ func parseAzurePipelinesMetadata(config *ScalerConfig) (*azurePipelinesMetadata,
return nil, fmt.Errorf("no personalAccessToken given")
}

if val, ok := config.TriggerMetadata["poolID"]; ok && val != "" {
meta.poolID = val
if val, ok := config.TriggerMetadata["poolName"]; ok && val != "" {
var err error
meta.poolID, err = getPoolIDFromName(ctx, val, &meta, httpClient)
if err != nil {
return nil, err
}
} else {
return nil, fmt.Errorf("no poolID given")
if val, ok := config.TriggerMetadata["poolID"]; ok && val != "" {
var err error
meta.poolID, err = validatePoolID(ctx, val, &meta, httpClient)
if err != nil {
return nil, err
}
} else {
return nil, fmt.Errorf("no poolName or poolID given")
}
}

meta.scalerIndex = config.ScalerIndex

return &meta, nil
}

func (s *azurePipelinesScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
queuelen, err := s.GetAzurePipelinesQueueLength(ctx)
func getPoolIDFromName(ctx context.Context, poolName string, metadata *azurePipelinesMetadata, httpClient *http.Client) (int, error) {
url := fmt.Sprintf("%s/_apis/distributedtask/pools?poolName=%s", metadata.organizationURL, poolName)
body, err := getAzurePipelineRequest(ctx, url, metadata, httpClient)
if err != nil {
return -1, err
}

var result azurePipelinesPoolNameResponse
err = json.Unmarshal(body, &result)
if err != nil {
azurePipelinesLog.Error(err, "error getting pipelines queue length")
return []external_metrics.ExternalMetricValue{}, err
return -1, err
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(int64(queuelen), resource.DecimalSI),
Timestamp: metav1.Now(),
count := len(result.Value)
if count == 0 {
return -1, fmt.Errorf("agent pool with name `%s` not found", poolName)
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
if count != 1 {
return -1, fmt.Errorf("found %d agent pool with name `%s`", count, poolName)
}

return result.Value[0].ID, nil
}

func (s *azurePipelinesScaler) GetAzurePipelinesQueueLength(ctx context.Context) (int, error) {
url := fmt.Sprintf("%s/_apis/distributedtask/pools/%s/jobrequests", s.metadata.organizationURL, s.metadata.poolID)
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
func validatePoolID(ctx context.Context, poolID string, metadata *azurePipelinesMetadata, httpClient *http.Client) (int, error) {
url := fmt.Sprintf("%s/_apis/distributedtask/pools?poolID=%s", metadata.organizationURL, poolID)
body, err := getAzurePipelineRequest(ctx, url, metadata, httpClient)
if err != nil {
return -1, fmt.Errorf("agent pool with id `%s` not found", poolID)
}

var result azurePipelinesPoolIDResponse
err = json.Unmarshal(body, &result)
if err != nil {
return -1, err
}

req.SetBasicAuth("PAT", s.metadata.personalAccessToken)
return result.ID, nil
}

r, err := s.httpClient.Do(req)
func getAzurePipelineRequest(ctx context.Context, url string, metadata *azurePipelinesMetadata, httpClient *http.Client) ([]byte, error) {
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return -1, err
return []byte{}, err
}

req.SetBasicAuth("PAT", metadata.personalAccessToken)

r, err := httpClient.Do(req)
if err != nil {
return []byte{}, err
}

b, err := ioutil.ReadAll(r.Body)
if err != nil {
return -1, err
return []byte{}, err
}
r.Body.Close()

if !(r.StatusCode >= 200 && r.StatusCode <= 299) {
return -1, fmt.Errorf("the Azure DevOps REST API returned error. url: %s status: %d response: %s", url, r.StatusCode, string(b))
return []byte{}, fmt.Errorf("the Azure DevOps REST API returned error. url: %s status: %d response: %s", url, r.StatusCode, string(b))
}

return b, nil
}

// GetMetrics reports the current pipelines queue length as an external metric.
// Errors from the queue-length query are logged and propagated to the caller.
func (s *azurePipelinesScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
	queueLen, err := s.GetAzurePipelinesQueueLength(ctx)
	if err != nil {
		azurePipelinesLog.Error(err, "error getting pipelines queue length")
		return []external_metrics.ExternalMetricValue{}, err
	}

	return []external_metrics.ExternalMetricValue{
		{
			MetricName: metricName,
			Value:      *resource.NewQuantity(int64(queueLen), resource.DecimalSI),
			Timestamp:  metav1.Now(),
		},
	}, nil
}

func (s *azurePipelinesScaler) GetAzurePipelinesQueueLength(ctx context.Context) (int, error) {
url := fmt.Sprintf("%s/_apis/distributedtask/pools/%d/jobrequests", s.metadata.organizationURL, s.metadata.poolID)
body, err := getAzurePipelineRequest(ctx, url, s.metadata, s.httpClient)
if err != nil {
return -1, err
}

var result map[string]interface{}
err = json.Unmarshal(b, &result)
err = json.Unmarshal(body, &result)
if err != nil {
return -1, err
}
Expand All @@ -152,7 +224,7 @@ func (s *azurePipelinesScaler) GetAzurePipelinesQueueLength(ctx context.Context)
jobs, ok := result["value"].([]interface{})

if !ok {
return -1, fmt.Errorf("the Azure DevOps REST API result returned no value data. url: %s status: %d", url, r.StatusCode)
return -1, fmt.Errorf("the Azure DevOps REST API result returned no value data despite successful code. url: %s", url)
}

for _, value := range jobs {
Expand All @@ -169,7 +241,7 @@ func (s *azurePipelinesScaler) GetMetricSpecForScaling(context.Context) []v2beta
targetPipelinesQueueLengthQty := resource.NewQuantity(int64(s.metadata.targetPipelinesQueueLength), resource.DecimalSI)
externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("azure-pipelines-%s", s.metadata.poolID))),
Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("azure-pipelines-%d", s.metadata.poolID))),
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
Expand Down
Loading

0 comments on commit 71d6c90

Please sign in to comment.