diff --git a/CHANGELOG.md b/CHANGELOG.md index fc84b27b825..32b12fe7bb8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -63,6 +63,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio - **NATS Scalers:** Support HTTPS protocol in NATS Scalers ([#3805](https://github.com/kedacore/keda/issues/3805)) - **Prometheus Scaler:** Introduce skipping of certificate check for unsigned certs ([#2310](https://github.com/kedacore/keda/issues/2310)) - **Pulsar Scaler:** Add support for partitioned topics ([#3833](https://github.com/kedacore/keda/issues/3833)) +- **NATS Jetstream Scaler:** Query the stream consumer leader when clustered ([#3860](https://github.com/kedacore/keda/issues/3860)) ### Fixes diff --git a/pkg/scalers/nats_jetstream_scaler.go b/pkg/scalers/nats_jetstream_scaler.go index b038551bd95..02f86842f8d 100644 --- a/pkg/scalers/nats_jetstream_scaler.go +++ b/pkg/scalers/nats_jetstream_scaler.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "net/http" + "net/url" "strconv" "github.com/go-logr/logr" @@ -19,10 +20,11 @@ import ( ) const ( - jetStreamMetricType = "External" - defaultJetStreamLagThreshold = 10 - natsHTTPProtocol = "http" - natsHTTPSProtocol = "https" + jetStreamMetricType = "External" + defaultJetStreamLagThreshold = 10 + natsHTTPProtocol = "http" + natsHTTPSProtocol = "https" + jetStreamLagThresholdMetricName = "lagThreshold" ) type natsJetStreamScaler struct { @@ -34,17 +36,20 @@ type natsJetStreamScaler struct { } type natsJetStreamMetadata struct { - monitoringEndpoint string account string stream string consumer string + leaderName string + monitoringURL string lagThreshold int64 activationLagThreshold int64 + clusterSize int scalerIndex int } type jetStreamEndpointResponse struct { - Accounts []accountDetail `json:"account_details"` + Accounts []accountDetail `json:"account_details"` + MetaCluster metaCluster `json:"meta_cluster"` } type accountDetail struct { @@ -52,6 +57,10 @@ type accountDetail struct { Streams []*streamDetail `json:"stream_detail"` } +type metaCluster struct { + ClusterSize int `json:"cluster_size"` +} + type streamDetail struct { Name string `json:"name"` Config streamConfig `json:"config"` @@ -77,6 +86,11 @@ type consumerDetail struct { NumPending int `json:"num_pending"` Config consumerConfig `json:"config"` DeliveryStatus consumerDeliveryStatus `json:"delivery"` + Cluster consumerCluster `json:"cluster"` +} + +type consumerCluster struct { + Leader string `json:"leader"` } type consumerConfig struct { @@ -128,11 +142,12 @@ func parseNATSJetStreamMetadata(config *ScalerConfig) (natsJetStreamMetadata, er meta.lagThreshold = defaultJetStreamLagThreshold - if val, ok := config.TriggerMetadata[lagThresholdMetricName]; ok { + if val, ok := config.TriggerMetadata[jetStreamLagThresholdMetricName]; ok { t, err := strconv.ParseInt(val, 10, 64) if err != nil { - return meta, fmt.Errorf("error parsing %s: %s", lagThresholdMetricName, err) + return meta, fmt.Errorf("error parsing %s: %s", jetStreamLagThresholdMetricName, err) } + meta.lagThreshold = t } @@ -158,49 +173,94 @@ func parseNATSJetStreamMetadata(config *ScalerConfig) (natsJetStreamMetadata, er return meta, fmt.Errorf("useHTTPS parsing error %s", err.Error()) } } - meta.monitoringEndpoint = getNATSJetStreamEndpoint(useHTTPS, natsServerEndpoint, meta.account) + meta.monitoringURL = getNATSJetStreamURL(useHTTPS, natsServerEndpoint, meta.account) return meta, nil } -func getNATSJetStreamEndpoint(useHTTPS bool, natsServerEndpoint string, account string) string { - protocol := natsHTTPProtocol +func getNATSJetStreamURL(useHTTPS bool, natsServerEndpoint string, account string) string { + scheme := natsHTTPProtocol if useHTTPS { - protocol = natsHTTPSProtocol + scheme = natsHTTPSProtocol } - return fmt.Sprintf("%s://%s/jsz?acc=%s&consumers=true&config=true", protocol, natsServerEndpoint, account) + return fmt.Sprintf("%s://%s/jsz?acc=%s&consumers=true&config=true", scheme, natsServerEndpoint, account) } -func (s *natsJetStreamScaler) IsActive(ctx context.Context) (bool, error) { - req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.metadata.monitoringEndpoint, nil) +func (s *natsJetStreamScaler) getNATSJetStreamLeaderURL() (string, error) { + jsURL, err := url.Parse(s.metadata.monitoringURL) if err != nil { - return false, err + s.logger.Error(err, "unable to parse monitoring URL to create leader URL", "natsServerMonitoringURL", s.metadata.monitoringURL) + return "", err + } + + return fmt.Sprintf("%s://%s.%s%s?%s", jsURL.Scheme, s.metadata.leaderName, jsURL.Host, jsURL.Path, jsURL.RawQuery), nil +} + +func (s *natsJetStreamScaler) getNATSJetstreamMonitoringData(ctx context.Context, natsJetStreamMonitoringURL string) error { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, natsJetStreamMonitoringURL, nil) + if err != nil { + return err } resp, err := s.httpClient.Do(req) if err != nil { - s.logger.Error(err, "unable to access NATS JetStream monitoring endpoint", "natsServerMonitoringEndpoint", s.metadata.monitoringEndpoint) - return false, err + s.logger.Error(err, "unable to access NATS JetStream monitoring endpoint", "natsServerMonitoringURL", natsJetStreamMonitoringURL) + return err } defer resp.Body.Close() var jsAccountResp jetStreamEndpointResponse if err = json.NewDecoder(resp.Body).Decode(&jsAccountResp); err != nil { - s.logger.Error(err, "unable to decode JetStream account response") - return false, err + s.logger.Error(err, "unable to decode NATS JetStream account details") + return err } + s.metadata.clusterSize = jsAccountResp.MetaCluster.ClusterSize + // Find and assign the stream that we are looking for. - for _, account := range jsAccountResp.Accounts { - if account.Name == s.metadata.account { - for _, stream := range account.Streams { + for _, jsAccount := range jsAccountResp.Accounts { + if jsAccount.Name == s.metadata.account { + for _, stream := range jsAccount.Streams { if stream.Name == s.metadata.stream { s.stream = stream + + for _, consumer := range stream.Consumers { + if consumer.Name == s.metadata.consumer { + s.metadata.leaderName = consumer.Cluster.Leader + } + } } } } } + + return nil +} + +func (s *natsJetStreamScaler) IsActive(ctx context.Context) (bool, error) { + err := s.getNATSJetstreamMonitoringData(ctx, s.metadata.monitoringURL) + if err != nil { + return false, err + } + + // Query the consumer leader pod, it has the accurate count. + if s.metadata.clusterSize > 1 { + monitoringLeaderURL, err := s.getNATSJetStreamLeaderURL() + if err != nil { + return false, err + } + + err = s.getNATSJetstreamMonitoringData(ctx, monitoringLeaderURL) + if err != nil { + return false, err + } + } + + if s.stream == nil { + return false, errors.New("stream not found") + } + return s.getMaxMsgLag() > s.metadata.activationLagThreshold, nil } @@ -230,35 +290,28 @@ func (s *natsJetStreamScaler) GetMetricSpecForScaling(context.Context) []v2.Metr } func (s *natsJetStreamScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { - req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.metadata.monitoringEndpoint, nil) + err := s.getNATSJetstreamMonitoringData(ctx, s.metadata.monitoringURL) if err != nil { - return nil, err - } - - resp, err := s.httpClient.Do(req) - if err != nil { - s.logger.Error(err, "unable to access NATS JetStream monitoring endpoint", "natsServerMonitoringEndpoint", s.metadata.monitoringEndpoint) return []external_metrics.ExternalMetricValue{}, err } - defer resp.Body.Close() - var jsAccountResp jetStreamEndpointResponse - if err = json.NewDecoder(resp.Body).Decode(&jsAccountResp); err != nil { - s.logger.Error(err, "unable to decode JetStream account details") - return []external_metrics.ExternalMetricValue{}, err - } + // Query the consumer leader pod, it has the accurate count. + if s.metadata.clusterSize > 1 { + monitoringLeaderEndpoint, err := s.getNATSJetStreamLeaderURL() + if err != nil { + return []external_metrics.ExternalMetricValue{}, err + } - // Find and assign the stream that we are looking for. - for _, account := range jsAccountResp.Accounts { - if account.Name == s.metadata.account { - for _, stream := range account.Streams { - if stream.Name == s.metadata.stream { - s.stream = stream - } - } + err = s.getNATSJetstreamMonitoringData(ctx, monitoringLeaderEndpoint) + if err != nil { + return []external_metrics.ExternalMetricValue{}, err } } + if s.stream == nil { + return []external_metrics.ExternalMetricValue{}, errors.New("stream not found") + } + totalLag := s.getMaxMsgLag() s.logger.V(1).Info("NATS JetStream Scaler: Providing metrics based on totalLag, threshold", "totalLag", totalLag, "lagThreshold", s.metadata.lagThreshold) diff --git a/pkg/scalers/nats_jetstream_scaler_test.go b/pkg/scalers/nats_jetstream_scaler_test.go index 3d75ed548cc..6dff26672b4 100644 --- a/pkg/scalers/nats_jetstream_scaler_test.go +++ b/pkg/scalers/nats_jetstream_scaler_test.go @@ -2,11 +2,17 @@ package scalers import ( "context" + "encoding/json" + "net" "net/http" + "net/http/httptest" + "strconv" "strings" "testing" + "time" "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/labels" ) type parseNATSJetStreamMetadataTestData struct { @@ -15,6 +21,14 @@ type parseNATSJetStreamMetadataTestData struct { isError bool } +type parseNATSJetStreamMockResponsesTestData struct { + name string + metadata *natsJetStreamMetricIdentifier + data *jetStreamEndpointResponse + isActive bool + isError bool +} + type natsJetStreamMetricIdentifier struct { metadataTestData *parseNATSJetStreamMetadataTestData scalerIndex int @@ -22,6 +36,10 @@ type natsJetStreamMetricIdentifier struct { } var testNATSJetStreamMetadata = []parseNATSJetStreamMetadataTestData{ + // All good localhost. + {map[string]string{"natsServerMonitoringEndpoint": "localhost:8222", "account": "$G", "stream": "mystream", "consumer": "pull_consumer", "useHttps": "false"}, map[string]string{}, false}, + // All good url. + {map[string]string{"natsServerMonitoringEndpoint": "nats.nats:8222", "account": "$G", "stream": "mystream", "consumer": "pull_consumer", "useHttps": "true"}, map[string]string{}, false}, // nothing passed {map[string]string{}, map[string]string{}, true}, // Missing account name, should fail @@ -32,21 +50,25 @@ var testNATSJetStreamMetadata = []parseNATSJetStreamMetadataTestData{ {map[string]string{"natsServerMonitoringEndpoint": "nats.nats:8222", "account": "$G", "stream": "mystream"}, map[string]string{}, true}, // Missing nats server monitoring endpoint, should fail {map[string]string{"account": "$G", "stream": "mystream"}, map[string]string{}, true}, - // All good. - {map[string]string{"natsServerMonitoringEndpoint": "nats.nats:8222", "account": "$G", "stream": "mystream", "consumer": "pull_consumer", "useHttps": "true"}, map[string]string{}, false}, // All good + activationLagThreshold {map[string]string{"natsServerMonitoringEndpoint": "nats.nats:8222", "account": "$G", "stream": "mystream", "consumer": "pull_consumer", "activationLagThreshold": "10"}, map[string]string{}, false}, + // Misconfigured activationLagThreshold + {map[string]string{"natsServerMonitoringEndpoint": "nats.nats:8222", "account": "$G", "stream": "mystream", "consumer": "pull_consumer", "activationLagThreshold": "Y"}, map[string]string{}, true}, // natsServerMonitoringEndpoint is defined in authParams {map[string]string{"account": "$G", "stream": "mystream", "consumer": "pull_consumer"}, map[string]string{"natsServerMonitoringEndpoint": "nats.nats:8222"}, false}, // Missing nats server monitoring endpoint , should fail {map[string]string{"account": "$G", "stream": "mystream", "consumer": "pull_consumer"}, map[string]string{"natsServerMonitoringEndpoint": ""}, true}, // Misconfigured https, should fail {map[string]string{"natsServerMonitoringEndpoint": "nats.nats:8222", "account": "$G", "stream": "mystream", "consumer": "pull_consumer", "useHttps": "error"}, map[string]string{}, true}, + // All good + lagThreshold + {map[string]string{"account": "$G", "stream": "mystream", "consumer": "pull_consumer", jetStreamLagThresholdMetricName: "6"}, map[string]string{"natsServerMonitoringEndpoint": "nats.nats:8222"}, false}, + // Misconfigured lag threshold + {map[string]string{"account": "$G", "stream": "mystream", "consumer": "pull_consumer", jetStreamLagThresholdMetricName: "Y"}, map[string]string{"natsServerMonitoringEndpoint": "nats.nats:8222"}, true}, } var natsJetStreamMetricIdentifiers = []natsJetStreamMetricIdentifier{ - {&testNATSJetStreamMetadata[5], 0, "s0-nats-jetstream-mystream"}, - {&testNATSJetStreamMetadata[5], 1, "s1-nats-jetstream-mystream"}, + {&testNATSJetStreamMetadata[0], 0, "s0-nats-jetstream-mystream"}, + {&testNATSJetStreamMetadata[0], 1, "s1-nats-jetstream-mystream"}, } func TestNATSJetStreamParseMetadata(t *testing.T) { @@ -67,13 +89,13 @@ func TestNATSJetStreamGetMetricSpecForScaling(t *testing.T) { if err != nil { t.Fatal("Could not parse metadata:", err) } - mockStanScaler := natsJetStreamScaler{ + mockJetStreamScaler := natsJetStreamScaler{ stream: nil, metadata: meta, httpClient: http.DefaultClient, } - metricSpec := mockStanScaler.GetMetricSpecForScaling(ctx) + metricSpec := mockJetStreamScaler.GetMetricSpecForScaling(ctx) metricName := metricSpec[0].External.Metric.Name if metricName != testData.name { t.Error("Wrong External metric source name:", metricName) @@ -82,13 +104,305 @@ func TestNATSJetStreamGetMetricSpecForScaling(t *testing.T) { } func TestGetNATSJetStreamEndpointHTTPS(t *testing.T) { - endpoint := getNATSJetStreamEndpoint(true, "nats.nats:8222", "$G") + endpoint := getNATSJetStreamURL(true, "nats.nats:8222", "$G") assert.True(t, strings.HasPrefix(endpoint, "https:")) } func TestGetNATSJetStreamEndpointHTTP(t *testing.T) { - endpoint := getNATSJetStreamEndpoint(false, "nats.nats:8222", "$G") + endpoint := getNATSJetStreamURL(false, "nats.nats:8222", "$G") assert.True(t, strings.HasPrefix(endpoint, "http:")) } + +var testNATSJetStreamGoodMetadata = map[string]string{"natsServerMonitoringEndpoint": "localhost:8222", "account": "$G", "stream": "mystream", "consumer": "pull_consumer", "useHttps": "false", "activationLagThreshold": "10"} + +var testNATSJetStreamMockResponses = []parseNATSJetStreamMockResponsesTestData{ + { + "All Good - no messages waiting (not active)", + &natsJetStreamMetricIdentifier{ + &parseNATSJetStreamMetadataTestData{ + testNATSJetStreamGoodMetadata, map[string]string{}, false}, + 0, "s0-nats-jetstream-mystream", + }, + &jetStreamEndpointResponse{ + Accounts: []accountDetail{{Name: "$G", + Streams: []*streamDetail{{Name: "mystream", + Consumers: []consumerDetail{{Name: "pull_consumer"}}, + }}, + }}, + }, false, false}, + { + "All Good - messages waiting (active)", + &natsJetStreamMetricIdentifier{ + &parseNATSJetStreamMetadataTestData{ + testNATSJetStreamGoodMetadata, map[string]string{}, false}, + 0, "s0-nats-jetstream-mystream", + }, + &jetStreamEndpointResponse{ + Accounts: []accountDetail{{Name: "$G", + Streams: []*streamDetail{{Name: "mystream", + Consumers: []consumerDetail{{Name: "pull_consumer", NumPending: 100}}, + }}, + }}, + }, true, false}, + { + "Not Active - Bad consumer name uses stream last sequence", + &natsJetStreamMetricIdentifier{ + &parseNATSJetStreamMetadataTestData{ + testNATSJetStreamGoodMetadata, map[string]string{}, false}, + 0, "s0-nats-jetstream-mystream", + }, + &jetStreamEndpointResponse{ + Accounts: []accountDetail{{Name: "$G", + Streams: []*streamDetail{{Name: "mystream", State: streamState{LastSequence: 1}, + Consumers: []consumerDetail{{Name: "pull_consumer_bad", NumPending: 100}}, + }}, + }}, + }, false, false}, + { + "Fail - Non-matching stream name", + &natsJetStreamMetricIdentifier{ + &parseNATSJetStreamMetadataTestData{ + testNATSJetStreamGoodMetadata, map[string]string{}, false}, + 0, "s0-nats-jetstream-mystream", + }, + &jetStreamEndpointResponse{ + Accounts: []accountDetail{{Name: "$G", + Streams: []*streamDetail{{Name: "mystreamBad", State: streamState{LastSequence: 1}, + Consumers: []consumerDetail{{Name: "pull_consumer", NumPending: 100}}, + }}, + }}, + }, false, true}, + { + "Fail - Unresolvable nats endpoint from config", + &natsJetStreamMetricIdentifier{ + &parseNATSJetStreamMetadataTestData{ + map[string]string{"natsServerMonitoringEndpoint": "asdf32423fdsafdasdf:8222", "account": "$G", "stream": "mystream", "consumer": "pull_consumer", "activationLagThreshold": "10"}, map[string]string{}, false}, + 0, "s0-nats-jetstream-mystream", + }, + &jetStreamEndpointResponse{ + Accounts: []accountDetail{{Name: "$G", + Streams: []*streamDetail{{Name: "mystream", + Consumers: []consumerDetail{{Name: "pull_consumer", NumPending: 100}}, + }}, + }}, + }, false, true}, + { + "All Good - messages waiting (clustered)", + &natsJetStreamMetricIdentifier{ + &parseNATSJetStreamMetadataTestData{ + testNATSJetStreamGoodMetadata, map[string]string{}, false}, + 0, "s0-nats-jetstream-mystream", + }, + &jetStreamEndpointResponse{ + MetaCluster: metaCluster{ClusterSize: 3}, + Accounts: []accountDetail{{Name: "$G", + Streams: []*streamDetail{{Name: "mystream", + Consumers: []consumerDetail{{Name: "pull_consumer", NumPending: 100, Cluster: consumerCluster{Leader: "leader"}}}, + }}, + }}, + }, true, false}, + { + "Fail - Bad leader name (clustered)", + &natsJetStreamMetricIdentifier{ + &parseNATSJetStreamMetadataTestData{ + testNATSJetStreamGoodMetadata, map[string]string{}, false}, + 0, "s0-nats-jetstream-mystream", + }, + &jetStreamEndpointResponse{ + MetaCluster: metaCluster{ClusterSize: 3}, + Accounts: []accountDetail{{Name: "$G", + Streams: []*streamDetail{{Name: "mystream", + Consumers: []consumerDetail{{Name: "pull_consumer", NumPending: 100, Cluster: consumerCluster{Leader: "leaderBad!!!!"}}}, + }}, + }}, + }, false, true}, +} + +func TestNATSJetStreamIsActive(t *testing.T) { + for _, mockResponse := range testNATSJetStreamMockResponses { + mockResponseJSON, err := json.Marshal(mockResponse.data) + if err != nil { + t.Fatal("Could not parse mock response struct:", err) + } + + srv := natsMockHTTPServer(t, mockResponseJSON) + defer srv.Close() + + ctx := context.Background() + meta, err := parseNATSJetStreamMetadata(&ScalerConfig{TriggerMetadata: mockResponse.metadata.metadataTestData.metadata, ScalerIndex: mockResponse.metadata.scalerIndex}) + if err != nil { + t.Fatal("Could not parse metadata:", err) + } + + mockJetStreamScaler := natsJetStreamScaler{ + stream: nil, + metadata: meta, + httpClient: http.DefaultClient, + logger: InitializeLogger(&ScalerConfig{TriggerMetadata: mockResponse.metadata.metadataTestData.metadata, ScalerIndex: mockResponse.metadata.scalerIndex}, "nats_jetstream_scaler"), + } + + isActive, err := mockJetStreamScaler.IsActive(ctx) + if err != nil && !mockResponse.isError { + t.Errorf("Expected success for '%s' but got error %s", mockResponse.name, err) + } else if mockResponse.isError && err == nil { + t.Errorf("Expected error for '%s' but got success %s", mockResponse.name, mockResponse.metadata.metadataTestData.authParams["natsServerMonitoringEndpoint"]) + } + + if isActive != mockResponse.isActive { + t.Errorf("Expected '%s' 'isActive=%s', got '%s'", mockResponse.name, strconv.FormatBool(mockResponse.isActive), strconv.FormatBool(isActive)) + } + srv.Close() + } +} + +func TestNewNATSJetStreamScaler(t *testing.T) { + // All Good + _, err := NewNATSJetStreamScaler(&ScalerConfig{TriggerMetadata: testNATSJetStreamGoodMetadata, ScalerIndex: 0}) + if err != nil { + t.Error("Expected success for New NATS JetStream Scaler but got error", err) + } + + // Fail - Empty account + _, err = NewNATSJetStreamScaler(&ScalerConfig{TriggerMetadata: map[string]string{"natsServerMonitoringEndpoint": "localhost:8222", "account": ""}}) + if err == nil { + t.Error("Expected error for parsing monitoring leader URL but got success") + } +} + +func TestNATSJetStreamGetMetrics(t *testing.T) { + for _, mockResponse := range testNATSJetStreamMockResponses { + mockResponseJSON, err := json.Marshal(mockResponse.data) + if err != nil { + t.Fatal("Could not parse mock response struct:", err) + } + + tr := http.DefaultTransport.(*http.Transport).Clone() + srv := natsMockHTTPServer(t, mockResponseJSON) + defer func() { + srv.Close() + http.DefaultTransport = tr + }() + + ctx := context.Background() + meta, err := parseNATSJetStreamMetadata(&ScalerConfig{TriggerMetadata: mockResponse.metadata.metadataTestData.metadata, ScalerIndex: mockResponse.metadata.scalerIndex}) + if err != nil { + t.Fatal("Could not parse metadata:", err) + } + + mockJetStreamScaler := natsJetStreamScaler{ + stream: nil, + metadata: meta, + httpClient: http.DefaultClient, + logger: InitializeLogger(&ScalerConfig{TriggerMetadata: mockResponse.metadata.metadataTestData.metadata, ScalerIndex: mockResponse.metadata.scalerIndex}, "nats_jetstream_scaler"), + } + + _, err = mockJetStreamScaler.GetMetrics(ctx, "metric_name", labels.Everything()) + + if err != nil && !mockResponse.isError { + t.Errorf("Expected success for '%s' but got error %s", mockResponse.name, err) + } else if mockResponse.isError && err == nil { + t.Errorf("Expected error for '%s' but got success %s", mockResponse.name, mockResponse.metadata.metadataTestData.authParams["natsServerMonitoringEndpoint"]) + } + srv.Close() + } +} + +func natsMockHTTPServer(t *testing.T, mockResponseJSON []byte) *httptest.Server { + dialer := &net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + } + + // redirect leader.localhost for the clustered test + http.DefaultTransport.(*http.Transport).DialContext = func(ctx context.Context, network, addr string) (net.Conn, error) { + if addr == "leader.localhost:8222" { + addr = "127.0.0.1:8222" + } + return dialer.DialContext(ctx, network, addr) + } + + srv := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/jsz" { + t.Errorf("Expected to request '/jsz', got: %s", r.URL.Path) + } + w.WriteHeader(http.StatusOK) + _, err := w.Write(mockResponseJSON) + if err != nil { + t.Fatal("Could not write to the http server connection:", err) + } + })) + + l, _ := net.Listen("tcp", "127.0.0.1:8222") + srv.Listener = l + srv.Start() + + return srv +} + +func TestNATSJetStreamgetNATSJetstreamMonitoringData(t *testing.T) { + tr := http.DefaultTransport.(*http.Transport).Clone() + + invalidJSONServer := natsMockHTTPServer(t, []byte(`{invalidJSON}`)) + defer func() { + invalidJSONServer.Close() + http.DefaultTransport = tr + }() + + ctx := context.Background() + meta, err := parseNATSJetStreamMetadata(&ScalerConfig{TriggerMetadata: testNATSJetStreamGoodMetadata, ScalerIndex: 0}) + if err != nil { + t.Fatal("Could not parse metadata:", err) + } + + mockJetStreamScaler := natsJetStreamScaler{ + stream: nil, + metadata: meta, + httpClient: http.DefaultClient, + logger: InitializeLogger(&ScalerConfig{TriggerMetadata: testNATSJetStreamGoodMetadata, ScalerIndex: 0}, "nats_jetstream_scaler"), + } + + err = mockJetStreamScaler.getNATSJetstreamMonitoringData(ctx, mockJetStreamScaler.metadata.monitoringURL) + if err == nil { + t.Error("Expected error for bad JSON monitoring data but got success") + } +} + +func TestNATSJetStreamgetNATSJetstreamLeaderURL(t *testing.T) { + invalidJSONServer := natsMockHTTPServer(t, []byte(`{invalidJSON}`)) + defer invalidJSONServer.Close() + + meta, err := parseNATSJetStreamMetadata(&ScalerConfig{TriggerMetadata: testNATSJetStreamGoodMetadata, ScalerIndex: 0}) + if err != nil { + t.Fatal("Could not parse metadata:", err) + } + + mockJetStreamScaler := natsJetStreamScaler{ + stream: nil, + metadata: meta, + httpClient: http.DefaultClient, + logger: InitializeLogger(&ScalerConfig{TriggerMetadata: testNATSJetStreamGoodMetadata, ScalerIndex: 0}, "nats_jetstream_scaler"), + } + + mockJetStreamScaler.metadata.monitoringURL = "234234:::::34234234;;;;really_bad_URL;;/" + + _, err = mockJetStreamScaler.getNATSJetStreamLeaderURL() + if err == nil { + t.Error("Expected error for parsing monitoring leader URL but got success") + } +} + +func TestNewNATSJetStreamClose(t *testing.T) { + mockJetStreamScaler, err := NewNATSJetStreamScaler(&ScalerConfig{TriggerMetadata: testNATSJetStreamGoodMetadata, ScalerIndex: 0}) + if err != nil { + t.Error("Expected success for New NATS JetStream Scaler but got error", err) + } + + ctx := context.Background() + jsClose := mockJetStreamScaler.Close(ctx) + + if jsClose != nil { + t.Error("Expected success for NATS JetStream Scaler Close but got error", err) + } +}