diff --git a/blockEmulator_Linux_Precompile b/blockEmulator_Linux_Precompile index 73a9197..ba1d44f 100644 Binary files a/blockEmulator_Linux_Precompile and b/blockEmulator_Linux_Precompile differ diff --git a/blockEmulator_MacOS_Precompile b/blockEmulator_MacOS_Precompile index b93456a..3f4cdb7 100644 Binary files a/blockEmulator_MacOS_Precompile and b/blockEmulator_MacOS_Precompile differ diff --git a/blockEmulator_Windows_Precompile.exe b/blockEmulator_Windows_Precompile.exe index 7871fda..eb00eb8 100644 Binary files a/blockEmulator_Windows_Precompile.exe and b/blockEmulator_Windows_Precompile.exe differ diff --git a/chain/blockchain.go b/chain/blockchain.go index 4849b41..2d92131 100644 --- a/chain/blockchain.go +++ b/chain/blockchain.go @@ -16,6 +16,7 @@ import ( "sync" "time" + "github.com/bits-and-blooms/bitset" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/ethdb" @@ -39,11 +40,20 @@ func GetTxTreeRoot(txs []*core.Transaction) []byte { triedb := trie.NewDatabase(rawdb.NewMemoryDatabase()) transactionTree := trie.NewEmpty(triedb) for _, tx := range txs { - transactionTree.Update(tx.TxHash, tx.Encode()) + transactionTree.Update(tx.TxHash, []byte{0}) } return transactionTree.Hash().Bytes() } +// Get bloom filter +func GetBloomFilter(txs []*core.Transaction) *bitset.BitSet { + bs := bitset.New(2048) + for _, tx := range txs { + bs.Set(utils.ModBytes(tx.TxHash, 2048)) + } + return bs +} + // Write Partition Map func (bc *BlockChain) Update_PartitionMap(key string, val uint64) { bc.pmlock.Lock() @@ -131,11 +141,6 @@ func (bc *BlockChain) GetUpdateStatusTrie(txs []*core.Transaction) common.Hash { st.Update([]byte(tx.Recipient), r_state.Encode()) cnt++ } - - // if senderIn && !recipientIn { - // // change this part to the pbft stage - // fmt.Printf("this transaciton is cross-shard txs, will be sent to relaypool later\n") - // } } // commit the memory trie to the database in the disk if cnt == 0 { @@ -155,7 +160,7 @@ func (bc *BlockChain) GetUpdateStatusTrie(txs []*core.Transaction) common.Hash { } // generate (mine) a block, this function return a block -func (bc *BlockChain) GenerateBlock() *core.Block { +func (bc *BlockChain) GenerateBlock(miner int32) *core.Block { // pack the transactions from the txpool txs := bc.Txpool.PackTxs(bc.ChainConfig.BlockSize) bh := &core.BlockHeader{ @@ -168,8 +173,10 @@ func (bc *BlockChain) GenerateBlock() *core.Block { bh.StateRoot = rt.Bytes() bh.TxRoot = GetTxTreeRoot(txs) + bh.Bloom = *GetBloomFilter(txs) + bh.Miner = 0 b := core.NewBlock(bh, txs) - b.Header.Miner = 0 + b.Hash = b.Header.Hash() return b } @@ -189,6 +196,7 @@ func (bc *BlockChain) NewGenisisBlock() *core.Block { statusTrie := trie.NewEmpty(triedb) bh.StateRoot = statusTrie.Hash().Bytes() bh.TxRoot = GetTxTreeRoot(body) + bh.Bloom = *GetBloomFilter(body) b := core.NewBlock(bh, body) b.Hash = b.Header.Hash() return b @@ -234,11 +242,12 @@ func (bc *BlockChain) AddBlock(b *core.Block) { // the ChainConfig is pre-defined to identify the blockchain; the db is the status trie database in disk func NewBlockChain(cc *params.ChainConfig, db ethdb.Database) (*BlockChain, error) { fmt.Println("Generating a new blockchain", db) + chainDBfp := params.DatabaseWrite_path + fmt.Sprintf("chainDB/S%d_N%d", cc.ShardID, cc.NodeID) bc := &BlockChain{ db: db, ChainConfig: cc, Txpool: core.NewTxPool(), - Storage: storage.NewStorage(cc), + Storage: storage.NewStorage(chainDBfp, cc), PartitionMap: make(map[string]uint64), } curHash, err := bc.Storage.GetNewestBlockHash() @@ -291,7 +300,7 @@ func (bc *BlockChain) IsValidBlock(b *core.Block) error { } // add accounts -func (bc *BlockChain) AddAccounts(ac []string, as []*core.AccountState) { +func (bc *BlockChain) AddAccounts(ac []string, as []*core.AccountState, miner int32) { fmt.Printf("The len of accounts is %d, now adding the accounts\n", len(ac)) bh := &core.BlockHeader{ @@ -300,7 +309,7 @@ func (bc *BlockChain) AddAccounts(ac []string, as []*core.AccountState) { Time: time.Time{}, } // handle transactions to build root - rt := common.BytesToHash(bc.CurrentBlock.Header.StateRoot) + rt := bc.CurrentBlock.Header.StateRoot if len(ac) != 0 { st, err := trie.New(trie.TrieID(common.BytesToHash(bc.CurrentBlock.Header.StateRoot)), bc.triedb) if err != nil { @@ -322,18 +331,19 @@ func (bc *BlockChain) AddAccounts(ac []string, as []*core.AccountState) { if err != nil { log.Panic(err) } - err = bc.triedb.Commit(rt, false) + err = bc.triedb.Commit(rrt, false) if err != nil { log.Panic(err) } - rt = rrt + rt = rrt.Bytes() } emptyTxs := make([]*core.Transaction, 0) - bh.StateRoot = rt.Bytes() + bh.StateRoot = rt bh.TxRoot = GetTxTreeRoot(emptyTxs) + bh.Bloom = *GetBloomFilter(emptyTxs) + bh.Miner = 0 b := core.NewBlock(bh, emptyTxs) - b.Header.Miner = 0 b.Hash = b.Header.Hash() bc.CurrentBlock = b @@ -369,6 +379,7 @@ func (bc *BlockChain) FetchAccounts(addrs []string) []*core.AccountState { func (bc *BlockChain) CloseBlockChain() { bc.Storage.DataBase.Close() bc.triedb.CommitPreimages() + bc.db.Close() } // print the details of a blockchain diff --git a/chain/blockchain_test.go b/chain/blockchain_test.go new file mode 100644 index 0000000..59dc219 --- /dev/null +++ b/chain/blockchain_test.go @@ -0,0 +1,57 @@ +package chain + +import ( + "blockEmulator/core" + "blockEmulator/params" + "fmt" + "log" + "math/big" + "os" + "testing" + + "github.com/ethereum/go-ethereum/core/rawdb" +) + +func TestBlockChain(t *testing.T) { + accounts := []string{"000000000001", "00000000002", "00000000003", "00000000004", "00000000005", "00000000006"} + as := make([]*core.AccountState, 0) + for idx := range accounts { + as = append(as, &core.AccountState{ + Balance: big.NewInt(int64(idx)), + }) + } + fp := params.DatabaseWrite_path + "mptDB/ldb/s0/N0" + fmt.Println(fp) + db, err := rawdb.NewLevelDBDatabase(fp, 0, 1, "accountState", false) + if err != nil { + log.Panic(err) + } + params.ShardNum = 1 + pcc := ¶ms.ChainConfig{ + ChainID: 0, + NodeID: 0, + ShardID: 0, + Nodes_perShard: 1, + ShardNums: 1, + BlockSize: uint64(params.MaxBlockSize_global), + BlockInterval: uint64(params.Block_Interval), + InjectSpeed: uint64(params.InjectSpeed), + } + CurChain, _ := NewBlockChain(pcc, db) + CurChain.PrintBlockChain() + CurChain.AddAccounts(accounts, as, 0) + CurChain.PrintBlockChain() + + astates := CurChain.FetchAccounts(accounts) + for idx, state := range astates { + fmt.Println(accounts[idx], state.Balance) + } + CurChain.CloseBlockChain() + + // clear test data file + err = os.RemoveAll(params.ExpDataRootDir) + if err != nil { + fmt.Printf("Failed to delete directory: %v\n", err) + return + } +} diff --git a/chain/merkleproof.go b/chain/merkleproof.go new file mode 100644 index 0000000..f8f3b91 --- /dev/null +++ b/chain/merkleproof.go @@ -0,0 +1,185 @@ +package chain + +import ( + "blockEmulator/core" + "blockEmulator/utils" + "bytes" + "errors" + "fmt" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/trie" +) + +type TxProofResult struct { + Found bool + BlockHash []byte + TxHash []byte + TxRoot []byte + BlockHeight uint64 + KeyList [][]byte + ValueList [][]byte + Error string +} + +// Generate proof for the tx which hash is txHash. +// Find all blocks in this chain. +func (bc *BlockChain) TxProofGenerate(txHash []byte) TxProofResult { + nowblockHash := bc.CurrentBlock.Hash + nowheight := bc.CurrentBlock.Header.Number + + for ; nowheight > 0; nowheight-- { + // get a block from db + block, err1 := bc.Storage.GetBlock(nowblockHash) + if err1 != nil { + return TxProofResult{ + Found: false, + TxHash: txHash, + Error: err1.Error(), + } + } + if ret := TxProofGenerateOnTheBlock(txHash, block); ret.Found { + return ret + } + + // go into next block + nowblockHash = block.Header.ParentBlockHash + } + + return TxProofResult{ + Found: false, + TxHash: txHash, + Error: errors.New("cannot find this tx").Error(), + } +} + +// Make Tx proof on a certain block. +func TxProofGenerateOnTheBlock(txHash []byte, block *core.Block) TxProofResult { + // If no value in bloom filter, then the tx must not be in this block + bitMapIdxofTx := utils.ModBytes(txHash, 2048) + if !block.Header.Bloom.Test(bitMapIdxofTx) { + return TxProofResult{ + Found: false, + TxHash: txHash, + Error: errors.New("cannot find this tx").Error(), + } + } + + // now try to find whether this tx is in this block + // check the correctness of this tx Trie + triedb := trie.NewDatabase(rawdb.NewMemoryDatabase()) + transactionTree := trie.NewEmpty(triedb) + for _, tx := range block.Body { + transactionTree.Update(tx.TxHash, []byte{0}) + } + if !bytes.Equal(transactionTree.Hash().Bytes(), block.Header.TxRoot) { + return TxProofResult{ + Found: false, + TxHash: txHash, + Error: fmt.Errorf("tx root mismatch in height %d", block.Header.Number).Error(), + } + } + + // generate proof + keylist, valuelist := make([][]byte, 0), make([][]byte, 0) + proof := rawdb.NewMemoryDatabase() + if err := transactionTree.Prove(txHash, 0, proof); err == nil { + it := proof.NewIterator(nil, nil) + for it.Next() { + keylist = append(keylist, it.Key()) + valuelist = append(valuelist, it.Value()) + } + return TxProofResult{ + Found: true, + BlockHash: block.Hash, + TxHash: txHash, + TxRoot: block.Header.TxRoot, + BlockHeight: block.Header.Number, + KeyList: keylist, + ValueList: valuelist, + } + } + return TxProofResult{ + Found: false, + TxHash: txHash, + Error: errors.New("cannot find this tx").Error(), + } +} + +func TxProofBatchGenerateOnBlock(txHashes [][]byte, block *core.Block) []TxProofResult { + txProofs := make([]TxProofResult, len(txHashes)) + // check the tx trie first. + // check the correctness of this tx Trie + triedb := trie.NewDatabase(rawdb.NewMemoryDatabase()) + transactionTree := trie.NewEmpty(triedb) + for _, tx := range block.Body { + transactionTree.Update(tx.TxHash, []byte{0}) + } + if !bytes.Equal(transactionTree.Hash().Bytes(), block.Header.TxRoot) { + for i := 0; i < len(txHashes); i++ { + txProofs[i] = TxProofResult{ + Found: false, + TxHash: txHashes[i], + Error: fmt.Errorf("tx root mismatch in height %d", block.Header.Number).Error(), + } + } + return txProofs + } + + for idx, txHash := range txHashes { + bitMapIdxofTx := utils.ModBytes(txHash, 2048) + if !block.Header.Bloom.Test(bitMapIdxofTx) { + txProofs[idx] = TxProofResult{ + Found: false, + TxHash: txHash, + Error: errors.New("cannot find this tx").Error(), + } + continue + } + // generate proof + keylist, valuelist := make([][]byte, 0), make([][]byte, 0) + proof := rawdb.NewMemoryDatabase() + if err := transactionTree.Prove(txHash, 0, proof); err == nil { + it := proof.NewIterator(nil, nil) + for it.Next() { + keylist = append(keylist, it.Key()) + valuelist = append(valuelist, it.Value()) + } + txProofs[idx] = TxProofResult{ + Found: true, + BlockHash: block.Hash, + TxHash: txHash, + TxRoot: block.Header.TxRoot, + BlockHeight: block.Header.Number, + KeyList: keylist, + ValueList: valuelist, + } + } else { + txProofs[idx] = TxProofResult{ + Found: false, + TxHash: txHash, + Error: errors.New("cannot find this tx").Error(), + } + } + } + return txProofs +} + +func TxProofVerify(txHash []byte, proof *TxProofResult) (bool, error) { + if !proof.Found { + return false, errors.New("the result shows not found") + } + + // check the proof + recoveredProof := rawdb.NewMemoryDatabase() + listLen := len(proof.KeyList) + for i := 0; i < listLen; i++ { + recoveredProof.Put(proof.KeyList[i], proof.ValueList[i]) + } + if _, err := trie.VerifyProof(common.BytesToHash(proof.TxRoot), []byte(proof.TxHash), recoveredProof); err != nil { + return false, errors.New("wrong proof") + } + + return true, nil +} diff --git a/chain/merkleproof_test.go b/chain/merkleproof_test.go new file mode 100644 index 0000000..638b6bd --- /dev/null +++ b/chain/merkleproof_test.go @@ -0,0 +1,77 @@ +package chain + +import ( + "blockEmulator/core" + "blockEmulator/params" + "fmt" + "log" + "math/big" + "os" + "testing" + "time" + + "github.com/ethereum/go-ethereum/core/rawdb" +) + +func TestMerkleProof(t *testing.T) { + txProof, txHash := buildBlockChain() + clearBlockchainData() + // fmt.Printf("%v", txProof) + if ok, err := TxProofVerify(txHash, &txProof); !ok { + log.Panic("Fail to verify ", err.Error()) + } +} + +func buildBlockChain() (TxProofResult, []byte) { + // build a blockchain + fp := params.DatabaseWrite_path + "mptDB/ldb/s0/N0" + db, err := rawdb.NewLevelDBDatabase(fp, 0, 1, "accountState", false) + if err != nil { + log.Panic(err) + } + params.ShardNum = 1 + pcc := ¶ms.ChainConfig{ + ChainID: 0, + NodeID: 0, + ShardID: 0, + Nodes_perShard: 1, + ShardNums: 1, + BlockSize: uint64(params.MaxBlockSize_global), + BlockInterval: uint64(params.Block_Interval), + InjectSpeed: uint64(params.InjectSpeed), + } + CurChain, _ := NewBlockChain(pcc, db) + CurChain.PrintBlockChain() + + // update blokchain + TxForProof := core.NewTransaction("00000000001", "00000000002", big.NewInt(1234), 126526, time.Now()) + for i := 0; i < 4; i++ { + // add a special tx for further proof validation. + if i == 2 { + CurChain.Txpool.AddTx2Pool(TxForProof) + } + // add txs + for j := 0; j < 1000; j++ { + CurChain.Txpool.AddTx2Pool(core.NewTransaction("00000000001", "00000000002", big.NewInt(1000), uint64(i*1234+j+213), time.Now())) + } + b := CurChain.GenerateBlock(int32(i)) + CurChain.AddBlock(b) + } + + // get proof of this Tx + txProofResult := CurChain.TxProofGenerate(TxForProof.TxHash) + + // close blockchain + CurChain.CloseBlockChain() + + return txProofResult, TxForProof.TxHash +} + +func clearBlockchainData() { + // clear test data file + err := os.RemoveAll(params.ExpDataRootDir) + if err != nil { + fmt.Printf("Failed to delete directory: %v\n", err) + return + } +} diff --git a/consensus_shard/pbft_all/accountTransfer_module.go b/consensus_shard/pbft_all/accountTransfer_module.go index a45c49e..ca30fd0 100644 --- a/consensus_shard/pbft_all/accountTransfer_module.go +++ b/consensus_shard/pbft_all/accountTransfer_module.go @@ -186,7 +186,7 @@ func (cphm *CLPAPbftInsideExtraHandleMod) accountTransfer_do(atm *message.Accoun // add the account into the state trie cphm.pbftNode.pl.Plog.Printf("%d addrs to add\n", len(atm.Addrs)) cphm.pbftNode.pl.Plog.Printf("%d accountstates to add\n", len(atm.AccountState)) - cphm.pbftNode.CurChain.AddAccounts(atm.Addrs, atm.AccountState) + cphm.pbftNode.CurChain.AddAccounts(atm.Addrs, atm.AccountState, cphm.pbftNode.view.Load()) if uint64(len(cphm.cdm.ModifiedMap)) != atm.ATid { cphm.cdm.ModifiedMap = append(cphm.cdm.ModifiedMap, atm.ModifiedMap) diff --git a/consensus_shard/pbft_all/accountTransfermod_Broker.go b/consensus_shard/pbft_all/accountTransfermod_Broker.go index 83523a8..19e3c76 100644 --- a/consensus_shard/pbft_all/accountTransfermod_Broker.go +++ b/consensus_shard/pbft_all/accountTransfermod_Broker.go @@ -207,7 +207,7 @@ func (cphm *CLPAPbftInsideExtraHandleMod_forBroker) accountTransfer_do(atm *mess } cphm.pbftNode.pl.Plog.Printf("%d key-vals are updated\n", cnt) // add the account into the state trie - cphm.pbftNode.CurChain.AddAccounts(atm.Addrs, atm.AccountState) + cphm.pbftNode.CurChain.AddAccounts(atm.Addrs, atm.AccountState, cphm.pbftNode.view.Load()) if uint64(len(cphm.cdm.ModifiedMap)) != atm.ATid { cphm.cdm.ModifiedMap = append(cphm.cdm.ModifiedMap, atm.ModifiedMap) diff --git a/consensus_shard/pbft_all/pbftInside_modCLPABroker.go b/consensus_shard/pbft_all/pbftInside_modCLPABroker.go index 2f8fabd..2b1a5d3 100644 --- a/consensus_shard/pbft_all/pbftInside_modCLPABroker.go +++ b/consensus_shard/pbft_all/pbftInside_modCLPABroker.go @@ -35,7 +35,7 @@ func (cphm *CLPAPbftInsideExtraHandleMod_forBroker) HandleinPropose() (bool, *me } // ELSE: propose a block - block := cphm.pbftNode.CurChain.GenerateBlock() + block := cphm.pbftNode.CurChain.GenerateBlock(int32(cphm.pbftNode.NodeID)) r := &message.Request{ RequestType: message.BlockRequest, ReqTime: time.Now(), diff --git a/consensus_shard/pbft_all/pbftInside_module.go b/consensus_shard/pbft_all/pbftInside_module.go index 17bf8d4..d487ad5 100644 --- a/consensus_shard/pbft_all/pbftInside_module.go +++ b/consensus_shard/pbft_all/pbftInside_module.go @@ -23,7 +23,7 @@ type RawRelayPbftExtraHandleMod struct { // propose request with different types func (rphm *RawRelayPbftExtraHandleMod) HandleinPropose() (bool, *message.Request) { // new blocks - block := rphm.pbftNode.CurChain.GenerateBlock() + block := rphm.pbftNode.CurChain.GenerateBlock(int32(rphm.pbftNode.NodeID)) r := &message.Request{ RequestType: message.BlockRequest, ReqTime: time.Now(), @@ -92,24 +92,11 @@ func (rphm *RawRelayPbftExtraHandleMod) HandleinCommit(cmsg *message.Commit) boo } // send relay txs - for sid := uint64(0); sid < rphm.pbftNode.pbftChainConfig.ShardNums; sid++ { - if sid == rphm.pbftNode.ShardID { - continue - } - relay := message.Relay{ - Txs: rphm.pbftNode.CurChain.Txpool.RelayPool[sid], - SenderShardID: rphm.pbftNode.ShardID, - SenderSeq: rphm.pbftNode.sequenceID, - } - rByte, err := json.Marshal(relay) - if err != nil { - log.Panic() - } - msg_send := message.MergeMessage(message.CRelay, rByte) - go networks.TcpDial(msg_send, rphm.pbftNode.ip_nodeTable[sid][0]) - rphm.pbftNode.pl.Plog.Printf("S%dN%d : sended relay txs to %d\n", rphm.pbftNode.ShardID, rphm.pbftNode.NodeID, sid) + if params.RelayWithMerkleProof == 1 { + rphm.pbftNode.RelayWithProofSend(block) + } else { + rphm.pbftNode.RelayMsgSend() } - rphm.pbftNode.CurChain.Txpool.ClearRelayPool() // send txs excuted in this block to the listener // add more message to measure more metrics diff --git a/consensus_shard/pbft_all/pbftInside_moduleBroker.go b/consensus_shard/pbft_all/pbftInside_moduleBroker.go index f4c26d2..90c3c13 100644 --- a/consensus_shard/pbft_all/pbftInside_moduleBroker.go +++ b/consensus_shard/pbft_all/pbftInside_moduleBroker.go @@ -22,7 +22,7 @@ type RawBrokerPbftExtraHandleMod struct { // propose request with different types func (rbhm *RawBrokerPbftExtraHandleMod) HandleinPropose() (bool, *message.Request) { // new blocks - block := rbhm.pbftNode.CurChain.GenerateBlock() + block := rbhm.pbftNode.CurChain.GenerateBlock(int32(rbhm.pbftNode.NodeID)) r := &message.Request{ RequestType: message.BlockRequest, ReqTime: time.Now(), diff --git a/consensus_shard/pbft_all/pbftInside_moduleCLPA.go b/consensus_shard/pbft_all/pbftInside_moduleCLPA.go index 8238bd4..3a718f8 100644 --- a/consensus_shard/pbft_all/pbftInside_moduleCLPA.go +++ b/consensus_shard/pbft_all/pbftInside_moduleCLPA.go @@ -35,7 +35,7 @@ func (cphm *CLPAPbftInsideExtraHandleMod) HandleinPropose() (bool, *message.Requ } // ELSE: propose a block - block := cphm.pbftNode.CurChain.GenerateBlock() + block := cphm.pbftNode.CurChain.GenerateBlock(int32(cphm.pbftNode.NodeID)) r := &message.Request{ RequestType: message.BlockRequest, ReqTime: time.Now(), @@ -121,24 +121,11 @@ func (cphm *CLPAPbftInsideExtraHandleMod) HandleinCommit(cmsg *message.Commit) b } // send relay txs - for sid := uint64(0); sid < cphm.pbftNode.pbftChainConfig.ShardNums; sid++ { - if sid == cphm.pbftNode.ShardID { - continue - } - relay := message.Relay{ - Txs: cphm.pbftNode.CurChain.Txpool.RelayPool[sid], - SenderShardID: cphm.pbftNode.ShardID, - SenderSeq: cphm.pbftNode.sequenceID, - } - rByte, err := json.Marshal(relay) - if err != nil { - log.Panic() - } - msg_send := message.MergeMessage(message.CRelay, rByte) - go networks.TcpDial(msg_send, cphm.pbftNode.ip_nodeTable[sid][0]) - cphm.pbftNode.pl.Plog.Printf("S%dN%d : sended relay txs to %d\n", cphm.pbftNode.ShardID, cphm.pbftNode.NodeID, sid) + if params.RelayWithMerkleProof == 1 { + cphm.pbftNode.RelayWithProofSend(block) + } else { + cphm.pbftNode.RelayMsgSend() } - cphm.pbftNode.CurChain.Txpool.ClearRelayPool() // send txs excuted in this block to the listener // add more message to measure more metrics diff --git a/consensus_shard/pbft_all/pbftOutside_module.go b/consensus_shard/pbft_all/pbftOutside_module.go index 9d9f8fc..d55b3ac 100644 --- a/consensus_shard/pbft_all/pbftOutside_module.go +++ b/consensus_shard/pbft_all/pbftOutside_module.go @@ -1,6 +1,7 @@ package pbft_all import ( + "blockEmulator/chain" "blockEmulator/message" "encoding/json" "log" @@ -17,6 +18,8 @@ func (rrom *RawRelayOutsideModule) HandleMessageOutsidePBFT(msgType message.Mess switch msgType { case message.CRelay: rrom.handleRelay(content) + case message.CRelayWithProof: + rrom.handleRelayWithProof(content) case message.CInject: rrom.handleInjectTx(content) default: @@ -39,6 +42,34 @@ func (rrom *RawRelayOutsideModule) handleRelay(content []byte) { rrom.pbftNode.pl.Plog.Printf("S%dN%d : has handled relay txs msg\n", rrom.pbftNode.ShardID, rrom.pbftNode.NodeID) } +func (rrom *RawRelayOutsideModule) handleRelayWithProof(content []byte) { + rwp := new(message.RelayWithProof) + err := json.Unmarshal(content, rwp) + if err != nil { + log.Panic(err) + } + rrom.pbftNode.pl.Plog.Printf("S%dN%d : has received relay txs & proofs from shard %d, the senderSeq is %d\n", rrom.pbftNode.ShardID, rrom.pbftNode.NodeID, rwp.SenderShardID, rwp.SenderSeq) + // validate the proofs of txs + isAllCorrect := true + for i, tx := range rwp.Txs { + if ok, _ := chain.TxProofVerify(tx.TxHash, &rwp.TxProofs[i]); !ok { + isAllCorrect = false + break + } + } + if isAllCorrect { + rrom.pbftNode.pl.Plog.Println("All proofs are passed.") + rrom.pbftNode.CurChain.Txpool.AddTxs2Pool(rwp.Txs) + } else { + rrom.pbftNode.pl.Plog.Println("Err: wrong proof!") + } + + rrom.pbftNode.seqMapLock.Lock() + rrom.pbftNode.seqIDMap[rwp.SenderShardID] = rwp.SenderSeq + rrom.pbftNode.seqMapLock.Unlock() + rrom.pbftNode.pl.Plog.Printf("S%dN%d : has handled relay txs msg\n", rrom.pbftNode.ShardID, rrom.pbftNode.NodeID) +} + func (rrom *RawRelayOutsideModule) handleInjectTx(content []byte) { it := new(message.InjectTxs) err := json.Unmarshal(content, it) diff --git a/consensus_shard/pbft_all/pbftOutside_moduleCLPA.go b/consensus_shard/pbft_all/pbftOutside_moduleCLPA.go index 2fa8c74..9ce17c7 100644 --- a/consensus_shard/pbft_all/pbftOutside_moduleCLPA.go +++ b/consensus_shard/pbft_all/pbftOutside_moduleCLPA.go @@ -1,6 +1,7 @@ package pbft_all import ( + "blockEmulator/chain" "blockEmulator/consensus_shard/pbft_all/dataSupport" "blockEmulator/message" "encoding/json" @@ -18,6 +19,8 @@ func (crom *CLPARelayOutsideModule) HandleMessageOutsidePBFT(msgType message.Mes switch msgType { case message.CRelay: crom.handleRelay(content) + case message.CRelayWithProof: + crom.handleRelayWithProof(content) case message.CInject: crom.handleInjectTx(content) @@ -48,6 +51,33 @@ func (crom *CLPARelayOutsideModule) handleRelay(content []byte) { crom.pbftNode.pl.Plog.Printf("S%dN%d : has handled relay txs msg\n", crom.pbftNode.ShardID, crom.pbftNode.NodeID) } +func (crom *CLPARelayOutsideModule) handleRelayWithProof(content []byte) { + rwp := new(message.RelayWithProof) + err := json.Unmarshal(content, rwp) + if err != nil { + log.Panic(err) + } + crom.pbftNode.pl.Plog.Printf("S%dN%d : has received relay txs & proofs from shard %d, the senderSeq is %d\n", crom.pbftNode.ShardID, crom.pbftNode.NodeID, rwp.SenderShardID, rwp.SenderSeq) + // validate the proofs of txs + isAllCorrect := true + for i, tx := range rwp.Txs { + if ok, _ := chain.TxProofVerify(tx.TxHash, &rwp.TxProofs[i]); !ok { + isAllCorrect = false + break + } + } + if isAllCorrect { + crom.pbftNode.CurChain.Txpool.AddTxs2Pool(rwp.Txs) + } else { + crom.pbftNode.pl.Plog.Println("Err: wrong proof!") + } + + crom.pbftNode.seqMapLock.Lock() + crom.pbftNode.seqIDMap[rwp.SenderShardID] = rwp.SenderSeq + crom.pbftNode.seqMapLock.Unlock() + crom.pbftNode.pl.Plog.Printf("S%dN%d : has handled relay txs msg\n", crom.pbftNode.ShardID, crom.pbftNode.NodeID) +} + func (crom *CLPARelayOutsideModule) handleInjectTx(content []byte) { it := new(message.InjectTxs) err := json.Unmarshal(content, it) diff --git a/consensus_shard/pbft_all/toolFuncs.go b/consensus_shard/pbft_all/toolFuncs.go index 5b3a9be..3f7bac4 100644 --- a/consensus_shard/pbft_all/toolFuncs.go +++ b/consensus_shard/pbft_all/toolFuncs.go @@ -1,8 +1,10 @@ package pbft_all import ( + "blockEmulator/chain" "blockEmulator/core" "blockEmulator/message" + "blockEmulator/networks" "blockEmulator/params" "blockEmulator/shard" "crypto/sha256" @@ -98,3 +100,63 @@ func computeTCL(txs []*core.Transaction, commitTS time.Time) int64 { } return ret } + +// help to send Relay message to other shards. +func (p *PbftConsensusNode) RelayMsgSend() { + if params.RelayWithMerkleProof != 0 { + log.Panicf("Parameter Error: RelayWithMerkleProof should be 0, but RelayWithMerkleProof=%d", params.RelayWithMerkleProof) + } + + for sid := uint64(0); sid < p.pbftChainConfig.ShardNums; sid++ { + if sid == p.ShardID { + continue + } + relay := message.Relay{ + Txs: p.CurChain.Txpool.RelayPool[sid], + SenderShardID: p.ShardID, + SenderSeq: p.sequenceID, + } + rByte, err := json.Marshal(relay) + if err != nil { + log.Panic() + } + msg_send := message.MergeMessage(message.CRelay, rByte) + go networks.TcpDial(msg_send, p.ip_nodeTable[sid][0]) + p.pl.Plog.Printf("S%dN%d : sended relay txs to %d\n", p.ShardID, p.NodeID, sid) + } + p.CurChain.Txpool.ClearRelayPool() +} + +// help to send RelayWithProof message to other shards. +func (p *PbftConsensusNode) RelayWithProofSend(block *core.Block) { + if params.RelayWithMerkleProof != 1 { + log.Panicf("Parameter Error: RelayWithMerkleProof should be 1, but RelayWithMerkleProof=%d", params.RelayWithMerkleProof) + } + for sid := uint64(0); sid < p.pbftChainConfig.ShardNums; sid++ { + if sid == p.ShardID { + continue + } + + txHashes := make([][]byte, len(p.CurChain.Txpool.RelayPool[sid])) + for i, tx := range p.CurChain.Txpool.RelayPool[sid] { + txHashes[i] = tx.TxHash[:] + } + txProofs := chain.TxProofBatchGenerateOnBlock(txHashes, block) + + rwp := message.RelayWithProof{ + Txs: p.CurChain.Txpool.RelayPool[sid], + TxProofs: txProofs, + SenderShardID: p.ShardID, + SenderSeq: p.sequenceID, + } + rByte, err := json.Marshal(rwp) + if err != nil { + log.Panic() + } + msg_send := message.MergeMessage(message.CRelayWithProof, rByte) + + go networks.TcpDial(msg_send, p.ip_nodeTable[sid][0]) + p.pl.Plog.Printf("S%dN%d : sended relay txs & proofs to %d\n", p.ShardID, p.NodeID, sid) + } + p.CurChain.Txpool.ClearRelayPool() +} diff --git a/core/block.go b/core/block.go index 517a174..4eec5d4 100644 --- a/core/block.go +++ b/core/block.go @@ -10,6 +10,8 @@ import ( "fmt" "log" "time" + + "github.com/bits-and-blooms/bitset" ) // The definition of blockheader @@ -17,9 +19,10 @@ type BlockHeader struct { ParentBlockHash []byte StateRoot []byte TxRoot []byte + Bloom bitset.BitSet Number uint64 Time time.Time - Miner uint64 + Miner int32 } // Encode blockHeader for storing further diff --git a/go.mod b/go.mod index 5018305..5b467b7 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6 // indirect github.com/VictoriaMetrics/fastcache v1.6.0 // indirect github.com/beorn7/perks v1.0.1 // indirect + github.com/bits-and-blooms/bitset v1.14.3 github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/cockroachdb/errors v1.9.1 // indirect diff --git a/go.sum b/go.sum index 3d63775..b264639 100644 --- a/go.sum +++ b/go.sum @@ -18,6 +18,8 @@ github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5 github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/bits-and-blooms/bitset v1.14.3 h1:Gd2c8lSNf9pKXom5JtD7AaKO8o7fGQ2LtFj1436qilA= +github.com/bits-and-blooms/bitset v1.14.3/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4= github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= github.com/btcsuite/btcd/btcec/v2 v2.2.0 h1:fzn1qaOt32TuLjFlkzYSsBC35Q3KUjT1SwPxiMSCF5k= diff --git a/message/message.go b/message/message.go index 544c388..40934b2 100644 --- a/message/message.go +++ b/message/message.go @@ -19,8 +19,9 @@ const ( CSendOldrequest MessageType = "sendOldrequest" CStop MessageType = "stop" - CRelay MessageType = "relay" - CInject MessageType = "inject" + CRelay MessageType = "relay" + CRelayWithProof MessageType = "CRelay&Proof" + CInject MessageType = "inject" CBlockInfo MessageType = "BlockInfo" CSeqIDinfo MessageType = "SequenceID" diff --git a/message/message_relay.go b/message/message_relay.go index 58278d2..9928a24 100644 --- a/message/message_relay.go +++ b/message/message_relay.go @@ -1,6 +1,9 @@ package message -import "blockEmulator/core" +import ( + "blockEmulator/chain" + "blockEmulator/core" +) // if transaction relaying is used, this message is used for sending sequence id, too type Relay struct { @@ -8,3 +11,11 @@ type Relay struct { SenderShardID uint64 SenderSeq uint64 } + +// This struct is similar to Relay. Nodes receiving this message will validate this proof first. +type RelayWithProof struct { + Txs []*core.Transaction + TxProofs []chain.TxProofResult + SenderShardID uint64 + SenderSeq uint64 +} diff --git a/params/global_config.go b/params/global_config.go index dcb0aee..672bde9 100644 --- a/params/global_config.go +++ b/params/global_config.go @@ -24,7 +24,9 @@ var ( InjectSpeed = 2000 // The speed of transaction injection TotalDataSize = 160000 // The total number of txs to be injected TxBatchSize = 16000 // The supervisor read a batch of txs then send them. The size of a batch is 'TxBatchSize' - BrokerNum = 10 // The # of Broker accounts used in Broker / CLPA_Broker. + + BrokerNum = 10 // The # of Broker accounts used in Broker / CLPA_Broker. + RelayWithMerkleProof = 0 // When using a consensus about "Relay", nodes will send Tx Relay with proof if "RelayWithMerkleProof" = 1 ExpDataRootDir = "expTest" // The root dir where the experimental data should locate. DataWrite_path = ExpDataRootDir + "/result/" // Measurement data result output path @@ -50,10 +52,11 @@ type globalConfig struct { InjectSpeed int `json:"InjectSpeed"` TotalDataSize int `json:"TotalDataSize"` - TxBatchSize int `json:"TxBatchSize"` - BrokerNum int `json:"BrokerNum"` - DatasetFile string `json:"DatasetFile"` - ReconfigTimeGap int `json:"ReconfigTimeGap"` + TxBatchSize int `json:"TxBatchSize"` + BrokerNum int `json:"BrokerNum"` + RelayWithMerkleProof int `json:"RelayWithMerkleProof"` + DatasetFile string `json:"DatasetFile"` + ReconfigTimeGap int `json:"ReconfigTimeGap"` } func ReadConfigFile() { @@ -89,6 +92,7 @@ func ReadConfigFile() { TxBatchSize = config.TxBatchSize BrokerNum = config.BrokerNum + RelayWithMerkleProof = config.RelayWithMerkleProof DatasetFile = config.DatasetFile ReconfigTimeGap = config.ReconfigTimeGap diff --git a/test/test_clpa.go b/partition/clpa_test.go similarity index 70% rename from test/test_clpa.go rename to partition/clpa_test.go index ffb8d3a..0138684 100644 --- a/test/test_clpa.go +++ b/partition/clpa_test.go @@ -1,18 +1,19 @@ -package test +package partition import ( - "blockEmulator/partition" + "blockEmulator/params" "encoding/csv" "io" "log" "os" + "testing" ) -func Test_CLPA() { - k := new(partition.CLPAState) +func TestClpa(t *testing.T) { + k := new(CLPAState) k.Init_CLPAState(0.5, 100, 4) - txfile, err := os.Open("../2000000to2999999_BlockTransaction.csv") + txfile, err := os.Open("../" + params.DatasetFile) if err != nil { log.Panic(err) } @@ -20,6 +21,8 @@ func Test_CLPA() { defer txfile.Close() reader := csv.NewReader(txfile) datanum := 0 + + // read transactions reader.Read() for { data, err := reader.Read() @@ -30,10 +33,10 @@ func Test_CLPA() { log.Panic(err) } if data[6] == "0" && data[7] == "0" && len(data[3]) > 16 && len(data[4]) > 16 && data[3] != data[4] { - s := partition.Vertex{ + s := Vertex{ Addr: data[3][2:], } - r := partition.Vertex{ + r := Vertex{ Addr: data[4][2:], } k.AddEdge(s, r) @@ -42,6 +45,4 @@ func Test_CLPA() { } k.CLPA_Partition() - - print(k.CrossShardEdgeNum) } diff --git a/partition/partition_CLPA.go b/partition/partition_CLPA.go index 9bee2c5..ec6f6c2 100644 --- a/partition/partition_CLPA.go +++ b/partition/partition_CLPA.go @@ -230,7 +230,7 @@ func (cs *CLPAState) getShard_score(v Vertex, uShard int) float64 { // CLPA 划分算法 func (cs *CLPAState) CLPA_Partition() (map[string]uint64, int) { cs.ComputeEdges2Shard() - fmt.Println(cs.CrossShardEdgeNum) + fmt.Println("Before running CLPA, cross-shard edge number:", cs.CrossShardEdgeNum) res := make(map[string]uint64) updateTreshold := make(map[string]int) for iter := 0; iter < cs.MaxIterations; iter += 1 { // 第一层循环控制算法次数,constraint @@ -269,7 +269,7 @@ func (cs *CLPAState) CLPA_Partition() (map[string]uint64, int) { } cs.ComputeEdges2Shard() - fmt.Println(cs.CrossShardEdgeNum) + fmt.Println("After running CLPA, cross-shard edge number:", cs.CrossShardEdgeNum) return res, cs.CrossShardEdgeNum } diff --git a/query/query_accountstate.go b/query/query_accountstate.go index 70c53ba..880a9dc 100644 --- a/query/query_accountstate.go +++ b/query/query_accountstate.go @@ -2,31 +2,78 @@ package query import ( "blockEmulator/core" - "blockEmulator/params" "log" - "strconv" + "os" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/trie" ) -func QueryAccountState(ShardID, NodeID uint64, address string) *core.AccountState { - fp := params.DatabaseWrite_path + "mptDB/ldb/s" + strconv.FormatUint(ShardID, 10) + "/n" + strconv.FormatUint(NodeID, 10) - db, _ := rawdb.NewLevelDBDatabase(fp, 0, 1, "accountState", false) +func QueryAccountState(chainDBfp, mptfp string, ShardID, NodeID uint64, address string) *core.AccountState { + _, err := os.Stat(mptfp) + if os.IsNotExist(err) { + log.Panic("No filepath") + } + // recover the leveldb + db, _ := rawdb.Open(rawdb.OpenOptions{Type: "leveldb", Directory: mptfp, Namespace: "accountState"}) triedb := trie.NewDatabaseWithConfig(db, &trie.Config{ Cache: 0, Preimages: true, }) defer db.Close() - storage := initStorage(ShardID, NodeID) + // fetch the newest blockHeader + storage := initStorage(chainDBfp, ShardID, NodeID) + defer storage.DataBase.Close() curHash, _ := storage.GetNewestBlockHash() curb, _ := storage.GetBlock(curHash) + // recover the MPT st, err := trie.New(trie.TrieID(common.BytesToHash(curb.Header.StateRoot)), triedb) if err != nil { log.Panic() } + // fetch the account asenc, _ := st.Get([]byte(address)) + if asenc == nil { + return nil + } state_a := core.DecodeAS(asenc) return state_a } + +func QueryAccountStateList(chainDBfp, mptfp string, ShardID, NodeID uint64, addresses []string) []*core.AccountState { + _, err := os.Stat(mptfp) + if os.IsNotExist(err) { + log.Panic("No filepath") + } + + ret := make([]*core.AccountState, len(addresses)) + // recover the leveldb + db, _ := rawdb.Open(rawdb.OpenOptions{Type: "leveldb", Directory: mptfp, Namespace: "accountState"}) + triedb := trie.NewDatabaseWithConfig(db, &trie.Config{ + Cache: 0, + Preimages: true, + }) + defer db.Close() + // fetch the newest blockHeader + storage := initStorage(chainDBfp, ShardID, NodeID) + defer storage.DataBase.Close() + curHash, _ := storage.GetNewestBlockHash() + curb, _ := storage.GetBlock(curHash) + // recover the MPT + st, err := trie.New(trie.TrieID(common.BytesToHash(curb.Header.StateRoot)), triedb) + if err != nil { + log.Panic() + } + // fetch the accounts + for i, address := range addresses { + asenc, _ := st.Get([]byte(address)) + if asenc == nil { + ret[i] = nil + } else { + ret[i] = core.DecodeAS(asenc) + } + } + + return ret +} diff --git a/query/query_block.go b/query/query_block.go index f3dfecd..66ab9dd 100644 --- a/query/query_block.go +++ b/query/query_block.go @@ -4,21 +4,23 @@ import ( "blockEmulator/core" "blockEmulator/params" "blockEmulator/storage" + "fmt" "github.com/boltdb/bolt" ) -func initStorage(ShardID, NodeID uint64) *storage.Storage { +func initStorage(dbfp string, ShardID, NodeID uint64) *storage.Storage { pcc := ¶ms.ChainConfig{ ChainID: ShardID, NodeID: NodeID, ShardID: ShardID, } - return storage.NewStorage(pcc) + return storage.NewStorage(dbfp, pcc) } func QueryBlocks(ShardID, NodeID uint64) []*core.Block { - db := initStorage(ShardID, NodeID).DataBase + dbfp := params.DatabaseWrite_path + fmt.Sprintf("chainDB/S%d_N%d", ShardID, NodeID) + db := initStorage(dbfp, ShardID, NodeID).DataBase defer db.Close() blocks := make([]*core.Block, 0) err1 := db.View(func(tx *bolt.Tx) error { @@ -33,13 +35,14 @@ func QueryBlocks(ShardID, NodeID uint64) []*core.Block { return nil }) if err1 != nil { - err1.Error() + fmt.Println(err1.Error()) } return blocks } func QueryBlock(ShardID, NodeID, Number uint64) *core.Block { - db := initStorage(ShardID, NodeID).DataBase + dbfp := params.DatabaseWrite_path + fmt.Sprintf("chainDB/S%d_N%d", ShardID, NodeID) + db := initStorage(dbfp, ShardID, NodeID).DataBase defer db.Close() block := new(core.Block) err1 := db.View(func(tx *bolt.Tx) error { @@ -56,13 +59,14 @@ func QueryBlock(ShardID, NodeID, Number uint64) *core.Block { return nil }) if err1 != nil { - err1.Error() + fmt.Println(err1.Error()) } return block } func QueryNewestBlock(ShardID, NodeID uint64) *core.Block { - storage := initStorage(ShardID, NodeID) + dbfp := params.DatabaseWrite_path + fmt.Sprintf("chainDB/S%d_N%d", ShardID, NodeID) + storage := initStorage(dbfp, ShardID, NodeID) defer storage.DataBase.Close() hash, _ := storage.GetNewestBlockHash() block, _ := storage.GetBlock(hash) @@ -70,7 +74,8 @@ func QueryNewestBlock(ShardID, NodeID uint64) *core.Block { } func QueryBlockTxs(ShardID, NodeID, Number uint64) []*core.Transaction { - db := initStorage(ShardID, NodeID).DataBase + dbfp := params.DatabaseWrite_path + fmt.Sprintf("chainDB/S%d_N%d", ShardID, NodeID) + db := initStorage(dbfp, ShardID, NodeID).DataBase defer db.Close() block := new(core.Block) err1 := db.View(func(tx *bolt.Tx) error { @@ -87,7 +92,7 @@ func QueryBlockTxs(ShardID, NodeID, Number uint64) []*core.Transaction { return nil }) if err1 != nil { - err1.Error() + fmt.Println(err1.Error()) } return block.Body } diff --git a/query/query_test.go b/query/query_test.go new file mode 100644 index 0000000..d032e27 --- /dev/null +++ b/query/query_test.go @@ -0,0 +1,94 @@ +package query + +import ( + "blockEmulator/chain" + "blockEmulator/core" + "blockEmulator/params" + "fmt" + "log" + "math/big" + "os" + "testing" + "time" + + "github.com/ethereum/go-ethereum/core/rawdb" +) + +func TestQuery(t *testing.T) { + // pre-build a blockchain + buildBlockChain() + fmt.Println("Now a new blockchain is generated.") + + // query block data from the database files (boltDB) + blocks := QueryBlocks(0, 0) + fmt.Println("The number of blocks in this shard:", len(blocks)) + + block_a := QueryBlock(0, 0, 2) + block_a.PrintBlock() + + block_b := QueryNewestBlock(0, 0) + block_b.PrintBlock() + + // query tx data from the database files + txs := QueryBlockTxs(0, 0, 3) + fmt.Println(len(txs)) + + // query account state from level db + mptfp := params.DatabaseWrite_path + "mptDB/ldb/s0/n0" + chaindbfp := params.DatabaseWrite_path + fmt.Sprintf("chainDB/S%d_N%d", 0, 0) + accountState := QueryAccountState(chaindbfp, mptfp, 0, 0, "00000000001") + fmt.Println("The account balance of 00000000001:", accountState.Balance) + + clearBlockchainData() +} + +func buildBlockChain() { + accounts := []string{"00000000001", "00000000002", "00000000003", "00000000004", "00000000005", "00000000006"} + as := make([]*core.AccountState, 0) + for idx := range accounts { + as = append(as, &core.AccountState{ + Balance: big.NewInt(int64(idx*1000000) + 1000000), + }) + } + fp := params.DatabaseWrite_path + "mptDB/ldb/s0/N0" + fmt.Println(fp) + db, err := rawdb.NewLevelDBDatabase(fp, 0, 1, "accountState", false) + if err != nil { + log.Panic(err) + } + params.ShardNum = 1 + pcc := ¶ms.ChainConfig{ + ChainID: 0, + NodeID: 0, + ShardID: 0, + Nodes_perShard: 1, + ShardNums: 1, + BlockSize: uint64(params.MaxBlockSize_global), + BlockInterval: uint64(params.Block_Interval), + InjectSpeed: uint64(params.InjectSpeed), + } + CurChain, _ := chain.NewBlockChain(pcc, db) + CurChain.PrintBlockChain() + CurChain.AddAccounts(accounts, as, 0) + CurChain.Txpool.AddTx2Pool(core.NewTransaction("00000000001", "00000000002", big.NewInt(100000), 1, time.Now())) + + for i := 0; i < 4; i++ { + b := CurChain.GenerateBlock(int32(i)) + CurChain.AddBlock(b) + } + + astates := CurChain.FetchAccounts(accounts) + for idx, state := range astates { + fmt.Println(accounts[idx], state.Balance) + } + CurChain.CloseBlockChain() +} + +func clearBlockchainData() { + // clear test data file + err := os.RemoveAll(params.ExpDataRootDir) + if err != nil { + fmt.Printf("Failed to delete directory: %v\n", err) + return + } +} diff --git a/test/test_result.go b/query/resultcheck_test.go similarity index 54% rename from test/test_result.go rename to query/resultcheck_test.go index d7b0336..e687825 100644 --- a/test/test_result.go +++ b/query/resultcheck_test.go @@ -1,7 +1,6 @@ -package test +package query import ( - "blockEmulator/chain" "blockEmulator/core" "blockEmulator/params" "encoding/csv" @@ -11,11 +10,46 @@ import ( "math/big" "os" "strconv" + "testing" "time" - - "github.com/ethereum/go-ethereum/core/rawdb" ) +func TestFinalResult(t *testing.T) { + // check the final result after running BlockEmulator + + // get the result from Dataset + accountBalance := loadFinalResultFromDataset() + acCorrect := make(map[string]bool) + + // convert keys to list + accounts := make([]string, len(accountBalance)) + i := 0 + for key := range accountBalance { + accounts[i] = key + i++ + } + + // check the result from BlockEmulator + for sid := 0; sid < params.ShardNum; sid++ { + mptfp := "../" + params.DatabaseWrite_path + "mptDB/ldb/s" + strconv.FormatUint(uint64(sid), 10) + "/n0" + chaindbfp := "../" + params.DatabaseWrite_path + fmt.Sprintf("chainDB/S%d_N%d", sid, 0) + aslist := QueryAccountStateList(chaindbfp, mptfp, uint64(sid), 0, accounts) + for idx, as := range aslist { + if as != nil && as.Balance.Cmp(accountBalance[accounts[idx]]) == 0 { + acCorrect[accounts[idx]] = true + } + } + } + fmt.Println("Results from BlockEmulator: # of correct accounts", len(acCorrect)) + if len(acCorrect) == len(accountBalance) { + fmt.Println("test pass") + } else if len(accountBalance)-len(acCorrect) < params.BrokerNum { + fmt.Printf("%d err accounts, they maybe brokers", len(accountBalance)-len(acCorrect)) + } else { + log.Panic("Err, too many wrong accounts", len(accountBalance)-len(acCorrect)) + } +} + func data2tx(data []string, nonce uint64) (*core.Transaction, bool) { if data[6] == "0" && data[7] == "0" && len(data[3]) > 16 && len(data[4]) > 16 && data[3] != data[4] { val, ok := new(big.Int).SetString(data[8], 10) @@ -28,11 +62,9 @@ func data2tx(data []string, nonce uint64) (*core.Transaction, bool) { return &core.Transaction{}, false } -func Ttestresult(ShardNums int) { +func loadFinalResultFromDataset() map[string]*big.Int { accountBalance := make(map[string]*big.Int) - acCorrect := make(map[string]bool) - - txfile, err := os.Open(params.DatasetFile) + txfile, err := os.Open("../" + params.DatasetFile) if err != nil { log.Panic(err) } @@ -67,37 +99,7 @@ func Ttestresult(ShardNums int) { } } } - fmt.Println(len(accountBalance)) - for sid := 0; sid < ShardNums; sid++ { - fp := params.DatabaseWrite_path + "mptDB/ldb/s" + strconv.FormatUint(uint64(sid), 10) + "/n0" - db, err := rawdb.NewLevelDBDatabase(fp, 0, 1, "accountState", false) - if err != nil { - log.Panic(err) - } - pcc := ¶ms.ChainConfig{ - ChainID: uint64(sid), - NodeID: 0, - ShardID: uint64(sid), - Nodes_perShard: uint64(params.NodesInShard), - ShardNums: uint64(ShardNums), - BlockSize: uint64(params.MaxBlockSize_global), - BlockInterval: uint64(params.Block_Interval), - InjectSpeed: uint64(params.InjectSpeed), - } - CurChain, _ := chain.NewBlockChain(pcc, db) - for key, val := range accountBalance { - v := CurChain.FetchAccounts([]string{key}) - if val.Cmp(v[0].Balance) == 0 { - acCorrect[key] = true - } - } - CurChain.CloseBlockChain() - } - fmt.Println(len(acCorrect)) - if len(acCorrect) == len(accountBalance) { - fmt.Println("test pass") - } else { - fmt.Println(len(accountBalance)-len(acCorrect), "accounts errs, they may be brokers~;") - fmt.Println("if the number of err accounts is too large, the mechanism has bugs") - } + fmt.Println("Results from dataset file: # of accounts", len(accountBalance)) + + return accountBalance } diff --git a/storage/storage.go b/storage/storage.go index 4380fc3..e2d7008 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -10,7 +10,6 @@ import ( "fmt" "log" "os" - "strconv" "github.com/boltdb/bolt" ) @@ -24,7 +23,7 @@ type Storage struct { } // new a storage, build a bolt datase -func NewStorage(cc *params.ChainConfig) *Storage { +func NewStorage(dbFp string, cc *params.ChainConfig) *Storage { dir := params.DatabaseWrite_path + "chainDB" errMkdir := os.MkdirAll(dir, os.ModePerm) if errMkdir != nil { @@ -32,7 +31,7 @@ func NewStorage(cc *params.ChainConfig) *Storage { } s := &Storage{ - dbFilePath: params.DatabaseWrite_path + "chainDB/S" + strconv.FormatUint(cc.ShardID, 10) + "_N" + strconv.FormatUint(cc.NodeID, 10), + dbFilePath: dbFp, blockBucket: "block", blockHeaderBucket: "blockHeader", newestBlockHashBucket: "newestBlockHash", diff --git a/test/test_blockChain.go b/test/test_blockChain.go deleted file mode 100644 index 68d3801..0000000 --- a/test/test_blockChain.go +++ /dev/null @@ -1,50 +0,0 @@ -package test - -import ( - "blockEmulator/chain" - "blockEmulator/core" - "blockEmulator/params" - "fmt" - "log" - "math/big" - - "github.com/ethereum/go-ethereum/core/rawdb" -) - -func TestBlockChain(ShardNums int) { - accounts := []string{"000000000001", "00000000002", "00000000003", "00000000004", "00000000005", "00000000006"} - as := make([]*core.AccountState, 0) - for idx := range accounts { - as = append(as, &core.AccountState{ - Balance: big.NewInt(int64(idx)), - }) - } - for sid := 0; sid < 1; sid++ { - fp := params.DatabaseWrite_path + "mptDB/ldb/s0/N0" - db, err := rawdb.NewLevelDBDatabase(fp, 0, 1, "accountState", false) - if err != nil { - log.Panic(err) - } - params.ShardNum = 1 - pcc := ¶ms.ChainConfig{ - ChainID: uint64(sid), - NodeID: 0, - ShardID: uint64(sid), - Nodes_perShard: uint64(1), - ShardNums: 4, - BlockSize: uint64(params.MaxBlockSize_global), - BlockInterval: uint64(params.Block_Interval), - InjectSpeed: uint64(params.InjectSpeed), - } - CurChain, _ := chain.NewBlockChain(pcc, db) - CurChain.PrintBlockChain() - CurChain.AddAccounts(accounts, as) - CurChain.PrintBlockChain() - - astates := CurChain.FetchAccounts(accounts) - for _, state := range astates { - fmt.Println(state.Balance) - } - CurChain.CloseBlockChain() - } -} diff --git a/test/test_pbft.go b/test/test_pbft.go deleted file mode 100644 index 1054eb4..0000000 --- a/test/test_pbft.go +++ /dev/null @@ -1,59 +0,0 @@ -package test - -import ( - "blockEmulator/consensus_shard/pbft_all" - "blockEmulator/params" - "blockEmulator/supervisor" - "strconv" - "time" -) - -// TEST case -//nid, _ := strconv.ParseUint(os.Args[1], 10, 64) -//nnm, _ := strconv.ParseUint(os.Args[2], 10, 64) -//sid, _ := strconv.ParseUint(os.Args[3], 10, 64) -//snm, _ := strconv.ParseUint(os.Args[4], 10, 64) -//test.TestPBFT(nid, nnm, sid, snm) - -func TestPBFT(nid, nnm, sid, snm uint64) { - params.ShardNum = int(snm) - for i := uint64(0); i < snm; i++ { - if _, ok := params.IPmap_nodeTable[i]; !ok { - params.IPmap_nodeTable[i] = make(map[uint64]string) - } - for j := uint64(0); j < nnm; j++ { - params.IPmap_nodeTable[i][j] = "127.0.0.1:" + strconv.Itoa(8800+int(i)*100+int(j)) - } - } - params.IPmap_nodeTable[params.SupervisorShard] = make(map[uint64]string) - params.IPmap_nodeTable[params.SupervisorShard][0] = "127.0.0.1:18800" - - pcc := ¶ms.ChainConfig{ - ChainID: sid, - NodeID: nid, - ShardID: sid, - Nodes_perShard: uint64(params.NodesInShard), - ShardNums: snm, - BlockSize: uint64(params.MaxBlockSize_global), - BlockInterval: uint64(params.Block_Interval), - InjectSpeed: uint64(params.InjectSpeed), - } - - if nid == 12345678 { - lsn := new(supervisor.Supervisor) - lsn.NewSupervisor("127.0.0.1:18800", pcc, "Relay", "TPS_Relay", "Latency_Relay", "CrossTxRate_Relay", "TxNumberCount_Relay") - time.Sleep(10000 * time.Millisecond) - go lsn.SupervisorTxHandling() - lsn.TcpListen() - return - } - - worker := pbft_all.NewPbftNode(sid, nid, pcc, "Relay") - time.Sleep(5 * time.Second) - if nid == 0 { - go worker.Propose() - worker.TcpListen() - } else { - worker.TcpListen() - } -} diff --git a/test/test_query.go b/test/test_query.go deleted file mode 100644 index 0b7d458..0000000 --- a/test/test_query.go +++ /dev/null @@ -1,31 +0,0 @@ -package test - -import ( - "blockEmulator/query" - "fmt" -) - -func TestQueryBlocks() { - blocks := query.QueryBlocks(2, 0) - fmt.Println(len(blocks)) -} - -func TestQueryBlock() { - block := query.QueryBlock(0, 1, 22) - fmt.Println(len(block.Body)) -} - -func TestQueryNewestBlock() { - blocks := query.QueryNewestBlock(0, 0) - fmt.Println(blocks.Header.Number) -} - -func TestQueryBlockTxs() { - txs := query.QueryBlockTxs(0, 0, 3) - fmt.Println(len(txs)) -} - -func TestQueryAccountState() { - accountState := query.QueryAccountState(0, 0, "32be343b94f860124dc4fee278fdcbd38c102d88") - fmt.Println(accountState.Balance) -} diff --git a/utils/utils_partition.go b/utils/utils_partition.go index 7972845..ae050ac 100644 --- a/utils/utils_partition.go +++ b/utils/utils_partition.go @@ -3,6 +3,7 @@ package utils import ( "blockEmulator/params" "log" + "math/big" "strconv" ) @@ -18,3 +19,10 @@ func Addr2Shard(addr Address) int { } return int(num) % params.ShardNum } + +// mod method +func ModBytes(data []byte, mod uint) uint { + num := new(big.Int).SetBytes(data) + result := new(big.Int).Mod(num, big.NewInt(int64(mod))) + return uint(result.Int64()) +}