Skip to content

Commit

Permalink
Handling StatusNotFound in IBMMQ scaler
Browse files Browse the repository at this point in the history
Signed-off-by: rickbrouwer <[email protected]>
  • Loading branch information
rickbrouwer committed Jan 9, 2025
1 parent 3e87999 commit 574ee1c
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 32 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ Here is an overview of all new **experimental** features:

### Improvements

- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
- **IBMMQ Scaler**: Handling StatusNotFound in IBMMQ scaler ([#6472](https://github.com/kedacore/keda/pull/6472))

### Fixes

Expand Down
81 changes: 51 additions & 30 deletions pkg/scalers/ibmmq_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"io"
"net/http"
"net/url"
"strings"

"github.com/go-logr/logr"
v2 "k8s.io/api/autoscaling/v2"
Expand Down Expand Up @@ -54,6 +53,13 @@ type Response struct {
Message []string `json:"message"`
}

// ErrorResponse Structure for error messages from IBM MQ
type ErrorResponse struct {
Error []struct {
Message string `json:"message"`
} `json:"error"`
}

// Parameters Contains the current depth of the IBM MQ Queue
type Parameters struct {
Curdepth int `json:"curdepth"`
Expand Down Expand Up @@ -132,18 +138,18 @@ func parseIBMMQMetadata(config *scalersconfig.ScalerConfig) (ibmmqMetadata, erro

func (s *ibmmqScaler) getQueueDepthViaHTTP(ctx context.Context) (int64, error) {
depths := make([]int64, 0, len(s.metadata.QueueName))
url := s.metadata.Host

req, err := http.NewRequestWithContext(ctx, "POST", url, nil)
req, err := http.NewRequestWithContext(ctx, "POST", s.metadata.Host, nil)
if err != nil {
return 0, fmt.Errorf("failed to create HTTP request: %w", err)
}

req.Header.Set("ibm-mq-rest-csrf-token", "value")
req.Header.Set("Content-Type", "application/json")
req.SetBasicAuth(s.metadata.Username, s.metadata.Password)

for _, queueName := range s.metadata.QueueName {
requestJSON := []byte(`{"type": "runCommandJSON", "command": "display", "qualifier": "qlocal", "name": "` + queueName + `", "responseParameters" : ["CURDEPTH"]}`)
requestJSON := []byte(fmt.Sprintf(`{"type": "runCommandJSON", "command": "display", "qualifier": "qlocal", "name": "%s", "responseParameters": ["CURDEPTH"]}`, queueName))
req.Body = io.NopCloser(bytes.NewBuffer(requestJSON))

resp, err := s.httpClient.Do(req)
Expand All @@ -152,47 +158,62 @@ func (s *ibmmqScaler) getQueueDepthViaHTTP(ctx context.Context) (int64, error) {
}
defer resp.Body.Close()

if resp.StatusCode == http.StatusUnauthorized {
return 0, fmt.Errorf("authentication failed: incorrect username or password")
}

body, err := io.ReadAll(resp.Body)
if err != nil {
return 0, fmt.Errorf("failed to read body of request for queue %s: %w", queueName, err)
}

var response CommandResponse
err = json.Unmarshal(body, &response)
if err != nil {
return 0, fmt.Errorf("failed to parse JSON for queue %s: %w", queueName, err)
}
switch resp.StatusCode {
case http.StatusUnauthorized:
return 0, fmt.Errorf("authentication failed: incorrect username or password")
case http.StatusNotFound:
var errorResponse ErrorResponse
if err := json.Unmarshal(body, &errorResponse); err != nil {
return 0, fmt.Errorf("failed to parse error response JSON for queue %s: %w", queueName, err)
}
if len(errorResponse.Error) > 0 && errorResponse.Error[0].Message != "" {
return 0, fmt.Errorf("%s", errorResponse.Error[0].Message)
}
return 0, fmt.Errorf("failed to get the current queue depth parameter for queue %s", queueName)
case http.StatusOK:
var response CommandResponse
if err := json.Unmarshal(body, &response); err != nil {
return 0, fmt.Errorf("failed to parse JSON for queue %s: %w", queueName, err)
}

if len(response.CommandResponse) == 0 {
return 0, fmt.Errorf("failed to parse response from REST call for queue %s", queueName)
}
// Check for valid response with message
if len(response.CommandResponse) > 0 && len(response.CommandResponse[0].Message) > 0 {
return 0, fmt.Errorf("%s", response.CommandResponse[0].Message[0])
}

if response.CommandResponse[0].Parameters == nil {
var reason string
message := strings.Join(response.CommandResponse[0].Message, " ")
if message != "" {
reason = fmt.Sprintf(", reason: %s", message)
// Check for valid response with parameters
if len(response.CommandResponse) == 0 || response.CommandResponse[0].Parameters == nil {
return 0, fmt.Errorf("failed to get the current queue depth parameter for queue %s", queueName)
}
return 0, fmt.Errorf("failed to get the current queue depth parameter for queue %s%s", queueName, reason)

depths = append(depths, int64(response.CommandResponse[0].Parameters.Curdepth))
default:
return 0, fmt.Errorf("unexpected status code %d for queue %s", resp.StatusCode, queueName)
}
}

depth := int64(response.CommandResponse[0].Parameters.Curdepth)
depths = append(depths, depth)
return calculateDepth(depths, s.metadata.Operation), nil
}

func calculateDepth(depths []int64, operation string) int64 {
if len(depths) == 0 {
return 0
}

switch s.metadata.Operation {
switch operation {
case sumOperation:
return sumDepths(depths), nil
return sumDepths(depths)
case avgOperation:
return avgDepths(depths), nil
return avgDepths(depths)
case maxOperation:
return maxDepth(depths), nil
return maxDepths(depths)
default:
return 0, nil
return 0
}
}

Expand All @@ -211,7 +232,7 @@ func avgDepths(depths []int64) int64 {
return sumDepths(depths) / int64(len(depths))
}

func maxDepth(depths []int64) int64 {
func maxDepths(depths []int64) int64 {
if len(depths) == 0 {
return 0
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/scalers/ibmmq_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ var testQueueDepthResults = []queueDepthResultTestData{
"message": "MQWB0009E: Could not query the queue manager 'testqmgR'.",
"explanation": "The REST API was invoked specifying a queue manager name which cannot be located."}]
}`,
responseStatus: http.StatusOK,
responseStatus: http.StatusNotFound,
expectedValue: 0,
isError: true,
},
Expand Down

0 comments on commit 574ee1c

Please sign in to comment.