Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Add support for policy alerts #3168

Merged
merged 11 commits into from
Jan 29, 2024
2 changes: 1 addition & 1 deletion pkg/jobs/job-tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (gt *groupTracker) updateStatus(job Job, s *statusv1.Status) error {
return errExistingJob
}

tracker.statusRegistry.SetStatus(s)
tracker.statusRegistry.SetStatus(s, nil)
return nil
}

Expand Down
7 changes: 4 additions & 3 deletions pkg/jobs/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ import (
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/wrapperspb"

"github.com/reugn/go-quartz/quartz"

jobsconfig "github.com/fluxninja/aperture/v2/pkg/jobs/config"
panichandler "github.com/fluxninja/aperture/v2/pkg/panic-handler"
"github.com/fluxninja/aperture/v2/pkg/status"
"github.com/reugn/go-quartz/quartz"
)

// JobCallback is the callback function that is called after a job is executed.
Expand Down Expand Up @@ -133,11 +134,11 @@ func (executor *jobExecutor) doJob(ctx context.Context) (proto.Message, error) {
select {
case <-timerCh:
s := status.NewStatus(wrapperspb.String("Timeout"), errors.New("job execution timeout"))
executor.livenessRegistry.SetStatus(s)
executor.livenessRegistry.SetStatus(s, nil)
timer.Reset(time.Second * 1)
case <-jobCh:
s := status.NewStatus(wrapperspb.String("OK"), nil)
executor.livenessRegistry.SetStatus(s)
executor.livenessRegistry.SetStatus(s, nil)
timer.Stop()
return msg, err
}
Expand Down
114 changes: 114 additions & 0 deletions pkg/labelstatus/labelstatus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package labelstatus

import (
"context"
"errors"
"fmt"
"sync"
"time"

"go.uber.org/fx"
"google.golang.org/protobuf/proto"

"github.com/fluxninja/aperture/v2/pkg/alerts"
"github.com/fluxninja/aperture/v2/pkg/config"
"github.com/fluxninja/aperture/v2/pkg/jobs"
"github.com/fluxninja/aperture/v2/pkg/status"
)

// Module returns the fx option that registers NewLabelStatusFactory as a
// constructor, so that a *LabelStatusFactory can be injected.
func Module() fx.Option {
	provider := fx.Provide(NewLabelStatusFactory)
	return fx.Options(provider)
}

// LabelStatusFactory is a factory for creating LabelStatus.
// Every LabelStatus it creates reports through a child of the registry held
// here.
type LabelStatusFactory struct {
	// registry is the ("label", "status") child of the status registry given
	// to NewLabelStatusFactory; New derives one child per label key from it.
	registry status.Registry
}

// NewLabelStatusFactory builds a LabelStatusFactory whose statuses are kept
// under the ("label", "status") child of statusRegistry.
func NewLabelStatusFactory(statusRegistry status.Registry) *LabelStatusFactory {
	factory := new(LabelStatusFactory)
	factory.registry = statusRegistry.Child("label", "status")
	return factory
}

// New returns a LabelStatus for labelKey, tagged with the given policy name
// and component ID. Its registry is the ("label", labelKey) child of the
// factory's registry, and its timestamp starts at the zero time, meaning the
// label has not been marked missing yet.
func (lsf *LabelStatusFactory) New(labelKey string, policyName string, componentID string) *LabelStatus {
	ls := &LabelStatus{
		labelKey:    labelKey,
		policyName:  policyName,
		componentID: componentID,
	}
	ls.registry = lsf.registry.Child("label", labelKey)
	return ls
}

// LabelStatus holds the status of the labels.
// It records when a label was last marked missing (SetMissing) and a periodic
// job (setLookupStatus) turns that timestamp into a status on the registry.
type LabelStatus struct {
	// lock guards timestamp: SetMissing takes it for writing and
	// setLookupStatus takes it for reading.
	lock sync.RWMutex
	// registry is where the "label missing" status is written.
	registry status.Registry
	// timestamp is the time of the latest SetMissing call; the zero time
	// means SetMissing has not been called.
	timestamp time.Time
	// labelKey is the label key this status is about; it appears in the job
	// name and in the error message.
	labelKey string
	// policyName is attached to the status as the "policy_name" label.
	policyName string
	// componentID is attached to the status as the "component_id" label.
	componentID string
}

// Setup wires the LabelStatus into the given lifecycle. On start it registers
// a job named "label-status-<policy>-<component>-<label>" with jobGroup that
// runs setLookupStatus with a 10-second execution period; on stop it
// deregisters that job. Errors from registering or deregistering are returned
// from the respective hook.
func (ls *LabelStatus) Setup(jobGroup *jobs.JobGroup, lifecycle fx.Lifecycle) {
	jobName := fmt.Sprintf("label-status-%s-%s-%s", ls.policyName, ls.componentID, ls.labelKey)

	onStart := func(context.Context) error {
		job := jobs.NewBasicJob(jobName, ls.setLookupStatus)
		jobConfig := jobs.JobConfig{
			ExecutionPeriod: config.MakeDuration(10 * time.Second),
		}
		if err := jobGroup.RegisterJob(job, jobConfig); err != nil {
			return err
		}
		return nil
	}

	onStop := func(context.Context) error {
		if err := jobGroup.DeregisterJob(jobName); err != nil {
			return err
		}
		return nil
	}

	lifecycle.Append(fx.Hook{
		OnStart: onStart,
		OnStop:  onStop,
	})
}

// SetMissing records that the label was found missing just now by stamping
// the current time under the write lock. The status itself is published later
// by the periodic setLookupStatus job.
func (ls *LabelStatus) SetMissing() {
	ls.lock.Lock()
	ls.timestamp = time.Now()
	ls.lock.Unlock()
}

// setLookupStatus is the periodic job callback that publishes the label's
// status to the registry based on the time of the last SetMissing call.
//
//   - If SetMissing has never been called, nothing is written.
//   - If the last miss is at least five minutes old, a nil status is written
//     with the policy/component labels (presumably clearing any earlier alert;
//     confirm against status.Registry.SetStatus).
//   - Otherwise an error status "label <key> missing" is written, with an extra
//     "severity" label set to the critical severity.
//
// It always returns (nil, nil); ctx is not used.
func (ls *LabelStatus) setLookupStatus(ctx context.Context) (proto.Message, error) {
	ls.lock.RLock()
	defer ls.lock.RUnlock()

	// The label was never reported missing, so there is nothing to publish.
	if ls.timestamp.IsZero() {
		return nil, nil
	}

	labels := map[string]string{
		"policy_name":  ls.policyName,
		"component_id": ls.componentID,
	}

	// The last miss is stale: replace the status with nil instead of an error.
	if time.Since(ls.timestamp) >= 5*time.Minute {
		ls.registry.SetStatus(nil, labels)
		return nil, nil
	}

	// A recent miss: publish an error status tagged as critical.
	labels["severity"] = alerts.SeverityCrit.String()
	s := status.NewStatus(nil, errors.New("label "+ls.labelKey+" missing"))
	ls.registry.SetStatus(s, labels)
	return nil, nil
}
16 changes: 8 additions & 8 deletions pkg/notifiers/fx-driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (fr *fxRunner) processEvent(event Event, unmarshaller config.Unmarshaller)
}

func (fr *fxRunner) initApp(key Key, unmarshaller config.Unmarshaller) error {
fr.fxRunnerStatusRegistry.SetStatus(status.NewStatus(wrapperspb.String("policy runner initializing"), nil))
fr.fxRunnerStatusRegistry.SetStatus(status.NewStatus(wrapperspb.String("policy runner initializing"), nil), nil)
logger := fr.fxRunnerStatusRegistry.GetLogger()

if fr.app == nil && unmarshaller != nil {
Expand Down Expand Up @@ -98,34 +98,34 @@ func (fr *fxRunner) initApp(key Key, unmarshaller config.Unmarshaller) error {
if err = fr.app.Err(); err != nil {
visualize, _ := fx.VisualizeError(err)
logger.Error().Err(err).Str("visualize", visualize).Msg("fx.New failed")
fr.fxRunnerStatusRegistry.SetStatus(status.NewStatus(nil, err))
fr.fxRunnerStatusRegistry.SetStatus(status.NewStatus(nil, err), nil)
if deinitErr := fr.deinitApp(); deinitErr != nil {
logger.Error().Err(deinitErr).Msg("Failed to deinitialize application after start failure")
}
return err
}

fr.fxRunnerStatusRegistry.SetStatus(status.NewStatus(wrapperspb.String("policy runner starting"), nil))
fr.fxRunnerStatusRegistry.SetStatus(status.NewStatus(wrapperspb.String("policy runner starting"), nil), nil)

ctx, cancel := context.WithTimeout(context.Background(), fr.app.StartTimeout())
defer cancel()
if err = fr.app.Start(ctx); err != nil {
logger.Error().Err(err).Msg("Could not start application")
fr.fxRunnerStatusRegistry.SetStatus(status.NewStatus(nil, err))
fr.fxRunnerStatusRegistry.SetStatus(status.NewStatus(nil, err), nil)
if deinitErr := fr.deinitApp(); deinitErr != nil {
logger.Error().Err(deinitErr).Msg("Failed to deinitialize application after start failure")
}
return err
}
fr.fxRunnerStatusRegistry.SetStatus(status.NewStatus(wrapperspb.String("policy runner started"), nil))
fr.fxRunnerStatusRegistry.SetStatus(status.NewStatus(wrapperspb.String("policy runner started"), nil), nil)
} else {
fr.fxRunnerStatusRegistry.SetStatus(status.NewStatus(nil, errors.New("fxRunner is not initialized")))
fr.fxRunnerStatusRegistry.SetStatus(status.NewStatus(nil, errors.New("fxRunner is not initialized")), nil)
}
return nil
}

func (fr *fxRunner) deinitApp() error {
fr.fxRunnerStatusRegistry.SetStatus(status.NewStatus(wrapperspb.String("policy runner stopping"), nil))
fr.fxRunnerStatusRegistry.SetStatus(status.NewStatus(wrapperspb.String("policy runner stopping"), nil), nil)
logger := fr.fxRunnerStatusRegistry.GetLogger()
if fr.app != nil {
ctx, cancel := context.WithTimeout(context.Background(), fr.app.StopTimeout())
Expand All @@ -138,7 +138,7 @@ func (fr *fxRunner) deinitApp() error {
return err
}
}
fr.fxRunnerStatusRegistry.SetStatus(status.NewStatus(wrapperspb.String("policy runner stopped"), nil))
fr.fxRunnerStatusRegistry.SetStatus(status.NewStatus(wrapperspb.String("policy runner stopped"), nil), nil)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/otelcollector/otelcollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,5 +163,5 @@ func otelState(otelService *otelcol.Collector) (proto.Message, error) {
func setReadinessStatus(statusRegistry status.Registry, msg proto.Message, err error) {
statusRegistry.Child("system", "readiness").
Child("component", "otel-collector").
SetStatus(status.NewStatus(msg, err))
SetStatus(status.NewStatus(msg, err), nil)
}
4 changes: 2 additions & 2 deletions pkg/platform/platform.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,12 +164,12 @@ func Run(app *fx.App) {

defer stop(app)

platform.statusRegistry.Child("system", readinessStatusPath).Child("component", platformStatusPath).SetStatus(status.NewStatus(wrapperspb.String("platform running"), nil))
platform.statusRegistry.Child("system", readinessStatusPath).Child("component", platformStatusPath).SetStatus(status.NewStatus(wrapperspb.String("platform running"), nil), nil)

// Wait for os.Signal
signal := <-app.Done()
log.Info().Str("signal", signal.String()).Msg("Received signal. Stopping application")
platform.statusRegistry.Child("system", readinessStatusPath).Child("component", platformStatusPath).SetStatus(status.NewStatus(nil, errors.New("platform stopping")))
platform.statusRegistry.Child("system", readinessStatusPath).Child("component", platformStatusPath).SetStatus(status.NewStatus(nil, errors.New("platform stopping")), nil)
}

func stop(app *fx.App) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/platform/readiness.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ func platformReadinessStatus(in platformReadinessStatusIn) error {
OnStart: func(context.Context) error {
platform.statusRegistry.Child("system", readinessStatusPath).
Child("component", platformStatusPath).
SetStatus(status.NewStatus(nil, errors.New("platform starting")))
SetStatus(status.NewStatus(nil, errors.New("platform starting")), nil)
return nil
},
OnStop: func(context.Context) error {
platform.statusRegistry.Child("system", readinessStatusPath).
Child("component", platformStatusPath).
SetStatus(status.NewStatus(nil, errors.New("platform stopped")))
SetStatus(status.NewStatus(nil, errors.New("platform stopped")), nil)
return nil
},
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (psFactory *podScalerFactory) newPodScalerOptions(
wrapperMessage := &policysyncv1.PodScalerWrapper{}
err := unmarshaller.Unmarshal(wrapperMessage)
if err != nil || wrapperMessage.PodScaler == nil {
reg.SetStatus(status.NewStatus(nil, err))
reg.SetStatus(status.NewStatus(nil, err), nil)
logger.Warn().Err(err).Msg("Failed to unmarshal pod scaler")
return fx.Options(), err
}
Expand Down Expand Up @@ -296,14 +296,14 @@ func (ps *podScaler) setup(
logger.Error().Err(err).Msg("Failed to remove control point notifier")
merr = multierror.Append(merr, err)
}
ps.registry.SetStatus(status.NewStatus(nil, merr))
ps.registry.SetStatus(status.NewStatus(nil, merr), nil)
ps.cancel()
ps.etcdClient.Delete(ps.statusEtcdPath)
if err != nil {
logger.Error().Err(err).Msg("Failed to delete scale status")
merr = multierr.Append(merr, err)
}
ps.registry.SetStatus(status.NewStatus(nil, merr))
ps.registry.SetStatus(status.NewStatus(nil, merr), nil)
return merr
},
})
Expand Down
3 changes: 1 addition & 2 deletions pkg/policies/controlplane/components/alerter.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,7 @@ func (a *Alerter) Execute(inPortReadings runtime.PortToReading, circuitAPI runti
}

// DynamicConfigUpdate is a no-op for Alerter.
func (a *Alerter) DynamicConfigUpdate(event notifiers.Event, unmarshaller config.Unmarshaller) {
}
func (a *Alerter) DynamicConfigUpdate(event notifiers.Event, unmarshaller config.Unmarshaller) {}

func (a *Alerter) createAlert() *alerts.Alert {
newAlert := alerts.NewAlert(
Expand Down
4 changes: 2 additions & 2 deletions pkg/policies/controlplane/policy-factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (factory *PolicyFactory) provideControllerPolicyFxOptions(
var wrapperMessage policysyncv1.PolicyWrapper
err := unmarshaller.Unmarshal(&wrapperMessage)
if err != nil || wrapperMessage.Policy == nil {
registry.SetStatus(status.NewStatus(nil, err))
registry.SetStatus(status.NewStatus(nil, err), nil)
registry.GetLogger().Error().Err(err).Msg("Failed to unmarshal policy config wrapper")
return fx.Options(), err
}
Expand All @@ -156,7 +156,7 @@ func (factory *PolicyFactory) provideControllerPolicyFxOptions(
registry,
)
if err != nil {
registry.SetStatus(status.NewStatus(nil, err))
registry.SetStatus(status.NewStatus(nil, err), nil)
registry.GetLogger().Warn().Err(err).Msg("Failed to create policy options")
return fx.Options(), err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/policies/controlplane/runtime/circuit.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func (circuit *Circuit) Execute(tickInfo TickInfo) error {

}
signalStatus := status.NewStatus(signalInfo, nil)
reg.SetStatus(signalStatus)
reg.SetStatus(signalStatus, nil)
}()

// Populate with last run's looped signal
Expand Down
Loading
Loading