diff --git a/commitment/commitment.go b/commitment/commitment.go index 13f058525..d5652dcfc 100644 --- a/commitment/commitment.go +++ b/commitment/commitment.go @@ -351,6 +351,13 @@ func (branchData BranchData) IsComplete() bool { // MergeHexBranches combines two branchData, number 2 coming after (and potentially shadowing) number 1 func (branchData BranchData) MergeHexBranches(branchData2 BranchData, newData []byte) (BranchData, error) { + if branchData2 == nil { + return branchData, nil + } + if branchData == nil { + return branchData2, nil + } + touchMap1 := binary.BigEndian.Uint16(branchData[0:]) afterMap1 := binary.BigEndian.Uint16(branchData[2:]) bitmap1 := touchMap1 & afterMap1 diff --git a/commitment/hex_patricia_hashed.go b/commitment/hex_patricia_hashed.go index 7d8acc2b9..2f5f452eb 100644 --- a/commitment/hex_patricia_hashed.go +++ b/commitment/hex_patricia_hashed.go @@ -24,13 +24,15 @@ import ( "hash" "io" "math/bits" + "strings" "github.com/holiman/uint256" "golang.org/x/crypto/sha3" + "github.com/ledgerwatch/log/v3" + "github.com/ledgerwatch/erigon-lib/common/length" "github.com/ledgerwatch/erigon-lib/rlp" - "github.com/ledgerwatch/log/v3" ) // keccakState wraps sha3.state. 
In addition to the usual hash methods, it also supports @@ -87,10 +89,23 @@ type HexPatriciaHashed struct { keccak2 keccakState accountKeyLen int trace bool - auxBuffer [1 + length.Hash]byte + hashAuxBuffer [128]byte // buffer to compute cell hash or write hash-related things byteArrayWriter ByteArrayWriter } +type state struct { + TouchMap [128]uint16 // For each row, bitmap of cells that were either present before modification, or modified or deleted + AfterMap [128]uint16 // For each row, bitmap of cells that were present after modification + CurrentKeyLen int8 + RootChecked bool // Set to false if it is not known whether the root is empty, set to true if it is checked + RootTouched bool + RootPresent bool + RootHash [32]byte + BranchBefore [128]bool // For each row, whether there was a branch node in the database loaded in unfold + CurrentKey [128]byte // For each row indicates which column is currently selected + Depths [128]int // For each row, the depth of cells in that row +} + func NewHexPatriciaHashed(accountKeyLen int, branchFn func(prefix []byte) ([]byte, error), accountFn func(plainKey []byte, cell *Cell) error, @@ -123,6 +138,7 @@ type Cell struct { CodeHash [length.Hash]byte // hash of the bytecode Storage [length.Hash]byte StorageLen int + Delete bool } var ( @@ -130,18 +146,6 @@ var ( EmptyCodeHash, _ = hex.DecodeString("c5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470") ) -func (cell *Cell) isEmpty() bool { - return cell.apl == 0 && - cell.spl == 0 && - cell.downHashedLen == 0 && - cell.extLen == 0 && - cell.hl == 0 && - cell.Nonce == 0 && - cell.Balance.IsZero() && - bytes.Equal(cell.CodeHash[:], EmptyCodeHash) && - cell.StorageLen == 0 -} - func (cell *Cell) fillEmpty() { cell.apl = 0 cell.spl = 0 @@ -152,6 +156,7 @@ func (cell *Cell) fillEmpty() { cell.Balance.Clear() copy(cell.CodeHash[:], EmptyCodeHash) cell.StorageLen = 0 + cell.Delete = false } func (cell *Cell) fillFromUpperCell(upCell *Cell, depth, depthIncrement int) { @@ 
-391,18 +396,14 @@ func (cell *Cell) fillFromFields(data []byte, pos int, fieldBits PartFlags) (int return pos, nil } -func (cell *Cell) setStorage(plainKey, value []byte) { - cell.spl = len(plainKey) - copy(cell.spk[:], plainKey) +func (cell *Cell) setStorage(value []byte) { cell.StorageLen = len(value) if len(value) > 0 { copy(cell.Storage[:], value) } } -func (cell *Cell) setAccountFields(plainKey, codeHash []byte, balance *uint256.Int, nonce uint64) { - cell.apl = len(plainKey) - copy(cell.apk[:], plainKey) +func (cell *Cell) setAccountFields(codeHash []byte, balance *uint256.Int, nonce uint64) { copy(cell.CodeHash[:], codeHash) cell.Balance.SetBytes(balance.Bytes()) @@ -683,33 +684,32 @@ func (hph *HexPatriciaHashed) computeCellHash(cell *Cell, depth int, buf []byte) hashedKeyOffset = depth - 64 } singleton := depth <= 64 - _ = singleton - if err := hashKey(hph.keccak, cell.spk[hph.accountKeyLen:cell.spl], cell.downHashedKey[:], hashedKeyOffset); err != nil { + if err := hashKey(hph.keccak, cell.spk[hph.accountKeyLen:cell.spl], hph.hashAuxBuffer[:], hashedKeyOffset); err != nil { return nil, err } - cell.downHashedKey[64-hashedKeyOffset] = 16 // Add terminator + hph.hashAuxBuffer[64-hashedKeyOffset] = 16 // Add terminator if singleton { if hph.trace { - fmt.Printf("leafHashWithKeyVal(singleton) for [%x]=>[%x]\n", cell.downHashedKey[:64-hashedKeyOffset+1], cell.Storage[:cell.StorageLen]) + fmt.Printf("leafHashWithKeyVal(singleton) for [%x]=>[%x]\n", hph.hashAuxBuffer[:64-hashedKeyOffset+1], cell.Storage[:cell.StorageLen]) } aux := make([]byte, 0, 33) - if aux, err = hph.leafHashWithKeyVal(aux, cell.downHashedKey[:64-hashedKeyOffset+1], cell.Storage[:cell.StorageLen], true); err != nil { + if aux, err = hph.leafHashWithKeyVal(aux, hph.hashAuxBuffer[:64-hashedKeyOffset+1], cell.Storage[:cell.StorageLen], true); err != nil { return nil, err } storageRootHash = *(*[length.Hash]byte)(aux[1:]) storageRootHashIsSet = true } else { if hph.trace { - 
fmt.Printf("leafHashWithKeyVal for [%x]=>[%x]\n", cell.downHashedKey[:64-hashedKeyOffset+1], cell.Storage[:cell.StorageLen]) + fmt.Printf("leafHashWithKeyVal for [%x]=>[%x]\n", hph.hashAuxBuffer[:64-hashedKeyOffset+1], cell.Storage[:cell.StorageLen]) } - return hph.leafHashWithKeyVal(buf, cell.downHashedKey[:64-hashedKeyOffset+1], cell.Storage[:cell.StorageLen], false) + return hph.leafHashWithKeyVal(buf, hph.hashAuxBuffer[:64-hashedKeyOffset+1], cell.Storage[:cell.StorageLen], false) } } if cell.apl > 0 { - if err := hashKey(hph.keccak, cell.apk[:cell.apl], cell.downHashedKey[:], depth); err != nil { + if err := hashKey(hph.keccak, cell.apk[:cell.apl], hph.hashAuxBuffer[:], depth); err != nil { return nil, err } - cell.downHashedKey[64-depth] = 16 // Add terminator + hph.hashAuxBuffer[64-depth] = 16 // Add terminator if !storageRootHashIsSet { if cell.extLen > 0 { // Extension @@ -732,9 +732,9 @@ func (hph *HexPatriciaHashed) computeCellHash(cell *Cell, depth int, buf []byte) var valBuf [128]byte valLen := cell.accountForHashing(valBuf[:], storageRootHash) if hph.trace { - fmt.Printf("accountLeafHashWithKey for [%x]=>[%x]\n", cell.downHashedKey[:65-depth], valBuf[:valLen]) + fmt.Printf("accountLeafHashWithKey for [%x]=>[%x]\n", hph.hashAuxBuffer[:65-depth], valBuf[:valLen]) } - return hph.accountLeafHashWithKey(buf, cell.downHashedKey[:65-depth], rlp.RlpEncodedBytes(valBuf[:valLen])) + return hph.accountLeafHashWithKey(buf, hph.hashAuxBuffer[:65-depth], rlp.RlpEncodedBytes(valBuf[:valLen])) } buf = append(buf, 0x80+32) if cell.extLen > 0 { @@ -766,6 +766,10 @@ func (hph *HexPatriciaHashed) needUnfolding(hashedKey []byte) int { if hph.trace { fmt.Printf("needUnfolding root, rootChecked = %t\n", hph.rootChecked) } + if hph.rootChecked && hph.root.downHashedLen == 0 && hph.root.hl == 0 { + // Previously checked, empty root, no unfolding needed + return 0 + } cell = &hph.root if cell.downHashedLen == 0 && cell.hl == 0 && !hph.rootChecked { // Need to attempt to unfold 
the root @@ -806,15 +810,16 @@ func (hph *HexPatriciaHashed) needUnfolding(hashedKey []byte) int { return unfolding } -func (hph *HexPatriciaHashed) unfoldBranchNode(row int, deleted bool, depth int) error { +// unfoldBranchNode returns true if unfolding has been done +func (hph *HexPatriciaHashed) unfoldBranchNode(row int, deleted bool, depth int) (bool, error) { branchData, err := hph.branchFn(hexToCompact(hph.currentKey[:hph.currentKeyLen])) if err != nil { - return err + return false, err } if !hph.rootChecked && hph.currentKeyLen == 0 && len(branchData) == 0 { // Special case - empty or deleted root hph.rootChecked = true - return nil + return false, nil } if len(branchData) == 0 { log.Warn("got empty branch data during unfold", "row", row, "depth", depth, "deleted", deleted) @@ -830,7 +835,7 @@ func (hph *HexPatriciaHashed) unfoldBranchNode(row int, deleted bool, depth int) hph.afterMap[row] = bitmap hph.touchMap[row] = 0 } - //fmt.Printf("unfoldBranchNode [unfoldBranchNode%x], afterMap = [%016b], touchMap = [%016b]\n", branchData, hph.afterMap[row], hph.touchMap[row]) + //fmt.Printf("unfoldBranchNode [%x], afterMap = [%016b], touchMap = [%016b]\n", branchData, hph.afterMap[row], hph.touchMap[row]) // Loop iterating over the set bits of modMask for bitset, j := bitmap, 0; bitset != 0; j++ { bit := bitset & -bitset @@ -840,7 +845,7 @@ func (hph *HexPatriciaHashed) unfoldBranchNode(row int, deleted bool, depth int) pos++ var err error if pos, err = cell.fillFromFields(branchData, pos, PartFlags(fieldBits)); err != nil { - return fmt.Errorf("prefix [%x], branchData[%x]: %w", hph.currentKey[:hph.currentKeyLen], branchData, err) + return false, fmt.Errorf("prefix [%x], branchData[%x]: %w", hph.currentKey[:hph.currentKeyLen], branchData, err) } if hph.trace { fmt.Printf("cell (%d, %x) depth=%d, hash=[%x], a=[%x], s=[%x], ex=[%x]\n", row, nibble, depth, cell.h[:cell.hl], cell.apk[:cell.apl], cell.spk[:cell.spl], cell.extension[:cell.extLen]) @@ -848,18 +853,18 @@ 
func (hph *HexPatriciaHashed) unfoldBranchNode(row int, deleted bool, depth int) if cell.apl > 0 { hph.accountFn(cell.apk[:cell.apl], cell) if hph.trace { - fmt.Printf("accountFn[%x] return balance=%d, nonce=%d\n", cell.apk[:cell.apl], &cell.Balance, cell.Nonce) + fmt.Printf("accountFn[%x] return balance=%d, nonce=%d code=%x\n", cell.apk[:cell.apl], &cell.Balance, cell.Nonce, cell.CodeHash[:]) } } if cell.spl > 0 { hph.storageFn(cell.spk[:cell.spl], cell) } if err = cell.deriveHashedKeys(depth, hph.keccak, hph.accountKeyLen); err != nil { - return err + return false, err } bitset ^= bit } - return nil + return true, nil } func (hph *HexPatriciaHashed) unfold(hashedKey []byte, unfolding int) error { @@ -879,7 +884,7 @@ func (hph *HexPatriciaHashed) unfold(hashedKey []byte, unfolding int) error { touched = hph.rootTouched present = hph.rootPresent if hph.trace { - fmt.Printf("root, touched %t, present %t\n", touched, present) + fmt.Printf("unfold root, touched %t, present %t, column %d\n", touched, present, col) } } else { upDepth = hph.depths[hph.activeRows-1] @@ -902,8 +907,11 @@ func (hph *HexPatriciaHashed) unfold(hashedKey []byte, unfolding int) error { hph.branchBefore[row] = false if upCell.downHashedLen == 0 { depth = upDepth + 1 - if err := hph.unfoldBranchNode(row, touched && !present /* deleted */, depth); err != nil { + if unfolded, err := hph.unfoldBranchNode(row, touched && !present /* deleted */, depth); err != nil { return err + } else if !unfolded { + // Return here to prevent activeRow from being incremented + return nil } } else if upCell.downHashedLen >= unfolding { depth = upDepth + unfolding @@ -1086,8 +1094,8 @@ func (hph *HexPatriciaHashed) fold() (branchData BranchData, updateKey []byte, e } hph.keccak2.Reset() - pt := rlp.GenerateStructLen(hph.auxBuffer[:], totalBranchLen) - if _, err := hph.keccak2.Write(hph.auxBuffer[:pt]); err != nil { + pt := rlp.GenerateStructLen(hph.hashAuxBuffer[:], totalBranchLen) + if _, err := 
hph.keccak2.Write(hph.hashAuxBuffer[:pt]); err != nil { return nil, nil, err } @@ -1103,7 +1111,7 @@ func (hph *HexPatriciaHashed) fold() (branchData BranchData, updateKey []byte, e return nil, nil } cell := &hph.grid[row][nibble] - cellHash, err := hph.computeCellHash(cell, depth, hph.auxBuffer[:0]) + cellHash, err := hph.computeCellHash(cell, depth, hph.hashAuxBuffer[:0]) if err != nil { return nil, err } @@ -1133,8 +1141,10 @@ func (hph *HexPatriciaHashed) fold() (branchData BranchData, updateKey []byte, e } } upCell.extLen = depth - upDepth - 1 + upCell.downHashedLen = upCell.extLen if upCell.extLen > 0 { copy(upCell.extension[:], hph.currentKey[upDepth:hph.currentKeyLen]) + copy(upCell.downHashedKey[:], hph.currentKey[upDepth:hph.currentKeyLen]) } if depth < 64 { upCell.apl = 0 @@ -1162,30 +1172,6 @@ func (hph *HexPatriciaHashed) fold() (branchData BranchData, updateKey []byte, e return branchData, updateKey, nil } -func (hph *HexPatriciaHashed) foldRoot() (BranchData, error) { - if hph.trace { - fmt.Printf("foldRoot: activeRows: %d\n", hph.activeRows) - } - if hph.activeRows != 0 { - return nil, fmt.Errorf("cannot fold root - there are still active rows: %d", hph.activeRows) - } - if hph.root.downHashedLen == 0 { - // Not overwrite previous branch node - return nil, nil - } - - rootGetter := func(_ int, _ bool) (*Cell, error) { - _, err := hph.RootHash() - if err != nil { - return nil, fmt.Errorf("folding root failed: %w", err) - } - return &hph.root, nil - } - - branchData, _, err := EncodeBranch(1, 1, 1, rootGetter) - return branchData, err -} - func (hph *HexPatriciaHashed) deleteCell(hashedKey []byte) { if hph.trace { fmt.Printf("deleteCell, activeRows = %d\n", hph.activeRows) @@ -1225,20 +1211,22 @@ func (hph *HexPatriciaHashed) deleteCell(hashedKey []byte) { cell.Nonce = 0 } -func (hph *HexPatriciaHashed) updateCell(hashedKey []byte) *Cell { +func (hph *HexPatriciaHashed) updateCell(plainKey, hashedKey []byte) *Cell { var cell *Cell var col, depth int 
if hph.activeRows == 0 { - hph.activeRows++ - } - row := hph.activeRows - 1 - depth = hph.depths[row] - col = int(hashedKey[hph.currentKeyLen]) - cell = &hph.grid[row][col] - hph.touchMap[row] |= (uint16(1) << col) - hph.afterMap[row] |= (uint16(1) << col) - if hph.trace { - fmt.Printf("updateAccount setting (%d, %x), depth=%d\n", row, col, depth) + cell = &hph.root + hph.rootTouched, hph.rootPresent = true, true + } else { + row := hph.activeRows - 1 + depth = hph.depths[row] + col = int(hashedKey[hph.currentKeyLen]) + cell = &hph.grid[row][col] + hph.touchMap[row] |= (uint16(1) << col) + hph.afterMap[row] |= (uint16(1) << col) + if hph.trace { + fmt.Printf("updateCell setting (%d, %x), depth=%d\n", row, col, depth) + } } if cell.downHashedLen == 0 { copy(cell.downHashedKey[:], hashedKey[depth:]) @@ -1251,11 +1239,20 @@ func (hph *HexPatriciaHashed) updateCell(hashedKey []byte) *Cell { fmt.Printf("left downHasheKey=[%x]\n", cell.downHashedKey[:cell.downHashedLen]) } } + if len(hashedKey) == 2*length.Hash { + // account + cell.apl = len(plainKey) + copy(cell.apk[:], plainKey) + //copy(cell.CodeHash[:], EmptyCodeHash) + } else { + cell.spl = len(plainKey) + copy(cell.spk[:], plainKey) + } return cell } func (hph *HexPatriciaHashed) RootHash() ([]byte, error) { - hash, err := hph.computeCellHash(&hph.root, 0, hph.auxBuffer[:0]) + hash, err := hph.computeCellHash(&hph.root, 0, nil) if err != nil { return nil, err } @@ -1288,36 +1285,34 @@ func (hph *HexPatriciaHashed) ReviewKeys(plainKeys, hashedKeys [][]byte) (rootHa // Update the cell stagedCell.fillEmpty() - var deleteCell bool if len(plainKey) == hph.accountKeyLen { if err := hph.accountFn(plainKey, stagedCell); err != nil { return nil, nil, fmt.Errorf("accountFn for key %x failed: %w", plainKey, err) } - if stagedCell.isEmpty() { - deleteCell = true - } else { - cell := hph.updateCell(hashedKey) - cell.setAccountFields(plainKey, stagedCell.CodeHash[:], &stagedCell.Balance, stagedCell.Nonce) + if 
!stagedCell.Delete { + cell := hph.updateCell(plainKey, hashedKey) + cell.setAccountFields(stagedCell.CodeHash[:], &stagedCell.Balance, stagedCell.Nonce) if hph.trace { - fmt.Printf("accountFn filled cell plainKey: %x balance: %v nonce: %v codeHash: %x\n", cell.apk, cell.Balance.String(), cell.Nonce, cell.CodeHash) + fmt.Printf("accountFn reading key %x => balance=%v nonce=%v codeHash=%x\n", cell.apk, cell.Balance.Uint64(), cell.Nonce, cell.CodeHash) } } } else { if err = hph.storageFn(plainKey, stagedCell); err != nil { return nil, nil, fmt.Errorf("storageFn for key %x failed: %w", plainKey, err) } - if hph.trace { - fmt.Printf("storageFn filled %x : %x\n", plainKey, stagedCell.Storage) - } - if stagedCell.StorageLen == 0 { - deleteCell = true - } else { - hph.updateCell(hashedKey).setStorage(plainKey, stagedCell.Storage[:stagedCell.StorageLen]) + if !stagedCell.Delete { + hph.updateCell(plainKey, hashedKey).setStorage(stagedCell.Storage[:stagedCell.StorageLen]) + if hph.trace { + fmt.Printf("storageFn reading key %x => %x\n", plainKey, stagedCell.Storage[:stagedCell.StorageLen]) + } } } - if deleteCell { + if stagedCell.Delete { + if hph.trace { + fmt.Printf("delete cell %x hash %x\n", plainKey, hashedKey) + } hph.deleteCell(hashedKey) } } @@ -1329,11 +1324,6 @@ func (hph *HexPatriciaHashed) ReviewKeys(plainKeys, hashedKeys [][]byte) (rootHa branchNodeUpdates[string(updateKey)] = branchData } } - if branchData, err := hph.foldRoot(); err != nil { - return nil, nil, fmt.Errorf("foldRoot: %w", err) - } else if branchData != nil { - branchNodeUpdates[string(hexToCompact([]byte{}))] = branchData - } rootHash, err = hph.RootHash() if err != nil { @@ -1372,6 +1362,176 @@ func (hph *HexPatriciaHashed) ResetFns( hph.storageFn = storageFn } +type stateRootFlag int8 + +var ( + stateRootPresent stateRootFlag = 1 + stateRootChecked stateRootFlag = 2 + stateRootTouched stateRootFlag = 4 +) + +func (s *state) Encode(buf []byte) ([]byte, error) { + var rootFlags stateRootFlag + 
if s.RootPresent { + rootFlags |= stateRootPresent + } + if s.RootChecked { + rootFlags |= stateRootChecked + } + if s.RootTouched { + rootFlags |= stateRootTouched + } + + ee := bytes.NewBuffer(buf) + if err := binary.Write(ee, binary.BigEndian, s.CurrentKeyLen); err != nil { + return nil, fmt.Errorf("encode currentKeyLen: %w", err) + } + if err := binary.Write(ee, binary.BigEndian, int8(rootFlags)); err != nil { + return nil, fmt.Errorf("encode rootFlags: %w", err) + } + if err := binary.Write(ee, binary.BigEndian, s.RootHash); err != nil { + return nil, fmt.Errorf("encode rootHash: %w", err) + } + if err := binary.Write(ee, binary.BigEndian, s.CurrentKey); err != nil { + return nil, fmt.Errorf("encode currentKey: %w", err) + } + d := make([]byte, len(s.Depths)) + for i := 0; i < len(s.Depths); i++ { + d[i] = byte(s.Depths[i]) + } + if err := binary.Write(ee, binary.BigEndian, d); err != nil { + return nil, fmt.Errorf("encode depths: %w", err) + } + if err := binary.Write(ee, binary.BigEndian, s.TouchMap); err != nil { + return nil, fmt.Errorf("encode touchMap: %w", err) + } + if err := binary.Write(ee, binary.BigEndian, s.AfterMap); err != nil { + return nil, fmt.Errorf("encode afterMap: %w", err) + } + + var before1, before2 uint64 + for i := 0; i < 64; i++ { + if s.BranchBefore[i] { + before1 |= 1 << i + } + } + for i, j := 64, 0; i < 128; i, j = i+1, j+1 { + if s.BranchBefore[i] { + before2 |= 1 << j + } + } + if err := binary.Write(ee, binary.BigEndian, before1); err != nil { + return nil, fmt.Errorf("encode branchBefore_1: %w", err) + } + if err := binary.Write(ee, binary.BigEndian, before2); err != nil { + return nil, fmt.Errorf("encode branchBefore_2: %w", err) + } + return ee.Bytes(), nil +} + +func (s *state) Decode(buf []byte) error { + aux := bytes.NewBuffer(buf) + if err := binary.Read(aux, binary.BigEndian, &s.CurrentKeyLen); err != nil { + return fmt.Errorf("currentKeyLen: %w", err) + } + var rootFlags stateRootFlag + if err := binary.Read(aux, 
binary.BigEndian, &rootFlags); err != nil { + return fmt.Errorf("rootFlags: %w", err) + } + + if rootFlags&stateRootPresent != 0 { + s.RootPresent = true + } + if rootFlags&stateRootTouched != 0 { + s.RootTouched = true + } + if rootFlags&stateRootChecked != 0 { + s.RootChecked = true + } + if err := binary.Read(aux, binary.BigEndian, &s.RootHash); err != nil { + return fmt.Errorf("rootHash: %w", err) + } + if err := binary.Read(aux, binary.BigEndian, &s.CurrentKey); err != nil { + return fmt.Errorf("currentKey: %w", err) + } + d := make([]byte, len(s.Depths)) + if err := binary.Read(aux, binary.BigEndian, &d); err != nil { + return fmt.Errorf("depths: %w", err) + } + for i := 0; i < len(s.Depths); i++ { + s.Depths[i] = int(d[i]) + } + if err := binary.Read(aux, binary.BigEndian, &s.TouchMap); err != nil { + return fmt.Errorf("touchMap: %w", err) + } + if err := binary.Read(aux, binary.BigEndian, &s.AfterMap); err != nil { + return fmt.Errorf("afterMap: %w", err) + } + var branch1, branch2 uint64 + if err := binary.Read(aux, binary.BigEndian, &branch1); err != nil { + return fmt.Errorf("branchBefore1: %w", err) + } + if err := binary.Read(aux, binary.BigEndian, &branch2); err != nil { + return fmt.Errorf("branchBefore2: %w", err) + } + + for i := 0; i < 64; i++ { + if branch1&(1< 0; unfolding = hph.needUnfolding(hashedKey) { + if err := hph.unfold(hashedKey, unfolding); err != nil { + return nil, fmt.Errorf("unfold: %w", err) + } + } + + // Update the cell + if update.Flags == DELETE_UPDATE { + hph.deleteCell(hashedKey) + } else { + cell := hph.updateCell(plainKey, hashedKey) + if update.Flags&BALANCE_UPDATE != 0 { + cell.Balance.Set(&update.Balance) + } + if update.Flags&NONCE_UPDATE != 0 { + cell.Nonce = update.Nonce + } + if update.Flags&CODE_UPDATE != 0 { + copy(cell.CodeHash[:], update.CodeHashOrStorage[:]) + } + if update.Flags&STORAGE_UPDATE != 0 { + cell.setStorage(update.CodeHashOrStorage[:update.ValLength]) + } + } + return branchNodeUpdates, nil +} + 
+func (hph *HexPatriciaHashed) Commit() ([]byte, map[string]BranchData, error) { + branchNodeUpdates := make(map[string]BranchData) + // Folding everything up to the root + for hph.activeRows > 0 { + if branchData, updateKey, err := hph.fold(); err != nil { + return nil, nil, fmt.Errorf("final fold: %w", err) + } else if branchData != nil { + branchNodeUpdates[string(updateKey)] = branchData + } + } + + rootHash, err := hph.RootHash() + if err != nil { + return nil, nil, err + } + return rootHash, branchNodeUpdates, nil +} + +func (hph *HexPatriciaHashed) ProcessUpdates(plainKeys, hashedKeys [][]byte, updates []Update) (rootHash []byte, branchNodeUpdates map[string]BranchData, err error) { + branchNodeUpdates = make(map[string]BranchData) + + for i, plainKey := range plainKeys { + hashedKey := hashedKeys[i] + if hph.trace { + fmt.Printf("plainKey=[%x], hashedKey=[%x], currentKey=[%x]\n", plainKey, hashedKey, hph.currentKey[:hph.currentKeyLen]) + } + // Keep folding until the currentKey is the prefix of the key we modify + for hph.needFolding(hashedKey) { + if branchData, updateKey, err := hph.fold(); err != nil { + return nil, nil, fmt.Errorf("fold: %w", err) + } else if branchData != nil { + branchNodeUpdates[string(updateKey)] = branchData + } + } + // Now unfold until we step on an empty cell + for unfolding := hph.needUnfolding(hashedKey); unfolding > 0; unfolding = hph.needUnfolding(hashedKey) { + if err := hph.unfold(hashedKey, unfolding); err != nil { + return nil, nil, fmt.Errorf("unfold: %w", err) + } + } + + update := updates[i] + // Update the cell + if update.Flags == DELETE_UPDATE { + hph.deleteCell(hashedKey) + if hph.trace { + fmt.Printf("key %x deleted\n", plainKey) + } + } else { + cell := hph.updateCell(plainKey, hashedKey) + if hph.trace { + fmt.Printf("accountFn updated key %x =>", plainKey) + } + if update.Flags&BALANCE_UPDATE != 0 { + if hph.trace { + fmt.Printf(" balance=%d", update.Balance.Uint64()) + } + cell.Balance.Set(&update.Balance) + 
} + if update.Flags&NONCE_UPDATE != 0 { + if hph.trace { + fmt.Printf(" nonce=%d", update.Nonce) + } + cell.Nonce = update.Nonce + } + if update.Flags&CODE_UPDATE != 0 { + if hph.trace { + fmt.Printf(" codeHash=%x", update.CodeHashOrStorage) + } + copy(cell.CodeHash[:], update.CodeHashOrStorage[:]) + } + if hph.trace { + fmt.Printf("\n") + } + if update.Flags&STORAGE_UPDATE != 0 { + cell.setStorage(update.CodeHashOrStorage[:update.ValLength]) + if hph.trace { + fmt.Printf("\rstorageFn filled key %x => %x\n", plainKey, update.CodeHashOrStorage[:update.ValLength]) + } + } + } + } + // Folding everything up to the root + for hph.activeRows > 0 { + if branchData, updateKey, err := hph.fold(); err != nil { + return nil, nil, fmt.Errorf("final fold: %w", err) + } else if branchData != nil { + branchNodeUpdates[string(updateKey)] = branchData + } + } + + rootHash, err = hph.RootHash() + if err != nil { + return nil, branchNodeUpdates, fmt.Errorf("root hash evaluation failed: %w", err) + } + return rootHash, branchNodeUpdates, nil +} + +//nolint +func (hph *HexPatriciaHashed) hashAndNibblizeKey(key []byte) []byte { + hashedKey := make([]byte, length.Hash) + + hph.keccak.Reset() + hph.keccak.Write(key[:length.Addr]) + copy(hashedKey[:length.Hash], hph.keccak.Sum(nil)) + + if len(key[length.Addr:]) > 0 { + hashedKey = append(hashedKey, make([]byte, length.Hash)...) 
+ hph.keccak.Reset() + hph.keccak.Write(key[length.Addr:]) + copy(hashedKey[length.Hash:], hph.keccak.Sum(nil)) + } + + nibblized := make([]byte, len(hashedKey)*2) + for i, b := range hashedKey { + nibblized[i*2] = (b >> 4) & 0xf + nibblized[i*2+1] = b & 0xf + } + return nibblized +} + +type UpdateFlags uint8 + +const ( + CODE_UPDATE UpdateFlags = 1 + DELETE_UPDATE UpdateFlags = 2 + BALANCE_UPDATE UpdateFlags = 4 + NONCE_UPDATE UpdateFlags = 8 + STORAGE_UPDATE UpdateFlags = 16 +) + +func (uf UpdateFlags) String() string { + var sb strings.Builder + if uf == DELETE_UPDATE { + sb.WriteString("Delete") + } else { + if uf&BALANCE_UPDATE != 0 { + sb.WriteString("+Balance") + } + if uf&NONCE_UPDATE != 0 { + sb.WriteString("+Nonce") + } + if uf&CODE_UPDATE != 0 { + sb.WriteString("+Code") + } + if uf&STORAGE_UPDATE != 0 { + sb.WriteString("+Storage") + } + } + return sb.String() +} + +type Update struct { + Flags UpdateFlags + Balance uint256.Int + Nonce uint64 + CodeHashOrStorage [length.Hash]byte + ValLength int +} + +func (u *Update) DecodeForStorage(enc []byte) { + u.Nonce = 0 + u.Balance.Clear() + copy(u.CodeHashOrStorage[:], EmptyCodeHash) + + pos := 0 + nonceBytes := int(enc[pos]) + pos++ + if nonceBytes > 0 { + u.Nonce = bytesToUint64(enc[pos : pos+nonceBytes]) + pos += nonceBytes + } + balanceBytes := int(enc[pos]) + pos++ + if balanceBytes > 0 { + u.Balance.SetBytes(enc[pos : pos+balanceBytes]) + pos += balanceBytes + } + codeHashBytes := int(enc[pos]) + pos++ + if codeHashBytes > 0 { + copy(u.CodeHashOrStorage[:], enc[pos:pos+codeHashBytes]) + } +} + +func (u *Update) Encode(buf []byte, numBuf []byte) []byte { + buf = append(buf, byte(u.Flags)) + if u.Flags&BALANCE_UPDATE != 0 { + buf = append(buf, byte(u.Balance.ByteLen())) + buf = append(buf, u.Balance.Bytes()...) + } + if u.Flags&NONCE_UPDATE != 0 { + n := binary.PutUvarint(numBuf, u.Nonce) + buf = append(buf, numBuf[:n]...) + } + if u.Flags&CODE_UPDATE != 0 { + buf = append(buf, u.CodeHashOrStorage[:]...) 
+ } + if u.Flags&STORAGE_UPDATE != 0 { + n := binary.PutUvarint(numBuf, uint64(u.ValLength)) + buf = append(buf, numBuf[:n]...) + if u.ValLength > 0 { + buf = append(buf, u.CodeHashOrStorage[:u.ValLength]...) + } + } + return buf +} + +func (u *Update) Decode(buf []byte, pos int) (int, error) { + if len(buf) < pos+1 { + return 0, fmt.Errorf("decode Update: buffer too small for flags") + } + u.Flags = UpdateFlags(buf[pos]) + pos++ + if u.Flags&BALANCE_UPDATE != 0 { + if len(buf) < pos+1 { + return 0, fmt.Errorf("decode Update: buffer too small for balance len") + } + balanceLen := int(buf[pos]) + pos++ + if len(buf) < pos+balanceLen { + return 0, fmt.Errorf("decode Update: buffer too small for balance") + } + u.Balance.SetBytes(buf[pos : pos+balanceLen]) + pos += balanceLen + } + if u.Flags&NONCE_UPDATE != 0 { + var n int + u.Nonce, n = binary.Uvarint(buf[pos:]) + if n == 0 { + return 0, fmt.Errorf("decode Update: buffer too small for nonce") + } + if n < 0 { + return 0, fmt.Errorf("decode Update: nonce overflow") + } + pos += n + } + if u.Flags&CODE_UPDATE != 0 { + if len(buf) < pos+32 { + return 0, fmt.Errorf("decode Update: buffer too small for codeHash") + } + copy(u.CodeHashOrStorage[:], buf[pos:pos+32]) + pos += 32 + } + if u.Flags&STORAGE_UPDATE != 0 { + l, n := binary.Uvarint(buf[pos:]) + if n == 0 { + return 0, fmt.Errorf("decode Update: buffer too small for storage len") + } + if n < 0 { + return 0, fmt.Errorf("decode Update: storage len overflow") + } + pos += n + if len(buf) < pos+int(l) { + return 0, fmt.Errorf("decode Update: buffer too small for storage") + } + u.ValLength = int(l) + copy(u.CodeHashOrStorage[:], buf[pos:pos+int(l)]) + pos += int(l) + } + return pos, nil +} + +func (u *Update) String() string { + var sb strings.Builder + sb.WriteString(fmt.Sprintf("Flags: [%s]", u.Flags)) + if u.Flags&BALANCE_UPDATE != 0 { + sb.WriteString(fmt.Sprintf(", Balance: [%d]", &u.Balance)) + } + if u.Flags&NONCE_UPDATE != 0 { + sb.WriteString(fmt.Sprintf(", 
Nonce: [%d]", u.Nonce)) + } + if u.Flags&CODE_UPDATE != 0 { + sb.WriteString(fmt.Sprintf(", CodeHash: [%x]", u.CodeHashOrStorage)) + } + if u.Flags&STORAGE_UPDATE != 0 { + sb.WriteString(fmt.Sprintf(", Storage: [%x]", u.CodeHashOrStorage[:u.ValLength])) + } + return sb.String() +} diff --git a/commitment/hex_patricia_hashed_fuzz_test.go b/commitment/hex_patricia_hashed_fuzz_test.go index 94d3d0e2e..53432f83f 100644 --- a/commitment/hex_patricia_hashed_fuzz_test.go +++ b/commitment/hex_patricia_hashed_fuzz_test.go @@ -4,12 +4,15 @@ package commitment import ( "bytes" + "encoding/binary" "encoding/hex" "math/rand" "testing" - "github.com/ledgerwatch/erigon-lib/common/length" "github.com/stretchr/testify/require" + "golang.org/x/crypto/sha3" + + "github.com/ledgerwatch/erigon-lib/common/length" ) // go test -trimpath -v -fuzz=Fuzz_ProcessUpdate$ -fuzztime=300s ./commitment @@ -73,35 +76,66 @@ func Fuzz_ProcessUpdate(f *testing.F) { // go test -trimpath -v -fuzz=Fuzz_ProcessUpdates_ArbitraryUpdateCount -fuzztime=300s ./commitment func Fuzz_ProcessUpdates_ArbitraryUpdateCount(f *testing.F) { - ha, _ := hex.DecodeString("83a93c6ddd2660654f34d55f5deead039a4ac4853528b894383f646193852ddb078e00fbcb52d82bb791edddb1cffee89e599b5b45bb60f04b6c5c276635570c12e31d882f333b6beab06c11e603881b0c68788beca64fcc9185fb2823da72151d077192d321d83df17d49f2e37f2f69e43b147bc7bd8c3ae7ea161b7c9e81c5a540f37158e79f3d503813a32374abb0f94ad7d8ddca63bfd427e8570b64bb6e0b255e344f2e2849c623d6690c2d6ea66d90818e3169297acc58177cb3b8fae48852883b2850c7a48f4b0eea3ccc4c04e6cb6025e9e8f7db2589c7dae81517c514790cfd6f668903161349e") + ha, _ := hex.DecodeString("0008852883b2850c7a48f4b0eea3ccc4c04e6cb6025e9e8f7db2589c7dae81517c514790cfd6f668903161349e") f.Add(ha) f.Fuzz(func(t *testing.T, build []byte) { - keyMap := make(map[string]uint64) - i := 0 - for i < len(build) { - keyLen := int(build[i]>>4) + 1 - valLen := int(build[i]&15) + 1 - i++ - var key []byte - var val uint64 - for keyLen > 0 && i < len(build) 
{ - key = append(key, build[i]) - i++ - keyLen-- - } - for valLen > 0 && i < len(build) { - val += uint64(build[i]) - i++ - valLen-- - } - keyMap[hex.EncodeToString(key)] = val + if len(build) < 12 { + f.SkipNow() } + i := 0 + keysCount := binary.BigEndian.Uint32(build[i : i+4]) + i += 4 + ks := binary.BigEndian.Uint32(build[i : i+4]) + keysSeed := rand.New(rand.NewSource(int64(ks))) + i += 4 + us := binary.BigEndian.Uint32(build[i : i+4]) + updateSeed := rand.New(rand.NewSource(int64(us))) + + t.Logf("fuzzing %d keys keysSeed=%d updateSeed=%d", keysCount, ks, us) builder := NewUpdateBuilder() - for account, balance := range keyMap { - builder.Balance(account, balance) + for k := uint32(0); k < keysCount; k++ { + var key [length.Addr]byte + n, err := keysSeed.Read(key[:]) + pkey := hex.EncodeToString(key[:]) + require.NoError(t, err) + require.EqualValues(t, length.Addr, n) + + aux := make([]byte, 32) + + flg := UpdateFlags(updateSeed.Intn(int(CODE_UPDATE | DELETE_UPDATE | STORAGE_UPDATE | NONCE_UPDATE | BALANCE_UPDATE))) + switch { + case flg&BALANCE_UPDATE != 0: + builder.Balance(pkey, updateSeed.Uint64()).Nonce(pkey, updateSeed.Uint64()) + continue + case flg&CODE_UPDATE != 0: + keccak := sha3.NewLegacyKeccak256().(keccakState) + var s [8]byte + n, err := updateSeed.Read(s[:]) + require.NoError(t, err) + require.EqualValues(t, len(s), n) + keccak.Write(s[:]) + keccak.Read(aux) + + builder.CodeHash(pkey, hex.EncodeToString(aux)) + continue + case flg&STORAGE_UPDATE != 0: + sz := updateSeed.Intn(length.Hash) + n, err = updateSeed.Read(aux[:sz]) + require.NoError(t, err) + require.EqualValues(t, sz, n) + + loc := make([]byte, updateSeed.Intn(length.Hash-1)+1) + keysSeed.Read(loc) + builder.Storage(pkey, hex.EncodeToString(loc), hex.EncodeToString(aux[:sz])) + continue + case flg&DELETE_UPDATE != 0: + continue + default: + continue + } } ms := NewMockState(t) @@ -109,35 +143,29 @@ func Fuzz_ProcessUpdates_ArbitraryUpdateCount(f *testing.F) { hph := 
NewHexPatriciaHashed(20, ms.branchFn, ms.accountFn, ms.storageFn) hphAnother := NewHexPatriciaHashed(20, ms2.branchFn, ms2.accountFn, ms2.storageFn) + plainKeys, hashedKeys, updates := builder.Build() + hph.SetTrace(false) hphAnother.SetTrace(false) - plainKeys, hashedKeys, updates := builder.Build() - if err := ms.applyPlainUpdates(plainKeys, updates); err != nil { - t.Fatal(err) - } + err := ms.applyPlainUpdates(plainKeys, updates) + require.NoError(t, err) - rootHash, branchNodeUpdates, err := hph.ReviewKeys(plainKeys, hashedKeys) - if err != nil { - t.Fatal(err) - } + rootHashReview, branchNodeUpdates, err := hph.ReviewKeys(plainKeys, hashedKeys) + require.NoError(t, err) ms.applyBranchNodeUpdates(branchNodeUpdates) - if len(rootHash) != 32 { - t.Fatalf("invalid root hash length: expected 32 bytes, got %v", len(rootHash)) - } + require.Len(t, rootHashReview, length.Hash, "invalid root hash length") - rootHashAnother, _, err := hphAnother.ReviewKeys(plainKeys, hashedKeys) - if err != nil { - t.Fatal(err) - } + err = ms2.applyPlainUpdates(plainKeys, updates) + require.NoError(t, err) - if len(rootHashAnother) > 32 { - t.Fatalf("invalid root hash length: expected 32 bytes, got %v", len(rootHash)) - } - if !bytes.Equal(rootHash, rootHashAnother) { - t.Fatalf("invalid second root hash with same updates: [%v] != [%v]", hex.EncodeToString(rootHash), hex.EncodeToString(rootHashAnother)) - } + rootHashAnother, branchUpdatesAnother, err := hphAnother.ReviewKeys(plainKeys, hashedKeys) + require.NoError(t, err) + ms2.applyBranchNodeUpdates(branchUpdatesAnother) + + require.Len(t, rootHashAnother, length.Hash, "invalid root hash length") + require.EqualValues(t, rootHashReview, rootHashAnother, "storage-based and update-based rootHash mismatch") }) } diff --git a/commitment/hex_patricia_hashed_test.go b/commitment/hex_patricia_hashed_test.go index 9f45feb0c..d0d09c325 100644 --- a/commitment/hex_patricia_hashed_test.go +++ b/commitment/hex_patricia_hashed_test.go @@ -19,10 
+19,12 @@ package commitment import ( "encoding/hex" "fmt" + "math/rand" "testing" - "github.com/ledgerwatch/erigon-lib/common/length" "github.com/stretchr/testify/require" + + "github.com/ledgerwatch/erigon-lib/common/length" ) func TestEmptyState(t *testing.T) { @@ -138,7 +140,7 @@ func Test_HexPatriciaHashed_ProcessUpdates_UniqueRepresentation(t *testing.T) { ms2 := NewMockState(t) plainKeys, hashedKeys, updates := NewUpdateBuilder(). - Balance("f4", 4). + Balance("f5", 4). Balance("ff", 900234). Balance("04", 1233). Storage("04", "01", "0401"). @@ -265,3 +267,52 @@ func Test_Sepolia(t *testing.T) { require.EqualValues(t, testData.expectedRoot, fmt.Sprintf("%x", rootHash)) } } + +func Test_HexPatriciaHashed_StateEncode(t *testing.T) { + //trie := NewHexPatriciaHashed(length.Hash, nil, nil, nil) + var s state + rnd := rand.New(rand.NewSource(42)) + n, err := rnd.Read(s.CurrentKey[:]) + require.NoError(t, err) + require.EqualValues(t, 128, n) + n, err = rnd.Read(s.RootHash[:]) + require.NoError(t, err) + require.EqualValues(t, 32, n) + s.RootPresent = true + s.RootTouched = true + s.RootChecked = true + + s.CurrentKeyLen = int8(rnd.Intn(129)) + for i := 0; i < len(s.Depths); i++ { + s.Depths[i] = rnd.Int() + } + for i := 0; i < len(s.TouchMap); i++ { + s.TouchMap[i] = uint16(rnd.Intn(1<<16 - 1)) + } + for i := 0; i < len(s.AfterMap); i++ { + s.AfterMap[i] = uint16(rnd.Intn(1<<16 - 1)) + } + for i := 0; i < len(s.BranchBefore); i++ { + if rnd.Intn(100) > 49 { + s.BranchBefore[i] = true + } + } + + enc, err := s.Encode(nil) + require.NoError(t, err) + require.NotEmpty(t, enc) + + var s1 state + err = s1.Decode(enc) + require.NoError(t, err) + + require.EqualValues(t, s.RootHash[:], s1.RootHash[:]) + require.EqualValues(t, s.CurrentKey[:], s1.CurrentKey[:]) + require.EqualValues(t, s.AfterMap[:], s1.AfterMap[:]) + require.EqualValues(t, s.TouchMap[:], s1.TouchMap[:]) + require.EqualValues(t, s.BranchBefore[:], s1.BranchBefore[:]) + require.EqualValues(t, 
s.RootTouched, s1.RootTouched) + require.EqualValues(t, s.RootPresent, s1.RootPresent) + require.EqualValues(t, s.RootChecked, s1.RootChecked) + require.EqualValues(t, s.CurrentKeyLen, s1.CurrentKeyLen) +} diff --git a/commitment/patricia_state_mock_test.go b/commitment/patricia_state_mock_test.go index 84d2400ac..97909e3d1 100644 --- a/commitment/patricia_state_mock_test.go +++ b/commitment/patricia_state_mock_test.go @@ -4,176 +4,16 @@ import ( "encoding/binary" "encoding/hex" "fmt" - "strings" "testing" "github.com/holiman/uint256" - "github.com/ledgerwatch/erigon-lib/common" - "github.com/ledgerwatch/erigon-lib/common/length" "golang.org/x/crypto/sha3" "golang.org/x/exp/slices" -) -type UpdateFlags uint8 - -const ( - CODE_UPDATE UpdateFlags = 1 - DELETE_UPDATE UpdateFlags = 2 - BALANCE_UPDATE UpdateFlags = 4 - NONCE_UPDATE UpdateFlags = 8 - STORAGE_UPDATE UpdateFlags = 16 + "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon-lib/common/length" ) -func (uf UpdateFlags) String() string { - var sb strings.Builder - if uf == DELETE_UPDATE { - sb.WriteString("Delete") - } else { - if uf&BALANCE_UPDATE != 0 { - sb.WriteString("+Balance") - } - if uf&NONCE_UPDATE != 0 { - sb.WriteString("+Nonce") - } - if uf&CODE_UPDATE != 0 { - sb.WriteString("+Code") - } - if uf&STORAGE_UPDATE != 0 { - sb.WriteString("+Storage") - } - } - return sb.String() -} - -type Update struct { - Flags UpdateFlags - Balance uint256.Int - Nonce uint64 - CodeHashOrStorage [length.Hash]byte - ValLength int -} - -func (u *Update) DecodeForStorage(enc []byte) { - u.Nonce = 0 - u.Balance.Clear() - copy(u.CodeHashOrStorage[:], EmptyCodeHash) - - pos := 0 - nonceBytes := int(enc[pos]) - pos++ - if nonceBytes > 0 { - u.Nonce = bytesToUint64(enc[pos : pos+nonceBytes]) - pos += nonceBytes - } - balanceBytes := int(enc[pos]) - pos++ - if balanceBytes > 0 { - u.Balance.SetBytes(enc[pos : pos+balanceBytes]) - pos += balanceBytes - } - codeHashBytes := int(enc[pos]) - 
pos++ - if codeHashBytes > 0 { - copy(u.CodeHashOrStorage[:], enc[pos:pos+codeHashBytes]) - } -} - -func (u Update) encode(buf []byte, numBuf []byte) []byte { - buf = append(buf, byte(u.Flags)) - if u.Flags&BALANCE_UPDATE != 0 { - buf = append(buf, byte(u.Balance.ByteLen())) - buf = append(buf, u.Balance.Bytes()...) - } - if u.Flags&NONCE_UPDATE != 0 { - n := binary.PutUvarint(numBuf, u.Nonce) - buf = append(buf, numBuf[:n]...) - } - if u.Flags&CODE_UPDATE != 0 { - buf = append(buf, u.CodeHashOrStorage[:]...) - } - if u.Flags&STORAGE_UPDATE != 0 { - n := binary.PutUvarint(numBuf, uint64(u.ValLength)) - buf = append(buf, numBuf[:n]...) - if u.ValLength > 0 { - buf = append(buf, u.CodeHashOrStorage[:u.ValLength]...) - } - } - return buf -} - -func (u *Update) decode(buf []byte, pos int) (int, error) { - if len(buf) < pos+1 { - return 0, fmt.Errorf("decode Update: buffer too small for flags") - } - u.Flags = UpdateFlags(buf[pos]) - pos++ - if u.Flags&BALANCE_UPDATE != 0 { - if len(buf) < pos+1 { - return 0, fmt.Errorf("decode Update: buffer too small for balance len") - } - balanceLen := int(buf[pos]) - pos++ - if len(buf) < pos+balanceLen { - return 0, fmt.Errorf("decode Update: buffer too small for balance") - } - u.Balance.SetBytes(buf[pos : pos+balanceLen]) - pos += balanceLen - } - if u.Flags&NONCE_UPDATE != 0 { - var n int - u.Nonce, n = binary.Uvarint(buf[pos:]) - if n == 0 { - return 0, fmt.Errorf("decode Update: buffer too small for nonce") - } - if n < 0 { - return 0, fmt.Errorf("decode Update: nonce overflow") - } - pos += n - } - if u.Flags&CODE_UPDATE != 0 { - if len(buf) < pos+32 { - return 0, fmt.Errorf("decode Update: buffer too small for codeHash") - } - copy(u.CodeHashOrStorage[:], buf[pos:pos+32]) - pos += 32 - } - if u.Flags&STORAGE_UPDATE != 0 { - l, n := binary.Uvarint(buf[pos:]) - if n == 0 { - return 0, fmt.Errorf("decode Update: buffer too small for storage len") - } - if n < 0 { - return 0, fmt.Errorf("decode Update: storage lee overflow") - 
} - pos += n - if len(buf) < pos+int(l) { - return 0, fmt.Errorf("decode Update: buffer too small for storage") - } - u.ValLength = int(l) - copy(u.CodeHashOrStorage[:], buf[pos:pos+int(l)]) - pos += int(l) - } - return pos, nil -} - -func (u Update) String() string { - var sb strings.Builder - sb.WriteString(fmt.Sprintf("Flags: [%s]", u.Flags)) - if u.Flags&BALANCE_UPDATE != 0 { - sb.WriteString(fmt.Sprintf(", Balance: [%d]", &u.Balance)) - } - if u.Flags&NONCE_UPDATE != 0 { - sb.WriteString(fmt.Sprintf(", Nonce: [%d]", u.Nonce)) - } - if u.Flags&CODE_UPDATE != 0 { - sb.WriteString(fmt.Sprintf(", CodeHash: [%x]", u.CodeHashOrStorage)) - } - if u.Flags&STORAGE_UPDATE != 0 { - sb.WriteString(fmt.Sprintf(", Storage: [%x]", u.CodeHashOrStorage[:u.ValLength])) - } - return sb.String() -} - // In memory commitment and state to use with the tests type MockState struct { t *testing.T @@ -199,13 +39,14 @@ func (ms MockState) branchFn(prefix []byte) ([]byte, error) { } func (ms MockState) accountFn(plainKey []byte, cell *Cell) error { - exBytes, ok := ms.sm[string(plainKey)] + exBytes, ok := ms.sm[string(plainKey[:])] if !ok { ms.t.Logf("accountFn not found key [%x]", plainKey) + cell.Delete = true return nil } var ex Update - pos, err := ex.decode(exBytes, 0) + pos, err := ex.Decode(exBytes, 0) if err != nil { ms.t.Fatalf("accountFn decode existing [%x], bytes: [%x]: %v", plainKey, exBytes, err) return nil @@ -241,13 +82,14 @@ func (ms MockState) accountFn(plainKey []byte, cell *Cell) error { } func (ms MockState) storageFn(plainKey []byte, cell *Cell) error { - exBytes, ok := ms.sm[string(plainKey)] + exBytes, ok := ms.sm[string(plainKey[:])] if !ok { ms.t.Logf("storageFn not found key [%x]", plainKey) + cell.Delete = true return nil } var ex Update - pos, err := ex.decode(exBytes, 0) + pos, err := ex.Decode(exBytes, 0) if err != nil { ms.t.Fatalf("storageFn decode existing [%x], bytes: [%x]: %v", plainKey, exBytes, err) return nil @@ -290,7 +132,7 @@ func (ms *MockState) 
applyPlainUpdates(plainKeys [][]byte, updates []Update) err } else { if exBytes, ok := ms.sm[string(key)]; ok { var ex Update - pos, err := ex.decode(exBytes, 0) + pos, err := ex.Decode(exBytes, 0) if err != nil { return fmt.Errorf("applyPlainUpdates decode existing [%x], bytes: [%x]: %w", key, exBytes, err) } @@ -313,9 +155,9 @@ func (ms *MockState) applyPlainUpdates(plainKeys [][]byte, updates []Update) err ex.Flags |= STORAGE_UPDATE copy(ex.CodeHashOrStorage[:], update.CodeHashOrStorage[:]) } - ms.sm[string(key)] = ex.encode(nil, ms.numBuf[:]) + ms.sm[string(key)] = ex.Encode(nil, ms.numBuf[:]) } else { - ms.sm[string(key)] = update.encode(nil, ms.numBuf[:]) + ms.sm[string(key)] = update.Encode(nil, ms.numBuf[:]) } } } diff --git a/go.mod b/go.mod index aebcd9b86..cfe7e0c6c 100644 --- a/go.mod +++ b/go.mod @@ -38,7 +38,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/valyala/fastrand v1.1.0 // indirect github.com/valyala/histogram v1.2.0 // indirect - golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4 // indirect + golang.org/x/net v0.0.0-20220607020251-c690dde0001d // indirect golang.org/x/text v0.3.7 // indirect google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index 350dc7f16..ca7f999e0 100644 --- a/go.sum +++ b/go.sum @@ -143,8 +143,8 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4 h1:HVyaeDAYux4pnY+D/SiwmLOR36ewZ4iGQIIrtnuCjFA= -golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod 
h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= +golang.org/x/net v0.0.0-20220607020251-c690dde0001d h1:4SFsTMi4UahlKoloni7L4eYzhFRifURQLw+yv0QDCx8= +golang.org/x/net v0.0.0-20220607020251-c690dde0001d/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= diff --git a/kv/tables.go b/kv/tables.go index e07e0ece8..ab4a152de 100644 --- a/kv/tables.go +++ b/kv/tables.go @@ -385,6 +385,13 @@ const ( CodeSettings = "CodeSettings" CodeIdx = "CodeIdx" + CommitmentKeys = "CommitmentKeys" + CommitmentVals = "CommitmentVals" + CommitmentHistoryKeys = "CommitmentHistoryKeys" + CommitmentHistoryVals = "CommitmentHistoryVals" + CommitmentSettings = "CommitmentSettings" + CommitmentIdx = "CommitmentIdx" + LogAddressKeys = "LogAddressKeys" LogAddressIdx = "LogAddressIdx" LogTopicsKeys = "LogTopicsKeys" @@ -519,6 +526,13 @@ var ChaindataTables = []string{ CodeSettings, CodeIdx, + CommitmentKeys, + CommitmentVals, + CommitmentHistoryKeys, + CommitmentHistoryVals, + CommitmentSettings, + CommitmentIdx, + LogAddressKeys, LogAddressIdx, LogTopicsKeys, @@ -625,29 +639,32 @@ var ChaindataTablesCfg = TableCfg{ }, CallTraceSet: {Flags: DupSort}, - AccountKeys: {Flags: DupSort}, - AccountHistoryKeys: {Flags: DupSort}, - AccountIdx: {Flags: DupSort}, - StorageKeys: {Flags: DupSort}, - StorageHistoryKeys: {Flags: DupSort}, - StorageIdx: {Flags: DupSort}, - CodeKeys: {Flags: DupSort}, - CodeHistoryKeys: {Flags: DupSort}, - CodeIdx: {Flags: DupSort}, - LogAddressKeys: {Flags: DupSort}, - LogAddressIdx: {Flags: DupSort}, - LogTopicsKeys: {Flags: DupSort}, - LogTopicsIdx: {Flags: DupSort}, - TracesFromKeys: {Flags: DupSort}, - TracesFromIdx: {Flags: DupSort}, - 
TracesToKeys: {Flags: DupSort}, - TracesToIdx: {Flags: DupSort}, - RAccountKeys: {Flags: DupSort}, - RAccountIdx: {Flags: DupSort}, - RStorageKeys: {Flags: DupSort}, - RStorageIdx: {Flags: DupSort}, - RCodeKeys: {Flags: DupSort}, - RCodeIdx: {Flags: DupSort}, + AccountKeys: {Flags: DupSort}, + AccountHistoryKeys: {Flags: DupSort}, + AccountIdx: {Flags: DupSort}, + StorageKeys: {Flags: DupSort}, + StorageHistoryKeys: {Flags: DupSort}, + StorageIdx: {Flags: DupSort}, + CodeKeys: {Flags: DupSort}, + CodeHistoryKeys: {Flags: DupSort}, + CodeIdx: {Flags: DupSort}, + CommitmentKeys: {Flags: DupSort}, + CommitmentHistoryKeys: {Flags: DupSort}, + CommitmentIdx: {Flags: DupSort}, + LogAddressKeys: {Flags: DupSort}, + LogAddressIdx: {Flags: DupSort}, + LogTopicsKeys: {Flags: DupSort}, + LogTopicsIdx: {Flags: DupSort}, + TracesFromKeys: {Flags: DupSort}, + TracesFromIdx: {Flags: DupSort}, + TracesToKeys: {Flags: DupSort}, + TracesToIdx: {Flags: DupSort}, + RAccountKeys: {Flags: DupSort}, + RAccountIdx: {Flags: DupSort}, + RStorageKeys: {Flags: DupSort}, + RStorageIdx: {Flags: DupSort}, + RCodeKeys: {Flags: DupSort}, + RCodeIdx: {Flags: DupSort}, } var TxpoolTablesCfg = TableCfg{} diff --git a/recsplit/eliasfano32/elias_fano.go b/recsplit/eliasfano32/elias_fano.go index 52e858f94..e077c6830 100644 --- a/recsplit/eliasfano32/elias_fano.go +++ b/recsplit/eliasfano32/elias_fano.go @@ -301,13 +301,15 @@ const maxDataSize = 0xFFFFFFFFFFFF // Read inputs the state of golomb rice encoding from a reader s func ReadEliasFano(r []byte) (*EliasFano, int) { - p := (*[maxDataSize / 8]uint64)(unsafe.Pointer(&r[16])) ef := &EliasFano{ count: binary.BigEndian.Uint64(r[:8]), u: binary.BigEndian.Uint64(r[8:16]), - data: p[:], + data: make([]uint64, len(r[16:])/8), // p := (*[maxDataSize / 8]uint64)(unsafe.Pointer(&r[16])) } ef.maxOffset = ef.u - 1 + for i, fi := 16, 0; i < len(r[16:]); i, fi = i+8, fi+1 { + ef.data[fi] = binary.LittleEndian.Uint64(r[i : i+8]) + } ef.deriveFields() return ef, 16 
+ 8*len(ef.data) } @@ -627,8 +629,11 @@ func (ef *DoubleEliasFano) Read(r []byte) int { ef.uPosition = binary.BigEndian.Uint64(r[16:24]) ef.cumKeysMinDelta = binary.BigEndian.Uint64(r[24:32]) ef.posMinDelta = binary.BigEndian.Uint64(r[32:40]) - p := (*[maxDataSize / 8]uint64)(unsafe.Pointer(&r[40])) - ef.data = p[:] + + ef.data = make([]uint64, len(r[40:])/8) + for i, fi := 16, 0; i < len(r[40:]); i, fi = i+8, fi+1 { + ef.data[fi] = binary.LittleEndian.Uint64(r[i : i+8]) + } ef.deriveFields() return 40 + 8*len(ef.data) } diff --git a/state/aggregator.go b/state/aggregator.go index b4f93e5db..841593456 100644 --- a/state/aggregator.go +++ b/state/aggregator.go @@ -17,23 +17,38 @@ package state import ( + "bytes" + "encoding/binary" "fmt" + "hash" + "os" "sync" + "sync/atomic" "github.com/RoaringBitmap/roaring/roaring64" + "github.com/google/btree" + "github.com/holiman/uint256" + "golang.org/x/crypto/sha3" + + "github.com/ledgerwatch/erigon-lib/commitment" + "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon-lib/common/length" "github.com/ledgerwatch/erigon-lib/kv" ) // Reconstruction of the aggregator in another package, `aggregator` type Aggregator struct { + commitFn func(txNum uint64) error aggregationStep uint64 accounts *Domain storage *Domain code *Domain - accountsHistory *History - storageHistory *History - codeHistory *History + commitment *Domain + stats FilesStats + commTree *btree.BTreeG[*CommitmentItem] + keccak hash.Hash + patriciaTrie *commitment.HexPatriciaHashed logAddrs *InvertedIndex logTopics *InvertedIndex tracesFrom *InvertedIndex @@ -48,6 +63,9 @@ func NewAggregator( ) (*Aggregator, error) { a := &Aggregator{ aggregationStep: aggregationStep, + patriciaTrie: commitment.NewHexPatriciaHashed(length.Addr, nil, nil, nil), + commTree: btree.NewG[*CommitmentItem](32, commitmentItemLess), + keccak: sha3.NewLegacyKeccak256(), } closeAgg := true defer func() { @@ -55,7 +73,10 @@ func NewAggregator( 
a.Close() } }() - var err error + err := os.MkdirAll(dir, 0764) + if err != nil { + return nil, err + } if a.accounts, err = NewDomain(dir, aggregationStep, "accounts", kv.AccountKeys, kv.AccountVals, kv.AccountHistoryKeys, kv.AccountHistoryVals, kv.AccountSettings, kv.AccountIdx, 0 /* prefixLen */, false /* compressVals */); err != nil { return nil, err } @@ -65,15 +86,14 @@ func NewAggregator( if a.code, err = NewDomain(dir, aggregationStep, "code", kv.CodeKeys, kv.CodeVals, kv.CodeHistoryKeys, kv.CodeHistoryVals, kv.CodeSettings, kv.CodeIdx, 0 /* prefixLen */, true /* compressVals */); err != nil { return nil, err } - if a.accountsHistory, err = NewHistory(dir, aggregationStep, "accounts", kv.AccountHistoryKeys, kv.AccountIdx, kv.AccountHistoryVals, kv.AccountSettings, false /* compressVals */); err != nil { - return nil, err - } - if a.storageHistory, err = NewHistory(dir, aggregationStep, "storage", kv.StorageHistoryKeys, kv.StorageIdx, kv.StorageHistoryVals, kv.StorageSettings, false /* compressVals */); err != nil { - return nil, err - } - if a.codeHistory, err = NewHistory(dir, aggregationStep, "code", kv.CodeHistoryKeys, kv.CodeIdx, kv.CodeHistoryVals, kv.CodeSettings, true /* compressVals */); err != nil { + if a.commitment, err = NewDomain(dir, aggregationStep, "commitment", kv.CommitmentKeys, kv.CommitmentVals, kv.CommitmentHistoryKeys, kv.CommitmentHistoryVals, kv.CommitmentSettings, kv.CommitmentIdx, 0 /* prefixLen */, false /* compressVals */); err != nil { return nil, err } + + //merge := func(a, b []byte) ([]byte, error) { + // return commitment.BranchData(a).MergeHexBranches(commitment.BranchData(b), nil) + //} + //a.commitment.SetValueMergeStrategy(merge) if a.logAddrs, err = NewInvertedIndex(dir, aggregationStep, "logaddrs", kv.LogAddressKeys, kv.LogAddressIdx); err != nil { return nil, err } @@ -95,6 +115,7 @@ func (a *Aggregator) GetAndResetStats() DomainStats { stats.Accumulate(a.accounts.GetAndResetStats()) 
stats.Accumulate(a.storage.GetAndResetStats()) stats.Accumulate(a.code.GetAndResetStats()) + stats.Accumulate(a.commitment.GetAndResetStats()) return stats } @@ -108,15 +129,10 @@ func (a *Aggregator) Close() { if a.code != nil { a.code.Close() } - if a.accountsHistory != nil { - a.accountsHistory.Close() - } - if a.storageHistory != nil { - a.storageHistory.Close() - } - if a.codeHistory != nil { - a.codeHistory.Close() + if a.commitment != nil { + a.commitment.Close() } + if a.logAddrs != nil { a.logAddrs.Close() } @@ -136,6 +152,7 @@ func (a *Aggregator) SetTx(tx kv.RwTx) { a.accounts.SetTx(tx) a.storage.SetTx(tx) a.code.SetTx(tx) + a.commitment.SetTx(tx) a.logAddrs.SetTx(tx) a.logTopics.SetTx(tx) a.tracesFrom.SetTx(tx) @@ -147,6 +164,7 @@ func (a *Aggregator) SetTxNum(txNum uint64) { a.accounts.SetTxNum(txNum) a.storage.SetTxNum(txNum) a.code.SetTxNum(txNum) + a.commitment.SetTxNum(txNum) a.logAddrs.SetTxNum(txNum) a.logTopics.SetTxNum(txNum) a.tracesFrom.SetTxNum(txNum) @@ -157,6 +175,7 @@ type AggCollation struct { accounts Collation storage Collation code Collation + commitment Collation logAddrs map[string]*roaring64.Bitmap logTopics map[string]*roaring64.Bitmap tracesFrom map[string]*roaring64.Bitmap @@ -167,6 +186,7 @@ func (c AggCollation) Close() { c.accounts.Close() c.storage.Close() c.code.Close() + c.commitment.Close() } func (a *Aggregator) collate(step uint64, txFrom, txTo uint64, roTx kv.Tx) (AggCollation, error) { @@ -187,6 +207,9 @@ func (a *Aggregator) collate(step uint64, txFrom, txTo uint64, roTx kv.Tx) (AggC if ac.code, err = a.code.collate(step, txFrom, txTo, roTx); err != nil { return AggCollation{}, err } + if ac.commitment, err = a.commitment.collate(step, txFrom, txTo, roTx); err != nil { + return AggCollation{}, err + } if ac.logAddrs, err = a.logAddrs.collate(txFrom, txTo, roTx); err != nil { return AggCollation{}, err } @@ -207,6 +230,7 @@ type AggStaticFiles struct { accounts StaticFiles storage StaticFiles code StaticFiles + 
commitment StaticFiles logAddrs InvertedFiles logTopics InvertedFiles tracesFrom InvertedFiles @@ -217,6 +241,7 @@ func (sf AggStaticFiles) Close() { sf.accounts.Close() sf.storage.Close() sf.code.Close() + sf.commitment.Close() sf.logAddrs.Close() sf.logTopics.Close() sf.tracesFrom.Close() @@ -232,8 +257,8 @@ func (a *Aggregator) buildFiles(step uint64, collation AggCollation) (AggStaticF } }() var wg sync.WaitGroup - wg.Add(7) - errCh := make(chan error, 7) + wg.Add(8) + errCh := make(chan error, 8) go func() { defer wg.Done() var err error @@ -255,6 +280,13 @@ func (a *Aggregator) buildFiles(step uint64, collation AggCollation) (AggStaticF errCh <- err } }() + go func() { + defer wg.Done() + var err error + if sf.commitment, err = a.commitment.buildFiles(step, collation.commitment); err != nil { + errCh <- err + } + }() go func() { defer wg.Done() var err error @@ -301,6 +333,7 @@ func (a *Aggregator) integrateFiles(sf AggStaticFiles, txNumFrom, txNumTo uint64 a.accounts.integrateFiles(sf.accounts, txNumFrom, txNumTo) a.storage.integrateFiles(sf.storage, txNumFrom, txNumTo) a.code.integrateFiles(sf.code, txNumFrom, txNumTo) + a.commitment.integrateFiles(sf.commitment, txNumFrom, txNumTo) a.logAddrs.integrateFiles(sf.logAddrs, txNumFrom, txNumTo) a.logTopics.integrateFiles(sf.logTopics, txNumFrom, txNumTo) a.tracesFrom.integrateFiles(sf.tracesFrom, txNumFrom, txNumTo) @@ -317,6 +350,9 @@ func (a *Aggregator) prune(step uint64, txFrom, txTo uint64) error { if err := a.code.prune(step, txFrom, txTo); err != nil { return err } + if err := a.commitment.prune(step, txFrom, txTo); err != nil { + return err + } if err := a.logAddrs.prune(txFrom, txTo); err != nil { return err } @@ -340,6 +376,9 @@ func (a *Aggregator) EndTxNumMinimax() uint64 { if txNum := a.code.endTxNumMinimax(); txNum < min { min = txNum } + if txNum := a.commitment.endTxNumMinimax(); txNum < min { + min = txNum + } if txNum := a.logAddrs.endTxNumMinimax(); txNum < min { min = txNum } @@ -359,6 
+398,7 @@ type Ranges struct { accounts DomainRanges storage DomainRanges code DomainRanges + commitment DomainRanges logAddrsStartTxNum, logAddrsEndTxNum uint64 logAddrs bool logTopicsStartTxNum, logTopicsEndTxNum uint64 @@ -370,7 +410,7 @@ type Ranges struct { } func (r Ranges) any() bool { - return r.accounts.any() || r.storage.any() || r.code.any() || r.logAddrs || r.logTopics || r.tracesFrom || r.tracesTo + return r.accounts.any() || r.storage.any() || r.code.any() || r.commitment.any() || r.logAddrs || r.logTopics || r.tracesFrom || r.tracesTo } func (a *Aggregator) findMergeRange(maxEndTxNum, maxSpan uint64) Ranges { @@ -378,6 +418,7 @@ func (a *Aggregator) findMergeRange(maxEndTxNum, maxSpan uint64) Ranges { r.accounts = a.accounts.findMergeRange(maxEndTxNum, maxSpan) r.storage = a.storage.findMergeRange(maxEndTxNum, maxSpan) r.code = a.code.findMergeRange(maxEndTxNum, maxSpan) + r.commitment = a.commitment.findMergeRange(maxEndTxNum, maxSpan) r.logAddrs, r.logAddrsStartTxNum, r.logAddrsEndTxNum = a.logAddrs.findMergeRange(maxEndTxNum, maxSpan) r.logTopics, r.logTopicsStartTxNum, r.logTopicsEndTxNum = a.logTopics.findMergeRange(maxEndTxNum, maxSpan) r.tracesFrom, r.tracesFromStartTxNum, r.tracesFromEndTxNum = a.tracesFrom.findMergeRange(maxEndTxNum, maxSpan) @@ -387,23 +428,26 @@ func (a *Aggregator) findMergeRange(maxEndTxNum, maxSpan uint64) Ranges { } type SelectedStaticFiles struct { - accounts []*filesItem - accountsIdx, accountsHist []*filesItem - accountsI int - storage []*filesItem - storageIdx, storageHist []*filesItem - storageI int - code []*filesItem - codeIdx, codeHist []*filesItem - codeI int - logAddrs []*filesItem - logAddrsI int - logTopics []*filesItem - logTopicsI int - tracesFrom []*filesItem - tracesFromI int - tracesTo []*filesItem - tracesToI int + accounts []*filesItem + accountsIdx, accountsHist []*filesItem + accountsI int + storage []*filesItem + storageIdx, storageHist []*filesItem + storageI int + code []*filesItem + codeIdx, 
codeHist []*filesItem + codeI int + commitment []*filesItem + commitmentIdx, commitmentHist []*filesItem + commitmentI int + logAddrs []*filesItem + logAddrsI int + logTopics []*filesItem + logTopicsI int + tracesFrom []*filesItem + tracesFromI int + tracesTo []*filesItem + tracesToI int } func (sf SelectedStaticFiles) Close() { @@ -411,6 +455,7 @@ func (sf SelectedStaticFiles) Close() { sf.accounts, sf.accountsIdx, sf.accountsHist, sf.storage, sf.storageIdx, sf.storageHist, sf.code, sf.codeIdx, sf.codeHist, + sf.commitment, sf.commitmentIdx, sf.commitmentHist, sf.logAddrs, sf.logTopics, sf.tracesFrom, sf.tracesTo, } { for _, item := range group { @@ -437,6 +482,9 @@ func (a *Aggregator) staticFilesInRange(r Ranges) SelectedStaticFiles { if r.code.any() { sf.code, sf.codeIdx, sf.codeHist, sf.codeI = a.code.staticFilesInRange(r.code) } + if r.commitment.any() { + sf.commitment, sf.commitmentIdx, sf.commitmentHist, sf.commitmentI = a.commitment.staticFilesInRange(r.commitment) + } if r.logAddrs { sf.logAddrs, sf.logAddrsI = a.logAddrs.staticFilesInRange(r.logAddrsStartTxNum, r.logAddrsEndTxNum) } @@ -453,16 +501,18 @@ func (a *Aggregator) staticFilesInRange(r Ranges) SelectedStaticFiles { } type MergedFiles struct { - accounts *filesItem - accountsIdx, accountsHist *filesItem - storage *filesItem - storageIdx, storageHist *filesItem - code *filesItem - codeIdx, codeHist *filesItem - logAddrs *filesItem - logTopics *filesItem - tracesFrom *filesItem - tracesTo *filesItem + accounts *filesItem + accountsIdx, accountsHist *filesItem + storage *filesItem + storageIdx, storageHist *filesItem + code *filesItem + codeIdx, codeHist *filesItem + commitment *filesItem + commitmentIdx, commitmentHist *filesItem + logAddrs *filesItem + logTopics *filesItem + tracesFrom *filesItem + tracesTo *filesItem } func (mf MergedFiles) Close() { @@ -470,6 +520,7 @@ func (mf MergedFiles) Close() { mf.accounts, mf.accountsIdx, mf.accountsHist, mf.storage, mf.storageIdx, mf.storageHist, 
mf.code, mf.codeIdx, mf.codeHist, + mf.commitment, mf.commitmentIdx, mf.commitmentHist, mf.logAddrs, mf.logTopics, mf.tracesFrom, mf.tracesTo, } { if item != nil { @@ -492,8 +543,8 @@ func (a *Aggregator) mergeFiles(files SelectedStaticFiles, r Ranges, maxSpan uin } }() var wg sync.WaitGroup - wg.Add(7) - errCh := make(chan error, 7) + wg.Add(8) + errCh := make(chan error, 8) go func() { defer wg.Done() var err error @@ -521,6 +572,15 @@ func (a *Aggregator) mergeFiles(files SelectedStaticFiles, r Ranges, maxSpan uin } } }() + go func() { + defer wg.Done() + var err error + if r.commitment.any() { + if mf.commitment, mf.commitmentIdx, mf.commitmentHist, err = a.commitment.mergeFiles(files.commitment, files.commitmentIdx, files.commitmentHist, r.commitment, maxSpan); err != nil { + errCh <- err + } + } + }() go func() { defer wg.Done() var err error @@ -575,6 +635,7 @@ func (a *Aggregator) integrateMergedFiles(outs SelectedStaticFiles, in MergedFil a.accounts.integrateMergedFiles(outs.accounts, outs.accountsIdx, outs.accountsHist, in.accounts, in.accountsIdx, in.accountsHist) a.storage.integrateMergedFiles(outs.storage, outs.storageIdx, outs.storageHist, in.storage, in.storageIdx, in.storageHist) a.code.integrateMergedFiles(outs.code, outs.codeIdx, outs.codeHist, in.code, in.codeIdx, in.codeHist) + a.commitment.integrateMergedFiles(outs.commitment, outs.commitmentIdx, outs.commitmentHist, in.commitment, in.commitmentIdx, in.commitmentHist) a.logAddrs.integrateMergedFiles(outs.logAddrs, in.logAddrs) a.logTopics.integrateMergedFiles(outs.logTopics, in.logTopics) a.tracesFrom.integrateMergedFiles(outs.tracesFrom, in.tracesFrom) @@ -591,6 +652,9 @@ func (a *Aggregator) deleteFiles(outs SelectedStaticFiles) error { if err := a.code.deleteFiles(outs.code, outs.codeIdx, outs.codeHist); err != nil { return err } + if err := a.commitment.deleteFiles(outs.commitment, outs.commitmentIdx, outs.commitmentHist); err != nil { + return err + } if err := 
a.logAddrs.deleteFiles(outs.logAddrs); err != nil { return err } @@ -633,6 +697,14 @@ func (ac *AggregatorContext) ReadAccountCode(addr []byte, roTx kv.Tx) ([]byte, e return ac.code.Get(addr, nil, roTx) } +func (ac *AggregatorContext) ReadCommitment(addr []byte, roTx kv.Tx) ([]byte, error) { + return ac.commitment.Get(addr, nil, roTx) +} + +func (ac *AggregatorContext) ReadCommitmentBeforeTxNum(addr []byte, txNum uint64, roTx kv.Tx) ([]byte, error) { + return ac.commitment.GetBeforeTxNum(addr, txNum, roTx) +} + func (ac *AggregatorContext) ReadAccountCodeBeforeTxNum(addr []byte, txNum uint64, roTx kv.Tx) ([]byte, error) { return ac.code.GetBeforeTxNum(addr, txNum, roTx) } @@ -653,14 +725,350 @@ func (ac *AggregatorContext) ReadAccountCodeSizeBeforeTxNum(addr []byte, txNum u return len(code), nil } +func bytesToUint64(buf []byte) (x uint64) { + for i, b := range buf { + x = x<<8 + uint64(b) + if i == 7 { + return + } + } + return +} + +func (a *AggregatorContext) branchFn(prefix []byte) ([]byte, error) { + // Look in the summary table first + stateValue, err := a.ReadCommitment(prefix, a.a.rwTx) + if err != nil { + return nil, fmt.Errorf("failed read branch %x: %w", commitment.CompactedKeyToHex(prefix), err) + } + if stateValue == nil { + return nil, nil + } + // fmt.Printf("Returning branch data prefix [%x], mergeVal=[%x]\n", commitment.CompactedKeyToHex(prefix), stateValue) + return stateValue[2:], nil // Skip touchMap but keep afterMap +} + +func (a *AggregatorContext) accountFn(plainKey []byte, cell *commitment.Cell) error { + encAccount, err := a.ReadAccountData(plainKey, a.a.rwTx) + if err != nil { + return err + } + cell.Nonce = 0 + cell.Balance.Clear() + copy(cell.CodeHash[:], commitment.EmptyCodeHash) + if len(encAccount) > 0 { + nonce, balance, chash := DecodeAccountBytes(encAccount) + cell.Nonce = nonce + cell.Balance.Set(balance) + if chash != nil { + copy(cell.CodeHash[:], chash) + } + } + + code, err := a.ReadAccountCode(plainKey, a.a.rwTx) + if err != 
nil { + return err + } + if code != nil { + a.a.keccak.Reset() + a.a.keccak.Write(code) + copy(cell.CodeHash[:], a.a.keccak.Sum(nil)) + } + cell.Delete = len(encAccount) == 0 && len(code) == 0 + return nil +} + +func (a *AggregatorContext) storageFn(plainKey []byte, cell *commitment.Cell) error { + // Look in the summary table first + enc, err := a.ReadAccountStorage(plainKey[:length.Addr], plainKey[length.Addr:], a.a.rwTx) + if err != nil { + return err + } + cell.StorageLen = len(enc) + copy(cell.Storage[:], enc) + cell.Delete = cell.StorageLen == 0 + return nil +} + +var ( + keyCommitLatestTx = []byte("latesttx") + keyLatestTxInDB = []byte("dblasttx") +) + +func (a *Aggregator) SeekCommitment(txNum uint64) (uint64, error) { + if txNum == 0 { + return 0, nil + } + ctx := a.MakeContext() + latestTxNum := txNum + for { + a.SetTxNum(latestTxNum + 1) + latest, err := ctx.ReadCommitment(keyCommitLatestTx, a.rwTx) + if err != nil { + return 0, err + } + if len(latest) != 8 { + break + } + v := binary.BigEndian.Uint64(latest) + if v == latestTxNum { + break + } + latestTxNum = v + } + + a.SetTxNum(latestTxNum) + dblast, err := ctx.ReadCommitment(keyLatestTxInDB, a.rwTx) + if err != nil { + return 0, err + } + if len(dblast) == 8 { + v := binary.BigEndian.Uint64(dblast) + latestTxNum = v + } + + a.SetTxNum(latestTxNum) + + buf, err := ctx.ReadCommitment(makeCommitmentKey(latestTxNum), a.rwTx) + if err != nil { + return 0, err + } + if len(buf) == 0 { + return 0, fmt.Errorf("root state was not found") + } + if err := a.patriciaTrie.SetState(buf); err != nil { + return 0, err + } + + return latestTxNum, nil +} + +func makeCommitmentKey(txNum uint64) []byte { + var b [8]byte + binary.BigEndian.PutUint64(b[:], txNum) + return append([]byte("roothash"), b[:]...) +} + +// Evaluates commitment for processed state. 
Commit=true - store trie state after evaluation +func (a *Aggregator) ComputeCommitment(commit, trace bool) (rootHash []byte, err error) { + touchedKeys, hashedKeys, updates := a.touchedKeyList() + if len(touchedKeys) == 0 { + rootHash, err = a.patriciaTrie.RootHash() + if commit && err == nil { + state, err := a.patriciaTrie.EncodeCurrentState(nil, rootHash) + if err != nil { + return nil, err + } + if err = a.UpdateCommitmentData(makeCommitmentKey(a.txNum), state); err != nil { + return nil, err + } + var b [8]byte + binary.BigEndian.PutUint64(b[:], a.txNum) + if err = a.UpdateCommitmentData(keyCommitLatestTx, b[:]); err != nil { + return nil, err + } + } + return rootHash, err + } + + ctx := a.MakeContext() + a.patriciaTrie.Reset() + a.patriciaTrie.SetTrace(trace) + a.patriciaTrie.ResetFns(ctx.branchFn, ctx.accountFn, ctx.storageFn) + + rootHash, branchNodeUpdates, err := a.patriciaTrie.ReviewKeys(touchedKeys, hashedKeys) + if err != nil { + return nil, err + } + _ = updates + a.patriciaTrie.Reset() + a.patriciaTrie.SetTrace(trace) + a.patriciaTrie.ResetFns(ctx.branchFn, ctx.accountFn, ctx.storageFn) + + rootHash2, _, err := a.patriciaTrie.ProcessUpdates(touchedKeys, hashedKeys, updates) + if err != nil { + return nil, err + } + + if !bytes.Equal(rootHash, rootHash2) { + fmt.Printf("hash mismatch: state direct reading=%x update based=%x\n", rootHash, rootHash2) + return rootHash2, nil + } + + for pref, update := range branchNodeUpdates { + prefix := []byte(pref) + + stateValue, err := ctx.ReadCommitment(prefix, a.rwTx) + if err != nil { + return nil, err + } + + stated := commitment.BranchData(stateValue) + merged, err := stated.MergeHexBranches(update, nil) + if err != nil { + return nil, err + } + if bytes.Equal(stated, merged) { + continue + } + //if trace { + // fmt.Printf("computeCommitment merge [%x] [%x]+[%x]=>[%x]\n", prefix, stated, update, merged) + //} + if err = a.UpdateCommitmentData(prefix, merged); err != nil { + return nil, err + } + } + if 
commit { + state, err := a.patriciaTrie.EncodeCurrentState(nil, rootHash) + if err != nil { + return nil, err + } + if err = a.UpdateCommitmentData(makeCommitmentKey(a.txNum), state); err != nil { + return nil, err + } + var b [8]byte + binary.BigEndian.PutUint64(b[:], a.txNum) + if err = a.UpdateCommitmentData(keyCommitLatestTx, b[:]); err != nil { + return nil, err + } + } + + return rootHash, nil +} + +func (a *Aggregator) hashAndNibblizeKey(key []byte) []byte { + hashedKey := make([]byte, length.Hash) + + a.keccak.Reset() + a.keccak.Write(key[:length.Addr]) + copy(hashedKey[:length.Hash], a.keccak.Sum(nil)) + + if len(key[length.Addr:]) > 0 { + hashedKey = append(hashedKey, make([]byte, length.Hash)...) + a.keccak.Reset() + a.keccak.Write(key[length.Addr:]) + copy(hashedKey[length.Hash:], a.keccak.Sum(nil)) + } + + nibblized := make([]byte, len(hashedKey)*2) + for i, b := range hashedKey { + nibblized[i*2] = (b >> 4) & 0xf + nibblized[i*2+1] = b & 0xf + } + return nibblized +} + +func (a *Aggregator) touchPlainKeyAccount(c *CommitmentItem, val []byte) { + if len(val) == 0 { + c.update.Flags = commitment.DELETE_UPDATE + return + } + c.update.DecodeForStorage(val) + c.update.Flags = commitment.BALANCE_UPDATE | commitment.NONCE_UPDATE + item, found := a.commTree.Get(&CommitmentItem{hashedKey: c.hashedKey}) + if !found { + return + } + if item.update.Flags&commitment.CODE_UPDATE != 0 { + c.update.Flags |= commitment.CODE_UPDATE + copy(c.update.CodeHashOrStorage[:], item.update.CodeHashOrStorage[:]) + } +} + +func (a *Aggregator) touchPlainKeyStorage(c *CommitmentItem, val []byte) { + c.update.ValLength = len(val) + if len(val) == 0 { + c.update.Flags = commitment.DELETE_UPDATE + } else { + c.update.Flags = commitment.STORAGE_UPDATE + copy(c.update.CodeHashOrStorage[:], val) + } +} + +func (a *Aggregator) touchPlainKeyCode(c *CommitmentItem, val []byte) { + c.update.Flags = commitment.CODE_UPDATE + item, found := a.commTree.Get(c) + if !found { + a.keccak.Reset() + 
a.keccak.Write(val) + copy(c.update.CodeHashOrStorage[:], a.keccak.Sum(nil)) + return + } + if item.update.Flags&commitment.BALANCE_UPDATE != 0 { + c.update.Flags |= commitment.BALANCE_UPDATE + c.update.Balance.Set(&item.update.Balance) + } + if item.update.Flags&commitment.NONCE_UPDATE != 0 { + c.update.Flags |= commitment.NONCE_UPDATE + c.update.Nonce = item.update.Nonce + } + if item.update.Flags == commitment.DELETE_UPDATE && len(val) == 0 { + c.update.Flags = commitment.DELETE_UPDATE + } else { + a.keccak.Reset() + a.keccak.Write(val) + copy(c.update.CodeHashOrStorage[:], a.keccak.Sum(nil)) + } +} + +func (a *Aggregator) touchPlainKey(key, val []byte, fn func(c *CommitmentItem, val []byte)) { + c := &CommitmentItem{plainKey: common.Copy(key), hashedKey: a.hashAndNibblizeKey(key)} + fn(c, val) + a.commTree.ReplaceOrInsert(c) +} + +type CommitmentItem struct { + plainKey []byte + hashedKey []byte + update commitment.Update +} + +func commitmentItemLess(i, j *CommitmentItem) bool { + return bytes.Compare(i.hashedKey, j.hashedKey) < 0 +} + +func (a *Aggregator) touchedKeyList() ([][]byte, [][]byte, []commitment.Update) { + plainKeys := make([][]byte, a.commTree.Len()) + hashedKeys := make([][]byte, a.commTree.Len()) + updates := make([]commitment.Update, a.commTree.Len()) + + j := 0 + a.commTree.Ascend(func(item *CommitmentItem) bool { + plainKeys[j] = item.plainKey + hashedKeys[j] = item.hashedKey + updates[j] = item.update + j++ + return true + }) + + a.commTree.Clear(false) + return plainKeys, hashedKeys, updates +} + func (a *Aggregator) ReadyToFinishTx() bool { return (a.txNum+1)%a.aggregationStep == 0 } +func (a *Aggregator) SetCommitFn(fn func(txNum uint64) error) { + a.commitFn = fn +} + func (a *Aggregator) FinishTx() error { - if (a.txNum+1)%a.aggregationStep != 0 { + atomic.AddUint64(&a.stats.TxCount, 1) + + var b [8]byte + binary.BigEndian.PutUint64(b[:], a.txNum) + if err := a.UpdateCommitmentData(keyLatestTxInDB, b[:]); err != nil { + return err + } 
+ + if !a.ReadyToFinishTx() { return nil } + _, err := a.ComputeCommitment(true, false) + if err != nil { + return err + } closeAll := true step := a.txNum / a.aggregationStep if step == 0 { @@ -676,6 +1084,7 @@ func (a *Aggregator) FinishTx() error { collation.Close() } }() + sf, err := a.buildFiles(step, collation) if err != nil { return err @@ -713,21 +1122,35 @@ func (a *Aggregator) FinishTx() error { } } closeAll = false + + if a.commitFn != nil { + if err := a.commitFn(a.txNum); err != nil { + return err + } + } + return nil } func (a *Aggregator) UpdateAccountData(addr []byte, account []byte) error { + a.touchPlainKey(addr, account, a.touchPlainKeyAccount) return a.accounts.Put(addr, nil, account) } func (a *Aggregator) UpdateAccountCode(addr []byte, code []byte) error { + a.touchPlainKey(addr, code, a.touchPlainKeyCode) if len(code) == 0 { return a.code.Delete(addr, nil) } return a.code.Put(addr, nil, code) } +func (a *Aggregator) UpdateCommitmentData(prefix []byte, code []byte) error { + return a.commitment.Put(prefix, nil, code) +} + func (a *Aggregator) DeleteAccount(addr []byte) error { + a.touchPlainKey(addr, nil, a.touchPlainKeyAccount) if err := a.accounts.Delete(addr, nil); err != nil { return err } @@ -736,6 +1159,7 @@ func (a *Aggregator) DeleteAccount(addr []byte) error { } var e error if err := a.storage.defaultDc.IteratePrefix(addr, func(k, _ []byte) { + a.touchPlainKey(k, nil, a.touchPlainKeyStorage) if e == nil { e = a.storage.Delete(k, nil) } @@ -746,6 +1170,11 @@ func (a *Aggregator) DeleteAccount(addr []byte) error { } func (a *Aggregator) WriteAccountStorage(addr, loc []byte, value []byte) error { + composite := make([]byte, len(addr)+len(loc)) + copy(composite, addr) + copy(composite[length.Addr:], loc) + + a.touchPlainKey(composite, value, a.touchPlainKeyStorage) if len(value) == 0 { return a.storage.Delete(addr, loc) } @@ -785,40 +1214,71 @@ func (ac *AggregatorContext) TraceToIterator(addr []byte, startTxNum, endTxNum u } type 
FilesStats struct { + TxCount uint64 + FilesCount uint64 + IdxSize uint64 + DataSize uint64 } func (a *Aggregator) Stats() FilesStats { - var fs FilesStats - return fs + res := a.stats + stat := a.GetAndResetStats() + res.IdxSize = stat.IndexSize + res.DataSize = stat.DataSize + res.FilesCount = stat.FilesCount + return res } type AggregatorContext struct { - a *Aggregator - accounts *DomainContext - storage *DomainContext - code *DomainContext - accountsHistory *HistoryContext - storageHistory *HistoryContext - codeHistory *HistoryContext - logAddrs *InvertedIndexContext - logTopics *InvertedIndexContext - tracesFrom *InvertedIndexContext - tracesTo *InvertedIndexContext - keyBuf []byte + a *Aggregator + accounts *DomainContext + storage *DomainContext + code *DomainContext + commitment *DomainContext + logAddrs *InvertedIndexContext + logTopics *InvertedIndexContext + tracesFrom *InvertedIndexContext + tracesTo *InvertedIndexContext + keyBuf []byte } func (a *Aggregator) MakeContext() *AggregatorContext { return &AggregatorContext{ - a: a, - accounts: a.accounts.MakeContext(), - storage: a.storage.MakeContext(), - code: a.code.MakeContext(), - accountsHistory: a.accountsHistory.MakeContext(), - storageHistory: a.storageHistory.MakeContext(), - codeHistory: a.codeHistory.MakeContext(), - logAddrs: a.logAddrs.MakeContext(), - logTopics: a.logTopics.MakeContext(), - tracesFrom: a.tracesFrom.MakeContext(), - tracesTo: a.tracesTo.MakeContext(), + a: a, + accounts: a.accounts.MakeContext(), + storage: a.storage.MakeContext(), + code: a.code.MakeContext(), + commitment: a.commitment.MakeContext(), + logAddrs: a.logAddrs.MakeContext(), + logTopics: a.logTopics.MakeContext(), + tracesFrom: a.tracesFrom.MakeContext(), + tracesTo: a.tracesTo.MakeContext(), + } +} + +func DecodeAccountBytes(enc []byte) (nonce uint64, balance *uint256.Int, hash []byte) { + balance = new(uint256.Int) + + if len(enc) > 0 { + pos := 0 + nonceBytes := int(enc[pos]) + pos++ + if nonceBytes > 0 { + 
nonce = bytesToUint64(enc[pos : pos+nonceBytes]) + pos += nonceBytes + } + balanceBytes := int(enc[pos]) + pos++ + if balanceBytes > 0 { + balance.SetBytes(enc[pos : pos+balanceBytes]) + pos += balanceBytes + } + codeHashBytes := int(enc[pos]) + pos++ + if codeHashBytes > 0 { + codeHash := make([]byte, length.Hash) + copy(codeHash[:], enc[pos:pos+codeHashBytes]) + } } + return } diff --git a/state/aggregator_test.go b/state/aggregator_test.go new file mode 100644 index 000000000..9857e93d3 --- /dev/null +++ b/state/aggregator_test.go @@ -0,0 +1,178 @@ +package state + +import ( + "context" + "encoding/binary" + "testing" + + "github.com/ledgerwatch/log/v3" + "github.com/stretchr/testify/require" + + "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon-lib/kv/mdbx" +) + +func testDbAndAggregator(t *testing.T, prefixLen int, aggStep uint64) (string, kv.RwDB, *Aggregator) { + t.Helper() + path := t.TempDir() + logger := log.New() + db := mdbx.NewMDBX(logger).Path(path).WithTableCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { + return kv.ChaindataTablesCfg + }).MustOpen() + agg, err := NewAggregator(path, aggStep) + require.NoError(t, err) + return path, db, agg +} + +func TestAggregator_Merge(t *testing.T) { + _, db, agg := testDbAndAggregator(t, 0, 100) + defer db.Close() + defer agg.Close() + + tx, err := db.BeginRw(context.Background()) + require.NoError(t, err) + defer func() { + if tx != nil { + tx.Rollback() + } + }() + agg.SetTx(tx) + txs := uint64(10000) + // keys are encodings of numbers 1..31 + // each key changes value on every txNum which is multiple of the key + var maxWrite, otherMaxWrite uint64 + for txNum := uint64(1); txNum <= txs; txNum++ { + agg.SetTxNum(txNum) + var v [8]byte + binary.BigEndian.PutUint64(v[:], txNum) + var err error + if txNum%135 == 0 { + err = agg.UpdateCommitmentData([]byte("otherroothash"), v[:]) + otherMaxWrite = txNum + } else { + err = agg.UpdateCommitmentData([]byte("roothash"), v[:]) + 
maxWrite = txNum + } + require.NoError(t, err) + require.NoError(t, agg.FinishTx()) + if txNum%100 == 0 { + err = tx.Commit() + require.NoError(t, err) + tx, err = db.BeginRw(context.Background()) + require.NoError(t, err) + agg.SetTx(tx) + } + } + err = tx.Commit() + require.NoError(t, err) + tx = nil + + // Check the history + roTx, err := db.BeginRo(context.Background()) + require.NoError(t, err) + defer roTx.Rollback() + + dc := agg.MakeContext() + v, err := dc.ReadCommitment([]byte("roothash"), roTx) + require.NoError(t, err) + + require.EqualValues(t, maxWrite, binary.BigEndian.Uint64(v[:])) + + v, err = dc.ReadCommitment([]byte("otherroothash"), roTx) + require.NoError(t, err) + + require.EqualValues(t, otherMaxWrite, binary.BigEndian.Uint64(v[:])) +} + +func TestAggregator_RestartOnFiles(t *testing.T) { + aggStep := uint64(100) + path, db, agg := testDbAndAggregator(t, 0, aggStep) + defer db.Close() + + tx, err := db.BeginRw(context.Background()) + require.NoError(t, err) + defer func() { + if tx != nil { + tx.Rollback() + } + if agg != nil { + agg.Close() + } + }() + agg.SetTx(tx) + + comit := func(txn uint64) error { + err = tx.Commit() + require.NoError(t, err) + tx, err = db.BeginRw(context.Background()) + require.NoError(t, err) + agg.SetTx(tx) + return nil + } + agg.SetCommitFn(comit) + + txs := uint64(1026) + // keys are encodings of numbers 1..31 + // each key changes value on every txNum which is multiple of the key + var maxWrite, otherMaxWrite uint64 + for txNum := uint64(1); txNum <= txs; txNum++ { + agg.SetTxNum(txNum) + var v [8]byte + binary.BigEndian.PutUint64(v[:], txNum) + var err error + if txNum%135 == 0 { + err = agg.UpdateCommitmentData([]byte("otherroothash"), v[:]) + otherMaxWrite = txNum + } else { + err = agg.UpdateCommitmentData([]byte("roothash"), v[:]) + maxWrite = txNum + } + agg.ComputeCommitment(true, false) + require.NoError(t, err) + require.NoError(t, agg.FinishTx()) + if txNum+1%100 == 0 { + comit(txNum) + } + } + err = 
tx.Commit() + require.NoError(t, err) + tx = nil + agg.Close() + agg = nil + + anotherAgg, err := NewAggregator(path, aggStep) + require.NoError(t, err) + defer anotherAgg.Close() + + rwTx, err := db.BeginRw(context.Background()) + require.NoError(t, err) + defer func() { + if rwTx != nil { + rwTx.Rollback() + } + }() + + anotherAgg.SetTx(rwTx) + startTx := anotherAgg.EndTxNumMinimax() + sstartTx, err := anotherAgg.SeekCommitment(startTx) + require.NoError(t, err) + _ = sstartTx + rwTx.Rollback() + rwTx = nil + + // Check the history + roTx, err := db.BeginRo(context.Background()) + require.NoError(t, err) + defer roTx.Rollback() + + dc := anotherAgg.MakeContext() + v, err := dc.ReadCommitment([]byte("roothash"), roTx) + require.NoError(t, err) + + require.EqualValues(t, maxWrite, binary.BigEndian.Uint64(v[:])) + + v, err = dc.ReadCommitment([]byte("otherroothash"), roTx) + require.NoError(t, err) + + require.EqualValues(t, otherMaxWrite, binary.BigEndian.Uint64(v[:])) +} diff --git a/state/domain.go b/state/domain.go index ae221c906..03ff9a089 100644 --- a/state/domain.go +++ b/state/domain.go @@ -27,16 +27,18 @@ import ( "path/filepath" "regexp" "strconv" + "sync/atomic" "time" "github.com/RoaringBitmap/roaring/roaring64" "github.com/google/btree" + "github.com/ledgerwatch/log/v3" + "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/compress" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/recsplit" "github.com/ledgerwatch/erigon-lib/recsplit/eliasfano32" - "github.com/ledgerwatch/log/v3" ) var ( @@ -59,13 +61,20 @@ func filesItemLess(i, j *filesItem) bool { } type DomainStats struct { - HistoryQueries int + MergesCount uint64 + HistoryQueries uint64 EfSearchTime time.Duration + DataSize uint64 + IndexSize uint64 + FilesCount uint64 } func (ds *DomainStats) Accumulate(other DomainStats) { ds.HistoryQueries += other.HistoryQueries ds.EfSearchTime += other.EfSearchTime + ds.IndexSize += 
other.IndexSize + ds.DataSize += other.DataSize + ds.FilesCount += other.FilesCount } // Domain is a part of the state (examples are Accounts, Storage, Code) @@ -75,10 +84,11 @@ type Domain struct { keysTable string // key -> invertedStep , invertedStep = ^(txNum / aggregationStep), Needs to be table with DupSort valsTable string // key + invertedStep -> values - files *btree.BTreeG[*filesItem] // Static files pertaining to this domain, items are of type `filesItem` - prefixLen int // Number of bytes in the keys that can be used for prefix iteration - stats DomainStats - defaultDc *DomainContext + files *btree.BTreeG[*filesItem] // Static files pertaining to this domain, items are of type `filesItem` + prefixLen int // Number of bytes in the keys that can be used for prefix iteration + stats DomainStats + mergesCount uint64 + defaultDc *DomainContext } func NewDomain( @@ -133,7 +143,7 @@ func (d *Domain) scanStateFiles(files []fs.DirEntry) { subs := re.FindStringSubmatch(name) if len(subs) != 4 { if len(subs) != 0 { - log.Warn("File ignored by doman scan, more than 4 submatches", "name", name, "submatches", len(subs)) + log.Warn("File ignored by domain scan, more than 4 submatches", "name", name, "submatches", len(subs)) } continue } @@ -168,12 +178,23 @@ func (d *Domain) scanStateFiles(files []fs.DirEntry) { func (d *Domain) openFiles() error { var err error var totalKeys uint64 + + invalidFileItems := make([]*filesItem, 0) d.files.Ascend(func(item *filesItem) bool { datPath := filepath.Join(d.dir, fmt.Sprintf("%s.%d-%d.kv", d.filenameBase, item.startTxNum/d.aggregationStep, item.endTxNum/d.aggregationStep)) + if fi, err := os.Stat(datPath); err != nil || fi.IsDir() { + invalidFileItems = append(invalidFileItems, item) + return true + } if item.decompressor, err = compress.NewDecompressor(datPath); err != nil { return false } + idxPath := filepath.Join(d.dir, fmt.Sprintf("%s.%d-%d.kvi", d.filenameBase, item.startTxNum/d.aggregationStep, 
item.endTxNum/d.aggregationStep)) + if fi, err := os.Stat(idxPath); err != nil || fi.IsDir() { + invalidFileItems = append(invalidFileItems, item) + return true + } if item.index, err = recsplit.OpenIndex(idxPath); err != nil { return false } @@ -183,6 +204,9 @@ func (d *Domain) openFiles() error { if err != nil { return err } + for _, item := range invalidFileItems { + d.files.Delete(item) + } return nil } @@ -204,9 +228,9 @@ func (d *Domain) Close() { d.closeFiles() } -func (dc *DomainContext) get(key []byte, roTx kv.Tx) ([]byte, bool, error) { +func (dc *DomainContext) get(key []byte, fromTxNum uint64, roTx kv.Tx) ([]byte, bool, error) { var invertedStep [8]byte - binary.BigEndian.PutUint64(invertedStep[:], ^(dc.d.txNum / dc.d.aggregationStep)) + binary.BigEndian.PutUint64(invertedStep[:], ^(fromTxNum / dc.d.aggregationStep)) keyCursor, err := roTx.CursorDupSort(dc.d.keysTable) if err != nil { return nil, false, err @@ -216,8 +240,9 @@ func (dc *DomainContext) get(key []byte, roTx kv.Tx) ([]byte, bool, error) { if err != nil { return nil, false, err } - if foundInvStep == nil { - v, found := dc.readFromFiles(key) + if len(foundInvStep) == 0 { + atomic.AddUint64(&dc.d.stats.HistoryQueries, 1) + v, found := dc.readFromFiles(key, fromTxNum) return v, found, nil } keySuffix := make([]byte, len(key)+8) @@ -234,7 +259,7 @@ func (dc *DomainContext) Get(key1, key2 []byte, roTx kv.Tx) ([]byte, error) { key := make([]byte, len(key1)+len(key2)) copy(key, key1) copy(key[len(key1):], key2) - v, _, err := dc.get(key, roTx) + v, _, err := dc.get(key, dc.d.txNum, roTx) return v, err } @@ -251,7 +276,7 @@ func (d *Domain) Put(key1, key2, val []byte) error { key := make([]byte, len(key1)+len(key2)) copy(key, key1) copy(key[len(key1):], key2) - original, _, err := d.defaultDc.get(key, d.tx) + original, _, err := d.defaultDc.get(key, d.txNum, d.tx) if err != nil { return err } @@ -279,7 +304,7 @@ func (d *Domain) Delete(key1, key2 []byte) error { key := make([]byte, 
len(key1)+len(key2)) copy(key, key1) copy(key[len(key1):], key2) - original, found, err := d.defaultDc.get(key, d.tx) + original, found, err := d.defaultDc.get(key, d.txNum, d.tx) if err != nil { return err } @@ -382,15 +407,23 @@ func (d *Domain) MakeContext() *DomainContext { dc.hc = d.History.MakeContext() bt := btree.NewG[ctxItem](32, ctxItemLess) dc.files = bt + var datsz, idxsz, files uint64 + d.files.Ascend(func(item *filesItem) bool { + getter := item.decompressor.MakeGetter() + datsz += uint64(getter.Size()) + idxsz += uint64(item.index.Size()) + files += 2 + bt.ReplaceOrInsert(ctxItem{ startTxNum: item.startTxNum, endTxNum: item.endTxNum, - getter: item.decompressor.MakeGetter(), + getter: getter, reader: recsplit.NewIndexReader(item.index), }) return true }) + d.stats.DataSize, d.stats.IndexSize, d.stats.FilesCount = datsz, idxsz, files return dc } @@ -751,12 +784,28 @@ func (d *Domain) prune(step uint64, txFrom, txTo uint64) error { } defer keysCursor.Close() var k, v []byte + keyMaxSteps := make(map[string]uint64) + + for k, v, err = keysCursor.First(); err == nil && k != nil; k, v, err = keysCursor.Next() { + s := ^binary.BigEndian.Uint64(v) + if maxS, seen := keyMaxSteps[string(k)]; !seen || s > maxS { + keyMaxSteps[string(k)] = s + } + } + if err != nil { + return fmt.Errorf("iterate of %s keys: %w", d.filenameBase, err) + } + for k, v, err = keysCursor.First(); err == nil && k != nil; k, v, err = keysCursor.Next() { s := ^binary.BigEndian.Uint64(v) if s == step { + if maxS := keyMaxSteps[string(k)]; maxS <= step { + continue + } if err = keysCursor.DeleteCurrent(); err != nil { return fmt.Errorf("clean up %s for [%x]=>[%x]: %w", d.filenameBase, k, v, err) } + //fmt.Printf("domain prune key %x [s%d]\n", string(k), s) } } if err != nil { @@ -770,24 +819,33 @@ func (d *Domain) prune(step uint64, txFrom, txTo uint64) error { for k, _, err = valsCursor.First(); err == nil && k != nil; k, _, err = valsCursor.Next() { s := 
^binary.BigEndian.Uint64(k[len(k)-8:]) if s == step { + if maxS := keyMaxSteps[string(k[:len(k)-8])]; maxS <= step { + continue + } if err = valsCursor.DeleteCurrent(); err != nil { return fmt.Errorf("clean up %s for [%x]: %w", d.filenameBase, k, err) } + //fmt.Printf("domain prune value for %x (invs %x) [s%d]\n", string(k),k[len(k)-8):], s) } } if err != nil { return fmt.Errorf("iterate over %s vals: %w", d.filenameBase, err) } + if err = d.History.prune(step, txFrom, txTo); err != nil { - return err + return fmt.Errorf("prune history at step %d [%d, %d): %w", step, txFrom, txTo, err) } return nil } -func (dc *DomainContext) readFromFiles(filekey []byte) ([]byte, bool) { +func (dc *DomainContext) readFromFiles(filekey []byte, fromTxNum uint64) ([]byte, bool) { var val []byte var found bool + dc.files.Descend(func(item ctxItem) bool { + if item.endTxNum < fromTxNum { + return false + } if item.reader.Empty() { return true } @@ -938,7 +996,7 @@ func (dc *DomainContext) GetBeforeTxNum(key []byte, txNum uint64, roTx kv.Tx) ([ if hOk { return v, nil } - if v, _, err = dc.get(key, roTx); err != nil { + if v, _, err = dc.get(key, txNum-1, roTx); err != nil { return nil, err } return v, nil diff --git a/state/domain_test.go b/state/domain_test.go index 04885462e..2427a3b9f 100644 --- a/state/domain_test.go +++ b/state/domain_test.go @@ -20,14 +20,16 @@ import ( "context" "encoding/binary" "fmt" + "os" "strings" "testing" + "github.com/ledgerwatch/log/v3" + "github.com/stretchr/testify/require" + "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv/mdbx" "github.com/ledgerwatch/erigon-lib/recsplit" - "github.com/ledgerwatch/log/v3" - "github.com/stretchr/testify/require" ) func testDbAndDomain(t *testing.T, prefixLen int) (string, kv.RwDB, *Domain) { @@ -177,6 +179,14 @@ func TestAfterPrune(t *testing.T) { err = d.Put([]byte("key1"), nil, []byte("value1.2")) require.NoError(t, err) + d.SetTxNum(17) + err = d.Put([]byte("key1"), 
nil, []byte("value1.3")) + require.NoError(t, err) + + d.SetTxNum(18) + err = d.Put([]byte("key2"), nil, []byte("value2.2")) + require.NoError(t, err) + err = tx.Commit() require.NoError(t, err) @@ -200,10 +210,10 @@ func TestAfterPrune(t *testing.T) { dc := d.MakeContext() v, err = dc.Get([]byte("key1"), nil, tx) require.NoError(t, err) - require.Equal(t, []byte("value1.2"), v) + require.Equal(t, []byte("value1.3"), v) v, err = dc.Get([]byte("key2"), nil, tx) require.NoError(t, err) - require.Equal(t, []byte("value2.1"), v) + require.Equal(t, []byte("value2.2"), v) err = d.prune(0, 0, 16) require.NoError(t, err) @@ -221,15 +231,15 @@ func TestAfterPrune(t *testing.T) { var k []byte k, _, err = cur.First() require.NoError(t, err) - require.Nil(t, k, table) + require.NotNilf(t, k, table, string(k)) } v, err = dc.Get([]byte("key1"), nil, tx) require.NoError(t, err) - require.Equal(t, []byte("value1.2"), v) + require.Equal(t, []byte("value1.3"), v) v, err = dc.Get([]byte("key2"), nil, tx) require.NoError(t, err) - require.Equal(t, []byte("value2.1"), v) + require.Equal(t, []byte("value2.2"), v) } func filledDomain(t *testing.T) (string, kv.RwDB, *Domain, uint64) { @@ -301,6 +311,11 @@ func checkHistory(t *testing.T, db kv.RwDB, d *Domain, txs uint64) { } else { require.Nil(t, val, label) } + if txNum == txs { + val, err := dc.Get(k[:], nil, roTx) + require.NoError(t, err) + require.EqualValues(t, v[:], val) + } } } } @@ -466,6 +481,34 @@ func collateAndMerge(t *testing.T, db kv.RwDB, d *Domain, txs uint64) { } } +func collateAndMergeOnce(t *testing.T, d *Domain, step uint64) { + t.Helper() + txFrom, txTo := (step)*d.aggregationStep, (step+1)*d.aggregationStep + + c, err := d.collate(step, txFrom, txTo, d.tx) + require.NoError(t, err) + + sf, err := d.buildFiles(step, c) + require.NoError(t, err) + d.integrateFiles(sf, txFrom, txTo) + + err = d.prune(step, txFrom, txTo) + require.NoError(t, err) + + var r DomainRanges + maxEndTxNum := d.endTxNumMinimax() + maxSpan := 
d.aggregationStep * d.aggregationStep + for r = d.findMergeRange(maxEndTxNum, maxSpan); r.any(); r = d.findMergeRange(maxEndTxNum, maxSpan) { + valuesOuts, indexOuts, historyOuts, _ := d.staticFilesInRange(r) + valuesIn, indexIn, historyIn, err := d.mergeFiles(valuesOuts, indexOuts, historyOuts, r, maxSpan) + require.NoError(t, err) + + d.integrateMergedFiles(valuesOuts, indexOuts, historyOuts, valuesIn, indexIn, historyIn) + err = d.deleteFiles(valuesOuts, indexOuts, historyOuts) + require.NoError(t, err) + } +} + func TestMergeFiles(t *testing.T) { _, db, d, txs := filledDomain(t) defer db.Close() @@ -546,3 +589,209 @@ func TestDelete(t *testing.T) { require.Nil(t, val, label) } } + +func filledDomainFixedSize(t *testing.T, keysCount, txCount uint64) (string, kv.RwDB, *Domain, map[string][]bool) { + t.Helper() + path, db, d := testDbAndDomain(t, 0 /* prefixLen */) + tx, err := db.BeginRw(context.Background()) + require.NoError(t, err) + defer func() { + if tx != nil { + tx.Rollback() + } + }() + d.SetTx(tx) + // keys are encodings of numbers 1..31 + // each key changes value on every txNum which is multiple of the key + dat := make(map[string][]bool) // K:V is key -> list of bools. 
If list[i] == true, i'th txNum should persists + + for txNum := uint64(1); txNum <= txCount; txNum++ { + d.SetTxNum(txNum) + for keyNum := uint64(1); keyNum <= keysCount; keyNum++ { + if keyNum == txNum%d.aggregationStep { + continue + } + var k [8]byte + var v [8]byte + binary.BigEndian.PutUint64(k[:], keyNum) + binary.BigEndian.PutUint64(v[:], txNum) + err = d.Put(k[:], nil, v[:]) + require.NoError(t, err) + + if _, ok := dat[fmt.Sprintf("%d", keyNum)]; !ok { + dat[fmt.Sprintf("%d", keyNum)] = make([]bool, txCount+1) + } + dat[fmt.Sprintf("%d", keyNum)][txNum] = true + } + if txNum%d.aggregationStep == 0 { + err = tx.Commit() + require.NoError(t, err) + tx, err = db.BeginRw(context.Background()) + require.NoError(t, err) + d.SetTx(tx) + } + } + err = tx.Commit() + require.NoError(t, err) + tx = nil + return path, db, d, dat +} + +// firstly we write all the data to domain +// then we collate-merge-prune +// then check. +// in real life we periodically do collate-merge-prune without stopping adding data +func TestDomain_Prune_AfterAllWrites(t *testing.T) { + keyCount, txCount := uint64(4), uint64(64) + path, db, dom, data := filledDomainFixedSize(t, keyCount, txCount) + defer db.Close() + defer dom.Close() + defer os.Remove(path) + + collateAndMerge(t, db, dom, txCount) + + roTx, err := db.BeginRo(context.Background()) + require.NoError(t, err) + defer roTx.Rollback() + + // Check the history + dc := dom.MakeContext() + for txNum := uint64(1); txNum <= txCount; txNum++ { + for keyNum := uint64(1); keyNum <= keyCount; keyNum++ { + var k [8]byte + var v [8]byte + label := fmt.Sprintf("txNum=%d, keyNum=%d\n", txNum, keyNum) + binary.BigEndian.PutUint64(k[:], keyNum) + binary.BigEndian.PutUint64(v[:], txNum) + + val, err := dc.GetBeforeTxNum(k[:], txNum+1, roTx) + // during generation such keys are skipped so value should be nil for this call + require.NoError(t, err, label) + if !data[fmt.Sprintf("%d", keyNum)][txNum] { + if txNum > 1 { + 
binary.BigEndian.PutUint64(v[:], txNum-1) + } else { + require.Nil(t, val, label) + continue + } + } + require.EqualValues(t, v[:], val) + } + } + + var v [8]byte + binary.BigEndian.PutUint64(v[:], txCount) + + for keyNum := uint64(1); keyNum <= keyCount; keyNum++ { + var k [8]byte + label := fmt.Sprintf("txNum=%d, keyNum=%d\n", txCount, keyNum) + binary.BigEndian.PutUint64(k[:], keyNum) + + storedV, err := dc.Get(k[:], nil, roTx) + require.NoError(t, err, label) + require.EqualValues(t, v[:], storedV, label) + } +} + +func TestDomain_PruneOnWrite(t *testing.T) { + keysCount, txCount := uint64(16), uint64(64) + + path, db, d := testDbAndDomain(t, 0 /* prefixLen */) + defer db.Close() + defer d.Close() + defer os.Remove(path) + + tx, err := db.BeginRw(context.Background()) + require.NoError(t, err) + defer func() { + if tx != nil { + tx.Rollback() + } + }() + d.SetTx(tx) + // keys are encodings of numbers 1..31 + // each key changes value on every txNum which is multiple of the key + data := make(map[string][]uint64) + + for txNum := uint64(1); txNum <= txCount; txNum++ { + d.SetTxNum(txNum) + for keyNum := uint64(1); keyNum <= keysCount; keyNum++ { + if keyNum == txNum%d.aggregationStep { + continue + } + var k [8]byte + var v [8]byte + binary.BigEndian.PutUint64(k[:], keyNum) + binary.BigEndian.PutUint64(v[:], txNum) + err = d.Put(k[:], nil, v[:]) + require.NoError(t, err) + + list, ok := data[fmt.Sprintf("%d", keyNum)] + if !ok { + data[fmt.Sprintf("%d", keyNum)] = make([]uint64, 0) + } + data[fmt.Sprintf("%d", keyNum)] = append(list, txNum) + } + if txNum%d.aggregationStep == 0 { + step := txNum/d.aggregationStep - 1 + if step == 0 { + continue + } + step-- + collateAndMergeOnce(t, d, step) + + err = tx.Commit() + require.NoError(t, err) + tx, err = db.BeginRw(context.Background()) + require.NoError(t, err) + d.SetTx(tx) + } + } + err = tx.Commit() + require.NoError(t, err) + tx = nil + + roTx, err := db.BeginRo(context.Background()) + require.NoError(t, err) + 
defer roTx.Rollback() + + // Check the history + dc := d.MakeContext() + for txNum := uint64(1); txNum <= txCount; txNum++ { + for keyNum := uint64(1); keyNum <= keysCount; keyNum++ { + valNum := txNum + var k [8]byte + var v [8]byte + label := fmt.Sprintf("txNum=%d, keyNum=%d\n", txNum, keyNum) + binary.BigEndian.PutUint64(k[:], keyNum) + binary.BigEndian.PutUint64(v[:], valNum) + + val, err := dc.GetBeforeTxNum(k[:], txNum+1, roTx) + if keyNum == txNum%d.aggregationStep { + if txNum > 1 { + binary.BigEndian.PutUint64(v[:], txNum-1) + require.EqualValues(t, v[:], val) + continue + } else { + require.Nil(t, val, label) + continue + } + } + require.NoError(t, err, label) + require.EqualValues(t, v[:], val, label) + } + } + + var v [8]byte + binary.BigEndian.PutUint64(v[:], txCount) + + for keyNum := uint64(1); keyNum <= keysCount; keyNum++ { + var k [8]byte + label := fmt.Sprintf("txNum=%d, keyNum=%d\n", txCount, keyNum) + binary.BigEndian.PutUint64(k[:], keyNum) + + storedV, err := dc.Get(k[:], nil, roTx) + require.NoError(t, err, label) + require.EqualValues(t, v[:], storedV, label) + } +} diff --git a/state/history.go b/state/history.go index 3e58305be..b106a27b8 100644 --- a/state/history.go +++ b/state/history.go @@ -30,22 +30,22 @@ import ( "github.com/RoaringBitmap/roaring/roaring64" "github.com/google/btree" + "github.com/ledgerwatch/log/v3" + "golang.org/x/exp/slices" + "github.com/ledgerwatch/erigon-lib/compress" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/recsplit" "github.com/ledgerwatch/erigon-lib/recsplit/eliasfano32" - "github.com/ledgerwatch/log/v3" - "golang.org/x/exp/slices" ) type History struct { *InvertedIndex historyValsTable string // key1+key2+txnNum -> oldValue , stores values BEFORE change settingsTable string - - files *btree.BTreeG[*filesItem] - compressVals bool - workers int + files *btree.BTreeG[*filesItem] + compressVals bool + workers int } func NewHistory( @@ -128,17 +128,30 @@ 
func (h *History) scanStateFiles(files []fs.DirEntry) { func (h *History) openFiles() error { var totalKeys uint64 var err error + + invalidFileItems := make([]*filesItem, 0) h.files.Ascend(func(item *filesItem) bool { datPath := filepath.Join(h.dir, fmt.Sprintf("%s.%d-%d.v", h.filenameBase, item.startTxNum/h.aggregationStep, item.endTxNum/h.aggregationStep)) + if fi, err := os.Stat(datPath); err != nil || fi.IsDir() { + invalidFileItems = append(invalidFileItems, item) + return true + } if item.decompressor, err = compress.NewDecompressor(datPath); err != nil { return false } idxPath := filepath.Join(h.dir, fmt.Sprintf("%s.%d-%d.vi", h.filenameBase, item.startTxNum/h.aggregationStep, item.endTxNum/h.aggregationStep)) + + if fi, err := os.Stat(idxPath); err != nil || fi.IsDir() { + invalidFileItems = append(invalidFileItems, item) + return true + } + //if !dir.Exist(idxPath) { // if _, err = buildIndex(item.decompressor, idxPath, h.dir, item.decompressor.Count()/2, false /* values */); err != nil { // return false // } //} + if item.index, err = recsplit.OpenIndex(idxPath); err != nil { return false } @@ -148,6 +161,9 @@ func (h *History) openFiles() error { if err != nil { return err } + for _, item := range invalidFileItems { + h.files.Delete(item) + } return nil } diff --git a/state/merge.go b/state/merge.go index 5ff654a46..4c2c38954 100644 --- a/state/merge.go +++ b/state/merge.go @@ -25,11 +25,12 @@ import ( "os" "path/filepath" + "github.com/ledgerwatch/log/v3" + "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/compress" "github.com/ledgerwatch/erigon-lib/recsplit" "github.com/ledgerwatch/erigon-lib/recsplit/eliasfano32" - "github.com/ledgerwatch/log/v3" ) func (d *Domain) endTxNumMinimax() uint64 { @@ -245,19 +246,80 @@ func (h *History) staticFilesInRange(r HistoryRanges) (indexFiles, historyFiles return } +func maxUint64(a, b uint64) uint64 { + if a < b { + return b + } + return a +} + +type eliasFanoMinHeap 
[]uint64 + +func (h eliasFanoMinHeap) Len() int { + return len(h) +} + +func (h eliasFanoMinHeap) Less(i, j int) bool { + return h[i] < h[j] +} + +func (h eliasFanoMinHeap) Swap(i, j int) { + h[i], h[j] = h[j], h[i] +} + +func (h *eliasFanoMinHeap) Push(a interface{}) { + ai := a.(uint64) + for i := 0; i < len(*h); i++ { + if (*h)[i] == ai { + return + } + } + *h = append(*h, a.(uint64)) +} + +func (h *eliasFanoMinHeap) Pop() interface{} { + c := *h + *h = c[:len(c)-1] + return c[len(c)-1] +} + func mergeEfs(preval, val, buf []byte) ([]byte, error) { preef, _ := eliasfano32.ReadEliasFano(preval) ef, _ := eliasfano32.ReadEliasFano(val) preIt := preef.Iterator() efIt := ef.Iterator() - newEf := eliasfano32.NewEliasFano(preef.Count()+ef.Count(), ef.Max()) + //fmt.Printf("merge ef Pre [%x] || Val [%x]\n", preval, val) + + minHeap := make(eliasFanoMinHeap, 0) + + //prelist := make([]uint64, 0) for preIt.HasNext() { - newEf.AddOffset(preIt.Next()) + v := preIt.Next() + heap.Push(&minHeap, v) + //prelist = append(prelist, v) } + //fmt.Printf("prelist (%d) [%v]\n", len(prelist), prelist) + //newList := make([]uint64, 0) for efIt.HasNext() { - newEf.AddOffset(efIt.Next()) + v := efIt.Next() + heap.Push(&minHeap, v) + //newList = append(newList, v) } + //fmt.Printf("newlist (%d) [%v]\n", len(newList), newList) + + newEf := eliasfano32.NewEliasFano(uint64(minHeap.Len()), maxUint64(ef.Max(), preef.Max())) + for minHeap.Len() > 0 { + v := heap.Pop(&minHeap).(uint64) + newEf.AddOffset(v) + } + newEf.Build() + //nit := newEf.Iterator() + //res := make([]uint64, 0) + //for nit.HasNext() { + // res = append(res, nit.Next()) + //} + //fmt.Printf("merged ef [%v]\n", res) return newEf.AppendBytes(buf), nil } @@ -378,6 +440,13 @@ func (d *Domain) mergeFiles(valuesFiles, indexFiles, historyFiles []*filesItem, return nil, nil, nil, err } count++ // Only counting keys, not values + + //if d.valueMergeFn != nil { + // valBuf, err = d.valueMergeFn(valBuf, nil) + // if err != nil { + // 
return nil, nil, nil, err + // } + //} if d.compressVals { if err = comp.AddWord(valBuf); err != nil { return nil, nil, nil, err @@ -422,9 +491,15 @@ func (d *Domain) mergeFiles(valuesFiles, indexFiles, historyFiles []*filesItem, } } closeItem = false + d.stats.MergesCount++ + d.mergesCount++ return } +//func (d *Domain) SetValueMergeStrategy(merge func([]byte, []byte) ([]byte, error)) { +// d.valueMergeFn = merge +//} + func (ii *InvertedIndex) mergeFiles(files []*filesItem, startTxNum, endTxNum uint64, maxSpan uint64) (*filesItem, error) { var outItem *filesItem var comp *compress.Compressor @@ -483,6 +558,7 @@ func (ii *InvertedIndex) mergeFiles(files []*filesItem, startTxNum, endTxNum uin lastKey := common.Copy(cp[0].key) lastVal := common.Copy(cp[0].val) var mergedOnce bool + // Advance all the items that have this key (including the top) for cp.Len() > 0 && bytes.Equal(cp[0].key, lastKey) { ci1 := cp[0] @@ -620,8 +696,20 @@ func (h *History) mergeFiles(indexFiles, historyFiles []*filesItem, r HistoryRan for cp.Len() > 0 { lastKey := common.Copy(cp[0].key) // Advance all the items that have this key (including the top) + //var mergeOnce bool for cp.Len() > 0 && bytes.Equal(cp[0].key, lastKey) { ci1 := cp[0] + + //if h.valueMergeFn != nil && mergeOnce { + // valBuf, err = h.valueMergeFn(ci1.val, valBuf) + // if err != nil { + // return nil, nil, err + // } + // ci1.val = valBuf + //} + //if !mergeOnce { + // mergeOnce = true + //} ef, _ := eliasfano32.ReadEliasFano(ci1.val) for i := uint64(0); i < ef.Count(); i++ { if h.compressVals { diff --git a/state/merge_test.go b/state/merge_test.go new file mode 100644 index 000000000..3b406872c --- /dev/null +++ b/state/merge_test.go @@ -0,0 +1,67 @@ +package state + +import ( + "sort" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/ledgerwatch/erigon-lib/recsplit/eliasfano32" +) + +func Test_mergeEliasFano(t *testing.T) { + firstList := []int{1, 298164, 298163, 13, 298160, 298159} + 
sort.Ints(firstList) + uniq := make(map[int]struct{}) + + first := eliasfano32.NewEliasFano(uint64(len(firstList)), uint64(firstList[len(firstList)-1])) + for _, v := range firstList { + uniq[v] = struct{}{} + first.AddOffset(uint64(v)) + } + first.Build() + firstBytes := first.AppendBytes(nil) + + fit := first.Iterator() + for fit.HasNext() { + v := fit.Next() + require.Contains(t, firstList, int(v)) + } + + secondList := []int{ + 1, 644951, 644995, 682653, 13, + 644988, 644987, 644946, 644994, + 644942, 644945, 644941, 644940, + 644939, 644938, 644792, 644787} + sort.Ints(secondList) + second := eliasfano32.NewEliasFano(uint64(len(secondList)), uint64(secondList[len(secondList)-1])) + + for _, v := range secondList { + second.AddOffset(uint64(v)) + uniq[v] = struct{}{} + } + second.Build() + secondBytes := second.AppendBytes(nil) + + sit := second.Iterator() + for sit.HasNext() { + v := sit.Next() + require.Contains(t, secondList, int(v)) + } + + menc, err := mergeEfs(firstBytes, secondBytes, nil) + require.NoError(t, err) + + merged, _ := eliasfano32.ReadEliasFano(menc) + require.NoError(t, err) + require.EqualValues(t, len(uniq), merged.Count()) + mergedLists := append(firstList, secondList...) + sort.Ints(mergedLists) + require.EqualValues(t, mergedLists[len(mergedLists)-1], merged.Max()) + + mit := merged.Iterator() + for mit.HasNext() { + v := mit.Next() + require.Contains(t, mergedLists, int(v)) + } +}