Skip to content

Commit

Permalink
lfu cache policy
Browse files Browse the repository at this point in the history
  • Loading branch information
yifeng zhang committed Nov 23, 2021
1 parent 72d3e38 commit 58de4de
Show file tree
Hide file tree
Showing 8 changed files with 200 additions and 14 deletions.
2 changes: 1 addition & 1 deletion etcdserver/api/v3rpc/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ func (sws *serverWatchStream) sendLoop() {
opt := mvcc.RangeOptions{Rev: evs[i].Kv.ModRevision - 1}
r, err := sws.watchable.Range(evs[i].Kv.Key, nil, opt)
if err == nil && len(r.KVs) != 0 {
events[i].PrevKv = &(r.KVs[0])
events[i].PrevKv = r.KVs[0]
}
}
}
Expand Down
14 changes: 7 additions & 7 deletions etcdserver/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.Pu
}
if p.PrevKv {
if rr != nil && len(rr.KVs) != 0 {
resp.PrevKv = &rr.KVs[0]
resp.PrevKv = rr.KVs[0]
}
}

Expand All @@ -245,7 +245,7 @@ func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequ
if rr != nil {
resp.PrevKvs = make([]*mvccpb.KeyValue, len(rr.KVs))
for i := range rr.KVs {
resp.PrevKvs[i] = &rr.KVs[i]
resp.PrevKvs[i] = rr.KVs[i]
}
}
}
Expand Down Expand Up @@ -346,7 +346,7 @@ func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.Ra
if r.KeysOnly {
rr.KVs[i].Value = nil
}
resp.Kvs[i] = &rr.KVs[i]
resp.Kvs[i] = rr.KVs[i]
}
trace.Step("assemble the response")
return resp, nil
Expand Down Expand Up @@ -463,7 +463,7 @@ func applyCompare(rv mvcc.ReadView, c *pb.Compare) bool {
// nil == empty string in grpc; no way to represent missing value
return false
}
return compareKV(c, mvccpb.KeyValue{})
return compareKV(c, &mvccpb.KeyValue{})
}
for _, kv := range rr.KVs {
if !compareKV(c, kv) {
Expand All @@ -473,7 +473,7 @@ func applyCompare(rv mvcc.ReadView, c *pb.Compare) bool {
return true
}

func compareKV(c *pb.Compare, ckv mvccpb.KeyValue) bool {
func compareKV(c *pb.Compare, ckv *mvccpb.KeyValue) bool {
var result int
rev := int64(0)
switch c.Target {
Expand Down Expand Up @@ -868,7 +868,7 @@ func (a *quotaApplierV3) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantRes
return resp, err
}

type kvSort struct{ kvs []mvccpb.KeyValue }
type kvSort struct{ kvs []*mvccpb.KeyValue }

func (s *kvSort) Swap(i, j int) {
t := s.kvs[i]
Expand Down Expand Up @@ -1019,7 +1019,7 @@ func pruneKVs(rr *mvcc.RangeResult, isPrunable func(*mvccpb.KeyValue) bool) {
j := 0
for i := range rr.KVs {
rr.KVs[j] = rr.KVs[i]
if !isPrunable(&rr.KVs[i]) {
if !isPrunable(rr.KVs[i]) {
j++
}
}
Expand Down
2 changes: 1 addition & 1 deletion mvcc/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type RangeOptions struct {
}

type RangeResult struct {
KVs []mvccpb.KeyValue
KVs []*mvccpb.KeyValue
Rev int64
Count int
}
Expand Down
4 changes: 2 additions & 2 deletions mvcc/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,7 @@ func TestKVRestore(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
tt(s)
var kvss [][]mvccpb.KeyValue
var kvss [][]*mvccpb.KeyValue
for k := int64(0); k < 10; k++ {
r, _ := s.Range([]byte("a"), []byte("z"), RangeOptions{Rev: k})
kvss = append(kvss, r.KVs)
Expand All @@ -652,7 +652,7 @@ func TestKVRestore(t *testing.T) {

// wait for possible compaction to finish
testutil.WaitSchedule()
var nkvss [][]mvccpb.KeyValue
var nkvss [][]*mvccpb.KeyValue
for k := int64(0); k < 10; k++ {
r, _ := ns.Range([]byte("a"), []byte("z"), RangeOptions{Rev: k})
nkvss = append(nkvss, r.KVs)
Expand Down
3 changes: 3 additions & 0 deletions mvcc/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ type store struct {

ig ConsistentIndexGetter

cache *storeCache

b backend.Backend
kvindex index

Expand Down Expand Up @@ -125,6 +127,7 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentI
b: b,
ig: ig,
kvindex: newTreeIndex(lg),
cache: newStoreCache(),

le: le,

Expand Down
162 changes: 162 additions & 0 deletions mvcc/kvstore_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package mvcc

import (
"fmt"
"go.etcd.io/etcd/mvcc/mvccpb"
"math"
"strconv"
"sync"
"time"
)

const (
// lfuSample is the number of items to sample when looking at eviction
// candidates. 5 seems to be the most optimal number.
lfuSample = 5
// default estimator counter size
defaultEstimatorCapacity = 100
// default cache cost upper limit
defaultCostCapacity = 100 * 1024 * 1024
//costThreshold = 1024 * 1024 * 1024
)

type storeCache struct {
lock *sync.RWMutex
costEstimator costEstimator
cache map[revision]*mvccpb.KeyValue
currentCost int
costCapacity int
lastUpdate time.Time
cachedReq string
}

type costEstimator struct {
lock *sync.Mutex
bound int
counter map[string]int
maxCostReq string
}

func newStoreCache() *storeCache {
c := new(storeCache)
c.costEstimator = costEstimator{
lock: new(sync.Mutex),
bound: defaultEstimatorCapacity,
counter: make(map[string]int),
}
c.cache = make(map[revision]*mvccpb.KeyValue)
c.lock = new(sync.RWMutex)
c.costCapacity = defaultCostCapacity
c.lastUpdate = time.Time{}
return c
}

func (c *storeCache) get(key revision) (*mvccpb.KeyValue, bool) {
c.lock.RLock()
defer c.lock.RUnlock()
e, ok := c.cache[key]
return e, ok
}

func (c *storeCache) set(key revision, value *mvccpb.KeyValue, cost int) bool {
c.lock.Lock()
defer c.lock.Unlock()
if _, ok := c.cache[key]; !ok {
return c.unsafeSet(key, value, cost)
}
return true
}

func (c *storeCache) unsafeSet(key revision, value *mvccpb.KeyValue, cost int) bool {
if c.currentCost < c.costCapacity-cost {
c.cache[key] = value
c.currentCost += cost
//fmt.Println("capacity success")
//fmt.Println(c.currentCost)
//fmt.Println(cost)
return true
} else {
fmt.Println("capacity full, can't cache")
fmt.Println(c.currentCost)
fmt.Println(cost)
return false
}
}

func (c *storeCache) hasCachedRequest(request string) bool {
c.lock.RLock()
defer c.lock.RUnlock()
return c.cachedReq == request
}

func (c *storeCache) updateCacheIfNecessary(request string, cost int, rvs []revision, kvs []*mvccpb.KeyValue) {
if c.costEstimator.updateRequestCosts(request, cost) {
c.lock.Lock()
defer c.lock.Unlock()
if time.Since(c.lastUpdate) >= 1*time.Minute && request != c.cachedReq {
c.clear()
for i, r := range rvs {
fmt.Println("caching item " + strconv.Itoa(i))
if !c.unsafeSet(r, kvs[i], kvs[i].Size()) {
break
}
}
c.cachedReq = request
fmt.Println("caching updated " + strconv.Itoa(c.currentCost))
}
}
}

func (c *storeCache) clear() {
c.cache = make(map[revision]*mvccpb.KeyValue)
c.currentCost = 0
c.cachedReq = ""
c.lastUpdate = time.Now()
c.costEstimator.clear()
}

func (c *costEstimator) updateRequestCosts(request string, cost int) bool {
c.lock.Lock()
defer c.lock.Unlock()
if co, ok := c.counter[request]; ok {
c.counter[request] = co + cost
} else {
if len(c.counter) > c.bound {
c.evict()
}
c.counter[request] = cost
}
if c.counter[request] > c.counter[c.maxCostReq] {
c.maxCostReq = request
return true
}
return false
}

func (c *costEstimator) evict() {
minCost := math.MaxInt16
minCostReq := ""
i := 0
for req, cost := range c.counter {
if cost < minCost {
minCost = cost
minCostReq = req
}
i++
if i == lfuSample {
break
}
}
delete(c.counter, minCostReq)
return
}

func (c *costEstimator) clear() {
c.lock.Lock()
defer c.lock.Unlock()
c.counter = make(map[string]int)
}

// can be used to optimize to reduce counter periodically like tiny lfu
func (c *costEstimator) reset() {
}
25 changes: 23 additions & 2 deletions mvcc/kvstore_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"go.etcd.io/etcd/mvcc/mvccpb"
"go.etcd.io/etcd/pkg/traceutil"
"go.uber.org/zap"
"strconv"
)

type storeTxnRead struct {
Expand Down Expand Up @@ -140,9 +141,25 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions
limit = len(revpairs)
}

kvs := make([]mvccpb.KeyValue, limit)
kvs := make([]*mvccpb.KeyValue, limit)
revBytes := newRevBytes()
costs := 0
cached := tr.s.cache.hasCachedRequest(string(key))
if !cached {
plog.Warning("caching not found:" + string(key))
}
for i, revpair := range revpairs[:len(kvs)] {
if cached {
v, ok := tr.s.cache.get(revpair)
if ok {
kvs[i] = v
costs += kvs[i].Size()
continue
} else {
plog.Warning("cache miss:" + strconv.Itoa(i))
}
}

revToBytes(revpair, revBytes)
_, vs := tr.tx.UnsafeRange(keyBucketName, revBytes, nil, 0)
if len(vs) != 1 {
Expand All @@ -156,7 +173,8 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions
plog.Fatalf("range cannot find rev (%d,%d)", revpair.main, revpair.sub)
}
}
if err := kvs[i].Unmarshal(vs[0]); err != nil {
keyValue := mvccpb.KeyValue{}
if err := keyValue.Unmarshal(vs[0]); err != nil {
if tr.s.lg != nil {
tr.s.lg.Fatal(
"failed to unmarshal mvccpb.KeyValue",
Expand All @@ -166,7 +184,10 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions
plog.Fatalf("cannot unmarshal event: %v", err)
}
}
kvs[i] = &keyValue
costs += kvs[i].Size()
}
tr.s.cache.updateCacheIfNecessary(string(key), costs, revpairs[:len(kvs)], kvs)
tr.trace.Step("range keys from bolt db")
return &RangeResult{KVs: kvs, Count: len(revpairs), Rev: curRev}, nil
}
Expand Down
2 changes: 1 addition & 1 deletion tools/benchmark/cmd/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func putFunc(cmd *cobra.Command, args []string) {
} else {
binary.PutVarint(k, int64(rand.Intn(keySpaceSize)))
}
requests <- v3.OpPut(string(k), v)
requests <- v3.OpPut("a"+string(k), v)
}
close(requests)
}()
Expand Down

0 comments on commit 58de4de

Please sign in to comment.