Skip to content

Commit

Permalink
Merge pull request #6316 from onflow/leo/v0.33-pebble-storage-iterate
Browse files Browse the repository at this point in the history
[Pebble storage] Refactor key iteration
  • Loading branch information
zhangchiqing authored Sep 13, 2024
2 parents 7fff0ee + 2ea5698 commit 38dd0c8
Show file tree
Hide file tree
Showing 4 changed files with 214 additions and 82 deletions.
2 changes: 1 addition & 1 deletion storage/pebble/operation/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,5 +79,5 @@ func LookupClusterBlocksByReferenceHeightRange(start, end uint64, clusterBlockID
return false
}
return check, nil, nil
}, false)
})
}
120 changes: 57 additions & 63 deletions storage/pebble/operation/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,27 @@ func remove(key []byte) func(pebble.Writer) error {
}
}

// iterate iterates over a range of keys defined by a start and end key. The
// start key may be higher than the end key, in which case we iterate in
// reverse order.
// prefixUpperBound returns a key K such that all possible keys beginning with the input prefix
// sort lower than K according to the byte-wise lexicographic key ordering used by Pebble.
// This is used to define an upper bound for iteration, when we want to iterate over
// all keys beginning with a given prefix.
// referred to https://pkg.go.dev/github.com/cockroachdb/pebble#example-Iterator-PrefixIteration
func prefixUpperBound(prefix []byte) []byte {
end := make([]byte, len(prefix))
copy(end, prefix)
for i := len(end) - 1; i >= 0; i-- {
// increment the bytes by 1
end[i] = end[i] + 1
if end[i] != 0 {
return end[:i+1]
}
}
return nil // no upper-bound
}

// iterate iterates over a range of keys defined by a start and end key.
// The start key must be less than or equal to the end key by lexicographic ordering.
// Both start and end keys must have non-zero length.
//
// The iteration range uses prefix-wise semantics. Specifically, all keys that
// meet ANY of the following conditions are included in the iteration:
Expand All @@ -175,44 +193,34 @@ func remove(key []byte) func(pebble.Writer) error {
// TODO: this function is unbounded – pass context.Context to this or calling functions to allow timing functions out.
// No errors are expected during normal operation. Any errors returned by the
// provided handleFunc will be propagated back to the caller of iterate.
func iterate(start []byte, end []byte, iteration iterationFunc, prefetchValues bool) func(pebble.Reader) error {
func iterate(start []byte, end []byte, iteration iterationFunc) func(pebble.Reader) error {
return func(tx pebble.Reader) error {

if len(start) == 0 {
return fmt.Errorf("start prefix is empty")
}

if len(end) == 0 {
return fmt.Errorf("end prefix is empty")
}

// Reverse iteration is not supported by pebble
if bytes.Compare(start, end) > 0 {
return fmt.Errorf("start key must be less than or equal to end key")
}

// initialize the default options and comparison modifier for iteration
maxEndWithPrefix := append(end, ffBytes...)

options := pebble.IterOptions{
LowerBound: start,
UpperBound: maxEndWithPrefix,
// LowerBound specifies the smallest key to iterate and it's inclusive.
// UpperBound specifies the largest key to iterate and it's exclusive (not inclusive)
// in order to match all keys prefixed with the `end` bytes, we increment the bytes of end by 1,
// for instance, to iterate keys between "hello" and "world",
// we use "hello" as LowerBound, "worle" as UpperBound, so that "world", "world1", "worldffff...ffff"
// will all be included.
UpperBound: prefixUpperBound(end),
}

// In order to satisfy this function's prefix-wise inclusion semantics,
// we append 0xff bytes to the largest of start and end.
// This ensures Badger will seek to the largest key with that prefix
// for reverse iteration, thus including all keys with a prefix matching
// the starting key. It also enables us to detect boundary conditions by
// simple lexicographic comparison (ie. bytes.Compare) rather than
// explicitly comparing prefixes.

// If start is bigger than end, we have a backwards iteration:
// 1) We set the reverse option on the iterator, so we step through all
// the keys backwards. This modifies the behaviour of Seek to go to
// the first key that is less than or equal to the start key (as
// opposed to greater than or equal in a regular iteration).
// 2) In order to satisfy this function's prefix-wise inclusion semantics,
// we append a 0xff-byte suffix to the start key so the seek will go
// to the right place.
// 3) For a regular iteration, we break the loop upon hitting the first
// item that has a key higher than the end prefix. In order to reverse
// this, we use a modifier for the comparison that reverses the check
// and makes it stop upon the first item lower than the end prefix.
// for forward iteration, add the 0xff-bytes suffix to the end
// prefix, to ensure we include all keys with that prefix before
// finishing.
it, err := tx.NewIter(&options)
if err != nil {
return fmt.Errorf("can not create iterator: %w", err)
Expand All @@ -231,11 +239,6 @@ func iterate(start []byte, end []byte, iteration iterationFunc, prefetchValues b
continue
}

// when prefetchValues is false, we skip loading the value
if !prefetchValues {
continue
}

binaryValue, err := it.ValueAndErr()
if err != nil {
return fmt.Errorf("failed to get value: %w", err)
Expand All @@ -262,8 +265,6 @@ func iterate(start []byte, end []byte, iteration iterationFunc, prefetchValues b
}
}

var ffBytes = bytes.Repeat([]byte{0xFF}, 32)

// traverse iterates over a range of keys defined by a prefix.
//
// The prefix must be shared by all keys in the iteration.
Expand All @@ -278,7 +279,13 @@ func traverse(prefix []byte, iteration iterationFunc) func(pebble.Reader) error

it, err := r.NewIter(&pebble.IterOptions{
LowerBound: prefix,
UpperBound: append(prefix, ffBytes...),
// LowerBound specifies the smallest key to iterate and it's inclusive.
// UpperBound specifies the largest key to iterate and it's exclusive (not inclusive)
// in order to match all keys prefixed with the `end` bytes, we increment the bytes of end by 1,
// for instance, to iterate keys between "hello" and "world",
// we use "hello" as LowerBound, "worle" as UpperBound, so that "world", "world1", "worldffff...ffff"
// will all be included.
UpperBound: prefixUpperBound(prefix),
})

if err != nil {
Expand Down Expand Up @@ -331,30 +338,8 @@ func traverse(prefix []byte, iteration iterationFunc) func(pebble.Reader) error
// No errors are expected during normal operation.
func removeByPrefix(prefix []byte) func(pebble.Writer) error {
return func(tx pebble.Writer) error {
start, end := getStartEndKeys(prefix)
return tx.DeleteRange(start, end, nil)
}
}

// getStartEndKeys calculates the start and end keys for a given prefix.
// note: DeleteRange takes [start, end) (inclusion on start, exclusive on end),
func getStartEndKeys(prefix []byte) (start, end []byte) {
start = prefix

// End key is the prefix with the last byte incremented by 1
end = append([]byte{}, prefix...)
for i := len(end) - 1; i >= 0; i-- {
if end[i] < 0xff {
end[i]++
break
}
end[i] = 0
if i == 0 {
end = append(end, 0)
}
return tx.DeleteRange(prefix, prefixUpperBound(prefix), nil)
}

return start, end
}

func convertNotFoundError(err error) error {
Expand All @@ -375,9 +360,18 @@ func findHighestAtOrBelow(
return fmt.Errorf("prefix must not be empty")
}

key := append(prefix, b(height)...)
// why height+1? because:
// UpperBound specifies the largest key to iterate and it's exclusive (not inclusive)
// in order to match all keys indexed by height that is equal to or below the given height,
// we could increment the height by 1,
// for instance, to find highest key equal to or below 10, we first use 11 as the UpperBound, so that
// if there are 4 keys: [prefix-7, prefix-9, prefix-10, prefix-11], then all keys except
// prefix-11 will be included. And iterating them in the increasing order will find prefix-10
// as the highest key.
key := append(prefix, b(height+1)...)
it, err := r.NewIter(&pebble.IterOptions{
UpperBound: append(key, ffBytes...),
LowerBound: prefix,
UpperBound: key,
})
if err != nil {
return fmt.Errorf("can not create iterator: %w", err)
Expand Down
Loading

0 comments on commit 38dd0c8

Please sign in to comment.