Skip to content

Commit

Permalink
Add support to JetStream scaler to query the stream consumer leader w…
Browse files Browse the repository at this point in the history
…hen clustered

Signed-off-by: Ray <[email protected]>
  • Loading branch information
rayjanoka committed Nov 15, 2022
1 parent 32348ae commit 5d57c2c
Show file tree
Hide file tree
Showing 3 changed files with 420 additions and 52 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **NATS Scalers:** Support HTTPS protocol in NATS Scalers ([#3805](https://github.com/kedacore/keda/issues/3805))
- **Prometheus Scaler:** Introduce skipping of certificate check for unsigned certs ([#2310](https://github.com/kedacore/keda/issues/2310))
- **Pulsar Scaler:** Add support for partitioned topics ([#3833](https://github.com/kedacore/keda/issues/3833))
- **NATS Jetstream Scaler:** Query the stream consumer leader when clustered ([#3860](https://github.com/kedacore/keda/issues/3860))

### Fixes

Expand Down
141 changes: 97 additions & 44 deletions pkg/scalers/nats_jetstream_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"net/http"
"net/url"
"strconv"

"github.com/go-logr/logr"
Expand All @@ -19,10 +20,11 @@ import (
)

const (
jetStreamMetricType = "External"
defaultJetStreamLagThreshold = 10
natsHTTPProtocol = "http"
natsHTTPSProtocol = "https"
jetStreamMetricType = "External"
defaultJetStreamLagThreshold = 10
natsHTTPProtocol = "http"
natsHTTPSProtocol = "https"
jetStreamLagThresholdMetricName = "lagThreshold"
)

type natsJetStreamScaler struct {
Expand All @@ -34,24 +36,31 @@ type natsJetStreamScaler struct {
}

type natsJetStreamMetadata struct {
monitoringEndpoint string
account string
stream string
consumer string
leaderName string
monitoringURL string
lagThreshold int64
activationLagThreshold int64
clusterSize int
scalerIndex int
}

type jetStreamEndpointResponse struct {
Accounts []accountDetail `json:"account_details"`
Accounts []accountDetail `json:"account_details"`
MetaCluster metaCluster `json:"meta_cluster"`
}

type accountDetail struct {
Name string `json:"name"`
Streams []*streamDetail `json:"stream_detail"`
}

type metaCluster struct {
ClusterSize int `json:"cluster_size"`
}

type streamDetail struct {
Name string `json:"name"`
Config streamConfig `json:"config"`
Expand All @@ -77,6 +86,11 @@ type consumerDetail struct {
NumPending int `json:"num_pending"`
Config consumerConfig `json:"config"`
DeliveryStatus consumerDeliveryStatus `json:"delivery"`
Cluster consumerCluster `json:"cluster"`
}

type consumerCluster struct {
Leader string `json:"leader"`
}

type consumerConfig struct {
Expand Down Expand Up @@ -128,11 +142,12 @@ func parseNATSJetStreamMetadata(config *ScalerConfig) (natsJetStreamMetadata, er

meta.lagThreshold = defaultJetStreamLagThreshold

if val, ok := config.TriggerMetadata[lagThresholdMetricName]; ok {
if val, ok := config.TriggerMetadata[jetStreamLagThresholdMetricName]; ok {
t, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return meta, fmt.Errorf("error parsing %s: %s", lagThresholdMetricName, err)
return meta, fmt.Errorf("error parsing %s: %s", jetStreamLagThresholdMetricName, err)
}

meta.lagThreshold = t
}

Expand All @@ -158,49 +173,94 @@ func parseNATSJetStreamMetadata(config *ScalerConfig) (natsJetStreamMetadata, er
return meta, fmt.Errorf("useHTTPS parsing error %s", err.Error())
}
}
meta.monitoringEndpoint = getNATSJetStreamEndpoint(useHTTPS, natsServerEndpoint, meta.account)
meta.monitoringURL = getNATSJetStreamURL(useHTTPS, natsServerEndpoint, meta.account)

return meta, nil
}

func getNATSJetStreamEndpoint(useHTTPS bool, natsServerEndpoint string, account string) string {
protocol := natsHTTPProtocol
func getNATSJetStreamURL(useHTTPS bool, natsServerEndpoint string, account string) string {
scheme := natsHTTPProtocol
if useHTTPS {
protocol = natsHTTPSProtocol
scheme = natsHTTPSProtocol
}

return fmt.Sprintf("%s://%s/jsz?acc=%s&consumers=true&config=true", protocol, natsServerEndpoint, account)
return fmt.Sprintf("%s://%s/jsz?acc=%s&consumers=true&config=true", scheme, natsServerEndpoint, account)
}

func (s *natsJetStreamScaler) IsActive(ctx context.Context) (bool, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.metadata.monitoringEndpoint, nil)
func (s *natsJetStreamScaler) getNATSJetStreamLeaderURL() (string, error) {
jsURL, err := url.Parse(s.metadata.monitoringURL)
if err != nil {
return false, err
s.logger.Error(err, "unable to parse monitoring URL to create leader URL", "natsServerMonitoringURL", s.metadata.monitoringURL)
return "", err
}

return fmt.Sprintf("%s://%s.%s%s?%s", jsURL.Scheme, s.metadata.leaderName, jsURL.Host, jsURL.Path, jsURL.RawQuery), nil
}

func (s *natsJetStreamScaler) getNATSJetstreamMonitoringData(ctx context.Context, natsJetStreamMonitoringURL string) error {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, natsJetStreamMonitoringURL, nil)
if err != nil {
return err
}

resp, err := s.httpClient.Do(req)
if err != nil {
s.logger.Error(err, "unable to access NATS JetStream monitoring endpoint", "natsServerMonitoringEndpoint", s.metadata.monitoringEndpoint)
return false, err
s.logger.Error(err, "unable to access NATS JetStream monitoring endpoint", "natsServerMonitoringURL", natsJetStreamMonitoringURL)
return err
}

defer resp.Body.Close()
var jsAccountResp jetStreamEndpointResponse
if err = json.NewDecoder(resp.Body).Decode(&jsAccountResp); err != nil {
s.logger.Error(err, "unable to decode JetStream account response")
return false, err
s.logger.Error(err, "unable to decode NATS JetStream account details")
return err
}

s.metadata.clusterSize = jsAccountResp.MetaCluster.ClusterSize

// Find and assign the stream that we are looking for.
for _, account := range jsAccountResp.Accounts {
if account.Name == s.metadata.account {
for _, stream := range account.Streams {
for _, jsAccount := range jsAccountResp.Accounts {
if jsAccount.Name == s.metadata.account {
for _, stream := range jsAccount.Streams {
if stream.Name == s.metadata.stream {
s.stream = stream

for _, consumer := range stream.Consumers {
if consumer.Name == s.metadata.consumer {
s.metadata.leaderName = consumer.Cluster.Leader
}
}
}
}
}
}

return nil
}

func (s *natsJetStreamScaler) IsActive(ctx context.Context) (bool, error) {
err := s.getNATSJetstreamMonitoringData(ctx, s.metadata.monitoringURL)
if err != nil {
return false, err
}

// Query the consumer leader pod, it has the accurate count.
if s.metadata.clusterSize > 1 {
monitoringLeaderURL, err := s.getNATSJetStreamLeaderURL()
if err != nil {
return false, err
}

err = s.getNATSJetstreamMonitoringData(ctx, monitoringLeaderURL)
if err != nil {
return false, err
}
}

if s.stream == nil {
return false, errors.New("stream not found")
}

return s.getMaxMsgLag() > s.metadata.activationLagThreshold, nil
}

Expand Down Expand Up @@ -230,35 +290,28 @@ func (s *natsJetStreamScaler) GetMetricSpecForScaling(context.Context) []v2.Metr
}

func (s *natsJetStreamScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.metadata.monitoringEndpoint, nil)
err := s.getNATSJetstreamMonitoringData(ctx, s.metadata.monitoringURL)
if err != nil {
return nil, err
}

resp, err := s.httpClient.Do(req)
if err != nil {
s.logger.Error(err, "unable to access NATS JetStream monitoring endpoint", "natsServerMonitoringEndpoint", s.metadata.monitoringEndpoint)
return []external_metrics.ExternalMetricValue{}, err
}

defer resp.Body.Close()
var jsAccountResp jetStreamEndpointResponse
if err = json.NewDecoder(resp.Body).Decode(&jsAccountResp); err != nil {
s.logger.Error(err, "unable to decode JetStream account details")
return []external_metrics.ExternalMetricValue{}, err
}
// Query the consumer leader pod, it has the accurate count.
if s.metadata.clusterSize > 1 {
monitoringLeaderEndpoint, err := s.getNATSJetStreamLeaderURL()
if err != nil {
return []external_metrics.ExternalMetricValue{}, err
}

// Find and assign the stream that we are looking for.
for _, account := range jsAccountResp.Accounts {
if account.Name == s.metadata.account {
for _, stream := range account.Streams {
if stream.Name == s.metadata.stream {
s.stream = stream
}
}
err = s.getNATSJetstreamMonitoringData(ctx, monitoringLeaderEndpoint)
if err != nil {
return []external_metrics.ExternalMetricValue{}, err
}
}

if s.stream == nil {
return []external_metrics.ExternalMetricValue{}, errors.New("stream not found")
}

totalLag := s.getMaxMsgLag()
s.logger.V(1).Info("NATS JetStream Scaler: Providing metrics based on totalLag, threshold", "totalLag", totalLag, "lagThreshold", s.metadata.lagThreshold)

Expand Down
Loading

0 comments on commit 5d57c2c

Please sign in to comment.