diff --git a/deploy/def/client.go b/deploy/def/client.go index 7c2c166ee..a1ac25d7c 100644 --- a/deploy/def/client.go +++ b/deploy/def/client.go @@ -49,7 +49,7 @@ func (c *Client) Dial(chainAddress, keysClientAddress string) error { logrus.Info("Using mempool signing since no keyClient set, pass --keys to sign locally or elsewhere") c.MempoolSigning = true } else { - logrus.Info("Using keys server at: %s", keysClientAddress) + logrus.Infof("Using keys server at: %s", keysClientAddress) c.keyClient, err = keys.NewRemoteKeyClient(keysClientAddress, logging.NewNoopLogger()) } @@ -227,7 +227,7 @@ func (c *Client) UpdateAccount(arg *GovArg) (*payload.GovTx, error) { if c.keyClient != nil { publicKey, err := c.keyClient.PublicKey(*update.Address) if err != nil { - logrus.Info("Could not retrieve public key for %v from keys server", *update.Address) + logrus.Infof("Could not retrieve public key for %v from keys server", *update.Address) } else { update.PublicKey = &publicKey } diff --git a/execution/evm/vm.go b/execution/evm/vm.go index 680a42311..d8309f8ad 100644 --- a/execution/evm/vm.go +++ b/execution/evm/vm.go @@ -608,7 +608,7 @@ func (vm *VM) call(callState *state.Cache, caller acm.Account, callee *acm.Mutab case GASPRICE_DEPRECATED: // 0x3A stack.Push(Zero256) - vm.Debugf(" => %X (GASPRICE IS DEPRECATED)\n") + vm.Debugf(" => %X (GASPRICE IS DEPRECATED)\n", Zero256) case EXTCODESIZE: // 0x3B addr := stack.Pop() diff --git a/execution/execution_test.go b/execution/execution_test.go index d39fa1675..18b961405 100644 --- a/execution/execution_test.go +++ b/execution/execution_test.go @@ -893,7 +893,7 @@ func TestTxSequence(t *testing.T) { func TestNameTxs(t *testing.T) { st, err := MakeGenesisState(dbm.NewMemDB(), testGenesisDoc) require.NoError(t, err) - st.writeState.save() + st.writeState.commit() names.MinNameRegistrationPeriod = 5 exe := makeExecutor(st) @@ -1196,7 +1196,7 @@ func TestMerklePanic(t *testing.T) { acc0 := getAccount(st, privAccounts[0].Address()) acc1 := getAccount(st, privAccounts[1].Address()) - st.writeState.save() + st.writeState.commit() // SendTx. { tx := &payload.SendTx{ @@ -1238,7 +1238,7 @@ func TestMerklePanic(t *testing.T) { err := makeExecutor(stateCallTx).signExecuteCommit(tx, privAccounts[0]) require.NoError(t, err) } - st.writeState.save() + st.writeState.commit() trygetacc0 := getAccount(st, privAccounts[0].Address()) fmt.Println(trygetacc0.Address()) } @@ -1499,7 +1499,7 @@ func makeGenesisState(numAccounts int, randBalance bool, minBalance uint64, numV if err != nil { panic(fmt.Errorf("could not make genesis state: %v", err)) } - s0.writeState.save() + s0.writeState.commit() return s0, privAccounts } diff --git a/execution/state.go b/execution/state.go index e3687b49a..d89d3f525 100644 --- a/execution/state.go +++ b/execution/state.go @@ -17,7 +17,9 @@ package execution import ( "fmt" "sync" - "time" + + "github.com/tendermint/go-amino" + "github.com/tendermint/tendermint/crypto/tmhash" "github.com/hyperledger/burrow/acm" "github.com/hyperledger/burrow/acm/state" @@ -26,35 +28,36 @@ import ( "github.com/hyperledger/burrow/execution/exec" "github.com/hyperledger/burrow/execution/names" "github.com/hyperledger/burrow/genesis" - "github.com/hyperledger/burrow/logging" "github.com/hyperledger/burrow/permission" - "github.com/tendermint/iavl" + "github.com/hyperledger/burrow/storage" dbm "github.com/tendermint/tendermint/libs/db" ) const ( defaultCacheCapacity = 1024 - // Age of state versions in blocks before we remove them. This has us keeping a little over an hour's worth of blocks - // in principle we could manage with 2. Ideally we would lift this limit altogether but IAVL leaks memory on access - // to previous tree versions since it lazy loads values (nice) but gives no ability to unload them (see SaveBranch) - defaultVersionExpiry = 2048 - - // Version by state hash - versionPrefix = "v/" - - // Prefix of keys in state tree - accountsPrefix = "a/" - storagePrefix = "s/" - nameRegPrefix = "n/" - blockPrefix = "b/" - txPrefix = "t/" + uint64Length = 8 + + // Prefix under which the versioned merkle state tree resides - tracking previous versions of history + treePrefix = "m" + // Prefix under which all non-versioned values reside - either immutable values of references to immutable values + // that track the current state rather than being part of the history. + refsPrefix = "r" ) var ( - accountsStart, accountsEnd []byte = prefixKeyRange(accountsPrefix) - storageStart, storageEnd []byte = prefixKeyRange(storagePrefix) - nameRegStart, nameRegEnd []byte = prefixKeyRange(nameRegPrefix) - lastBlockHeightKey = []byte("h") + // Directly referenced values + accountKeyFormat = storage.NewMustKeyFormat("a", crypto.AddressLength) + storageKeyFormat = storage.NewMustKeyFormat("s", crypto.AddressLength, binary.Word256Length) + nameKeyFormat = storage.NewMustKeyFormat("n", storage.VariadicSegmentLength) + // Keys that reference references + blockRefKeyFormat = storage.NewMustKeyFormat("b", uint64Length) + txRefKeyFormat = storage.NewMustKeyFormat("t", uint64Length, uint64Length) + // Reference keys + // TODO: implement content-addressing of code and optionally blocks (to allow reference to block to be stored in state tree) + //codeKeyFormat = storage.NewMustKeyFormat("c", sha256.Size) + //blockKeyFormat = storage.NewMustKeyFormat("b", sha256.Size) + txKeyFormat = storage.NewMustKeyFormat("b", tmhash.Size) + commitKeyFormat = storage.NewMustKeyFormat("x", tmhash.Size) ) // Implements account and blockchain state @@ -73,27 +76,38 @@ type writeState struct { state *State } +type CommitID struct { + Hash binary.HexBytes + // Height and Version will normally be the same - but it's not clear we should assume this + Height uint64 + Version int64 +} + // Writers to state are responsible for calling State.Lock() before calling type State struct { // Values not reassigned sync.RWMutex writeState *writeState + height uint64 db dbm.DB - tree *iavl.MutableTree - logger *logging.Logger - - // Values may be reassigned (mutex protected) - // Previous version of IAVL tree for concurrent read-only access - readTree *iavl.ImmutableTree - // Last state hash - hash []byte + cacheDB *storage.CacheDB + tree *storage.RWTree + refs storage.KVStore + codec *amino.Codec } // Create a new State object func NewState(db dbm.DB) *State { + // We collapse all db operations into a single batch committed by save() + cacheDB := storage.NewCacheDB(db) + tree := storage.NewRWTree(storage.NewPrefixDB(cacheDB, treePrefix), defaultCacheCapacity) + refs := storage.NewPrefixDB(cacheDB, refsPrefix) s := &State{ - db: db, - tree: iavl.NewMutableTree(db, defaultCacheCapacity), + db: db, + cacheDB: cacheDB, + tree: tree, + refs: refs, + codec: amino.NewCodec(), } s.writeState = &writeState{state: s} return s @@ -107,15 +121,6 @@ func MakeGenesisState(db dbm.DB, genesisDoc *genesis.GenesisDoc) (*State, error) s := NewState(db) - if genesisDoc.GenesisTime.IsZero() { - // NOTE: [ben] change GenesisTime to requirement on v0.17 - // GenesisTime needs to be deterministic across the chain - // and should be required in the genesis file; - // the requirement is not yet enforced when lacking set - // time to 11/18/2016 @ 4:09am (UTC) - genesisDoc.GenesisTime = time.Unix(1479442162, 0) - } - // Make accounts state tree for _, genAcc := range genesisDoc.Accounts { perm := genAcc.Permissions @@ -149,7 +154,7 @@ func MakeGenesisState(db dbm.DB, genesisDoc *genesis.GenesisDoc) (*State, error) } // We need to save at least once so that readTree points at a non-working-state tree - _, err = s.writeState.save() + _, err = s.writeState.commit() if err != nil { return nil, err } @@ -161,24 +166,18 @@ func MakeGenesisState(db dbm.DB, genesisDoc *genesis.GenesisDoc) (*State, error) func LoadState(db dbm.DB, hash []byte) (*State, error) { s := NewState(db) // Get the version associated with this state hash - version, err := s.writeState.GetVersion(hash) + commitID := new(CommitID) + err := s.codec.UnmarshalBinary(s.refs.Get(commitKeyFormat.Key(hash)), commitID) if err != nil { - return nil, err + return nil, fmt.Errorf("could not decode CommitID: %v", err) } - if version <= 0 { - return nil, fmt.Errorf("trying to load state from non-positive version: version %v, hash: %X", version, hash) + if commitID.Version <= 0 { + return nil, fmt.Errorf("trying to load state from non-positive version: CommitID: %v", commitID) } - treeVersion, err := s.tree.LoadVersion(version) + err = s.tree.Load(commitID.Version) if err != nil { - return nil, fmt.Errorf("could not load current version of state tree: version %v, hash: %X", version, hash) + return nil, fmt.Errorf("could not load current version of state tree: CommitID: %v", commitID) } - if treeVersion != version { - return nil, fmt.Errorf("tried to load state version %v for state hash %X but loaded version %v", - version, hash, treeVersion) - } - // Load previous version for readTree - // Set readTree - s.readTree, err = s.tree.GetImmutable(version - 1) return s, nil } @@ -191,46 +190,40 @@ func (s *State) Update(updater func(up Updatable) error) ([]byte, error) { if err != nil { return nil, err } - return s.writeState.save() + return s.writeState.commit() } -func (ws *writeState) save() ([]byte, error) { +func (ws *writeState) commit() ([]byte, error) { // save state at a new version may still be orphaned before we save the version against the hash - hash, treeVersion, err := ws.state.tree.SaveVersion() + hash, treeVersion, err := ws.state.tree.Save() if err != nil { return nil, err } - // Take an immutable reference to the tree we just saved for querying - ws.state.readTree, err = ws.state.tree.GetImmutable(treeVersion) - if err != nil { - return nil, err + if len(hash) == 0 { + // Normalise the hash of an empty to tree to the correct hash size + hash = make([]byte, tmhash.Size) } - // Provide a reference to load this version in the future from the state hash - ws.SetVersion(hash, treeVersion) - ws.state.hash = hash - return hash, err -} - -// Get a previously saved tree version stored by state hash -func (ws *writeState) GetVersion(hash []byte) (int64, error) { - versionBytes := ws.state.db.Get(prefixedKey(versionPrefix, hash)) - if versionBytes == nil { - return -1, fmt.Errorf("could not retrieve version corresponding to state hash '%X' in database", hash) + commitID := CommitID{ + Hash: hash, + Height: ws.state.height, + Version: treeVersion, } - return binary.GetInt64BE(versionBytes), nil -} - -// Set the tree version associated with a particular hash -func (ws *writeState) SetVersion(hash []byte, version int64) { - versionBytes := make([]byte, 8) - binary.PutInt64BE(versionBytes, version) - ws.state.db.SetSync(prefixedKey(versionPrefix, hash), versionBytes) + bs, err := ws.state.codec.MarshalBinary(commitID) + if err != nil { + return nil, fmt.Errorf("could not encode CommitID %v: %v", commitID, err) + } + ws.state.refs.Set(commitKeyFormat.Key(hash), bs) + // Commit the state in cacheDB atomically for this block (synchronous) + batch := ws.state.db.NewBatch() + ws.state.cacheDB.Commit(batch) + batch.WriteSync() + return hash, err } // Returns nil if account does not exist with given address. func (s *State) GetAccount(address crypto.Address) (acm.Account, error) { - _, accBytes := s.readTree.Get(prefixedKey(accountsPrefix, address.Bytes())) + accBytes := s.tree.Get(accountKeyFormat.Key(address)) if accBytes == nil { return nil, nil } @@ -241,66 +234,67 @@ func (ws *writeState) UpdateAccount(account acm.Account) error { if account == nil { return fmt.Errorf("UpdateAccount passed nil account in State") } - // TODO: find a way to implement something equivalent to this so we can set the account StorageRoot - //storageRoot := s.tree.SubTreeHash(prefixedKey(storagePrefix, account.Address().Bytes())) - // Alternatively just abandon and - accountWithStorageRoot := acm.AsMutableAccount(account) - encodedAccount, err := accountWithStorageRoot.Encode() + encodedAccount, err := account.Encode() if err != nil { - return err + return fmt.Errorf("UpdateAccount could not encode account: %v", err) } - ws.state.tree.Set(prefixedKey(accountsPrefix, account.Address().Bytes()), encodedAccount) + ws.state.tree.Set(accountKeyFormat.Key(account.Address()), encodedAccount) return nil } func (ws *writeState) RemoveAccount(address crypto.Address) error { - ws.state.tree.Remove(prefixedKey(accountsPrefix, address.Bytes())) + ws.state.tree.Delete(accountKeyFormat.Key(address)) return nil } func (s *State) IterateAccounts(consumer func(acm.Account) (stop bool)) (stopped bool, err error) { - stopped = s.readTree.IterateRange(accountsStart, accountsEnd, true, func(key, value []byte) bool { - var account acm.Account - account, err = acm.Decode(value) + it := accountKeyFormat.Iterator(s.tree, nil, nil) + for it.Valid() { + account, err := acm.Decode(it.Value()) if err != nil { - return true + return true, fmt.Errorf("IterateAccounts could not decode account: %v", err) } - return consumer(account) - }) - return + if consumer(account) { + return true, nil + } + it.Next() + } + return false, nil } func (s *State) GetStorage(address crypto.Address, key binary.Word256) (binary.Word256, error) { - _, value := s.readTree.Get(prefixedKey(storagePrefix, address.Bytes(), key.Bytes())) - return binary.LeftPadWord256(value), nil + return binary.LeftPadWord256(s.tree.Get(storageKeyFormat.Key(address, key))), nil } func (ws *writeState) SetStorage(address crypto.Address, key, value binary.Word256) error { if value == binary.Zero256 { - ws.state.tree.Remove(prefixedKey(storagePrefix, address.Bytes(), key.Bytes())) + ws.state.tree.Delete(storageKeyFormat.Key(address, key)) } else { - ws.state.tree.Set(prefixedKey(storagePrefix, address.Bytes(), key.Bytes()), value.Bytes()) + ws.state.tree.Set(storageKeyFormat.Key(address, key), value.Bytes()) } return nil } -func (s *State) IterateStorage(address crypto.Address, - consumer func(key, value binary.Word256) (stop bool)) (stopped bool, err error) { - stopped = s.readTree.IterateRange(storageStart, storageEnd, true, func(key []byte, value []byte) (stop bool) { - // Note: no left padding should occur unless there is a bug and non-words have been writte to this storage tree +func (s *State) IterateStorage(address crypto.Address, consumer func(key, value binary.Word256) (stop bool)) (stopped bool, err error) { + it := storageKeyFormat.Fix(address).Iterator(s.tree, nil, nil) + for it.Valid() { + key := it.Key() + // Note: no left padding should occur unless there is a bug and non-words have been written to this storage tree if len(key) != binary.Word256Length { - err = fmt.Errorf("key '%X' stored for account %s is not a %v-byte word", + return true, fmt.Errorf("key '%X' stored for account %s is not a %v-byte word", key, address, binary.Word256Length) - return true } + value := it.Value() if len(value) != binary.Word256Length { - err = fmt.Errorf("value '%X' stored for account %s is not a %v-byte word", + return true, fmt.Errorf("value '%X' stored for account %s is not a %v-byte word", key, address, binary.Word256Length) - return true } - return consumer(binary.LeftPadWord256(key), binary.LeftPadWord256(value)) - }) - return + if consumer(binary.LeftPadWord256(key), binary.LeftPadWord256(value)) { + return true, nil + } + it.Next() + } + return false, nil } // State.storage @@ -309,12 +303,11 @@ func (s *State) IterateStorage(address crypto.Address, // Execution events func (ws *writeState) AddBlock(be *exec.BlockExecution) error { - lastBlockHeight, ok := ws.lastBlockHeight() - if ok && be.Height != lastBlockHeight+1 { + if ws.state.height > 0 && be.Height != ws.state.height+1 { return fmt.Errorf("AddBlock received block for height %v but last block height was %v", - be.Height, lastBlockHeight) + be.Height, ws.state.height) } - ws.setLastBlockHeight(be.Height) + ws.state.height = be.Height // Index transactions so they can be retrieved by their TxHash for i, txe := range be.TxExecutions { ws.addTx(txe.TxHash, be.Height, uint64(i)) @@ -323,37 +316,34 @@ func (ws *writeState) AddBlock(be *exec.BlockExecution) error { if err != nil { return err } - key := blockKey(be.Height) - ws.state.tree.Set(key, bs) + ws.state.refs.Set(blockRefKeyFormat.Key(be.Height), bs) return nil } func (ws *writeState) addTx(txHash []byte, height, index uint64) { - ws.state.tree.Set(txKey(txHash), encodeTxRef(height, index)) + ws.state.refs.Set(txKeyFormat.Key(txHash), txRefKeyFormat.Key(height, index)) } func (s *State) GetTx(txHash []byte) (*exec.TxExecution, error) { - _, bs := s.readTree.Get(txKey(txHash)) + bs := s.tree.Get(txKeyFormat.Key(txHash)) if len(bs) == 0 { return nil, nil } - height, index, err := decodeTxRef(bs) - if err != nil { - return nil, fmt.Errorf("error decoding database reference to tx %X: %v", txHash, err) - } - be, err := s.GetBlock(height) + height, index := new(uint64), new(uint64) + txRefKeyFormat.Scan(bs, height, index) + be, err := s.GetBlock(*height) if err != nil { return nil, fmt.Errorf("error getting block %v containing tx %X", height, txHash) } - if index < uint64(len(be.TxExecutions)) { - return be.TxExecutions[index], nil + if *index < uint64(len(be.TxExecutions)) { + return be.TxExecutions[*index], nil } return nil, fmt.Errorf("retrieved index %v in block %v for tx %X but block only contains %v TxExecutions", index, height, txHash, len(be.TxExecutions)) } func (s *State) GetBlock(height uint64) (*exec.BlockExecution, error) { - _, bs := s.readTree.Get(blockKey(height)) + bs := s.tree.Get(blockRefKeyFormat.Key(height)) if len(bs) == 0 { return nil, nil } @@ -361,36 +351,25 @@ func (s *State) GetBlock(height uint64) (*exec.BlockExecution, error) { } func (s *State) GetBlocks(startHeight, endHeight uint64, consumer func(*exec.BlockExecution) (stop bool)) (stopped bool, err error) { - return s.readTree.IterateRange(blockKey(startHeight), blockKey(endHeight), true, - func(_, value []byte) bool { - block, err := exec.DecodeBlockExecution(value) - if err != nil { - err = fmt.Errorf("error unmarshalling ExecutionEvent in GetEvents: %v", err) - // stop iteration on error - return true - } - return consumer(block) - }), err + kf := blockRefKeyFormat + it := kf.Iterator(s.refs, kf.Suffix(startHeight), kf.Suffix(endHeight)) + for it.Valid() { + block, err := exec.DecodeBlockExecution(it.Value()) + if err != nil { + return true, fmt.Errorf("error unmarshalling ExecutionEvent in GetEvents: %v", err) + } + if consumer(block) { + return true, nil + } + it.Next() + } + return false, nil } func (s *State) Hash() []byte { s.RLock() defer s.RUnlock() - return s.hash -} - -func (s *writeState) lastBlockHeight() (uint64, bool) { - _, bs := s.state.tree.Get(lastBlockHeightKey) - if len(bs) == 0 { - return 0, false - } - return binary.GetUint64BE(bs), true -} - -func (s *writeState) setLastBlockHeight(height uint64) { - bs := make([]byte, 8) - binary.PutUint64BE(bs, height) - s.state.tree.Set(lastBlockHeightKey, bs) + return s.tree.Hash() } // Events @@ -400,7 +379,7 @@ func (s *writeState) setLastBlockHeight(height uint64) { var _ names.IterableReader = &State{} func (s *State) GetName(name string) (*names.Entry, error) { - _, entryBytes := s.readTree.Get(prefixedKey(nameRegPrefix, []byte(name))) + entryBytes := s.tree.Get(nameKeyFormat.Key(name)) if entryBytes == nil { return nil, nil } @@ -408,93 +387,45 @@ func (s *State) GetName(name string) (*names.Entry, error) { return names.DecodeEntry(entryBytes) } -func (s *State) IterateNames(consumer func(*names.Entry) (stop bool)) (stopped bool, err error) { - return s.readTree.IterateRange(nameRegStart, nameRegEnd, true, func(key []byte, value []byte) (stop bool) { - var entry *names.Entry - entry, err = names.DecodeEntry(value) - if err != nil { - return true - } - return consumer(entry) - }), err -} - func (ws *writeState) UpdateName(entry *names.Entry) error { bs, err := entry.Encode() if err != nil { return err } - ws.state.tree.Set(prefixedKey(nameRegPrefix, []byte(entry.Name)), bs) + ws.state.tree.Set(nameKeyFormat.Key(entry.Name), bs) return nil } func (ws *writeState) RemoveName(name string) error { - ws.state.tree.Remove(prefixedKey(nameRegPrefix, []byte(name))) + ws.state.tree.Delete(nameKeyFormat.Key(name)) return nil } +func (s *State) IterateNames(consumer func(*names.Entry) (stop bool)) (stopped bool, err error) { + it := nameKeyFormat.Iterator(s.tree, nil, nil) + for it.Valid() { + entry, err := names.DecodeEntry(it.Value()) + if err != nil { + return true, fmt.Errorf("State.IterateNames() could not iterate over names: %v", err) + } + if consumer(entry) { + return true, nil + } + it.Next() + } + return false, nil +} + // Creates a copy of the database to the supplied db func (s *State) Copy(db dbm.DB) (*State, error) { stateCopy := NewState(db) - s.tree.Iterate(func(key []byte, value []byte) bool { + s.tree.IterateRange(nil, nil, true, func(key, value []byte) bool { stateCopy.tree.Set(key, value) return false }) - _, err := stateCopy.writeState.save() + _, err := stateCopy.writeState.commit() if err != nil { return nil, err } return stateCopy, nil } - -// Key and value helpers - -func encodeTxRef(height, index uint64) []byte { - bs := make([]byte, 16) - binary.PutUint64BE(bs[:8], height) - binary.PutUint64BE(bs[8:], index) - return bs -} - -func decodeTxRef(bs []byte) (height, index uint64, _ error) { - if len(bs) != 16 { - return 0, 0, fmt.Errorf("tx reference must have 16 bytes but '%X' does not", bs) - } - height = binary.GetUint64BE(bs[:8]) - index = binary.GetUint64BE(bs[8:]) - return -} - -func txKey(txHash []byte) []byte { - return prefixedKey(txPrefix, txHash) -} - -func blockKey(height uint64) []byte { - bs := make([]byte, 8) - binary.PutUint64BE(bs, height) - return prefixedKey(blockPrefix, bs) -} - -func prefixedKey(prefix string, suffices ...[]byte) []byte { - key := []byte(prefix) - for _, suffix := range suffices { - key = append(key, suffix...) - } - return key -} - -// Returns the start key equal to the bytes of prefix and the end key which lexicographically above any key beginning -// with prefix -func prefixKeyRange(prefix string) (start, end []byte) { - start = []byte(prefix) - for i := len(start) - 1; i >= 0; i-- { - c := start[i] - if c < 0xff { - end = make([]byte, i+1) - copy(end, start) - end[i]++ - return - } - } - return -} diff --git a/execution/state_test.go b/execution/state_test.go index 4ed6a5c9c..6de235aff 100644 --- a/execution/state_test.go +++ b/execution/state_test.go @@ -19,6 +19,9 @@ import ( "fmt" "testing" + "github.com/hyperledger/burrow/txs" + "github.com/hyperledger/burrow/txs/payload" + "github.com/hyperledger/burrow/acm" "github.com/hyperledger/burrow/binary" "github.com/hyperledger/burrow/crypto" @@ -72,12 +75,15 @@ func TestWriteState_AddBlock(t *testing.T) { require.NoError(t, err) } -func mkBlock(height, txs, events uint64) *exec.BlockExecution { +func mkBlock(height, numTxs, events uint64) *exec.BlockExecution { be := &exec.BlockExecution{ Height: height, } - for ti := uint64(0); ti < txs; ti++ { + for ti := uint64(0); ti < numTxs; ti++ { + hash := txs.NewTx(&payload.CallTx{}).Hash() + hash[0] = byte(ti) txe := &exec.TxExecution{ + TxHash: hash, Height: height, } for e := uint64(0); e < events; e++ { diff --git a/logging/logconfig/config.go b/logging/logconfig/config.go index d5ffd0e89..d6d617221 100644 --- a/logging/logconfig/config.go +++ b/logging/logconfig/config.go @@ -4,8 +4,6 @@ import ( "bytes" "fmt" - "github.com/hyperledger/burrow/logging/structure" - "encoding/json" "github.com/BurntSushi/toml" @@ -29,16 +27,6 @@ func DefaultNodeLoggingConfig() *LoggingConfig { } } -func DefaultClientLoggingConfig() *LoggingConfig { - return &LoggingConfig{ - // No output - RootSink: Sink(). - SetTransform(FilterTransform(ExcludeWhenAnyMatches, - structure.CapturedLoggingSourceKey, "")). - SetOutput(StderrOutput()), - } -} - func New() *LoggingConfig { return &LoggingConfig{} } diff --git a/logging/logconfig/presets/instructions.go b/logging/logconfig/presets/instructions.go index c0f7dfc7d..192f4f96c 100644 --- a/logging/logconfig/presets/instructions.go +++ b/logging/logconfig/presets/instructions.go @@ -38,7 +38,9 @@ const ( Up = "up" Down = "down" Info = "info" + Trace = "trace" Minimal = "minimal" + Opcodes = "opcodes" IncludeAny = "include-any" Stderr = "stderr" Stdout = "stdout" @@ -84,6 +86,16 @@ var instructions = []Instruction{ "module", "mempool"))), nil }, }, + { + name: Opcodes, + desc: "Capture opcodes exclusively (when VMOptions includes DebugOpccodes in main config)", + builder: func(stack []*logconfig.SinkConfig, args []string) ([]*logconfig.SinkConfig, error) { + return push(stack, + logconfig.Sink(). + SetTransform(logconfig.FilterTransform(logconfig.IncludeWhenAllMatch, "tag", "DebugOpcodes")). + SetOutput(logconfig.StdoutOutput().SetFormat("{{.message}}"))), nil + }, + }, { name: IncludeAny, desc: "Establish an 'include when any predicate matches' filter transform at this this sink", @@ -104,6 +116,16 @@ var instructions = []Instruction{ return stack, nil }, }, + { + name: Trace, + desc: "Add a filter predicate to match the Info logging channel", + builder: func(stack []*logconfig.SinkConfig, args []string) ([]*logconfig.SinkConfig, error) { + sink := peek(stack) + ensureFilter(sink) + sink.Transform.FilterConfig.AddPredicate(structure.ChannelKey, structure.InfoChannelName) + return stack, nil + }, + }, { name: Stdout, desc: "Use Stdout output for this sink", diff --git a/storage/cache_db.go b/storage/cache_db.go new file mode 100644 index 000000000..7ed6dd3a7 --- /dev/null +++ b/storage/cache_db.go @@ -0,0 +1,106 @@ +package storage + +import ( + "fmt" + + dbm "github.com/tendermint/tendermint/libs/db" +) + +type CacheDB struct { + cache *KVCache + backend KVIterableReader +} + +func NewCacheDB(backend KVIterableReader) *CacheDB { + return &CacheDB{ + cache: NewKVCache(), + backend: backend, + } +} + +// DB implementation +func (cdb *CacheDB) Get(key []byte) []byte { + value, deleted := cdb.cache.Info(key) + if deleted { + return nil + } + if value != nil { + return value + } + return cdb.backend.Get(key) +} + +func (cdb *CacheDB) Has(key []byte) bool { + value, deleted := cdb.cache.Info(key) + return !deleted && (value != nil || cdb.backend.Has(key)) +} + +func (cdb *CacheDB) Iterator(start, end []byte) KVIterator { + // Keys from cache will sort first because of order in MultiIterator and Uniq will take the first KVs so KVs + // appearing in cache will override values from backend. + return Uniq(NewMultiIterator(false, cdb.cache.Iterator(start, end), cdb.backend.Iterator(start, end))) +} + +func (cdb *CacheDB) ReverseIterator(start, end []byte) KVIterator { + fmt.Printf("ReverseIterator(%s, %s)\n", string(start), string(end)) + return Uniq(NewMultiIterator(true, cdb.cache.ReverseIterator(start, end), cdb.backend.ReverseIterator(start, end))) +} + +func (cdb *CacheDB) Set(key, value []byte) { + cdb.cache.Set(key, value) +} + +func (cdb *CacheDB) SetSync(key, value []byte) { + cdb.cache.Set(key, value) +} + +func (cdb *CacheDB) Delete(key []byte) { + cdb.cache.Delete(key) +} + +func (cdb *CacheDB) DeleteSync(key []byte) { + cdb.Delete(key) +} + +func (cdb *CacheDB) Close() { +} + +func (cdb *CacheDB) NewBatch() dbm.Batch { + return &cacheBatch{ + cache: NewKVCache(), + backend: cdb, + } +} + +func (cdb *CacheDB) Commit(writer KVWriter) { + cdb.cache.WriteTo(writer) + cdb.cache.Reset() +} + +type cacheBatch struct { + cache *KVCache + backend *CacheDB +} + +func (cb *cacheBatch) Set(key, value []byte) { + cb.cache.Set(key, value) +} + +func (cb *cacheBatch) Delete(key []byte) { + cb.cache.Delete(key) +} + +func (cb *cacheBatch) Write() { + cb.cache.WriteTo(cb.backend) +} + +func (cb *cacheBatch) WriteSync() { + cb.Write() +} + +func (cdb *CacheDB) Print() { +} + +func (cdb *CacheDB) Stats() map[string]string { + return map[string]string{} +} diff --git a/storage/cache_db_test.go b/storage/cache_db_test.go new file mode 100644 index 000000000..67ce7a5c3 --- /dev/null +++ b/storage/cache_db_test.go @@ -0,0 +1,59 @@ +package storage + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + dbm "github.com/tendermint/tendermint/libs/db" +) + +func TestBatchCommit(t *testing.T) { + db := dbm.NewMemDB() + cdb := NewCacheDB(db) + foo := bz("foo") + bam := bz("bam") + bosh := bz("bosh") + boom := bz("boom") + db.Set(foo, bam) + assert.Equal(t, bam, cdb.Get(foo), "underlying writes should be seen") + cdb.Set(foo, bosh) + assert.Equal(t, bosh, cdb.Get(foo), "writes to CacheDB should be available") + batch := cdb.NewBatch() + batch.Set(foo, bam) + assert.Equal(t, bosh, cdb.Get(foo), "write to batch should not be seen") + batch.WriteSync() + cdb.Commit(db) + assert.Equal(t, bam, db.Get(foo), "changes should commit") + cdb.Set(foo, bosh) + assert.Equal(t, bam, db.Get(foo), "uncommitted changes should not be seen in db") + cdb.Delete(foo) + assert.Nil(t, cdb.Get(foo)) + assert.Equal(t, bam, db.Get(foo)) + cdb.Commit(db) + assert.Nil(t, db.Get(foo)) + cdb.Set(foo, boom) + assert.Nil(t, db.Get(foo)) +} + +func TestCacheDB_Iterator(t *testing.T) { + db := dbm.NewMemDB() + cdb := NewCacheDB(db) + foo := bz("foo") + bam := bz("bam") + bosh := bz("bosh") + boom := bz("boom") + + db.Set(append(foo, foo...), foo) + db.Set(append(foo, bam...), bam) + cdb.Set(append(foo, bosh...), bosh) + cdb.Set(boom, boom) + + it := cdb.Iterator(nil, nil) + kvp := collectIterator(it) + fmt.Println(kvp) + cdb.Commit(db) + + it = db.Iterator(nil, nil) + assert.Equal(t, kvp, collectIterator(it)) +} diff --git a/storage/channel_iterator.go b/storage/channel_iterator.go new file mode 100644 index 000000000..a59eb737e --- /dev/null +++ b/storage/channel_iterator.go @@ -0,0 +1,96 @@ +package storage + +import ( + "bytes" + "fmt" + "sort" +) + +type ChannelIterator struct { + ch <-chan KVPair + start []byte + end []byte + kv KVPair + invalid bool +} + +type KVPair struct { + Key []byte + Value []byte +} + +func (kv KVPair) String() string { + return fmt.Sprintf("%s => %s", string(kv.Key), string(kv.Value)) +} + +type KVPairs []KVPair + +func (kvp KVPairs) Len() int { + return len(kvp) +} + +func (kvp KVPairs) Less(i, j int) bool { + return bytes.Compare(kvp[i].Key, kvp[j].Key) == -1 +} + +func (kvp KVPairs) Swap(i, j int) { + kvp[i], kvp[j] = kvp[j], kvp[i] +} + +func (kvp KVPairs) Sorted() KVPairs { + kvpCopy := make(KVPairs, len(kvp)) + copy(kvpCopy, kvp) + sort.Stable(kvpCopy) + return kvpCopy +} + +// ChannelIterator wraps a stream of kvp KVPairs over a channel as a stateful KVIterator. The start and end keys provided +// are purely indicative (for Domain()) and are assumed to be honoured by the input channel - they are not checked +// and keys are not sorted. NewChannelIterator will block until the first value is received over the channel. +func NewChannelIterator(ch <-chan KVPair, start, end []byte) *ChannelIterator { + ci := &ChannelIterator{ + ch: ch, + start: start, + end: end, + } + // Load first element if it exists + ci.Next() + return ci +} + +func (it *ChannelIterator) Domain() ([]byte, []byte) { + return it.start, it.end +} + +func (it *ChannelIterator) Valid() bool { + return !it.invalid +} + +func (it *ChannelIterator) Next() { + if it.invalid { + panic("ChannelIterator.Value() called on invalid iterator") + } + kv, ok := <-it.ch + it.invalid = !ok + it.kv = kv +} + +func (it *ChannelIterator) Key() []byte { + if it.invalid { + panic("ChannelIterator.Key() called on invalid iterator") + } + return it.kv.Key +} + +func (it *ChannelIterator) Value() []byte { + if it.invalid { + panic("ChannelIterator.Value() called on invalid iterator") + } + return it.kv.Value +} + +func (it *ChannelIterator) Close() { + for range it.ch { + // drain channel if necessary + } +} diff --git a/storage/channel_iterator_test.go b/storage/channel_iterator_test.go new file mode 100644 index 000000000..482467059 --- /dev/null +++ b/storage/channel_iterator_test.go @@ -0,0 +1,16 @@ +package storage + +import "testing" + +func TestNewChannelIterator(t *testing.T) { + ch := make(chan KVPair) + go sendKVPair(ch, kvPairs("a", "hello", "b", "channel", "c", "this is nice")) + ci := NewChannelIterator(ch, bz("a"), bz("c")) + checkItem(t, ci, bz("a"), bz("hello")) + checkNext(t, ci, true) + checkItem(t, ci, bz("b"), bz("channel")) + checkNext(t, ci, true) + checkItem(t, ci, bz("c"), bz("this is nice")) + checkNext(t, ci, false) + checkInvalid(t, ci) +} diff --git a/storage/content_addressed_store.go b/storage/content_addressed_store.go new file mode 100644 index 000000000..a89abdbfe --- /dev/null +++ b/storage/content_addressed_store.go @@ -0,0 +1,37 @@ +package storage + +import ( + "crypto/sha256" + + "fmt" + + dbm "github.com/tendermint/tendermint/libs/db" +) + +type ContentAddressedStore struct { + db dbm.DB +} + +func NewContentAddressedStore(db dbm.DB) *ContentAddressedStore { + return &ContentAddressedStore{ + db: db, + } +} + +// These function match those used in Hoard + +// Put data in the database by saving data with a key that is its sha256 hash +func (cas *ContentAddressedStore) Put(data []byte) ([]byte, error) { + hasher := sha256.New() + _, err := hasher.Write(data) + if err != nil { + return nil, fmt.Errorf("ContentAddressedStore could not hash data: %v", err) + } + hash := hasher.Sum(nil) + cas.db.SetSync(hash, data) + return hash, nil +} + +func (cas *ContentAddressedStore) Get(hash []byte) ([]byte, error) { + return cas.db.Get(hash), nil +} diff --git a/storage/key_format.go b/storage/key_format.go new file mode 100644 index 000000000..9494ede6d --- /dev/null +++ b/storage/key_format.go @@ -0,0 +1,291 @@ +package storage + +import ( + "encoding/binary" + "fmt" + "strings" +) + +const VariadicSegmentLength = 0 + +type ByteSlicable interface { + Bytes() []byte +} + +type MustKeyFormat struct { + KeyFormat +} + +func NewMustKeyFormat(prefix string, layout ...int) *MustKeyFormat { + kf, err := NewKeyFormat(prefix, layout...) + if err != nil { + panic(err) + } + return &MustKeyFormat{ + KeyFormat: *kf, + } +} + +func (kf *MustKeyFormat) KeyBytes(segments ...[]byte) []byte { + key, err := kf.KeyFormat.KeyBytes(segments...) + if err != nil { + panic(err) + } + return key +} + +func (kf *MustKeyFormat) Key(args ...interface{}) []byte { + key, err := kf.KeyFormat.Key(args...) + if err != nil { + panic(err) + } + return key +} + +func (kf *MustKeyFormat) Scan(key []byte, args ...interface{}) { + err := kf.KeyFormat.Scan(key, args...) + if err != nil { + panic(err) + } +} + +func (kf *MustKeyFormat) Fix(args ...interface{}) *MustKeyFormat { + fkf, err := kf.KeyFormat.Fix(args...) + if err != nil { + panic(err) + } + return &MustKeyFormat{*fkf} +} + +func (kf *MustKeyFormat) Prefix(args ...interface{}) Prefix { + prefix, err := kf.KeyFormat.Prefix(args...) + if err != nil { + panic(err) + } + return prefix +} + +func (kf *MustKeyFormat) Suffix(args ...interface{}) Prefix { + prefix, err := kf.KeyFormat.Suffix(args...) + if err != nil { + panic(err) + } + return prefix +} + +// Provides a fixed-width lexicograph}ically sortable []byte key format +type KeyFormat struct { + prefix Prefix + layout []int + length int +} + +// Create a []byte key format based on a single byte prefix and fixed width key segments each of whose length is +// specified by by the corresponding element of layout. A final segment length of 0 can be used to indicate a variadic +// final element that may be of arbitrary length. +// +// For example, to store keys that could index some objects by a version number and their SHA256 hash using the form: +// 'c' then you would define the KeyFormat with: +// +// var keyFormat = NewKeyFormat('c', 8, 32) +// +// Then you can create a key with: +// +// func ObjectKey(version uint64, objectBytes []byte) []byte { +// hasher := sha256.New() +// hasher.Sum(nil) +// return keyFormat.Key(version, hasher.Sum(nil)) +// }} +func NewKeyFormat(prefix string, layout ...int) (*KeyFormat, error) { + // For prefix byte + p := Prefix(prefix) + length := p.Length() + for i, l := range layout { + if l < 0 { + panic(fmt.Errorf("KeyFormat layout must contain non-negative integers")) + } + if l == VariadicSegmentLength && i != len(layout)-1 { + return nil, fmt.Errorf("KeyFormat may only have a 0 in the last place of its layout to indicate a " + + "variadic segment") + } + length += int(l) + } + return &KeyFormat{ + prefix: p, + layout: layout, + length: length, + }, nil +} + +// Format the byte segments into the key format - will panic if the segment lengths do not match the layout. +func (kf *KeyFormat) KeyBytes(segments ...[]byte) ([]byte, error) { + key := make([]byte, kf.length) + n := copy(key, kf.prefix) + for i, s := range segments { + l := kf.layout[i] + if l == VariadicSegmentLength { + // Must be a final variadic element + key = append(key, s...) + n += len(s) + } else if len(s) != l { + return nil, fmt.Errorf("the segment '0x%X' provided to KeyFormat.KeyBytes() does not have required "+ + "%d bytes required by layout for segment %d", s, l, i) + } else { + n += l + // Big endian so pad on left if not given the full width for this segment + copy(key[n-len(s):n], s) + } + } + return key[:n], nil +} + +// Format the args passed into the key format - will panic if the arguments passed do not match the length +// of the segment to which they correspond. When called with no arguments returns the raw prefix (useful as a start +// element of the entire keys space when sorted lexicographically). +func (kf *KeyFormat) Key(args ...interface{}) ([]byte, error) { + if len(args) > len(kf.layout) { + return nil, fmt.Errorf("KeyFormat.Key() is provided with %d args but format only has %d segments", + len(args), len(kf.layout)) + } + segments := make([][]byte, len(args)) + for i, a := range args { + segments[i] = format(a) + } + return kf.KeyBytes(segments...) +} + +// Reads out the bytes associated with each segment of the key format from key. +func (kf *KeyFormat) ScanBytes(key []byte) [][]byte { + segments := make([][]byte, len(kf.layout)) + n := kf.prefix.Length() + for i, l := range kf.layout { + if l == 0 { + // Must be final variadic segment + segments[i] = key[n:] + return segments + } + n += l + if n > len(key) { + return segments[:i] + } + segments[i] = key[n-l : n] + } + return segments +} + +// Extracts the segments into the values pointed to by each of args. Each arg must be a pointer to int64, uint64, or +// []byte, and the width of the args must match layout. +func (kf *KeyFormat) Scan(key []byte, args ...interface{}) error { + segments := kf.ScanBytes(key) + if len(args) > len(segments) { + return fmt.Errorf("KeyFormat.Scan() is provided with %d args but format only has %d segments in key %X", + len(args), len(segments), key) + } + for i, a := range args { + scan(a, segments[i]) + } + return nil +} + +// Return the Key as a prefix - may just be the literal prefix, or an entire key +func (kf *KeyFormat) Prefix(args ...interface{}) (Prefix, error) { + return kf.Key(args...) +} + +func (kf *KeyFormat) Suffix(args ...interface{}) (Prefix, error) { + key, err := kf.Key(args...) + if err != nil { + return nil, err + } + return key[len(kf.prefix):], nil +} + +// Fixes the first args many segments as the prefix of a new KeyFormat by using the args to generate a key that becomes +// that prefix. Any remaining unassigned segments become the layout of the new KeyFormat. +func (kf *KeyFormat) Fix(args ...interface{}) (*KeyFormat, error) { + key, err := kf.Key(args...) + if err != nil { + return nil, err + } + return NewKeyFormat(string(key), kf.layout[len(args):]...) +} + +// Returns an iterator over the underlying iterable using this KeyFormat's prefix. This is to support proper iteration over the +// prefix in the presence of nil start or end which requests iteration to the inclusive edges of the domain. An optional +// argument for reverse can be passed to get reverse iteration. +func (kf *KeyFormat) Iterator(iterable KVIterable, start, end []byte, reverse ...bool) KVIterator { + if len(reverse) > 0 && reverse[0] { + return kf.prefix.ReverseIterator(iterable.ReverseIterator, start, end) + } + return kf.prefix.Iterator(iterable.Iterator, start, end) +} + +func (kf *KeyFormat) Unprefixed() *KeyFormat { + return &KeyFormat{ + prefix: []byte{}, + layout: kf.layout[:len(kf.layout):len(kf.layout)], + length: kf.length - len(kf.prefix), + } +} + +func (kf *KeyFormat) NumSegments() int { + return len(kf.layout) +} + +func (kf *KeyFormat) Layout() []int { + l := make([]int, len(kf.layout)) + copy(l, kf.layout) + return l +} + +func (kf *KeyFormat) String() string { + ls := make([]string, len(kf.layout)) + for i, l := range kf.layout { + ls[i] = fmt.Sprintf("[%d]byte", l) + } + return fmt.Sprintf("KeyFormat{%s|%s}", kf.prefix.HexString(), strings.Join(ls, "|")) +} + +func scan(a interface{}, value []byte) { + switch v := a.(type) { + case *int64: + // Negative values will be mapped correctly when read in as uint64 and then type converted + *v = int64(binary.BigEndian.Uint64(value)) + case *uint64: + *v = binary.BigEndian.Uint64(value) + case *[]byte: + *v = value + case *string: + *v = string(value) + default: + panic(fmt.Errorf("KeyFormat scan() does not support scanning value of type %T: %v", a, a)) + } +} + +func format(a interface{}) []byte { + switch v := a.(type) { + case uint64: + return formatUint64(v) + case int64: + return formatUint64(uint64(v)) + // Provide formatting from int,uint as a convenience to avoid casting arguments + case uint: + return formatUint64(uint64(v)) + case int: + return formatUint64(uint64(v)) + case []byte: + return v + case ByteSlicable: + return v.Bytes() + case string: + return []byte(v) + default: + panic(fmt.Errorf("KeyFormat format() does not support formatting value of type %T: %v", a, a)) + } +} + +func formatUint64(v uint64) []byte { + bs := make([]byte, 8) + binary.BigEndian.PutUint64(bs, v) + return bs +} diff --git a/storage/key_format_test.go b/storage/key_format_test.go new file mode 100644 index 000000000..fcf517890 --- /dev/null +++ b/storage/key_format_test.go @@ -0,0 +1,97 @@ +package storage + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestKeyFormatBytes(t *testing.T) { + kf := NewMustKeyFormat("e", 8, 8, 8) + assert.Panics(t, func() { kf.KeyBytes([]byte{1, 2, 3}) }, "insufficient bytes for segment should panic") + assert.Equal(t, []byte{'e', 1, 2, 3, 4, 5, 6, 7, 8}, kf.KeyBytes([]byte{1, 2, 3, 4, 5, 6, 7, 8})) + assert.Equal(t, []byte{'e', 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 0, 0, 1, 1, 2, 2, 3, 3}, + kf.KeyBytes([]byte{1, 2, 3, 4, 5, 6, 7, 8}, []byte{1, 2, 3, 4, 5, 6, 7, 8}, []byte{0, 0, 1, 1, 2, 2, 3, 3})) + assert.Equal(t, []byte{'e'}, kf.KeyBytes()) +} + +func TestKeyFormat(t *testing.T) { + kf := NewMustKeyFormat("eab", 8, 8, 8) + key := []byte{'e', 'a', 'b', 0, 0, 0, 0, 0, 0, 0, 100, 0, 0, 0, 0, 0, 0, 0, 200, 0, 0, 0, 0, 0, 0, 1, 144} + var a, b, c int64 = 100, 200, 400 + assert.Equal(t, key, kf.Key(a, b, c)) + + var ao, bo, co = new(int64), new(int64), new(int64) + kf.Scan(key, ao, bo, co) + assert.Equal(t, a, *ao) + assert.Equal(t, b, *bo) + assert.Equal(t, c, *co) + + bs := new([]byte) + kf.Scan(key, ao, bo, bs) + assert.Equal(t, a, *ao) + assert.Equal(t, b, *bo) + assert.Equal(t, []byte{0, 0, 0, 0, 0, 0, 1, 144}, *bs) + + assert.Equal(t, []byte{'e', 'a', 'b', 0, 0, 0, 0, 0, 0, 0, 100, 0, 0, 0, 0, 0, 0, 0, 200}, kf.Key(a, b)) +} + +func TestNegativeKeys(t *testing.T) { + kf := NewMustKeyFormat("e", 8, 8) + + var a, b int64 = -100, -200 + // One's complement plus one + key := []byte{'e', + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, byte(0xff + a + 1), + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, byte(0xff + b + 1)} + assert.Equal(t, key, kf.Key(a, b)) + + var ao, bo = new(int64), new(int64) + kf.Scan(key, ao, bo) + assert.Equal(t, a, *ao) + assert.Equal(t, b, *bo) +} + +func TestOverflow(t *testing.T) { + kf := NewMustKeyFormat("o", 8, 8) + + var a int64 = 1 << 62 + var b uint64 = 1 << 63 + key := []byte{'o', + 0x40, 0, 0, 0, 0, 0, 0, 0, + 0x80, 0, 0, 0, 0, 0, 0, 0, + } + assert.Equal(t, key, kf.Key(a, b)) + + var ao, bo = new(int64), new(int64) + kf.Scan(key, ao, bo) + assert.Equal(t, a, *ao) + assert.Equal(t, int64(b), *bo) +} + +func TestVariadicKeys(t *testing.T) { + kf := NewMustKeyFormat("o", 8, 0) + str := "hello this is an arbitrary length string" + + var a int64 = 1 << 62 + key := append([]byte{'o', 0x40, 0, 0, 0, 0, 0, 0, 0}, []byte(str)...) + + assert.Equal(t, key, kf.Key(a, str)) + + var ao, stro = new(int64), new(string) + kf.Scan(key, ao, stro) + assert.Equal(t, a, *ao) + assert.Equal(t, str, *stro) +} + +func TestKeyFormat_Fix(t *testing.T) { + kf := NewMustKeyFormat("o", 8, 8, 4) + prefix := string([]byte{'o', 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 4}) + assert.Equal(t, *NewMustKeyFormat(prefix, 4), *kf.Fix(3, 4)) +} + +func TestKeyFormat_Suffix(t *testing.T) { + kf := NewMustKeyFormat("diplodocus", 4, 0) + key := kf.Suffix([]byte("Hi, "), "dinosaur") + assert.Equal(t, "Hi, dinosaur", key.String()) +} diff --git a/storage/kvcache.go b/storage/kvcache.go new file mode 100644 index 000000000..6166ba5e8 --- /dev/null +++ b/storage/kvcache.go @@ -0,0 +1,177 @@ +package storage + +import ( + "bytes" + "sort" +) + +type KVCache struct { + cache map[string]valueInfo +} + +type valueInfo struct { + value []byte + deleted bool +} + +// Creates an in-memory cache wrapping a map that stores the provided tombstone value for deleted keys +func NewKVCache() *KVCache { + return &KVCache{ + cache: make(map[string]valueInfo), + } +} + +func (kvc *KVCache) Info(key []byte) (value []byte, deleted bool) { + vi := kvc.cache[string(key)] + return vi.value, vi.deleted +} + +func (kvc *KVCache) Get(key []byte) []byte { + return kvc.cache[string(key)].value +} + +func (kvc *KVCache) Has(key []byte) bool { + vi, ok := kvc.cache[string(key)] + return ok && !vi.deleted +} + +func (kvc *KVCache) Set(key, value []byte) { + skey := string(key) + vi := kvc.cache[skey] + vi.deleted = false + vi.value = value + kvc.cache[skey] = vi +} + +func (kvc *KVCache) Delete(key []byte) { + skey := string(key) + vi := kvc.cache[skey] + vi.deleted = true + vi.value = nil + kvc.cache[skey] = vi +} + +func (kvc *KVCache) Iterator(start, end []byte) KVIterator { + return kvc.newIterator(NormaliseDomain(start, end, false)) +} + +func (kvc *KVCache) ReverseIterator(start, end []byte) KVIterator { + return kvc.newIterator(NormaliseDomain(start, end, true)) +} + +func (kvc *KVCache) newIterator(start, end []byte) *KVCacheIterator { + return &KVCacheIterator{ + start: start, + end: end, + keys: kvc.SortedKeysInDomain(start, end), + cache: kvc.cache, + } +} + +// Writes contents of cache to backend without flushing the cache +func (kvi *KVCache) WriteTo(writer KVWriter) { + for k, vi := range kvi.cache { + kb := []byte(k) + if vi.deleted { + writer.Delete(kb) + } else { + writer.Set(kb, vi.value) + } + } +} + +func (kvc *KVCache) Reset() { + kvc.cache = make(map[string]valueInfo) +} + +type KVCacheIterator struct { + cache map[string]valueInfo + start []byte + end []byte + keys [][]byte + index int +} + +func (kvi *KVCacheIterator) Domain() ([]byte, []byte) { + return kvi.start, kvi.end +} + +func (kvi *KVCacheIterator) Info() (key, value []byte, deleted bool) { + key = kvi.keys[kvi.index] + vi := kvi.cache[string(key)] + return key, vi.value, vi.deleted +} + +func (kvi *KVCacheIterator) Key() []byte { + return []byte(kvi.keys[kvi.index]) +} + +func (kvi *KVCacheIterator) Value() []byte { + return kvi.cache[string(kvi.keys[kvi.index])].value +} + +func (kvi *KVCacheIterator) Next() { + if !kvi.Valid() { + panic("KVCacheIterator.Next() called on invalid iterator") + } + kvi.index++ +} + +func (kvi *KVCacheIterator) Valid() bool { + return kvi.index < len(kvi.keys) +} + +func (kvi *KVCacheIterator) Close() {} + +type byteSlices [][]byte + +func (bss byteSlices) Len() int { + return len(bss) +} + +func (bss byteSlices) Less(i, j int) bool { + return bytes.Compare(bss[i], bss[j]) == -1 +} + +func (bss byteSlices) Swap(i, j int) { + bss[i], bss[j] = bss[j], bss[i] +} + +func (kvc *KVCache) SortedKeys(reverse bool) [][]byte { + keys := make(byteSlices, 0, len(kvc.cache)) + for k := range kvc.cache { + keys = append(keys, []byte(k)) + } + var sortable sort.Interface = keys + if reverse { + sortable = sort.Reverse(keys) + } + sort.Stable(sortable) + return keys +} + +func (kvc *KVCache) SortedKeysInDomain(start, end []byte) [][]byte { + comp := CompareKeys(start, end) + if comp == 0 { + return [][]byte{} + } + // Sort keys depending on order of end points + sortedKeys := kvc.SortedKeys(comp == 1) + // Attempt to seek to the first key in the range + startIndex := len(sortedKeys) + for i, key := range sortedKeys { + if CompareKeys(key, start) != comp { + startIndex = i + break + } + } + // Reslice to beginning of range or end if not found + sortedKeys = sortedKeys[startIndex:] + for i, key := range sortedKeys { + if CompareKeys(key, end) != comp { + sortedKeys = sortedKeys[:i] + break + } + } + return sortedKeys +} diff --git a/storage/kvcache_test.go b/storage/kvcache_test.go new file mode 100644 index 000000000..706ce0b87 --- /dev/null +++ b/storage/kvcache_test.go @@ -0,0 +1,47 @@ +package storage + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestKVCache_Iterator(t *testing.T) { + kvc := NewKVCache() + kvp := kvPairs("b", "ar", "f", "oo", "im", "aginative") + for _, kv := range kvp { + kvc.Set(kv.Key, kv.Value) + } + assert.Equal(t, kvp, collectIterator(kvc.Iterator(nil, nil))) +} + +func TestKVCache_SortedKeysInDomain(t *testing.T) { + assert.Equal(t, []string{"b"}, testSortedKeysInDomain(bz("b"), bz("c"), "a", "b", "c", "d")) + assert.Equal(t, []string{"b", "c"}, testSortedKeysInDomain(bz("b"), bz("cc"), "a", "b", "c", "d")) + assert.Equal(t, []string{"a", "b", "c", "d"}, testSortedKeysInDomain(bz(""), nil, "a", "b", "c", "d")) + assert.Equal(t, []string{"d", "c", "b", "a"}, testSortedKeysInDomain(nil, bz(""), "a", "b", "c", "d")) + assert.Equal(t, []string{}, testSortedKeysInDomain(nil, nil, "a", "b", "c", "d")) + assert.Equal(t, []string{}, testSortedKeysInDomain(bz(""), bz(""), "a", "b", "c", "d")) + assert.Equal(t, []string{}, testSortedKeysInDomain(bz("ab"), bz("ab"), "a", "b", "c", "d")) + assert.Equal(t, []string{"a"}, testSortedKeysInDomain(bz("0"), bz("ab"), "a", "b", "c", "d")) + assert.Equal(t, []string{"c", "b"}, testSortedKeysInDomain(bz("c1"), bz("a"), "a", "b", "c", "d")) + assert.Equal(t, []string{"c", "b"}, testSortedKeysInDomain(bz("c"), bz("a"), "a", "b", "c", "d")) + assert.Equal(t, []string{}, testSortedKeysInDomain(bz("e"), bz("c"), "a", "b")) + assert.Equal(t, []string{}, testSortedKeysInDomain(bz("e"), bz("c"), "z", "f")) +} + +func testSortedKeysInDomain(start, end []byte, keys ...string) []string { + cache := make(map[string]valueInfo) + for _, k := range keys { + cache[k] = valueInfo{} + } + kvc := KVCache{ + cache: cache, + } + bkeys := kvc.SortedKeysInDomain(start, end) + keys = make([]string, len(bkeys)) + for i, bk := range bkeys { + keys[i] = string(bk) + } + return keys +} diff --git a/storage/kvcascade.go b/storage/kvcascade.go new file mode 100644 index 000000000..8a08da3aa --- /dev/null +++ b/storage/kvcascade.go @@ -0,0 +1,39 @@ +package storage + +type KVCascade []KVIterableReader + +func (kvc KVCascade) Get(key []byte) []byte { + for _, kvs := range kvc { + value := kvs.Get(key) + if value != nil { + return value + } + } + return nil +} + +func (kvc KVCascade) Has(key []byte) bool { + for _, kvs := range kvc { + has := kvs.Has(key) + if has { + return true + } + } + return false +} + +func (kvc KVCascade) Iterator(start, end []byte) KVIterator { + iterators := make([]KVIterator, len(kvc)) + for i, kvs := range kvc { + iterators[i] = kvs.Iterator(start, end) + } + return NewMultiIterator(false, iterators...) +} + +func (kvc KVCascade) ReverseIterator(start, end []byte) KVIterator { + iterators := make([]KVIterator, len(kvc)) + for i, kvs := range kvc { + iterators[i] = kvs.ReverseIterator(start, end) + } + return NewMultiIterator(true, iterators...) +} diff --git a/storage/kvstore.go b/storage/kvstore.go new file mode 100644 index 000000000..1df1a5af0 --- /dev/null +++ b/storage/kvstore.go @@ -0,0 +1,98 @@ +package storage + +import ( + "bytes" + + dbm "github.com/tendermint/tendermint/libs/db" +) + +type KVIterator = dbm.Iterator + +// This is partially extrated from Cosmos SDK for alignment but is more minimal, we should suggest this becomes an +// embedded interface +type KVIterable interface { + // Iterator over a domain of keys in ascending order. End is exclusive. + // Start must be less than end, or the Iterator is invalid. + // Iterator must be closed by caller. + // To iterate over entire domain, use store.Iterator(nil, nil) + // CONTRACT: No writes may happen within a domain while an iterator exists over it. + Iterator(start, end []byte) KVIterator + + // Iterator over a domain of keys in descending order. End is exclusive. + // Start must be greater than end, or the Iterator is invalid. + // Iterator must be closed by caller. + // CONTRACT: No writes may happen within a domain while an iterator exists over it. + ReverseIterator(start, end []byte) KVIterator +} + +type KVReader interface { + // Get returns nil iff key doesn't exist. Panics on nil key. + Get(key []byte) []byte + // Has checks if a key exists. Panics on nil key. + Has(key []byte) bool +} + +type KVWriter interface { + // Set sets the key. Panics on nil key. + Set(key, value []byte) + // Delete deletes the key. Panics on nil key. + Delete(key []byte) +} + +type KVIterableReader interface { + KVReader + KVIterable +} + +// KVStore is a simple interface to get/set data +type KVReaderWriter interface { + KVReader + KVWriter +} + +type KVStore interface { + KVReaderWriter + KVIterable +} + +// NormaliseDomain encodes the concession that when nil is used as a lower bound is interpreted as low rather +// than high +func NormaliseDomain(start, end []byte, reverse bool) ([]byte, []byte) { + if reverse { + if len(end) == 0 { + end = []byte{} + } + } else { + if len(start) == 0 { + start = []byte{} + } + } + return start, end +} + +// KeyOrder maps []byte{} -> -1, []byte(nil) -> 1, and everything else to 0. This encodes the assumptions of the +// KVIterator domain endpoints +func KeyOrder(key []byte) int { + if key == nil { + // Sup + return 1 + } + if len(key) == 0 { + // Inf + return -1 + } + // Normal key + return 0 +} + +func CompareKeys(k1, k2 []byte) int { + ko1 := KeyOrder(k1) + ko2 := KeyOrder(k2) + if ko1 < ko2 { + return -1 + } + if ko1 > ko2 { + return 1 + } + return bytes.Compare(k1, k2) +} diff --git a/storage/kvstore_test.go b/storage/kvstore_test.go new file mode 100644 index 000000000..c40d8fa3f --- /dev/null +++ b/storage/kvstore_test.go @@ -0,0 +1,16 @@ +package storage + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestCompareKeys(t *testing.T) { + assert.Equal(t, 1, CompareKeys(nil, []byte{2})) + assert.Equal(t, -1, CompareKeys([]byte{2}, nil)) + assert.Equal(t, -1, CompareKeys([]byte{}, nil)) + assert.Equal(t, 1, CompareKeys(nil, []byte{})) + assert.Equal(t, 0, CompareKeys(nil, nil)) + assert.Equal(t, -1, CompareKeys([]byte{1, 2, 3}, []byte{2})) +} diff --git a/storage/multi_iterator.go b/storage/multi_iterator.go new file mode 100644 index 000000000..853c1917c --- /dev/null +++ b/storage/multi_iterator.go @@ -0,0 +1,114 @@ +package storage + +import ( + "bytes" + "container/heap" +) + +type MultiIterator struct { + start []byte + end []byte + // Acts as priority queue based on sort order of current key in each iterator + iterators []KVIterator + iteratorOrder map[KVIterator]int + lessComp int +} + +// MultiIterator iterates in order over a series o +func NewMultiIterator(reverse bool, iterators ...KVIterator) *MultiIterator { + // reuse backing array + lessComp := -1 + if reverse { + lessComp = 1 + } + mi := &MultiIterator{ + iterators: iterators, + iteratorOrder: make(map[KVIterator]int), + lessComp: lessComp, + } + mi.init() + return mi +} + +func (mi *MultiIterator) init() { + validIterators := mi.iterators[:0] + for i, it := range mi.iterators { + mi.iteratorOrder[it] = i + if it.Valid() { + validIterators = append(validIterators, it) + start, end := it.Domain() + if mi.start == nil || bytes.Compare(start, mi.start) == mi.lessComp { + mi.start = start + } + if mi.end == nil || bytes.Compare(mi.end, end) == mi.lessComp { + mi.end = end + } + } else { + // Not clear if this is necessary - fairly sure it is permitted so can't hurt + it.Close() + } + } + mi.iterators = validIterators + heap.Init(mi) +} + +// sort.Interface implementation +func (mi *MultiIterator) Len() int { + return len(mi.iterators) +} + +func (mi *MultiIterator) Less(i, j int) bool { + comp := bytes.Compare(mi.iterators[i].Key(), mi.iterators[j].Key()) + return comp == mi.lessComp || (comp == 0 && mi.iteratorOrder[mi.iterators[i]] < mi.iteratorOrder[mi.iterators[j]]) +} + +func (mi *MultiIterator) Swap(i, j int) { + mi.iterators[i], mi.iterators[j] = mi.iterators[j], mi.iterators[i] +} + +func (mi *MultiIterator) Push(x interface{}) { + mi.iterators = append(mi.iterators, x.(KVIterator)) +} + +func (mi *MultiIterator) Pop() interface{} { + n := len(mi.iterators) - 1 + it := mi.iterators[n] + mi.iterators = mi.iterators[:n] + return it +} + +func (mi *MultiIterator) Domain() ([]byte, []byte) { + return mi.start, mi.end +} + +func (mi *MultiIterator) Valid() bool { + return len(mi.iterators) > 0 +} + +func (mi *MultiIterator) Next() { + // Always advance the lowest iterator - the same one we serve the KV pair from + it := heap.Pop(mi).(KVIterator) + it.Next() + if it.Valid() { + heap.Push(mi, it) + } +} + +func (mi *MultiIterator) Key() []byte { + return mi.Peek().Key() +} + +func (mi *MultiIterator) Value() []byte { + return mi.Peek().Value() +} + +func (mi *MultiIterator) Peek() KVIterator { + return mi.iterators[0] +} + +func (mi *MultiIterator) Close() { + // Close any remaining valid iterators + for _, it := range mi.iterators { + it.Close() + } +} diff --git a/storage/multi_iterator_test.go b/storage/multi_iterator_test.go new file mode 100644 index 000000000..97c79320b --- /dev/null +++ b/storage/multi_iterator_test.go @@ -0,0 +1,49 @@ +package storage + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestMultiIterator(t *testing.T) { + t.Log("Testing forward iterator...") + ci1 := iteratorOver(kvPairs("a", "dogs")) + ci2 := iteratorOver(kvPairs("b", "frogs", "x", "mogs")) + ci3 := iteratorOver(kvPairs("d", "bar", "h", "flobs")) + ci4 := iteratorOver(kvPairs("c", "zfoo", "A", "nibble", "\xFF", "HIGH")) + mi := NewMultiIterator(false, ci4, ci2, ci3, ci1) + start, end := mi.Domain() + assert.Equal(t, "A", string(start)) + assert.Equal(t, "\xFF", string(end)) + assertIteratorSorted(t, mi, false) + + t.Log("Testing reverse iterator...") + ci1 = iteratorOver(kvPairs("a", "dogs"), true) + ci2 = iteratorOver(kvPairs("b", "frogs", "x", "mogs"), true) + ci3 = iteratorOver(kvPairs("d", "bar", "h", "flobs"), true) + ci4 = iteratorOver(kvPairs("c", "zfoo", "A", "nibble", "", ""), true) + mi = NewMultiIterator(true, ci4, ci2, ci3, ci1) + start, end = mi.Domain() + assert.Equal(t, "x", string(start)) + assert.Equal(t, "", string(end)) + assertIteratorSorted(t, mi, true) +} + +func TestDuplicateKeys(t *testing.T) { + t.Log("Testing iterators with duplicate keys...") + ci1 := iteratorOver(kvPairs("a", "dogs")) + ci2 := iteratorOver(kvPairs("a", "frogs", "x", "mogs")) + ci3 := iteratorOver(kvPairs("a", "bar", "h", "flobs")) + ci4 := iteratorOver(kvPairs("a", "zfoo", "A", "nibble", "\xFF", "HIGH")) + mi := NewMultiIterator(false, ci1, ci2, ci3, ci4) + var as []string + for mi.Valid() { + if string(mi.Key()) == "a" { + as = append(as, string(mi.Value())) + } + mi.Next() + } + assert.Equal(t, []string{"dogs", "frogs", "bar", "zfoo"}, as, + "duplicate keys should appear in iterator order") +} diff --git a/storage/prefix.go b/storage/prefix.go new file mode 100644 index 000000000..bfff89acd --- /dev/null +++ b/storage/prefix.go @@ -0,0 +1,229 @@ +package storage + +import ( + "bytes" + + dbm "github.com/tendermint/tendermint/libs/db" + "github.com/tmthrgd/go-hex" +) + +type Prefix []byte + +func NewPrefix(p string) Prefix { + return Prefix(p) +} + +func (p Prefix) Key(key []byte) []byte { + // Avoid any unintended memory sharing between keys + return append(p[:len(p):len(p)], key...) +} + +func (p Prefix) Suffix(key []byte) []byte { + return key[len(p):] +} + +// Get the lexicographical sibling above this prefix (i.e. the fixed length integer plus one) +func (p Prefix) Above() []byte { + for i := len(p) - 1; i >= 0; i-- { + c := p[i] + if c < 0xff { + inc := make([]byte, i+1) + copy(inc, p) + inc[i]++ + return inc + } + } + return nil +} + +// Get the lexicographical sibling below this prefix (i.e. the fixed length integer minus one) +func (p Prefix) Below() []byte { + for i := len(p) - 1; i >= 0; i-- { + c := p[i] + if c > 0x00 { + inc := make([]byte, i+1) + copy(inc, p) + inc[i]-- + return inc + } + } + return nil +} + +func (p Prefix) Iterator(iteratorFn func(start, end []byte) dbm.Iterator, start, end []byte) KVIterator { + var pstart, pend []byte = p.Key(start), nil + + if end == nil { + pend = p.Above() + } else { + pend = p.Key(end) + } + return &prefixIterator{ + start: start, + end: end, + prefix: p, + source: iteratorFn(pstart, pend), + } +} + +func (p Prefix) ReverseIterator(iteratorFn func(start, end []byte) dbm.Iterator, start, end []byte) KVIterator { + // Note because of the inclusive start, exclusive end on underlying iterator + // To get inclusive start/end we have to handle the following: + // 1012 above <- does not start with prefix (but included by underlying iterator) + // 1011232 + // 1011 prefix + // 1010111 <- does not start with prefix (but included by underlying iterator) + // 1010 below + var pstart, pend []byte + above := p.Above() + if start == nil { + pstart = above + } else { + pstart = p.Key(start) + } + if end == nil { + pend = p.Below() + } else { + pend = p.Key(end) + } + return &prefixIterator{ + start: start, + end: end, + prefix: p, + // Skip 'above' if necessary + source: skipOne(iteratorFn(pstart, pend), above), + } +} + +func (p Prefix) Iterable(source KVIterable) KVIterable { + return &prefixIterable{ + prefix: p, + source: source, + } +} + +type prefixIterable struct { + prefix Prefix + source KVIterable +} + +func (pi *prefixIterable) Iterator(start, end []byte) KVIterator { + return pi.prefix.Iterator(pi.source.Iterator, start, end) +} + +func (pi *prefixIterable) ReverseIterator(start, end []byte) KVIterator { + return pi.prefix.ReverseIterator(pi.source.ReverseIterator, start, end) +} + +func (p Prefix) Store(source KVStore) KVStore { + return &prefixKVStore{ + prefix: p, + source: source, + } +} + +func (p Prefix) Length() int { + return len(p) +} + +func (p Prefix) String() string { + return string(p) +} + +func (p Prefix) HexString() string { + return hex.EncodeUpperToString(p) +} + +type prefixIterator struct { + prefix Prefix + source dbm.Iterator + start []byte + end []byte + invalid bool +} + +func (pi *prefixIterator) Domain() ([]byte, []byte) { + return pi.start, pi.end +} + +func (pi *prefixIterator) Valid() bool { + pi.validate() + return !pi.invalid && pi.source.Valid() +} + +func (pi *prefixIterator) Next() { + if pi.invalid { + panic("prefixIterator.Next() called on invalid iterator") + } + pi.source.Next() + pi.validate() +} + +func (pi *prefixIterator) Key() []byte { + if pi.invalid { + panic("prefixIterator.Key() called on invalid iterator") + } + return pi.prefix.Suffix(pi.source.Key()) +} + +func (pi *prefixIterator) Value() []byte { + if pi.invalid { + panic("prefixIterator.Value() called on invalid iterator") + } + return pi.source.Value() +} + +func (pi *prefixIterator) Close() { + pi.source.Close() +} + +func (pi *prefixIterator) validate() { + if pi.invalid { + return + } + sourceValid := pi.source.Valid() + pi.invalid = !sourceValid || !bytes.HasPrefix(pi.source.Key(), pi.prefix) + if pi.invalid { + pi.Close() + } +} + +// If the first iterator item is skipKey, then +// skip it. +func skipOne(iterator dbm.Iterator, skipKey []byte) dbm.Iterator { + if iterator.Valid() { + if bytes.Equal(iterator.Key(), skipKey) { + iterator.Next() + } + } + return iterator +} + +type prefixKVStore struct { + prefix Prefix + source KVStore +} + +func (ps *prefixKVStore) Get(key []byte) []byte { + return ps.source.Get(ps.prefix.Key(key)) +} + +func (ps *prefixKVStore) Has(key []byte) bool { + return ps.source.Has(ps.prefix.Key(key)) +} + +func (ps *prefixKVStore) Set(key, value []byte) { + ps.source.Set(ps.prefix.Key(key), value) +} + +func (ps *prefixKVStore) Delete(key []byte) { + ps.source.Delete(ps.prefix.Key(key)) +} + +func (ps *prefixKVStore) Iterator(start, end []byte) dbm.Iterator { + return ps.prefix.Iterator(ps.source.Iterator, start, end) +} + +func (ps *prefixKVStore) ReverseIterator(start, end []byte) dbm.Iterator { + return ps.prefix.ReverseIterator(ps.source.ReverseIterator, start, end) +} diff --git a/storage/prefix_db.go b/storage/prefix_db.go new file mode 100644 index 000000000..e8092d15d --- /dev/null +++ b/storage/prefix_db.go @@ -0,0 +1,99 @@ +package storage + +import ( + "fmt" + + dbm "github.com/tendermint/tendermint/libs/db" +) + +type PrefixDB struct { + prefix Prefix + db dbm.DB +} + +func NewPrefixDB(db dbm.DB, prefix string) *PrefixDB { + return &PrefixDB{ + prefix: Prefix(prefix), + db: db, + } +} + +// DB implementation +func (pdb *PrefixDB) Get(key []byte) []byte { + return pdb.db.Get(pdb.prefix.Key(key)) +} + +func (pdb *PrefixDB) Has(key []byte) bool { + return pdb.db.Has(pdb.prefix.Key(key)) +} + +func (pdb *PrefixDB) Set(key, value []byte) { + pdb.db.Set(pdb.prefix.Key(key), value) +} + +func (pdb *PrefixDB) SetSync(key, value []byte) { + pdb.db.SetSync(pdb.prefix.Key(key), value) +} + +func (pdb *PrefixDB) Delete(key []byte) { + pdb.db.Delete(pdb.prefix.Key(key)) +} + +func (pdb *PrefixDB) DeleteSync(key []byte) { + pdb.db.DeleteSync(pdb.prefix.Key(key)) +} + +func (pdb *PrefixDB) Iterator(start, end []byte) dbm.Iterator { + return pdb.prefix.Iterator(pdb.db.Iterator, start, end) +} + +func (pdb *PrefixDB) ReverseIterator(start, end []byte) dbm.Iterator { + return pdb.prefix.ReverseIterator(pdb.db.ReverseIterator, start, end) +} + +func (pdb *PrefixDB) Close() { + pdb.db.Close() +} + +func (pdb *PrefixDB) Print() { + pdb.db.Print() +} + +func (pdb *PrefixDB) Stats() map[string]string { + stats := make(map[string]string) + stats["PrefixDB.prefix.string"] = string(pdb.prefix) + stats["PrefixDB.prefix.hex"] = fmt.Sprintf("%X", pdb.prefix) + source := pdb.db.Stats() + for key, value := range source { + stats["PrefixDB.db."+key] = value + } + return stats +} + +func (pdb *PrefixDB) NewBatch() dbm.Batch { + return &prefixBatch{ + prefix: pdb.prefix, + batch: pdb.db.NewBatch(), + } +} + +type prefixBatch struct { + prefix Prefix + batch dbm.Batch +} + +func (pb *prefixBatch) Set(key, value []byte) { + pb.batch.Set(pb.prefix.Key(key), value) +} + +func (pb *prefixBatch) Delete(key []byte) { + pb.batch.Delete(pb.prefix.Key(key)) +} + +func (pb *prefixBatch) Write() { + pb.batch.Write() +} + +func (pb *prefixBatch) WriteSync() { + pb.batch.WriteSync() +} diff --git a/storage/prefix_db_test.go b/storage/prefix_db_test.go new file mode 100644 index 000000000..2adda24aa --- /dev/null +++ b/storage/prefix_db_test.go @@ -0,0 +1,200 @@ +package storage + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + dbm "github.com/tendermint/tendermint/libs/db" +) + +func mockDBWithStuff() dbm.DB { + db := dbm.NewMemDB() + // Under "key" prefix + db.Set(bz("key"), bz("value")) + db.Set(bz("key1"), bz("value1")) + db.Set(bz("key2"), bz("value2")) + db.Set(bz("key3"), bz("value3")) + db.Set(bz("something"), bz("else")) + db.Set(bz(""), bz("")) + db.Set(bz("k"), bz("val")) + db.Set(bz("ke"), bz("valu")) + db.Set(bz("kee"), bz("valuu")) + return db +} + +func TestPrefixDBSimple(t *testing.T) { + db := mockDBWithStuff() + pdb := NewPrefixDB(db, "key") + + checkValue(t, pdb, bz("key"), nil) + checkValue(t, pdb, bz(""), bz("value")) + checkValue(t, pdb, bz("key1"), nil) + checkValue(t, pdb, bz("1"), bz("value1")) + checkValue(t, pdb, bz("key2"), nil) + checkValue(t, pdb, bz("2"), bz("value2")) + checkValue(t, pdb, bz("key3"), nil) + checkValue(t, pdb, bz("3"), bz("value3")) + checkValue(t, pdb, bz("something"), nil) + checkValue(t, pdb, bz("k"), nil) + checkValue(t, pdb, bz("ke"), nil) + checkValue(t, pdb, bz("kee"), nil) +} + +func TestPrefixDBIterator1(t *testing.T) { + db := mockDBWithStuff() + pdb := NewPrefixDB(db, "key") + + itr := pdb.Iterator(nil, nil) + checkDomain(t, itr, nil, nil) + checkItem(t, itr, bz(""), bz("value")) + checkNext(t, itr, true) + checkItem(t, itr, bz("1"), bz("value1")) + checkNext(t, itr, true) + checkItem(t, itr, bz("2"), bz("value2")) + checkNext(t, itr, true) + checkItem(t, itr, bz("3"), bz("value3")) + checkNext(t, itr, false) + checkInvalid(t, itr) + itr.Close() +} + +func TestPrefixDBIterator2(t *testing.T) { + db := mockDBWithStuff() + pdb := NewPrefixDB(db, "key") + + itr := pdb.Iterator(nil, bz("")) + checkDomain(t, itr, nil, bz("")) + checkInvalid(t, itr) + itr.Close() +} + +func TestPrefixDBIterator3(t *testing.T) { + db := mockDBWithStuff() + pdb := NewPrefixDB(db, "key") + + itr := pdb.Iterator(bz(""), nil) + checkDomain(t, itr, bz(""), nil) + checkItem(t, itr, bz(""), bz("value")) + checkNext(t, itr, true) + checkItem(t, itr, bz("1"), bz("value1")) + checkNext(t, itr, true) + checkItem(t, itr, bz("2"), bz("value2")) + checkNext(t, itr, true) + checkItem(t, itr, bz("3"), bz("value3")) + checkNext(t, itr, false) + checkInvalid(t, itr) + itr.Close() +} + +func TestPrefixDBIterator4(t *testing.T) { + db := mockDBWithStuff() + pdb := NewPrefixDB(db, "key") + + itr := pdb.Iterator(bz(""), bz("")) + checkDomain(t, itr, bz(""), bz("")) + checkInvalid(t, itr) + itr.Close() +} + +func TestPrefixDBReverseIterator1(t *testing.T) { + db := mockDBWithStuff() + pdb := NewPrefixDB(db, "key") + + itr := pdb.ReverseIterator(nil, nil) + checkDomain(t, itr, nil, nil) + checkItem(t, itr, bz("3"), bz("value3")) + checkNext(t, itr, true) + checkItem(t, itr, bz("2"), bz("value2")) + checkNext(t, itr, true) + checkItem(t, itr, bz("1"), bz("value1")) + checkNext(t, itr, true) + checkItem(t, itr, bz(""), bz("value")) + checkNext(t, itr, false) + checkInvalid(t, itr) + itr.Close() +} + +func TestPrefixDBReverseIterator2(t *testing.T) { + db := mockDBWithStuff() + pdb := NewPrefixDB(db, "key") + + itr := pdb.ReverseIterator(nil, bz("")) + checkDomain(t, itr, nil, bz("")) + checkItem(t, itr, bz("3"), bz("value3")) + checkNext(t, itr, true) + checkItem(t, itr, bz("2"), bz("value2")) + checkNext(t, itr, true) + checkItem(t, itr, bz("1"), bz("value1")) + checkNext(t, itr, false) + checkInvalid(t, itr) + itr.Close() +} + +func TestPrefixDBReverseIterator3(t *testing.T) { + db := mockDBWithStuff() + pdb := NewPrefixDB(db, "key") + + itr := pdb.ReverseIterator(bz(""), nil) + checkDomain(t, itr, bz(""), nil) + checkItem(t, itr, bz(""), bz("value")) + checkNext(t, itr, false) + checkInvalid(t, itr) + itr.Close() +} + +func TestPrefixDBReverseIterator4(t *testing.T) { + db := mockDBWithStuff() + pdb := NewPrefixDB(db, "key") + + itr := pdb.ReverseIterator(bz(""), bz("")) + checkInvalid(t, itr) + itr.Close() +} + +func checkValue(t *testing.T, db dbm.DB, key []byte, valueWanted []byte) { + valueGot := db.Get(key) + assert.Equal(t, valueWanted, valueGot) +} + +func checkValid(t *testing.T, itr dbm.Iterator, expected bool) { + valid := itr.Valid() + require.Equal(t, expected, valid) +} + +func checkNext(t *testing.T, itr dbm.Iterator, expected bool) { + itr.Next() + valid := itr.Valid() + require.Equal(t, expected, valid) +} + +func checkNextPanics(t *testing.T, itr dbm.Iterator) { + assert.Panics(t, func() { itr.Next() }, "checkNextPanics expected panic but didn't") +} + +func checkDomain(t *testing.T, itr dbm.Iterator, start, end []byte) { + ds, de := itr.Domain() + assert.Equal(t, start, ds, "checkDomain domain start incorrect") + assert.Equal(t, end, de, "checkDomain domain end incorrect") +} + +func checkItem(t *testing.T, itr dbm.Iterator, key []byte, value []byte) { + k, v := itr.Key(), itr.Value() + assert.Exactly(t, key, k) + assert.Exactly(t, value, v) +} + +func checkInvalid(t *testing.T, itr dbm.Iterator) { + checkValid(t, itr, false) + checkKeyPanics(t, itr) + checkValuePanics(t, itr) + checkNextPanics(t, itr) +} + +func checkKeyPanics(t *testing.T, itr dbm.Iterator) { + assert.Panics(t, func() { itr.Key() }, "checkKeyPanics expected panic but didn't") +} + +func checkValuePanics(t *testing.T, itr dbm.Iterator) { + assert.Panics(t, func() { itr.Key() }, "checkValuePanics expected panic but didn't") +} diff --git a/storage/prefix_test.go b/storage/prefix_test.go new file mode 100644 index 000000000..b03c45d4b --- /dev/null +++ b/storage/prefix_test.go @@ -0,0 +1,68 @@ +package storage + +import ( + "sort" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/stretchr/testify/require" + dbm "github.com/tendermint/tendermint/libs/db" +) + +func TestPrefix_Iterable(t *testing.T) { + keys := [][]byte{ + {0x10, 0xab}, + {0x11, 0x00}, + {0x11, 0x00, 0x00}, + {0x11, 0x00, 0x00, 1}, + {0x11, 0x00, 0x00, 2}, + {0x11, 0x00, 0x00, 3}, + {0x11, 0x00, 0x00, 4}, + {0x11, 0x00, 0x01, 0x00}, + {0x11, 0x34, 0x00}, + {0x11, 0xff, 0xff}, + {0x11, 0xff, 0xff, 0xff}, + {0x12}, + } + memDB := dbm.NewMemDB() + for i, k := range keys { + memDB.Set(k, []byte{byte(i)}) + } + requireKeysSorted(t, keys) + p := Prefix([]byte{0x11, 0x00, 0x00}) + it := p.Iterable(memDB) + expectedKeys := [][]byte{{}, {1}, {2}, {3}, {4}} + requireKeysSorted(t, expectedKeys) + assert.Equal(t, expectedKeys, dumpKeys(it.Iterator(nil, nil))) + + expectedKeys = [][]byte{{4}, {3}, {2}, {1}, {}} + requireKeysSorted(t, expectedKeys, true) + assert.Equal(t, expectedKeys, dumpKeys(it.ReverseIterator(nil, nil))) +} + +func requireKeysSorted(t *testing.T, keys [][]byte, reverse ...bool) { + comp := -1 + if len(reverse) > 0 && reverse[0] { + comp = 1 + } + sortedKeys := make([][]byte, len(keys)) + for i, k := range keys { + sortedKeys[i] = make([]byte, len(k)) + copy(sortedKeys[i], k) + } + sort.Slice(sortedKeys, func(i, j int) bool { + return strings.Compare(string(sortedKeys[i]), string(sortedKeys[j])) == comp + }) + require.Equal(t, sortedKeys, keys) +} + +func dumpKeys(it dbm.Iterator) [][]byte { + var keys [][]byte + for it.Valid() { + keys = append(keys, it.Key()) + it.Next() + } + return keys +} diff --git a/storage/rwtree.go b/storage/rwtree.go new file mode 100644 index 000000000..184374339 --- /dev/null +++ b/storage/rwtree.go @@ -0,0 +1,112 @@ +package storage + +import ( + "fmt" + "sync" + + "github.com/tendermint/iavl" + dbm "github.com/tendermint/tendermint/libs/db" +) + +type RWTree struct { + // Values not reassigned + sync.RWMutex + // Working tree accumulating writes + tree *iavl.MutableTree + // Read tree serving previous state + readTree *iavl.ImmutableTree +} + +func NewRWTree(db dbm.DB, cacheSize int) *RWTree { + return &RWTree{ + tree: iavl.NewMutableTree(db, cacheSize), + readTree: iavl.NewImmutableTree(db, cacheSize), + } +} + +// Tries to load the execution state from DB, returns nil with no error if no state found +func (rwt *RWTree) Load(version int64) error { + if version <= 0 { + return fmt.Errorf("trying to load RWTree from non-positive version: version %d", version) + } + treeVersion, err := rwt.tree.LoadVersion(version) + if err != nil { + return fmt.Errorf("could not load current version of RWTree: version %d", version) + } + if treeVersion != version { + return fmt.Errorf("tried to load version %d of RWTree, but got version %d", version, treeVersion) + } + // Load previous version for readTree + // Set readTree + if version > 0 { + rwt.readTree, err = rwt.tree.GetImmutable(version - 1) + if err != nil { + return fmt.Errorf("could not load previous version of RWTree to use as read version") + } + } + return nil +} + +// Save the current write tree making writes accessible from read tree. +func (rwt *RWTree) Save() ([]byte, int64, error) { + // save state at a new version may still be orphaned before we save the version against the hash + hash, version, err := rwt.tree.SaveVersion() + if err != nil { + return nil, 0, fmt.Errorf("could not save RWTree: %v", err) + } + // Take an immutable reference to the tree we just saved for querying + rwt.readTree, err = rwt.tree.GetImmutable(version) + if err != nil { + return nil, 0, fmt.Errorf("RWTree.Save() could not obtain ImmutableTree read tree: %v", err) + } + return hash, version, nil +} + +func (rwt *RWTree) Set(key, value []byte) { + rwt.tree.Set(key, value) +} + +func (rwt *RWTree) Get(key []byte) []byte { + _, value := rwt.readTree.Get(key) + return value +} + +func (rwt *RWTree) IterateRange(start, end []byte, ascending bool, fn func(key []byte, value []byte) bool) (stopped bool) { + return rwt.readTree.IterateRange(start, end, ascending, fn) +} + +func (rwt *RWTree) Hash() []byte { + return rwt.readTree.Hash() +} + +func (rwt *RWTree) Has(key []byte) bool { + return rwt.Get(key) != nil +} + +func (rwt *RWTree) Delete(key []byte) { + rwt.tree.Remove(key) +} + +func (rwt *RWTree) Iterator(start, end []byte) dbm.Iterator { + ch := make(chan KVPair) + go func() { + defer close(ch) + rwt.readTree.IterateRange(start, end, true, func(key, value []byte) (stop bool) { + ch <- KVPair{key, value} + return + }) + }() + return NewChannelIterator(ch, start, end) +} + +func (rwt *RWTree) ReverseIterator(start, end []byte) dbm.Iterator { + ch := make(chan KVPair) + go func() { + defer close(ch) + rwt.readTree.IterateRange(start, end, false, func(key, value []byte) (stop bool) { + ch <- KVPair{key, value} + return + }) + }() + return NewChannelIterator(ch, start, end) +} diff --git a/storage/rwtree_test.go b/storage/rwtree_test.go new file mode 100644 index 000000000..186121dd2 --- /dev/null +++ b/storage/rwtree_test.go @@ -0,0 +1,22 @@ +package storage + +import ( + "testing" + + "github.com/stretchr/testify/assert" + dbm "github.com/tendermint/tendermint/libs/db" +) + +func TestSave(t *testing.T) { + db := dbm.NewMemDB() + rwt := NewRWTree(db, 100) + foo := bz("foo") + gaa := bz("gaa") + dam := bz("dam") + rwt.Set(foo, gaa) + rwt.Save() + assert.Equal(t, gaa, rwt.Get(foo)) + rwt.Set(foo, dam) + rwt.Save() + assert.Equal(t, dam, rwt.Get(foo)) +} diff --git a/storage/trie.go b/storage/trie.go new file mode 100644 index 000000000..211da65a4 --- /dev/null +++ b/storage/trie.go @@ -0,0 +1,24 @@ +package storage + +import "bytes" + +type Trie struct { + Root *TrieNode +} + +type TrieNode struct { + key []byte + value []byte + children [][]*TrieNode + mask [4]uint64 +} + +func NewTrie() *Trie { + return &Trie{ + Root: &TrieNode{}, + } +} + +func (tn *TrieNode) Insert(key, value []byte) { + bytes.Compare(tn.key, key) +} diff --git a/storage/unique_iterator.go b/storage/unique_iterator.go new file mode 100644 index 000000000..b1449914f --- /dev/null +++ b/storage/unique_iterator.go @@ -0,0 +1,43 @@ +package storage + +import "bytes" + +type uniqueIterator struct { + source KVIterator + prevKey []byte +} + +func Uniq(source KVIterator) *uniqueIterator { + return &uniqueIterator{ + source: source, + } +} + +func (ui *uniqueIterator) Domain() ([]byte, []byte) { + return ui.source.Domain() +} + +func (ui *uniqueIterator) Valid() bool { + return ui.source.Valid() +} + +func (ui *uniqueIterator) Next() { + ui.prevKey = ui.Key() + ui.source.Next() + // Skip elements with the same key a previous + for ui.source.Valid() && bytes.Equal(ui.Key(), ui.prevKey) { + ui.source.Next() + } +} + +func (ui *uniqueIterator) Key() []byte { + return ui.source.Key() +} + +func (ui *uniqueIterator) Value() []byte { + return ui.source.Value() +} + +func (ui *uniqueIterator) Close() { + ui.source.Close() +} diff --git a/storage/unique_iterator_test.go b/storage/unique_iterator_test.go new file mode 100644 index 000000000..bf82bccab --- /dev/null +++ b/storage/unique_iterator_test.go @@ -0,0 +1,16 @@ +package storage + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestUniqueIterator(t *testing.T) { + it := iteratorOver(kvPairs("a", "dogs", "a", "pogs", + "b", "slime", "b", "grime", "b", "nogs", + "c", "strudel")) + + assert.Equal(t, kvPairs("a", "dogs", "b", "slime", "c", "strudel"), + collectIterator(Uniq(it))) +} diff --git a/storage/util_test.go b/storage/util_test.go new file mode 100644 index 000000000..fba4ad3d1 --- /dev/null +++ b/storage/util_test.go @@ -0,0 +1,73 @@ +package storage + +import ( + "sort" + "strings" + "testing" + + "github.com/stretchr/testify/assert" +) + +func sendKVPair(ch chan<- KVPair, kvs []KVPair) { + for _, kv := range kvs { + ch <- kv + } + close(ch) +} + +func collectIterator(it KVIterator) KVPairs { + var kvp []KVPair + for it.Valid() { + kvp = append(kvp, KVPair{it.Key(), it.Value()}) + it.Next() + } + return kvp +} + +func kvPairs(kvs ...string) KVPairs { + n := len(kvs) / 2 + kvp := make([]KVPair, 0, n) + for i := 0; i < 2*n; i += 2 { + kvp = append(kvp, KVPair{[]byte(kvs[i]), []byte(kvs[i+1])}) + } + return kvp +} + +func assertIteratorSorted(t *testing.T, it KVIterator, reverse bool) { + prev := "" + for it.Valid() { + strKey := string(it.Key()) + t.Log(strKey, "=>", string(it.Value())) + if prev == "" { + prev = strKey + } + // Assert non-decreasing sequence of keys + if reverse { + assert.False(t, strings.Compare(prev, strKey) == -1) + } else { + assert.False(t, strings.Compare(prev, strKey) == 1) + } + prev = strKey + it.Next() + } +} + +func iteratorOver(kvp []KVPair, reverse ...bool) *ChannelIterator { + var sortable sort.Interface = KVPairs(kvp) + if len(reverse) > 0 && reverse[0] { + sortable = sort.Reverse(sortable) + } + sort.Stable(sortable) + ch := make(chan KVPair) + var start, end []byte + if len(kvp) > 0 { + start, end = kvp[0].Key, kvp[len(kvp)-1].Key + } + go sendKVPair(ch, kvp) + ci := NewChannelIterator(ch, start, end) + return ci +} + +func bz(s string) []byte { + return []byte(s) +} diff --git a/txs/tx.go b/txs/tx.go index 3a1480e58..ea5c37428 100644 --- a/txs/tx.go +++ b/txs/tx.go @@ -18,13 +18,14 @@ import ( "encoding/json" "fmt" + "github.com/tendermint/tmlibs/merkle/tmhash" + "github.com/hyperledger/burrow/acm" "github.com/hyperledger/burrow/acm/state" "github.com/hyperledger/burrow/binary" "github.com/hyperledger/burrow/crypto" "github.com/hyperledger/burrow/event/query" "github.com/hyperledger/burrow/txs/payload" - "golang.org/x/crypto/ripemd160" ) // Tx is the canonical object that we serialise to produce the SignBytes that we sign @@ -168,7 +169,7 @@ func (tx *Tx) String() string { // Regenerate the Tx hash if it has been mutated or as called by Hash() in first instance func (tx *Tx) Rehash() []byte { - hasher := ripemd160.New() + hasher := tmhash.New() hasher.Write(tx.MustSignBytes()) tx.txHash = hasher.Sum(nil) return tx.txHash