JetStream scaler should query the stream consumer leader when clustered #3564

Merged
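When JetStream runs clustered, a consumer's state is tracked by that consumer's leader node, so polling an arbitrary server can return stale or missing lag data. The changes below make the scaler record the consumer leader reported in the `/jsz` account response, build and cache that leader's monitoring URL, and, when the leader is unknown or has moved, enumerate the cluster's hosts from the server's `/varz` endpoint and probe each node until the leader is found. As orientation, here is a condensed, standalone sketch of the node-URL rewrite performed by the new `getNATSJetStreamMonitoringNodeURL` helper; the base URL and node name are illustrative values, not taken from this PR:

```go
// Sketch of how a cluster node's monitoring URL is derived from the base
// monitoring URL (mirrors getNATSJetStreamMonitoringNodeURL in the diff).
package main

import (
	"fmt"
	"net/url"
)

func monitoringNodeURL(monitoringURL, node string) (string, error) {
	u, err := url.Parse(monitoringURL)
	if err != nil {
		return "", err
	}
	// Prefix the node name as an extra DNS label on the existing host.
	return fmt.Sprintf("%s://%s.%s%s?%s", u.Scheme, node, u.Host, u.Path, u.RawQuery), nil
}

func main() {
	// Illustrative base URL in the shape produced by getNATSJetStreamMonitoringURL.
	base := "http://nats.nats-ns.svc:8222/jsz?acc=myaccount&consumers=true&config=true"
	leaderURL, err := monitoringNodeURL(base, "nats-1")
	if err != nil {
		panic(err)
	}
	fmt.Println(leaderURL)
	// http://nats-1.nats.nats-ns.svc:8222/jsz?acc=myaccount&consumers=true&config=true
}
```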
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -66,6 +66,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **Azure Pipelines Scaler:** Improved speed of profiling a large set of Job Requests from Azure Pipelines ([#3702](https://github.com/kedacore/keda/issues/3702))
- **GCP Storage Scaler:** Add prefix and delimiter support ([#3756](https://github.com/kedacore/keda/issues/3756))
- **Metrics API Scaler:** Add unsafeSsl parameter to skip certificate validation when connecting over HTTPS ([#3728](https://github.com/kedacore/keda/discussions/3728))
- **NATS JetStream Scaler:** Query the stream consumer leader when clustered ([#3860](https://github.com/kedacore/keda/issues/3860))
- **NATS Scalers:** Support HTTPS protocol in NATS Scalers ([#3805](https://github.com/kedacore/keda/issues/3805))
- **Prometheus Scaler:** Introduce skipping of certificate check for unsigned certs ([#2310](https://github.com/kedacore/keda/issues/2310))
- **Pulsar Scaler:** Add support for bearer token and basic auth ([#3844](https://github.com/kedacore/keda/issues/3844))
269 changes: 214 additions & 55 deletions pkg/scalers/nats_jetstream_scaler.go
@@ -6,7 +6,9 @@ import (
"errors"
"fmt"
"net/http"
"net/url"
"strconv"
"strings"

"github.com/go-logr/logr"
v2 "k8s.io/api/autoscaling/v2"
@@ -18,10 +20,11 @@ import (
)

const (
jetStreamMetricType = "External"
defaultJetStreamLagThreshold = 10
natsHTTPProtocol = "http"
natsHTTPSProtocol = "https"
jetStreamMetricType = "External"
defaultJetStreamLagThreshold = 10
natsHTTPProtocol = "http"
natsHTTPSProtocol = "https"
jetStreamLagThresholdMetricName = "lagThreshold"
)

type natsJetStreamScaler struct {
@@ -33,24 +36,41 @@ type natsJetStreamScaler struct {
}

type natsJetStreamMetadata struct {
monitoringEndpoint string
account string
stream string
consumer string
consumerLeader string
monitoringURL string
monitoringLeaderURL string
lagThreshold int64
activationLagThreshold int64
clusterSize int
scalerIndex int
}

type jetStreamEndpointResponse struct {
Accounts []accountDetail `json:"account_details"`
Accounts []accountDetail `json:"account_details"`
MetaCluster metaCluster `json:"meta_cluster"`
}

type jetStreamServerEndpointResponse struct {
Cluster jetStreamCluster `json:"cluster"`
ServerName string `json:"server_name"`
}

type jetStreamCluster struct {
HostUrls []string `json:"urls"`
}

type accountDetail struct {
Name string `json:"name"`
Streams []*streamDetail `json:"stream_detail"`
}

type metaCluster struct {
ClusterSize int `json:"cluster_size"`
}

type streamDetail struct {
Name string `json:"name"`
Config streamConfig `json:"config"`
@@ -76,6 +96,11 @@ type consumerDetail struct {
NumPending int `json:"num_pending"`
Config consumerConfig `json:"config"`
DeliveryStatus consumerDeliveryStatus `json:"delivery"`
Cluster consumerCluster `json:"cluster"`
}

type consumerCluster struct {
Leader string `json:"leader"`
}

type consumerConfig struct {
@@ -127,11 +152,12 @@ func parseNATSJetStreamMetadata(config *ScalerConfig) (natsJetStreamMetadata, er

meta.lagThreshold = defaultJetStreamLagThreshold

if val, ok := config.TriggerMetadata[lagThresholdMetricName]; ok {
if val, ok := config.TriggerMetadata[jetStreamLagThresholdMetricName]; ok {
t, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return meta, fmt.Errorf("error parsing %s: %s", lagThresholdMetricName, err)
return meta, fmt.Errorf("error parsing %s: %s", jetStreamLagThresholdMetricName, err)
}

meta.lagThreshold = t
}

@@ -157,49 +183,202 @@ func parseNATSJetStreamMetadata(config *ScalerConfig) (natsJetStreamMetadata, er
return meta, fmt.Errorf("useHTTPS parsing error %s", err.Error())
}
}
meta.monitoringEndpoint = getNATSJetStreamEndpoint(useHTTPS, natsServerEndpoint, meta.account)
meta.monitoringURL = getNATSJetStreamMonitoringURL(useHTTPS, natsServerEndpoint, meta.account)

return meta, nil
}

func getNATSJetStreamEndpoint(useHTTPS bool, natsServerEndpoint string, account string) string {
protocol := natsHTTPProtocol
if useHTTPS {
protocol = natsHTTPSProtocol
func (s *natsJetStreamScaler) getNATSJetstreamMonitoringData(ctx context.Context, natsJetStreamMonitoringURL string) error {
// save the leader URL, then we can check if it has changed
cachedConsumerLeader := s.metadata.consumerLeader
// default URL (standalone)
monitoringURL := natsJetStreamMonitoringURL
// use the leader URL if we already have it
if s.metadata.monitoringLeaderURL != "" {
monitoringURL = s.metadata.monitoringLeaderURL
}

jetStreamAccountResp, err := s.getNATSJetstreamMonitoringRequest(ctx, monitoringURL)
if err != nil {
return err
}

return fmt.Sprintf("%s://%s/jsz?acc=%s&consumers=true&config=true", protocol, natsServerEndpoint, account)
consumerFound := s.setNATSJetStreamMonitoringData(jetStreamAccountResp, "")

// invalidate the cached data if we used it but nothing was found
if cachedConsumerLeader != "" && !consumerFound {
s.invalidateNATSJetStreamCachedMonitoringData()
}

// the leader name hasn't changed from the previous run, we can assume we just queried the correct leader node
if consumerFound && cachedConsumerLeader != "" && cachedConsumerLeader == s.metadata.consumerLeader {
return nil
}

if s.metadata.clusterSize > 1 {
// we know who the consumer leader is, query it directly
if s.metadata.consumerLeader != "" {
natsJetStreamMonitoringLeaderURL, err := s.getNATSJetStreamMonitoringNodeURL(s.metadata.consumerLeader)
if err != nil {
return err
}

jetStreamAccountResp, err = s.getNATSJetstreamMonitoringRequest(ctx, natsJetStreamMonitoringLeaderURL)
if err != nil {
return err
}

s.setNATSJetStreamMonitoringData(jetStreamAccountResp, natsJetStreamMonitoringLeaderURL)
return nil
}

// we haven't found the consumer yet, grab the list of hosts and try each one
natsJetStreamMonitoringServerURL, err := s.getNATSJetStreamMonitoringServerURL()
if err != nil {
return err
}

req, err := http.NewRequestWithContext(ctx, http.MethodGet, natsJetStreamMonitoringServerURL, nil)
if err != nil {
return err
}

resp, err := s.httpClient.Do(req)
if err != nil {
s.logger.Error(err, "unable to access NATS JetStream monitoring server endpoint", "natsServerMonitoringURL", natsJetStreamMonitoringServerURL)
return err
}

defer resp.Body.Close()
var jetStreamServerResp *jetStreamServerEndpointResponse
if err = json.NewDecoder(resp.Body).Decode(&jetStreamServerResp); err != nil {
s.logger.Error(err, "unable to decode NATS JetStream server details")
return err
}

for _, clusterURL := range jetStreamServerResp.Cluster.HostUrls {
node := strings.Split(clusterURL, ".")[0]
natsJetStreamMonitoringNodeURL, err := s.getNATSJetStreamMonitoringNodeURL(node)
if err != nil {
return err
}

jetStreamAccountResp, err = s.getNATSJetstreamMonitoringRequest(ctx, natsJetStreamMonitoringNodeURL)
if err != nil {
return err
}

for _, jetStreamAccount := range jetStreamAccountResp.Accounts {
if jetStreamAccount.Name == s.metadata.account {
for _, stream := range jetStreamAccount.Streams {
if stream.Name == s.metadata.stream {
for _, consumer := range stream.Consumers {
if consumer.Name == s.metadata.consumer {
// this node is the consumer leader
if node == consumer.Cluster.Leader {
s.setNATSJetStreamMonitoringData(jetStreamAccountResp, natsJetStreamMonitoringNodeURL)
return nil
}
}
}
}
}
}
}
}
}
return nil
}

func (s *natsJetStreamScaler) IsActive(ctx context.Context) (bool, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.metadata.monitoringEndpoint, nil)
func (s *natsJetStreamScaler) setNATSJetStreamMonitoringData(jetStreamAccountResp *jetStreamEndpointResponse, leaderURL string) bool {
s.metadata.clusterSize = jetStreamAccountResp.MetaCluster.ClusterSize

// find and assign the stream that we are looking for.
for _, jsAccount := range jetStreamAccountResp.Accounts {
if jsAccount.Name == s.metadata.account {
for _, stream := range jsAccount.Streams {
if stream.Name == s.metadata.stream {
s.stream = stream

for _, consumer := range stream.Consumers {
if consumer.Name == s.metadata.consumer {
s.metadata.consumerLeader = consumer.Cluster.Leader
if leaderURL != "" {
s.metadata.monitoringLeaderURL = leaderURL
}
return true
}
}
}
}
}
}
return false
}

func (s *natsJetStreamScaler) invalidateNATSJetStreamCachedMonitoringData() {
s.metadata.consumerLeader = ""
s.metadata.monitoringLeaderURL = ""
s.stream = nil
}

func (s *natsJetStreamScaler) getNATSJetstreamMonitoringRequest(ctx context.Context, natsJetStreamMonitoringURL string) (*jetStreamEndpointResponse, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, natsJetStreamMonitoringURL, nil)
if err != nil {
return false, err
return nil, err
}

resp, err := s.httpClient.Do(req)
if err != nil {
s.logger.Error(err, "unable to access NATS JetStream monitoring endpoint", "natsServerMonitoringEndpoint", s.metadata.monitoringEndpoint)
return false, err
s.logger.Error(err, "unable to access NATS JetStream monitoring endpoint", "natsServerMonitoringURL", natsJetStreamMonitoringURL)
return nil, err
}

defer resp.Body.Close()
var jsAccountResp jetStreamEndpointResponse
var jsAccountResp *jetStreamEndpointResponse
if err = json.NewDecoder(resp.Body).Decode(&jsAccountResp); err != nil {
s.logger.Error(err, "unable to decode JetStream account response")
s.logger.Error(err, "unable to decode NATS JetStream account details")
return nil, err
}
return jsAccountResp, nil
}

func getNATSJetStreamMonitoringURL(useHTTPS bool, natsServerEndpoint string, account string) string {
scheme := natsHTTPProtocol
if useHTTPS {
scheme = natsHTTPSProtocol
}
return fmt.Sprintf("%s://%s/jsz?acc=%s&consumers=true&config=true", scheme, natsServerEndpoint, account)
}

func (s *natsJetStreamScaler) getNATSJetStreamMonitoringServerURL() (string, error) {
jsURL, err := url.Parse(s.metadata.monitoringURL)
if err != nil {
s.logger.Error(err, "unable to parse monitoring URL to create server URL", "natsServerMonitoringURL", s.metadata.monitoringURL)
return "", err
}
return fmt.Sprintf("%s://%s/varz", jsURL.Scheme, jsURL.Host), nil
}

func (s *natsJetStreamScaler) getNATSJetStreamMonitoringNodeURL(node string) (string, error) {
jsURL, err := url.Parse(s.metadata.monitoringURL)
if err != nil {
s.logger.Error(err, "unable to parse monitoring URL to create node URL", "natsServerMonitoringURL", s.metadata.monitoringURL)
return "", err
}
return fmt.Sprintf("%s://%s.%s%s?%s", jsURL.Scheme, node, jsURL.Host, jsURL.Path, jsURL.RawQuery), nil
}

func (s *natsJetStreamScaler) IsActive(ctx context.Context) (bool, error) {
err := s.getNATSJetstreamMonitoringData(ctx, s.metadata.monitoringURL)
if err != nil {
return false, err
}

// Find and assign the stream that we are looking for.
for _, account := range jsAccountResp.Accounts {
if account.Name == s.metadata.account {
for _, stream := range account.Streams {
if stream.Name == s.metadata.stream {
s.stream = stream
}
}
}
if s.stream == nil {
return false, errors.New("stream not found")
}

return s.getMaxMsgLag() > s.metadata.activationLagThreshold, nil
}

@@ -223,39 +402,20 @@ func (s *natsJetStreamScaler) GetMetricSpecForScaling(context.Context) []v2.Metr
Target: GetMetricTarget(s.metricType, s.metadata.lagThreshold),
}
metricSpec := v2.MetricSpec{
External: externalMetric, Type: jetStreamMetricType,
External: externalMetric,
Type: jetStreamMetricType,
}
return []v2.MetricSpec{metricSpec}
}

func (s *natsJetStreamScaler) GetMetrics(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.metadata.monitoringEndpoint, nil)
err := s.getNATSJetstreamMonitoringData(ctx, s.metadata.monitoringURL)
if err != nil {
return nil, err
}

resp, err := s.httpClient.Do(req)
if err != nil {
s.logger.Error(err, "unable to access NATS JetStream monitoring endpoint", "natsServerMonitoringEndpoint", s.metadata.monitoringEndpoint)
return []external_metrics.ExternalMetricValue{}, err
}

defer resp.Body.Close()
var jsAccountResp jetStreamEndpointResponse
if err = json.NewDecoder(resp.Body).Decode(&jsAccountResp); err != nil {
s.logger.Error(err, "unable to decode JetStream account details")
return []external_metrics.ExternalMetricValue{}, err
}

// Find and assign the stream that we are looking for.
for _, account := range jsAccountResp.Accounts {
if account.Name == s.metadata.account {
for _, stream := range account.Streams {
if stream.Name == s.metadata.stream {
s.stream = stream
}
}
}
if s.stream == nil {
return []external_metrics.ExternalMetricValue{}, errors.New("stream not found")
}

totalLag := s.getMaxMsgLag()
@@ -266,7 +426,6 @@ func (s *natsJetStreamScaler) GetMetrics(ctx context.Context, metricName string)
Value: *resource.NewQuantity(totalLag, resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}
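Two monitoring endpoints drive the leader discovery added above: the account-level `/jsz` query built by `getNATSJetStreamMonitoringURL`, and the server-level `/varz` query whose `cluster.urls` field lists the cluster's hosts. A minimal sketch of how a `/varz` payload yields candidate node names, reusing the response structs from the diff (the JSON payload itself is illustrative, not captured from a real server):

```go
// Sketch: decode a /varz-style response and derive candidate node names the
// way the scaler does when the consumer leader is not yet known.
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

type jetStreamCluster struct {
	HostUrls []string `json:"urls"`
}

type jetStreamServerEndpointResponse struct {
	Cluster    jetStreamCluster `json:"cluster"`
	ServerName string           `json:"server_name"`
}

func main() {
	// Illustrative payload.
	payload := []byte(`{"server_name":"nats-0","cluster":{"urls":["nats-0.nats.nats-ns.svc:6222","nats-1.nats.nats-ns.svc:6222"]}}`)
	var resp jetStreamServerEndpointResponse
	if err := json.Unmarshal(payload, &resp); err != nil {
		panic(err)
	}
	for _, clusterURL := range resp.Cluster.HostUrls {
		// The node name is the first DNS label of each cluster URL.
		fmt.Println(strings.Split(clusterURL, ".")[0]) // nats-0, nats-1
	}
}
```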

390 changes: 382 additions & 8 deletions pkg/scalers/nats_jetstream_scaler_test.go

Large diffs are not rendered by default.

182 changes: 182 additions & 0 deletions tests/scalers/nats_jetstream/helper/nats_helper.go
@@ -0,0 +1,182 @@
//go:build e2e
// +build e2e

package helper

import (
h "github.com/kedacore/keda/v2/tests/helper"
)

type JetStreamTemplateData struct {
NatsNamespace string
TestNamespace string
NatsAddress string
NatsConsumer string
NatsStream string
StreamReplicas int
NatsVersion string
}

const (
NatsJetStreamName = "nats"
NatsJetStreamStreamName = "mystream"
NatsJetStreamConsumerName = "PULL_CONSUMER"
NatsJetStreamChartVersion = "0.18.2"
NatsJetStreamServerVersion = "2.9.3"
)

type JetStreamDeploymentTemplateData struct {
TestNamespace string
NatsAddress string
NatsConsumer string
NatsStream string
NatsServerMonitoringEndpoint string
NumberOfMessages int
}

func GetJetStreamDeploymentTemplateData(
testNamespace string,
natsAddress string,
natsServerMonitoringEndpoint string,
messagePublishCount int,
) (JetStreamDeploymentTemplateData, []h.Template) {
return JetStreamDeploymentTemplateData{
TestNamespace: testNamespace,
NatsAddress: natsAddress,
NatsServerMonitoringEndpoint: natsServerMonitoringEndpoint,
NumberOfMessages: messagePublishCount,
NatsConsumer: NatsJetStreamConsumerName,
NatsStream: NatsJetStreamStreamName,
}, []h.Template{
{Name: "deploymentTemplate", Config: DeploymentTemplate},
{Name: "scaledObjectTemplate", Config: ScaledObjectTemplate},
}
}

const (
StreamAndConsumerTemplate = `
apiVersion: batch/v1
kind: Job
metadata:
name: stream
namespace: {{.TestNamespace}}
spec:
template:
spec:
containers:
- name: stream
image: "natsio/nats-box:0.13.2"
imagePullPolicy: Always
command: [
'sh', '-c', 'nats context save local --server {{.NatsAddress}} --select &&
nats stream rm {{.NatsStream}} -f ;
nats stream add {{.NatsStream}} --replicas={{.StreamReplicas}} --storage=memory --subjects="ORDERS.*"
--retention=limits --discard=old --max-msgs="-1" --max-msgs-per-subject="-1"
--max-bytes="-1" --max-age="-1" --max-msg-size="-1" --dupe-window=2m
--allow-rollup --no-deny-delete --no-deny-purge &&
nats consumer add {{.NatsStream}} {{.NatsConsumer}} --pull --deliver=all --ack=explicit --replay=instant
--filter="" --max-deliver="-1" --max-pending=1000
--no-headers-only --wait=5s --backoff=none'
]
restartPolicy: Never
backoffLimit: 4
`

DeploymentTemplate = `
apiVersion: apps/v1
kind: Deployment
metadata:
name: sub
namespace: {{.TestNamespace}}
spec:
replicas: 0
selector:
matchLabels:
app: sub
template:
metadata:
labels:
app: sub
spec:
containers:
- name: sub
image: "goku321/nats-consumer:v0.9"
imagePullPolicy: Always
command: ["./main"]
env:
- name: NATS_ADDRESS
value: {{.NatsAddress}}
`

PublishJobTemplate = `
apiVersion: batch/v1
kind: Job
metadata:
name: pub
namespace: {{.TestNamespace}}
spec:
ttlSecondsAfterFinished: 0
template:
spec:
containers:
- name: pub
image: "goku321/nats-publisher:v0.2"
imagePullPolicy: Always
command: ["./main"]
env:
- name: NATS_ADDRESS
value: {{.NatsAddress}}
- name: NUM_MESSAGES
value: "{{.NumberOfMessages}}"
restartPolicy: Never
backoffLimit: 4
`

ActivationPublishJobTemplate = `
apiVersion: batch/v1
kind: Job
metadata:
name: pub0
namespace: {{.TestNamespace}}
spec:
ttlSecondsAfterFinished: 0
template:
spec:
containers:
- name: pub
image: "goku321/nats-publisher:v0.2"
imagePullPolicy: Always
command: ["./main"]
env:
- name: NATS_ADDRESS
value: {{.NatsAddress}}
- name: NUM_MESSAGES
value: "{{.NumberOfMessages}}"
restartPolicy: Never
backoffLimit: 4
`

ScaledObjectTemplate = `
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: nats-jetstream-scaledobject
namespace: {{.TestNamespace}}
spec:
pollingInterval: 3
cooldownPeriod: 10
minReplicaCount: 0
maxReplicaCount: 2
scaleTargetRef:
name: sub
triggers:
- type: nats-jetstream
metadata:
natsServerMonitoringEndpoint: {{.NatsServerMonitoringEndpoint}}
account: "$G"
stream: {{.NatsStream}}
consumer: {{.NatsConsumer}}
lagThreshold: "10"
activationLagThreshold: "15"
`
)
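The template constants above are ordinary Go `text/template` strings, rendered with the data structs defined at the top of this file. A standalone sketch of that rendering, assuming the e2e helpers (`KubectlApplyWithTemplate` and friends) feed equivalent output to `kubectl`:

```go
// Sketch: render a fragment of DeploymentTemplate with text/template.
// The fragment and data values are illustrative.
package main

import (
	"os"
	"text/template"
)

type JetStreamDeploymentTemplateData struct {
	TestNamespace string
	NatsAddress   string
}

const deploymentFragment = `metadata:
  name: sub
  namespace: {{.TestNamespace}}
env:
  NATS_ADDRESS: {{.NatsAddress}}
`

func main() {
	tmpl := template.Must(template.New("deployment").Parse(deploymentFragment))
	data := JetStreamDeploymentTemplateData{
		TestNamespace: "demo-ns",
		NatsAddress:   "nats://nats.demo-nats-ns.svc.cluster.local:4222",
	}
	if err := tmpl.Execute(os.Stdout, data); err != nil {
		panic(err)
	}
}
```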
218 changes: 0 additions & 218 deletions tests/scalers/nats_jetstream/nats_helper.go

This file was deleted.

@@ -0,0 +1,169 @@
//go:build e2e
// +build e2e

package natsjetstream_cluster_test

import (
"fmt"
"testing"

"github.com/joho/godotenv"
"github.com/stretchr/testify/assert"
k8s "k8s.io/client-go/kubernetes"

. "github.com/kedacore/keda/v2/tests/helper"
nats "github.com/kedacore/keda/v2/tests/scalers/nats_jetstream/helper"
)

// Load env variables from .env files
var _ = godotenv.Load("../../.env")

const (
testName = "nats-jetstream-cluster"
)

var (
testNamespace = fmt.Sprintf("%s-test-ns", testName)
natsNamespace = fmt.Sprintf("%s-nats-ns", testName)
natsAddress = fmt.Sprintf("nats://%s.%s.svc.cluster.local:4222", nats.NatsJetStreamName, natsNamespace)
natsServerMonitoringEndpoint = fmt.Sprintf("%s.%s.svc.cluster.local:8222", nats.NatsJetStreamName, natsNamespace)
natsHelmRepo = "https://nats-io.github.io/k8s/helm/charts/"
natsServerReplicas = 3
messagePublishCount = 300
deploymentName = "sub"
minReplicaCount = 0
maxReplicaCount = 2
)

func TestNATSJetStreamScalerClusterWithStreamReplicas(t *testing.T) {
// Create k8s resources.
kc := GetKubernetesClient(t)

// Deploy NATS server.
installClusterWithJetStream(t, kc)
assert.True(t, WaitForStatefulsetReplicaReadyCount(t, kc, nats.NatsJetStreamName, natsNamespace, natsServerReplicas, 60, 3),
"replica count should be %d after 3 minutes", minReplicaCount)

// Create k8s resources for testing.
testData, testTemplates := nats.GetJetStreamDeploymentTemplateData(testNamespace, natsAddress, natsServerMonitoringEndpoint, messagePublishCount)
CreateKubernetesResources(t, kc, testNamespace, testData, testTemplates)

// Create 3 replica stream with consumer
installStreamAndConsumer(t, 3, testNamespace, natsAddress)
assert.True(t, WaitForJobSuccess(t, kc, "stream", testNamespace, 60, 3),
"stream and consumer creation job with 3 stream replicas should be success")

testActivation(t, kc, testData)
testScaleOut(t, kc, testData)
testScaleIn(t, kc)

// Remove 3 replica stream with consumer
removeStreamAndConsumer(t, 3, testNamespace, natsAddress)
assert.True(t, WaitForJobCount(t, kc, testNamespace, 0, 60, 3),
"job count in namespace should be 0")

// Create stream and consumer with 2 stream replicas
installStreamAndConsumer(t, 2, testNamespace, natsAddress)
assert.True(t, WaitForJobSuccess(t, kc, "stream", testNamespace, 60, 3),
"stream and consumer creation job with 2 stream replicas should be success")

testActivation(t, kc, testData)
testScaleOut(t, kc, testData)
testScaleIn(t, kc)

// Remove 2 replica stream with consumer
removeStreamAndConsumer(t, 2, testNamespace, natsAddress)
assert.True(t, WaitForJobCount(t, kc, testNamespace, 0, 60, 3),
"job count in namespace should be 0")

// Create single replica stream with consumer
installStreamAndConsumer(t, 1, testNamespace, natsAddress)
assert.True(t, WaitForJobSuccess(t, kc, "stream", testNamespace, 60, 3),
"stream and consumer creation job with 1 stream replica should be success")

testActivation(t, kc, testData)
testScaleOut(t, kc, testData)
testScaleIn(t, kc)

// Cleanup nats namespace
removeClusterWithJetStream(t)
DeleteNamespace(t, kc, natsNamespace)
deleted := WaitForNamespaceDeletion(t, kc, natsNamespace)
assert.Truef(t, deleted, "%s namespace not deleted", natsNamespace)

// Cleanup test namespace
removeStreamAndConsumer(t, 1, testNamespace, natsAddress)
DeleteKubernetesResources(t, kc, testNamespace, testData, testTemplates)
}

// installStreamAndConsumer creates stream and consumer job.
func installStreamAndConsumer(t *testing.T, streamReplicas int, namespace, natsAddress string) {
data := nats.JetStreamTemplateData{
TestNamespace: namespace,
NatsAddress: natsAddress,
NatsConsumer: nats.NatsJetStreamConsumerName,
NatsStream: nats.NatsJetStreamStreamName,
StreamReplicas: streamReplicas,
}

KubectlApplyWithTemplate(t, data, "streamAndConsumerTemplate", nats.StreamAndConsumerTemplate)
}

// removeStreamAndConsumer deletes stream and consumer job.
func removeStreamAndConsumer(t *testing.T, streamReplicas int, namespace, natsAddress string) {
data := nats.JetStreamTemplateData{
TestNamespace: namespace,
NatsAddress: natsAddress,
NatsConsumer: nats.NatsJetStreamConsumerName,
NatsStream: nats.NatsJetStreamStreamName,
StreamReplicas: streamReplicas,
}

KubectlDeleteWithTemplate(t, data, "streamAndConsumerTemplate", nats.StreamAndConsumerTemplate)
}

// installClusterWithJetStream installs the NATS Helm chart with clustered JetStream enabled
func installClusterWithJetStream(t *testing.T, kc *k8s.Clientset) {
CreateNamespace(t, kc, natsNamespace)
_, err := ExecuteCommand(fmt.Sprintf("helm repo add %s %s", nats.NatsJetStreamName, natsHelmRepo))
assert.NoErrorf(t, err, "cannot execute command - %s", err)
_, err = ExecuteCommand("helm repo update")
assert.NoErrorf(t, err, "cannot execute command - %s", err)
_, err = ExecuteCommand(fmt.Sprintf(`helm upgrade --install --version %s --set %s --set %s --set %s --set %s --wait --namespace %s %s nats/nats`,
nats.NatsJetStreamChartVersion,
"nats.jetstream.enabled=true",
"nats.jetstream.fileStorage.enabled=false",
"cluster.enabled=true",
fmt.Sprintf("replicas=%d", natsServerReplicas),
natsNamespace,
nats.NatsJetStreamName))
assert.NoErrorf(t, err, "cannot execute command - %s", err)
}

// removeClusterWithJetStream uninstalls the NATS Helm chart
func removeClusterWithJetStream(t *testing.T) {
_, err := ExecuteCommand(fmt.Sprintf(`helm uninstall --wait --namespace %s %s`, natsNamespace, nats.NatsJetStreamName))
assert.NoErrorf(t, err, "cannot execute command - %s", err)
}

func testActivation(t *testing.T, kc *k8s.Clientset, data nats.JetStreamDeploymentTemplateData) {
t.Log("--- testing activation ---")
data.NumberOfMessages = 10
KubectlApplyWithTemplate(t, data, "activationPublishJobTemplate", nats.ActivationPublishJobTemplate)

AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, minReplicaCount, 60)
}

func testScaleOut(t *testing.T, kc *k8s.Clientset, data nats.JetStreamDeploymentTemplateData) {
t.Log("--- testing scale out ---")
KubectlApplyWithTemplate(t, data, "publishJobTemplate", nats.PublishJobTemplate)

assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 60, 3),
"replica count should be %d after 3 minutes", maxReplicaCount)
}

func testScaleIn(t *testing.T, kc *k8s.Clientset) {
t.Log("--- testing scale in ---")
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 60, 3),
"replica count should be %d after 3 minutes", minReplicaCount)
}
@@ -0,0 +1,269 @@
//go:build e2e
// +build e2e

package natsjetstream_standalone_test

import (
"fmt"
"testing"

"github.com/joho/godotenv"
"github.com/stretchr/testify/assert"
k8s "k8s.io/client-go/kubernetes"

. "github.com/kedacore/keda/v2/tests/helper"
nats "github.com/kedacore/keda/v2/tests/scalers/nats_jetstream/helper"
)

// Load env variables from .env files
var _ = godotenv.Load("../../.env")

const (
testName = "nats-jetstream-standalone"
)

var (
testNamespace = fmt.Sprintf("%s-test-ns", testName)
natsNamespace = fmt.Sprintf("%s-nats-ns", testName)
natsAddress = fmt.Sprintf("nats://%s.%s.svc.cluster.local:4222", nats.NatsJetStreamName, natsNamespace)
natsServerMonitoringEndpoint = fmt.Sprintf("%s.%s.svc.cluster.local:8222", nats.NatsJetStreamName, natsNamespace)
messagePublishCount = 300
deploymentName = "sub"
minReplicaCount = 0
maxReplicaCount = 2
)

const natsServerTemplate = `
# Source: https://github.com/nats-io/k8s/blob/main/nats-server/single-server-nats.yml
---
apiVersion: v1
kind: ConfigMap
metadata:
name: nats-config
namespace: {{.NatsNamespace}}
data:
nats.conf: |
pid_file: "/var/run/nats/nats.pid"
http: 8222
jetstream {
store_dir: /data/jetstream
max_mem: 1G
max_file: 10G
}
---
apiVersion: v1
kind: Service
metadata:
name: nats
namespace: {{.NatsNamespace}}
labels:
app: nats
spec:
selector:
app: nats
clusterIP: None
ports:
- name: client
port: 4222
- name: cluster
port: 6222
- name: monitor
port: 8222
- name: metrics
port: 7777
- name: leafnodes
port: 7422
- name: gateways
port: 7522
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: nats
namespace: {{.NatsNamespace}}
labels:
app: nats
spec:
selector:
matchLabels:
app: nats
replicas: 1
serviceName: "nats"
template:
metadata:
labels:
app: nats
spec:
# Common volumes for the containers
volumes:
- name: config-volume
configMap:
name: nats-config
- name: pid
emptyDir: {}
# Required to be able to HUP signal and apply config reload
# to the server without restarting the pod.
shareProcessNamespace: true
#################
# #
# NATS Server #
# #
#################
terminationGracePeriodSeconds: 60
containers:
- name: nats
image: nats:{{.NatsVersion}}-alpine
ports:
- containerPort: 4222
name: client
hostPort: 4222
- containerPort: 7422
name: leafnodes
hostPort: 7422
- containerPort: 6222
name: cluster
- containerPort: 8222
name: monitor
- containerPort: 7777
name: metrics
command:
- "nats-server"
- "--config"
- "/etc/nats-config/nats.conf"
# Required to be able to define an environment variable
# that refers to other environment variables. This env var
# is later used as part of the configuration file.
env:
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: POD_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: CLUSTER_ADVERTISE
value: $(POD_NAME).nats.$(POD_NAMESPACE).svc
volumeMounts:
- name: config-volume
mountPath: /etc/nats-config
- name: pid
mountPath: /var/run/nats
# Liveness/Readiness probes against the monitoring
#
livenessProbe:
httpGet:
path: /
port: 8222
initialDelaySeconds: 10
timeoutSeconds: 5
readinessProbe:
httpGet:
path: /
port: 8222
initialDelaySeconds: 10
timeoutSeconds: 5
# Gracefully stop NATS Server on pod deletion or image upgrade.
#
lifecycle:
preStop:
exec:
# Using the alpine based NATS image, we add an extra sleep that is
# the same amount as the terminationGracePeriodSeconds to allow
# the NATS Server to gracefully terminate the client connections.
#
command: ["/bin/sh", "-c", "/nats-server -sl=ldm=/var/run/nats/nats.pid && /bin/sleep 60"]
`

func TestNATSJetStreamScaler(t *testing.T) {
// Create k8s resources.
kc := GetKubernetesClient(t)

// Deploy NATS server.
installServerWithJetStream(t, kc, natsNamespace)
assert.True(t, WaitForStatefulsetReplicaReadyCount(t, kc, nats.NatsJetStreamName, natsNamespace, 1, 60, 3),
"replica count should be %d after 3 minutes", minReplicaCount)

// Create k8s resources for testing.
data, templates := nats.GetJetStreamDeploymentTemplateData(testNamespace, natsAddress, natsServerMonitoringEndpoint, messagePublishCount)
CreateKubernetesResources(t, kc, testNamespace, data, templates)

// Create stream and consumer.
installStreamAndConsumer(t, testNamespace, natsAddress)
assert.True(t, WaitForJobSuccess(t, kc, "stream", testNamespace, 60, 3),
"stream and consumer creation job should be success")

testActivation(t, kc, data)
testScaleOut(t, kc, data)
testScaleIn(t, kc)

// Cleanup nats namespace
removeServerWithJetStream(t, kc, natsNamespace)
DeleteNamespace(t, kc, natsNamespace)
deleted := WaitForNamespaceDeletion(t, kc, natsNamespace)
assert.Truef(t, deleted, "%s namespace not deleted", natsNamespace)
// Cleanup test namespace
DeleteKubernetesResources(t, kc, testNamespace, data, templates)
}

// installStreamAndConsumer creates stream and consumer.
func installStreamAndConsumer(t *testing.T, namespace, natsAddress string) {
data := nats.JetStreamTemplateData{
TestNamespace: namespace,
NatsAddress: natsAddress,
NatsConsumer: nats.NatsJetStreamConsumerName,
NatsStream: nats.NatsJetStreamStreamName,
StreamReplicas: 1,
}

KubectlApplyWithTemplate(t, data, "streamAndConsumerTemplate", nats.StreamAndConsumerTemplate)
}

// installServerWithJetStream will deploy NATS server with JetStream.
func installServerWithJetStream(t *testing.T, kc *k8s.Clientset, namespace string) {
CreateNamespace(t, kc, namespace)
data := nats.JetStreamTemplateData{
NatsNamespace: namespace,
NatsVersion: nats.NatsJetStreamServerVersion,
}

KubectlApplyWithTemplate(t, data, "natsServerTemplate", natsServerTemplate)
}

// removeServerWithJetStream will remove the NATS server and delete the namespace.
func removeServerWithJetStream(t *testing.T, kc *k8s.Clientset, namespace string) {
data := nats.JetStreamTemplateData{
NatsNamespace: namespace,
NatsVersion: nats.NatsJetStreamServerVersion,
}

KubectlDeleteWithTemplate(t, data, "natsServerTemplate", natsServerTemplate)
DeleteNamespace(t, kc, namespace)
}

func testActivation(t *testing.T, kc *k8s.Clientset, data nats.JetStreamDeploymentTemplateData) {
t.Log("--- testing activation ---")
data.NumberOfMessages = 10
KubectlApplyWithTemplate(t, data, "activationPublishJobTemplate", nats.ActivationPublishJobTemplate)

AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, minReplicaCount, 60)
}

func testScaleOut(t *testing.T, kc *k8s.Clientset, data nats.JetStreamDeploymentTemplateData) {
t.Log("--- testing scale out ---")
KubectlApplyWithTemplate(t, data, "publishJobTemplate", nats.PublishJobTemplate)

assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 60, 3),
"replica count should be %d after 3 minutes", maxReplicaCount)
}

func testScaleIn(t *testing.T, kc *k8s.Clientset) {
t.Log("--- testing scale in ---")
assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 60, 3),
"replica count should be %d after 3 minutes", minReplicaCount)
}
202 changes: 0 additions & 202 deletions tests/scalers/nats_jetstream/nats_jetstream_test.go

This file was deleted.