Skip to content

Commit

Permalink
Support object store (cosmos#206)
Browse files Browse the repository at this point in the history
generic interface

generic btree

generic cachekv

generic transient store

support ObjStore

changelog

Update CHANGELOG.md

Signed-off-by: yihuang <[email protected]>

object store key

Apply review suggestions

fix merge conflict

fix snapshot

revert dependers

Problem: store key type assertion is incorrect (cosmos#244)

fix and add test
  • Loading branch information
yihuang authored and mmsqe committed Dec 16, 2024
1 parent 234cdc4 commit 4d38f8f
Show file tree
Hide file tree
Showing 24 changed files with 595 additions and 276 deletions.
18 changes: 15 additions & 3 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@ import (
"context"
"errors"
"fmt"
"maps"
"math"
"slices"
"sort"
"strconv"
"sync"

abci "github.com/cometbft/cometbft/api/cometbft/abci/v1"
cmtproto "github.com/cometbft/cometbft/api/cometbft/types/v1"
"github.com/cometbft/cometbft/crypto/tmhash"
"github.com/cosmos/gogoproto/proto"
"golang.org/x/exp/maps"
"google.golang.org/protobuf/reflect/protoreflect"

"cosmossdk.io/core/header"
Expand Down Expand Up @@ -336,13 +336,25 @@ func (app *BaseApp) MountTransientStores(keys map[string]*storetypes.TransientSt
// MountMemoryStores mounts all in-memory KVStores with the BaseApp's internal
// commit multi-store.
func (app *BaseApp) MountMemoryStores(keys map[string]*storetypes.MemoryStoreKey) {
skeys := slices.Sorted(maps.Keys(keys))
skeys := maps.Keys(keys)
sort.Strings(skeys)
for _, key := range skeys {
memKey := keys[key]
app.MountStore(memKey, storetypes.StoreTypeMemory)
}
}

// MountObjectStores mounts all transient object stores with the BaseApp's internal
// commit multi-store.
func (app *BaseApp) MountObjectStores(keys map[string]*storetypes.ObjectStoreKey) {
skeys := maps.Keys(keys)
sort.Strings(skeys)
for _, key := range skeys {
memKey := keys[key]
app.MountStore(memKey, storetypes.StoreTypeObject)
}
}

// MountStore mounts a store to the provided key in the BaseApp multistore,
// using the default DB.
func (app *BaseApp) MountStore(key storetypes.StoreKey, typ storetypes.StoreType) {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ require (
gitlab.com/yawning/secp256k1-voi v0.0.0-20230925100816-f2616030848b
go.uber.org/mock v0.5.0
golang.org/x/crypto v0.31.0
golang.org/x/exp v0.0.0-20241108190413-2d47ceb2692f
golang.org/x/sync v0.10.0
google.golang.org/genproto/googleapis/api v0.0.0-20240903143218-8af14fe29dc1
google.golang.org/grpc v1.68.1
Expand Down Expand Up @@ -163,7 +164,6 @@ require (
go.opencensus.io v0.24.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/arch v0.12.0 // indirect
golang.org/x/exp v0.0.0-20241108190413-2d47ceb2692f // indirect
golang.org/x/mod v0.22.0 // indirect
golang.org/x/net v0.32.0 // indirect
golang.org/x/sys v0.28.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions runtime/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,15 +163,15 @@ func (s kvStoreAdapter) Set(key, value []byte) {
}
}

func (s kvStoreAdapter) Iterator(start, end []byte) store.Iterator {
func (s kvStoreAdapter) Iterator(start, end []byte) storetypes.Iterator {
it, err := s.store.Iterator(start, end)
if err != nil {
panic(err)
}
return it
}

func (s kvStoreAdapter) ReverseIterator(start, end []byte) store.Iterator {
func (s kvStoreAdapter) ReverseIterator(start, end []byte) storetypes.Iterator {
it, err := s.store.ReverseIterator(start, end)
if err != nil {
panic(err)
Expand Down
12 changes: 4 additions & 8 deletions server/mock/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@ func (ms multiStore) CacheWrapWithTrace(_ io.Writer, _ storetypes.TraceContext)
panic("not implemented")
}

func (ms multiStore) CacheWrapWithListeners(_ storetypes.StoreKey, _ []storetypes.MemoryListener) storetypes.CacheWrap {
panic("not implemented")
}

func (ms multiStore) TracingEnabled() bool {
panic("not implemented")
}
Expand Down Expand Up @@ -114,6 +110,10 @@ func (ms multiStore) GetKVStore(key storetypes.StoreKey) storetypes.KVStore {
return ms.kv[key]
}

func (ms multiStore) GetObjKVStore(storetypes.StoreKey) storetypes.ObjKVStore {
panic("not implemented")
}

func (ms multiStore) GetStore(key storetypes.StoreKey) storetypes.Store {
panic("not implemented")
}
Expand Down Expand Up @@ -186,10 +186,6 @@ func (kv kvStore) CacheWrapWithTrace(w io.Writer, tc storetypes.TraceContext) st
panic("not implemented")
}

func (kv kvStore) CacheWrapWithListeners(_ storetypes.StoreKey, _ []storetypes.MemoryListener) storetypes.CacheWrap {
panic("not implemented")
}

func (kv kvStore) GetStoreType() storetypes.StoreType {
panic("not implemented")
}
Expand Down
43 changes: 23 additions & 20 deletions store/cachekv/internal/mergeiterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,24 @@ import (
// cache shadows (overrides) the parent.
//
// TODO: Optimize by memoizing.
type cacheMergeIterator struct {
parent types.Iterator
cache types.Iterator
type cacheMergeIterator[V any] struct {
parent types.GIterator[V]
cache types.GIterator[V]
ascending bool

valid bool

isZero func(V) bool
}

var _ types.Iterator = (*cacheMergeIterator)(nil)
var _ types.Iterator = (*cacheMergeIterator[[]byte])(nil)

func NewCacheMergeIterator(parent, cache types.Iterator, ascending bool) types.Iterator {
iter := &cacheMergeIterator{
func NewCacheMergeIterator[V any](parent, cache types.GIterator[V], ascending bool, isZero func(V) bool) types.GIterator[V] {
iter := &cacheMergeIterator[V]{
parent: parent,
cache: cache,
ascending: ascending,
isZero: isZero,
}

iter.valid = iter.skipUntilExistsOrInvalid()
Expand All @@ -37,17 +40,17 @@ func NewCacheMergeIterator(parent, cache types.Iterator, ascending bool) types.I

// Domain implements Iterator.
// Returns parent domain because cache and parent domains are the same.
func (iter *cacheMergeIterator) Domain() (start, end []byte) {
func (iter *cacheMergeIterator[V]) Domain() (start, end []byte) {
return iter.parent.Domain()
}

// Valid implements Iterator.
func (iter *cacheMergeIterator) Valid() bool {
func (iter *cacheMergeIterator[V]) Valid() bool {
return iter.valid
}

// Next implements Iterator
func (iter *cacheMergeIterator) Next() {
func (iter *cacheMergeIterator[V]) Next() {
iter.assertValid()

switch {
Expand All @@ -74,7 +77,7 @@ func (iter *cacheMergeIterator) Next() {
}

// Key implements Iterator
func (iter *cacheMergeIterator) Key() []byte {
func (iter *cacheMergeIterator[V]) Key() []byte {
iter.assertValid()

// If parent is invalid, get the cache key.
Expand Down Expand Up @@ -104,7 +107,7 @@ func (iter *cacheMergeIterator) Key() []byte {
}

// Value implements Iterator
func (iter *cacheMergeIterator) Value() []byte {
func (iter *cacheMergeIterator[V]) Value() V {
iter.assertValid()

// If parent is invalid, get the cache value.
Expand Down Expand Up @@ -134,7 +137,7 @@ func (iter *cacheMergeIterator) Value() []byte {
}

// Close implements Iterator
func (iter *cacheMergeIterator) Close() error {
func (iter *cacheMergeIterator[V]) Close() error {
err1 := iter.cache.Close()
if err := iter.parent.Close(); err != nil {
return err
Expand All @@ -145,7 +148,7 @@ func (iter *cacheMergeIterator) Close() error {

// Error returns an error if the cacheMergeIterator is invalid defined by the
// Valid method.
func (iter *cacheMergeIterator) Error() error {
func (iter *cacheMergeIterator[V]) Error() error {
if !iter.Valid() {
return errors.New("invalid cacheMergeIterator")
}
Expand All @@ -155,14 +158,14 @@ func (iter *cacheMergeIterator) Error() error {

// If not valid, panics.
// NOTE: May have side-effect of iterating over cache.
func (iter *cacheMergeIterator) assertValid() {
func (iter *cacheMergeIterator[V]) assertValid() {
if err := iter.Error(); err != nil {
panic(err)
}
}

// Like bytes.Compare but opposite if not ascending.
func (iter *cacheMergeIterator) compare(a, b []byte) int {
func (iter *cacheMergeIterator[V]) compare(a, b []byte) int {
if iter.ascending {
return bytes.Compare(a, b)
}
Expand All @@ -175,9 +178,9 @@ func (iter *cacheMergeIterator) compare(a, b []byte) int {
// If the current cache item is not a delete item, does nothing.
// If `until` is nil, there is no limit, and cache may end up invalid.
// CONTRACT: cache is valid.
func (iter *cacheMergeIterator) skipCacheDeletes(until []byte) {
func (iter *cacheMergeIterator[V]) skipCacheDeletes(until []byte) {
for iter.cache.Valid() &&
iter.cache.Value() == nil &&
iter.isZero(iter.cache.Value()) &&
(until == nil || iter.compare(iter.cache.Key(), until) < 0) {
iter.cache.Next()
}
Expand All @@ -186,7 +189,7 @@ func (iter *cacheMergeIterator) skipCacheDeletes(until []byte) {
// Fast forwards cache (or parent+cache in case of deleted items) until current
// item exists, or until iterator becomes invalid.
// Returns whether the iterator is valid.
func (iter *cacheMergeIterator) skipUntilExistsOrInvalid() bool {
func (iter *cacheMergeIterator[V]) skipUntilExistsOrInvalid() bool {
for {
// If parent is invalid, fast-forward cache.
if !iter.parent.Valid() {
Expand All @@ -211,7 +214,7 @@ func (iter *cacheMergeIterator) skipUntilExistsOrInvalid() bool {
case 0: // parent == cache.
// Skip over if cache item is a delete.
valueC := iter.cache.Value()
if valueC == nil {
if iter.isZero(valueC) {
iter.parent.Next()
iter.cache.Next()

Expand All @@ -223,7 +226,7 @@ func (iter *cacheMergeIterator) skipUntilExistsOrInvalid() bool {
case 1: // cache < parent
// Skip over if cache item is a delete.
valueC := iter.cache.Value()
if valueC == nil {
if iter.isZero(valueC) {
iter.skipCacheDeletes(keyP)
continue
}
Expand Down
12 changes: 6 additions & 6 deletions store/cachekv/search_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"strconv"
"testing"

"cosmossdk.io/store/cachekv/internal"
"cosmossdk.io/store/internal/btree"
)

func BenchmarkLargeUnsortedMisses(b *testing.B) {
Expand All @@ -22,23 +22,23 @@ func BenchmarkLargeUnsortedMisses(b *testing.B) {
}

func generateStore() *Store {
cache := map[string]*cValue{}
cache := map[string]*cValue[[]byte]{}
unsorted := map[string]struct{}{}
for i := 0; i < 5000; i++ {
key := "A" + strconv.Itoa(i)
unsorted[key] = struct{}{}
cache[key] = &cValue{}
cache[key] = &cValue[[]byte]{}
}

for i := 0; i < 5000; i++ {
key := "Z" + strconv.Itoa(i)
unsorted[key] = struct{}{}
cache[key] = &cValue{}
cache[key] = &cValue[[]byte]{}
}

return &Store{
return &GStore[[]byte]{
cache: cache,
unsortedCache: unsorted,
sortedCache: internal.NewBTree(),
sortedCache: btree.NewBTree[[]byte](),
}
}
Loading

0 comments on commit 4d38f8f

Please sign in to comment.