Skip to content

Commit

Permalink
core, eth/filters, miner, xeth: Optimised log filtering
Browse files Browse the repository at this point in the history
Log filtering is now using a MIPmap like approach where addresses of
logs are added to a mapped bloom bin. The current levels for the MIP are
in ranges of 1.000.000, 500.000, 100.000, 50.000, 1.000. Logs are
therefor filtered in batches of 1.000.
  • Loading branch information
obscuren committed Oct 16, 2015
1 parent 30f057a commit 6dc1478
Show file tree
Hide file tree
Showing 14 changed files with 647 additions and 134 deletions.
27 changes: 22 additions & 5 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,10 +654,17 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) {
events = append(events, ChainEvent{block, block.Hash(), logs})

// This puts transactions in a extra db for rpc
PutTransactions(self.chainDb, block, block.Transactions())
if err := PutTransactions(self.chainDb, block, block.Transactions()); err != nil {
return i, err
}
// store the receipts
PutReceipts(self.chainDb, receipts)

if err := PutReceipts(self.chainDb, receipts); err != nil {
return i, err
}
// Write map map bloom filters
if err := WriteMipmapBloom(self.chainDb, block.NumberU64(), receipts); err != nil {
return i, err
}
case SideStatTy:
if glog.V(logger.Detail) {
glog.Infof("inserted forked block #%d (TD=%v) (%d TXs %d UNCs) (%x...). Took %v\n", block.Number(), block.Difficulty(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart))
Expand Down Expand Up @@ -743,8 +750,18 @@ func (self *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
// insert the block in the canonical way, re-writing history
self.insert(block)
// write canonical receipts and transactions
PutTransactions(self.chainDb, block, block.Transactions())
PutReceipts(self.chainDb, GetBlockReceipts(self.chainDb, block.Hash()))
if err := PutTransactions(self.chainDb, block, block.Transactions()); err != nil {
return err
}
receipts := GetBlockReceipts(self.chainDb, block.Hash())
// write receipts
if err := PutReceipts(self.chainDb, receipts); err != nil {
return err
}
// Write map map bloom filters
if err := WriteMipmapBloom(self.chainDb, block.NumberU64(), receipts); err != nil {
return err
}

addedTxs = append(addedTxs, block.Transactions()...)
}
Expand Down
7 changes: 6 additions & 1 deletion core/chain_makers.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,12 @@ func (b *BlockGen) AddTx(tx *types.Transaction) {
b.receipts = append(b.receipts, receipt)
}

func (b *BlockGen) AddReceipt(receipt *types.Receipt) {
// AddUncheckedReceipts forcefully adds a receipts to the block without a
// backing transaction.
//
// AddUncheckedReceipts will cause consensus failures when used during real
// chain processing. This is best used in conjuction with raw block insertion.
func (b *BlockGen) AddUncheckedReceipt(receipt *types.Receipt) {
b.receipts = append(b.receipts, receipt)
}

Expand Down
44 changes: 44 additions & 0 deletions core/chain_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package core

import (
"bytes"
"encoding/binary"
"fmt"
"math/big"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -42,6 +44,9 @@ var (

ExpDiffPeriod = big.NewInt(100000)
blockHashPre = []byte("block-hash-") // [deprecated by eth/63]

mipmapPre = []byte("mipmap-log-bloom-")
MIPMapLevels = []uint64{1000000, 500000, 100000, 50000, 1000}
)

// CalcDifficulty is the difficulty adjustment algorithm. It returns
Expand Down Expand Up @@ -346,3 +351,42 @@ func GetBlockByHashOld(db ethdb.Database, hash common.Hash) *types.Block {
}
return (*types.Block)(&block)
}

// returns a formatted MIP mapped key by adding prefix, canonical number and level
//
// ex. fn(98, 1000) = (prefix || 1000 || 0)
func mipmapKey(num, level uint64) []byte {
lkey := make([]byte, 8)
binary.BigEndian.PutUint64(lkey, level)
key := new(big.Int).SetUint64(num / level * level)

return append(mipmapPre, append(lkey, key.Bytes()...)...)
}

// WriteMapmapBloom writes each address included in the receipts' logs to the
// MIP bloom bin.
func WriteMipmapBloom(db ethdb.Database, number uint64, receipts types.Receipts) error {
batch := db.NewBatch()
for _, level := range MIPMapLevels {
key := mipmapKey(number, level)
bloomDat, _ := db.Get(key)
bloom := types.BytesToBloom(bloomDat)
for _, receipt := range receipts {
for _, log := range receipt.Logs() {
bloom.Add(log.Address.Big())
}
}
batch.Put(key, bloom.Bytes())
}
if err := batch.Write(); err != nil {
return fmt.Errorf("mipmap write fail for: %d: %v", number, err)
}
return nil
}

// GetMipmapBloom returns a bloom filter using the number and level as input
// parameters. For available levels see MIPMapLevels.
func GetMipmapBloom(db ethdb.Database, number, level uint64) types.Bloom {
bloomDat, _ := db.Get(mipmapKey(number, level))
return types.BytesToBloom(bloomDat)
}
112 changes: 112 additions & 0 deletions core/chain_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@ package core

import (
"encoding/json"
"io/ioutil"
"math/big"
"os"
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/sha3"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/rlp"
Expand Down Expand Up @@ -318,3 +321,112 @@ func TestHeadStorage(t *testing.T) {
t.Fatalf("Head block hash mismatch: have %v, want %v", entry, blockFull.Hash())
}
}

func TestMipmapBloom(t *testing.T) {
db, _ := ethdb.NewMemDatabase()

receipt1 := new(types.Receipt)
receipt1.SetLogs(vm.Logs{
&vm.Log{Address: common.BytesToAddress([]byte("test"))},
&vm.Log{Address: common.BytesToAddress([]byte("address"))},
})
receipt2 := new(types.Receipt)
receipt2.SetLogs(vm.Logs{
&vm.Log{Address: common.BytesToAddress([]byte("test"))},
&vm.Log{Address: common.BytesToAddress([]byte("address1"))},
})

WriteMipmapBloom(db, 1, types.Receipts{receipt1})
WriteMipmapBloom(db, 2, types.Receipts{receipt2})

for _, level := range MIPMapLevels {
bloom := GetMipmapBloom(db, 2, level)
if !bloom.Test(new(big.Int).SetBytes([]byte("address1"))) {
t.Error("expected test to be included on level:", level)
}
}

// reset
db, _ = ethdb.NewMemDatabase()
receipt := new(types.Receipt)
receipt.SetLogs(vm.Logs{
&vm.Log{Address: common.BytesToAddress([]byte("test"))},
})
WriteMipmapBloom(db, 999, types.Receipts{receipt1})

receipt = new(types.Receipt)
receipt.SetLogs(vm.Logs{
&vm.Log{Address: common.BytesToAddress([]byte("test 1"))},
})
WriteMipmapBloom(db, 1000, types.Receipts{receipt})

bloom := GetMipmapBloom(db, 1000, 1000)
if bloom.TestBytes([]byte("test")) {
t.Error("test should not have been included")
}
}

func TestMipmapChain(t *testing.T) {
dir, err := ioutil.TempDir("", "mipmap")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir)

var (
db, _ = ethdb.NewLDBDatabase(dir, 16)
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
addr = crypto.PubkeyToAddress(key1.PublicKey)
addr2 = common.BytesToAddress([]byte("jeff"))

hash1 = common.BytesToHash([]byte("topic1"))
)
defer db.Close()

genesis := WriteGenesisBlockForTesting(db, GenesisAccount{addr, big.NewInt(1000000)})
chain := GenerateChain(genesis, db, 1010, func(i int, gen *BlockGen) {
var receipts types.Receipts
switch i {
case 1:
receipt := types.NewReceipt(nil, new(big.Int))
receipt.SetLogs(vm.Logs{
&vm.Log{
Address: addr,
Topics: []common.Hash{hash1},
},
})
gen.AddUncheckedReceipt(receipt)
receipts = types.Receipts{receipt}
case 1000:
receipt := types.NewReceipt(nil, new(big.Int))
receipt.SetLogs(vm.Logs{&vm.Log{Address: addr2}})
gen.AddUncheckedReceipt(receipt)
receipts = types.Receipts{receipt}

}

// store the receipts
err := PutReceipts(db, receipts)
if err != nil {
t.Fatal(err)
}
WriteMipmapBloom(db, uint64(i+1), receipts)
})
for _, block := range chain {
WriteBlock(db, block)
if err := WriteCanonicalHash(db, block.Hash(), block.NumberU64()); err != nil {
t.Fatalf("failed to insert block number: %v", err)
}
if err := WriteHeadBlockHash(db, block.Hash()); err != nil {
t.Fatalf("failed to insert block number: %v", err)
}
if err := PutBlockReceipts(db, block, block.Receipts()); err != nil {
t.Fatal("error writing block receipts:", err)
}
}

bloom := GetMipmapBloom(db, 0, 1000)
if bloom.TestBytes(addr2[:]) {
t.Error("address was included in bloom and should not have")
}
}
32 changes: 11 additions & 21 deletions core/transaction_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package core

import (
"fmt"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
Expand All @@ -32,22 +34,16 @@ var (
)

// PutTransactions stores the transactions in the given database
func PutTransactions(db ethdb.Database, block *types.Block, txs types.Transactions) {
batch := new(leveldb.Batch)
_, batchWrite := db.(*ethdb.LDBDatabase)
func PutTransactions(db ethdb.Database, block *types.Block, txs types.Transactions) error {
batch := db.NewBatch()

for i, tx := range block.Transactions() {
rlpEnc, err := rlp.EncodeToBytes(tx)
if err != nil {
glog.V(logger.Debug).Infoln("Failed encoding tx", err)
return
return fmt.Errorf("failed encoding tx: %v", err)
}

if batchWrite {
batch.Put(tx.Hash().Bytes(), rlpEnc)
} else {
db.Put(tx.Hash().Bytes(), rlpEnc)
}
batch.Put(tx.Hash().Bytes(), rlpEnc)

var txExtra struct {
BlockHash common.Hash
Expand All @@ -59,22 +55,16 @@ func PutTransactions(db ethdb.Database, block *types.Block, txs types.Transactio
txExtra.Index = uint64(i)
rlpMeta, err := rlp.EncodeToBytes(txExtra)
if err != nil {
glog.V(logger.Debug).Infoln("Failed encoding tx meta data", err)
return
return fmt.Errorf("failed encoding tx meta data: %v", err)
}

if batchWrite {
batch.Put(append(tx.Hash().Bytes(), 0x0001), rlpMeta)
} else {
db.Put(append(tx.Hash().Bytes(), 0x0001), rlpMeta)
}
batch.Put(append(tx.Hash().Bytes(), 0x0001), rlpMeta)
}

if db, ok := db.(*ethdb.LDBDatabase); ok {
if err := db.LDB().Write(batch, nil); err != nil {
glog.V(logger.Error).Infoln("db write err:", err)
}
if err := batch.Write(); err != nil {
return fmt.Errorf("failed writing tx to db: %v", err)
}
return nil
}

func DeleteTransaction(db ethdb.Database, txHash common.Hash) {
Expand Down
41 changes: 41 additions & 0 deletions core/types/bloom9.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package types

import (
"fmt"
"math/big"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -28,6 +29,46 @@ type bytesBacked interface {
Bytes() []byte
}

const bloomLength = 256

type Bloom [bloomLength]byte

func BytesToBloom(b []byte) Bloom {
var bloom Bloom
bloom.SetBytes(b)
return bloom
}

func (b *Bloom) SetBytes(d []byte) {
if len(b) < len(d) {
panic(fmt.Sprintf("bloom bytes too big %d %d", len(b), len(d)))
}

copy(b[bloomLength-len(d):], d)
}

func (b *Bloom) Add(d *big.Int) {
bin := new(big.Int).SetBytes(b[:])
bin.Or(bin, bloom9(d.Bytes()))
b.SetBytes(bin.Bytes())
}

func (b Bloom) Big() *big.Int {
return common.Bytes2Big(b[:])
}

func (b Bloom) Bytes() []byte {
return b[:]
}

func (b Bloom) Test(test *big.Int) bool {
return BloomLookup(b, test)
}

func (b Bloom) TestBytes(test []byte) bool {
return b.Test(common.BytesToBig(test))
}

func CreateBloom(receipts Receipts) Bloom {
bin := new(big.Int)
for _, receipt := range receipts {
Expand Down
Loading

0 comments on commit 6dc1478

Please sign in to comment.