Skip to content

Commit

Permalink
Add classifiers to flowcontrolv1.CheckResponse
Browse files Browse the repository at this point in the history
  • Loading branch information
DariaKunoichi committed Sep 2, 2022
1 parent 0e19524 commit 062b630
Show file tree
Hide file tree
Showing 15 changed files with 679 additions and 284 deletions.
10 changes: 10 additions & 0 deletions api/aperture/common/config/v1/config_properties_wrapper.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ syntax = "proto3";
package aperture.common.config.v1;

import "aperture/policy/decisions/v1/decisions.proto";
import "aperture/policy/language/v1/classifier.proto";
import "aperture/policy/language/v1/fluxmeter.proto";
import "aperture/policy/language/v1/policy.proto";

Expand All @@ -22,6 +23,15 @@ message FluxMeterWrapper {
string fluxmeter_name = 4;
}

message ClassifierWrapper {
// Classifier
policy.language.v1.Classifier classifier = 1;
// Name of the Policy.
string policy_name = 2;
// Hash of the entire Policy spec.
string policy_hash = 3;
}

message ConcurrencyLimiterWrapper {
// Concurrency Limiter
policy.language.v1.ConcurrencyLimiter concurrency_limiter = 1;
Expand Down
8 changes: 8 additions & 0 deletions api/aperture/flowcontrol/v1/flowcontrol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ message CheckResponse {
repeated FluxMeter flux_meters = 4;
// flow label keys that were matched for this request.
repeated string flow_label_keys = 5;
// classifiers that were matched for this request.
repeated Classifier classifiers = 6;
}

// Reason contains fields that give further information about error or rejection.
Expand Down Expand Up @@ -83,3 +85,9 @@ message LimiterDecision {
message FluxMeter {
string flux_meter_name = 1;
}

// Classifier describes details for each Classifier.
message Classifier {
string policy_name = 1;
string policy_hash = 2;
}
41 changes: 39 additions & 2 deletions api/gen/openapiv2/aperture.swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,15 @@ definitions:
[flow labels](/concepts/flow-control/label/label.md).
workload:
$ref: '#/definitions/SchedulerWorkload'
description: Workload associated with flows matching the label matcher.
description: Workload associated with requests matching the label matcher.
apertureflowcontrolv1Classifier:
type: object
properties:
policy_hash:
type: string
policy_name:
type: string
description: Classifier describes details for each Classifier.
apertureflowcontrolv1FluxMeter:
type: object
properties:
Expand Down Expand Up @@ -351,6 +359,30 @@ definitions:
Ratelimiting is done separately on per-label-value basis. Use _label\_key_
to select which label should be used as key.
title: Limits the traffic on a control point to specified rate
policylanguagev1Classifier:
type: object
properties:
rules:
type: object
additionalProperties:
$ref: '#/definitions/v1Rule'
description: A map of {key, value} pairs mapping from flow label names to rules that define how to extract and propagate them.
selector:
$ref: '#/definitions/v1Selector'
description: Defines where to apply the flow classification rule.
description: |-
Example:
```yaml
selector:
service: service1.default.svc.cluster.local
control_point:
traffic: ingress
rules:
user:
extractor:
from: request.http.headers.user
```
title: Set of classification rules sharing a common selector
policylanguagev1FluxMeter:
type: object
properties:
Expand Down Expand Up @@ -593,6 +625,11 @@ definitions:
v1CheckResponse:
type: object
properties:
classifiers:
type: array
items:
$ref: '#/definitions/apertureflowcontrolv1Classifier'
description: classifiers that were matched for this request.
decision_reason:
$ref: '#/definitions/v1DecisionReason'
description: reason contains information in the case of an error or rejection.
Expand Down Expand Up @@ -1577,7 +1614,7 @@ definitions:
classifiers:
type: array
items:
$ref: '#/definitions/v1Classifier'
$ref: '#/definitions/policylanguagev1Classifier'
description: |-
Classifiers are installed in the data-plane and are used to label the requests based on payload content.
Expand Down

Large diffs are not rendered by default.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

332 changes: 210 additions & 122 deletions api/gen/proto/go/aperture/flowcontrol/v1/flowcontrol.pb.go

Large diffs are not rendered by default.

16 changes: 16 additions & 0 deletions api/gen/proto/go/aperture/flowcontrol/v1/flowcontrol.pb.json.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/flowcontrol/envoy/authz.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func (h *Handler) Check(ctx context.Context, req *ext_authz.CheckRequest) (*ext_
},
LimiterDecisions: nil,
FluxMeters: nil,
Classifiers: nil,
}
}
marshalledCheckResponse, err := protoMessageAsPbValue(fcResponse)
Expand Down
8 changes: 7 additions & 1 deletion pkg/policies/controlplane/resources/classifier/classifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"path"

configv1 "github.com/fluxninja/aperture/api/gen/proto/go/aperture/common/config/v1"
policylangv1 "github.com/fluxninja/aperture/api/gen/proto/go/aperture/policy/language/v1"
etcdclient "github.com/fluxninja/aperture/pkg/etcd/client"
"github.com/fluxninja/aperture/pkg/log"
Expand Down Expand Up @@ -55,7 +56,12 @@ func NewClassifierOptions(
func (configSync *classifierConfigSync) doSync(etcdClient *etcdclient.Client, lifecycle fx.Lifecycle) error {
lifecycle.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
dat, err := proto.Marshal(configSync.classifierProto)
wrapper := &configv1.ClassifierWrapper{
PolicyName: configSync.policyBaseAPI.GetPolicyName(),
PolicyHash: configSync.policyBaseAPI.GetPolicyHash(),
Classifier: configSync.classifierProto,
}
dat, err := proto.Marshal(wrapper)
if err != nil {
log.Error().Err(err).Msg("Failed to marshal classifier")
return err
Expand Down
37 changes: 29 additions & 8 deletions pkg/policies/dataplane/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type multiMatchResult struct {
concurrencyLimiters []iface.Limiter
fluxMeters []iface.FluxMeter
rateLimiters []iface.RateLimiter
classifiers []iface.Classifier
}

// multiMatcher is MultiMatcher instantiation used in this package.
Expand All @@ -33,6 +34,7 @@ func (result *multiMatchResult) populateFromMultiMatcher(mm *multimatcher.MultiM
result.concurrencyLimiters = append(result.concurrencyLimiters, resultCollection.concurrencyLimiters...)
result.fluxMeters = append(result.fluxMeters, resultCollection.fluxMeters...)
result.rateLimiters = append(result.rateLimiters, resultCollection.rateLimiters...)
result.classifiers = append(result.classifiers, resultCollection.classifiers...)
}

// ProvideEngineAPI Main fx app.
Expand Down Expand Up @@ -75,6 +77,16 @@ func (e *Engine) ProcessRequest(controlPoint selectors.ControlPoint, serviceIDs
}
response.FluxMeters = fluxMeterProtos

classifiers := mmr.classifiers
classifierProtos := make([]*flowcontrolv1.Classifier, len(classifiers))
for i, classifier := range classifiers {
classifierProtos[i] = &flowcontrolv1.Classifier{
PolicyName: classifier.GetClassifierID().PolicyName,
PolicyHash: classifier.GetClassifierID().PolicyHash,
}
}
response.Classifiers = classifierProtos

// execute rate limiters first
rateLimiters := make([]iface.Limiter, len(mmr.rateLimiters))
for i, rl := range mmr.rateLimiters {
Expand Down Expand Up @@ -151,10 +163,7 @@ func returnExtraTokens(
// RegisterConcurrencyLimiter adds concurrency limiter to multimatcher.
func (e *Engine) RegisterConcurrencyLimiter(cl iface.Limiter) error {
concurrencyLimiterMatchedCB := func(mmr multiMatchResult) multiMatchResult {
mmr.concurrencyLimiters = append(
mmr.concurrencyLimiters,
cl,
)
mmr.concurrencyLimiters = append(mmr.concurrencyLimiters, cl)
return mmr
}
return e.register("ConcurrencyLimiter:"+cl.GetLimiterID().String(), cl.GetSelector(), concurrencyLimiterMatchedCB)
Expand All @@ -179,10 +188,7 @@ func (e *Engine) RegisterFluxMeter(fm iface.FluxMeter) error {

// Save the fluxMeterAPI in multiMatchers
fluxMeterMatchedCB := func(mmr multiMatchResult) multiMatchResult {
mmr.fluxMeters = append(
mmr.fluxMeters,
fm,
)
mmr.fluxMeters = append(mmr.fluxMeters, fm)
return mmr
}

Expand Down Expand Up @@ -235,6 +241,21 @@ func (e *Engine) UnregisterRateLimiter(rl iface.RateLimiter) error {
return e.unregister("RateLimiter:"+rl.GetLimiterID().String(), selectorProto)
}

// RegisterClassifier adds classifier to multimatcher.
func (e *Engine) RegisterClassifier(c iface.Classifier) error {
classifierMatchedCB := func(mmr multiMatchResult) multiMatchResult {
mmr.classifiers = append(mmr.classifiers, c)
return mmr
}
return e.register("Classifier:"+c.GetClassifierID().String(), c.GetSelector(), classifierMatchedCB)
}

// UnregisterClassifier removes classifier from multimatcher.
func (e *Engine) UnregisterClassifier(c iface.Classifier) error {
selectorProto := c.GetSelector()
return e.unregister("Classifier:"+c.GetClassifierID().String(), selectorProto)
}

// getMatches returns schedulers and fluxmeters for given labels.
func (e *Engine) getMatches(controlPoint selectors.ControlPoint, serviceIDs []services.ServiceID, labels selectors.Labels) *multiMatchResult {
e.multiMatchersMutex.RLock()
Expand Down
22 changes: 22 additions & 0 deletions pkg/policies/dataplane/iface/classifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package iface

import selectorv1 "github.com/fluxninja/aperture/api/gen/proto/go/aperture/common/selector/v1"

// ClassifierID is the ID of the Classifier.
type ClassifierID struct {
PolicyName string
PolicyHash string
}

// String function returns the ClassifierID as a string.
func (cID ClassifierID) String() string {
return "policy_name-" + cID.PolicyName + "-policy_hash-" + cID.PolicyHash
}

// Classifier interface.
type Classifier interface {
// GetSelector returns the selector.
GetSelector() *selectorv1.Selector
// GetClassifierID returns ClassifierID object that should uniquely identify classifier.
GetClassifierID() ClassifierID
}
5 changes: 5 additions & 0 deletions pkg/policies/dataplane/iface/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@ type Engine interface {

RegisterRateLimiter(l RateLimiter) error
UnregisterRateLimiter(l RateLimiter) error

RegisterClassifier(c Classifier) error
UnregisterClassifier(c Classifier) error
}

// MultiMatchResult is used as return value of PolicyConfigAPI.GetMatches.
type MultiMatchResult struct {
ConcurrencyLimiters []Limiter
FluxMeters []FluxMeter
RateLimiters []RateLimiter
Classifiers []Classifier
}

// PopulateFromMultiMatcher populates result object with results from MultiMatcher.
Expand All @@ -39,4 +43,5 @@ func (result *MultiMatchResult) PopulateFromMultiMatcher(mm *multimatcher.MultiM
result.ConcurrencyLimiters = append(result.ConcurrencyLimiters, resultCollection.ConcurrencyLimiters...)
result.FluxMeters = append(result.FluxMeters, resultCollection.FluxMeters...)
result.RateLimiters = append(result.RateLimiters, resultCollection.RateLimiters...)
result.Classifiers = append(result.Classifiers, resultCollection.Classifiers...)
}
28 changes: 24 additions & 4 deletions pkg/policies/dataplane/resources/classifier/classifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
classificationv1 "github.com/fluxninja/aperture/api/gen/proto/go/aperture/policy/language/v1"
"github.com/fluxninja/aperture/pkg/log"
"github.com/fluxninja/aperture/pkg/multimatcher"
"github.com/fluxninja/aperture/pkg/policies/dataplane/iface"
"github.com/fluxninja/aperture/pkg/policies/dataplane/resources/classifier/extractors"
"github.com/fluxninja/aperture/pkg/selectors"
"github.com/fluxninja/aperture/pkg/services"
Expand Down Expand Up @@ -64,10 +65,13 @@ type labeler struct {
// Classifier receives classification policies and provides Classify method.
type Classifier struct {
// storing activeRules underneath
mu sync.Mutex
activeRules atomic.Value
activeRulesets map[rulesetID]CompiledRuleset // protected by mu
nextRulesetID rulesetID // protected by mu
mu sync.Mutex
activeRules atomic.Value
activeRulesets map[rulesetID]CompiledRuleset // protected by mu
nextRulesetID rulesetID // protected by mu
classifierProto *classificationv1.Classifier
policyName string
policyHash string
}

type rulesetID = uint64
Expand Down Expand Up @@ -266,6 +270,22 @@ func (c *Classifier) AddRules(
return ActiveRuleset{id: id, classifier: c}, nil
}

// GetSelector returns the selector.
func (c *Classifier) GetSelector() *selectorv1.Selector {
if c.classifierProto != nil {
return c.classifierProto.GetSelector()
}
return nil
}

// GetClassifierID returns ClassifierID object that should uniquely identify classifier.
func (c *Classifier) GetClassifierID() iface.ClassifierID {
return iface.ClassifierID{
PolicyName: c.policyName,
PolicyHash: c.policyHash,
}
}

// ActiveRuleset represents one of currently active set of rules.
type ActiveRuleset struct {
classifier *Classifier
Expand Down
Loading

0 comments on commit 062b630

Please sign in to comment.