
rabbitmq trigger with invalid queueName creates connections that persist #6283

Closed
mauvilsa opened this issue Oct 30, 2024 · 7 comments

Labels
bug Something isn't working

Comments

@mauvilsa

When a scaledObject is created with a rabbitmq trigger whose queueName points to a queue that doesn't exist in that rabbitmq, the keda-operator pod keeps creating new connections that persist. Eventually a connection limit can be reached and new connections to rabbitmq are rejected.

Expected Behavior

When a rabbitmq queue doesn't exist, the scaledObject should fail and close any opened connections to rabbitmq.

Actual Behavior

The scaledObject fails periodically, and the number of connections to rabbitmq keeps increasing until creating new connections fails.

Steps to Reproduce the Problem

  1. Create a scaledObject with proper details to connect to rabbitmq
  2. Set a queueName to a queue that doesn't exist in that rabbitmq
  3. Wait for failures and see in the rabbitmq web ui how the number of connections keeps increasing

Logs from KEDA operator

ERROR scale_handler error getting metric for trigger {"scaledObject.Namespace": "...", "scaledObject.Name": "...", "trigger": "rabbitMQScaler", "error": "error inspecting rabbitMQ: Exception (404) Reason: \"NOT_FOUND - no queue '...' in vhost '/'\""}
ERROR scale_handler error getting scale decision {"scaledObject.Namespace": "...", "scaledObject.Name": "...", "scaler": "rabbitMQScaler", "error": "error establishing rabbitmq connection: dial tcp ...: i/o timeout"}

KEDA Version

2.14.0

Kubernetes Version

1.30

Platform

Microsoft Azure

Scaler Details

rabbitmq

Anything else?

No response

@mauvilsa mauvilsa added the bug Something isn't working label Oct 30, 2024
@JorTurFer
Member

Hello
Are you using AMQP or HTTP?

@mauvilsa
Author

Hello Are you using AMQP or HTTP?

amqps://

@JorTurFer
Member

Interesting... In theory, on each scaler failure KEDA closes the scaler before refreshing it, and that close releases all the connections:

func (s *rabbitMQScaler) Close(context.Context) error {
	if s.connection != nil {
		err := s.connection.Close()
		if err != nil {
			s.logger.Error(err, "Error closing rabbitmq connection")
			return err
		}
	}
	if s.httpClient != nil {
		s.httpClient.CloseIdleConnections()
	}
	return nil
}

Are you willing to take a look?

@rickbrouwer
Contributor

rickbrouwer commented Nov 3, 2024

I think the issue is in this piece of code:

// QueueDeclarePassive assumes that the queue exists and fails if it doesn't
items, err := s.channel.QueueDeclarePassive(s.metadata.QueueName, false, false, false, false, amqp.Table{})
if err != nil {
	return -1, -1, err
}

If the queue does not exist, this returns an error immediately and the Close() method is not called, so the connection will indeed remain open. A new connection is then created on the next attempt.

This explains, I think, why the number of connections continues to grow when a non-existent queue is configured.
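
If that is what happens, a possible direction (just a rough sketch, not a tested patch, and reusing only the fields already visible in the snippets above) would be to release the connection before returning the error:

// Sketch: close the connection before propagating the error, so a failed
// passive declare does not leave the connection behind until the next refresh.
items, err := s.channel.QueueDeclarePassive(s.metadata.QueueName, false, false, false, false, amqp.Table{})
if err != nil {
	if closeErr := s.connection.Close(); closeErr != nil {
		s.logger.Error(closeErr, "Error closing rabbitmq connection")
	}
	return -1, -1, err
}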

@JorTurFer
Member

JorTurFer commented Nov 3, 2024

The call to Close() is done directly by KEDA from outside the scaler code:

func (c *ScalersCache) GetMetricsAndActivityForScaler(ctx context.Context, index int, metricName string) ([]external_metrics.ExternalMetricValue, bool, time.Duration, error) {
	if index < 0 || index >= len(c.Scalers) {
		return nil, false, -1, fmt.Errorf("scaler with id %d not found. Len = %d", index, len(c.Scalers))
	}
	startTime := time.Now()
	metric, activity, err := c.Scalers[index].Scaler.GetMetricsAndActivity(ctx, metricName)
	if err == nil {
		return metric, activity, time.Since(startTime), nil
	}
	ns, err := c.refreshScaler(ctx, index)
	if err != nil {
		return nil, false, -1, err
	}
	startTime = time.Now()
	metric, activity, err = ns.GetMetricsAndActivity(ctx, metricName)
	return metric, activity, time.Since(startTime), err
}

Specifically, ns, err := c.refreshScaler(ctx, index) is the call that triggers the close.
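
From memory, the refresh does roughly this (a simplified sketch, not the literal source; the exact fields of ScalersCache and ScalerBuilder may differ):

func (c *ScalersCache) refreshScaler(ctx context.Context, index int) (scalers.Scaler, error) {
	// Close the failing scaler first, then rebuild it from its factory and
	// store the fresh instance. The rabbitmq connection held by the old
	// scaler should be released by this Close call.
	sb := c.Scalers[index]
	defer sb.Scaler.Close(ctx)

	ns, sConfig, err := sb.Factory()
	if err != nil {
		return nil, err
	}

	c.Scalers[index] = ScalerBuilder{
		Scaler:       ns,
		ScalerConfig: *sConfig,
		Factory:      sb.Factory,
	}
	return ns, nil
}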

@mauvilsa
Author

mauvilsa commented Nov 7, 2024

I still observe this behavior in more than one cluster. I have been trying to come up with better steps to reproduce the problem, but I haven't been able to so far. I will post them when I manage.

@mauvilsa
Author

mauvilsa commented Nov 7, 2024

I managed to get connections that stay open locally. The way I did it is not the same as what we have in our kubernetes cluster, though the issue could be related. Anyway, if I use keda v2.16.0, released a few hours ago, the same issue doesn't seem to happen. So it could be that this is already fixed. I will close this issue.

Just to have it as a reference, this is how I reproduced it locally:

  1. Create a cluster with rabbitmq, a dummy deployment and keda with a scaled object.
# Cluster
kind create cluster --name keda-cluster --image kindest/node:v1.30.0

# RabbitMQ
echo 'apiVersion: apps/v1
kind: Deployment
metadata:
  name: rabbitmq
  namespace: default
spec:
  replicas: 1
  selector:
    matchLabels:
      app: rabbitmq
  template:
    metadata:
      labels:
        app: rabbitmq
    spec:
      containers:
        - name: rabbitmq
          image: rabbitmq:3.12.10-management
          ports:
            - containerPort: 5672
            - containerPort: 15672
---
apiVersion: v1
kind: Service
metadata:
  name: rabbitmq
  namespace: default
spec:
  ports:
    - name: amqp
      port: 5672
      targetPort: 5672
    - name: management
      port: 15672
      targetPort: 15672
  selector:
    app: rabbitmq
' > rabbitmq-deployment.yaml
kubectl apply -f rabbitmq-deployment.yaml
kubectl port-forward -n default $(kubectl get pods -n default | grep rabbitmq | awk '{print $1}') 15672:15672 &


# Deployment
echo 'apiVersion: apps/v1
kind: Deployment
metadata:
  name: my-deployment
  namespace: default
spec:
  replicas: 1
  selector:
    matchLabels:
      app: my-app
  template:
    metadata:
      labels:
        app: my-app
    spec:
      containers:
        - name: my-container
          image: busybox
          command: ["sh", "-c", "sleep 3600"]
' > my-deployment.yaml
kubectl apply -f my-deployment.yaml

# KEDA
kubectl create namespace keda
helm repo add kedacore https://kedacore.github.io/charts
helm repo update
helm install keda kedacore/keda --namespace keda --version 2.14.0

echo "apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: rabbitmq-scaledobject
  namespace: default
spec:
  scaleTargetRef:
    name: my-deployment
  pollingInterval:  30
  cooldownPeriod:   180
  minReplicaCount:  2
  maxReplicaCount:  5
  triggers:
    - type: rabbitmq
      metadata:
        protocol: amqp
        queueName: queue-that-does-not-exist
        mode: QueueLength
        value: \"20\"
        host: amqp://guest:[email protected]:5672/
" > scaledobject.yaml
kubectl apply -f scaledobject.yaml
  2. The previous step doesn't produce the problem on its own. But if I change the scaled object, e.g. change minReplicaCount, and then reapply kubectl apply -f scaledobject.yaml, for some reason one or more connections stay open.

Let me know if what I did is overkill or if there are easier ways to reproduce bugs like this.
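
For reference only, the raw client-level pattern that leaves a connection behind looks roughly like this (a minimal sketch using the rabbitmq/amqp091-go client, which I assume is what the scaler uses; the host and queue name below are placeholders, not taken from the KEDA code):

package main

import (
	"log"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	// Open a connection the same way a scaler would.
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		log.Fatalf("dial failed: %v", err)
	}

	ch, err := conn.Channel()
	if err != nil {
		log.Fatalf("channel failed: %v", err)
	}

	// A passive declare on a queue that doesn't exist fails with a 404
	// channel exception: the channel is closed by the broker, but the TCP
	// connection stays open unless conn.Close() is called explicitly.
	if _, err := ch.QueueDeclarePassive("queue-that-does-not-exist", false, false, false, false, nil); err != nil {
		log.Printf("passive declare failed as expected: %v", err)
		// Returning here without conn.Close() is the suspected leak; the
		// connection keeps showing up in the management UI until the
		// process exits.
		return
	}
}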

@mauvilsa mauvilsa closed this as completed Nov 7, 2024
@github-project-automation github-project-automation bot moved this from To Triage to Ready To Ship in Roadmap - KEDA Core Nov 7, 2024