Skip to content

Commit

Permalink
Merge pull request #202 from splitio/SDKS-9120-impressoion-per-toggle
Browse files Browse the repository at this point in the history
[SDKS-9120] Impression per toggle implementation
  • Loading branch information
sanzmauro authored Jan 14, 2025
2 parents 678a6e7 + 1d29216 commit 07f0338
Show file tree
Hide file tree
Showing 8 changed files with 117 additions and 16 deletions.
1 change: 1 addition & 0 deletions dtos/impression.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type Impression struct {
ChangeNumber int64 `json:"c"`
Time int64 `json:"m"`
Pt int64 `json:"pt,omitempty"`
Disabled bool `json:"-"`
}

// ImpressionQueueObject struct mapping impressions
Expand Down
1 change: 1 addition & 0 deletions dtos/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type SplitDTO struct {
Conditions []ConditionDTO `json:"conditions"`
Configurations map[string]string `json:"configurations"`
Sets []string `json:"sets"`
ImpressionsDisabled bool `json:"impressionsDisabled"`
}

// MarshalBinary exports SplitDTO to JSON string
Expand Down
29 changes: 16 additions & 13 deletions engine/evaluator/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ const (
// Result represents the result of an evaluation, including the resulting treatment, the label for the impression,
// the latency and error if any
type Result struct {
Treatment string
Label string
EvaluationTime time.Duration
SplitChangeNumber int64
Config *string
Treatment string
Label string
EvaluationTime time.Duration
SplitChangeNumber int64
Config *string
ImpressionsDisabled bool
}

// Results represents the result of multiple evaluations at once
Expand Down Expand Up @@ -84,10 +85,11 @@ func (e *Evaluator) evaluateTreatment(key string, bucketingKey string, featureFl
}

return &Result{
Treatment: split.DefaultTreatment(),
Label: impressionlabels.Killed,
SplitChangeNumber: split.ChangeNumber(),
Config: config,
Treatment: split.DefaultTreatment(),
Label: impressionlabels.Killed,
SplitChangeNumber: split.ChangeNumber(),
Config: config,
ImpressionsDisabled: split.ImpressionsDisabled(),
}
}

Expand All @@ -108,10 +110,11 @@ func (e *Evaluator) evaluateTreatment(key string, bucketingKey string, featureFl
}

return &Result{
Treatment: *treatment,
Label: label,
SplitChangeNumber: split.ChangeNumber(),
Config: config,
Treatment: *treatment,
Label: label,
SplitChangeNumber: split.ChangeNumber(),
Config: config,
ImpressionsDisabled: split.ImpressionsDisabled(),
}
}

Expand Down
4 changes: 4 additions & 0 deletions engine/grammar/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,7 @@ func (s *Split) ChangeNumber() int64 {
func (s *Split) Configurations() map[string]string {
return s.splitData.Configurations
}

func (s *Split) ImpressionsDisabled() bool {
return s.splitData.ImpressionsDisabled
}
29 changes: 29 additions & 0 deletions provisional/impmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,30 @@ import (
type ImpressionManager interface {
ProcessImpressions(impressions []dtos.Impression) ([]dtos.Impression, []dtos.Impression)
ProcessSingle(impression *dtos.Impression) bool
Process(impressions []dtos.Impression, listenerEnabled bool) ([]dtos.Impression, []dtos.Impression)
}

// ImpressionManagerImpl implements
type ImpressionManagerImpl struct {
processStrategy strategy.ProcessStrategyInterface
keyTracker strategy.ProcessStrategyInterface
}

// DEPRECATED
// NewImpressionManager creates new ImpManager
func NewImpressionManager(processStrategy strategy.ProcessStrategyInterface) ImpressionManager {
return &ImpressionManagerImpl{
processStrategy: processStrategy,
}
}

func NewImpressionManagerImp(none *strategy.NoneImpl, processStrategy strategy.ProcessStrategyInterface) ImpressionManager {
return &ImpressionManagerImpl{
processStrategy: processStrategy,
keyTracker: none,
}
}

// ProcessImpressions bulk processes
func (i *ImpressionManagerImpl) ProcessImpressions(impressions []dtos.Impression) ([]dtos.Impression, []dtos.Impression) {
return i.processStrategy.Apply(impressions)
Expand All @@ -33,3 +43,22 @@ func (i *ImpressionManagerImpl) ProcessImpressions(impressions []dtos.Impression
func (i *ImpressionManagerImpl) ProcessSingle(impression *dtos.Impression) bool {
return i.processStrategy.ApplySingle(impression)
}

func (i *ImpressionManagerImpl) Process(impressions []dtos.Impression, listenerEnabled bool) ([]dtos.Impression, []dtos.Impression) {
forLog := make([]dtos.Impression, 0, len(impressions))
forListener := make([]dtos.Impression, 0, len(impressions))

for index := range impressions {
if impressions[index].Disabled {
i.keyTracker.ApplySingle(&impressions[index])
} else if i.processStrategy.ApplySingle(&impressions[index]) {
forLog = append(forLog, impressions[index])
}
}

if listenerEnabled {
forListener = impressions
}

return forLog, forListener
}
62 changes: 62 additions & 0 deletions provisional/impmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,3 +185,65 @@ func TestImpManagerRedis(t *testing.T) {
t.Error("It should have pt")
}
}

func TestProcess(t *testing.T) {
observer, _ := strategy.NewImpressionObserver(5000)
debug := strategy.NewDebugImpl(observer, true)
filter := filter.NewBloomFilter(3000, 0.01)
uniqueTracker := strategy.NewUniqueKeysTracker(filter)
counter := strategy.NewImpressionsCounter()
none := strategy.NewNoneImpl(counter, uniqueTracker, false)

now := time.Now().UTC().UnixNano()
impressions := []dtos.Impression{
{
BucketingKey: "someBucketingKey",
ChangeNumber: 123456789,
FeatureName: "someFeature",
KeyName: "someKey",
Label: "someLabel",
Time: now,
Treatment: "someTreatment",
Disabled: true,
},
{
BucketingKey: "someBucketingKey",
ChangeNumber: 123456789,
FeatureName: "harnessFlag",
KeyName: "someKey",
Label: "someLabel",
Time: now,
Treatment: "someTreatment",
Disabled: true,
},
{
BucketingKey: "someBucketingKey",
ChangeNumber: 123456789,
FeatureName: "featureTest",
KeyName: "someKey",
Label: "someLabel",
Time: now,
Treatment: "someTreatment",
Disabled: false,
},
}

impManager := NewImpressionManagerImp(none, debug)
impressionsForLog, impressionsForListener := impManager.Process(impressions, true)
if len(impressionsForListener) != 3 {
t.Error("Impressions for Listener should be 3. Actual: ", len(impressionsForListener))
}
if len(impressionsForLog) != 1 {
t.Error("Impressions for Log should be 3. Actual: ", len(impressionsForLog))
}

impManager = NewImpressionManagerImp(none, none)

impressionsForLog, impressionsForListener = impManager.Process(impressions, false)
if len(impressionsForListener) != 0 {
t.Error("Impressions for Listener should be 0. Actual: ", len(impressionsForListener))
}
if len(impressionsForLog) != 0 {
t.Error("Impressions for Log should be 1. Actual: ", len(impressionsForLog))
}
}
4 changes: 3 additions & 1 deletion provisional/strategy/none.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type NoneImpl struct {
}

// NewNoneImpl creates new NoneImpl.
func NewNoneImpl(impressionCounter *ImpressionsCounter, uniqueKeysTracker UniqueKeysTracker, listenerEnabled bool) ProcessStrategyInterface {
func NewNoneImpl(impressionCounter *ImpressionsCounter, uniqueKeysTracker UniqueKeysTracker, listenerEnabled bool) *NoneImpl {
return &NoneImpl{
impressionsCounter: impressionCounter,
uniqueKeysTracker: uniqueKeysTracker,
Expand Down Expand Up @@ -51,3 +51,5 @@ func (s *NoneImpl) ApplySingle(impression *dtos.Impression) bool {

return s.apply(impression, now)
}

var _ ProcessStrategyInterface = (*NoneImpl)(nil)
3 changes: 1 addition & 2 deletions provisional/strategy/optimized.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func (s *OptimizedImpl) apply(impression *dtos.Impression, now int64) bool {
}

if impression.Pt == 0 || impression.Pt < util.TruncateTimeFrame(now) {
s.runtimeTelemetry.RecordImpressionsStats(telemetry.ImpressionsDeduped, 1)
return true
}

Expand All @@ -56,8 +57,6 @@ func (s *OptimizedImpl) Apply(impressions []dtos.Impression) ([]dtos.Impression,
forListener = impressions
}

s.runtimeTelemetry.RecordImpressionsStats(telemetry.ImpressionsDeduped, int64(len(impressions)-len(forLog)))

return forLog, forListener
}

Expand Down

0 comments on commit 07f0338

Please sign in to comment.