Skip to content

Commit

Permalink
Impl kvstore map gc
Browse files Browse the repository at this point in the history
  • Loading branch information
xorkevin committed Sep 3, 2023
1 parent c53532e commit aeedade
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 12 deletions.
134 changes: 125 additions & 9 deletions service/kvstore/map.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,41 @@
package kvstore

import (
"container/heap"
"context"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"xorkevin.dev/governor/util/ksync"
"xorkevin.dev/kerrors"
)

type (
Map struct {
store map[string]mapEntry
mu sync.Mutex
store map[string]mapValue
gcCandidates mapHeap
minTime int64
mu sync.Mutex
minTimeA atomic.Int64
}

mapEntry struct {
mapValue struct {
val string
expire time.Time
}

mapKey struct {
key string
expire time.Time
}
)

func NewMap() *Map {
return &Map{
store: map[string]mapEntry{},
store: map[string]mapValue{},
}
}

Expand Down Expand Up @@ -56,6 +67,18 @@ func (s *Map) GetInt(ctx context.Context, key string) (int64, error) {
return num, nil
}

func (s *Map) lockedAddGCCandidate(key string, exp time.Time) {
s.gcCandidates.Add(mapKey{
key: key,
expire: exp,
})
t := exp.Unix() + 1
if s.minTime == 0 || t < s.minTime {
s.minTime = t
s.minTimeA.Store(t)
}
}

func (s *Map) Set(ctx context.Context, key, val string, duration time.Duration) error {
s.mu.Lock()
defer s.mu.Unlock()
Expand All @@ -64,10 +87,12 @@ func (s *Map) Set(ctx context.Context, key, val string, duration time.Duration)
delete(s.store, key)
return nil
}
s.store[key] = mapEntry{
exp := time.Now().Add(duration)
s.store[key] = mapValue{
val: val,
expire: time.Now().Add(duration),
expire: exp,
}
s.lockedAddGCCandidate(key, exp)
return nil
}

Expand All @@ -82,10 +107,12 @@ func (s *Map) SetNX(ctx context.Context, key, val string, duration time.Duration
delete(s.store, key)
return true, nil
}
s.store[key] = mapEntry{
exp := time.Now().Add(duration)
s.store[key] = mapValue{
val: val,
expire: time.Now().Add(duration),
expire: exp,
}
s.lockedAddGCCandidate(key, exp)
return true, nil
}

Expand All @@ -108,7 +135,7 @@ func (s *Map) Incr(ctx context.Context, key string, delta int64) (int64, error)

v, ok := s.store[key]
if !ok || v.expire.Before(time.Now()) {
v = mapEntry{
v = mapValue{
val: "0",
// incr does not add an expiration, so set a value far in the future
expire: time.Now().Add(24 * time.Hour),
Expand All @@ -121,6 +148,7 @@ func (s *Map) Incr(ctx context.Context, key string, delta int64) (int64, error)
num++
v.val = strconv.FormatInt(num, 10)
s.store[key] = v
s.lockedAddGCCandidate(key, v.expire)
return num, nil
}

Expand All @@ -139,6 +167,7 @@ func (s *Map) Expire(ctx context.Context, key string, duration time.Duration) er
}
v.expire = time.Now().Add(duration)
s.store[key] = v
s.lockedAddGCCandidate(key, v.expire)
return nil
}

Expand All @@ -162,6 +191,42 @@ func (s *Map) Subtree(prefix string) KVStore {
}
}

func (s *Map) GC(ctx context.Context) {
now := time.Now()
if t := s.minTimeA.Load(); t == 0 || t > now.Unix() {
return
}

s.mu.Lock()
defer s.mu.Unlock()

for {
k, ok := s.gcCandidates.Peek()
if !ok || k.expire.After(now) {
break
}
s.gcCandidates.Remove()

if v, ok := s.store[k.key]; ok && v.expire.Before(now) {
delete(s.store, k.key)
}
}
}

func (s *Map) Ticker(ctx context.Context, wg *ksync.WaitGroup, d time.Duration) {
defer wg.Done()
ticker := time.NewTicker(d)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
s.GC(ctx)
}
}
}

type (
mapMulti struct {
m *Map
Expand Down Expand Up @@ -331,3 +396,54 @@ func (t *mapMulti) Exec(ctx context.Context) error {
}
return nil
}

type (
mapHeap struct {
tree []mapKey
}
)

func (h mapHeap) Len() int {
return len(h.tree)
}

func (h mapHeap) Less(i, j int) bool {
return h.tree[i].expire.Before(h.tree[j].expire)
}

func (h mapHeap) Swap(i, j int) {
h.tree[i], h.tree[j] = h.tree[j], h.tree[i]
}

func (h *mapHeap) Push(x any) {
k := x.(mapKey)
h.tree = append(h.tree, k)
}

func (h *mapHeap) Pop() any {
n := len(h.tree)
k := h.tree[n-1]
h.tree = h.tree[:n-1]
return k
}

func (h *mapHeap) Add(v mapKey) {
heap.Push(h, v)
}

func (h *mapHeap) Remove() (mapKey, bool) {
if len(h.tree) == 0 {
var k mapKey
return k, false
}
k := heap.Pop(h).(mapKey)
return k, true
}

func (h *mapHeap) Peek() (mapKey, bool) {
if len(h.tree) == 0 {
var k mapKey
return k, false
}
return h.tree[0], true
}
4 changes: 1 addition & 3 deletions util/ksync/ksync.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ type (
// NewWaitGroup creates a new [WaitGroup]
func NewWaitGroup() *WaitGroup {
return &WaitGroup{
done: make(chan struct{}),
once: sync.Once{},
count: atomic.Int32{},
done: make(chan struct{}),
}
}

Expand Down

0 comments on commit aeedade

Please sign in to comment.