Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TickInfo in LoadDecision #836

Merged
merged 4 commits into from
Oct 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/aperture/policy/sync/v1/concurrency_limiter.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package aperture.policy.sync.v1;

import "aperture/policy/language/v1/policy.proto";
import "aperture/policy/sync/v1/common_attributes.proto";
import "aperture/policy/sync/v1/tick.proto";

message ConcurrencyLimiterWrapper {
// CommonAttributes
Expand All @@ -29,6 +30,7 @@ message TokensDecisionWrapper {
message LoadDecision {
double load_multiplier = 1;
bool pass_through = 2;
TickInfo tick_info = 3;
}

message TokensDecision {
Expand Down
13 changes: 13 additions & 0 deletions api/aperture/policy/sync/v1/tick.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
syntax = "proto3";

package aperture.policy.sync.v1;

import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";

message TickInfo {
google.protobuf.Timestamp timestamp = 1;
google.protobuf.Timestamp next_timestamp = 2;
int64 tick = 3;
google.protobuf.Duration interval = 4;
}
180 changes: 99 additions & 81 deletions api/gen/proto/go/aperture/policy/sync/v1/concurrency_limiter.pb.go

Large diffs are not rendered by default.

205 changes: 205 additions & 0 deletions api/gen/proto/go/aperture/policy/sync/v1/tick.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions api/gen/proto/go/aperture/policy/sync/v1/tick.pb.json.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 27 additions & 0 deletions api/gen/proto/go/aperture/policy/sync/v1/tick_deepcopy.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package concurrency
import (
"context"
"path"
"sync"

clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/fx"
Expand All @@ -24,7 +23,6 @@ import (

// LoadActuator struct.
type LoadActuator struct {
dryRunLock sync.RWMutex
policyReadAPI iface.Policy
decisionWriter *etcdwriter.Writer
loadActuatorProto *policylangv1.LoadActuator
Expand Down Expand Up @@ -98,7 +96,7 @@ func (la *LoadActuator) Execute(inPortReadings runtime.PortToValue, tickInfo run
} else {
lmValue = lmReading.Value()
}
return nil, la.publishDecision(lmValue, false)
return nil, la.publishDecision(tickInfo, lmValue, false)
} else {
logger.Sample(zerolog.Often).Info().Msg("Invalid load multiplier data")
}
Expand All @@ -108,13 +106,11 @@ func (la *LoadActuator) Execute(inPortReadings runtime.PortToValue, tickInfo run
} else {
logger.Sample(zerolog.Often).Info().Msg("load_multiplier port not found")
}
return nil, la.publishDefaultDecision()
return nil, la.publishDefaultDecision(tickInfo)
}

// DynamicConfigUpdate finds the dynamic config and syncs the decision to agent.
func (la *LoadActuator) DynamicConfigUpdate(event notifiers.Event, unmarshaller config.Unmarshaller) {
la.dryRunLock.Lock()
defer la.dryRunLock.Unlock()
logger := la.policyReadAPI.GetStatusRegistry().GetLogger()
key := la.loadActuatorProto.GetDynamicConfigKey()
// read dynamic config
Expand All @@ -138,13 +134,11 @@ func (la *LoadActuator) setConfig(config *policylangv1.LoadActuator_DynamicConfi
}
}

func (la *LoadActuator) publishDefaultDecision() error {
return la.publishDecision(1.0, true)
func (la *LoadActuator) publishDefaultDecision(tickInfo runtime.TickInfo) error {
return la.publishDecision(tickInfo, 1.0, true)
}

func (la *LoadActuator) publishDecision(loadMultiplier float64, passThrough bool) error {
la.dryRunLock.RLock()
defer la.dryRunLock.RUnlock()
func (la *LoadActuator) publishDecision(tickInfo runtime.TickInfo, loadMultiplier float64, passThrough bool) error {
if la.dryRun {
passThrough = true
}
Expand All @@ -153,6 +147,7 @@ func (la *LoadActuator) publishDecision(loadMultiplier float64, passThrough bool
decision := &policysyncv1.LoadDecision{
LoadMultiplier: loadMultiplier,
PassThrough: passThrough,
TickInfo: tickInfo.Serialize(),
}
// Publish decision
logger.Sample(zerolog.Often).Debug().Float64("loadMultiplier", loadMultiplier).Bool("passThrough", passThrough).Msg("Publish load decision")
Expand Down
19 changes: 18 additions & 1 deletion pkg/policies/controlplane/runtime/tick-info.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
package runtime

import "time"
import (
"time"

policysyncv1 "github.com/fluxninja/aperture/api/gen/proto/go/aperture/policy/sync/v1"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)

var _ TickInfo = (*tickInfo)(nil)

Expand All @@ -10,6 +16,7 @@ type TickInfo interface {
NextTimestamp() time.Time
Tick() int
Interval() time.Duration
Serialize() *policysyncv1.TickInfo
}

type tickInfo struct {
Expand Down Expand Up @@ -48,3 +55,13 @@ func (tickInfo *tickInfo) Tick() int {
func (tickInfo *tickInfo) Interval() time.Duration {
return tickInfo.interval
}

// Serialize returns the proto serialized version of the tickInfo.
func (tickInfo *tickInfo) Serialize() *policysyncv1.TickInfo {
return &policysyncv1.TickInfo{
Timestamp: timestamppb.New(tickInfo.timestamp),
NextTimestamp: timestamppb.New(tickInfo.nextTimestamp),
Tick: (int64)(tickInfo.tick),
Interval: durationpb.New(tickInfo.interval),
}
}