-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathcache.go
180 lines (163 loc) · 5.33 KB
/
cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
package valkey
import (
"context"
"sync"
"time"
)
// NewCacheStoreFn can be provided in ClientOption for using a custom CacheStore implementation
type NewCacheStoreFn func(CacheStoreOption) CacheStore
// CacheStoreOption will be passed to NewCacheStoreFn
type CacheStoreOption struct {
// CacheSizeEachConn is valkey client side cache size that bind to each TCP connection to a single valkey instance.
// The default is DefaultCacheBytes.
CacheSizeEachConn int
}
// CacheStore is the store interface for the client side caching
// More detailed interface requirement can be found in cache_test.go
type CacheStore interface {
// Flight is called when DoCache and DoMultiCache, with the requested client side ttl and the current time.
// It should look up the store in single-flight manner and return one of the following three combinations:
// Case 1: (empty ValkeyMessage, nil CacheEntry) <- when cache missed, and valkey will send the request to valkey.
// Case 2: (empty ValkeyMessage, non-nil CacheEntry) <- when cache missed, and valkey will use CacheEntry.Wait to wait for response.
// Case 3: (non-empty ValkeyMessage, nil CacheEntry) <- when cache hit
Flight(key, cmd string, ttl time.Duration, now time.Time) (v ValkeyMessage, e CacheEntry)
// Update is called when receiving the response of the request sent by the above Flight Case 1 from valkey.
// It should not only update the store but also deliver the response to all CacheEntry.Wait and return a desired client side PXAT of the response.
// Note that the server side expire time can be retrieved from ValkeyMessage.CachePXAT.
Update(key, cmd string, val ValkeyMessage) (pxat int64)
// Cancel is called when the request sent by the above Flight Case 1 failed.
// It should not only deliver the error to all CacheEntry.Wait but also remove the CacheEntry from the store.
Cancel(key, cmd string, err error)
// Delete is called when receiving invalidation notifications from valkey.
// If the keys is nil then it should delete all non-pending cached entries under all keys.
// If the keys is not nil then it should delete all non-pending cached entries under those keys.
Delete(keys []ValkeyMessage)
// Close is called when connection between valkey is broken.
// It should flush all cached entries and deliver the error to all pending CacheEntry.Wait.
Close(err error)
}
// CacheEntry should be used to wait for single-flight response when cache missed.
type CacheEntry interface {
Wait(ctx context.Context) (ValkeyMessage, error)
}
// SimpleCache is an alternative interface should be paired with NewSimpleCacheAdapter to construct a CacheStore
type SimpleCache interface {
Get(key string) ValkeyMessage
Set(key string, val ValkeyMessage)
Del(key string)
Flush()
}
// NewSimpleCacheAdapter converts a SimpleCache into CacheStore
func NewSimpleCacheAdapter(store SimpleCache) CacheStore {
return &adapter{store: store, flights: make(map[string]map[string]CacheEntry)}
}
type adapter struct {
store SimpleCache
flights map[string]map[string]CacheEntry
mu sync.RWMutex
}
func (a *adapter) Flight(key, cmd string, ttl time.Duration, now time.Time) (ValkeyMessage, CacheEntry) {
a.mu.RLock()
if v := a.store.Get(key + cmd); v.typ != 0 && v.relativePTTL(now) > 0 {
a.mu.RUnlock()
return v, nil
}
flight := a.flights[key][cmd]
a.mu.RUnlock()
if flight != nil {
return ValkeyMessage{}, flight
}
a.mu.Lock()
entries := a.flights[key]
if entries == nil && a.flights != nil {
entries = make(map[string]CacheEntry, 1)
a.flights[key] = entries
}
if flight = entries[cmd]; flight == nil && entries != nil {
entries[cmd] = &adapterEntry{ch: make(chan struct{}), xat: now.Add(ttl).UnixMilli()}
}
a.mu.Unlock()
return ValkeyMessage{}, flight
}
func (a *adapter) Update(key, cmd string, val ValkeyMessage) (sxat int64) {
a.mu.Lock()
entries := a.flights[key]
if flight, ok := entries[cmd].(*adapterEntry); ok {
sxat = val.getExpireAt()
if flight.xat < sxat || sxat == 0 {
sxat = flight.xat
val.setExpireAt(sxat)
}
a.store.Set(key+cmd, val)
flight.set(val, nil)
entries[cmd] = nil
}
a.mu.Unlock()
return
}
func (a *adapter) Cancel(key, cmd string, err error) {
a.mu.Lock()
entries := a.flights[key]
if flight, ok := entries[cmd].(*adapterEntry); ok {
flight.set(ValkeyMessage{}, err)
entries[cmd] = nil
}
a.mu.Unlock()
}
func (a *adapter) del(key string) {
entries := a.flights[key]
for cmd, e := range entries {
if e == nil {
a.store.Del(key + cmd)
delete(entries, cmd)
}
}
if len(entries) == 0 {
delete(a.flights, key)
}
}
func (a *adapter) Delete(keys []ValkeyMessage) {
a.mu.Lock()
if keys == nil {
for key := range a.flights {
a.del(key)
}
} else {
for _, k := range keys {
a.del(k.string)
}
}
a.mu.Unlock()
}
func (a *adapter) Close(err error) {
a.mu.Lock()
flights := a.flights
a.flights = nil
a.store.Flush()
a.mu.Unlock()
for _, entries := range flights {
for _, e := range entries {
if e != nil {
e.(*adapterEntry).set(ValkeyMessage{}, err)
}
}
}
}
type adapterEntry struct {
err error
ch chan struct{}
val ValkeyMessage
xat int64
}
func (a *adapterEntry) set(val ValkeyMessage, err error) {
a.err, a.val = err, val
close(a.ch)
}
func (a *adapterEntry) Wait(ctx context.Context) (ValkeyMessage, error) {
select {
case <-ctx.Done():
return ValkeyMessage{}, ctx.Err()
case <-a.ch:
return a.val, a.err
}
}