diff --git a/cmd/executor/main.go b/cmd/executor/main.go index 6787c59cec2..ed8444fbdb4 100644 --- a/cmd/executor/main.go +++ b/cmd/executor/main.go @@ -1,12 +1,14 @@ package main import ( + "context" "net/http" "os" "os/signal" "syscall" "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" "github.com/spf13/pflag" "github.com/spf13/viper" @@ -47,11 +49,13 @@ func main() { shutdownChannel := make(chan os.Signal, 1) signal.Notify(shutdownChannel, syscall.SIGINT, syscall.SIGTERM) - shutdownMetricServer := common.ServeMetricsFor(config.Metric.Port, - prometheus.Gatherers{metrics.GetMetricsGatherer()}) + shutdownMetricServer := common.ServeMetricsFor( + config.Metric.Port, + prometheus.Gatherers{metrics.GetMetricsGatherer()}, + ) defer shutdownMetricServer() - shutdown, wg := executor.StartUp(config) + shutdown, wg := executor.StartUp(context.Background(), logrus.NewEntry(logrus.New()), config) go func() { <-shutdownChannel shutdown() diff --git a/config/executor/config.yaml b/config/executor/config.yaml index e968a6b50a6..1fc161d9a33 100644 --- a/config/executor/config.yaml +++ b/config/executor/config.yaml @@ -36,10 +36,6 @@ kubernetes: impersonateUsers: false trackedNodeLabels: - kubernetes.io/hostname - etcd: - fractionOfStorageInUseSoftLimit: 0.8 - fractionOfStorageInUseHardLimit: 0.9 - minimumAvailable: 2 QPS: 10000 Burst: 10000 nodeIdLabel: kubernetes.io/hostname diff --git a/internal/common/etcdhealth/etcdhealth.go b/internal/common/etcdhealth/etcdhealth.go new file mode 100644 index 00000000000..ace8fc1dfaa --- /dev/null +++ b/internal/common/etcdhealth/etcdhealth.go @@ -0,0 +1,327 @@ +package etcdhealth + +import ( + "context" + "sync" + "time" + + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" + + "github.com/armadaproject/armada/internal/common/healthmonitor" + "github.com/armadaproject/armada/internal/common/logging" + "github.com/armadaproject/armada/internal/common/metrics" +) + +const ( + etcdSizeInUseBytesMetricName string = "etcd_mvcc_db_total_size_in_use_in_bytes" + etcdSizeBytesMetricName string = "etcd_mvcc_db_total_size_in_bytes" + etcdCapacityBytesMetricName string = "etcd_server_quota_backend_bytes" + etcdMemberUrl string = "url" + + EtcdReplicaSizeInUseExceededReason string = "etcdReplicaSizeInUseExceeded" + EtcdReplicaSizeExceededReason string = "etcdReplicaSizeExceeded" +) + +// EtcdReplicaHealthMonitor is a health monitor for monitoring the health of an individual etcd replica. +type EtcdReplicaHealthMonitor struct { + // Name of the replica being scraped, e.g., its url. + // Included in exported Prometheus metrics. + name string + // Exported Prometheus metrics are prefixed with this. + metricsPrefix string + + // The cluster is considered unhealthy when for any replica in the cluster: + // etcd_mvcc_db_total_size_in_use_in_bytes / etcd_server_quota_backend_bytes > FractionOfStorageInUseLimit. + fractionOfStorageInUseLimit float64 + // The cluster is considered unhealthy when for any replica in the cluster: + // etcd_mvcc_db_total_size_in_bytes / etcd_server_quota_backend_bytes > FractionOfStorageLimit. + fractionOfStorageLimit float64 + // A replica is considered unavailable if the executor has failed to collect metrics from it for this amount of time. + replicaTimeout time.Duration + // Interval with which to scrape metrics. + scrapeInterval time.Duration + + // Time at which metrics collection was most recently attempted. 
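// Note: only a scrape that yields all of the required metrics advances the
// successful-collection timestamp below; hasTimedOut (and hence the
// UnavailableReason health result) is driven by that timestamp, so a replica
// whose scrapes keep failing eventually reports as unavailable.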
+ timeOfMostRecentCollectionAttempt time.Time + // Time at which metrics were most recently collected successfully. + timeOfMostRecentSuccessfulCollectionAttempt time.Time + + // Relevant metrics scraped from etcd. + etcdSizeInUseBytes float64 + etcdSizeBytes float64 + etcdCapacityBytes float64 + + // Prometheus metrics. + healthPrometheusDesc *prometheus.Desc + timeOfMostRecentCollectionAttemptPrometheusDesc *prometheus.Desc + timeOfMostRecentSuccessfulCollectionAttemptPrometheusDesc *prometheus.Desc + sizeInUseFractionPrometheusDesc *prometheus.Desc + sizeFractionPrometheusDesc *prometheus.Desc + + metricsCollectionDelayBucketsStart float64 + metricsCollectionDelayBucketsFactor float64 + metricsCollectionDelayBucketsCount int + metricsCollectionDelayHistogram prometheus.Histogram + + // Providing etcd metrics used for the health check. + metricsProvider metrics.MetricsProvider + + // Used to block until the next metrics collection. + watchers []chan struct{} + + // Mutex protecting the above fields. + mu sync.Mutex +} + +func NewEtcdReplicaHealthMonitor( + name string, + fractionOfStorageInUseLimit float64, + fractionOfStorageLimit float64, + replicaTimeout time.Duration, + scrapeInterval time.Duration, + metricsCollectionDelayBucketsStart float64, + metricsCollectionDelayBucketsFactor float64, + metricsCollectionDelayBucketsCount int, + metricsProvider metrics.MetricsProvider, +) *EtcdReplicaHealthMonitor { + return &EtcdReplicaHealthMonitor{ + name: name, + fractionOfStorageInUseLimit: fractionOfStorageInUseLimit, + fractionOfStorageLimit: fractionOfStorageLimit, + replicaTimeout: replicaTimeout, + scrapeInterval: scrapeInterval, + metricsCollectionDelayBucketsStart: metricsCollectionDelayBucketsStart, + metricsCollectionDelayBucketsFactor: metricsCollectionDelayBucketsFactor, + metricsCollectionDelayBucketsCount: metricsCollectionDelayBucketsCount, + metricsProvider: metricsProvider, + } +} + +func (srv *EtcdReplicaHealthMonitor) WithMetricsPrefix(v string) *EtcdReplicaHealthMonitor { + srv.metricsPrefix = v + return srv +} + +func (srv *EtcdReplicaHealthMonitor) IsHealthy() (bool, string, error) { + srv.mu.Lock() + defer srv.mu.Unlock() + if srv.hasTimedOut() { + return false, healthmonitor.UnavailableReason, nil + } + ok, reason := srv.isHealthy() + return ok, reason, nil +} + +func (srv *EtcdReplicaHealthMonitor) hasTimedOut() bool { + return time.Since(srv.timeOfMostRecentSuccessfulCollectionAttempt) > srv.replicaTimeout +} + +func (srv *EtcdReplicaHealthMonitor) isHealthy() (bool, string) { + if srv.sizeInUseFraction() > srv.fractionOfStorageInUseLimit { + return false, EtcdReplicaSizeInUseExceededReason + } + if srv.sizeFraction() > srv.fractionOfStorageLimit { + return false, EtcdReplicaSizeExceededReason + } + return true, "" +} + +func (srv *EtcdReplicaHealthMonitor) sizeInUseFraction() float64 { + return srv.etcdSizeInUseBytes / srv.etcdCapacityBytes +} + +func (srv *EtcdReplicaHealthMonitor) sizeFraction() float64 { + return srv.etcdSizeBytes / srv.etcdCapacityBytes +} + +func (srv *EtcdReplicaHealthMonitor) Run(ctx context.Context, log *logrus.Entry) error { + srv.initialise() + log = log.WithField("service", "EtcdHealthMonitor") + log.Info("starting etcd health monitor") + defer log.Info("stopping etcd health monitor") + ticker := time.NewTicker(srv.scrapeInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + t := time.Now() + metrics, err := srv.metricsProvider.Collect(ctx, log) + srv.mu.Lock() + 
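// Everything below runs while holding the mutex: the attempt timestamp always
// advances; the successful-collection timestamp and the delay histogram advance
// only if all three etcd metrics were extracted; finally, any watchers registered
// via BlockUntilNextMetricsCollection are released.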
srv.timeOfMostRecentCollectionAttempt = time.Now() + if err != nil { + logging.WithStacktrace(log, err).Errorf("failed to scrape etcd metrics from %s", srv.name) + } else { + success := true + if err := srv.setSizeInUseBytesFromMetrics(metrics); err != nil { + success = false + logging.WithStacktrace(log, err).Errorf("failed to scrape etcd metrics from %s", srv.name) + } + if err := srv.setSizeBytesFromMetrics(metrics); err != nil { + success = false + logging.WithStacktrace(log, err).Errorf("failed to scrape etcd metrics from %s", srv.name) + } + if err := srv.setCapacityBytesFromMetrics(metrics); err != nil { + success = false + logging.WithStacktrace(log, err).Errorf("failed to scrape etcd metrics from %s", srv.name) + } + if success { + srv.timeOfMostRecentSuccessfulCollectionAttempt = srv.timeOfMostRecentCollectionAttempt + srv.metricsCollectionDelayHistogram.Observe(floatingPointSecondsFromDuration(time.Since(t))) + } + } + + // Unblock any threads waiting for collection to finish. + for _, c := range srv.watchers { + close(c) + } + srv.watchers = nil + srv.mu.Unlock() + } + } +} + +func floatingPointSecondsFromDuration(d time.Duration) float64 { + return float64(d) / 1e9 +} + +func (srv *EtcdReplicaHealthMonitor) initialise() { + srv.healthPrometheusDesc = prometheus.NewDesc( + srv.metricsPrefix+"etcd_replica_health", + "Shows the health of an etcd replica", + []string{etcdMemberUrl}, + nil, + ) + srv.timeOfMostRecentCollectionAttemptPrometheusDesc = prometheus.NewDesc( + srv.metricsPrefix+"etcd_replica_time_of_most_recent_metrics_collection_attempt", + "Time of most recent metrics collection attempt.", + []string{etcdMemberUrl}, + nil, + ) + srv.timeOfMostRecentSuccessfulCollectionAttemptPrometheusDesc = prometheus.NewDesc( + srv.metricsPrefix+"etcd_replica_time_of_most_recent_successful_metrics_collection", + "Time of most recent successful metrics collection.", + []string{etcdMemberUrl}, + nil, + ) + srv.sizeInUseFractionPrometheusDesc = prometheus.NewDesc( + srv.metricsPrefix+"etcd_replica_size_in_use_fraction", + "etcd_mvcc_db_total_size_in_use_in_bytes / etcd_server_quota_backend_bytes.", + []string{etcdMemberUrl}, + nil, + ) + srv.sizeFractionPrometheusDesc = prometheus.NewDesc( + srv.metricsPrefix+"etcd_replica_size_fraction", + "etcd_mvcc_db_total_size_in_bytes / etcd_server_quota_backend_bytes.", + []string{etcdMemberUrl}, + nil, + ) + srv.metricsCollectionDelayHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: srv.metricsPrefix + "etcd_replica_metrics_collection_delay_seconds", + Help: "Delay in seconds of collecting metrics from this etcd replica.", + Buckets: prometheus.ExponentialBuckets( + srv.metricsCollectionDelayBucketsStart, + srv.metricsCollectionDelayBucketsFactor, + srv.metricsCollectionDelayBucketsCount, + ), + }) +} + +func (srv *EtcdReplicaHealthMonitor) setSizeInUseBytesFromMetrics(metrics map[string]float64) error { + f, ok := metrics[etcdSizeInUseBytesMetricName] + if !ok { + return errors.Errorf("metric unavailable: %s", etcdSizeInUseBytesMetricName) + } + srv.etcdSizeInUseBytes = f + return nil +} + +func (srv *EtcdReplicaHealthMonitor) setSizeBytesFromMetrics(metrics map[string]float64) error { + f, ok := metrics[etcdSizeBytesMetricName] + if !ok { + return errors.Errorf("metric unavailable: %s", etcdSizeBytesMetricName) + } + srv.etcdSizeBytes = f + return nil +} + +func (srv *EtcdReplicaHealthMonitor) setCapacityBytesFromMetrics(metrics map[string]float64) error { + f, ok := metrics[etcdCapacityBytesMetricName] + if !ok { + return 
errors.Errorf("metric unavailable: %s", etcdCapacityBytesMetricName) + } + srv.etcdCapacityBytes = f + return nil +} + +// BlockUntilNextMetricsCollection blocks until the next metrics collection has completed, +// or until ctx is cancelled, whichever occurs first. +func (srv *EtcdReplicaHealthMonitor) BlockUntilNextMetricsCollection(ctx context.Context) { + c := make(chan struct{}) + srv.mu.Lock() + srv.watchers = append(srv.watchers, c) + srv.mu.Unlock() + select { + case <-ctx.Done(): + return + case <-c: + return + } +} + +func (srv *EtcdReplicaHealthMonitor) Describe(c chan<- *prometheus.Desc) { + c <- srv.healthPrometheusDesc + c <- srv.timeOfMostRecentCollectionAttemptPrometheusDesc + c <- srv.timeOfMostRecentSuccessfulCollectionAttemptPrometheusDesc + c <- srv.sizeInUseFractionPrometheusDesc + c <- srv.sizeFractionPrometheusDesc + srv.metricsCollectionDelayHistogram.Describe(c) +} + +func (srv *EtcdReplicaHealthMonitor) Collect(c chan<- prometheus.Metric) { + srv.mu.Lock() + resultOfMostRecentHealthCheck := 0.0 + if ok, _ := srv.isHealthy(); ok { + resultOfMostRecentHealthCheck = 1.0 + } + timeOfMostRecentCollectionAttempt := srv.timeOfMostRecentCollectionAttempt + timeOfMostRecentSuccessfulCollectionAttempt := srv.timeOfMostRecentSuccessfulCollectionAttempt + sizeInUseFraction := srv.sizeInUseFraction() + sizeFraction := srv.sizeFraction() + srv.mu.Unlock() + + c <- prometheus.MustNewConstMetric( + srv.healthPrometheusDesc, + prometheus.GaugeValue, + resultOfMostRecentHealthCheck, + srv.name, + ) + c <- prometheus.MustNewConstMetric( + srv.timeOfMostRecentCollectionAttemptPrometheusDesc, + prometheus.CounterValue, + float64(timeOfMostRecentCollectionAttempt.Unix()), + srv.name, + ) + c <- prometheus.MustNewConstMetric( + srv.timeOfMostRecentSuccessfulCollectionAttemptPrometheusDesc, + prometheus.CounterValue, + float64(timeOfMostRecentSuccessfulCollectionAttempt.Unix()), + srv.name, + ) + c <- prometheus.MustNewConstMetric( + srv.sizeInUseFractionPrometheusDesc, + prometheus.GaugeValue, + sizeInUseFraction, + srv.name, + ) + c <- prometheus.MustNewConstMetric( + srv.sizeFractionPrometheusDesc, + prometheus.GaugeValue, + sizeFraction, + srv.name, + ) + srv.metricsCollectionDelayHistogram.Collect(c) +} diff --git a/internal/common/etcdhealth/etcdhealth_test.go b/internal/common/etcdhealth/etcdhealth_test.go new file mode 100644 index 00000000000..22435861a61 --- /dev/null +++ b/internal/common/etcdhealth/etcdhealth_test.go @@ -0,0 +1,74 @@ +package etcdhealth + +import ( + "context" + "testing" + "time" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "golang.org/x/sync/errgroup" + + "github.com/armadaproject/armada/internal/common/healthmonitor" + "github.com/armadaproject/armada/internal/common/metrics" +) + +func TestEtcdReplicaHealthMonitor(t *testing.T) { + mp := &metrics.ManualMetricsProvider{} + hm := NewEtcdReplicaHealthMonitor("foo", 0.2, 0.3, time.Second, time.Microsecond, 1e-3, 1.1, 10, mp) + + // Initial call results in unavailable. + ok, reason, err := hm.IsHealthy() + assert.False(t, ok) + assert.Equal(t, healthmonitor.UnavailableReason, reason) + assert.NoError(t, err) + + // Start the metrics collection service. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + g, ctx := errgroup.WithContext(ctx) + g.Go(func() error { return hm.Run(ctx, logrus.NewEntry(logrus.New())) }) + + // Should still be unavailable due to missing metrics. 
+ hm.BlockUntilNextMetricsCollection(ctx) + ok, reason, err = hm.IsHealthy() + assert.False(t, ok) + assert.Equal(t, healthmonitor.UnavailableReason, reason) + assert.NoError(t, err) + + // Metrics indicate healthy. + mp.WithMetrics(map[string]float64{ + etcdSizeInUseBytesMetricName: 2, + etcdSizeBytesMetricName: 3, + etcdCapacityBytesMetricName: 10, + }) + hm.BlockUntilNextMetricsCollection(ctx) + ok, reason, err = hm.IsHealthy() + assert.True(t, ok) + assert.Empty(t, reason) + assert.NoError(t, err) + + // Size in use metric indicates unhealthy. + mp.WithMetrics(map[string]float64{ + etcdSizeInUseBytesMetricName: 2.1, + etcdSizeBytesMetricName: 3, + etcdCapacityBytesMetricName: 10, + }) + hm.BlockUntilNextMetricsCollection(ctx) + ok, reason, err = hm.IsHealthy() + assert.False(t, ok) + assert.Equal(t, EtcdReplicaSizeInUseExceededReason, reason) + assert.NoError(t, err) + + // Size metric indicates unhealthy. + mp.WithMetrics(map[string]float64{ + etcdSizeInUseBytesMetricName: 2, + etcdSizeBytesMetricName: 3.1, + etcdCapacityBytesMetricName: 10, + }) + hm.BlockUntilNextMetricsCollection(ctx) + ok, reason, err = hm.IsHealthy() + assert.False(t, ok) + assert.Equal(t, EtcdReplicaSizeExceededReason, reason) + assert.NoError(t, err) +} diff --git a/internal/common/healthmonitor/healthmonitor.go b/internal/common/healthmonitor/healthmonitor.go new file mode 100644 index 00000000000..aa196aaffda --- /dev/null +++ b/internal/common/healthmonitor/healthmonitor.go @@ -0,0 +1,29 @@ +package healthmonitor + +import ( + "context" + + "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" +) + +const ( + // Health check failure reason indicating the monitored component is unavailable (e.g., due to time-out). + UnavailableReason string = "unavailable" + // Health check failure reason indicating too many replicas have timed out. + InsufficientReplicasAvailableReason string = "insufficientReplicasAvailable" + // Health check failure reason indicating a component is manually disabled. + ManuallyDisabledReason string = "manuallyDisabled" +) + +// HealthMonitor represents a health checker. +type HealthMonitor interface { + prometheus.Collector + // IsHealthy performs a health check, + // returning the result, a reason (should be empty if successful), and possibly an error. + IsHealthy() (ok bool, reason string, err error) + // Run initialises and starts the health checker. + // Run may be blocking and should be run within a separate goroutine. + // Must be called before IsHealthy() or any prometheus.Collector interface methods. + Run(context.Context, *logrus.Entry) error +} diff --git a/internal/common/healthmonitor/manualhealthmonitor.go b/internal/common/healthmonitor/manualhealthmonitor.go new file mode 100644 index 00000000000..1bc8a6d5b62 --- /dev/null +++ b/internal/common/healthmonitor/manualhealthmonitor.go @@ -0,0 +1,57 @@ +package healthmonitor + +import ( + "context" + "sync" + + "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" +) + +// ManualHealthMonitor is a manually controlled health monitor. 
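// SetHealthStatus flips the reported status and returns the previous value. While
// unhealthy, IsHealthy returns the configured reason (ManuallyDisabledReason unless
// overridden via WithReason); Run is a no-op and no Prometheus metrics are exported.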
+type ManualHealthMonitor struct { + isHealthy bool + reason string + mu sync.Mutex +} + +func NewManualHealthMonitor() *ManualHealthMonitor { + return &ManualHealthMonitor{ + reason: ManuallyDisabledReason, + } +} + +func (srv *ManualHealthMonitor) SetHealthStatus(isHealthy bool) bool { + srv.mu.Lock() + defer srv.mu.Unlock() + previous := srv.isHealthy + srv.isHealthy = isHealthy + return previous +} + +func (srv *ManualHealthMonitor) WithReason(reason string) *ManualHealthMonitor { + srv.mu.Lock() + defer srv.mu.Unlock() + srv.reason = reason + return srv +} + +func (srv *ManualHealthMonitor) IsHealthy() (bool, string, error) { + srv.mu.Lock() + defer srv.mu.Unlock() + if srv.isHealthy { + return true, "", nil + } else { + return false, srv.reason, nil + } +} + +func (srv *ManualHealthMonitor) Run(ctx context.Context, log *logrus.Entry) error { + return nil +} + +func (srv *ManualHealthMonitor) Describe(c chan<- *prometheus.Desc) { +} + +func (srv *ManualHealthMonitor) Collect(c chan<- prometheus.Metric) { +} diff --git a/internal/common/healthmonitor/manualhealthmonitor_test.go b/internal/common/healthmonitor/manualhealthmonitor_test.go new file mode 100644 index 00000000000..ff6960dd6ec --- /dev/null +++ b/internal/common/healthmonitor/manualhealthmonitor_test.go @@ -0,0 +1,35 @@ +package healthmonitor + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestManualHealthMonitor(t *testing.T) { + hm := NewManualHealthMonitor() + + ok, reason, err := hm.IsHealthy() + require.False(t, ok) + require.NotEmpty(t, reason) + require.NoError(t, err) + + hm.SetHealthStatus(true) + ok, reason, err = hm.IsHealthy() + require.True(t, ok) + require.Empty(t, reason) + require.NoError(t, err) + + hm.SetHealthStatus(false) + ok, reason, err = hm.IsHealthy() + require.False(t, ok) + require.NotEmpty(t, reason) + require.NoError(t, err) + + hm.SetHealthStatus(false) + hm = hm.WithReason("foo") + ok, reason, err = hm.IsHealthy() + require.False(t, ok) + require.Equal(t, "foo", reason) + require.NoError(t, err) +} diff --git a/internal/common/healthmonitor/multihealthmonitor.go b/internal/common/healthmonitor/multihealthmonitor.go new file mode 100644 index 00000000000..6a9d594e9eb --- /dev/null +++ b/internal/common/healthmonitor/multihealthmonitor.go @@ -0,0 +1,130 @@ +package healthmonitor + +import ( + "context" + "fmt" + "sync" + + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" + "golang.org/x/exp/maps" + "golang.org/x/sync/errgroup" +) + +// MultiHealthMonitor wraps multiple HealthMonitors and itself implements the HealthMonitor interface. +type MultiHealthMonitor struct { + name string + // Map from name to the health monitor for each cluster. + healthMonitorsByName map[string]HealthMonitor + + // Minimum replicas that must be available. + minimumReplicasAvailable int + // Prometheus metrics are prefixed with this. + metricsPrefix string + + // Result of the most recent health check. + resultOfMostRecentHealthCheck bool + // Mutex protecting the above values. 
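// (healthMonitorsByName itself is not guarded by the mutex: it is cloned at
// construction time and never mutated afterwards, so IsHealthy, Run, Describe and
// Collect read it without locking.)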
+ mu sync.Mutex + + healthPrometheusDesc *prometheus.Desc +} + +func NewMultiHealthMonitor(name string, healthMonitorsByName map[string]HealthMonitor) *MultiHealthMonitor { + return &MultiHealthMonitor{ + name: name, + minimumReplicasAvailable: len(healthMonitorsByName), + healthMonitorsByName: maps.Clone(healthMonitorsByName), + } +} + +func (srv *MultiHealthMonitor) WithMinimumReplicasAvailable(v int) *MultiHealthMonitor { + srv.minimumReplicasAvailable = v + return srv +} + +func (srv *MultiHealthMonitor) WithMetricsPrefix(v string) *MultiHealthMonitor { + srv.metricsPrefix = v + return srv +} + +// IsHealthy returns false if either +// 1. any child is unhealthy for a reason other than timeout or +// 2. at least len(healthMonitorsByName) - minReplicasAvailable + 1 children have timed out. +func (srv *MultiHealthMonitor) IsHealthy() (ok bool, reason string, err error) { + defer func() { + srv.mu.Lock() + defer srv.mu.Unlock() + srv.resultOfMostRecentHealthCheck = ok + }() + replicasAvailable := 0 + for name, healthMonitor := range srv.healthMonitorsByName { + ok, reason, err = healthMonitor.IsHealthy() + if err != nil { + return ok, reason, errors.WithMessagef(err, "failed to check health of %s in %s", name, srv.name) + } + + // Cluster is unhealthy if any child is unhealthy for a reason other than timeout + // or if too many have timed out. + if !ok { + if reason != UnavailableReason { + return ok, reason, nil + } + } else { + replicasAvailable++ + } + } + if replicasAvailable < srv.minimumReplicasAvailable { + return false, InsufficientReplicasAvailableReason, nil + } + return true, "", nil +} + +// Run initialises prometheus metrics and starts any child health checkers. +func (srv *MultiHealthMonitor) Run(ctx context.Context, log *logrus.Entry) error { + metricsPrefix := srv.name + if srv.metricsPrefix != "" { + metricsPrefix = srv.metricsPrefix + srv.name + } + srv.healthPrometheusDesc = prometheus.NewDesc( + metricsPrefix+"_health", + fmt.Sprintf("Shows whether %s is healthy.", srv.name), + []string{srv.name}, + nil, + ) + + g, ctx := errgroup.WithContext(ctx) + for _, healthMonitor := range srv.healthMonitorsByName { + healthMonitor := healthMonitor + g.Go(func() error { return healthMonitor.Run(ctx, log) }) + } + return g.Wait() +} + +func (srv *MultiHealthMonitor) Describe(c chan<- *prometheus.Desc) { + c <- srv.healthPrometheusDesc + + for _, healthMonitor := range srv.healthMonitorsByName { + healthMonitor.Describe(c) + } +} + +func (srv *MultiHealthMonitor) Collect(c chan<- prometheus.Metric) { + srv.mu.Lock() + resultOfMostRecentHealthCheck := 0.0 + if srv.resultOfMostRecentHealthCheck { + resultOfMostRecentHealthCheck = 1.0 + } + srv.mu.Unlock() + c <- prometheus.MustNewConstMetric( + srv.healthPrometheusDesc, + prometheus.GaugeValue, + resultOfMostRecentHealthCheck, + srv.name, + ) + + for _, healthMonitor := range srv.healthMonitorsByName { + healthMonitor.Collect(c) + } +} diff --git a/internal/common/healthmonitor/multihealthmonitor_test.go b/internal/common/healthmonitor/multihealthmonitor_test.go new file mode 100644 index 00000000000..ff101292f30 --- /dev/null +++ b/internal/common/healthmonitor/multihealthmonitor_test.go @@ -0,0 +1,81 @@ +package healthmonitor + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestMultiHealthMonitor_OneChild(t *testing.T) { + hm := NewManualHealthMonitor() + mhm := NewMultiHealthMonitor("mhm", map[string]HealthMonitor{"hm": hm}) + ok, reason, err := mhm.IsHealthy() + require.False(t, ok) + 
require.NotEmpty(t, reason) + require.NoError(t, err) + + hm.SetHealthStatus(true) + ok, reason, err = mhm.IsHealthy() + require.True(t, ok) + require.Empty(t, reason) + require.NoError(t, err) +} + +func TestMultiHealthMonitor_TwoChildren(t *testing.T) { + hm1 := NewManualHealthMonitor() + hm2 := NewManualHealthMonitor() + mhm := NewMultiHealthMonitor("mhm", map[string]HealthMonitor{"hm1": hm1, "hm2": hm2}) + + ok, reason, err := mhm.IsHealthy() + require.False(t, ok) + require.NotEmpty(t, reason) + require.NoError(t, err) + + hm1.SetHealthStatus(true) + ok, reason, err = mhm.IsHealthy() + require.False(t, ok) + require.NotEmpty(t, reason) + require.NoError(t, err) + + hm1.SetHealthStatus(false) + hm2.SetHealthStatus(true) + ok, reason, err = mhm.IsHealthy() + require.False(t, ok) + require.NotEmpty(t, reason) + require.NoError(t, err) + + hm1.SetHealthStatus(true) + ok, reason, err = mhm.IsHealthy() + require.True(t, ok) + require.Empty(t, reason) + require.NoError(t, err) +} + +func TestMultiHealthMonitor_TwoChildrenTimeouts(t *testing.T) { + hm1 := NewManualHealthMonitor() + hm1 = hm1.WithReason(UnavailableReason) + hm2 := NewManualHealthMonitor() + mhm := NewMultiHealthMonitor("mhm", map[string]HealthMonitor{"hm1": hm1, "hm2": hm2}) + mhm = mhm.WithMinimumReplicasAvailable(1) + + hm1.SetHealthStatus(false) + hm2.SetHealthStatus(true) + ok, reason, err := mhm.IsHealthy() + require.True(t, ok) + require.Empty(t, reason) + require.NoError(t, err) + + hm1.SetHealthStatus(true) + hm2.SetHealthStatus(false) + ok, reason, err = mhm.IsHealthy() + require.False(t, ok) + require.NotEmpty(t, reason) + require.NoError(t, err) + + hm1.SetHealthStatus(false) + hm2.SetHealthStatus(false) + ok, reason, err = mhm.IsHealthy() + require.False(t, ok) + require.NotEmpty(t, reason) + require.NoError(t, err) +} diff --git a/internal/common/metrics/provider.go b/internal/common/metrics/provider.go new file mode 100644 index 00000000000..7280f461fa0 --- /dev/null +++ b/internal/common/metrics/provider.go @@ -0,0 +1,96 @@ +package metrics + +import ( + "bufio" + "context" + "net/http" + "strconv" + "strings" + "sync" + "time" + + "github.com/sirupsen/logrus" +) + +type MetricsProvider interface { + Collect(context.Context, *logrus.Entry) (map[string]float64, error) +} + +type ManualMetricsProvider struct { + metrics map[string]float64 + collectionDelay time.Duration + mu sync.Mutex +} + +func (srv *ManualMetricsProvider) WithMetrics(metrics map[string]float64) *ManualMetricsProvider { + srv.mu.Lock() + defer srv.mu.Unlock() + srv.metrics = metrics + return srv +} + +func (srv *ManualMetricsProvider) WithCollectionDelay(d time.Duration) *ManualMetricsProvider { + srv.mu.Lock() + defer srv.mu.Unlock() + srv.collectionDelay = d + return srv +} + +func (srv *ManualMetricsProvider) Collect(_ context.Context, _ *logrus.Entry) (map[string]float64, error) { + srv.mu.Lock() + defer srv.mu.Unlock() + if srv.collectionDelay != 0 { + time.Sleep(srv.collectionDelay) + } + return srv.metrics, nil +} + +// HttpMetricsProvider is a metrics provider scraping metrics from a url. 
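// Collect expects the Prometheus text exposition format: anything from `#` onwards is
// stripped, and each remaining line must consist of exactly two space-separated fields,
// `<metric name> <value>`; lines that do not match, or whose value does not parse as a
// float, are silently skipped.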
+type HttpMetricsProvider struct { + url string + client *http.Client +} + +func NewHttpMetricsProvider(url string, client *http.Client) *HttpMetricsProvider { + if client == nil { + client = http.DefaultClient + } + return &HttpMetricsProvider{ + url: url, + client: client, + } +} + +func (srv *HttpMetricsProvider) Collect(ctx context.Context, _ *logrus.Entry) (map[string]float64, error) { + req, err := http.NewRequestWithContext(ctx, "GET", srv.url, nil) + if err != nil { + return nil, err + } + resp, err := srv.client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + metrics := make(map[string]float64) + scanner := bufio.NewScanner(resp.Body) + for scanner.Scan() { + line := scanner.Text() + if i := strings.Index(line, "#"); i >= 0 && i < len(line) { + line = line[:i] + } + if len(line) == 0 { + continue + } + keyVal := strings.Split(line, " ") + if len(keyVal) != 2 { + continue + } + key := keyVal[0] + val, err := strconv.ParseFloat(keyVal[1], 64) + if err != nil { + continue + } + metrics[key] = val + } + return metrics, nil +} diff --git a/internal/common/startup.go b/internal/common/startup.go index 276109e8a0f..e14fa5a21a7 100644 --- a/internal/common/startup.go +++ b/internal/common/startup.go @@ -143,6 +143,7 @@ func ServeMetricsFor(port uint16, gatherer prometheus.Gatherer) (shutdown func() } // ServeHttp starts an HTTP server listening on the given port. +// TODO: Make block until a context passed in is cancelled. func ServeHttp(port uint16, mux http.Handler) (shutdown func()) { srv := &http.Server{ Addr: fmt.Sprintf(":%d", port), diff --git a/internal/executor/application.go b/internal/executor/application.go index 26a5c8115e9..25d8f9deacb 100644 --- a/internal/executor/application.go +++ b/internal/executor/application.go @@ -1,6 +1,7 @@ package executor import ( + "context" "fmt" "net/http" "os" @@ -8,18 +9,23 @@ import ( "sync" "time" + "github.com/caarlos0/log" + "github.com/go-playground/validator/v10" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/prometheus/client_golang/prometheus" - log "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus" + "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" "github.com/armadaproject/armada/internal/common/cluster" + "github.com/armadaproject/armada/internal/common/etcdhealth" + "github.com/armadaproject/armada/internal/common/healthmonitor" + common_metrics "github.com/armadaproject/armada/internal/common/metrics" "github.com/armadaproject/armada/internal/common/task" "github.com/armadaproject/armada/internal/common/util" "github.com/armadaproject/armada/internal/executor/configuration" executor_context "github.com/armadaproject/armada/internal/executor/context" - "github.com/armadaproject/armada/internal/executor/healthmonitor" "github.com/armadaproject/armada/internal/executor/job" "github.com/armadaproject/armada/internal/executor/job/processors" "github.com/armadaproject/armada/internal/executor/metrics" @@ -35,7 +41,7 @@ import ( "github.com/armadaproject/armada/pkg/executorapi" ) -func StartUp(config configuration.ExecutorConfiguration) (func(), *sync.WaitGroup) { +func StartUp(ctx context.Context, log *logrus.Entry, config configuration.ExecutorConfiguration) (func(), *sync.WaitGroup) { err := validateConfig(config) if err != nil { log.Errorf("Invalid config: %s", err) @@ -52,14 +58,39 @@ func StartUp(config configuration.ExecutorConfiguration) (func(), *sync.WaitGrou 
os.Exit(-1) } - var etcdHealthMonitor healthmonitor.EtcdLimitHealthMonitor - if len(config.Kubernetes.Etcd.MetricUrls) > 0 { - log.Info("etcd URLs provided; monitoring etcd health enabled") - - etcdHealthMonitor, err = healthmonitor.NewEtcdHealthMonitor(config.Kubernetes.Etcd, nil) - if err != nil { - panic(err) + // Create an errgroup to run services in. + g, ctx := errgroup.WithContext(ctx) + + // Setup etcd health monitoring. + etcdClusterHealthMonitoringByName := make(map[string]healthmonitor.HealthMonitor, len(config.Kubernetes.Etcd.EtcdClustersHealthMonitoring)) + for _, etcdClusterHealthMonitoring := range config.Kubernetes.Etcd.EtcdClustersHealthMonitoring { + etcdReplicaHealthMonitorsByUrl := make(map[string]healthmonitor.HealthMonitor, len(etcdClusterHealthMonitoring.MetricUrls)) + for _, metricsUrl := range etcdClusterHealthMonitoring.MetricUrls { + etcdReplicaHealthMonitorsByUrl[metricsUrl] = etcdhealth.NewEtcdReplicaHealthMonitor( + metricsUrl, + etcdClusterHealthMonitoring.FractionOfStorageInUseLimit, + etcdClusterHealthMonitoring.FractionOfStorageLimit, + etcdClusterHealthMonitoring.ReplicaTimeout, + etcdClusterHealthMonitoring.ScrapeInterval, + etcdClusterHealthMonitoring.ScrapeDelayBucketsStart, + etcdClusterHealthMonitoring.ScrapeDelayBucketsFactor, + etcdClusterHealthMonitoring.ScrapeDelayBucketsCount, + common_metrics.NewHttpMetricsProvider(metricsUrl, http.DefaultClient), + ) } + etcdClusterHealthMonitoringByName[etcdClusterHealthMonitoring.Name] = healthmonitor.NewMultiHealthMonitor( + etcdClusterHealthMonitoring.Name, + etcdReplicaHealthMonitorsByUrl, + ).WithMinimumReplicasAvailable(etcdClusterHealthMonitoring.MinimumReplicasAvailable) + } + var etcdClustersHealthMonitoring healthmonitor.HealthMonitor + if len(etcdClusterHealthMonitoringByName) > 0 { + log.Info("etcd URLs provided; monitoring etcd health enabled") + etcdClustersHealthMonitoring = healthmonitor.NewMultiHealthMonitor( + "etcd", + etcdClusterHealthMonitoringByName, + ) + g.Go(func() error { return etcdClustersHealthMonitoring.Run(ctx, log) }) } else { log.Info("no etcd URLs provided; etcd health isn't monitored") } @@ -68,7 +99,6 @@ func StartUp(config configuration.ExecutorConfiguration) (func(), *sync.WaitGrou config.Application, 2*time.Minute, kubernetesClientProvider, - etcdHealthMonitor, config.Kubernetes.PodKillTimeout, ) @@ -78,13 +108,14 @@ func StartUp(config configuration.ExecutorConfiguration) (func(), *sync.WaitGrou taskManager := task.NewBackgroundTaskManager(metrics.ArmadaExecutorMetricsPrefix) taskManager.Register(clusterContext.ProcessPodsToDelete, config.Task.PodDeletionInterval, "pod_deletion") - return StartUpWithContext(config, clusterContext, etcdHealthMonitor, taskManager, wg) + return StartUpWithContext(log, config, clusterContext, etcdClustersHealthMonitoring, taskManager, wg) } func StartUpWithContext( + log *logrus.Entry, config configuration.ExecutorConfiguration, clusterContext executor_context.ClusterContext, - etcdHealthMonitor healthmonitor.EtcdLimitHealthMonitor, + clusterHealthMonitor healthmonitor.HealthMonitor, taskManager *task.BackgroundTaskManager, wg *sync.WaitGroup, ) (func(), *sync.WaitGroup) { @@ -106,8 +137,8 @@ func StartUpWithContext( os.Exit(-1) } - stopServerApiComponents := setupServerApiComponents(config, clusterContext, etcdHealthMonitor, taskManager, pendingPodChecker, nodeInfoService, podUtilisationService) - stopExecutorApiComponents := setupExecutorApiComponents(config, clusterContext, etcdHealthMonitor, taskManager, pendingPodChecker, 
nodeInfoService, podUtilisationService) + stopServerApiComponents := setupServerApiComponents(config, clusterContext, clusterHealthMonitor, taskManager, pendingPodChecker, nodeInfoService, podUtilisationService) + stopExecutorApiComponents := setupExecutorApiComponents(config, clusterContext, clusterHealthMonitor, taskManager, pendingPodChecker, nodeInfoService, podUtilisationService) resourceCleanupService := service.NewResourceCleanupService(clusterContext, config.Kubernetes) taskManager.Register(resourceCleanupService.CleanupResources, config.Task.ResourceCleanupInterval, "resource_cleanup") @@ -131,7 +162,7 @@ func StartUpWithContext( func setupExecutorApiComponents( config configuration.ExecutorConfiguration, clusterContext executor_context.ClusterContext, - etcdHealthMonitor healthmonitor.EtcdLimitHealthMonitor, + clusterHealthMonitor healthmonitor.HealthMonitor, taskManager *task.BackgroundTaskManager, pendingPodChecker *podchecks.PodChecks, nodeInfoService node.NodeInfoService, @@ -184,20 +215,23 @@ func setupExecutorApiComponents( leaseRequester, jobRunState, clusterUtilisationService, - config.Kubernetes.PodDefaults) + config.Kubernetes.PodDefaults, + ) clusterAllocationService := service.NewClusterAllocationService( clusterContext, eventReporter, jobRunState, submitter, - etcdHealthMonitor) + clusterHealthMonitor, + ) podIssueService := service.NewIssueHandler( jobRunState, clusterContext, eventReporter, config.Kubernetes.StateChecks, pendingPodChecker, - config.Kubernetes.StuckTerminatingPodExpiry) + config.Kubernetes.StuckTerminatingPodExpiry, + ) taskManager.Register(podIssueService.HandlePodIssues, config.Task.PodIssueHandlingInterval, "pod_issue_handling") taskManager.Register(preemptRunProcessor.Run, config.Task.StateProcessorInterval, "preempt_runs") @@ -207,7 +241,7 @@ func setupExecutorApiComponents( taskManager.Register(eventReporter.ReportMissingJobEvents, config.Task.MissingJobEventReconciliationInterval, "event_reconciliation") pod_metrics.ExposeClusterContextMetrics(clusterContext, clusterUtilisationService, podUtilisationService, nodeInfoService) runStateMetricsCollector := runstate.NewJobRunStateStoreMetricsCollector(jobRunState) - prometheus.MustRegister(runStateMetricsCollector) + prometheus.MustRegister(runStateMetricsCollector, clusterHealthMonitor) if config.Metric.ExposeQueueUsageMetrics && config.Task.UtilisationEventReportingInterval > 0 { podUtilisationReporter := utilisation.NewUtilisationEventReporter( @@ -232,7 +266,7 @@ func setupExecutorApiComponents( func setupServerApiComponents( config configuration.ExecutorConfiguration, clusterContext executor_context.ClusterContext, - etcdHealthMonitor healthmonitor.EtcdLimitHealthMonitor, + clusterHealthMonitor healthmonitor.HealthMonitor, taskManager *task.BackgroundTaskManager, pendingPodChecker *podchecks.PodChecks, nodeInfoService node.NodeInfoService, @@ -295,7 +329,7 @@ func setupServerApiComponents( jobLeaseService, clusterUtilisationService, submitter, - etcdHealthMonitor, + clusterHealthMonitor, ) jobManager := service.NewJobManager( @@ -345,6 +379,10 @@ func createConnectionToApi(connectionDetails client.ApiConnectionDetails, maxMes } func validateConfig(config configuration.ExecutorConfiguration) error { + validator := validator.New() + if err := validator.Struct(config); err != nil { + return err + } missing := util.SubtractStringList(config.Kubernetes.AvoidNodeLabelsOnRetry, config.Kubernetes.TrackedNodeLabels) if len(missing) > 0 { return fmt.Errorf("These labels were in avoidNodeLabelsOnRetry 
but not trackedNodeLabels: %s", strings.Join(missing, ", ")) @@ -358,11 +396,5 @@ func validateConfig(config configuration.ExecutorConfiguration) error { if config.Application.DeleteConcurrencyLimit <= 0 { return fmt.Errorf("DeleteConcurrencyLimit was %d, must be greater or equal to 1", config.Application.DeleteConcurrencyLimit) } - if config.Kubernetes.Etcd.FractionOfStorageInUseSoftLimit <= 0 || config.Kubernetes.Etcd.FractionOfStorageInUseSoftLimit > 1 { - return fmt.Errorf("EtcdFractionOfStorageInUseSoftLimit must be in (0, 1]") - } - if config.Kubernetes.Etcd.FractionOfStorageInUseHardLimit <= 0 || config.Kubernetes.Etcd.FractionOfStorageInUseHardLimit > 1 { - return fmt.Errorf("EtcdFractionOfStorageInUseHardLimit must be in (0, 1]") - } return nil } diff --git a/internal/executor/application_test.go b/internal/executor/application_test.go index 3f126e3df9a..35dae128986 100644 --- a/internal/executor/application_test.go +++ b/internal/executor/application_test.go @@ -74,42 +74,6 @@ func Test_ValidateConfig_When_DeleteThreadCount_LessThanOrEqualToZero(t *testing assert.Error(t, validateConfig(config)) } -func TestValidateConfig_When_FractionStorageInUseSoftLimit_LessThan0(t *testing.T) { - config := createBasicValidExecutorConfiguration() - - config.Kubernetes.Etcd.FractionOfStorageInUseSoftLimit = 0 - assert.Error(t, validateConfig(config)) - config.Kubernetes.Etcd.FractionOfStorageInUseSoftLimit = -1 - assert.Error(t, validateConfig(config)) -} - -func TestValidateConfig_When_FractionStorageInUseSoftLimit_GreaterThan1(t *testing.T) { - config := createBasicValidExecutorConfiguration() - - config.Kubernetes.Etcd.FractionOfStorageInUseSoftLimit = 1 - assert.NoError(t, validateConfig(config)) - config.Kubernetes.Etcd.FractionOfStorageInUseSoftLimit = 1.1 - assert.Error(t, validateConfig(config)) -} - -func TestValidateConfig_When_FractionStorageInUseHardLimit_LessThan0(t *testing.T) { - config := createBasicValidExecutorConfiguration() - - config.Kubernetes.Etcd.FractionOfStorageInUseHardLimit = 0 - assert.Error(t, validateConfig(config)) - config.Kubernetes.Etcd.FractionOfStorageInUseHardLimit = -1 - assert.Error(t, validateConfig(config)) -} - -func TestValidateConfig_When_FractionStorageInUseHardLimit_GreaterThan1(t *testing.T) { - config := createBasicValidExecutorConfiguration() - - config.Kubernetes.Etcd.FractionOfStorageInUseHardLimit = 1 - assert.NoError(t, validateConfig(config)) - config.Kubernetes.Etcd.FractionOfStorageInUseHardLimit = 1.1 - assert.Error(t, validateConfig(config)) -} - func createBasicValidExecutorConfiguration() configuration.ExecutorConfiguration { return configuration.ExecutorConfiguration{ Application: configuration.ApplicationConfiguration{ @@ -117,11 +81,5 @@ func createBasicValidExecutorConfiguration() configuration.ExecutorConfiguration UpdateConcurrencyLimit: 1, DeleteConcurrencyLimit: 1, }, - Kubernetes: configuration.KubernetesConfiguration{ - Etcd: configuration.EtcdConfiguration{ - FractionOfStorageInUseSoftLimit: 0.8, - FractionOfStorageInUseHardLimit: 0.9, - }, - }, } } diff --git a/internal/executor/configuration/types.go b/internal/executor/configuration/types.go index 4798f29710a..45070cd31b9 100644 --- a/internal/executor/configuration/types.go +++ b/internal/executor/configuration/types.go @@ -80,16 +80,37 @@ type KubernetesConfiguration struct { } type EtcdConfiguration struct { - // URLs of the etcd instances storing the cluster state. 
- // If provided, Armada monitors the health of etcd and - // stops requesting jobs when etcd is EtcdFractionOfStorageInUseSoftLimit percent full and - // stops pod creation when etcd is EtcdFractionOfStorageInUseHardLimit or more percent full. - MetricUrls []string - FractionOfStorageInUseSoftLimit float64 - FractionOfStorageInUseHardLimit float64 - // This is the number of etcd endpoints that have to be healthy for Armada to perform the health check - // If less than MinimumAvailable are healthy, Armada will consider etcd unhealthy and stop submitting pods - MinimumAvailable int + // Etcd health monitoring configuration. + // If provided, the executor monitors etcd health and stops requesting jobs while any etcd cluster is unhealthy. + EtcdClustersHealthMonitoring []EtcdClusterHealthMonitoringConfiguration +} + +// EtcdClusterHealthMonitoringConfiguration +// contains settings associated with monitoring the health of an etcd cluster. +type EtcdClusterHealthMonitoringConfiguration struct { + // Etcd cluster name. Used in metrics exported by Armada. + Name string `validate:"gt=0"` + // Metric URLs of the etcd replicas making up this cluster. + MetricUrls []string `validate:"gt=0"` + // The cluster is considered unhealthy when for any replica in the cluster: + // etcd_mvcc_db_total_size_in_use_in_bytes / etcd_server_quota_backend_bytes + // > FractionOfStorageInUseLimit. + FractionOfStorageInUseLimit float64 `validate:"gt=0,lte=1"` + // The cluster is considered unhealthy when for any replica in the cluster: + // etcd_mvcc_db_total_size_in_bytes / etcd_server_quota_backend_bytes + // > FractionOfStorageLimit. + FractionOfStorageLimit float64 `validate:"gt=0,lte=1"` + // A replica is considered unavailable if the executor has failed to collect metrics from it for this amount of time. + // The cluster is considered unhealthy if there are less than MinimumReplicasAvailable replicas available. + ReplicaTimeout time.Duration `validate:"gt=0"` + MinimumReplicasAvailable int `validate:"gt=0"` + // Interval with which to scrape metrics from each etcd replica. + ScrapeInterval time.Duration `validate:"gt=0"` + // The time it takes to scrape metrics is exported as a prometheus histogram with exponential buckets. + // These settings control the size and number of such buckets. 
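// For example, the values used in etcdhealth_test.go (Start=1e-3, Factor=1.1, Count=10)
// yield ten exponential buckets at 1ms, 1.1ms, 1.21ms, ..., up to 1e-3*1.1^9 ≈ 2.36ms.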
+ ScrapeDelayBucketsStart float64 `validate:"gt=0"` + ScrapeDelayBucketsFactor float64 `validate:"gt=1"` + ScrapeDelayBucketsCount int `validate:"gt=0"` } type TaskConfiguration struct { diff --git a/internal/executor/context/cluster_context.go b/internal/executor/context/cluster_context.go index f8d4d5e627b..79619ea06fd 100644 --- a/internal/executor/context/cluster_context.go +++ b/internal/executor/context/cluster_context.go @@ -26,12 +26,10 @@ import ( "k8s.io/kubelet/pkg/apis/stats/v1alpha1" "k8s.io/utils/pointer" - "github.com/armadaproject/armada/internal/common/armadaerrors" "github.com/armadaproject/armada/internal/common/cluster" util2 "github.com/armadaproject/armada/internal/common/util" "github.com/armadaproject/armada/internal/executor/configuration" "github.com/armadaproject/armada/internal/executor/domain" - "github.com/armadaproject/armada/internal/executor/healthmonitor" "github.com/armadaproject/armada/internal/executor/util" ) @@ -87,10 +85,8 @@ type KubernetesClusterContext struct { kubernetesClient kubernetes.Interface kubernetesClientProvider cluster.KubernetesClientProvider eventInformer informer.EventInformer - // If provided, stops object creation while EtcdMaxFractionOfStorageInUse or more of etcd storage is full. - etcdHealthMonitor healthmonitor.EtcdLimitHealthMonitor - podKillTimeout time.Duration - clock clock.Clock + podKillTimeout time.Duration + clock clock.Clock } func (c *KubernetesClusterContext) GetClusterId() string { @@ -105,7 +101,6 @@ func NewClusterContext( configuration configuration.ApplicationConfiguration, minTimeBetweenRepeatDeletionCalls time.Duration, kubernetesClientProvider cluster.KubernetesClientProvider, - etcdHealthMonitor healthmonitor.EtcdLimitHealthMonitor, killTimeout time.Duration, ) *KubernetesClusterContext { kubernetesClient := kubernetesClientProvider.Client() @@ -127,7 +122,6 @@ func NewClusterContext( endpointSliceInformer: factory.Discovery().V1().EndpointSlices(), kubernetesClient: kubernetesClient, kubernetesClientProvider: kubernetesClientProvider, - etcdHealthMonitor: etcdHealthMonitor, podKillTimeout: killTimeout, clock: clock.RealClock{}, } @@ -253,16 +247,6 @@ func (c *KubernetesClusterContext) GetNodeStatsSummary(ctx context.Context, node } func (c *KubernetesClusterContext) SubmitPod(pod *v1.Pod, owner string, ownerGroups []string) (*v1.Pod, error) { - // If a health monitor is provided, reject pods when etcd is at its hard limit. 
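// The hard-limit check removed below was the old gating point for pod creation. Under
// this change, etcd health is consulted higher up instead: per the new configuration
// comment, the executor 'stops requesting jobs while any etcd cluster is unhealthy',
// via the healthmonitor.HealthMonitor handed to ClusterAllocationService. That service's
// internals are not part of this diff, so the following is only a minimal, hypothetical
// sketch of how a caller might consult the monitor (shouldRequestWork and its signature
// are illustrative, not taken from this PR):
func shouldRequestWork(hm healthmonitor.HealthMonitor, log *logrus.Entry) bool {
	ok, reason, err := hm.IsHealthy()
	if err != nil {
		// Treat a failed health check as unhealthy rather than guessing.
		log.WithError(err).Error("health check failed")
		return false
	}
	if !ok {
		log.Warnf("cluster reported unhealthy (%s); not requesting new work", reason)
		return false
	}
	return true
}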
- if c.etcdHealthMonitor != nil && !c.etcdHealthMonitor.IsWithinHardHealthLimit() { - err := errors.WithStack(&armadaerrors.ErrCreateResource{ - Type: "pod", - Name: pod.Name, - Message: fmt.Sprintf("etcd is at its hard heatlh limit and therefore not healthy to submit to"), - }) - return nil, err - } - c.submittedPods.Add(pod) ownerClient, err := c.kubernetesClientProvider.ClientForUser(owner, ownerGroups) if err != nil { diff --git a/internal/executor/context/cluster_context_test.go b/internal/executor/context/cluster_context_test.go index 119a0c02227..d1836e82168 100644 --- a/internal/executor/context/cluster_context_test.go +++ b/internal/executor/context/cluster_context_test.go @@ -26,7 +26,6 @@ import ( util2 "github.com/armadaproject/armada/internal/common/util" "github.com/armadaproject/armada/internal/executor/configuration" "github.com/armadaproject/armada/internal/executor/domain" - "github.com/armadaproject/armada/internal/executor/healthmonitor" "github.com/armadaproject/armada/internal/executor/util" ) @@ -41,23 +40,14 @@ func setupTestWithProvider() (*KubernetesClusterContext, *FakeClientProvider) { func setupTestWithMinRepeatedDeletePeriod(minRepeatedDeletePeriod time.Duration) (*KubernetesClusterContext, *FakeClientProvider) { prometheus.DefaultRegisterer = prometheus.NewRegistry() - client := fake.NewSimpleClientset() clientProvider := &FakeClientProvider{FakeClient: client} - - fakeEtcdHealthMonitor := &healthmonitor.FakeEtcdLimitHealthMonitor{ - IsWithinHardLimit: true, - IsWithinSoftLimit: true, - } - clusterContext := NewClusterContext( configuration.ApplicationConfiguration{ClusterId: "test-cluster-1", Pool: "pool", DeleteConcurrencyLimit: 1}, minRepeatedDeletePeriod, clientProvider, - fakeEtcdHealthMonitor, 5*time.Minute, ) - return clusterContext, clientProvider } @@ -720,26 +710,6 @@ func TestKubernetesClusterContext_GetNodes(t *testing.T) { assert.True(t, nodeFound) } -func TestKubernetesClusterContext_Submit_BlocksOnEtcdReachingHardLimit(t *testing.T) { - clusterContext, _ := setupTestWithProvider() - unhealthyEtcdHeathMonitor := &healthmonitor.FakeEtcdLimitHealthMonitor{ - IsWithinSoftLimit: false, - IsWithinHardLimit: false, - } - clusterContext.etcdHealthMonitor = unhealthyEtcdHeathMonitor - - _, err := clusterContext.SubmitPod(createBatchPod(), "user", []string{}) - assert.Error(t, err) -} - -func TestKubernetesClusterContext_Submit_HandlesNoEtcdHealthMonitor(t *testing.T) { - clusterContext, _ := setupTestWithProvider() - clusterContext.etcdHealthMonitor = nil - - _, err := clusterContext.SubmitPod(createBatchPod(), "user", []string{}) - assert.NoError(t, err) -} - func TestKubernetesClusterContext_Submit_UseUserSpecificClient(t *testing.T) { clusterContext, provider := setupTestWithProvider() diff --git a/internal/executor/fake/application.go b/internal/executor/fake/application.go index fe1e8694a10..c2d28362c1f 100644 --- a/internal/executor/fake/application.go +++ b/internal/executor/fake/application.go @@ -3,6 +3,8 @@ package fake import ( "sync" + "github.com/sirupsen/logrus" + "github.com/armadaproject/armada/internal/common/task" "github.com/armadaproject/armada/internal/executor" "github.com/armadaproject/armada/internal/executor/configuration" @@ -14,6 +16,7 @@ func StartUp(config configuration.ExecutorConfiguration, nodes []*context.NodeSp wg := &sync.WaitGroup{} wg.Add(1) return executor.StartUpWithContext( + logrus.NewEntry(logrus.New()), config, context.NewFakeClusterContext(config.Application, 
config.Kubernetes.NodeIdLabel, nodes), nil, diff --git a/internal/executor/healthmonitor/domain.go b/internal/executor/healthmonitor/domain.go deleted file mode 100644 index c454d1530a8..00000000000 --- a/internal/executor/healthmonitor/domain.go +++ /dev/null @@ -1,39 +0,0 @@ -package healthmonitor - -import "time" - -// Store monitor information for a particular etcd instance. -type etcdInstanceMonitor struct { - // When metrics was last collected for this instance. - lastCheck time.Time - // Error returned by the most recent attempt to scrape metrics for this instance. - err error - // Metrics collected from etcd (a map from metric name to value); see - // https://etcd.io/docs/v3.5/op-guide/monitoring/ - metrics map[string]float64 -} - -func (e *etcdInstanceMonitor) deepCopy() *etcdInstanceMonitor { - metricsCopy := make(map[string]float64, len(e.metrics)) - for key, value := range e.metrics { - metricsCopy[key] = value - } - return &etcdInstanceMonitor{ - lastCheck: time.Unix(e.lastCheck.Unix(), int64(e.lastCheck.Nanosecond())), - err: e.err, - metrics: metricsCopy, - } -} - -type FakeEtcdLimitHealthMonitor struct { - IsWithinSoftLimit bool - IsWithinHardLimit bool -} - -func (f *FakeEtcdLimitHealthMonitor) IsWithinSoftHealthLimit() bool { - return f.IsWithinSoftLimit -} - -func (f *FakeEtcdLimitHealthMonitor) IsWithinHardHealthLimit() bool { - return f.IsWithinHardLimit -} diff --git a/internal/executor/healthmonitor/etcd.go b/internal/executor/healthmonitor/etcd.go deleted file mode 100644 index 855e9baa0c2..00000000000 --- a/internal/executor/healthmonitor/etcd.go +++ /dev/null @@ -1,308 +0,0 @@ -package healthmonitor - -import ( - "bufio" - "context" - "fmt" - "net/http" - "strconv" - "strings" - "sync" - "time" - - "github.com/pkg/errors" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - log "github.com/sirupsen/logrus" - - "github.com/armadaproject/armada/internal/common/armadaerrors" - "github.com/armadaproject/armada/internal/common/logging" - "github.com/armadaproject/armada/internal/executor/configuration" - "github.com/armadaproject/armada/internal/executor/metrics" -) - -const ( - etcdInUseSizeBytesMetricName string = "etcd_mvcc_db_total_size_in_use_in_bytes" - etcdTotalSizeBytesMetricName string = "etcd_server_quota_backend_bytes" - etcdMemberUrl string = "url" -) - -type EtcdLimitHealthMonitor interface { - IsWithinSoftHealthLimit() bool - IsWithinHardHealthLimit() bool -} - -// EtcdHealthMonitor is a service for monitoring the health of etcd. -// It continually scrapes metrics from one of more etcd instances -// and provides a method for checking the fraction of storage in use. -type EtcdHealthMonitor struct { - // Maps instance URL to information about that particular instance. - instances map[string]*etcdInstanceMonitor - // HTTP client used to make requests. - client *http.Client - // Time after which we consider instances to be down if no metrics have been collected successfully. - // Defaults to 2 minutes. - MetricsMaxAge time.Duration - // Interval at which to scrape metrics from etcd. - // Defaults to 5 second. 
- ScrapeInterval time.Duration - // Configuration for etcd - etcdConfiguration configuration.EtcdConfiguration - mu sync.Mutex -} - -var etcdInstanceUpDesc = prometheus.NewDesc( - metrics.ArmadaExecutorMetricsPrefix+"etcd_instance_up", - "Shows if an etcd instance is sufficiently live to get metrics from", - []string{etcdMemberUrl}, nil, -) - -var etcdInstanceHealthCheckTimeDesc = prometheus.NewDesc( - metrics.ArmadaExecutorMetricsPrefix+"etcd_instance_health_check_time", - "Time of the last successful health check scrape", - []string{etcdMemberUrl}, nil, -) - -var etcdInstanceInUseFractionDesc = prometheus.NewDesc( - metrics.ArmadaExecutorMetricsPrefix+"etcd_instance_current_in_use_fraction", - "Current fraction of the etcd instance that is in use", - []string{etcdMemberUrl}, nil, -) - -// Return a new EtcdHealthMonitor that monitors the etcd instances at the given urls. -// Provide a http client, e.g., to use auth, or set client to nil to use the default client. -func NewEtcdHealthMonitor(etcConfiguration configuration.EtcdConfiguration, client *http.Client) (*EtcdHealthMonitor, error) { - if len(etcConfiguration.MetricUrls) == 0 { - return nil, errors.WithStack(&armadaerrors.ErrInvalidArgument{ - Name: "urls", - Value: etcConfiguration.MetricUrls, - Message: "no URLs provided", - }) - } - if client == nil { - client = http.DefaultClient - } - rv := &EtcdHealthMonitor{ - MetricsMaxAge: 2 * time.Minute, - ScrapeInterval: time.Second * 5, - instances: make(map[string]*etcdInstanceMonitor), - client: client, - etcdConfiguration: etcConfiguration, - } - for _, url := range etcConfiguration.MetricUrls { - rv.instances[url] = &etcdInstanceMonitor{ - metrics: make(map[string]float64), - } - } - go rv.Run(context.Background()) - prometheus.MustRegister(rv) - return rv, nil -} - -func (srv *EtcdHealthMonitor) Describe(desc chan<- *prometheus.Desc) { - desc <- etcdInstanceUpDesc - desc <- etcdInstanceHealthCheckTimeDesc - desc <- etcdInstanceInUseFractionDesc -} - -func (srv *EtcdHealthMonitor) Collect(metrics chan<- prometheus.Metric) { - for url, instance := range srv.getInstances() { - currentFraction, err := srv.getInstanceCurrentFractionOfResourceInUse(instance) - metrics <- prometheus.MustNewConstMetric(etcdInstanceHealthCheckTimeDesc, prometheus.CounterValue, float64(instance.lastCheck.Unix()), url) - if err != nil { - metrics <- prometheus.MustNewConstMetric(etcdInstanceUpDesc, prometheus.GaugeValue, 0, url) - prometheus.NewInvalidMetric(etcdInstanceInUseFractionDesc, err) - } else { - metrics <- prometheus.MustNewConstMetric(etcdInstanceUpDesc, prometheus.GaugeValue, 1, url) - metrics <- prometheus.MustNewConstMetric(etcdInstanceInUseFractionDesc, prometheus.GaugeValue, currentFraction, url) - } - } -} - -// Run the service until ctx is cancelled. 
-func (srv *EtcdHealthMonitor) Run(ctx context.Context) { - log.WithField("service", "EtcdHealthMonitor").Info("started ETCD health monitor") - defer log.WithField("service", "EtcdHealthMonitor").Info("exited ETCD health monitor") - - taskDurationHistogram := promauto.NewHistogram( - prometheus.HistogramOpts{ - Name: metrics.ArmadaExecutorMetricsPrefix + "etcd_health_check_latency_seconds", - Help: "Background loop etcd health check latency in seconds", - Buckets: prometheus.ExponentialBuckets(0.01, 2, 15), - }) - - ticker := time.NewTicker(srv.ScrapeInterval) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - start := time.Now() - if err := srv.scrapeMetrics(ctx); err != nil { - logging.WithStacktrace(log.WithField("service", "EtcdHealthMonitor"), err).Error("failed to scrape metrics from etcd") - } - duration := time.Since(start) - taskDurationHistogram.Observe(duration.Seconds()) - } - } -} - -func (srv *EtcdHealthMonitor) getInstances() map[string]*etcdInstanceMonitor { - srv.mu.Lock() - defer srv.mu.Unlock() - - current := srv.instances - instancesCopy := make(map[string]*etcdInstanceMonitor, len(current)) - - for url, instance := range current { - instancesCopy[url] = instance.deepCopy() - } - - return instancesCopy -} - -func (srv *EtcdHealthMonitor) updateInstances(updatedInstances map[string]*etcdInstanceMonitor) { - srv.mu.Lock() - defer srv.mu.Unlock() - - srv.instances = updatedInstances -} - -// ScrapeMetrics collects metrics for all etcd instances. -func (srv *EtcdHealthMonitor) scrapeMetrics(ctx context.Context) error { - ctx, cancel := context.WithTimeout(ctx, time.Second*30) - defer cancel() - - wg := sync.WaitGroup{} - instances := srv.getInstances() - for url, instance := range instances { - wg.Add(1) - url := url - instance := instance - go func() { - defer wg.Done() - req, err := http.NewRequestWithContext(ctx, "GET", url, nil) - if err != nil { - instance.err = err - return - } - - resp, err := srv.client.Do(req) - if err != nil { - instance.err = err - return - } - defer resp.Body.Close() - - receivedMetrics := make(map[string]float64) - scanner := bufio.NewScanner(resp.Body) - for scanner.Scan() { - line := scanner.Text() - if i := strings.Index(line, "#"); i >= 0 && i < len(line) { - line = line[:i] - } - if len(line) == 0 { - continue - } - - keyVal := strings.Split(line, " ") - if len(keyVal) != 2 { - continue - } - key := keyVal[0] - val, err := strconv.ParseFloat(keyVal[1], 64) - if err != nil { - continue - } - receivedMetrics[key] = val - } - instance.metrics = receivedMetrics - instance.lastCheck = time.Now() - instance.err = nil - }() - } - - wg.Wait() - srv.updateInstances(instances) - - for _, instance := range instances { - if instance.err != nil { - return instance.err - } - } - return nil -} - -func (srv *EtcdHealthMonitor) IsWithinSoftHealthLimit() bool { - currentFractionOfResourceInUse, err := srv.currentFractionOfResourceInUse() - if err != nil { - return false - } - return currentFractionOfResourceInUse < srv.etcdConfiguration.FractionOfStorageInUseSoftLimit -} - -func (srv *EtcdHealthMonitor) IsWithinHardHealthLimit() bool { - currentFractionOfResourceInUse, err := srv.currentFractionOfResourceInUse() - if err != nil { - return false - } - return currentFractionOfResourceInUse < srv.etcdConfiguration.FractionOfStorageInUseHardLimit -} - -// MaxFractionOfStorageInUse returns the maximum fraction of storage in use over all etcd instances. 
-func (srv *EtcdHealthMonitor) currentFractionOfResourceInUse() (float64, error) { - srv.mu.Lock() - defer srv.mu.Unlock() - - maxCurrentFractionInUse := 0.0 - available := 0 - for url, instance := range srv.instances { - fractionInUse, err := srv.getInstanceCurrentFractionOfResourceInUse(instance) - if err != nil { - log.Warnf("skipping etcd instance %s as %s", url, err) - continue - } - if fractionInUse > maxCurrentFractionInUse { - maxCurrentFractionInUse = fractionInUse - } - available++ - } - - if available < srv.etcdConfiguration.MinimumAvailable { - return 0, errors.WithStack(&armadaerrors.ErrInvalidArgument{ - Name: "instances", - Value: srv.instances, - Message: "insufficient etcd metrics available", - }) - } - - return maxCurrentFractionInUse, nil -} - -func (srv *EtcdHealthMonitor) getInstanceCurrentFractionOfResourceInUse(instance *etcdInstanceMonitor) (float64, error) { - if instance == nil { - return 0, fmt.Errorf("instance is nil") - } - - if instance.lastCheck.IsZero() { - return 0, fmt.Errorf("no scrape has ever occurred for this instance, possibly etcd health check is still initialising") - } - - metricsAge := time.Since(instance.lastCheck) - if metricsAge > srv.MetricsMaxAge { - return 0, fmt.Errorf("metrics for etcd instance are too old current age %s max age %s; instance may be down", metricsAge, srv.MetricsMaxAge) - } - - totalSize, ok := instance.metrics[etcdTotalSizeBytesMetricName] - if !ok { - return 0, fmt.Errorf("metric etcd_server_quota_backend_bytes not available") - } - - inUse, ok := instance.metrics[etcdInUseSizeBytesMetricName] - if !ok { - return 0, fmt.Errorf("metric etcd_mvcc_db_total_size_in_use_in_bytes not available") - } - - return inUse / totalSize, nil -} diff --git a/internal/executor/healthmonitor/etcd_test.go b/internal/executor/healthmonitor/etcd_test.go deleted file mode 100644 index aa10f4542cd..00000000000 --- a/internal/executor/healthmonitor/etcd_test.go +++ /dev/null @@ -1,160 +0,0 @@ -package healthmonitor - -import ( - "fmt" - "testing" - "time" - - "github.com/stretchr/testify/assert" - - "github.com/armadaproject/armada/internal/executor/configuration" -) - -func TestGetInstanceCurrentFractionOfResourceInUse(t *testing.T) { - healthChecker := makeEtcHealthMonitor() - - instance := makeValidEtcdInstanceMonitor(10, 100) - - result, err := healthChecker.getInstanceCurrentFractionOfResourceInUse(instance) - assert.NoError(t, err) - assert.Equal(t, result, 0.1) -} - -func TestGetInstanceCurrentFractionOfResourceInUse_ErrorsWhenNil(t *testing.T) { - healthChecker := makeEtcHealthMonitor() - _, err := healthChecker.getInstanceCurrentFractionOfResourceInUse(nil) - assert.Error(t, err) -} - -func TestGetInstanceCurrentFractionOfResourceInUse_ErrorsWhenNeverScraped(t *testing.T) { - healthChecker := makeEtcHealthMonitor() - instance := &etcdInstanceMonitor{} - _, err := healthChecker.getInstanceCurrentFractionOfResourceInUse(instance) - assert.Error(t, err) -} - -func TestGetInstanceCurrentFractionOfResourceInUse_ErrorsWhenMissingMetrics(t *testing.T) { - healthChecker := makeEtcHealthMonitor() - - instance := &etcdInstanceMonitor{ - err: nil, - lastCheck: time.Now(), - } - - // In use size metric missing - instance.metrics = map[string]float64{ - etcdInUseSizeBytesMetricName: 10, - } - _, err := healthChecker.getInstanceCurrentFractionOfResourceInUse(instance) - assert.Error(t, err) - - // Total size metric missing - instance.metrics = map[string]float64{ - etcdInUseSizeBytesMetricName: 10, - } - _, err = 
healthChecker.getInstanceCurrentFractionOfResourceInUse(instance) - assert.Error(t, err) -} - -func TestGetInstanceCurrentFractionOfResourceInUse_ErrorsWhenLastScrapeIsTooOld(t *testing.T) { - healthChecker := makeEtcHealthMonitor() - instance := &etcdInstanceMonitor{ - lastCheck: time.Now().Add(time.Minute * -20), - } - _, err := healthChecker.getInstanceCurrentFractionOfResourceInUse(instance) - assert.Error(t, err) -} - -func TestIsWithinSoftHealthLimit(t *testing.T) { - // Current usage is 10%, under 30% limit - healthChecker := makeEtcHealthMonitorWithInstances(0.3, 0.5, makeValidEtcdInstanceMonitor(10, 100)) - assert.True(t, healthChecker.IsWithinSoftHealthLimit()) - - // Current usage is 30%, at 30% limit - healthChecker = makeEtcHealthMonitorWithInstances(0.3, 0.5, makeValidEtcdInstanceMonitor(30, 100)) - assert.False(t, healthChecker.IsWithinSoftHealthLimit()) - - // Current usage is 60%, over 30% limit - healthChecker = makeEtcHealthMonitorWithInstances(0.3, 0.5, makeValidEtcdInstanceMonitor(60, 100)) - assert.False(t, healthChecker.IsWithinSoftHealthLimit()) -} - -func TestIsWithinSoftHealthLimit_WithMinAvailable(t *testing.T) { - // Current usage is 10%, under 30% limit - healthChecker := makeEtcHealthMonitorWithInstances( - 0.3, - 0.5, - makeValidEtcdInstanceMonitor(10, 100), - makeInvalidEtcdInstanceMonitor(), - makeInvalidEtcdInstanceMonitor(), - ) - - healthChecker.etcdConfiguration.MinimumAvailable = 0 - assert.True(t, healthChecker.IsWithinSoftHealthLimit()) - - healthChecker.etcdConfiguration.MinimumAvailable = 1 - assert.True(t, healthChecker.IsWithinSoftHealthLimit()) - - healthChecker.etcdConfiguration.MinimumAvailable = 2 - assert.False(t, healthChecker.IsWithinSoftHealthLimit()) -} - -func TestIsWithinSoftHealthLimit_TakesMaxOfAllInstances(t *testing.T) { - // Max usage is 10%, under 30% limit - healthChecker := makeEtcHealthMonitorWithInstances(0.3, 0.5, makeValidEtcdInstanceMonitor(10, 100), makeValidEtcdInstanceMonitor(20, 100)) - assert.True(t, healthChecker.IsWithinSoftHealthLimit()) - - // Max usage is 50%, over 30% limit - healthChecker = makeEtcHealthMonitorWithInstances(0.3, 0.5, makeValidEtcdInstanceMonitor(30, 100), makeValidEtcdInstanceMonitor(50, 100)) - assert.False(t, healthChecker.IsWithinSoftHealthLimit()) -} - -func TestIsWithinHardHealthLimit(t *testing.T) { - // Current usage is 10%, under 50% limit - healthChecker := makeEtcHealthMonitorWithInstances(0.5, 0.5, makeValidEtcdInstanceMonitor(10, 100)) - assert.True(t, healthChecker.IsWithinHardHealthLimit()) - - // Current usage is 50%, at 50% limit - healthChecker = makeEtcHealthMonitorWithInstances(0.5, 0.5, makeValidEtcdInstanceMonitor(50, 100)) - assert.False(t, healthChecker.IsWithinHardHealthLimit()) - - // Current usage is 60%, over 50% limit - healthChecker = makeEtcHealthMonitorWithInstances(0.5, 0.5, makeValidEtcdInstanceMonitor(60, 100)) - assert.False(t, healthChecker.IsWithinHardHealthLimit()) -} - -func makeValidEtcdInstanceMonitor(inUse, total float64) *etcdInstanceMonitor { - return &etcdInstanceMonitor{ - err: nil, - lastCheck: time.Now(), - metrics: map[string]float64{ - etcdTotalSizeBytesMetricName: total, - etcdInUseSizeBytesMetricName: inUse, - }, - } -} - -func makeInvalidEtcdInstanceMonitor() *etcdInstanceMonitor { - return nil -} - -func makeEtcHealthMonitorWithInstances(softLimit, hardLimit float64, instanceMonitors ...*etcdInstanceMonitor) *EtcdHealthMonitor { - monitor := makeEtcHealthMonitor() - - monitor.instances = map[string]*etcdInstanceMonitor{} - for i, instance := 
range instanceMonitors { - monitor.instances[fmt.Sprintf("instance-%d", i)] = instance - } - etcdConfig := configuration.EtcdConfiguration{ - FractionOfStorageInUseSoftLimit: softLimit, - FractionOfStorageInUseHardLimit: hardLimit, - } - monitor.etcdConfiguration = etcdConfig - return monitor -} - -func makeEtcHealthMonitor() *EtcdHealthMonitor { - return &EtcdHealthMonitor{ - MetricsMaxAge: time.Minute * 5, - } -} diff --git a/internal/executor/service/cluster_allocation.go b/internal/executor/service/cluster_allocation.go index 1b00eeceab0..3d806c233a8 100644 --- a/internal/executor/service/cluster_allocation.go +++ b/internal/executor/service/cluster_allocation.go @@ -3,14 +3,16 @@ package service import ( "fmt" + "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" + "github.com/armadaproject/armada/internal/common/healthmonitor" + "github.com/armadaproject/armada/internal/common/logging" armadaresource "github.com/armadaproject/armada/internal/common/resource" util2 "github.com/armadaproject/armada/internal/common/util" executorContext "github.com/armadaproject/armada/internal/executor/context" - "github.com/armadaproject/armada/internal/executor/healthmonitor" "github.com/armadaproject/armada/internal/executor/job" "github.com/armadaproject/armada/internal/executor/reporter" "github.com/armadaproject/armada/internal/executor/util" @@ -23,11 +25,11 @@ type ClusterAllocator interface { } type ClusterAllocationService struct { - clusterId executorContext.ClusterIdentity - jobRunStateStore job.RunStateStore - submitter job.Submitter - eventReporter reporter.EventReporter - etcdHealthMonitor healthmonitor.EtcdLimitHealthMonitor + clusterId executorContext.ClusterIdentity + jobRunStateStore job.RunStateStore + submitter job.Submitter + eventReporter reporter.EventReporter + clusterHealthMonitor healthmonitor.HealthMonitor } func NewClusterAllocationService( @@ -35,22 +37,28 @@ func NewClusterAllocationService( eventReporter reporter.EventReporter, jobRunStateManager job.RunStateStore, submitter job.Submitter, - etcdHealthMonitor healthmonitor.EtcdLimitHealthMonitor, + clusterHealthMonitor healthmonitor.HealthMonitor, ) *ClusterAllocationService { return &ClusterAllocationService{ - eventReporter: eventReporter, - clusterId: clusterId, - submitter: submitter, - jobRunStateStore: jobRunStateManager, - etcdHealthMonitor: etcdHealthMonitor, + eventReporter: eventReporter, + clusterId: clusterId, + submitter: submitter, + jobRunStateStore: jobRunStateManager, + clusterHealthMonitor: clusterHealthMonitor, } } func (allocationService *ClusterAllocationService) AllocateSpareClusterCapacity() { - // If a health monitor is provided, avoid leasing jobs when etcd is almost full. - if allocationService.etcdHealthMonitor != nil && !allocationService.etcdHealthMonitor.IsWithinSoftHealthLimit() { - log.Warnf("Skipping allocating spare cluster capacity as etcd is at its soft health limit") - return + // If a health monitor is provided, avoid leasing jobs when the cluster is unhealthy. 
+ if allocationService.clusterHealthMonitor != nil { + log := logrus.NewEntry(logrus.New()) + if ok, reason, err := allocationService.clusterHealthMonitor.IsHealthy(); err != nil { + logging.WithStacktrace(log, err).Error("failed to check cluster health") + return + } else if !ok { + log.Warnf("cluster is not healthy; will not request more jobs: %s", reason) + return + } } jobRuns := allocationService.jobRunStateStore.GetAllWithFilter(func(state *job.RunState) bool { @@ -117,12 +125,12 @@ func (allocationService *ClusterAllocationService) processFailedJobSubmissions(f } type LegacyClusterAllocationService struct { - leaseService LeaseService - eventReporter reporter.EventReporter - utilisationService utilisation.UtilisationService - clusterContext executorContext.ClusterContext - submitter job.Submitter - etcdHealthMonitor healthmonitor.EtcdLimitHealthMonitor + leaseService LeaseService + eventReporter reporter.EventReporter + utilisationService utilisation.UtilisationService + clusterContext executorContext.ClusterContext + submitter job.Submitter + clusterHealthMonitor healthmonitor.HealthMonitor } func NewLegacyClusterAllocationService( @@ -131,23 +139,29 @@ func NewLegacyClusterAllocationService( leaseService LeaseService, utilisationService utilisation.UtilisationService, submitter job.Submitter, - etcdHealthMonitor healthmonitor.EtcdLimitHealthMonitor, + clusterHealthMonitor healthmonitor.HealthMonitor, ) *LegacyClusterAllocationService { return &LegacyClusterAllocationService{ - leaseService: leaseService, - eventReporter: eventReporter, - utilisationService: utilisationService, - clusterContext: clusterContext, - submitter: submitter, - etcdHealthMonitor: etcdHealthMonitor, + leaseService: leaseService, + eventReporter: eventReporter, + utilisationService: utilisationService, + clusterContext: clusterContext, + submitter: submitter, + clusterHealthMonitor: clusterHealthMonitor, } } func (allocationService *LegacyClusterAllocationService) AllocateSpareClusterCapacity() { - // If a health monitor is provided, avoid leasing jobs when etcd is almost full. - if allocationService.etcdHealthMonitor != nil && !allocationService.etcdHealthMonitor.IsWithinSoftHealthLimit() { - log.Warnf("Skipping allocating spare cluster capacity as etcd is at its soft health limit") - return + // If a health monitor is provided, avoid leasing jobs when the cluster is unhealthy. 
+ if allocationService.clusterHealthMonitor != nil { + log := logrus.NewEntry(logrus.New()) + if ok, reason, err := allocationService.clusterHealthMonitor.IsHealthy(); err != nil { + logging.WithStacktrace(log, err).Error("failed to check cluster health") + return + } else if !ok { + log.Warnf("cluster is not healthy; will not request more jobs: %s", reason) + return + } } capacityReport, err := allocationService.utilisationService.GetAvailableClusterCapacity(true) diff --git a/internal/executor/service/cluster_allocation_test.go b/internal/executor/service/cluster_allocation_test.go index 61ff9efabfd..7beaf10977e 100644 --- a/internal/executor/service/cluster_allocation_test.go +++ b/internal/executor/service/cluster_allocation_test.go @@ -6,8 +6,8 @@ import ( "github.com/stretchr/testify/assert" + "github.com/armadaproject/armada/internal/common/healthmonitor" fakecontext "github.com/armadaproject/armada/internal/executor/context/fake" - "github.com/armadaproject/armada/internal/executor/healthmonitor" "github.com/armadaproject/armada/internal/executor/job" "github.com/armadaproject/armada/internal/executor/job/mocks" mocks2 "github.com/armadaproject/armada/internal/executor/reporter/mocks" @@ -58,8 +58,8 @@ func TestAllocateSpareClusterCapacity_OnlySubmitsJobForLeasedRuns(t *testing.T) func TestAllocateSpareClusterCapacity_DoesNotSubmitJobs_WhenEtcdIsNotWithinSoftLimit(t *testing.T) { leaseRun := createRun("leased", job.Leased) - clusterAllocationService, etcdHealthMonitor, eventReporter, submitter, _ := setupClusterAllocationServiceTest([]*job.RunState{leaseRun}) - etcdHealthMonitor.IsWithinSoftLimit = false + clusterAllocationService, healthMonitor, eventReporter, submitter, _ := setupClusterAllocationServiceTest([]*job.RunState{leaseRun}) + healthMonitor.SetHealthStatus(false) clusterAllocationService.AllocateSpareClusterCapacity() @@ -135,7 +135,7 @@ func TestAllocateSpareClusterCapacity_HandlesFailedPodCreations(t *testing.T) { func setupClusterAllocationServiceTest(initialJobRuns []*job.RunState) ( *ClusterAllocationService, - *healthmonitor.FakeEtcdLimitHealthMonitor, + *healthmonitor.ManualHealthMonitor, *mocks2.FakeEventReporter, *mocks.FakeSubmitter, *job.JobRunStateStore, @@ -143,13 +143,17 @@ func setupClusterAllocationServiceTest(initialJobRuns []*job.RunState) ( clusterId := fakecontext.NewFakeClusterIdentity("cluster-1", "pool-1") eventReporter := mocks2.NewFakeEventReporter() submitter := &mocks.FakeSubmitter{} - etcdHealthChecker := &healthmonitor.FakeEtcdLimitHealthMonitor{IsWithinSoftLimit: true, IsWithinHardLimit: true} jobRunStateManager := job.NewJobRunStateStoreWithInitialState(initialJobRuns) - - clusterAllocationService := NewClusterAllocationService( - clusterId, eventReporter, jobRunStateManager, submitter, etcdHealthChecker) - - return clusterAllocationService, etcdHealthChecker, eventReporter, submitter, jobRunStateManager + healthMonitor := &healthmonitor.ManualHealthMonitor{} + healthMonitor.SetHealthStatus(true) + + return NewClusterAllocationService( + clusterId, + eventReporter, + jobRunStateManager, + submitter, + healthMonitor, + ), healthMonitor, eventReporter, submitter, jobRunStateManager } func createRun(runId string, phase job.RunPhase) *job.RunState { diff --git a/internal/scheduler/jobiteration.go b/internal/scheduler/jobiteration.go index 910abe31cfe..7b232edc141 100644 --- a/internal/scheduler/jobiteration.go +++ b/internal/scheduler/jobiteration.go @@ -2,6 +2,7 @@ package scheduler import ( "context" + "sync" 
"golang.org/x/exp/maps" "golang.org/x/exp/slices" @@ -51,6 +52,8 @@ type InMemoryJobRepository struct { // If true, jobs are sorted first by priority class priority. // If false, priority class is ignored when ordering jobs. sortByPriorityClass bool + // Protects the above fields. + mu sync.Mutex } func NewInMemoryJobRepository(priorityClasses map[string]types.PriorityClass) *InMemoryJobRepository { @@ -63,6 +66,8 @@ func NewInMemoryJobRepository(priorityClasses map[string]types.PriorityClass) *I } func (repo *InMemoryJobRepository) EnqueueMany(jobs []interfaces.LegacySchedulerJob) { + repo.mu.Lock() + defer repo.mu.Unlock() updatedQueues := make(map[string]bool) for _, job := range jobs { queue := job.GetQueue() @@ -76,6 +81,8 @@ func (repo *InMemoryJobRepository) EnqueueMany(jobs []interfaces.LegacyScheduler } func (repo *InMemoryJobRepository) Enqueue(job interfaces.LegacySchedulerJob) { + repo.mu.Lock() + defer repo.mu.Unlock() queue := job.GetQueue() repo.jobsByQueue[queue] = append(repo.jobsByQueue[queue], job) repo.jobsById[job.GetId()] = job @@ -118,6 +125,8 @@ func (repo *InMemoryJobRepository) GetQueueJobIds(queue string) ([]string, error } func (repo *InMemoryJobRepository) GetExistingJobsByIds(jobIds []string) ([]interfaces.LegacySchedulerJob, error) { + repo.mu.Lock() + defer repo.mu.Unlock() rv := make([]interfaces.LegacySchedulerJob, 0, len(jobIds)) for _, jobId := range jobIds { if job, ok := repo.jobsById[jobId]; ok { @@ -128,6 +137,8 @@ func (repo *InMemoryJobRepository) GetExistingJobsByIds(jobIds []string) ([]inte } func (repo *InMemoryJobRepository) GetJobIterator(ctx context.Context, queue string) (JobIterator, error) { + repo.mu.Lock() + defer repo.mu.Unlock() return NewInMemoryJobIterator(slices.Clone(repo.jobsByQueue[queue])), nil }