Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor couchdb scaler #6267

Merged
merged 2 commits into from
Nov 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
263 changes: 107 additions & 156 deletions pkg/scalers/couchdb_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"fmt"
"net"
"strconv"

couchdb "github.com/go-kivik/couchdb/v3"
"github.com/go-kivik/kivik/v3"
Expand All @@ -19,216 +18,168 @@ import (

type couchDBScaler struct {
metricType v2.MetricTargetType
metadata *couchDBMetadata
metadata couchDBMetadata
client *kivik.Client
logger logr.Logger
}

type couchDBMetadata struct {
ConnectionString string `keda:"name=connectionString,order=authParams;triggerMetadata;resolvedEnv,optional"`
Host string `keda:"name=host,order=authParams;triggerMetadata,optional"`
Port string `keda:"name=port,order=authParams;triggerMetadata,optional"`
Username string `keda:"name=username,order=authParams;triggerMetadata,optional"`
Password string `keda:"name=password,order=authParams;triggerMetadata;resolvedEnv,optional"`
DBName string `keda:"name=dbName,order=authParams;triggerMetadata,optional"`
Query string `keda:"name=query,order=triggerMetadata,optional"`
QueryValue int64 `keda:"name=queryValue,order=triggerMetadata,optional"`
ActivationQueryValue int64 `keda:"name=activationQueryValue,order=triggerMetadata,default=0,optional"`
TriggerIndex int
}

func (m *couchDBMetadata) Validate() error {
if m.ConnectionString == "" {
if m.Host == "" {
return fmt.Errorf("no host given")
}
if m.Port == "" {
return fmt.Errorf("no port given")
}
if m.Username == "" {
return fmt.Errorf("no username given")
}
if m.Password == "" {
return fmt.Errorf("no password given")
}
if m.DBName == "" {
return fmt.Errorf("no dbName given")
}
}
return nil
}

type couchDBQueryRequest struct {
Selector map[string]interface{} `json:"selector"`
Fields []string `json:"fields"`
}

type couchDBMetadata struct {
connectionString string
host string
port string
username string
password string
dbName string
query string
queryValue int64
activationQueryValue int64
triggerIndex int
}

type Res struct {
ID string `json:"_id"`
Feet int `json:"feet"`
Greeting string `json:"greeting"`
}

func (s *couchDBScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("coucdb-%s", s.metadata.dbName))),
},
Target: GetMetricTarget(s.metricType, s.metadata.queryValue),
func NewCouchDBScaler(ctx context.Context, config *scalersconfig.ScalerConfig) (Scaler, error) {
metricType, err := GetMetricTargetType(config)
if err != nil {
return nil, fmt.Errorf("error getting scaler metric type: %w", err)
}
metricSpec := v2.MetricSpec{
External: externalMetric, Type: externalMetricType,

meta, err := parseCouchDBMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing couchdb metadata: %w", err)
}
return []v2.MetricSpec{metricSpec}
}

func (s couchDBScaler) Close(ctx context.Context) error {
if s.client != nil {
err := s.client.Close(ctx)
if err != nil {
s.logger.Error(err, fmt.Sprintf("failed to close couchdb connection, because of %v", err))
return err
}
connStr := meta.ConnectionString
if connStr == "" {
addr := net.JoinHostPort(meta.Host, meta.Port)
connStr = "http://" + addr
}
return nil
}

func (s *couchDBScaler) getQueryResult(ctx context.Context) (int64, error) {
db := s.client.DB(ctx, s.metadata.dbName)
var request couchDBQueryRequest
err := json.Unmarshal([]byte(s.metadata.query), &request)
client, err := kivik.New("couch", connStr)
if err != nil {
s.logger.Error(err, fmt.Sprintf("Couldn't unmarshal query string because of %v", err))
return 0, err
return nil, fmt.Errorf("error creating couchdb client: %w", err)
}
rows, err := db.Find(ctx, request, nil)

err = client.Authenticate(ctx, couchdb.BasicAuth("admin", meta.Password))
if err != nil {
s.logger.Error(err, fmt.Sprintf("failed to fetch rows because of %v", err))
return 0, err
return nil, fmt.Errorf("error authenticating with couchdb: %w", err)
}
var count int64
for rows.Next() {
count++
res := Res{}
if err := rows.ScanDoc(&res); err != nil {
s.logger.Error(err, fmt.Sprintf("failed to scan the doc because of %v", err))
return 0, err
}

isConnected, err := client.Ping(ctx)
if !isConnected || err != nil {
return nil, fmt.Errorf("failed to ping couchdb: %w", err)
}
return count, nil

return &couchDBScaler{
metricType: metricType,
metadata: meta,
client: client,
logger: InitializeLogger(config, "couchdb_scaler"),
}, nil
}

func parseCouchDBMetadata(config *scalersconfig.ScalerConfig) (*couchDBMetadata, string, error) {
var connStr string
var err error
func parseCouchDBMetadata(config *scalersconfig.ScalerConfig) (couchDBMetadata, error) {
meta := couchDBMetadata{}

if val, ok := config.TriggerMetadata["query"]; ok {
meta.query = val
} else {
return nil, "", fmt.Errorf("no query given")
err := config.TypedConfig(&meta)
if err != nil {
return meta, fmt.Errorf("error parsing couchdb metadata: %w", err)
}

if val, ok := config.TriggerMetadata["queryValue"]; ok {
queryValue, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, "", fmt.Errorf("failed to convert %v to int, because of %w", val, err)
}
meta.queryValue = queryValue
} else {
if config.AsMetricSource {
meta.queryValue = 0
} else {
return nil, "", fmt.Errorf("no queryValue given")
}
if meta.QueryValue == 0 && !config.AsMetricSource {
return meta, fmt.Errorf("no queryValue given")
}

meta.activationQueryValue = 0
if val, ok := config.TriggerMetadata["activationQueryValue"]; ok {
activationQueryValue, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, "", fmt.Errorf("failed to convert %v to int, because of %w", val, err)
}
meta.activationQueryValue = activationQueryValue
if config.AsMetricSource {
meta.QueryValue = 0
}

dbName, err := GetFromAuthOrMeta(config, "dbName")
if err != nil {
return nil, "", err
}
meta.dbName = dbName

switch {
case config.AuthParams["connectionString"] != "":
meta.connectionString = config.AuthParams["connectionString"]
case config.TriggerMetadata["connectionStringFromEnv"] != "":
meta.connectionString = config.ResolvedEnv[config.TriggerMetadata["connectionStringFromEnv"]]
default:
meta.connectionString = ""
host, err := GetFromAuthOrMeta(config, "host")
if err != nil {
return nil, "", err
}
meta.host = host

port, err := GetFromAuthOrMeta(config, "port")
if err != nil {
return nil, "", err
}
meta.port = port

username, err := GetFromAuthOrMeta(config, "username")
if err != nil {
return nil, "", err
}
meta.username = username
meta.TriggerIndex = config.TriggerIndex
return meta, nil
}

if config.AuthParams["password"] != "" {
meta.password = config.AuthParams["password"]
} else if config.TriggerMetadata["passwordFromEnv"] != "" {
meta.password = config.ResolvedEnv[config.TriggerMetadata["passwordFromEnv"]]
}
if len(meta.password) == 0 {
return nil, "", fmt.Errorf("no password given")
func (s *couchDBScaler) Close(ctx context.Context) error {
if s.client != nil {
if err := s.client.Close(ctx); err != nil {
s.logger.Error(err, "failed to close couchdb connection")
return err
}
}

if meta.connectionString != "" {
connStr = meta.connectionString
} else {
// Build connection str
addr := net.JoinHostPort(meta.host, meta.port)
// nosemgrep: db-connection-string
connStr = "http://" + addr
}
meta.triggerIndex = config.TriggerIndex
return &meta, connStr, nil
return nil
}

func NewCouchDBScaler(ctx context.Context, config *scalersconfig.ScalerConfig) (Scaler, error) {
metricType, err := GetMetricTargetType(config)
if err != nil {
return nil, fmt.Errorf("error getting scaler metric type: %w", err)
func (s *couchDBScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
metricName := kedautil.NormalizeString(fmt.Sprintf("coucdb-%s", s.metadata.DBName))
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.TriggerIndex, metricName),
},
Target: GetMetricTarget(s.metricType, s.metadata.QueryValue),
}
metricSpec := v2.MetricSpec{External: externalMetric, Type: externalMetricType}
return []v2.MetricSpec{metricSpec}
}

meta, connStr, err := parseCouchDBMetadata(config)
if err != nil {
return nil, fmt.Errorf("failed to parsing couchDB metadata, because of %w", err)
}
func (s *couchDBScaler) getQueryResult(ctx context.Context) (int64, error) {
db := s.client.DB(ctx, s.metadata.DBName)

client, err := kivik.New("couch", connStr)
if err != nil {
return nil, fmt.Errorf("%w", err)
var request couchDBQueryRequest
if err := json.Unmarshal([]byte(s.metadata.Query), &request); err != nil {
return 0, fmt.Errorf("error unmarshaling query: %w", err)
}

err = client.Authenticate(ctx, couchdb.BasicAuth("admin", meta.password))
rows, err := db.Find(ctx, request, nil)
if err != nil {
return nil, err
return 0, fmt.Errorf("error executing query: %w", err)
}

isconnected, err := client.Ping(ctx)
if !isconnected {
return nil, fmt.Errorf("%w", err)
}
if err != nil {
return nil, fmt.Errorf("failed to ping couchDB, because of %w", err)
var count int64
for rows.Next() {
count++
var res Res
if err := rows.ScanDoc(&res); err != nil {
return 0, fmt.Errorf("error scanning document: %w", err)
}
}

return &couchDBScaler{
metricType: metricType,
metadata: meta,
client: client,
logger: InitializeLogger(config, "couchdb_scaler"),
}, nil
return count, nil
}

// GetMetricsAndActivity query from couchDB,and return to external metrics and activity
func (s *couchDBScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
result, err := s.getQueryResult(ctx)
if err != nil {
return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("failed to inspect couchdb, because of %w", err)
return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("failed to inspect couchdb: %w", err)
}

metric := GenerateMetricInMili(metricName, float64(result))

return append([]external_metrics.ExternalMetricValue{}, metric), result > s.metadata.activationQueryValue, nil
return []external_metrics.ExternalMetricValue{metric}, result > s.metadata.ActivationQueryValue, nil
}
Loading
Loading