-
Notifications
You must be signed in to change notification settings - Fork 180
/
Copy pathcommon.go
415 lines (350 loc) · 12 KB
/
common.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
package operation
import (
"bytes"
"errors"
"fmt"
"github.com/cockroachdb/pebble"
"github.com/vmihailenco/msgpack"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/storage"
)
// ReaderBatchWriter bundles a pebble database handle with a pending batch and
// a list of callbacks that are notified when the batch is committed.
type ReaderBatchWriter struct {
// db is the database handle; ReaderWriter hands it out as the reader.
db *pebble.DB
// batch accumulates the pending writes; ReaderWriter hands it out as the writer.
batch *pebble.Batch
// callbacks are invoked by Commit, in registration order, with the commit result.
callbacks []func(error)
}
// Compile-time check that *ReaderBatchWriter satisfies storage.PebbleReaderBatchWriter.
var _ storage.PebbleReaderBatchWriter = (*ReaderBatchWriter)(nil)
// ReaderWriter returns the reader and the writer backing this object.
// The reader is the database handle and the writer is the pending batch, so
// writes are collected in the batch while reads are served by the database handle.
func (b *ReaderBatchWriter) ReaderWriter() (pebble.Reader, pebble.Writer) {
	var (
		reader pebble.Reader = b.db
		writer pebble.Writer = b.batch
	)
	return reader, writer
}
// IndexedBatch returns the underlying pebble batch that collects the pending writes.
func (b *ReaderBatchWriter) IndexedBatch() *pebble.Batch {
	pending := b.batch
	return pending
}
// Commit commits the pending batch and then reports the outcome to every
// registered callback, in registration order. The callbacks are invoked
// regardless of whether the commit succeeded; they receive the commit error
// (nil on success). The same error is returned to the caller.
func (b *ReaderBatchWriter) Commit() error {
	commitErr := b.batch.Commit(nil)

	// snapshot the slice header so the set of notified callbacks is fixed
	// before the first callback runs
	registered := b.callbacks
	for i := range registered {
		registered[i](commitErr)
	}

	return commitErr
}
// AddCallback registers a callback that Commit invokes with the commit result.
// Callbacks are invoked in the order in which they were added.
func (b *ReaderBatchWriter) AddCallback(callback func(error)) {
	registered := b.callbacks
	registered = append(registered, callback)
	b.callbacks = registered
}
// NewPebbleReaderBatchWriterWithBatch creates a ReaderBatchWriter that reads
// from the given database handle and writes into the given, caller-provided
// batch. The callback list starts out empty.
func NewPebbleReaderBatchWriterWithBatch(db *pebble.DB, batch *pebble.Batch) *ReaderBatchWriter {
	rw := new(ReaderBatchWriter)
	rw.db = db
	rw.batch = batch
	rw.callbacks = []func(error){}
	return rw
}
// NewPebbleReaderBatchWriter creates a ReaderBatchWriter that reads from the
// given database handle and writes into a fresh indexed batch created from it.
func NewPebbleReaderBatchWriter(db *pebble.DB) *ReaderBatchWriter {
	rw := new(ReaderBatchWriter)
	rw.db = db
	rw.batch = db.NewIndexedBatch()
	return rw
}
// WithReaderBatchWriter creates a new ReaderBatchWriter for the given database,
// passes it to fn, and commits the batch if fn succeeds.
//
// Callbacks registered on the batch are always notified exactly once:
//   - if fn returns an error, nothing is committed and the callbacks receive
//     the error returned by fn;
//   - otherwise the callbacks receive the result of the commit (see Commit).
//
// Notifying the callbacks on the failure path matters because fn may have
// registered a callback that releases a resource; without the notification
// such a callback would never run.
//
// Returns the error of fn, or the commit error, or nil on success.
func WithReaderBatchWriter(db *pebble.DB, fn func(storage.PebbleReaderBatchWriter) error) error {
	batch := NewPebbleReaderBatchWriter(db)

	if err := fn(batch); err != nil {
		// the batch is abandoned without committing; report the failure to
		// whatever callbacks fn managed to register before it failed
		for _, callback := range batch.callbacks {
			callback(err)
		}
		return err
	}

	// Commit notifies the callbacks itself
	return batch.Commit()
}
// insert returns a function that msgpack-encodes val and stores the result
// under the given key using the provided writer.
// Encoding and storage failures are both returned as irrecoverable exceptions.
func insert(key []byte, val interface{}) func(pebble.Writer) error {
	return func(w pebble.Writer) error {
		encoded, encodeErr := msgpack.Marshal(val)
		if encodeErr != nil {
			return irrecoverable.NewExceptionf("failed to encode value: %w", encodeErr)
		}

		if setErr := w.Set(key, encoded, pebble.Sync); setErr != nil {
			return irrecoverable.NewExceptionf("failed to store data: %w", setErr)
		}

		return nil
	}
}
// retrieve returns a function that loads the value stored under the given key
// from the provided reader and msgpack-decodes it into sc.
// Error returns:
//   - storage.ErrNotFound if the key does not exist
//   - any other read error is returned as is
//   - irrecoverable exception if the stored value cannot be decoded
func retrieve(key []byte, sc interface{}) func(r pebble.Reader) error {
	return func(r pebble.Reader) error {
		raw, release, getErr := r.Get(key)
		if getErr != nil {
			return convertNotFoundError(getErr)
		}
		// the raw value is only decoded while the closer is still open
		defer release.Close()

		if decodeErr := msgpack.Unmarshal(raw, sc); decodeErr != nil {
			return irrecoverable.NewExceptionf("failed to decode value: %w", decodeErr)
		}
		return nil
	}
}
// exists returns a function that checks whether the given key is present in
// the provided reader and writes the answer to keyExists.
// A missing key is not an error: keyExists is set to false and nil is returned.
// Any other read failure is returned as an irrecoverable exception, in which
// case keyExists is left untouched.
func exists(key []byte, keyExists *bool) func(r pebble.Reader) error {
	return func(r pebble.Reader) error {
		_, release, getErr := r.Get(key)

		switch {
		case getErr == nil:
			*keyExists = true
			defer release.Close()
			return nil
		case errors.Is(getErr, pebble.ErrNotFound):
			*keyExists = false
			return nil
		default:
			// exception while checking for the key
			return irrecoverable.NewExceptionf("could not load data: %w", getErr)
		}
	}
}
// checkFunc is called during key iteration through the pebble DB in order to
// check whether we should process the given key-value pair. It is called with
// the current key before the value is loaded, so it can be used to avoid
// loading the value if it's not of interest, as well as to store the key
// for the current iteration step.
type checkFunc func(key []byte) bool
// createFunc returns a pointer to an initialized entity that the value of the
// current key-value pair is decoded into during a pebble DB iteration.
type createFunc func() interface{}
// handleFunc is a function that starts the processing of the current key-value
// pair during a pebble DB iteration. It is called after the key was checked
// and the entity was decoded.
// No errors are expected during normal operation. Any errors will halt the iteration.
type handleFunc func() error
// iterationFunc is a function provided to our low-level iteration functions
// (iterate and traverse). It is called once for each iteration step and returns
// a function to check the key, a function to create the decode target and a
// function to process the current key-value pair. This allows a consumer of
// the API to decide when to skip the loading of values, the initialization of
// entities and the processing.
type iterationFunc func() (checkFunc, createFunc, handleFunc)
// remove returns a function that deletes the given key using the provided
// writer.
// Error returns:
//   - irrecoverable exception if the writer fails to delete the key
func remove(key []byte) func(pebble.Writer) error {
	return func(w pebble.Writer) error {
		if delErr := w.Delete(key, nil); delErr != nil {
			return irrecoverable.NewExceptionf("could not delete item: %w", delErr)
		}
		return nil
	}
}
// prefixUpperBound computes the smallest key K that sorts strictly above every
// key starting with the given prefix, under byte-wise lexicographic ordering.
// It is meant to be used as the (exclusive) upper bound of an iteration over
// all keys that share the prefix.
//
// K is obtained by dropping all trailing 0xff bytes of the prefix and
// incrementing the last remaining byte. If no byte remains (the prefix is empty
// or consists of 0xff bytes only), nil is returned, meaning "no upper bound".
// The input slice is never modified.
// See https://pkg.go.dev/github.com/cockroachdb/pebble#example-Iterator-PrefixIteration
func prefixUpperBound(prefix []byte) []byte {
	// locate the right-most byte that can be incremented without wrapping around
	last := len(prefix) - 1
	for last >= 0 && prefix[last] == 0xff {
		last--
	}
	if last < 0 {
		return nil // no upper-bound
	}

	bound := make([]byte, last+1)
	copy(bound, prefix) // copies only the first last+1 bytes
	bound[last]++
	return bound
}
// iterate iterates over a range of keys defined by a start and end key.
// The start key must be less than or equal to the end key by lexicographic ordering.
// Both start and end keys must have non-zero length.
//
// The iteration range uses prefix-wise semantics. Specifically, all keys that
// meet ANY of the following conditions are included in the iteration:
//   - have a prefix equal to the start key OR
//   - have a prefix equal to the end key OR
//   - have a prefix that is lexicographically between start and end
//
// On each iteration, it will call the iteration function to initialize
// functions specific to processing the given key-value pair: the check function
// receives the key and decides whether the pair is processed at all, the create
// function provides the target the value is msgpack-decoded into, and the
// handle function processes the decoded entity.
//
// TODO: this function is unbounded – pass context.Context to this or calling functions to allow timing functions out.
//
// No errors are expected during normal operation. Error returns:
//   - generic error if start or end is empty, or if start sorts after end
//   - generic error if the iterator cannot be created, a value cannot be loaded,
//     or the iterator stopped because of an internal error
//   - irrecoverable exception if a value cannot be decoded
//   - any error returned by the provided handleFunc, wrapped
func iterate(start []byte, end []byte, iteration iterationFunc) func(pebble.Reader) error {
	return func(tx pebble.Reader) error {
		if len(start) == 0 {
			return fmt.Errorf("start prefix is empty")
		}
		if len(end) == 0 {
			return fmt.Errorf("end prefix is empty")
		}

		// this function only iterates in ascending key order, so an inverted
		// range is rejected
		if bytes.Compare(start, end) > 0 {
			return fmt.Errorf("start key must be less than or equal to end key")
		}

		// initialize the bounds for iteration
		options := pebble.IterOptions{
			LowerBound: start,
			// LowerBound specifies the smallest key to iterate and it's inclusive.
			// UpperBound specifies the largest key to iterate and it's exclusive (not inclusive)
			// in order to match all keys prefixed with the `end` bytes, we increment the bytes of end by 1,
			// for instance, to iterate keys between "hello" and "world",
			// we use "hello" as LowerBound, "worle" as UpperBound, so that "world", "world1", "worldffff...ffff"
			// will all be included.
			UpperBound: prefixUpperBound(end),
		}

		it, err := tx.NewIter(&options)
		if err != nil {
			return fmt.Errorf("can not create iterator: %w", err)
		}
		defer it.Close()

		for it.SeekGE(start); it.Valid(); it.Next() {
			key := it.Key()

			// initialize processing functions for iteration
			check, create, handle := iteration()

			// check if we should process the item at all
			ok := check(key)
			if !ok {
				continue
			}

			binaryValue, err := it.ValueAndErr()
			if err != nil {
				return fmt.Errorf("failed to get value: %w", err)
			}

			// preventing caller from modifying the iterator's value slices
			valueCopy := make([]byte, len(binaryValue))
			copy(valueCopy, binaryValue)

			entity := create()
			err = msgpack.Unmarshal(valueCopy, entity)
			if err != nil {
				return irrecoverable.NewExceptionf("could not decode entity: %w", err)
			}

			// process the entity
			err = handle()
			if err != nil {
				return fmt.Errorf("could not handle entity: %w", err)
			}
		}

		// Valid() also reports false when the iterator stopped because of an
		// error; without this check a failed iteration would be
		// indistinguishable from reaching the end of the range.
		if err := it.Error(); err != nil {
			return fmt.Errorf("iteration failed: %w", err)
		}

		return nil
	}
}
// traverse iterates over all keys that start with the given prefix.
//
// The prefix must not be empty; it is shared by all keys in the iteration.
//
// On each iteration, it will call the iteration function to initialize
// functions specific to processing the given key-value pair: the check function
// receives the key and decides whether the pair is processed at all, the create
// function provides the target the value is msgpack-decoded into, and the
// handle function processes the decoded entity.
//
// No errors are expected during normal operation. Error returns:
//   - generic error if the prefix is empty
//   - generic error if the iterator cannot be created, a value cannot be loaded,
//     or the iterator stopped because of an internal error
//   - irrecoverable exception if a value cannot be decoded
//   - any error returned by the provided handleFunc, wrapped
func traverse(prefix []byte, iteration iterationFunc) func(pebble.Reader) error {
	return func(r pebble.Reader) error {
		if len(prefix) == 0 {
			return fmt.Errorf("prefix must not be empty")
		}

		it, err := r.NewIter(&pebble.IterOptions{
			LowerBound: prefix,
			// LowerBound specifies the smallest key to iterate and it's inclusive.
			// UpperBound specifies the largest key to iterate and it's exclusive (not inclusive).
			// In order to match all keys that start with the prefix, the upper bound is the
			// prefix incremented by 1: for instance, for the prefix "hello" we use "hello" as
			// LowerBound and "hellp" as UpperBound, so that "hello", "hello1", "helloffff...ffff"
			// will all be included.
			UpperBound: prefixUpperBound(prefix),
		})
		if err != nil {
			return fmt.Errorf("can not create iterator: %w", err)
		}
		defer it.Close()

		// the bounds above are what enforces that all results have the prefix
		for it.SeekGE(prefix); it.Valid(); it.Next() {
			// initialize processing functions for iteration
			check, create, handle := iteration()

			// check if we should process the item at all
			key := it.Key()
			ok := check(key)
			if !ok {
				continue
			}

			binaryValue, err := it.ValueAndErr()
			if err != nil {
				return fmt.Errorf("failed to get value: %w", err)
			}

			// preventing caller from modifying the iterator's value slices
			valueCopy := make([]byte, len(binaryValue))
			copy(valueCopy, binaryValue)

			entity := create()
			err = msgpack.Unmarshal(valueCopy, entity)
			if err != nil {
				return irrecoverable.NewExceptionf("could not decode entity: %w", err)
			}

			// process the entity
			err = handle()
			if err != nil {
				return fmt.Errorf("could not handle entity: %w", err)
			}
		}

		// Valid() also reports false when the iterator stopped because of an
		// error; without this check a failed iteration would be
		// indistinguishable from reaching the end of the prefix range.
		if err := it.Error(); err != nil {
			return fmt.Errorf("iteration failed: %w", err)
		}

		return nil
	}
}
// removeByPrefix returns a function that deletes, via a single range deletion
// on the provided writer, every key in the range that starts at the prefix
// (inclusive) and ends at prefixUpperBound(prefix).
// If no key matches, this is a no-op.
// No errors are expected during normal operation; an error from the range
// deletion is returned as is.
// NOTE(review): prefixUpperBound returns nil for an empty or all-0xff prefix;
// confirm how the writer treats a nil end key in that case.
func removeByPrefix(prefix []byte) func(pebble.Writer) error {
	return func(tx pebble.Writer) error {
		upperBound := prefixUpperBound(prefix)
		return tx.DeleteRange(prefix, upperBound, nil)
	}
}
// convertNotFoundError translates pebble's not-found error into
// storage.ErrNotFound. Every other error (including nil) is returned unchanged.
func convertNotFoundError(err error) error {
	if !errors.Is(err, pebble.ErrNotFound) {
		return err
	}
	return storage.ErrNotFound
}
// findHighestAtOrBelow returns a function that finds the entry with the highest
// key among the keys starting with the given prefix and sorting below
// prefix+b(height+1), and msgpack-decodes its value into entity.
// NOTE(review): this yields "highest height at or below the given height" only
// if b encodes heights such that byte-wise key order matches numeric order —
// confirm against b.
//
// The iterator is positioned directly on the last key within the bounds, so
// the keys below it are not scanned.
//
// Error returns:
//   - generic error if the prefix is empty, the iterator cannot be created or
//     fails, or the value cannot be loaded
//   - storage.ErrNotFound if there is no key within the bounds
//   - irrecoverable exception if the value cannot be decoded
func findHighestAtOrBelow(
	prefix []byte,
	height uint64,
	entity interface{},
) func(pebble.Reader) error {
	return func(r pebble.Reader) error {
		if len(prefix) == 0 {
			return fmt.Errorf("prefix must not be empty")
		}

		// why height+1? because:
		// UpperBound specifies the largest key to iterate and it's exclusive (not inclusive)
		// in order to match all keys indexed by height that is equal to or below the given height,
		// we increment the height by 1,
		// for instance, to find highest key equal to or below 10, we use 11 as the UpperBound, so that
		// if there are 4 keys: [prefix-7, prefix-9, prefix-10, prefix-11], then all keys except
		// prefix-11 will be included, and the last key within the bounds is prefix-10.
		var upperBound []byte
		if next := height + 1; next > height {
			encoded := b(next)
			// build the bound in a fresh slice: appending to prefix directly could
			// write into the caller's backing array when prefix has spare capacity
			upperBound = make([]byte, 0, len(prefix)+len(encoded))
			upperBound = append(upperBound, prefix...)
			upperBound = append(upperBound, encoded...)
		} else {
			// height+1 wrapped around to 0, i.e. every height qualifies: bound
			// the iteration by the prefix itself instead
			upperBound = prefixUpperBound(prefix)
		}

		it, err := r.NewIter(&pebble.IterOptions{
			LowerBound: prefix,
			UpperBound: upperBound,
		})
		if err != nil {
			return fmt.Errorf("can not create iterator: %w", err)
		}
		defer it.Close()

		// jump straight to the highest key within the bounds
		if !it.Last() {
			// no key found: either the range is empty or the iterator failed
			if err := it.Error(); err != nil {
				return fmt.Errorf("could not find highest key: %w", err)
			}
			return storage.ErrNotFound
		}

		// read the value of the highest key; it is decoded right away, while
		// the iterator is still positioned on it
		val, err := it.ValueAndErr()
		if err != nil {
			return fmt.Errorf("failed to get value: %w", err)
		}

		err = msgpack.Unmarshal(val, entity)
		if err != nil {
			return irrecoverable.NewExceptionf("failed to decode value: %w", err)
		}

		return nil
	}
}
// BatchUpdate creates a new indexed batch on the given database, lets fn add
// its writes to it, and commits the batch if fn succeeds.
// If fn returns an error, that error is returned and nothing is committed;
// otherwise the result of the commit is returned.
func BatchUpdate(db *pebble.DB, fn func(tx pebble.Writer) error) error {
	pending := db.NewIndexedBatch()

	if fnErr := fn(pending); fnErr != nil {
		return fnErr
	}

	commitErr := pending.Commit(nil)
	return commitErr
}