Skip to content

Commit

Permalink
Quota increase factor and other minor adjustments (#737)
Browse files Browse the repository at this point in the history
* Add token increase factor

* Get tokens correctly for batch requests

* Don't impose quota on Admin requests

* Add quota_increase_factor flag

* Add quota_dry_run mode
  • Loading branch information
codingllama authored Jul 14, 2017
1 parent 9ad16f8 commit e1d124f
Show file tree
Hide file tree
Showing 7 changed files with 295 additions and 58 deletions.
49 changes: 36 additions & 13 deletions log/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,26 @@ var (
seqStoreRootLatency monitoring.Histogram
seqCommitLatency monitoring.Histogram
seqCounter monitoring.Counter

// QuotaIncreaseFactor is the multiplier used for the number of tokens added back to
// sequencing-based quotas. The resulting PutTokens call is equivalent to
// "PutTokens(_, numLeaves * QuotaIncreaseFactor, _)".
// A factor >1 adds resilience to token leakage, on the risk of a system that's overly
// optimistic in face of true token shortages. The higher the factor, the higher the quota
// "optimism" is. A factor that's too high (say, >1.5) is likely a sign that the quota
// configuration should be changed instead.
// A factor <1 WILL lead to token shortages, therefore it'll be normalized to 1.
QuotaIncreaseFactor = 1.1
)

func quotaIncreaseFactor() float64 {
if QuotaIncreaseFactor < 1 {
QuotaIncreaseFactor = 1
return 1
}
return QuotaIncreaseFactor
}

func createMetrics(mf monitoring.MetricFactory) {
if mf == nil {
mf = monitoring.InertMetricFactory{}
Expand Down Expand Up @@ -256,7 +274,8 @@ func (s Sequencer) SequenceBatch(ctx context.Context, logID int64, limit int, gu

// There might be no work to be done. But we possibly still need to create an signed root if the
// current one is too old. If there's work to be done then we'll be creating a root anyway.
if len(leaves) == 0 {
numLeaves := len(leaves)
if numLeaves == 0 {
nowNanos := s.timeSource.Now().UnixNano()
interval := time.Duration(nowNanos - currentRoot.TimestampNanos)
if maxRootDurationInterval == 0 || interval < maxRootDurationInterval {
Expand Down Expand Up @@ -292,8 +311,8 @@ func (s Sequencer) SequenceBatch(ctx context.Context, logID int64, limit int, gu
stageStart = s.timeSource.Now()

// We should still have the same number of leaves
if want, got := len(leaves), len(sequencedLeaves); want != got {
return 0, fmt.Errorf("%v: wanted: %v leaves after sequencing but we got: %v", logID, want, got)
if want := len(sequencedLeaves); numLeaves != want {
return 0, fmt.Errorf("%v: wanted: %v leaves after sequencing but we got: %v", logID, want, numLeaves)
}

// Write the new sequence numbers to the leaves in the DB
Expand Down Expand Up @@ -360,18 +379,22 @@ func (s Sequencer) SequenceBatch(ctx context.Context, logID int64, limit int, gu
// TODO(codingllama): Consider adding a source-aware replenish method
// (eg, qm.Replenish(ctx, tokens, specs, quota.SequencerSource)), so there's no ambiguity as to
// where the tokens come from.
if err := s.qm.PutTokens(ctx, len(leaves), []quota.Spec{
{Group: quota.Tree, Kind: quota.Read, TreeID: logID},
{Group: quota.Tree, Kind: quota.Write, TreeID: logID},
{Group: quota.Global, Kind: quota.Read},
{Group: quota.Global, Kind: quota.Write},
}); err != nil {
glog.Warningf("Failed to replenish tokens for tree %v: %v", logID, err)
if numLeaves > 0 {
tokens := int(float64(numLeaves) * quotaIncreaseFactor())
glog.V(2).Infof("Replenishing %v tokens for tree %v (numLeaves = %v)", tokens, logID, leaves)
if err := s.qm.PutTokens(ctx, tokens, []quota.Spec{
{Group: quota.Tree, Kind: quota.Read, TreeID: logID},
{Group: quota.Tree, Kind: quota.Write, TreeID: logID},
{Group: quota.Global, Kind: quota.Read},
{Group: quota.Global, Kind: quota.Write},
}); err != nil {
glog.Warningf("Failed to replenish %v tokens for tree %v: %v", tokens, logID, err)
}
}

seqCounter.Add(float64(len(leaves)), label)
glog.Infof("%v: sequenced %v leaves, size %v, tree-revision %v", logID, len(leaves), newLogRoot.TreeSize, newLogRoot.TreeRevision)
return len(leaves), nil
seqCounter.Add(float64(numLeaves), label)
glog.Infof("%v: sequenced %v leaves, size %v, tree-revision %v", logID, numLeaves, newLogRoot.TreeSize, newLogRoot.TreeRevision)
return numLeaves, nil
}

// SignRoot wraps up all the operations for creating a new log signed root.
Expand Down
125 changes: 118 additions & 7 deletions log/sequencer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,6 @@ func TestSequenceBatch(t *testing.T) {
dequeuedLeaves: noLeaves,
skipStoreSignedRoot: true,
},
wantCount: 0,
},
{
desc: "nothing-queued-within-max",
Expand All @@ -339,7 +338,6 @@ func TestSequenceBatch(t *testing.T) {
skipStoreSignedRoot: true,
},
maxRootDuration: 15 * time.Millisecond,
wantCount: 0,
},
{
desc: "nothing-queued-after-max",
Expand All @@ -356,7 +354,6 @@ func TestSequenceBatch(t *testing.T) {
storeSignedRoot: &newRoot16,
},
maxRootDuration: 9 * time.Millisecond,
wantCount: 0,
},
{
desc: "nothing-queued-on-max",
Expand All @@ -373,7 +370,6 @@ func TestSequenceBatch(t *testing.T) {
storeSignedRoot: &newRoot16,
},
maxRootDuration: 10 * time.Millisecond,
wantCount: 0,
},
{
// Tests that the guard interval is being passed to storage correctly.
Expand Down Expand Up @@ -514,12 +510,13 @@ func TestSequenceBatch(t *testing.T) {
for _, test := range tests {
func() {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

qm := quota.NewMockManager(ctrl)
test.params.qm = qm
if test.wantCount > 0 {
qm := quota.NewMockManager(ctrl)
qm.EXPECT().PutTokens(gomock.Any(), test.wantCount, specs).Return(nil)
test.params.qm = qm
}
defer ctrl.Finish()
c, ctx := createTestContext(ctrl, test.params)

got, err := c.sequencer.SequenceBatch(ctx, test.params.logID, 1, test.guardWindow, test.maxRootDuration)
Expand All @@ -538,6 +535,120 @@ func TestSequenceBatch(t *testing.T) {
}
}

func TestSequenceBatch_PutTokens(t *testing.T) {
cryptoSigner, err := newSignerWithFixedSig(expectedSignedRoot.Signature)
if err != nil {
t.Fatalf("Failed to create test signer (%v)", err)
}

ctrl := gomock.NewController(t)
defer ctrl.Finish()

// Needed to create a signer
hasher := rfc6962.DefaultHasher
ts := util.NewFakeTimeSource(fakeTimeForTest)
signer := crypto.NewSHA256Signer(cryptoSigner)

// Needed for SequenceBatch calls
const treeID int64 = 1234
const limit = 1000
const guardWindow = 10 * time.Second
const maxRootDuration = 1 * time.Hour

// Expected PutTokens specs
specs := []quota.Spec{
{Group: quota.Tree, Kind: quota.Read, TreeID: treeID},
{Group: quota.Tree, Kind: quota.Write, TreeID: treeID},
{Group: quota.Global, Kind: quota.Read},
{Group: quota.Global, Kind: quota.Write},
}

oneHundredLeaves := make([]*trillian.LogLeaf, 100)
for i := range oneHundredLeaves {
oneHundredLeaves[i] = &trillian.LogLeaf{
LeafValue: []byte(fmt.Sprintf("leaf-%v", i)),
}
}

tests := []struct {
desc string
leaves []*trillian.LogLeaf
quotaFactor float64
wantLeaves, wantTokens int
}{
{desc: "noLeaves"},
{
desc: "singleLeaf",
leaves: []*trillian.LogLeaf{getLeaf42()},
wantLeaves: 1,
wantTokens: 1,
},
{
desc: "badFactor",
leaves: oneHundredLeaves,
quotaFactor: 0.7, // factor <1 is normalized to 1
wantLeaves: 100,
wantTokens: 100,
},
{
desc: "factorOne",
leaves: oneHundredLeaves,
quotaFactor: 1,
wantLeaves: 100,
wantTokens: 100,
},
{
desc: "10%-factor",
leaves: oneHundredLeaves,
quotaFactor: 1.1,
wantLeaves: 100,
wantTokens: 110,
},
}

any := gomock.Any()
ctx := context.Background()
for _, test := range tests {
func() {
if test.quotaFactor != 0 {
defer func(qf float64) {
QuotaIncreaseFactor = qf
}(QuotaIncreaseFactor)
QuotaIncreaseFactor = test.quotaFactor
}

// Correctness of operation is tested elsewhere. The focus here is the interaction
// between Sequencer and quota.Manager.
logTX := storage.NewMockLogTreeTX(ctrl)
logTX.EXPECT().DequeueLeaves(any, any, any).Return(test.leaves, nil)
logTX.EXPECT().LatestSignedLogRoot(any).Return(testRoot16, nil)
logTX.EXPECT().WriteRevision().AnyTimes().Return(testRoot16.TreeRevision + 1)
logTX.EXPECT().UpdateSequencedLeaves(any, any).AnyTimes().Return(nil)
logTX.EXPECT().SetMerkleNodes(any, any).AnyTimes().Return(nil)
logTX.EXPECT().StoreSignedLogRoot(any, any).AnyTimes().Return(nil)
logTX.EXPECT().Commit().Return(nil)
logTX.EXPECT().Close().Return(nil)
logStorage := storage.NewMockLogStorage(ctrl)
logStorage.EXPECT().BeginForTree(any, any).Return(logTX, nil)

qm := quota.NewMockManager(ctrl)
if test.wantTokens > 0 {
qm.EXPECT().PutTokens(any, test.wantTokens, specs)
}

sequencer := NewSequencer(hasher, ts, logStorage, signer, nil /* mf */, qm)
leaves, err := sequencer.SequenceBatch(ctx, treeID, limit, guardWindow, maxRootDuration)
if err != nil {
t.Errorf("%v: SequenceBatch() returned err = %v", test.desc, err)
return
}
if leaves != test.wantLeaves {
t.Errorf("%v: SequenceBatch() returned %v leaves, want = %v", test.desc, leaves, test.wantLeaves)
}
}()
}
}

func TestSignRoot(t *testing.T) {
signer0, err := newSignerWithFixedSig(expectedSignedRoot0.Signature)
if err != nil {
Expand Down
Loading

0 comments on commit e1d124f

Please sign in to comment.