Skip to content

Commit

Permalink
JetStream scaler use stream consumer leader metric when clustered
Browse files Browse the repository at this point in the history
Signed-off-by: Ray <[email protected]>
  • Loading branch information
rayjanoka committed Nov 15, 2022
1 parent 682f7af commit 4d44456
Show file tree
Hide file tree
Showing 3 changed files with 423 additions and 52 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- [v1.1.0](#v110)
- [v1.0.0](#v100)

## Unreleased

- **JetStream:** JetStream scaler use stream consumer leader metric when clustered ([#2391](https://github.com/kedacore/keda/issues/2391))

### New

- **General**: Expand Prometheus metric with label "ScalerName" to distinguish different triggers. The scaleName is defined per Trigger.Name ([#3588](https://github.com/kedacore/keda/issues/3588))
Expand Down
141 changes: 97 additions & 44 deletions pkg/scalers/nats_jetstream_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"net/http"
"net/url"
"strconv"

"github.com/go-logr/logr"
Expand All @@ -19,10 +20,11 @@ import (
)

const (
jetStreamMetricType = "External"
defaultJetStreamLagThreshold = 10
natsHTTPProtocol = "http"
natsHTTPSProtocol = "https"
jetStreamMetricType = "External"
defaultJetStreamLagThreshold = 10
natsHTTPProtocol = "http"
natsHTTPSProtocol = "https"
jetStreamLagThresholdMetricName = "lagThreshold"
)

type natsJetStreamScaler struct {
Expand All @@ -34,24 +36,31 @@ type natsJetStreamScaler struct {
}

type natsJetStreamMetadata struct {
monitoringEndpoint string
account string
stream string
consumer string
leaderName string
monitoringURL string
lagThreshold int64
activationLagThreshold int64
clusterSize int
scalerIndex int
}

type jetStreamEndpointResponse struct {
Accounts []accountDetail `json:"account_details"`
Accounts []accountDetail `json:"account_details"`
MetaCluster metaCluster `json:"meta_cluster"`
}

type accountDetail struct {
Name string `json:"name"`
Streams []*streamDetail `json:"stream_detail"`
}

type metaCluster struct {
ClusterSize int `json:"cluster_size"`
}

type streamDetail struct {
Name string `json:"name"`
Config streamConfig `json:"config"`
Expand All @@ -77,6 +86,11 @@ type consumerDetail struct {
NumPending int `json:"num_pending"`
Config consumerConfig `json:"config"`
DeliveryStatus consumerDeliveryStatus `json:"delivery"`
Cluster consumerCluster `json:"cluster"`
}

type consumerCluster struct {
Leader string `json:"leader"`
}

type consumerConfig struct {
Expand Down Expand Up @@ -128,11 +142,12 @@ func parseNATSJetStreamMetadata(config *ScalerConfig) (natsJetStreamMetadata, er

meta.lagThreshold = defaultJetStreamLagThreshold

if val, ok := config.TriggerMetadata[lagThresholdMetricName]; ok {
if val, ok := config.TriggerMetadata[jetStreamLagThresholdMetricName]; ok {
t, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return meta, fmt.Errorf("error parsing %s: %s", lagThresholdMetricName, err)
return meta, fmt.Errorf("error parsing %s: %s", jetStreamLagThresholdMetricName, err)
}

meta.lagThreshold = t
}

Expand All @@ -158,49 +173,94 @@ func parseNATSJetStreamMetadata(config *ScalerConfig) (natsJetStreamMetadata, er
return meta, fmt.Errorf("useHTTPS parsing error %s", err.Error())
}
}
meta.monitoringEndpoint = getNATSJetStreamEndpoint(useHTTPS, natsServerEndpoint, meta.account)
meta.monitoringURL = getNATSJetStreamURL(useHTTPS, natsServerEndpoint, meta.account)

return meta, nil
}

func getNATSJetStreamEndpoint(useHTTPS bool, natsServerEndpoint string, account string) string {
protocol := natsHTTPProtocol
func getNATSJetStreamURL(useHTTPS bool, natsServerEndpoint string, account string) string {
scheme := natsHTTPProtocol
if useHTTPS {
protocol = natsHTTPSProtocol
scheme = natsHTTPSProtocol
}

return fmt.Sprintf("%s://%s/jsz?acc=%s&consumers=true&config=true", protocol, natsServerEndpoint, account)
return fmt.Sprintf("%s://%s/jsz?acc=%s&consumers=true&config=true", scheme, natsServerEndpoint, account)
}

func (s *natsJetStreamScaler) IsActive(ctx context.Context) (bool, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.metadata.monitoringEndpoint, nil)
func (s *natsJetStreamScaler) getNATSJetStreamLeaderURL() (string, error) {
jsURL, err := url.Parse(s.metadata.monitoringURL)
if err != nil {
return false, err
s.logger.Error(err, "unable to parse monitoring URL to create leader URL", "natsServerMonitoringURL", s.metadata.monitoringURL)
return "", err
}

return fmt.Sprintf("%s://%s.%s%s?%s", jsURL.Scheme, s.metadata.leaderName, jsURL.Host, jsURL.Path, jsURL.RawQuery), nil
}

func (s *natsJetStreamScaler) getNATSJetstreamMonitoringData(ctx context.Context, natsJetStreamMonitoringURL string) error {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, natsJetStreamMonitoringURL, nil)
if err != nil {
return err
}

resp, err := s.httpClient.Do(req)
if err != nil {
s.logger.Error(err, "unable to access NATS JetStream monitoring endpoint", "natsServerMonitoringEndpoint", s.metadata.monitoringEndpoint)
return false, err
s.logger.Error(err, "unable to access NATS JetStream monitoring endpoint", "natsServerMonitoringURL", natsJetStreamMonitoringURL)
return err
}

defer resp.Body.Close()
var jsAccountResp jetStreamEndpointResponse
if err = json.NewDecoder(resp.Body).Decode(&jsAccountResp); err != nil {
s.logger.Error(err, "unable to decode JetStream account response")
return false, err
s.logger.Error(err, "unable to decode NATS JetStream account details")
return err
}

s.metadata.clusterSize = jsAccountResp.MetaCluster.ClusterSize

// Find and assign the stream that we are looking for.
for _, account := range jsAccountResp.Accounts {
if account.Name == s.metadata.account {
for _, stream := range account.Streams {
for _, jsAccount := range jsAccountResp.Accounts {
if jsAccount.Name == s.metadata.account {
for _, stream := range jsAccount.Streams {
if stream.Name == s.metadata.stream {
s.stream = stream

for _, consumer := range stream.Consumers {
if consumer.Name == s.metadata.consumer {
s.metadata.leaderName = consumer.Cluster.Leader
}
}
}
}
}
}

return nil
}

func (s *natsJetStreamScaler) IsActive(ctx context.Context) (bool, error) {
err := s.getNATSJetstreamMonitoringData(ctx, s.metadata.monitoringURL)
if err != nil {
return false, err
}

// Query the consumer leader pod, it has the accurate count.
if s.metadata.clusterSize > 1 {
monitoringLeaderURL, err := s.getNATSJetStreamLeaderURL()
if err != nil {
return false, err
}

err = s.getNATSJetstreamMonitoringData(ctx, monitoringLeaderURL)
if err != nil {
return false, err
}
}

if s.stream == nil {
return false, errors.New("stream not found")
}

return s.getMaxMsgLag() > s.metadata.activationLagThreshold, nil
}

Expand Down Expand Up @@ -230,35 +290,28 @@ func (s *natsJetStreamScaler) GetMetricSpecForScaling(context.Context) []v2.Metr
}

func (s *natsJetStreamScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.metadata.monitoringEndpoint, nil)
err := s.getNATSJetstreamMonitoringData(ctx, s.metadata.monitoringURL)
if err != nil {
return nil, err
}

resp, err := s.httpClient.Do(req)
if err != nil {
s.logger.Error(err, "unable to access NATS JetStream monitoring endpoint", "natsServerMonitoringEndpoint", s.metadata.monitoringEndpoint)
return []external_metrics.ExternalMetricValue{}, err
}

defer resp.Body.Close()
var jsAccountResp jetStreamEndpointResponse
if err = json.NewDecoder(resp.Body).Decode(&jsAccountResp); err != nil {
s.logger.Error(err, "unable to decode JetStream account details")
return []external_metrics.ExternalMetricValue{}, err
}
// Query the consumer leader pod, it has the accurate count.
if s.metadata.clusterSize > 1 {
monitoringLeaderEndpoint, err := s.getNATSJetStreamLeaderURL()
if err != nil {
return []external_metrics.ExternalMetricValue{}, err
}

// Find and assign the stream that we are looking for.
for _, account := range jsAccountResp.Accounts {
if account.Name == s.metadata.account {
for _, stream := range account.Streams {
if stream.Name == s.metadata.stream {
s.stream = stream
}
}
err = s.getNATSJetstreamMonitoringData(ctx, monitoringLeaderEndpoint)
if err != nil {
return []external_metrics.ExternalMetricValue{}, err
}
}

if s.stream == nil {
return []external_metrics.ExternalMetricValue{}, errors.New("stream not found")
}

totalLag := s.getMaxMsgLag()
s.logger.V(1).Info("NATS JetStream Scaler: Providing metrics based on totalLag, threshold", "totalLag", totalLag, "lagThreshold", s.metadata.lagThreshold)

Expand Down
Loading

0 comments on commit 4d44456

Please sign in to comment.