diff --git a/CHANGELOG.md b/CHANGELOG.md index 484cf87bd68..ce12ecc9354 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -59,6 +59,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio - **General**: Add the generateEmbeddedObjectMeta flag to generate meta properties of JobTargetRef in ScaledJob ([#5908](https://github.com/kedacore/keda/issues/5908)) - **General**: Cache miss fallback in validating webhook for ScaledObjects with direct kubernetes client ([#5973](https://github.com/kedacore/keda/issues/5973)) +- **General**: Introduce new Beanstalkd scaler ([#5901](https://github.com/kedacore/keda/issues/5901)) - **General**: Replace wildcards in RBAC objects with explicit resources and verbs ([#6129](https://github.com/kedacore/keda/pull/6129)) - **Azure Pipelines Scalar**: Print warning to log when Azure DevOps API Rate Limits are (nearly) reached ([#6284](https://github.com/kedacore/keda/issues/6284)) - **CloudEventSource**: Introduce ClusterCloudEventSource ([#3533](https://github.com/kedacore/keda/issues/3533)) diff --git a/go.mod b/go.mod index 4322a6cafee..8ae68b7c5c4 100644 --- a/go.mod +++ b/go.mod @@ -117,6 +117,7 @@ require ( sigs.k8s.io/controller-tools v0.15.0 sigs.k8s.io/custom-metrics-apiserver v1.29.0 sigs.k8s.io/kustomize/kustomize/v5 v5.4.3 + github.com/beanstalkd/go-beanstalk v0.2.0 ) // Remove this when they merge the PR and cut a release https://github.com/open-policy-agent/cert-controller/pull/202 diff --git a/go.sum b/go.sum index e7cefad3489..5db432af5c9 100644 --- a/go.sum +++ b/go.sum @@ -951,6 +951,8 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.30.3/go.mod h1:zwySh8fpFyXp9yOr/KVzx github.com/aws/smithy-go v1.13.0/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= github.com/aws/smithy-go v1.20.3 h1:ryHwveWzPV5BIof6fyDvor6V3iUL7nTfiTKXHiW05nE= github.com/aws/smithy-go v1.20.3/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E= +github.com/beanstalkd/go-beanstalk v0.2.0 h1:6UOJugnu47uNB2jJO/lxyDgeD1Yds7owYi1USELqexA= +github.com/beanstalkd/go-beanstalk v0.2.0/go.mod h1:/G8YTyChOtpOArwLTQPY1CHB+i212+av35bkPXXj56Y= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= diff --git a/pkg/scalers/beanstalkd_scaler.go b/pkg/scalers/beanstalkd_scaler.go new file mode 100644 index 00000000000..a658b4f104f --- /dev/null +++ b/pkg/scalers/beanstalkd_scaler.go @@ -0,0 +1,179 @@ +package scalers + +import ( + "context" + "errors" + "fmt" + "net/url" + "time" + + beanstalk "github.com/beanstalkd/go-beanstalk" + "github.com/go-logr/logr" + "github.com/mitchellh/mapstructure" + v2 "k8s.io/api/autoscaling/v2" + "k8s.io/metrics/pkg/apis/external_metrics" + + "github.com/kedacore/keda/v2/pkg/scalers/scalersconfig" + "github.com/kedacore/keda/v2/pkg/util" +) + +const ( + beanstalkdJobsMetricName = "jobs" + beanstalkdValueConfigName = "value" + beanstalkdActivationValueTriggerConfigName = "activationValue" + beanstalkdMetricType = "External" + beanstalkdNetworkProtocol = "tcp" +) + +type BeanstalkdScaler struct { + metricType v2.MetricTargetType + metadata *BeanstalkdMetadata + connection *beanstalk.Conn + tube *beanstalk.Tube + logger logr.Logger +} + +type BeanstalkdMetadata struct { + Server string `keda:"name=server, order=triggerMetadata"` + Tube string `keda:"name=tube, order=triggerMetadata"` + Value float64 `keda:"name=value, order=triggerMetadata"` + ActivationValue float64 `keda:"name=activationValue, order=triggerMetadata, optional"` + IncludeDelayed bool `keda:"name=includeDelayed, order=triggerMetadata, optional"` + Timeout uint `keda:"name=timeout, order=triggerMetadata, optional, default=30"` + TriggerIndex int +} + +// TubeStats represents a set of tube statistics. +type tubeStats struct { + TotalJobs int64 `mapstructure:"total-jobs"` + JobsReady int64 `mapstructure:"current-jobs-ready"` + JobsReserved int64 `mapstructure:"current-jobs-reserved"` + JobsUrgent int64 `mapstructure:"current-jobs-urgent"` + JobsBuried int64 `mapstructure:"current-jobs-buried"` + JobsDelayed int64 `mapstructure:"current-jobs-delayed"` +} + +func NewBeanstalkdScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { + s := &BeanstalkdScaler{} + + metricType, err := GetMetricTargetType(config) + if err != nil { + return nil, fmt.Errorf("error getting scaler metric type: %w", err) + } + s.metricType = metricType + + s.logger = InitializeLogger(config, "beanstalkd_scaler") + + meta, err := parseBeanstalkdMetadata(config) + if err != nil { + return nil, fmt.Errorf("error parsing beanstalkd metadata: %w", err) + } + s.metadata = meta + + timeout := time.Duration(s.metadata.Timeout) * time.Second + + conn, err := beanstalk.DialTimeout(beanstalkdNetworkProtocol, s.metadata.Server, timeout) + if err != nil { + return nil, fmt.Errorf("error connecting to beanstalkd: %w", err) + } + + s.connection = conn + + s.tube = beanstalk.NewTube(s.connection, meta.Tube) + + return s, nil +} + +func parseBeanstalkdMetadata(config *scalersconfig.ScalerConfig) (*BeanstalkdMetadata, error) { + meta := &BeanstalkdMetadata{} + + meta.TriggerIndex = config.TriggerIndex + if err := config.TypedConfig(meta); err != nil { + return nil, fmt.Errorf("error parsing beanstalkd metadata: %w", err) + } + + return meta, nil +} + +func (s *BeanstalkdScaler) getTubeStats(ctx context.Context) (*tubeStats, error) { + errCh := make(chan error) + statsCh := make(chan *tubeStats) + + go func() { + rawStats, err := s.tube.Stats() + if err != nil { + errCh <- fmt.Errorf("error retrieving stats from beanstalkd: %w", err) + } + + var stats tubeStats + err = mapstructure.WeakDecode(rawStats, &stats) + if err != nil { + errCh <- fmt.Errorf("error decoding stats from beanstalkd: %w", err) + } + + statsCh <- &stats + }() + + select { + case err := <-errCh: + if errors.Is(err, beanstalk.ErrNotFound) { + s.logger.Info("tube not found, setting stats to 0") + return &tubeStats{ + TotalJobs: 0, + JobsReady: 0, + JobsDelayed: 0, + JobsReserved: 0, + JobsUrgent: 0, + JobsBuried: 0, + }, nil + } + return nil, err + case tubeStats := <-statsCh: + return tubeStats, nil + case <-ctx.Done(): + return nil, ctx.Err() + } +} + +func (s *BeanstalkdScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) { + stats, err := s.getTubeStats(ctx) + if err != nil { + return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("error interacting with beanstalkd: %w", err) + } + + totalJobs := stats.JobsReady + stats.JobsReserved + + if s.metadata.IncludeDelayed { + totalJobs += stats.JobsDelayed + } + + metric := GenerateMetricInMili(metricName, float64(totalJobs)) + isActive := float64(totalJobs) > s.metadata.ActivationValue + + return []external_metrics.ExternalMetricValue{metric}, isActive, nil +} + +func (s *BeanstalkdScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { + externalMetric := &v2.ExternalMetricSource{ + Metric: v2.MetricIdentifier{ + Name: GenerateMetricNameWithIndex(s.metadata.TriggerIndex, util.NormalizeString(fmt.Sprintf("beanstalkd-%s", url.QueryEscape(s.metadata.Tube)))), + }, + Target: GetMetricTargetMili(s.metricType, s.metadata.Value), + } + metricSpec := v2.MetricSpec{ + External: externalMetric, Type: beanstalkdMetricType, + } + + return []v2.MetricSpec{metricSpec} +} + +func (s *BeanstalkdScaler) Close(context.Context) error { + if s.connection != nil { + err := s.connection.Close() + if err != nil { + s.logger.Error(err, "Error closing beanstalkd connection") + return err + } + } + return nil +} diff --git a/pkg/scalers/beanstalkd_scaler_test.go b/pkg/scalers/beanstalkd_scaler_test.go new file mode 100644 index 00000000000..11f77176b5c --- /dev/null +++ b/pkg/scalers/beanstalkd_scaler_test.go @@ -0,0 +1,257 @@ +package scalers + +import ( + "context" + "fmt" + "net" + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "gopkg.in/yaml.v3" + + "github.com/kedacore/keda/v2/pkg/scalers/scalersconfig" +) + +const ( + beanstalkdServer = "localhost:3000" +) + +type parseBeanstalkdMetadataTestData struct { + metadata map[string]string + isError bool +} + +type beanstalkdMetricIdentifier struct { + metadataTestData *parseBeanstalkdMetadataTestData + index int + name string +} + +type tubeStatsTestData struct { + response map[string]interface{} + metadata map[string]string + isActive bool +} + +var testBeanstalkdMetadata = []parseBeanstalkdMetadataTestData{ + // nothing passed + {map[string]string{}, true}, + // properly formed + {map[string]string{"server": beanstalkdServer, "tube": "delayed", "value": "1", "includeDelayed": "true"}, false}, + // no includeDelayed + {map[string]string{"server": beanstalkdServer, "tube": "no-delayed", "value": "1"}, false}, + // missing server + {map[string]string{"tube": "stats-tube", "value": "1", "includeDelayed": "true"}, true}, + // missing tube + {map[string]string{"server": beanstalkdServer, "value": "1", "includeDelayed": "true"}, true}, + // missing value + {map[string]string{"server": beanstalkdServer, "tube": "stats-tube", "includeDelayed": "true"}, true}, + // invalid value + {map[string]string{"server": beanstalkdServer, "tube": "stats-tube", "value": "lots", "includeDelayed": "true"}, true}, + // valid timeout + {map[string]string{"server": beanstalkdServer, "tube": "stats-tube", "value": "1", "includeDelayed": "true", "timeout": "1000"}, false}, + // invalid timeout + {map[string]string{"server": beanstalkdServer, "tube": "stats-tube", "value": "1", "includeDelayed": "true", "timeout": "-1"}, true}, + // activationValue passed + {map[string]string{"server": beanstalkdServer, "tube": "stats-tube", "value": "1", "activationValue": "10"}, false}, + // invalid activationValue passed + {map[string]string{"server": beanstalkdServer, "tube": "stats-tube", "value": "1", "activationValue": "AA"}, true}, +} + +var beanstalkdMetricIdentifiers = []beanstalkdMetricIdentifier{ + {&testBeanstalkdMetadata[2], 0, "s0-beanstalkd-no-delayed"}, + {&testBeanstalkdMetadata[1], 1, "s1-beanstalkd-delayed"}, +} + +var testTubeStatsTestData = []tubeStatsTestData{ + { + response: map[string]interface{}{ + "cmd-delete": 18, + "cmd-pause-tube": 0, + "current-jobs-buried": 6, + "current-jobs-delayed": 0, + "current-jobs-ready": 10, + "current-jobs-reserved": 0, + "current-jobs-urgent": 0, + "current-using": 3, + "current-waiting": 3, + "current-watching": 3, + "name": "form-crawler-notifications", + "pause": 0, + "pause-time-left": 0, + "total-jobs": 24, + }, + metadata: map[string]string{"server": beanstalkdServer, "tube": "no-delayed", "value": "2"}, + isActive: true, + }, + { + response: map[string]interface{}{ + "cmd-delete": 18, + "cmd-pause-tube": 0, + "current-jobs-buried": 0, + "current-jobs-delayed": 0, + "current-jobs-ready": 1, + "current-jobs-reserved": 0, + "current-jobs-urgent": 0, + "current-using": 3, + "current-waiting": 3, + "current-watching": 3, + "name": "form-crawler-notifications", + "pause": 0, + "pause-time-left": 0, + "total-jobs": 24, + }, + metadata: map[string]string{"server": beanstalkdServer, "tube": "no-delayed", "value": "3", "activationValue": "2"}, + isActive: false, + }, + { + response: map[string]interface{}{ + "cmd-delete": 18, + "cmd-pause-tube": 0, + "current-jobs-buried": 0, + "current-jobs-delayed": 10, + "current-jobs-ready": 0, + "current-jobs-reserved": 0, + "current-jobs-urgent": 0, + "current-using": 3, + "current-waiting": 3, + "current-watching": 3, + "name": "form-crawler-notifications", + "pause": 0, + "pause-time-left": 0, + "total-jobs": 24, + }, + metadata: map[string]string{"server": beanstalkdServer, "tube": "no-delayed", "value": "2"}, + isActive: false, + }, + { + response: map[string]interface{}{ + "cmd-delete": 18, + "cmd-pause-tube": 0, + "current-jobs-buried": 0, + "current-jobs-delayed": 10, + "current-jobs-ready": 0, + "current-jobs-reserved": 0, + "current-jobs-urgent": 0, + "current-using": 3, + "current-waiting": 3, + "current-watching": 3, + "name": "form-crawler-notifications", + "pause": 0, + "pause-time-left": 0, + "total-jobs": 24, + }, + metadata: map[string]string{"server": beanstalkdServer, "tube": "no-delayed", "value": "2", "includeDelayed": "true"}, + isActive: true, + }, +} + +func TestBeanstalkdParseMetadata(t *testing.T) { + for idx, testData := range testBeanstalkdMetadata { + meta, err := parseBeanstalkdMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata}) + if err != nil && !testData.isError { + t.Error("Expected success but got error", err) + } + if testData.isError && err == nil { + t.Errorf("Expected error but got success in test case %d", idx) + } + if err == nil { + if val, ok := testData.metadata["includeDelayed"]; !ok { + assert.Equal(t, false, meta.IncludeDelayed) + } else { + boolVal, err := strconv.ParseBool(val) + if err != nil { + assert.Equal(t, boolVal, meta.IncludeDelayed) + } + } + } + } +} + +func TestBeanstalkdGetMetricSpecForScaling(t *testing.T) { + for _, testData := range beanstalkdMetricIdentifiers { + meta, err := parseBeanstalkdMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, AuthParams: nil, TriggerIndex: testData.index}) + if err != nil { + t.Fatal("could not parse metadata", err) + } + mockBeanstalkdScaler := BeanstalkdScaler{ + metadata: meta, + connection: nil, + tube: nil, + } + + metricSpec := mockBeanstalkdScaler.GetMetricSpecForScaling(context.Background()) + metricName := metricSpec[0].External.Metric.Name + assert.Equal(t, testData.name, metricName, "correct external source name") + } +} + +func TestGetTubeStats(t *testing.T) { + for _, testData := range testTubeStatsTestData { + testData := testData + yamlData, err := yaml.Marshal(testData.response) + if err != nil { + t.Fatal(err) + } + + response := []byte(fmt.Sprintf("OK %d\r\n", len(yamlData))) + response = append(response, yamlData...) + response = append(response, []byte("\r\n")...) + createTestServer(t, response) + + s, err := NewBeanstalkdScaler( + &scalersconfig.ScalerConfig{ + TriggerMetadata: testData.metadata, + GlobalHTTPTimeout: 1000 * time.Millisecond, + }, + ) + + assert.NoError(t, err) + + ctx := context.Background() + _, active, err := s.GetMetricsAndActivity(ctx, "Metric") + + assert.NoError(t, err) + + assert.Equal(t, testData.isActive, active) + } +} + +func TestGetTubeStatsNotFound(t *testing.T) { + testData := testTubeStatsTestData[0] + createTestServer(t, []byte("NOT_FOUND\r\n")) + s, err := NewBeanstalkdScaler( + &scalersconfig.ScalerConfig{ + TriggerMetadata: testData.metadata, + GlobalHTTPTimeout: 1000 * time.Millisecond, + }, + ) + + assert.NoError(t, err) + + ctx := context.Background() + _, active, err := s.GetMetricsAndActivity(ctx, "Metric") + + assert.NoError(t, err) + assert.False(t, active) +} + +func createTestServer(t *testing.T, response []byte) { + list, err := net.Listen("tcp", "localhost:3000") + if err != nil { + t.Fatal(err) + } + go func() { + defer list.Close() + conn, err := list.Accept() + if err != nil { + return + } + + _, err = conn.Write(response) + assert.NoError(t, err) + conn.Close() + }() +} diff --git a/pkg/scaling/scalers_builder.go b/pkg/scaling/scalers_builder.go index 1f4549c7ffa..80bfb40658f 100644 --- a/pkg/scaling/scalers_builder.go +++ b/pkg/scaling/scalers_builder.go @@ -152,6 +152,8 @@ func buildScaler(ctx context.Context, client client.Client, triggerType string, return scalers.NewAzureQueueScaler(config) case "azure-servicebus": return scalers.NewAzureServiceBusScaler(ctx, config) + case "beanstalkd": + return scalers.NewBeanstalkdScaler(config) case "cassandra": return scalers.NewCassandraScaler(config) case "couchdb": diff --git a/tests/scalers/beanstalkd/beanstalkd_test.go b/tests/scalers/beanstalkd/beanstalkd_test.go new file mode 100644 index 00000000000..7565d3fa0c6 --- /dev/null +++ b/tests/scalers/beanstalkd/beanstalkd_test.go @@ -0,0 +1,267 @@ +//go:build e2e +// +build e2e + +package beanstalkd_test + +import ( + "fmt" + "testing" + + "github.com/joho/godotenv" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/client-go/kubernetes" + + . "github.com/kedacore/keda/v2/tests/helper" +) + +// Load environment variables from .env file +var _ = godotenv.Load("../../.env") + +const ( + testName = "beanstalkd-test" + deploymentName = "beanstalkd-consumer-deployment" + beanstalkdPutJobName = "beanstalkd-put-job" + beanstalkdPopJobName = "beanstalkd-pop-job" +) + +var ( + testNamespace = fmt.Sprintf("%s-ns", testName) + beanstalkdDeploymentName = fmt.Sprintf("%s-beanstalkd-deployment", testName) + scaledObjectName = fmt.Sprintf("%s-so", testName) + beanstalkdTubeName = "default" + activationJobCount = 5 +) + +type templateData struct { + TestNamespace string + BeanstalkdDeploymentName string + BeanstalkdPutJobName string + BeanstalkdPopJobName string + ScaledObjectName string + DeploymentName string + BeanstalkdTubeName string + JobCount int +} + +const ( + beanstalkdDeploymentTemplate = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + app: beanstalkd + name: {{.BeanstalkdDeploymentName}} + namespace: {{.TestNamespace}} +spec: + selector: + matchLabels: + app: beanstalkd + template: + metadata: + labels: + app: beanstalkd + spec: + containers: + - image: docker.io/schickling/beanstalkd + name: beanstalkd + ports: + - containerPort: 11300 + name: beanstalkd + readinessProbe: + tcpSocket: + port: 11300 + initialDelaySeconds: 5 + periodSeconds: 10 +--- +apiVersion: v1 +kind: Service +metadata: + name: beanstalkd + namespace: {{.TestNamespace}} +spec: + ports: + - name: beanstalkd + port: 11300 + targetPort: 11300 + selector: + app: beanstalkd + type: ClusterIP +` + + scaledObjectActivationTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} +spec: + scaleTargetRef: + name: {{.DeploymentName}} + maxReplicaCount: 3 + pollingInterval: 5 + cooldownPeriod: 10 + triggers: + - type: beanstalkd + metadata: + server: beanstalkd.{{.TestNamespace}}:11300 + value: "15" + activationValue: "10" + tube: {{.BeanstalkdTubeName}} +` + + beanstalkdPutJobsTemplate = ` +apiVersion: batch/v1 +kind: Job +metadata: + name: {{.BeanstalkdPutJobName}} + namespace: {{.TestNamespace}} +spec: + template: + spec: + containers: + - name: beanstalkd-put-job + image: docker.io/sitecrafting/beanstalkd-cli + command: ["/bin/sh"] + args: ["-c", "for run in $(seq 1 {{.JobCount}}); do beanstalkd-cli --host=beanstalkd put \"Test Job\"; done;"] + restartPolicy: OnFailure +` + + beanstalkdPopJobsTemplate = ` +apiVersion: batch/v1 +kind: Job +metadata: + name: {{.BeanstalkdPopJobName}} + namespace: {{.TestNamespace}} +spec: + template: + spec: + containers: + - name: beanstalkd-pop-job + image: docker.io/sitecrafting/beanstalkd-cli + command: ["/bin/sh"] + args: ["-c", "for run in $(seq 1 {{.JobCount}}); do beanstalkd-cli --host=beanstalkd pop; done;"] + restartPolicy: OnFailure +` + + deploymentTemplate = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{.DeploymentName}} + namespace: {{.TestNamespace}} + labels: + app: nginx-deployment +spec: + replicas: 0 + selector: + matchLabels: + app: nginx-deployment + template: + metadata: + labels: + app: nginx-deployment + spec: + containers: + - name: nginx-deployment + image: nginxinc/nginx-unprivileged + ports: + - containerPort: 80 +` +) + +func TestBeanstalkdScaler(t *testing.T) { + // setup + t.Log("--- setting up ---") + // Create kubernetes resources + kc := GetKubernetesClient(t) + data, templates := getTemplateData() + t.Cleanup(func() { + DeleteKubernetesResources(t, testNamespace, data, templates) + }) + + CreateKubernetesResources(t, kc, testNamespace, data, templates) + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, beanstalkdDeploymentName, testNamespace, 1, 60, 1), + "replica count should be 0 after a minute") + + // test activation + testActivation(t, kc, data) + + // test scaling in + testScaleOut(t, kc, data) + + // scaling out + testScaleIn(t, kc, data) +} + +func getTemplateData() (templateData, []Template) { + return templateData{ + TestNamespace: testNamespace, + ScaledObjectName: scaledObjectName, + DeploymentName: deploymentName, + BeanstalkdDeploymentName: beanstalkdDeploymentName, + BeanstalkdTubeName: beanstalkdTubeName, + BeanstalkdPutJobName: beanstalkdPutJobName, + BeanstalkdPopJobName: beanstalkdPopJobName, + JobCount: activationJobCount, + }, []Template{ + {Name: "beanstalkdDeploymentTemplate", Config: beanstalkdDeploymentTemplate}, + {Name: "deploymentTemplate", Config: deploymentTemplate}, + } +} + +// Adds five beanstalkd jobs to the default tube +func addBeanstalkdJobs(t *testing.T, kc *kubernetes.Clientset, data *templateData) { + // run putJob + KubectlReplaceWithTemplate(t, data, "beanstalkdPutJobsTemplate", beanstalkdPutJobsTemplate) + require.True(t, WaitForJobSuccess(t, kc, beanstalkdPutJobName, testNamespace, 30, 2), "Job should run successfully") +} + +// Removes five beanstalkd jobs from the default tube +func removeBeanstalkdJobs(t *testing.T, kc *kubernetes.Clientset, data *templateData) { + // run putJob + KubectlReplaceWithTemplate(t, data, "beanstalkdPopJobsTemplate", beanstalkdPopJobsTemplate) + require.True(t, WaitForJobSuccess(t, kc, beanstalkdPopJobName, testNamespace, 30, 2), "Job should run successfully") +} + +func testActivation(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing activation---") + + KubectlApplyWithTemplate(t, data, "scaledObjectActivationTemplate", scaledObjectActivationTemplate) + + // Add 5 beanstalkd jobs + data.JobCount = 5 + addBeanstalkdJobs(t, kc, &data) + + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 30) +} + +func testScaleOut(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing scaling out ---") + + // Add 100 beanstalkd jobs + data.JobCount = 100 + addBeanstalkdJobs(t, kc, &data) + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 3, 60, 1), + "replica count should be 3 after a minute") +} + +func testScaleIn(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing scaling in ---") + + // Remove 80 beanstalkd jobs + data.JobCount = 80 + removeBeanstalkdJobs(t, kc, &data) + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 2, 60, 5), + "replica count should be 2 after 5 minutes") + + // Remove remaining beanstalkd jobs + data.JobCount = 25 + removeBeanstalkdJobs(t, kc, &data) + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, 1), + "replica count should be 0 after a minute") +} diff --git a/vendor/github.com/beanstalkd/go-beanstalk/License b/vendor/github.com/beanstalkd/go-beanstalk/License new file mode 100644 index 00000000000..183c3898c36 --- /dev/null +++ b/vendor/github.com/beanstalkd/go-beanstalk/License @@ -0,0 +1,22 @@ +Copyright 2012 Keith Rarick + +Permission is hereby granted, free of charge, to any person +obtaining a copy of this software and associated documentation +files (the "Software"), to deal in the Software without +restriction, including without limitation the rights to use, +copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the +Software is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +OTHER DEALINGS IN THE SOFTWARE. diff --git a/vendor/github.com/beanstalkd/go-beanstalk/Readme.md b/vendor/github.com/beanstalkd/go-beanstalk/Readme.md new file mode 100644 index 00000000000..15fd3b96330 --- /dev/null +++ b/vendor/github.com/beanstalkd/go-beanstalk/Readme.md @@ -0,0 +1,19 @@ +# Beanstalk + +Go client for [beanstalkd](https://beanstalkd.github.io). + +## Install + + $ go get github.com/beanstalkd/go-beanstalk + +## Use + +Produce jobs: + + c, err := beanstalk.Dial("tcp", "127.0.0.1:11300") + id, err := c.Put([]byte("hello"), 1, 0, 120*time.Second) + +Consume jobs: + + c, err := beanstalk.Dial("tcp", "127.0.0.1:11300") + id, body, err := c.Reserve(5 * time.Second) diff --git a/vendor/github.com/beanstalkd/go-beanstalk/conn.go b/vendor/github.com/beanstalkd/go-beanstalk/conn.go new file mode 100644 index 00000000000..7c3eb40d83c --- /dev/null +++ b/vendor/github.com/beanstalkd/go-beanstalk/conn.go @@ -0,0 +1,295 @@ +package beanstalk + +import ( + "fmt" + "io" + "net" + "net/textproto" + "strings" + "time" +) + +// DefaultDialTimeout is the time to wait for a connection to the beanstalk server. +const DefaultDialTimeout = 10 * time.Second + +// DefaultKeepAlivePeriod is the default period between TCP keepalive messages. +const DefaultKeepAlivePeriod = 10 * time.Second + +// A Conn represents a connection to a beanstalkd server. It consists +// of a default Tube and TubeSet as well as the underlying network +// connection. The embedded types carry methods with them; see the +// documentation of those types for details. +type Conn struct { + c *textproto.Conn + used string + watched map[string]bool + Tube + TubeSet +} + +var ( + space = []byte{' '} + crnl = []byte{'\r', '\n'} + yamlHead = []byte{'-', '-', '-', '\n'} + nl = []byte{'\n'} + colonSpace = []byte{':', ' '} + minusSpace = []byte{'-', ' '} +) + +// NewConn returns a new Conn using conn for I/O. +func NewConn(conn io.ReadWriteCloser) *Conn { + c := new(Conn) + c.c = textproto.NewConn(conn) + c.Tube = *NewTube(c, "default") + c.TubeSet = *NewTubeSet(c, "default") + c.used = "default" + c.watched = map[string]bool{"default": true} + return c +} + +// Dial connects addr on the given network using net.DialTimeout +// with a default timeout of 10s and then returns a new Conn for the connection. +func Dial(network, addr string) (*Conn, error) { + return DialTimeout(network, addr, DefaultDialTimeout) +} + +// DialTimeout connects addr on the given network using net.DialTimeout +// with a supplied timeout and then returns a new Conn for the connection. +func DialTimeout(network, addr string, timeout time.Duration) (*Conn, error) { + dialer := &net.Dialer{ + Timeout: timeout, + KeepAlive: DefaultKeepAlivePeriod, + } + c, err := dialer.Dial(network, addr) + if err != nil { + return nil, err + } + return NewConn(c), nil +} + +// Close closes the underlying network connection. +func (c *Conn) Close() error { + return c.c.Close() +} + +func (c *Conn) cmd(t *Tube, ts *TubeSet, body []byte, op string, args ...interface{}) (req, error) { + // negative dur checking + for _, arg := range args { + if d, _ := arg.(dur); d < 0 { + return req{}, fmt.Errorf("duration must be non-negative, got %v", time.Duration(d)) + } + } + + r := req{c.c.Next(), op} + c.c.StartRequest(r.id) + defer c.c.EndRequest(r.id) + err := c.adjustTubes(t, ts) + if err != nil { + return req{}, err + } + if body != nil { + args = append(args, len(body)) + } + c.printLine(op, args...) + if body != nil { + c.c.W.Write(body) + c.c.W.Write(crnl) + } + err = c.c.W.Flush() + if err != nil { + return req{}, ConnError{c, op, err} + } + return r, nil +} + +func (c *Conn) adjustTubes(t *Tube, ts *TubeSet) error { + if t != nil && t.Name != c.used { + if err := checkName(t.Name); err != nil { + return err + } + c.printLine("use", t.Name) + c.used = t.Name + } + if ts != nil { + for s := range ts.Name { + if !c.watched[s] { + if err := checkName(s); err != nil { + return err + } + c.printLine("watch", s) + } + } + for s := range c.watched { + if !ts.Name[s] { + c.printLine("ignore", s) + } + } + c.watched = make(map[string]bool) + for s := range ts.Name { + c.watched[s] = true + } + } + return nil +} + +// does not flush +func (c *Conn) printLine(cmd string, args ...interface{}) { + io.WriteString(c.c.W, cmd) + for _, a := range args { + c.c.W.Write(space) + fmt.Fprint(c.c.W, a) + } + c.c.W.Write(crnl) +} + +func (c *Conn) readResp(r req, readBody bool, f string, a ...interface{}) (body []byte, err error) { + c.c.StartResponse(r.id) + defer c.c.EndResponse(r.id) + line, err := c.c.ReadLine() + for strings.HasPrefix(line, "WATCHING ") || strings.HasPrefix(line, "USING ") { + line, err = c.c.ReadLine() + } + if err != nil { + return nil, ConnError{c, r.op, err} + } + toScan := line + if readBody { + var size int + toScan, size, err = parseSize(toScan) + if err != nil { + return nil, ConnError{c, r.op, err} + } + body = make([]byte, size+2) // include trailing CR NL + _, err = io.ReadFull(c.c.R, body) + if err != nil { + return nil, ConnError{c, r.op, err} + } + body = body[:size] // exclude trailing CR NL + } + + err = scan(toScan, f, a...) + if err != nil { + return nil, ConnError{c, r.op, err} + } + return body, nil +} + +// Delete deletes the given job. +func (c *Conn) Delete(id uint64) error { + r, err := c.cmd(nil, nil, nil, "delete", id) + if err != nil { + return err + } + _, err = c.readResp(r, false, "DELETED") + return err +} + +// Release tells the server to perform the following actions: +// set the priority of the given job to pri, remove it from the list of +// jobs reserved by c, wait delay seconds, then place the job in the +// ready queue, which makes it available for reservation by any client. +func (c *Conn) Release(id uint64, pri uint32, delay time.Duration) error { + r, err := c.cmd(nil, nil, nil, "release", id, pri, dur(delay)) + if err != nil { + return err + } + _, err = c.readResp(r, false, "RELEASED") + return err +} + +// Bury places the given job in a holding area in the job's tube and +// sets its priority to pri. The job will not be scheduled again until it +// has been kicked; see also the documentation of Kick. +func (c *Conn) Bury(id uint64, pri uint32) error { + r, err := c.cmd(nil, nil, nil, "bury", id, pri) + if err != nil { + return err + } + _, err = c.readResp(r, false, "BURIED") + return err +} + +// KickJob places the given job to the ready queue of the same tube where it currently belongs +// when the given job id exists and is in a buried or delayed state. +func (c *Conn) KickJob(id uint64) error { + r, err := c.cmd(nil, nil, nil, "kick-job", id) + if err != nil { + return err + } + _, err = c.readResp(r, false, "KICKED") + return err +} + +// Touch resets the reservation timer for the given job. +// It is an error if the job isn't currently reserved by c. +// See the documentation of Reserve for more details. +func (c *Conn) Touch(id uint64) error { + r, err := c.cmd(nil, nil, nil, "touch", id) + if err != nil { + return err + } + _, err = c.readResp(r, false, "TOUCHED") + return err +} + +// Peek gets a copy of the specified job from the server. +func (c *Conn) Peek(id uint64) (body []byte, err error) { + r, err := c.cmd(nil, nil, nil, "peek", id) + if err != nil { + return nil, err + } + return c.readResp(r, true, "FOUND %d", &id) +} + +// ReserveJob reserves the specified job by id from the server. +func (c *Conn) ReserveJob(id uint64) (body []byte, err error) { + r, err := c.cmd(nil, nil, nil, "reserve-job", id) + if err != nil { + return nil, err + } + return c.readResp(r, true, "RESERVED %d", &id) +} + +// Stats retrieves global statistics from the server. +func (c *Conn) Stats() (map[string]string, error) { + r, err := c.cmd(nil, nil, nil, "stats") + if err != nil { + return nil, err + } + body, err := c.readResp(r, true, "OK") + return parseDict(body), err +} + +// StatsJob retrieves statistics about the given job. +func (c *Conn) StatsJob(id uint64) (map[string]string, error) { + r, err := c.cmd(nil, nil, nil, "stats-job", id) + if err != nil { + return nil, err + } + body, err := c.readResp(r, true, "OK") + return parseDict(body), err +} + +// ListTubes returns the names of the tubes that currently +// exist on the server. +func (c *Conn) ListTubes() ([]string, error) { + r, err := c.cmd(nil, nil, nil, "list-tubes") + if err != nil { + return nil, err + } + body, err := c.readResp(r, true, "OK") + return parseList(body), err +} + +func scan(input, format string, a ...interface{}) error { + _, err := fmt.Sscanf(input, format, a...) + if err != nil { + return findRespError(input) + } + return nil +} + +type req struct { + id uint + op string +} diff --git a/vendor/github.com/beanstalkd/go-beanstalk/doc.go b/vendor/github.com/beanstalkd/go-beanstalk/doc.go new file mode 100644 index 00000000000..7bb685e008a --- /dev/null +++ b/vendor/github.com/beanstalkd/go-beanstalk/doc.go @@ -0,0 +1,6 @@ +// Package beanstalk provides a client for the beanstalk protocol. +// See http://kr.github.com/beanstalkd/ for the server. +// +// This package is synchronized internally and safe to use from +// multiple goroutines without other coordination. +package beanstalk diff --git a/vendor/github.com/beanstalkd/go-beanstalk/err.go b/vendor/github.com/beanstalkd/go-beanstalk/err.go new file mode 100644 index 00000000000..66a38512a1b --- /dev/null +++ b/vendor/github.com/beanstalkd/go-beanstalk/err.go @@ -0,0 +1,63 @@ +package beanstalk + +import "errors" + +// ConnError records an error message from the server and the operation +// and connection that caused it. +type ConnError struct { + Conn *Conn + Op string + Err error +} + +func (e ConnError) Error() string { + return e.Op + ": " + e.Err.Error() +} + +func (e ConnError) Unwrap() error { + return e.Err +} + +// Error messages returned by the server. +var ( + ErrBadFormat = errors.New("bad command format") + ErrBuried = errors.New("buried") + ErrDeadline = errors.New("deadline soon") + ErrDraining = errors.New("draining") + ErrInternal = errors.New("internal error") + ErrJobTooBig = errors.New("job too big") + ErrNoCRLF = errors.New("expected CR LF") + ErrNotFound = errors.New("not found") + ErrNotIgnored = errors.New("not ignored") + ErrOOM = errors.New("server is out of memory") + ErrTimeout = errors.New("timeout") + ErrUnknown = errors.New("unknown command") +) + +var respError = map[string]error{ + "BAD_FORMAT": ErrBadFormat, + "BURIED": ErrBuried, + "DEADLINE_SOON": ErrDeadline, + "DRAINING": ErrDraining, + "EXPECTED_CRLF": ErrNoCRLF, + "INTERNAL_ERROR": ErrInternal, + "JOB_TOO_BIG": ErrJobTooBig, + "NOT_FOUND": ErrNotFound, + "NOT_IGNORED": ErrNotIgnored, + "OUT_OF_MEMORY": ErrOOM, + "TIMED_OUT": ErrTimeout, + "UNKNOWN_COMMAND": ErrUnknown, +} + +type unknownRespError string + +func (e unknownRespError) Error() string { + return "unknown response: " + string(e) +} + +func findRespError(s string) error { + if err := respError[s]; err != nil { + return err + } + return unknownRespError(s) +} diff --git a/vendor/github.com/beanstalkd/go-beanstalk/name.go b/vendor/github.com/beanstalkd/go-beanstalk/name.go new file mode 100644 index 00000000000..5a85b2d41c6 --- /dev/null +++ b/vendor/github.com/beanstalkd/go-beanstalk/name.go @@ -0,0 +1,55 @@ +package beanstalk + +import ( + "errors" +) + +// NameChars are the allowed name characters in the beanstalkd protocol. +const NameChars = `\-+/;.$_()0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz` + +// NameError indicates that a name was malformed and the specific error +// describing how. +type NameError struct { + Name string + Err error +} + +func (e NameError) Error() string { + return e.Err.Error() + ": " + e.Name +} + +func (e NameError) Unwrap() error { + return e.Err +} + +// Name format errors. The Err field of NameError contains one of these. +var ( + ErrEmpty = errors.New("name is empty") + ErrBadChar = errors.New("name has bad char") // contains a character not in NameChars + ErrTooLong = errors.New("name is too long") +) + +func checkName(s string) error { + switch { + case len(s) == 0: + return NameError{s, ErrEmpty} + case len(s) >= 200: + return NameError{s, ErrTooLong} + case !containsOnly(s, NameChars): + return NameError{s, ErrBadChar} + } + return nil +} + +func containsOnly(s, chars string) bool { +outer: + for _, c := range s { + for _, m := range chars { + if c == m { + continue outer + } + } + return false + } + return true +} diff --git a/vendor/github.com/beanstalkd/go-beanstalk/parse.go b/vendor/github.com/beanstalkd/go-beanstalk/parse.go new file mode 100644 index 00000000000..091ab86e8f3 --- /dev/null +++ b/vendor/github.com/beanstalkd/go-beanstalk/parse.go @@ -0,0 +1,54 @@ +package beanstalk + +import ( + "bytes" + "strconv" + "strings" +) + +func parseDict(dat []byte) map[string]string { + if dat == nil { + return nil + } + d := make(map[string]string) + if bytes.HasPrefix(dat, yamlHead) { + dat = dat[4:] + } + for _, s := range bytes.Split(dat, nl) { + kv := bytes.SplitN(s, colonSpace, 2) + if len(kv) != 2 { + continue + } + d[string(kv[0])] = string(kv[1]) + } + return d +} + +func parseList(dat []byte) []string { + if dat == nil { + return nil + } + l := []string{} + if bytes.HasPrefix(dat, yamlHead) { + dat = dat[4:] + } + for _, s := range bytes.Split(dat, nl) { + if !bytes.HasPrefix(s, minusSpace) { + continue + } + l = append(l, string(s[2:])) + } + return l +} + +func parseSize(s string) (string, int, error) { + i := strings.LastIndex(s, " ") + if i == -1 { + return "", 0, findRespError(s) + } + n, err := strconv.Atoi(s[i+1:]) + if err != nil { + return "", 0, err + } + return s[:i], n, nil +} diff --git a/vendor/github.com/beanstalkd/go-beanstalk/time.go b/vendor/github.com/beanstalkd/go-beanstalk/time.go new file mode 100644 index 00000000000..fd128cbd849 --- /dev/null +++ b/vendor/github.com/beanstalkd/go-beanstalk/time.go @@ -0,0 +1,12 @@ +package beanstalk + +import ( + "strconv" + "time" +) + +type dur time.Duration + +func (d dur) String() string { + return strconv.FormatInt(int64(time.Duration(d)/time.Second), 10) +} diff --git a/vendor/github.com/beanstalkd/go-beanstalk/tube.go b/vendor/github.com/beanstalkd/go-beanstalk/tube.go new file mode 100644 index 00000000000..fe7baf7e536 --- /dev/null +++ b/vendor/github.com/beanstalkd/go-beanstalk/tube.go @@ -0,0 +1,112 @@ +package beanstalk + +import ( + "time" +) + +// Tube represents tube Name on the server connected to by Conn. +// It has methods for commands that operate on a single tube. +type Tube struct { + Conn *Conn + Name string +} + +// NewTube returns a new Tube representing the given name. +func NewTube(c *Conn, name string) *Tube { + return &Tube{c, name} +} + +// Put puts a job into tube t with priority pri and TTR ttr, and returns +// the id of the newly-created job. If delay is nonzero, the server will +// wait the given amount of time after returning to the client and before +// putting the job into the ready queue. +func (t *Tube) Put(body []byte, pri uint32, delay, ttr time.Duration) (id uint64, err error) { + r, err := t.Conn.cmd(t, nil, body, "put", pri, dur(delay), dur(ttr)) + if err != nil { + return 0, err + } + _, err = t.Conn.readResp(r, false, "INSERTED %d", &id) + if err != nil { + return 0, err + } + return id, nil +} + +// PeekReady gets a copy of the job at the front of t's ready queue. +func (t *Tube) PeekReady() (id uint64, body []byte, err error) { + r, err := t.Conn.cmd(t, nil, nil, "peek-ready") + if err != nil { + return 0, nil, err + } + body, err = t.Conn.readResp(r, true, "FOUND %d", &id) + if err != nil { + return 0, nil, err + } + return id, body, nil +} + +// PeekDelayed gets a copy of the delayed job that is next to be +// put in t's ready queue. +func (t *Tube) PeekDelayed() (id uint64, body []byte, err error) { + r, err := t.Conn.cmd(t, nil, nil, "peek-delayed") + if err != nil { + return 0, nil, err + } + body, err = t.Conn.readResp(r, true, "FOUND %d", &id) + if err != nil { + return 0, nil, err + } + return id, body, nil +} + +// PeekBuried gets a copy of the job in the holding area that would +// be kicked next by Kick. +func (t *Tube) PeekBuried() (id uint64, body []byte, err error) { + r, err := t.Conn.cmd(t, nil, nil, "peek-buried") + if err != nil { + return 0, nil, err + } + body, err = t.Conn.readResp(r, true, "FOUND %d", &id) + if err != nil { + return 0, nil, err + } + return id, body, nil +} + +// Kick takes up to bound jobs from the holding area and moves them into +// the ready queue, then returns the number of jobs moved. Jobs will be +// taken in the order in which they were last buried. +func (t *Tube) Kick(bound int) (n int, err error) { + r, err := t.Conn.cmd(t, nil, nil, "kick", bound) + if err != nil { + return 0, err + } + _, err = t.Conn.readResp(r, false, "KICKED %d", &n) + if err != nil { + return 0, err + } + return n, nil +} + +// Stats retrieves statistics about tube t. +func (t *Tube) Stats() (map[string]string, error) { + r, err := t.Conn.cmd(nil, nil, nil, "stats-tube", t.Name) + if err != nil { + return nil, err + } + body, err := t.Conn.readResp(r, true, "OK") + return parseDict(body), err +} + +// Pause pauses new reservations in t for time d. +func (t *Tube) Pause(d time.Duration) error { + r, err := t.Conn.cmd(nil, nil, nil, "pause-tube", t.Name, dur(d)) + if err != nil { + return err + } + _, err = t.Conn.readResp(r, false, "PAUSED") + if err != nil { + return err + } + return nil +} diff --git a/vendor/github.com/beanstalkd/go-beanstalk/tubeset.go b/vendor/github.com/beanstalkd/go-beanstalk/tubeset.go new file mode 100644 index 00000000000..0b431e011c4 --- /dev/null +++ b/vendor/github.com/beanstalkd/go-beanstalk/tubeset.go @@ -0,0 +1,39 @@ +package beanstalk + +import ( + "time" +) + +// TubeSet represents a set of tubes on the server connected to by Conn. +// Name names the tubes represented. +type TubeSet struct { + Conn *Conn + Name map[string]bool +} + +// NewTubeSet returns a new TubeSet representing the given names. +func NewTubeSet(c *Conn, name ...string) *TubeSet { + ts := &TubeSet{c, make(map[string]bool)} + for _, s := range name { + ts.Name[s] = true + } + return ts +} + +// Reserve reserves and returns a job from one of the tubes in t. If no +// job is available before time timeout has passed, Reserve returns a +// ConnError recording ErrTimeout. +// +// Typically, a client will reserve a job, perform some work, then delete +// the job with Conn.Delete. +func (t *TubeSet) Reserve(timeout time.Duration) (id uint64, body []byte, err error) { + r, err := t.Conn.cmd(nil, t, nil, "reserve-with-timeout", dur(timeout)) + if err != nil { + return 0, nil, err + } + body, err = t.Conn.readResp(r, true, "RESERVED %d", &id) + if err != nil { + return 0, nil, err + } + return id, body, nil +} diff --git a/vendor/modules.txt b/vendor/modules.txt index af45eca4758..07f8e3baa61 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -456,6 +456,9 @@ github.com/aws/smithy-go/time github.com/aws/smithy-go/transport/http github.com/aws/smithy-go/transport/http/internal/io github.com/aws/smithy-go/waiter +# github.com/beanstalkd/go-beanstalk v0.2.0 +## explicit; go 1.14 +github.com/beanstalkd/go-beanstalk # github.com/beorn7/perks v1.0.1 ## explicit; go 1.11 github.com/beorn7/perks/quantile