Skip to content

Commit

Permalink
go-kosu: Fix Order store I/O overhead
Browse files Browse the repository at this point in the history
  • Loading branch information
Gustavo Chain committed Oct 30, 2019
1 parent b06c99b commit 8591500
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 70 deletions.
1 change: 1 addition & 0 deletions packages/go-kosu/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## master

- Fix Order store I/O overhead
- Fix wrong initial validators power calculations
- Add --validator family flags to optionally start as a validator node
- Add GetBlocks RPC endpoint
Expand Down
4 changes: 2 additions & 2 deletions packages/go-kosu/abci/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (c *Client) QueryValidator(addr string) (*types.Validator, error) {
// QueryTotalOrders performs a ABCI Query to "/chain/totalorders"
func (c *Client) QueryTotalOrders() (uint64, error) {
var num uint64
if err := c.Query("/store/chain/key", []byte("totalorders"), &num); err != nil {
if err := c.Query("/store/orders/key", cosmos.LengthKey(), &num); err != nil {
return 0, err
}

Expand All @@ -186,7 +186,7 @@ func (c *Client) QueryTotalOrders() (uint64, error) {

// QueryLatestOrders queries a collection (subspace) of orders in the `orders` store.
func (c *Client) QueryLatestOrders() ([]types.TransactionOrder, error) {
KVs, err := c.querySubSpace("orders", []byte(cosmos.OrderKeyPrefix))
KVs, err := c.querySubSpace("orders", cosmos.ElemKey(0))
if err != nil {
return nil, err
}
Expand Down
115 changes: 115 additions & 0 deletions packages/go-kosu/store/cosmos/list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package cosmos

import (
"fmt"
"strconv"

"github.com/cosmos/cosmos-sdk/types"

"github.com/ParadigmFoundation/kosu-monorepo/packages/go-kosu/store"
)

type panicCodec struct {
store.Codec
}

func (cdc *panicCodec) MustEncode(ptr interface{}) []byte {
bz, err := cdc.Encode(ptr)
if err != nil {
panic(err)
}
return bz
}

func (cdc *panicCodec) MustDecode(bz []byte, ptr interface{}) {
if err := cdc.Decode(bz, ptr); err != nil {
panic(err)
}
}

// LengthKey is the key for the length of the list
func LengthKey() []byte {
return []byte{0x00}
}

// ElemKey is the key for the elements of the list
func ElemKey(index uint64) []byte {
return append([]byte{0x01}, []byte(fmt.Sprintf("%020d", index))...)
}

// List defines an integer indexable mapper
// It panics when the element type cannot be (un/)marshalled by the codec
type List struct {
cdc *panicCodec
store types.KVStore
}

// NewList constructs new List
func NewList(cdc store.Codec, store types.KVStore) *List {
return &List{
cdc: &panicCodec{cdc},
store: store,
}
}

// Len returns the length of the list
// The user should check Len() before doing any actions
func (m List) Len() (res uint64) {
bz := m.store.Get(LengthKey())
if bz == nil {
return 0
}

m.cdc.MustDecode(bz, &res)
return
}

// Get returns the element by its index
func (m List) Get(index uint64, ptr interface{}) error {
bz := m.store.Get(ElemKey(index))
return m.cdc.Decode(bz, ptr)
}

// Set stores the element to the given position
// Setting element out of range will break length counting
// Use Push() instead of Set() to append a new element
func (m List) Set(index uint64, value interface{}) {
bz := m.cdc.MustEncode(value)
m.store.Set(ElemKey(index), bz)
}

// Delete deletes the element in the given position
// Other elements' indices are preserved after deletion
// Panics when the index is out of range
func (m List) Delete(index uint64) {
m.store.Delete(ElemKey(index))
}

// Push inserts the element to the end of the list
// It will increase the length when it is called
func (m List) Push(value interface{}) {
length := m.Len()
m.Set(length, value)
m.store.Set(LengthKey(), m.cdc.MustEncode(length+1))
}

// Iterate is used to iterate over all existing elements in the list
// Return true in the continuation to break
// Using it with Get() will return the same one with the provided element
func (m List) Iterate(fn func(uint64) bool) {
iter := types.KVStorePrefixIterator(m.store, []byte{0x01})
defer iter.Close()
for ; iter.Valid(); iter.Next() {
k := iter.Key()
s := string(k[len(k)-20:])

index, err := strconv.ParseUint(s, 10, 64)
if err != nil {
panic(err)
}

if fn(index) {
break
}
}
}
71 changes: 15 additions & 56 deletions packages/go-kosu/store/cosmos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@ package cosmos

import (
"encoding/hex"
"fmt"
"sort"
"strconv"
"strings"

abci "github.com/tendermint/tendermint/abci/types"
db "github.com/tendermint/tm-db"
Expand All @@ -30,6 +26,8 @@ type Store struct {
posterKey *sdk.KVStoreKey
validatorKey *sdk.KVStoreKey
orderKey *sdk.KVStoreKey

ordersList *List
}

// NewStore returns a new store
Expand All @@ -54,6 +52,9 @@ func NewStore(db db.DB, cdc store.Codec) *Store {
panic(err)
}

kv := s.cms.GetCommitKVStore(s.orderKey)
s.ordersList = NewList(cdc, kv)

return s
}

Expand Down Expand Up @@ -207,75 +208,33 @@ func (s *Store) LastEvent() uint64 {
return v
}

// SetTotalOrders sets the TotalOrders
func (s *Store) SetTotalOrders(v uint64) {
s.Set("totalorders", s.chainKey, v)
}

// TotalOrders gets the TotalOrders
// TotalOrders gets the TotalOrders, this is all the received orders.
// This number is only increased by SetOrders and never decreased
func (s *Store) TotalOrders() uint64 {
var v uint64
s.Get("totalorders", s.chainKey, &v)
return v
}

// OrderKeyPrefix is the prefix to be used in the orders
// We need a prefix to be able to use the `/subspace` query path.
const OrderKeyPrefix = "orders"

func orderKey(id int) string {
return fmt.Sprintf("%s%d", OrderKeyPrefix, id)
}

func parseOrderKey(key string) (int, error) {
key = strings.TrimPrefix(key, OrderKeyPrefix)
return strconv.Atoi(key)
return s.ordersList.Len()
}

// SetOrder creates an order in the store and remove the oldest
// if the number of stored orders exceeds the limit value.
// SetOrder will also update the TotalOrders counter
func (s *Store) SetOrder(tx *types.TransactionOrder, limit int) {
// we generate auto-incremental IDs so that we can track the oldest order
var keys []int
s.All(s.orderKey, func(key string, _ []byte) {
n, err := parseOrderKey(key)
if err != nil {
panic(err)
}

keys = append(keys, n)
})
sort.Ints(keys)
// at this point we have all the IDs in order

// Delete all the orders that exceeds the limit
for len(keys) >= limit && limit > 0 {
key := keys[0]
s.Delete(orderKey(key), s.orderKey)
keys = keys[1:]
}

var newKey = 0
if len(keys) > 0 {
newKey = keys[len(keys)-1] + 1
s.ordersList.Push(tx)
l := s.ordersList.Len()
if l > uint64(limit) {
s.ordersList.Delete(l - uint64(limit) - 1)
}
s.Set(orderKey(newKey), s.orderKey, tx)

total := s.TotalOrders()
s.SetTotalOrders(total + 1)
}

// GetOrders retrieves all the available orders in the store
func (s *Store) GetOrders() (orders []types.TransactionOrder) {
s.All(s.orderKey, func(_ string, bytes []byte) {
s.ordersList.Iterate(func(id uint64) bool {
var tx types.TransactionOrder
if err := s.Codec().Decode(bytes, &tx); err != nil {
if err := s.ordersList.Get(id, &tx); err != nil {
panic(err)
}
orders = append(orders, tx)
return false
})

return
}

Expand Down
1 change: 0 additions & 1 deletion packages/go-kosu/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ type Store interface {
SetLastEvent(uint64)

TotalOrders() uint64
SetTotalOrders(uint64)
SetOrder(tx *types.TransactionOrder, limit int)
GetOrders() []types.TransactionOrder

Expand Down
11 changes: 0 additions & 11 deletions packages/go-kosu/store/storetest/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ func TestSuite(t *testing.T, f Factory) {
{"RoundInfo", TestRoundInfo},
{"ConsensusParams", TestConsensusParams},
{"LastEvent", TestLastEvent},
{"TotalOrders", TestTotalOrders},
{"Witness", TestWitness},
{"Poster", TestPoster},
{"Validator", TestValidator},
Expand Down Expand Up @@ -68,16 +67,6 @@ func TestLastEvent(t *testing.T, s store.Store) {
assert.Equal(t, lastEvent, s.LastEvent())
}

// TestTotalOrders verifies the LastTotalOrders storage behavior
func TestTotalOrders(t *testing.T, s store.Store) {
s.SetTotalOrders(1)
s.SetTotalOrders(2)
s.SetTotalOrders(3)
s.SetTotalOrders(4)

assert.Equal(t, uint64(4), s.TotalOrders())
}

// TestWitness verifies the Witness storage behavior
func TestWitness(t *testing.T, s store.Store) {
witnessTx := &types.TransactionWitness{
Expand Down

0 comments on commit 8591500

Please sign in to comment.