Skip to content

Commit

Permalink
Add support for JetStream scaler to query the stream consumer leader …
Browse files Browse the repository at this point in the history
…when clustered

Signed-off-by: Ray <[email protected]>
  • Loading branch information
rayjanoka committed Dec 2, 2022
1 parent a489ca7 commit 61a844d
Show file tree
Hide file tree
Showing 8 changed files with 1,217 additions and 483 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **Azure Pipelines Scaler:** Improved speed of profiling large set of Job Requests from Azure Pipelines ([#3702](https://github.com/kedacore/keda/issues/3702))
- **GCP Storage Scaler:** Add prefix and delimiter support ([#3756](https://github.com/kedacore/keda/issues/3756))
- **Metrics API Scaler:** Add unsafeSsl parameter to skip certificate validation when connecting over HTTPS ([#3728](https://github.com/kedacore/keda/discussions/3728))
- **NATS Jetstream Scaler:** Query the stream consumer leader when clustered ([#3860](https://github.com/kedacore/keda/issues/3860))
- **NATS Scalers:** Support HTTPS protocol in NATS Scalers ([#3805](https://github.com/kedacore/keda/issues/3805))
- **Prometheus Scaler:** Introduce skipping of certificate check for unsigned certs ([#2310](https://github.com/kedacore/keda/issues/2310))
- **Pulsar Scaler:** Add support for bearer token and basic auth ([#3844](https://github.com/kedacore/keda/issues/3844))
Expand Down
269 changes: 214 additions & 55 deletions pkg/scalers/nats_jetstream_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"errors"
"fmt"
"net/http"
"net/url"
"strconv"
"strings"

"github.com/go-logr/logr"
v2 "k8s.io/api/autoscaling/v2"
Expand All @@ -18,10 +20,11 @@ import (
)

// Constants used by the NATS JetStream scaler.
// NOTE(review): the first four names appear twice in this block as shown;
// confirm that the real file declares each of them only once.
const (
// jetStreamMetricType is the value placed in the Type field of the metric spec.
jetStreamMetricType = "External"
// defaultJetStreamLagThreshold is the lag threshold used when the trigger
// metadata does not provide one.
defaultJetStreamLagThreshold = 10
// URL schemes used when building the monitoring URL.
natsHTTPProtocol = "http"
natsHTTPSProtocol = "https"
jetStreamMetricType = "External"
defaultJetStreamLagThreshold = 10
natsHTTPProtocol = "http"
natsHTTPSProtocol = "https"
// jetStreamLagThresholdMetricName is the trigger metadata key from which the
// lag threshold is read.
jetStreamLagThresholdMetricName = "lagThreshold"
)

type natsJetStreamScaler struct {
Expand All @@ -33,24 +36,41 @@ type natsJetStreamScaler struct {
}

// natsJetStreamMetadata holds the parsed trigger settings together with the
// monitoring state (consumer leader, leader URL, cluster size) that the scaler
// records from monitoring responses.
type natsJetStreamMetadata struct {
monitoringEndpoint string
// account, stream and consumer are the names matched against the
// monitoring response to locate the consumer being scaled on.
account string
stream string
consumer string
// consumerLeader is the consumer's cluster leader name taken from the last
// monitoring response that contained the consumer; empty when unknown.
consumerLeader string
// monitoringURL is the /jsz monitoring URL built from the configured server
// endpoint and account.
monitoringURL string
// monitoringLeaderURL is the monitoring URL of the node last identified as
// the consumer leader; when non-empty it is queried instead of monitoringURL.
monitoringLeaderURL string
// lagThreshold is the scaling target; it defaults to
// defaultJetStreamLagThreshold when not set in the trigger metadata.
lagThreshold int64
// activationLagThreshold is the lag that must be exceeded for IsActive to
// report true.
activationLagThreshold int64
// clusterSize is the meta cluster size from the last monitoring response.
clusterSize int
scalerIndex int
}

// jetStreamEndpointResponse is the subset of the JetStream monitoring (/jsz)
// JSON response that the scaler decodes: the per-account details and the meta
// cluster information.
// NOTE(review): Accounts is listed twice below as shown; confirm the real file
// declares it once.
type jetStreamEndpointResponse struct {
Accounts []accountDetail `json:"account_details"`
Accounts []accountDetail `json:"account_details"`
MetaCluster metaCluster `json:"meta_cluster"`
}

// jetStreamServerEndpointResponse is the subset of the server details JSON
// (fetched from the /varz monitoring path) that the scaler decodes.
type jetStreamServerEndpointResponse struct {
	// Cluster is the "cluster" object of the response.
	Cluster jetStreamCluster `json:"cluster"`
	// ServerName is the "server_name" string of the response.
	ServerName string `json:"server_name"`
}

// jetStreamCluster is the "cluster" section of the server details response.
type jetStreamCluster struct {
	// HostUrls holds the entries of the "urls" array; the scaler iterates
	// over them to discover the nodes of the cluster.
	HostUrls []string `json:"urls"`
}

// accountDetail is one entry of the "account_details" array in the JetStream
// monitoring response.
type accountDetail struct {
	// Name is the account name; it is compared with the configured account.
	Name string `json:"name"`
	// Streams lists the streams reported for this account.
	Streams []*streamDetail `json:"stream_detail"`
}

// metaCluster is the "meta_cluster" section of the JetStream monitoring
// response.
type metaCluster struct {
	// ClusterSize is the reported "cluster_size"; the scaler only performs
	// leader discovery when it is greater than one.
	ClusterSize int `json:"cluster_size"`
}

type streamDetail struct {
Name string `json:"name"`
Config streamConfig `json:"config"`
Expand All @@ -76,6 +96,11 @@ type consumerDetail struct {
NumPending int `json:"num_pending"`
Config consumerConfig `json:"config"`
DeliveryStatus consumerDeliveryStatus `json:"delivery"`
Cluster consumerCluster `json:"cluster"`
}

// consumerCluster is the "cluster" section of a consumer entry in the
// JetStream monitoring response.
type consumerCluster struct {
	// Leader is the reported "leader" value; the scaler compares it with node
	// names to find the node to query.
	Leader string `json:"leader"`
}

type consumerConfig struct {
Expand Down Expand Up @@ -127,11 +152,12 @@ func parseNATSJetStreamMetadata(config *ScalerConfig) (natsJetStreamMetadata, er

meta.lagThreshold = defaultJetStreamLagThreshold

if val, ok := config.TriggerMetadata[lagThresholdMetricName]; ok {
if val, ok := config.TriggerMetadata[jetStreamLagThresholdMetricName]; ok {
t, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return meta, fmt.Errorf("error parsing %s: %s", lagThresholdMetricName, err)
return meta, fmt.Errorf("error parsing %s: %s", jetStreamLagThresholdMetricName, err)
}

meta.lagThreshold = t
}

Expand All @@ -157,49 +183,202 @@ func parseNATSJetStreamMetadata(config *ScalerConfig) (natsJetStreamMetadata, er
return meta, fmt.Errorf("useHTTPS parsing error %s", err.Error())
}
}
meta.monitoringEndpoint = getNATSJetStreamEndpoint(useHTTPS, natsServerEndpoint, meta.account)
meta.monitoringURL = getNATSJetStreamMonitoringURL(useHTTPS, natsServerEndpoint, meta.account)

return meta, nil
}

func getNATSJetStreamEndpoint(useHTTPS bool, natsServerEndpoint string, account string) string {
protocol := natsHTTPProtocol
if useHTTPS {
protocol = natsHTTPSProtocol
// getNATSJetstreamMonitoringData refreshes the scaler's cached stream,
// consumer leader and cluster size from the NATS monitoring endpoints.
//
// It queries the cached leader URL when one is set, otherwise
// natsJetStreamMonitoringURL. If the consumer is found and its leader name is
// unchanged from the cached one, it returns. Otherwise, when the reported
// cluster size is greater than one, it either queries the node named as the
// consumer leader directly or, when no leader is known, fetches the server
// details and tries each listed cluster host until it finds the node that
// reports itself as the consumer leader.
//
// It returns the first error from building a URL, performing a request or
// decoding a response. It returns nil otherwise, including when no leader node
// was found.
func (s *natsJetStreamScaler) getNATSJetstreamMonitoringData(ctx context.Context, natsJetStreamMonitoringURL string) error {
// remember the cached consumer leader name so we can tell below whether it changed
cachedConsumerLeader := s.metadata.consumerLeader
// default URL (standalone)
monitoringURL := natsJetStreamMonitoringURL
// use the leader URL if we already have it
if s.metadata.monitoringLeaderURL != "" {
monitoringURL = s.metadata.monitoringLeaderURL
}

jetStreamAccountResp, err := s.getNATSJetstreamMonitoringRequest(ctx, monitoringURL)
if err != nil {
return err
}

// NOTE(review): the return below uses protocol, natsServerEndpoint and
// account, none of which are declared in this function, and it would make
// everything after it unreachable. It looks like a leftover from another
// function; confirm against the real file.
return fmt.Sprintf("%s://%s/jsz?acc=%s&consumers=true&config=true", protocol, natsServerEndpoint, account)
// an empty leaderURL leaves the cached monitoringLeaderURL untouched
consumerFound := s.setNATSJetStreamMonitoringData(jetStreamAccountResp, "")

// invalidate the cached data if we used it but nothing was found
if cachedConsumerLeader != "" && !consumerFound {
s.invalidateNATSJetStreamCachedMonitoringData()
}

// the leader name hasn't changed from the previous run, we can assume we just queried the correct leader node
if consumerFound && cachedConsumerLeader != "" && cachedConsumerLeader == s.metadata.consumerLeader {
return nil
}

// leader discovery only applies when more than one node is reported
if s.metadata.clusterSize > 1 {
// we know who the consumer leader is, query it directly
if s.metadata.consumerLeader != "" {
natsJetStreamMonitoringLeaderURL, err := s.getNATSJetStreamMonitoringNodeURL(s.metadata.consumerLeader)
if err != nil {
return err
}

jetStreamAccountResp, err = s.getNATSJetstreamMonitoringRequest(ctx, natsJetStreamMonitoringLeaderURL)
if err != nil {
return err
}

// store the leader's data and remember its URL for the next call
s.setNATSJetStreamMonitoringData(jetStreamAccountResp, natsJetStreamMonitoringLeaderURL)
return nil
}

// we haven't found the consumer yet, grab the list of hosts and try each one
natsJetStreamMonitoringServerURL, err := s.getNATSJetStreamMonitoringServerURL()
if err != nil {
return err
}

req, err := http.NewRequestWithContext(ctx, http.MethodGet, natsJetStreamMonitoringServerURL, nil)
if err != nil {
return err
}

resp, err := s.httpClient.Do(req)
if err != nil {
s.logger.Error(err, "unable to access NATS JetStream monitoring server endpoint", "natsServerMonitoringURL", natsJetStreamMonitoringServerURL)
return err
}

defer resp.Body.Close()
var jetStreamServerResp *jetStreamServerEndpointResponse
if err = json.NewDecoder(resp.Body).Decode(&jetStreamServerResp); err != nil {
s.logger.Error(err, "unable to decode NATS JetStream server details")
return err
}

for _, clusterURL := range jetStreamServerResp.Cluster.HostUrls {
// the node name is the text before the first "." of the cluster URL
node := strings.Split(clusterURL, ".")[0]
natsJetStreamMonitoringNodeURL, err := s.getNATSJetStreamMonitoringNodeURL(node)
if err != nil {
return err
}

// a failure on any single node aborts the whole search
jetStreamAccountResp, err = s.getNATSJetstreamMonitoringRequest(ctx, natsJetStreamMonitoringNodeURL)
if err != nil {
return err
}

for _, jetStreamAccount := range jetStreamAccountResp.Accounts {
if jetStreamAccount.Name == s.metadata.account {
for _, stream := range jetStreamAccount.Streams {
if stream.Name == s.metadata.stream {
for _, consumer := range stream.Consumers {
if consumer.Name == s.metadata.consumer {
// this node is the consumer leader
if node == consumer.Cluster.Leader {
s.setNATSJetStreamMonitoringData(jetStreamAccountResp, natsJetStreamMonitoringNodeURL)
return nil
}
}
}
}
}
}
}
}
}
return nil
}

func (s *natsJetStreamScaler) IsActive(ctx context.Context) (bool, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.metadata.monitoringEndpoint, nil)
// setNATSJetStreamMonitoringData copies the relevant parts of a monitoring
// response into the scaler's state and reports whether the configured
// consumer was present in it.
//
// The meta cluster size is always recorded. Every stream that matches the
// configured account and stream name is assigned to s.stream as it is
// encountered. When a consumer with the configured name is found in such a
// stream, its cluster leader is recorded, monitoringLeaderURL is set to
// leaderURL if leaderURL is non-empty, and true is returned. If no matching
// consumer is found the result is false; s.stream may still have been
// updated in that case.
func (s *natsJetStreamScaler) setNATSJetStreamMonitoringData(jetStreamAccountResp *jetStreamEndpointResponse, leaderURL string) bool {
	s.metadata.clusterSize = jetStreamAccountResp.MetaCluster.ClusterSize

	for _, acct := range jetStreamAccountResp.Accounts {
		if acct.Name != s.metadata.account {
			continue
		}

		for _, candidate := range acct.Streams {
			if candidate.Name != s.metadata.stream {
				continue
			}
			// Keep the stream even if the consumer turns out to be missing.
			s.stream = candidate

			for _, c := range candidate.Consumers {
				if c.Name != s.metadata.consumer {
					continue
				}

				s.metadata.consumerLeader = c.Cluster.Leader
				// An empty leaderURL means "do not touch the cached URL".
				if leaderURL != "" {
					s.metadata.monitoringLeaderURL = leaderURL
				}
				return true
			}
		}
	}

	return false
}

// invalidateNATSJetStreamCachedMonitoringData drops everything remembered from
// earlier monitoring responses: the stored stream, the consumer leader name
// and the leader's monitoring URL.
func (s *natsJetStreamScaler) invalidateNATSJetStreamCachedMonitoringData() {
	s.stream = nil
	s.metadata.monitoringLeaderURL = ""
	s.metadata.consumerLeader = ""
}

// getNATSJetstreamMonitoringRequest performs a GET request against
// natsJetStreamMonitoringURL using the scaler's HTTP client and decodes the
// JSON response body into a jetStreamEndpointResponse.
//
// Request and decode failures are logged and returned.
//
// NOTE(review): several statements below appear in two variants back to back
// (two returns, two log calls, two declarations of jsAccountResp). This looks
// like both sides of a diff shown together; confirm which set is current.
// NOTE(review): the response status code is not inspected before decoding.
func (s *natsJetStreamScaler) getNATSJetstreamMonitoringRequest(ctx context.Context, natsJetStreamMonitoringURL string) (*jetStreamEndpointResponse, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, natsJetStreamMonitoringURL, nil)
if err != nil {
return false, err
return nil, err
}

resp, err := s.httpClient.Do(req)
if err != nil {
s.logger.Error(err, "unable to access NATS JetStream monitoring endpoint", "natsServerMonitoringEndpoint", s.metadata.monitoringEndpoint)
return false, err
s.logger.Error(err, "unable to access NATS JetStream monitoring endpoint", "natsServerMonitoringURL", natsJetStreamMonitoringURL)
return nil, err
}

// the body is closed when this function returns
defer resp.Body.Close()
var jsAccountResp jetStreamEndpointResponse
var jsAccountResp *jetStreamEndpointResponse
if err = json.NewDecoder(resp.Body).Decode(&jsAccountResp); err != nil {
s.logger.Error(err, "unable to decode JetStream account response")
s.logger.Error(err, "unable to decode NATS JetStream account details")
return nil, err
}
return jsAccountResp, nil
}

// getNATSJetStreamMonitoringURL builds the JetStream monitoring (/jsz) URL for
// the given server endpoint and account.
//
// useHTTPS selects the https scheme instead of http. natsServerEndpoint is
// used verbatim as the authority part of the URL. account is query-escaped
// before it is placed in the "acc" parameter, so reserved characters in an
// account name (for example "&", "#", "+" or spaces) cannot truncate the
// value or add extra query parameters.
func getNATSJetStreamMonitoringURL(useHTTPS bool, natsServerEndpoint string, account string) string {
	scheme := natsHTTPProtocol
	if useHTTPS {
		scheme = natsHTTPSProtocol
	}
	return fmt.Sprintf("%s://%s/jsz?acc=%s&consumers=true&config=true", scheme, natsServerEndpoint, url.QueryEscape(account))
}

// getNATSJetStreamMonitoringServerURL derives the server details URL from the
// configured monitoring URL by keeping its scheme and host and replacing
// everything after the host with "/varz".
//
// A monitoring URL that cannot be parsed is logged and the parse error is
// returned together with an empty string.
func (s *natsJetStreamScaler) getNATSJetStreamMonitoringServerURL() (string, error) {
	parsed, parseErr := url.Parse(s.metadata.monitoringURL)
	if parseErr != nil {
		s.logger.Error(parseErr, "unable to parse monitoring URL to create server URL", "natsServerMonitoringURL", s.metadata.monitoringURL)
		return "", parseErr
	}

	serverURL := parsed.Scheme + "://" + parsed.Host + "/varz"
	return serverURL, nil
}

// getNATSJetStreamMonitoringNodeURL derives the monitoring URL of a single
// node from the configured monitoring URL: node is prepended to the host as
// an extra leading label ("<node>.<host>"), while scheme, path and raw query
// are carried over unchanged.
//
// A monitoring URL that cannot be parsed is logged and the parse error is
// returned together with an empty string.
func (s *natsJetStreamScaler) getNATSJetStreamMonitoringNodeURL(node string) (string, error) {
	parsed, parseErr := url.Parse(s.metadata.monitoringURL)
	if parseErr != nil {
		s.logger.Error(parseErr, "unable to parse monitoring URL to create node URL", "natsServerMonitoringURL", s.metadata.monitoringURL)
		return "", parseErr
	}

	nodeHost := node + "." + parsed.Host
	nodeURL := parsed.Scheme + "://" + nodeHost + parsed.Path + "?" + parsed.RawQuery
	return nodeURL, nil
}

// IsActive refreshes the monitoring data and reports whether the maximum
// message lag returned by getMaxMsgLag exceeds the configured
// activationLagThreshold.
//
// It returns an error when the monitoring data cannot be retrieved or when no
// stream is set on the scaler after the refresh ("stream not found").
func (s *natsJetStreamScaler) IsActive(ctx context.Context) (bool, error) {
err := s.getNATSJetstreamMonitoringData(ctx, s.metadata.monitoringURL)
if err != nil {
return false, err
}

// NOTE(review): the loop below reads jsAccountResp, which is not declared in
// this function, and its braces do not balance with the rest of the body. It
// looks like a leftover from an earlier version; confirm against the real file.
// Find and assign the stream that we are looking for.
for _, account := range jsAccountResp.Accounts {
if account.Name == s.metadata.account {
for _, stream := range account.Streams {
if stream.Name == s.metadata.stream {
s.stream = stream
}
}
}
if s.stream == nil {
return false, errors.New("stream not found")
}

return s.getMaxMsgLag() > s.metadata.activationLagThreshold, nil
}

Expand All @@ -223,39 +402,20 @@ func (s *natsJetStreamScaler) GetMetricSpecForScaling(context.Context) []v2.Metr
Target: GetMetricTarget(s.metricType, s.metadata.lagThreshold),
}
metricSpec := v2.MetricSpec{
External: externalMetric, Type: jetStreamMetricType,
External: externalMetric,
Type: jetStreamMetricType,
}
return []v2.MetricSpec{metricSpec}
}

func (s *natsJetStreamScaler) GetMetrics(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.metadata.monitoringEndpoint, nil)
err := s.getNATSJetstreamMonitoringData(ctx, s.metadata.monitoringURL)
if err != nil {
return nil, err
}

resp, err := s.httpClient.Do(req)
if err != nil {
s.logger.Error(err, "unable to access NATS JetStream monitoring endpoint", "natsServerMonitoringEndpoint", s.metadata.monitoringEndpoint)
return []external_metrics.ExternalMetricValue{}, err
}

defer resp.Body.Close()
var jsAccountResp jetStreamEndpointResponse
if err = json.NewDecoder(resp.Body).Decode(&jsAccountResp); err != nil {
s.logger.Error(err, "unable to decode JetStream account details")
return []external_metrics.ExternalMetricValue{}, err
}

// Find and assign the stream that we are looking for.
for _, account := range jsAccountResp.Accounts {
if account.Name == s.metadata.account {
for _, stream := range account.Streams {
if stream.Name == s.metadata.stream {
s.stream = stream
}
}
}
if s.stream == nil {
return []external_metrics.ExternalMetricValue{}, errors.New("stream not found")
}

totalLag := s.getMaxMsgLag()
Expand All @@ -266,7 +426,6 @@ func (s *natsJetStreamScaler) GetMetrics(ctx context.Context, metricName string)
Value: *resource.NewQuantity(totalLag, resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

Expand Down
Loading

0 comments on commit 61a844d

Please sign in to comment.