From ad0e8d47e97fb2770352695aece53e435e7c9cf8 Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Thu, 22 Sep 2022 13:59:22 +0700 Subject: [PATCH 1/7] remove sequential compressor #648 --- compress/compress.go | 643 ------------------------------------------- 1 file changed, 643 deletions(-) diff --git a/compress/compress.go b/compress/compress.go index 8f1069aa7..e3e1d0957 100644 --- a/compress/compress.go +++ b/compress/compress.go @@ -28,15 +28,12 @@ import ( "math/bits" "os" "path/filepath" - "sort" "sync" "time" - "github.com/flanglet/kanzi-go/transform" "github.com/ledgerwatch/erigon-lib/common" dir2 "github.com/ledgerwatch/erigon-lib/common/dir" "github.com/ledgerwatch/erigon-lib/etl" - "github.com/ledgerwatch/erigon-lib/patricia" "github.com/ledgerwatch/log/v3" "golang.org/x/exp/slices" ) @@ -187,36 +184,6 @@ func (c *Compressor) Compress() error { return nil } -type CompressorSequential struct { - outputFile string // File where to output the dictionary and compressed data - tmpDir string // temporary directory to use for ETL when building dictionary - minPatternScore uint64 //minimum score (per superstring) required to consider including pattern into the dictionary - // Buffer for "superstring" - transformation of superstrings where each byte of a word, say b, - // is turned into 2 bytes, 0x01 and b, and two zero bytes 0x00 0x00 are inserted after each word - // this is needed for using ordinary (one string) suffix sorting algorithm instead of a generalised (many superstrings) suffix - // sorting algorithm - superstring []byte - divsufsort *transform.DivSufSort // Instance of DivSufSort - algorithm for building suffix array for the superstring - suffixarray []int32 // Suffix array - output for divsufsort algorithm - lcp []int32 // LCP array (Longest Common Prefix) - collector *etl.Collector // Collector used to handle very large sets of superstrings - numBuf [binary.MaxVarintLen64]byte // Buffer for producing var int serialisation - collectBuf []byte // Buffer for forming key to call collector - dictBuilder DictionaryBuilder // Priority queue that selects dictionary patterns with highest scores, and then sorts them by scores - pt patricia.PatriciaTree // Patricia tree of dictionary patterns - mf patricia.MatchFinder // Match finder to use together with patricia tree (it stores search context and buffers matches) - ring *Ring // Cycling ring for dynamic programming algorithm determining optimal coverage of word by dictionary patterns - wordFile *os.File // Temporary file to keep superstrings in for the second pass - wordW *bufio.Writer // Bufferred writer for temporary file - interFile *os.File // File to write intermediate compression to - interW *bufio.Writer // Buffered writer associate to interFile - patterns []int // Buffer of pattern ids (used in the dynamic programming algorithm to remember patterns corresponding to dynamic cells) - uncovered []int // Buffer of intervals that are not covered by patterns - posMap map[uint64]uint64 // Counter of use for each position within compressed word (for building huffman code for positions) - - wordsCount, emptyWordsCount uint64 -} - // superstringLimit limits how large can one "superstring" get before it is processed // CompressorSequential allocates 7 bytes for each uint of superstringLimit. 
For example, // superstingLimit 16m will result in 112Mb being allocated for various arrays @@ -662,616 +629,6 @@ func (r *Ring) Truncate(i int) { r.tail = (r.head + i) & (len(r.cells) - 1) } -func NewCompressorSequential(logPrefix, outputFile string, tmpDir string, minPatternScore uint64) (*CompressorSequential, error) { - c := &CompressorSequential{ - minPatternScore: minPatternScore, - outputFile: outputFile, - tmpDir: tmpDir, - superstring: make([]byte, 0, superstringLimit), // Allocate enough, so we never need to resize - suffixarray: make([]int32, superstringLimit), - lcp: make([]int32, superstringLimit/2), - collectBuf: make([]byte, 8, 256), - ring: NewRing(), - patterns: make([]int, 0, 32), - uncovered: make([]int, 0, 32), - posMap: make(map[uint64]uint64), - } - var err error - if c.divsufsort, err = transform.NewDivSufSort(); err != nil { - return nil, err - } - if c.wordFile, err = os.CreateTemp(c.tmpDir, "superstrings-"); err != nil { - return nil, err - } - c.wordW = bufio.NewWriterSize(c.wordFile, etl.BufIOSize) - c.collector = etl.NewCollector(logPrefix, tmpDir, etl.NewSortableBuffer(etl.BufferOptimalSize/2)) - return c, nil -} - -// AddWord needs to be called repeatedly to provide all the superstrings to compress -func (c *CompressorSequential) AddWord(word []byte) error { - c.wordsCount++ - if len(word) == 0 { - c.emptyWordsCount++ - } - if len(c.superstring)+2*len(word)+2 > superstringLimit { - // Adding this word would make superstring go over the limit - if err := c.processSuperstring(); err != nil { - return fmt.Errorf("buildDictNextWord: error processing superstring: %w", err) - } - } - for _, b := range word { - c.superstring = append(c.superstring, 1, b) - } - c.superstring = append(c.superstring, 0, 0) - n := binary.PutUvarint(c.numBuf[:], uint64(len(word))) - if _, err := c.wordW.Write(c.numBuf[:n]); err != nil { - return err - } - if len(word) > 0 { - if _, err := c.wordW.Write(word); err != nil { - return err - } - } - return nil -} - -func (c *CompressorSequential) Compress() error { - if c.wordW != nil { - if err := c.wordW.Flush(); err != nil { - return err - } - } - if err := c.buildDictionary(); err != nil { - return err - } - if err := c.findMatches(); err != nil { - return err - } - if err := c.optimiseCodes(); err != nil { - return err - } - return nil -} - -func (c *CompressorSequential) Close() { - c.collector.Close() - c.wordFile.Close() - c.interFile.Close() -} - -func (c *CompressorSequential) findMatches() error { - // Build patricia tree out of the patterns in the dictionary, for further matching in individual superstrings - // Allocate temporary initial codes to the patterns so that patterns with higher scores get smaller code - // This helps reduce the size of intermediate compression - for i, p := range c.dictBuilder.items { - p.code = uint64(len(c.dictBuilder.items) - i - 1) - c.pt.Insert(p.word, p) - } - var err error - if c.interFile, err = os.CreateTemp(c.tmpDir, "inter-compress-"); err != nil { - return err - } - c.interW = bufio.NewWriterSize(c.interFile, etl.BufIOSize) - if _, err := c.wordFile.Seek(0, 0); err != nil { - return err - } - defer os.Remove(c.wordFile.Name()) - defer c.wordFile.Close() - r := bufio.NewReaderSize(c.wordFile, etl.BufIOSize) - var readBuf []byte - l, e := binary.ReadUvarint(r) - for ; e == nil; l, e = binary.ReadUvarint(r) { - c.posMap[l+1]++ - c.posMap[0]++ - if int(l) > len(readBuf) { - readBuf = make([]byte, l) - } - if _, e := io.ReadFull(r, readBuf[:l]); e != nil { - return e - } - word := readBuf[:l] - // 
Encode length of the word as var int for the intermediate compression - n := binary.PutUvarint(c.numBuf[:], uint64(len(word))) - if _, err := c.interW.Write(c.numBuf[:n]); err != nil { - return err - } - if len(word) > 0 { - matches := c.mf.FindLongestMatches(word) - if len(matches) == 0 { - n = binary.PutUvarint(c.numBuf[:], 0) - if _, err := c.interW.Write(c.numBuf[:n]); err != nil { - return err - } - if _, err := c.interW.Write(word); err != nil { - return err - } - continue - } - c.ring.Reset() - c.patterns = append(c.patterns[:0], 0, 0) // Sentinel entry - no meaning - lastF := matches[len(matches)-1] - for j := lastF.Start; j < lastF.End; j++ { - d := c.ring.PushBack() - d.optimStart = j + 1 - d.coverStart = len(word) - d.compression = 0 - d.patternIdx = 0 - d.score = 0 - } - // Starting from the last match - for i := len(matches); i > 0; i-- { - f := matches[i-1] - p := f.Val.(*Pattern) - firstCell := c.ring.Get(0) - maxCompression := firstCell.compression - maxScore := firstCell.score - maxCell := firstCell - var maxInclude bool - for e := 0; e < c.ring.Len(); e++ { - cell := c.ring.Get(e) - comp := cell.compression - 4 - if cell.coverStart >= f.End { - comp += f.End - f.Start - } else { - comp += cell.coverStart - f.Start - } - score := cell.score + p.score - if comp > maxCompression || (comp == maxCompression && score > maxScore) { - maxCompression = comp - maxScore = score - maxInclude = true - maxCell = cell - } else if cell.optimStart > f.End { - c.ring.Truncate(e) - break - } - } - d := c.ring.PushFront() - d.optimStart = f.Start - d.score = maxScore - d.compression = maxCompression - if maxInclude { - d.coverStart = f.Start - d.patternIdx = len(c.patterns) - c.patterns = append(c.patterns, i-1, maxCell.patternIdx) - } else { - d.coverStart = maxCell.coverStart - d.patternIdx = maxCell.patternIdx - } - } - optimCell := c.ring.Get(0) - // Count number of patterns - var patternCount uint64 - patternIdx := optimCell.patternIdx - for patternIdx != 0 { - patternCount++ - patternIdx = c.patterns[patternIdx+1] - } - n = binary.PutUvarint(c.numBuf[:], patternCount) - if _, err := c.interW.Write(c.numBuf[:n]); err != nil { - return err - } - patternIdx = optimCell.patternIdx - lastStart := 0 - var lastUncovered int - c.uncovered = c.uncovered[:0] - for patternIdx != 0 { - pattern := c.patterns[patternIdx] - p := matches[pattern].Val.(*Pattern) - if matches[pattern].Start > lastUncovered { - c.uncovered = append(c.uncovered, lastUncovered, matches[pattern].Start) - } - lastUncovered = matches[pattern].End - // Starting position - c.posMap[uint64(matches[pattern].Start-lastStart+1)]++ - lastStart = matches[pattern].Start - n = binary.PutUvarint(c.numBuf[:], uint64(matches[pattern].Start)) - if _, err := c.interW.Write(c.numBuf[:n]); err != nil { - return err - } - // Code - n = binary.PutUvarint(c.numBuf[:], p.code) - if _, err := c.interW.Write(c.numBuf[:n]); err != nil { - return err - } - p.uses++ - patternIdx = c.patterns[patternIdx+1] - } - if len(word) > lastUncovered { - c.uncovered = append(c.uncovered, lastUncovered, len(word)) - } - // Add uncoded input - for i := 0; i < len(c.uncovered); i += 2 { - if _, err := c.interW.Write(word[c.uncovered[i]:c.uncovered[i+1]]); err != nil { - return err - } - } - } - } - if e != nil && !errors.Is(e, io.EOF) { - return e - } - if err = c.interW.Flush(); err != nil { - return err - } - return nil -} - -// optimises coding for patterns and positions -func (c *CompressorSequential) optimiseCodes() error { - if _, err := c.interFile.Seek(0, 
0); err != nil { - return err - } - defer os.Remove(c.interFile.Name()) - defer c.interFile.Close() - // Select patterns with non-zero use and sort them by increasing frequency of use (in preparation for building Huffman tree) - var patternList PatternList - for _, p := range c.dictBuilder.items { - if p.uses > 0 { - patternList = append(patternList, p) - } - } - slices.SortFunc[*Pattern](patternList, patternListLess) - - i := 0 // Will be going over the patternList - // Build Huffman tree for codes - var codeHeap PatternHeap - heap.Init(&codeHeap) - tieBreaker := uint64(0) - for codeHeap.Len()+(patternList.Len()-i) > 1 { - // New node - h := &PatternHuff{ - tieBreaker: tieBreaker, - } - if codeHeap.Len() > 0 && (i >= patternList.Len() || codeHeap[0].uses < patternList[i].uses) { - // Take h0 from the heap - h.h0 = heap.Pop(&codeHeap).(*PatternHuff) - h.h0.AddZero() - h.uses += h.h0.uses - } else { - // Take p0 from the list - h.p0 = patternList[i] - h.p0.code = 0 - h.p0.codeBits = 1 - h.uses += h.p0.uses - i++ - } - if codeHeap.Len() > 0 && (i >= patternList.Len() || codeHeap[0].uses < patternList[i].uses) { - // Take h1 from the heap - h.h1 = heap.Pop(&codeHeap).(*PatternHuff) - h.h1.AddOne() - h.uses += h.h1.uses - } else { - // Take p1 from the list - h.p1 = patternList[i] - h.p1.code = 1 - h.p1.codeBits = 1 - h.uses += h.p1.uses - i++ - } - tieBreaker++ - heap.Push(&codeHeap, h) - } - if codeHeap.Len() > 0 { - root := heap.Pop(&codeHeap).(*PatternHuff) // Root node of huffman tree - root.SetDepth(0) - } - // Calculate total size of the dictionary - var patternsSize uint64 - for _, p := range patternList { - ns := binary.PutUvarint(c.numBuf[:], uint64(p.depth)) // Length of the word's depth - n := binary.PutUvarint(c.numBuf[:], uint64(len(p.word))) // Length of the word's length - patternsSize += uint64(ns + n + len(p.word)) - } - // Start writing to result file - cf, err := os.Create(c.outputFile) - if err != nil { - return err - } - defer cf.Close() - defer cf.Sync() - cw := bufio.NewWriterSize(cf, etl.BufIOSize) - defer cw.Flush() - // 1-st, output amount of words and emptyWords in file - binary.BigEndian.PutUint64(c.numBuf[:], c.wordsCount) - if _, err = cw.Write(c.numBuf[:8]); err != nil { - return err - } - binary.BigEndian.PutUint64(c.numBuf[:], c.emptyWordsCount) - if _, err = cw.Write(c.numBuf[:8]); err != nil { - return err - } - // 2-nd, output dictionary size - binary.BigEndian.PutUint64(c.numBuf[:], patternsSize) // Dictionary size - if _, err = cw.Write(c.numBuf[:8]); err != nil { - return err - } - // 3-rd, write all the pattens, with their depths - slices.SortFunc[*Pattern](patternList, patternListLess) - for _, p := range patternList { - ns := binary.PutUvarint(c.numBuf[:], uint64(p.depth)) - if _, err = cw.Write(c.numBuf[:ns]); err != nil { - return err - } - n := binary.PutUvarint(c.numBuf[:], uint64(len(p.word))) - if _, err = cw.Write(c.numBuf[:n]); err != nil { - return err - } - if _, err = cw.Write(p.word); err != nil { - return err - } - //fmt.Printf("[comp] depth=%d, code=[%b], pattern=[%x]\n", p.depth, p.code, p.word) - } - positionList := make(PositionList, 0, len(c.posMap)) - pos2code := make(map[uint64]*Position, len(c.posMap)) - for pos, uses := range c.posMap { - p := &Position{pos: pos, uses: uses, code: pos, codeBits: 0} - positionList = append(positionList, p) - pos2code[pos] = p - } - slices.SortFunc(positionList, positionListLess) - i = 0 // Will be going over the positionList - // Build Huffman tree for codes - var posHeap PositionHeap - 
heap.Init(&posHeap) - tieBreaker = uint64(0) - for posHeap.Len()+(positionList.Len()-i) > 1 { - // New node - h := &PositionHuff{ - tieBreaker: tieBreaker, - } - if posHeap.Len() > 0 && (i >= positionList.Len() || posHeap[0].uses < positionList[i].uses) { - // Take h0 from the heap - h.h0 = heap.Pop(&posHeap).(*PositionHuff) - h.h0.AddZero() - h.uses += h.h0.uses - } else { - // Take p0 from the list - h.p0 = positionList[i] - h.p0.code = 0 - h.p0.codeBits = 1 - h.uses += h.p0.uses - i++ - } - if posHeap.Len() > 0 && (i >= positionList.Len() || posHeap[0].uses < positionList[i].uses) { - // Take h1 from the heap - h.h1 = heap.Pop(&posHeap).(*PositionHuff) - h.h1.AddOne() - h.uses += h.h1.uses - } else { - // Take p1 from the list - h.p1 = positionList[i] - h.p1.code = 1 - h.p1.codeBits = 1 - h.uses += h.p1.uses - i++ - } - tieBreaker++ - heap.Push(&posHeap, h) - } - if posHeap.Len() > 0 { - posRoot := heap.Pop(&posHeap).(*PositionHuff) - posRoot.SetDepth(0) - } - // Calculate the size of pos dictionary - var posSize uint64 - for _, p := range positionList { - ns := binary.PutUvarint(c.numBuf[:], uint64(p.depth)) // Length of the position's depth - n := binary.PutUvarint(c.numBuf[:], p.pos) - posSize += uint64(ns + n) - } - // First, output dictionary size - binary.BigEndian.PutUint64(c.numBuf[:], posSize) // Dictionary size - if _, err = cw.Write(c.numBuf[:8]); err != nil { - return err - } - slices.SortFunc(positionList, positionListLess) - // Write all the positions and their depths - for _, p := range positionList { - ns := binary.PutUvarint(c.numBuf[:], uint64(p.depth)) - if _, err = cw.Write(c.numBuf[:ns]); err != nil { - return err - } - n := binary.PutUvarint(c.numBuf[:], p.pos) - if _, err = cw.Write(c.numBuf[:n]); err != nil { - return err - } - } - r := bufio.NewReaderSize(c.interFile, etl.BufIOSize) - var hc HuffmanCoder - hc.w = cw - l, e := binary.ReadUvarint(r) - for ; e == nil; l, e = binary.ReadUvarint(r) { - posCode := pos2code[l+1] - if posCode != nil { - if e = hc.encode(posCode.code, posCode.codeBits); e != nil { - return e - } - } - if l == 0 { - if e = hc.flush(); e != nil { - return e - } - } else { - var pNum uint64 // Number of patterns - if pNum, e = binary.ReadUvarint(r); e != nil { - return e - } - // Now reading patterns one by one - var lastPos uint64 - var lastUncovered int - var uncoveredCount int - for i := 0; i < int(pNum); i++ { - var pos uint64 // Starting position for pattern - if pos, e = binary.ReadUvarint(r); e != nil { - return e - } - posCode = pos2code[pos-lastPos+1] - lastPos = pos - if posCode != nil { - if e = hc.encode(posCode.code, posCode.codeBits); e != nil { - return e - } - } - var code uint64 // Code of the pattern - if code, e = binary.ReadUvarint(r); e != nil { - return e - } - patternCode := c.dictBuilder.items[len(c.dictBuilder.items)-1-int(code)] - if int(pos) > lastUncovered { - uncoveredCount += int(pos) - lastUncovered - } - lastUncovered = int(pos) + len(patternCode.word) - if e = hc.encode(patternCode.code, patternCode.codeBits); e != nil { - return e - } - } - if int(l) > lastUncovered { - uncoveredCount += int(l) - lastUncovered - } - // Terminating position and flush - posCode = pos2code[0] - if posCode != nil { - if e = hc.encode(posCode.code, posCode.codeBits); e != nil { - return e - } - } - if e = hc.flush(); e != nil { - return e - } - // Copy uncovered characters - if uncoveredCount > 0 { - if _, e = io.CopyN(cw, r, int64(uncoveredCount)); e != nil { - return e - } - } - } - } - if e != nil && !errors.Is(e, io.EOF) { - 
return e - } - return nil -} - -func (c *CompressorSequential) buildDictionary() error { - if len(c.superstring) > 0 { - // Process any residual superstrings - if err := c.processSuperstring(); err != nil { - return fmt.Errorf("buildDictionary: error processing superstring: %w", err) - } - } - c.dictBuilder.Reset(maxDictPatterns) - if err := c.collector.Load(nil, "", c.dictBuilder.loadFunc, etl.TransformArgs{}); err != nil { - return err - } - c.dictBuilder.finish() - c.collector.Close() - // Sort dictionary inside the dictionary bilder in the order of increasing scores - (&c.dictBuilder).Sort() - return nil -} - -func (c *CompressorSequential) processSuperstring() error { - c.divsufsort.ComputeSuffixArray(c.superstring, c.suffixarray[:len(c.superstring)]) - // filter out suffixes that start with odd positions - we reuse the first half of sa.suffixarray for that - // because it won't be used after filtration - n := len(c.superstring) / 2 - saFiltered := c.suffixarray[:n] - j := 0 - for _, s := range c.suffixarray[:len(c.superstring)] { - if (s & 1) == 0 { - saFiltered[j] = s >> 1 - j++ - } - } - // Now create an inverted array - we reuse the second half of sa.suffixarray for that - saInverted := c.suffixarray[:n] - for i := 0; i < n; i++ { - saInverted[saFiltered[i]] = int32(i) - } - // Create LCP array (Kasai's algorithm) - var k int - // Process all suffixes one by one starting from - // first suffix in superstring - for i := 0; i < n; i++ { - /* If the current suffix is at n-1, then we don’t - have next substring to consider. So lcp is not - defined for this substring, we put zero. */ - if saInverted[i] == int32(n-1) { - k = 0 - continue - } - - /* j contains index of the next substring to - be considered to compare with the present - substring, i.e., next string in suffix array */ - j := int(saFiltered[saInverted[i]+1]) - - // Directly start matching from k'th index as - // at-least k-1 characters will match - for i+k < n && j+k < n && c.superstring[(i+k)*2] != 0 && c.superstring[(j+k)*2] != 0 && c.superstring[(i+k)*2+1] == c.superstring[(j+k)*2+1] { - k++ - } - - c.lcp[saInverted[i]] = int32(k) // lcp for the present suffix. - - // Deleting the starting character from the string. 
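// Why the decrement just below is safe (Kasai's invariant): if the suffix
// starting at i has an LCP of k with its neighbour in the suffix array, then
// the suffix starting at i+1 has an LCP of at least k-1, so matching can
// resume from k-1 instead of 0 and the whole LCP construction stays O(n).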
- if k > 0 { - k-- - } - } - // Walk over LCP array and compute the scores of the strings - b := saInverted - j = 0 - for i := 0; i < n-1; i++ { - // Only when there is a drop in LCP value - if c.lcp[i+1] >= c.lcp[i] { - j = i - continue - } - for l := c.lcp[i]; l > c.lcp[i+1]; l-- { - if l < minPatternLen || l > maxPatternLen { - continue - } - // Go back - var isNew bool - for j > 0 && c.lcp[j-1] >= l { - j-- - isNew = true - } - if !isNew { - break - } - window := i - j + 2 - copy(b, saFiltered[j:i+2]) - sort.Slice(b[:window], func(i1, i2 int) bool { return b[i1] < b[i2] }) - repeats := 1 - lastK := 0 - for k := 1; k < window; k++ { - if b[k] >= b[lastK]+l { - repeats++ - lastK = k - } - } - score := uint64(repeats * int(l-4)) - if score >= c.minPatternScore { - // Dictionary key is the concatenation of the score and the dictionary word (to later aggregate the scores from multiple chunks) - c.collectBuf = c.collectBuf[:8] - for s := int32(0); s < l; s++ { - c.collectBuf = append(c.collectBuf, c.superstring[(saFiltered[i]+s)*2+1]) - } - binary.BigEndian.PutUint64(c.collectBuf[:8], score) - if err := c.collector.Collect(c.collectBuf[8:], c.collectBuf[:8]); err != nil { // key will be copied by Collect function - return fmt.Errorf("collecting %x with score %d: %w", c.collectBuf[8:], score, err) - } - } - } - } - c.superstring = c.superstring[:0] - return nil -} - type DictAggregator struct { lastWord []byte lastWordScore uint64 From dac8c207fc16fed04eb3cae2449bd0e2c792192c Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Thu, 22 Sep 2022 14:08:56 +0700 Subject: [PATCH 2/7] clean deps (#649) --- go.mod | 1 - go.sum | 2 -- 2 files changed, 3 deletions(-) diff --git a/go.mod b/go.mod index 8c338df43..aebcd9b86 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,6 @@ require ( github.com/RoaringBitmap/roaring v1.2.1 github.com/VictoriaMetrics/metrics v1.22.2 github.com/c2h5oh/datasize v0.0.0-20220606134207-859f65c6625b - github.com/flanglet/kanzi-go v1.9.1-0.20211212184056-72dda96261ee github.com/go-stack/stack v1.8.1 github.com/google/btree v1.1.2 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 diff --git a/go.sum b/go.sum index 30d069bb9..350dc7f16 100644 --- a/go.sum +++ b/go.sum @@ -28,8 +28,6 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= -github.com/flanglet/kanzi-go v1.9.1-0.20211212184056-72dda96261ee h1:CaVlPeoz5kJQ+cAOV+ZDdlr3J2FmKyNkGu9LY+x7cDM= -github.com/flanglet/kanzi-go v1.9.1-0.20211212184056-72dda96261ee/go.mod h1:/sUSVgDcbjsisuW42GPDgaMqvJ0McZERNICnD7b1nRA= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= From f38bce356deb5ad9b3f14e4b82e9b20c1bf70790 Mon Sep 17 00:00:00 2001 From: giuliorebuffo Date: Thu, 22 Sep 2022 18:05:02 +0200 Subject: [PATCH 3/7] added verkle tree buckets --- kv/tables.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/kv/tables.go b/kv/tables.go index 8f90d4adf..e07e0ece8 100644 --- 
a/kv/tables.go +++ b/kv/tables.go @@ -230,6 +230,12 @@ Invariants: const TrieOfAccounts = "TrieAccount" const TrieOfStorage = "TrieStorage" +// Mapping [block number] => [Verkle Root] +const VerkleRoots = "VerkleRoots" + +// Mapping [Verkle Root] => [Rlp-Encoded Verkle Node] +const VerkleTrie = "VerkleTrie" + const ( // DatabaseInfo is used to store information about data layout. DatabaseInfo = "DbInfo" @@ -532,6 +538,9 @@ var ChaindataTables = []string{ RStorageIdx, RCodeKeys, RCodeIdx, + + VerkleRoots, + VerkleTrie, } const ( From eaad0d0a4ad50d6d13e76fd8c81fb8c40a6428aa Mon Sep 17 00:00:00 2001 From: Enrique Jose Avila Asapche Date: Fri, 23 Sep 2022 13:47:01 +0300 Subject: [PATCH 4/7] changed baseFeeCap from uint64 -> uint256 (#646) * changed baseFeeCap from uint64 -> uint256 * fixed test * ops * no overflow * tmp var * difference * ops difference * adding proper rlp with feecap --- txpool/pool.go | 90 +++++++++++++++++++++++++--------------- txpool/pool_fuzz_test.go | 8 ++-- txpool/pool_test.go | 34 +++++++-------- types/txn.go | 6 +-- 4 files changed, 81 insertions(+), 57 deletions(-) diff --git a/txpool/pool.go b/txpool/pool.go index cb50df8b2..f94a65500 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -223,7 +223,7 @@ type metaTx struct { subPool SubPoolMarker nonceDistance uint64 // how far their nonces are from the state's nonce for the sender cumulativeBalanceDistance uint64 // how far their cumulativeRequiredBalance are from the state's balance for the sender - minFeeCap uint64 + minFeeCap uint256.Int minTip uint64 bestIndex int worstIndex int @@ -656,7 +656,7 @@ func (p *TxPool) AddRemoteTxs(_ context.Context, newTxs types.TxSlots) { func (p *TxPool) validateTx(txn *types.TxSlot, isLocal bool, stateCache kvcache.CacheView) DiscardReason { // Drop non-local transactions under our own minimal accepted gas price or tip - if !isLocal && txn.FeeCap < p.cfg.MinFeeCap { + if !isLocal && uint256.NewInt(p.cfg.MinFeeCap).Cmp(&txn.FeeCap) == 1 { if txn.Traced { log.Info(fmt.Sprintf("TX TRACING: validateTx underpriced idHash=%x local=%t, feeCap=%d, cfg.MinFeeCap=%d", txn.IDHash, isLocal, txn.FeeCap, p.cfg.MinFeeCap)) } @@ -695,7 +695,7 @@ func (p *TxPool) validateTx(txn *types.TxSlot, isLocal bool, stateCache kvcache. 
} // Transactor should have enough funds to cover the costs total := uint256.NewInt(txn.Gas) - total.Mul(total, uint256.NewInt(txn.FeeCap)) + total.Mul(total, &txn.FeeCap) total.Add(total, &txn.Value) if senderBalance.Cmp(total) < 0 { if txn.Traced { @@ -1019,8 +1019,10 @@ func (p *TxPool) addLocked(mt *metaTx) DiscardReason { tipThreshold := uint256.NewInt(0) tipThreshold = tipThreshold.Mul(&found.Tx.Tip, uint256.NewInt(100+p.cfg.PriceBump)) tipThreshold.Div(tipThreshold, u256.N100) - feecapThreshold := found.Tx.FeeCap * (100 + p.cfg.PriceBump) / 100 - if mt.Tx.Tip.Cmp(tipThreshold) < 0 || mt.Tx.FeeCap < feecapThreshold { + feecapThreshold := uint256.NewInt(0) + feecapThreshold.Mul(&found.Tx.FeeCap, uint256.NewInt(100+p.cfg.PriceBump)) + feecapThreshold.Div(feecapThreshold, u256.N100) + if mt.Tx.Tip.Cmp(tipThreshold) < 0 || mt.Tx.FeeCap.Cmp(feecapThreshold) < 0 { // Both tip and feecap need to be larger than previously to replace the transaction // In case if the transation is stuck, "poke" it to rebroadcast // TODO refactor to return the list of promoted hashes instead of using added inside the pool @@ -1152,7 +1154,7 @@ func onSenderStateChange(senderID uint64, senderNonce uint64, senderBalance uint protocolBaseFee, blockGasLimit uint64, pending *PendingPool, baseFee, queued *SubPool, discard func(*metaTx, DiscardReason)) { noGapsNonce := senderNonce cumulativeRequiredBalance := uint256.NewInt(0) - minFeeCap := uint64(math.MaxUint64) + minFeeCap := uint256.NewInt(0).SetAllOne() minTip := uint64(math.MaxUint64) var toDel []*metaTx // can't delete items while iterate them byNonce.ascend(senderID, func(mt *metaTx) bool { @@ -1177,8 +1179,10 @@ func onSenderStateChange(senderID uint64, senderNonce uint64, senderBalance uint toDel = append(toDel, mt) return true } - minFeeCap = cmp.Min(minFeeCap, mt.Tx.FeeCap) - mt.minFeeCap = minFeeCap + if minFeeCap.Cmp(&mt.Tx.FeeCap) < 0 { + *minFeeCap = mt.Tx.FeeCap + } + mt.minFeeCap = *minFeeCap if mt.Tx.Tip.IsUint64() { minTip = cmp.Min(minTip, mt.Tx.Tip.Uint64()) } @@ -1191,13 +1195,13 @@ func onSenderStateChange(senderID uint64, senderNonce uint64, senderBalance uint // Sender has enough balance for: gasLimit x feeCap + transferred_value needBalance := uint256.NewInt(mt.Tx.Gas) - needBalance.Mul(needBalance, uint256.NewInt(mt.Tx.FeeCap)) + needBalance.Mul(needBalance, &mt.Tx.FeeCap) needBalance.Add(needBalance, &mt.Tx.Value) // 1. Minimum fee requirement. Set to 1 if feeCap of the transaction is no less than in-protocol // parameter of minimal base fee. Set to 0 if feeCap is less than minimum base fee, which means // this transaction will never be included into this particular chain. mt.subPool &^= EnoughFeeCapProtocol - if mt.minFeeCap >= protocolBaseFee { + if mt.minFeeCap.Cmp(uint256.NewInt(protocolBaseFee)) >= 0 { mt.subPool |= EnoughFeeCapProtocol } else { mt.subPool = 0 // TODO: we immediately drop all transactions if they have no first bit - then maybe we don't need this bit at all? And don't add such transactions to queue? 
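// A minimal sketch, not part of the patch, of the replace-by-fee rule that the
// hunks above enforce after the uint64 -> uint256 switch: a replacement tx must
// raise both tip and feeCap by at least cfg.PriceBump percent. The pool derives
// the same thresholds from found.Tx.Tip / found.Tx.FeeCap using the shared
// constant u256.N100; the helper and parameter names here are illustrative only.
//
//	import "github.com/holiman/uint256"
//
//	func exceedsBump(old, candidate *uint256.Int, priceBump uint64) bool {
//		threshold := uint256.NewInt(0)
//		threshold.Mul(old, uint256.NewInt(100+priceBump)) // old * (100 + bump)
//		threshold.Div(threshold, uint256.NewInt(100))     // ... / 100, as u256.N100 does
//		return candidate.Cmp(threshold) >= 0              // true: bump satisfied
//	}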
@@ -1261,7 +1265,7 @@ func onSenderStateChange(senderID uint64, senderNonce uint64, senderBalance uint // being promoted to the pending or basefee pool, for re-broadcasting func promote(pending *PendingPool, baseFee, queued *SubPool, pendingBaseFee uint64, discard func(*metaTx, DiscardReason)) { // Demote worst transactions that do not qualify for pending sub pool anymore, to other sub pools, or discard - for worst := pending.Worst(); pending.Len() > 0 && (worst.subPool < BaseFeePoolBits || worst.minFeeCap < pendingBaseFee); worst = pending.Worst() { + for worst := pending.Worst(); pending.Len() > 0 && (worst.subPool < BaseFeePoolBits || worst.minFeeCap.Cmp(uint256.NewInt(pendingBaseFee)) < 0); worst = pending.Worst() { if worst.subPool >= BaseFeePoolBits { baseFee.Add(pending.PopWorst()) } else if worst.subPool >= QueuedPoolBits { @@ -1272,7 +1276,7 @@ func promote(pending *PendingPool, baseFee, queued *SubPool, pendingBaseFee uint } // Promote best transactions from base fee pool to pending pool while they qualify - for best := baseFee.Best(); baseFee.Len() > 0 && best.subPool >= BaseFeePoolBits && best.minFeeCap >= pendingBaseFee; best = baseFee.Best() { + for best := baseFee.Best(); baseFee.Len() > 0 && best.subPool >= BaseFeePoolBits && best.minFeeCap.Cmp(uint256.NewInt(pendingBaseFee)) >= 0; best = baseFee.Best() { pending.Add(baseFee.PopBest()) } @@ -1287,7 +1291,7 @@ func promote(pending *PendingPool, baseFee, queued *SubPool, pendingBaseFee uint // Promote best transactions from the queued pool to either pending or base fee pool, while they qualify for best := queued.Best(); queued.Len() > 0 && best.subPool >= BaseFeePoolBits; best = queued.Best() { - if best.minFeeCap >= pendingBaseFee { + if best.minFeeCap.Cmp(uint256.NewInt(pendingBaseFee)) >= 0 { pending.Add(queued.PopBest()) } else { baseFee.Add(queued.PopBest()) @@ -2018,7 +2022,9 @@ func (s *bestSlice) Swap(i, j int) { s.ms[i], s.ms[j] = s.ms[j], s.ms[i] s.ms[i].bestIndex, s.ms[j].bestIndex = i, j } -func (s *bestSlice) Less(i, j int) bool { return s.ms[i].better(s.ms[j], s.pendingBaseFee) } +func (s *bestSlice) Less(i, j int) bool { + return s.ms[i].better(s.ms[j], *uint256.NewInt(s.pendingBaseFee)) +} func (s *bestSlice) UnsafeRemove(i *metaTx) { s.Swap(i.bestIndex, len(s.ms)-1) s.ms[len(s.ms)-1].bestIndex = -1 @@ -2172,13 +2178,13 @@ type BestQueue struct { pendingBastFee uint64 } -func (mt *metaTx) better(than *metaTx, pendingBaseFee uint64) bool { +func (mt *metaTx) better(than *metaTx, pendingBaseFee uint256.Int) bool { subPool := mt.subPool thanSubPool := than.subPool - if mt.minFeeCap >= pendingBaseFee { + if mt.minFeeCap.Cmp(&pendingBaseFee) >= 0 { subPool |= EnoughFeeCapBlock } - if than.minFeeCap >= pendingBaseFee { + if than.minFeeCap.Cmp(&pendingBaseFee) >= 0 { thanSubPool |= EnoughFeeCapBlock } if subPool != thanSubPool { @@ -2187,19 +2193,31 @@ func (mt *metaTx) better(than *metaTx, pendingBaseFee uint64) bool { switch mt.currentSubPool { case PendingSubPool: - var effectiveTip, thanEffectiveTip uint64 - if pendingBaseFee <= mt.minFeeCap { - effectiveTip = cmp.Min(mt.minFeeCap-pendingBaseFee, mt.minTip) + var effectiveTip, thanEffectiveTip uint256.Int + if mt.minFeeCap.Cmp(&pendingBaseFee) >= 0 { + difference := uint256.NewInt(0) + difference.Sub(&mt.minFeeCap, &pendingBaseFee) + if difference.Cmp(uint256.NewInt(mt.minTip)) <= 0 { + effectiveTip = *difference + } else { + effectiveTip = *uint256.NewInt(mt.minTip) + } } - if pendingBaseFee <= than.minFeeCap { - thanEffectiveTip = 
cmp.Min(than.minFeeCap-pendingBaseFee, than.minTip) + if than.minFeeCap.Cmp(&pendingBaseFee) >= 0 { + difference := uint256.NewInt(0) + difference.Sub(&than.minFeeCap, &pendingBaseFee) + if difference.Cmp(uint256.NewInt(than.minTip)) <= 0 { + thanEffectiveTip = *difference + } else { + thanEffectiveTip = *uint256.NewInt(than.minTip) + } } - if effectiveTip != thanEffectiveTip { - return effectiveTip > thanEffectiveTip + if effectiveTip.Cmp(&thanEffectiveTip) != 0 { + return effectiveTip.Cmp(&thanEffectiveTip) > 0 } case BaseFeeSubPool: - if mt.minFeeCap != than.minFeeCap { - return mt.minFeeCap > than.minFeeCap + if mt.minFeeCap.Cmp(&than.minFeeCap) != 0 { + return mt.minFeeCap.Cmp(&than.minFeeCap) > 0 } case QueuedSubPool: if mt.nonceDistance != than.nonceDistance { @@ -2212,13 +2230,13 @@ func (mt *metaTx) better(than *metaTx, pendingBaseFee uint64) bool { return mt.timestamp < than.timestamp } -func (mt *metaTx) worse(than *metaTx, pendingBaseFee uint64) bool { +func (mt *metaTx) worse(than *metaTx, pendingBaseFee uint256.Int) bool { subPool := mt.subPool thanSubPool := than.subPool - if mt.minFeeCap >= pendingBaseFee { + if mt.minFeeCap.Cmp(&pendingBaseFee) >= 0 { subPool |= EnoughFeeCapBlock } - if than.minFeeCap >= pendingBaseFee { + if than.minFeeCap.Cmp(&pendingBaseFee) >= 0 { thanSubPool |= EnoughFeeCapBlock } if subPool != thanSubPool { @@ -2228,7 +2246,7 @@ func (mt *metaTx) worse(than *metaTx, pendingBaseFee uint64) bool { switch mt.currentSubPool { case PendingSubPool: if mt.minFeeCap != than.minFeeCap { - return mt.minFeeCap < than.minFeeCap + return mt.minFeeCap.Cmp(&than.minFeeCap) < 0 } if mt.nonceDistance != than.nonceDistance { return mt.nonceDistance > than.nonceDistance @@ -2247,8 +2265,10 @@ func (mt *metaTx) worse(than *metaTx, pendingBaseFee uint64) bool { return mt.timestamp > than.timestamp } -func (p BestQueue) Len() int { return len(p.ms) } -func (p BestQueue) Less(i, j int) bool { return p.ms[i].better(p.ms[j], p.pendingBastFee) } +func (p BestQueue) Len() int { return len(p.ms) } +func (p BestQueue) Less(i, j int) bool { + return p.ms[i].better(p.ms[j], *uint256.NewInt(p.pendingBastFee)) +} func (p BestQueue) Swap(i, j int) { p.ms[i], p.ms[j] = p.ms[j], p.ms[i] p.ms[i].bestIndex = i @@ -2277,8 +2297,10 @@ type WorstQueue struct { pendingBaseFee uint64 } -func (p WorstQueue) Len() int { return len(p.ms) } -func (p WorstQueue) Less(i, j int) bool { return p.ms[i].worse(p.ms[j], p.pendingBaseFee) } +func (p WorstQueue) Len() int { return len(p.ms) } +func (p WorstQueue) Less(i, j int) bool { + return p.ms[i].worse(p.ms[j], *uint256.NewInt(p.pendingBaseFee)) +} func (p WorstQueue) Swap(i, j int) { p.ms[i], p.ms[j] = p.ms[j], p.ms[i] p.ms[i].worstIndex = i diff --git a/txpool/pool_fuzz_test.go b/txpool/pool_fuzz_test.go index ae17518ba..25c092599 100644 --- a/txpool/pool_fuzz_test.go +++ b/txpool/pool_fuzz_test.go @@ -202,7 +202,7 @@ func poolsFromFuzzBytes(rawTxNonce, rawValues, rawTips, rawFeeCap, rawSender []b Nonce: txNonce[i], Value: values[i%len(values)], Tip: *uint256.NewInt(tips[i%len(tips)]), - FeeCap: feeCap[i%len(feeCap)], + FeeCap: *uint256.NewInt(feeCap[i%len(feeCap)]), } txRlp := fakeRlpTx(txs.Txs[i], senders.At(i%senders.Len())) _, err := parseCtx.ParseTransaction(txRlp, 0, txs.Txs[i], nil, false, nil) @@ -219,7 +219,7 @@ func poolsFromFuzzBytes(rawTxNonce, rawValues, rawTips, rawFeeCap, rawSender []b // fakeRlpTx add anything what identifying tx to `data` to make hash unique func fakeRlpTx(slot *types.TxSlot, data []byte) []byte { dataLen := 
rlp.U64Len(1) + //chainID - rlp.U64Len(slot.Nonce) + rlp.U256Len(&slot.Tip) + rlp.U64Len(slot.FeeCap) + + rlp.U64Len(slot.Nonce) + rlp.U256Len(&slot.Tip) + rlp.U256Len(&slot.FeeCap) + rlp.U64Len(0) + // gas rlp.StringLen(0) + // dest addr rlp.U256Len(&slot.Value) + @@ -236,7 +236,9 @@ func fakeRlpTx(slot *types.TxSlot, data []byte) []byte { bb := bytes.NewBuffer(buf[p:p]) _ = slot.Tip.EncodeRLP(bb) p += rlp.U256Len(&slot.Tip) - p += rlp.EncodeU64(slot.FeeCap, buf[p:]) + bb = bytes.NewBuffer(buf[p:p]) + _ = slot.FeeCap.EncodeRLP(bb) + p += rlp.U256Len(&slot.FeeCap) p += rlp.EncodeU64(0, buf[p:]) //gas p += rlp.EncodeString([]byte{}, buf[p:]) //destrination addr bb = bytes.NewBuffer(buf[p:p]) diff --git a/txpool/pool_test.go b/txpool/pool_test.go index dcce6fd05..0a9474c7e 100644 --- a/txpool/pool_test.go +++ b/txpool/pool_test.go @@ -133,7 +133,7 @@ func TestNonceFromAddress(t *testing.T) { var txSlots types.TxSlots txSlot1 := &types.TxSlot{ Tip: *uint256.NewInt(300000), - FeeCap: 300000, + FeeCap: *uint256.NewInt(300000), Gas: 100000, Nonce: 3, } @@ -151,14 +151,14 @@ func TestNonceFromAddress(t *testing.T) { txSlots := types.TxSlots{} txSlot2 := &types.TxSlot{ Tip: *uint256.NewInt(300000), - FeeCap: 300000, + FeeCap: *uint256.NewInt(300000), Gas: 100000, Nonce: 4, } txSlot2.IDHash[0] = 2 txSlot3 := &types.TxSlot{ Tip: *uint256.NewInt(300000), - FeeCap: 300000, + FeeCap: *uint256.NewInt(300000), Gas: 100000, Nonce: 6, } @@ -179,7 +179,7 @@ func TestNonceFromAddress(t *testing.T) { var txSlots types.TxSlots txSlot1 := &types.TxSlot{ Tip: *uint256.NewInt(300000), - FeeCap: 9 * common.Ether, + FeeCap: *uint256.NewInt(9 * common.Ether), Gas: 100000, Nonce: 3, } @@ -197,7 +197,7 @@ func TestNonceFromAddress(t *testing.T) { var txSlots types.TxSlots txSlot1 := &types.TxSlot{ Tip: *uint256.NewInt(300000), - FeeCap: 300000, + FeeCap: *uint256.NewInt(300000), Gas: 100000, Nonce: 1, } @@ -257,7 +257,7 @@ func TestReplaceWithHigherFee(t *testing.T) { var txSlots types.TxSlots txSlot := &types.TxSlot{ Tip: *uint256.NewInt(300000), - FeeCap: 300000, + FeeCap: *uint256.NewInt(300000), Gas: 100000, Nonce: 3, } @@ -275,7 +275,7 @@ func TestReplaceWithHigherFee(t *testing.T) { txSlots := types.TxSlots{} txSlot := &types.TxSlot{ Tip: *uint256.NewInt(300000), - FeeCap: 3000000, + FeeCap: *uint256.NewInt(300000), Gas: 100000, Nonce: 3, } @@ -295,7 +295,7 @@ func TestReplaceWithHigherFee(t *testing.T) { txSlots := types.TxSlots{} txSlot := &types.TxSlot{ Tip: *uint256.NewInt(3000000), - FeeCap: 300000, + FeeCap: *uint256.NewInt(300000), Gas: 100000, Nonce: 3, } @@ -315,7 +315,7 @@ func TestReplaceWithHigherFee(t *testing.T) { txSlots := types.TxSlots{} txSlot := &types.TxSlot{ Tip: *uint256.NewInt(330000), - FeeCap: 330000, + FeeCap: *uint256.NewInt(330000), Gas: 100000, Nonce: 3, } @@ -378,7 +378,7 @@ func TestReverseNonces(t *testing.T) { var txSlots types.TxSlots txSlot := &types.TxSlot{ Tip: *uint256.NewInt(500_000), - FeeCap: 3_000_000, + FeeCap: *uint256.NewInt(3_000_000), Gas: 100000, Nonce: 3, } @@ -405,7 +405,7 @@ func TestReverseNonces(t *testing.T) { var txSlots types.TxSlots txSlot := &types.TxSlot{ Tip: *uint256.NewInt(500_000), - FeeCap: 500_000, + FeeCap: *uint256.NewInt(500_000), Gas: 100000, Nonce: 2, } @@ -432,7 +432,7 @@ func TestReverseNonces(t *testing.T) { var txSlots types.TxSlots txSlot := &types.TxSlot{ Tip: *uint256.NewInt(600_000), - FeeCap: 3_000_000, + FeeCap: *uint256.NewInt(3_000_000), Gas: 100000, Nonce: 2, } @@ -507,7 +507,7 @@ func TestTxPoke(t *testing.T) { var txSlots 
types.TxSlots txSlot := &types.TxSlot{ Tip: *uint256.NewInt(300000), - FeeCap: 300000, + FeeCap: *uint256.NewInt(300000), Gas: 100000, Nonce: 2, } @@ -535,7 +535,7 @@ func TestTxPoke(t *testing.T) { txSlots := types.TxSlots{} txSlot := &types.TxSlot{ Tip: *uint256.NewInt(300000), - FeeCap: 300000, + FeeCap: *uint256.NewInt(300000), Gas: 100000, Nonce: 2, } @@ -564,7 +564,7 @@ func TestTxPoke(t *testing.T) { txSlots := types.TxSlots{} txSlot := &types.TxSlot{ Tip: *uint256.NewInt(3000000), - FeeCap: 300000, + FeeCap: *uint256.NewInt(300000), Gas: 100000, Nonce: 2, } @@ -594,7 +594,7 @@ func TestTxPoke(t *testing.T) { txSlots := types.TxSlots{} txSlot := &types.TxSlot{ Tip: *uint256.NewInt(300000), - FeeCap: 300000, + FeeCap: *uint256.NewInt(300000), Gas: 100000, Nonce: 2, } @@ -616,7 +616,7 @@ func TestTxPoke(t *testing.T) { txSlots := types.TxSlots{} txSlot := &types.TxSlot{ Tip: *uint256.NewInt(3000000), - FeeCap: 300000, + FeeCap: *uint256.NewInt(3000000), Gas: 100000, Nonce: 2, } diff --git a/types/txn.go b/types/txn.go index 0583edcbf..02de12dfd 100644 --- a/types/txn.go +++ b/types/txn.go @@ -82,7 +82,7 @@ func NewTxParseContext(chainID uint256.Int) *TxParseContext { type TxSlot struct { Nonce uint64 // Nonce of the transaction Tip uint256.Int // Maximum tip that transaction is giving to miner/block proposer - FeeCap uint64 // Maximum fee that transaction burns and gives to the miner/block proposer + FeeCap uint256.Int // Maximum fee that transaction burns and gives to the miner/block proposer Gas uint64 // Gas limit of the transaction Value uint256.Int // Value transferred by the transaction IDHash [32]byte // Transaction hash for the purposes of using it as a transaction Id @@ -215,10 +215,10 @@ func (ctx *TxParseContext) ParseTransaction(payload []byte, pos int, slot *TxSlo // Next follows feeCap, but only for dynamic fee transactions, for legacy transaction, it is // equal to tip if txType < DynamicFeeTxType { - slot.FeeCap = slot.Tip.Uint64() + slot.FeeCap = slot.Tip } else { // Although consensus rules specify that feeCap can be up to 256 bit long, we narrow it to 64 bit - p, slot.FeeCap, err = rlp.U64(payload, p) + p, err = rlp.U256(payload, p, &slot.FeeCap) if err != nil { return 0, fmt.Errorf("%w: feeCap: %s", ErrParseTxn, err) } From 183d2718ca6c4b20650fa419eaaa3ac03fc6ce6c Mon Sep 17 00:00:00 2001 From: Enrique Jose Avila Asapche Date: Sun, 25 Sep 2022 12:35:30 +0300 Subject: [PATCH 5/7] allow mem-mutation to create buckets (#650) --- kv/memdb/memory_mutation.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kv/memdb/memory_mutation.go b/kv/memdb/memory_mutation.go index 370476a56..87928adea 100644 --- a/kv/memdb/memory_mutation.go +++ b/kv/memdb/memory_mutation.go @@ -274,7 +274,7 @@ func (m *MemoryMutation) CollectMetrics() { } func (m *MemoryMutation) CreateBucket(bucket string) error { - panic("Not implemented") + return m.memTx.CreateBucket(bucket) } func (m *MemoryMutation) Flush(tx kv.RwTx) error { From 417cea6485e91efc80c52877b49dd424aa15b41f Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Mon, 26 Sep 2022 09:42:44 +0700 Subject: [PATCH 6/7] erigon22: non-pointer btree (#653) --- aggregator/aggregator.go | 10 ++------- recsplit/index_test.go | 3 --- recsplit/recsplit_fuzz_test.go | 3 --- recsplit/recsplit_test.go | 17 +-------------- state/domain.go | 31 ++++++++++++--------------- state/history.go | 39 ++++++++++++++++------------------ state/inverted_index.go | 12 +++++------ state/merge.go | 5 +---- state/state_recon.go | 10 ++++----- 9 files 
changed, 47 insertions(+), 83 deletions(-) diff --git a/aggregator/aggregator.go b/aggregator/aggregator.go index a390aa1d9..7fae1cef4 100644 --- a/aggregator/aggregator.go +++ b/aggregator/aggregator.go @@ -575,10 +575,7 @@ func buildIndex(d *compress.Decompressor, idxPath, tmpDir string, count int) (*r BucketSize: 2000, LeafSize: 8, TmpDir: tmpDir, - StartSeed: []uint64{0x106393c187cae21a, 0x6453cec3f7376937, 0x643e521ddbd2be98, 0x3740c6412f6572cb, 0x717d47562f1ce470, 0x4cd6eb4c63befb7c, 0x9bfd8c5e18c8da73, - 0x082f20e10092a9a3, 0x2ada2ce68d21defc, 0xe33cb4f3e7c6466b, 0x3980be458c509c59, 0xc466fd9584828e8c, 0x45f0aabe1a61ede6, 0xf6e7b8b33ad9b98d, - 0x4ef95e25f4b4983d, 0x81175195173b92d3, 0x4e50927d8dd15978, 0x1ea2099d1fafae7f, 0x425c8a06fbaaa815, 0xcd4216006c74052a}, - IndexFile: idxPath, + IndexFile: idxPath, }); err != nil { return nil, err } @@ -1616,10 +1613,7 @@ func (a *Aggregator) reduceHistoryFiles(fType FileType, item *byEndBlockItem) er BucketSize: 2000, LeafSize: 8, TmpDir: a.diffDir, - StartSeed: []uint64{0x106393c187cae21a, 0x6453cec3f7376937, 0x643e521ddbd2be98, 0x3740c6412f6572cb, 0x717d47562f1ce470, 0x4cd6eb4c63befb7c, 0x9bfd8c5e18c8da73, - 0x082f20e10092a9a3, 0x2ada2ce68d21defc, 0xe33cb4f3e7c6466b, 0x3980be458c509c59, 0xc466fd9584828e8c, 0x45f0aabe1a61ede6, 0xf6e7b8b33ad9b98d, - 0x4ef95e25f4b4983d, 0x81175195173b92d3, 0x4e50927d8dd15978, 0x1ea2099d1fafae7f, 0x425c8a06fbaaa815, 0xcd4216006c74052a}, - IndexFile: idxPath, + IndexFile: idxPath, }); err != nil { return fmt.Errorf("reduceHistoryFiles NewRecSplit: %w", err) } diff --git a/recsplit/index_test.go b/recsplit/index_test.go index ab79c94fe..8e2c0723d 100644 --- a/recsplit/index_test.go +++ b/recsplit/index_test.go @@ -36,9 +36,6 @@ func TestReWriteIndex(t *testing.T) { TmpDir: tmpDir, IndexFile: indexFile, LeafSize: 8, - StartSeed: []uint64{0x106393c187cae21a, 0x6453cec3f7376937, 0x643e521ddbd2be98, 0x3740c6412f6572cb, 0x717d47562f1ce470, 0x4cd6eb4c63befb7c, 0x9bfd8c5e18c8da73, - 0x082f20e10092a9a3, 0x2ada2ce68d21defc, 0xe33cb4f3e7c6466b, 0x3980be458c509c59, 0xc466fd9584828e8c, 0x45f0aabe1a61ede6, 0xf6e7b8b33ad9b98d, - 0x4ef95e25f4b4983d, 0x81175195173b92d3, 0x4e50927d8dd15978, 0x1ea2099d1fafae7f, 0x425c8a06fbaaa815, 0xcd4216006c74052a}, }) if err != nil { t.Fatal(err) diff --git a/recsplit/recsplit_fuzz_test.go b/recsplit/recsplit_fuzz_test.go index 9e463b5b5..316cdeedf 100644 --- a/recsplit/recsplit_fuzz_test.go +++ b/recsplit/recsplit_fuzz_test.go @@ -56,9 +56,6 @@ func FuzzRecSplit(f *testing.F) { TmpDir: tmpDir, IndexFile: indexFile, LeafSize: 8, - StartSeed: []uint64{0x106393c187cae21a, 0x6453cec3f7376937, 0x643e521ddbd2be98, 0x3740c6412f6572cb, 0x717d47562f1ce470, 0x4cd6eb4c63befb7c, 0x9bfd8c5e18c8da73, - 0x082f20e10092a9a3, 0x2ada2ce68d21defc, 0xe33cb4f3e7c6466b, 0x3980be458c509c59, 0xc466fd9584828e8c, 0x45f0aabe1a61ede6, 0xf6e7b8b33ad9b98d, - 0x4ef95e25f4b4983d, 0x81175195173b92d3, 0x4e50927d8dd15978, 0x1ea2099d1fafae7f, 0x425c8a06fbaaa815, 0xcd4216006c74052a}, }) if err != nil { t.Fatal(err) diff --git a/recsplit/recsplit_test.go b/recsplit/recsplit_test.go index b187d6484..d4c85ac0a 100644 --- a/recsplit/recsplit_test.go +++ b/recsplit/recsplit_test.go @@ -31,9 +31,6 @@ func TestRecSplit2(t *testing.T) { TmpDir: tmpDir, IndexFile: filepath.Join(tmpDir, "index"), LeafSize: 8, - StartSeed: []uint64{0x106393c187cae21a, 0x6453cec3f7376937, 0x643e521ddbd2be98, 0x3740c6412f6572cb, 0x717d47562f1ce470, 0x4cd6eb4c63befb7c, 0x9bfd8c5e18c8da73, - 0x082f20e10092a9a3, 0x2ada2ce68d21defc, 0xe33cb4f3e7c6466b, 
0x3980be458c509c59, 0xc466fd9584828e8c, 0x45f0aabe1a61ede6, 0xf6e7b8b33ad9b98d, - 0x4ef95e25f4b4983d, 0x81175195173b92d3, 0x4e50927d8dd15978, 0x1ea2099d1fafae7f, 0x425c8a06fbaaa815, 0xcd4216006c74052a}, }) if err != nil { t.Fatal(err) @@ -67,9 +64,6 @@ func TestRecSplitDuplicate(t *testing.T) { TmpDir: tmpDir, IndexFile: filepath.Join(tmpDir, "index"), LeafSize: 8, - StartSeed: []uint64{0x106393c187cae21a, 0x6453cec3f7376937, 0x643e521ddbd2be98, 0x3740c6412f6572cb, 0x717d47562f1ce470, 0x4cd6eb4c63befb7c, 0x9bfd8c5e18c8da73, - 0x082f20e10092a9a3, 0x2ada2ce68d21defc, 0xe33cb4f3e7c6466b, 0x3980be458c509c59, 0xc466fd9584828e8c, 0x45f0aabe1a61ede6, 0xf6e7b8b33ad9b98d, - 0x4ef95e25f4b4983d, 0x81175195173b92d3, 0x4e50927d8dd15978, 0x1ea2099d1fafae7f, 0x425c8a06fbaaa815, 0xcd4216006c74052a}, }) if err != nil { t.Fatal(err) @@ -94,9 +88,6 @@ func TestRecSplitLeafSizeTooLarge(t *testing.T) { TmpDir: tmpDir, IndexFile: filepath.Join(tmpDir, "index"), LeafSize: 64, - StartSeed: []uint64{0x106393c187cae21a, 0x6453cec3f7376937, 0x643e521ddbd2be98, 0x3740c6412f6572cb, 0x717d47562f1ce470, 0x4cd6eb4c63befb7c, 0x9bfd8c5e18c8da73, - 0x082f20e10092a9a3, 0x2ada2ce68d21defc, 0xe33cb4f3e7c6466b, 0x3980be458c509c59, 0xc466fd9584828e8c, 0x45f0aabe1a61ede6, 0xf6e7b8b33ad9b98d, - 0x4ef95e25f4b4983d, 0x81175195173b92d3, 0x4e50927d8dd15978, 0x1ea2099d1fafae7f, 0x425c8a06fbaaa815, 0xcd4216006c74052a}, }) if err == nil { t.Errorf("test is expected to fail, leaf size too large") @@ -113,9 +104,6 @@ func TestIndexLookup(t *testing.T) { TmpDir: tmpDir, IndexFile: indexFile, LeafSize: 8, - StartSeed: []uint64{0x106393c187cae21a, 0x6453cec3f7376937, 0x643e521ddbd2be98, 0x3740c6412f6572cb, 0x717d47562f1ce470, 0x4cd6eb4c63befb7c, 0x9bfd8c5e18c8da73, - 0x082f20e10092a9a3, 0x2ada2ce68d21defc, 0xe33cb4f3e7c6466b, 0x3980be458c509c59, 0xc466fd9584828e8c, 0x45f0aabe1a61ede6, 0xf6e7b8b33ad9b98d, - 0x4ef95e25f4b4983d, 0x81175195173b92d3, 0x4e50927d8dd15978, 0x1ea2099d1fafae7f, 0x425c8a06fbaaa815, 0xcd4216006c74052a}, }) if err != nil { t.Fatal(err) @@ -149,10 +137,7 @@ func TestTwoLayerIndex(t *testing.T) { TmpDir: tmpDir, IndexFile: indexFile, LeafSize: 8, - StartSeed: []uint64{0x106393c187cae21a, 0x6453cec3f7376937, 0x643e521ddbd2be98, 0x3740c6412f6572cb, 0x717d47562f1ce470, 0x4cd6eb4c63befb7c, 0x9bfd8c5e18c8da73, - 0x082f20e10092a9a3, 0x2ada2ce68d21defc, 0xe33cb4f3e7c6466b, 0x3980be458c509c59, 0xc466fd9584828e8c, 0x45f0aabe1a61ede6, 0xf6e7b8b33ad9b98d, - 0x4ef95e25f4b4983d, 0x81175195173b92d3, 0x4e50927d8dd15978, 0x1ea2099d1fafae7f, 0x425c8a06fbaaa815, 0xcd4216006c74052a}, - Enums: true, + Enums: true, }) if err != nil { t.Fatal(err) diff --git a/state/domain.go b/state/domain.go index 0aff797d0..ae221c906 100644 --- a/state/domain.go +++ b/state/domain.go @@ -363,7 +363,7 @@ type ctxItem struct { reader *recsplit.IndexReader } -func ctxItemLess(i, j *ctxItem) bool { +func ctxItemLess(i, j ctxItem) bool { if i.endTxNum == j.endTxNum { return i.startTxNum > j.startTxNum } @@ -373,17 +373,17 @@ func ctxItemLess(i, j *ctxItem) bool { // DomainContext allows accesing the same domain from multiple go-routines type DomainContext struct { d *Domain - files *btree.BTreeG[*ctxItem] + files *btree.BTreeG[ctxItem] hc *HistoryContext } func (d *Domain) MakeContext() *DomainContext { dc := &DomainContext{d: d} dc.hc = d.History.MakeContext() - bt := btree.NewG[*ctxItem](32, ctxItemLess) + bt := btree.NewG[ctxItem](32, ctxItemLess) dc.files = bt d.files.Ascend(func(item *filesItem) bool { - bt.ReplaceOrInsert(&ctxItem{ + 
bt.ReplaceOrInsert(ctxItem{ startTxNum: item.startTxNum, endTxNum: item.endTxNum, getter: item.decompressor.MakeGetter(), @@ -426,7 +426,7 @@ func (dc *DomainContext) IteratePrefix(prefix []byte, it func(k, v []byte)) erro } heap.Push(&cp, &CursorItem{t: DB_CURSOR, key: common.Copy(k), val: common.Copy(v), c: keysCursor, endTxNum: txNum, reverse: true}) } - dc.files.Ascend(func(item *ctxItem) bool { + dc.files.Ascend(func(item ctxItem) bool { if item.reader.Empty() { return true } @@ -684,10 +684,7 @@ func buildIndex(d *compress.Decompressor, idxPath, dir string, count int, values BucketSize: 2000, LeafSize: 8, TmpDir: dir, - StartSeed: []uint64{0x106393c187cae21a, 0x6453cec3f7376937, 0x643e521ddbd2be98, 0x3740c6412f6572cb, 0x717d47562f1ce470, 0x4cd6eb4c63befb7c, 0x9bfd8c5e18c8da73, - 0x082f20e10092a9a3, 0x2ada2ce68d21defc, 0xe33cb4f3e7c6466b, 0x3980be458c509c59, 0xc466fd9584828e8c, 0x45f0aabe1a61ede6, 0xf6e7b8b33ad9b98d, - 0x4ef95e25f4b4983d, 0x81175195173b92d3, 0x4e50927d8dd15978, 0x1ea2099d1fafae7f, 0x425c8a06fbaaa815, 0xcd4216006c74052a}, - IndexFile: idxPath, + IndexFile: idxPath, }); err != nil { return nil, fmt.Errorf("create recsplit: %w", err) } @@ -790,7 +787,7 @@ func (d *Domain) prune(step uint64, txFrom, txTo uint64) error { func (dc *DomainContext) readFromFiles(filekey []byte) ([]byte, bool) { var val []byte var found bool - dc.files.Descend(func(item *ctxItem) bool { + dc.files.Descend(func(item ctxItem) bool { if item.reader.Empty() { return true } @@ -820,12 +817,12 @@ func (dc *DomainContext) historyBeforeTxNum(key []byte, txNum uint64, roTx kv.Tx var foundStartTxNum uint64 var found bool var anyItem bool // Whether any filesItem has been looked at in the loop below - var topState *ctxItem - dc.files.AscendGreaterOrEqual(&search, func(i *ctxItem) bool { + var topState ctxItem + dc.files.AscendGreaterOrEqual(search, func(i ctxItem) bool { topState = i return false }) - dc.hc.indexFiles.AscendGreaterOrEqual(&search, func(item *ctxItem) bool { + dc.hc.indexFiles.AscendGreaterOrEqual(search, func(item ctxItem) bool { anyItem = true offset := item.reader.Lookup(key) g := item.getter @@ -852,7 +849,7 @@ func (dc *DomainContext) historyBeforeTxNum(key []byte, txNum uint64, roTx kv.Tx if anyItem { // If there were no changes but there were history files, the value can be obtained from value files var val []byte - dc.files.DescendLessOrEqual(topState, func(item *ctxItem) bool { + dc.files.DescendLessOrEqual(topState, func(item ctxItem) bool { if item.reader.Empty() { return true } @@ -913,11 +910,11 @@ func (dc *DomainContext) historyBeforeTxNum(key []byte, txNum uint64, roTx kv.Tx } var txKey [8]byte binary.BigEndian.PutUint64(txKey[:], foundTxNum) - var historyItem *ctxItem + var historyItem ctxItem search.startTxNum = foundStartTxNum search.endTxNum = foundEndTxNum - historyItem, ok := dc.hc.historyFiles.Get(&search) - if !ok || historyItem == nil { + historyItem, ok := dc.hc.historyFiles.Get(search) + if !ok { return nil, false, fmt.Errorf("no %s file found for [%x]", dc.d.filenameBase, key) } offset := historyItem.reader.Lookup2(txKey[:], key) diff --git a/state/history.go b/state/history.go index 3200fa8d6..f64a92949 100644 --- a/state/history.go +++ b/state/history.go @@ -417,10 +417,7 @@ func (h *History) buildFiles(step uint64, collation HistoryCollation) (HistoryFi BucketSize: 2000, LeafSize: 8, TmpDir: h.dir, - StartSeed: []uint64{0x106393c187cae21a, 0x6453cec3f7376937, 0x643e521ddbd2be98, 0x3740c6412f6572cb, 0x717d47562f1ce470, 0x4cd6eb4c63befb7c, 0x9bfd8c5e18c8da73, 
- 0x082f20e10092a9a3, 0x2ada2ce68d21defc, 0xe33cb4f3e7c6466b, 0x3980be458c509c59, 0xc466fd9584828e8c, 0x45f0aabe1a61ede6, 0xf6e7b8b33ad9b98d, - 0x4ef95e25f4b4983d, 0x81175195173b92d3, 0x4e50927d8dd15978, 0x1ea2099d1fafae7f, 0x425c8a06fbaaa815, 0xcd4216006c74052a}, - IndexFile: historyIdxPath, + IndexFile: historyIdxPath, }); err != nil { return HistoryFiles{}, fmt.Errorf("create recsplit: %w", err) } @@ -579,16 +576,16 @@ func (h *History) pruneF(txFrom, txTo uint64, f func(txNum uint64, k, v []byte) type HistoryContext struct { h *History - indexFiles, historyFiles *btree.BTreeG[*ctxItem] + indexFiles, historyFiles *btree.BTreeG[ctxItem] tx kv.Tx } func (h *History) MakeContext() *HistoryContext { var hc = HistoryContext{h: h} - hc.indexFiles = btree.NewG[*ctxItem](32, ctxItemLess) + hc.indexFiles = btree.NewG[ctxItem](32, ctxItemLess) h.InvertedIndex.files.Ascend(func(item *filesItem) bool { - hc.indexFiles.ReplaceOrInsert(&ctxItem{ + hc.indexFiles.ReplaceOrInsert(ctxItem{ startTxNum: item.startTxNum, endTxNum: item.endTxNum, getter: item.decompressor.MakeGetter(), @@ -596,9 +593,9 @@ func (h *History) MakeContext() *HistoryContext { }) return true }) - hc.historyFiles = btree.NewG[*ctxItem](32, ctxItemLess) + hc.historyFiles = btree.NewG[ctxItem](32, ctxItemLess) h.files.Ascend(func(item *filesItem) bool { - hc.historyFiles.ReplaceOrInsert(&ctxItem{ + hc.historyFiles.ReplaceOrInsert(ctxItem{ startTxNum: item.startTxNum, endTxNum: item.endTxNum, getter: item.decompressor.MakeGetter(), @@ -617,7 +614,7 @@ func (hc *HistoryContext) GetNoState(key []byte, txNum uint64) ([]byte, bool, er var foundStartTxNum uint64 var found bool //hc.indexFiles.Ascend(func(item *ctxItem) bool { - hc.indexFiles.AscendGreaterOrEqual(&ctxItem{startTxNum: txNum, endTxNum: txNum}, func(item *ctxItem) bool { + hc.indexFiles.AscendGreaterOrEqual(ctxItem{startTxNum: txNum, endTxNum: txNum}, func(item ctxItem) bool { //fmt.Printf("ef item %d-%d, key %x\n", item.startTxNum, item.endTxNum, key) if item.reader.Empty() { return true @@ -641,12 +638,12 @@ func (hc *HistoryContext) GetNoState(key []byte, txNum uint64) ([]byte, bool, er return true }) if found { - var historyItem *ctxItem + var historyItem ctxItem var ok bool var search ctxItem search.startTxNum = foundStartTxNum search.endTxNum = foundEndTxNum - if historyItem, ok = hc.historyFiles.Get(&search); !ok { + if historyItem, ok = hc.historyFiles.Get(search); !ok { return nil, false, fmt.Errorf("no %s file found for [%x]", hc.h.filenameBase, key) } var txKey [8]byte @@ -676,12 +673,12 @@ func (hc *HistoryContext) GetNoStateWithRecent(key []byte, txNum uint64, roTx kv var foundStartTxNum uint64 var found bool var anyItem bool // Whether any filesItem has been looked at in the loop below - var topState *ctxItem - hc.historyFiles.AscendGreaterOrEqual(&search, func(i *ctxItem) bool { + var topState ctxItem + hc.historyFiles.AscendGreaterOrEqual(search, func(i ctxItem) bool { topState = i return false }) - hc.indexFiles.AscendGreaterOrEqual(&search, func(item *ctxItem) bool { + hc.indexFiles.AscendGreaterOrEqual(search, func(item ctxItem) bool { anyItem = true offset := item.reader.Lookup(key) g := item.getter @@ -707,11 +704,11 @@ func (hc *HistoryContext) GetNoStateWithRecent(key []byte, txNum uint64, roTx kv if found { var txKey [8]byte binary.BigEndian.PutUint64(txKey[:], foundTxNum) - var historyItem *ctxItem + var historyItem ctxItem search.startTxNum = foundStartTxNum search.endTxNum = foundEndTxNum - historyItem, ok := hc.historyFiles.Get(&search) - if !ok || 
diff --git a/state/inverted_index.go b/state/inverted_index.go
index b7809cc29..1a7061167 100644
--- a/state/inverted_index.go
+++ b/state/inverted_index.go
@@ -203,9 +203,9 @@ func (ii *InvertedIndex) Add(key []byte) error {
 
 func (ii *InvertedIndex) MakeContext() *InvertedIndexContext {
 	var ic = InvertedIndexContext{ii: ii}
-	ic.files = btree.NewG[*ctxItem](32, ctxItemLess)
+	ic.files = btree.NewG[ctxItem](32, ctxItemLess)
 	ii.files.Ascend(func(item *filesItem) bool {
-		ic.files.ReplaceOrInsert(&ctxItem{
+		ic.files.ReplaceOrInsert(ctxItem{
 			startTxNum: item.startTxNum,
 			endTxNum:   item.endTxNum,
 			getter:     item.decompressor.MakeGetter(),
@@ -223,7 +223,7 @@ func (ii *InvertedIndex) MakeContext() *InvertedIndexContext {
 type InvertedIterator struct {
 	key                  []byte
 	startTxNum, endTxNum uint64
-	stack                []*ctxItem
+	stack                []ctxItem
 	efIt                 *eliasfano32.EliasFanoIter
 	next                 uint64
 	hasNextInFiles       bool
@@ -340,7 +340,7 @@ func (it *InvertedIterator) Next() uint64 {
 
 type InvertedIndexContext struct {
 	ii    *InvertedIndex
-	files *btree.BTreeG[*ctxItem]
+	files *btree.BTreeG[ctxItem]
 }
 
 // IterateRange is to be used in public API, therefore it relies on read-only transaction
@@ -358,7 +358,7 @@ func (ic *InvertedIndexContext) IterateRange(key []byte, startTxNum, endTxNum ui
 	it.hasNextInDb = true
 	search.startTxNum = 0
 	search.endTxNum = startTxNum
-	ic.files.DescendGreaterThan(&search, func(item *ctxItem) bool {
+	ic.files.DescendGreaterThan(search, func(item ctxItem) bool {
 		if item.startTxNum < endTxNum {
 			it.stack = append(it.stack, item)
 			it.hasNextInFiles = true
@@ -493,7 +493,7 @@ func (ic *InvertedIndexContext) IterateChangedKeys(startTxNum, endTxNum uint64,
 	ii1.hasNextInDb = true
 	ii1.roTx = roTx
 	ii1.indexTable = ic.ii.indexTable
-	ic.files.AscendGreaterOrEqual(&ctxItem{endTxNum: startTxNum}, func(item *ctxItem) bool {
+	ic.files.AscendGreaterOrEqual(ctxItem{endTxNum: startTxNum}, func(item ctxItem) bool {
 		if item.endTxNum >= endTxNum {
 			ii1.hasNextInDb = false
 		}
diff --git a/state/merge.go b/state/merge.go
index cc6346a33..5ff654a46 100644
--- a/state/merge.go
+++ b/state/merge.go
@@ -660,10 +660,7 @@ func (h *History) mergeFiles(indexFiles, historyFiles []*filesItem, r HistoryRan
 		BucketSize: 2000,
 		LeafSize:   8,
 		TmpDir:     h.dir,
-		StartSeed: []uint64{0x106393c187cae21a, 0x6453cec3f7376937, 0x643e521ddbd2be98, 0x3740c6412f6572cb, 0x717d47562f1ce470, 0x4cd6eb4c63befb7c, 0x9bfd8c5e18c8da73,
-			0x082f20e10092a9a3, 0x2ada2ce68d21defc, 0xe33cb4f3e7c6466b, 0x3980be458c509c59, 0xc466fd9584828e8c, 0x45f0aabe1a61ede6, 0xf6e7b8b33ad9b98d,
-			0x4ef95e25f4b4983d, 0x81175195173b92d3, 0x4e50927d8dd15978, 0x1ea2099d1fafae7f, 0x425c8a06fbaaa815, 0xcd4216006c74052a},
-		IndexFile: idxPath,
+		IndexFile: idxPath,
 	}); err != nil {
 		return nil, nil, fmt.Errorf("create recsplit: %w", err)
 	}
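The StartSeed literal deleted here (and from state/history.go earlier in this patch) is not lost: the last patch in this series has NewRecSplit fall back to that same 20-seed array whenever args.StartSeed is empty, so call sites only have to name the IndexFile. The defaulting pattern in isolation, with stand-in names and a shortened seed list:

	// Sketch of the defaulting pattern: an empty slice in the args struct
	// means "use the package default" (names and the short list are stand-ins).
	type Args struct {
		StartSeed []uint64
	}

	// Stand-in for the real default; the full 20-seed array appears in the
	// recsplit.go hunk of the final patch below.
	var defaultStartSeed = []uint64{0x106393c187cae21a, 0x6453cec3f7376937}

	func normalize(args Args) Args {
		if len(args.StartSeed) == 0 {
			args.StartSeed = defaultStartSeed
		}
		return args
	}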
diff --git a/state/state_recon.go b/state/state_recon.go
index 47c6314e2..c56813050 100644
--- a/state/state_recon.go
+++ b/state/state_recon.go
@@ -31,7 +31,7 @@ import (
 func (hc *HistoryContext) IsMaxTxNum(key []byte, txNum uint64) bool {
 	var found bool
 	var foundTxNum uint64
-	hc.indexFiles.AscendGreaterOrEqual(&ctxItem{startTxNum: txNum, endTxNum: txNum}, func(item *ctxItem) bool {
+	hc.indexFiles.AscendGreaterOrEqual(ctxItem{startTxNum: txNum, endTxNum: txNum}, func(item ctxItem) bool {
 		if item.endTxNum <= txNum {
 			return true
 		}
@@ -167,7 +167,7 @@ func (si *ScanIterator) Total() uint64 {
 
 func (hc *HistoryContext) iterateReconTxs(fromKey, toKey []byte, uptoTxNum uint64) *ScanIterator {
 	var si ScanIterator
-	hc.indexFiles.Ascend(func(item *ctxItem) bool {
+	hc.indexFiles.Ascend(func(item ctxItem) bool {
 		g := item.getter
 		for g.HasNext() {
 			key, offset := g.NextUncompressed()
@@ -220,12 +220,12 @@ func (hi *HistoryIterator) advance() {
 		hi.key = key
 		var txKey [8]byte
 		binary.BigEndian.PutUint64(txKey[:], n)
-		var historyItem *ctxItem
+		var historyItem ctxItem
 		var ok bool
 		var search ctxItem
 		search.startTxNum = top.startTxNum
 		search.endTxNum = top.endTxNum
-		if historyItem, ok = hi.hc.historyFiles.Get(&search); !ok {
+		if historyItem, ok = hi.hc.historyFiles.Get(search); !ok {
 			panic(fmt.Errorf("no %s file found for [%x]", hi.hc.h.filenameBase, hi.key))
 		}
 		offset := historyItem.reader.Lookup2(txKey[:], hi.key)
@@ -262,7 +262,7 @@ func (hi *HistoryIterator) Total() uint64 {
 
 func (hc *HistoryContext) iterateHistoryBeforeTxNum(fromKey, toKey []byte, txNum uint64) *HistoryIterator {
 	var hi HistoryIterator
 	heap.Init(&hi.h)
-	hc.indexFiles.Ascend(func(item *ctxItem) bool {
+	hc.indexFiles.Ascend(func(item ctxItem) bool {
 		g := item.getter
 		g.Reset(0)
 		for g.HasNext() {
From 7790688724b6054dc2c4b9bbf8eae70acd702003 Mon Sep 17 00:00:00 2001
From: Alex Sharov
Date: Mon, 26 Sep 2022 15:26:58 +0700
Subject: [PATCH 7/7] erigon3: build .efi after download #654

---
 recsplit/recsplit.go    |  2 +-
 state/aggregator22.go   | 49 +++++++++++++++++++++++++++++
 state/history.go        | 14 +++++++--
 state/inverted_index.go | 68 +++++++++++++++++++++++++++++++++--------
 4 files changed, 118 insertions(+), 15 deletions(-)

diff --git a/recsplit/recsplit.go b/recsplit/recsplit.go
index 80d5037fc..b99ad8456 100644
--- a/recsplit/recsplit.go
+++ b/recsplit/recsplit.go
@@ -127,7 +127,7 @@ type RecSplitArgs struct {
 // are likely to use different hash function, to collision attacks are unlikely to slow down any meaningful number of nodes at the same time
 func NewRecSplit(args RecSplitArgs) (*RecSplit, error) {
 	bucketCount := (args.KeyCount + args.BucketSize - 1) / args.BucketSize
-	rs := &RecSplit{bucketSize: args.BucketSize, keyExpectedCount: uint64(args.KeyCount), bucketCount: uint64(bucketCount)}
+	rs := &RecSplit{bucketSize: args.BucketSize, keyExpectedCount: uint64(args.KeyCount), bucketCount: uint64(bucketCount), lvl: log.LvlDebug}
 	if len(args.StartSeed) == 0 {
 		args.StartSeed = []uint64{0x106393c187cae21a, 0x6453cec3f7376937, 0x643e521ddbd2be98, 0x3740c6412f6572cb, 0x717d47562f1ce470, 0x4cd6eb4c63befb7c, 0x9bfd8c5e18c8da73,
 			0x082f20e10092a9a3, 0x2ada2ce68d21defc, 0xe33cb4f3e7c6466b, 0x3980be458c509c59, 0xc466fd9584828e8c, 0x45f0aabe1a61ede6, 0xf6e7b8b33ad9b98d,
diff --git a/state/aggregator22.go b/state/aggregator22.go
index eaa583f04..1dfd5324e 100644
--- a/state/aggregator22.go
+++ b/state/aggregator22.go
@@ -79,6 +79,16 @@ func (a *Aggregator22) Close() {
 	a.closeFiles()
 }
 
+func (a *Aggregator22) SetWorkers(i int) {
+	a.accounts.workers = i
+	a.storage.workers = i
+	a.code.workers = i
+	a.logAddrs.workers = i
+	a.logTopics.workers = i
+	a.tracesFrom.workers = i
+	a.tracesTo.workers = i
+}
+
 func (a *Aggregator22) Files() (res []string) {
 	res = append(res, a.accounts.Files()...)
 	res = append(res, a.storage.Files()...)
@@ -114,6 +124,45 @@ func (a *Aggregator22) closeFiles() {
 	}
 }
 
+func (a *Aggregator22) BuildMissedIndices() error {
+	if a.accounts != nil {
+		if err := a.accounts.BuildMissedIndices(); err != nil {
+			return err
+		}
+	}
+	if a.storage != nil {
+		if err := a.storage.BuildMissedIndices(); err != nil {
+			return err
+		}
+	}
+	if a.code != nil {
+		if err := a.code.BuildMissedIndices(); err != nil {
+			return err
+		}
+	}
+	if a.logAddrs != nil {
+		if err := a.logAddrs.BuildMissedIndices(); err != nil {
+			return err
+		}
+	}
+	if a.logTopics != nil {
+		if err := a.logTopics.BuildMissedIndices(); err != nil {
+			return err
+		}
+	}
+	if a.tracesFrom != nil {
+		if err := a.tracesFrom.BuildMissedIndices(); err != nil {
+			return err
+		}
+	}
+	if a.tracesTo != nil {
+		if err := a.tracesTo.BuildMissedIndices(); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
 func (a *Aggregator22) SetLogPrefix(v string) { a.logPrefix = v }
 
 func (a *Aggregator22) SetTx(tx kv.RwTx) {
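Both new Aggregator22 methods fan out over the same seven component indices. A caller that has just finished downloading snapshot files would plausibly wire them up as below; prepareAggregator and its arguments are hypothetical, only SetWorkers and BuildMissedIndices come from the patch:

	// Hypothetical post-download wiring: raise compressor parallelism first,
	// then rebuild whatever .efi files the download did not ship.
	func prepareAggregator(a *state.Aggregator22, workers int) error {
		a.SetWorkers(workers)
		if err := a.BuildMissedIndices(); err != nil {
			return fmt.Errorf("build missed indices: %w", err)
		}
		return nil
	}

The seven near-identical nil-checked blocks could be folded into a loop over a small slice of the components; the explicit version trades brevity for grep-ability.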
diff --git a/state/history.go b/state/history.go
index f64a92949..3e58305be 100644
--- a/state/history.go
+++ b/state/history.go
@@ -45,6 +45,7 @@ type History struct {
 	files *btree.BTreeG[*filesItem]
 
 	compressVals bool
+	workers      int
 }
 
 func NewHistory(
@@ -62,6 +63,7 @@ func NewHistory(
 		historyValsTable: historyValsTable,
 		settingsTable:    settingsTable,
 		compressVals:     compressVals,
+		workers:          1,
 	}
 	var err error
 	h.InvertedIndex, err = NewInvertedIndex(dir, aggregationStep, filenameBase, indexKeysTable, indexTable)
@@ -178,6 +180,14 @@ func (h *History) Files() (res []string) {
 	return res
 }
 
+func (h *History) BuildMissedIndices() (err error) {
+	if err := h.InvertedIndex.BuildMissedIndices(); err != nil {
+		return err
+	}
+	//TODO: build .vi
+	return nil
+}
+
 func (h *History) AddPrevValue(key1, key2, original []byte) error {
 	lk := len(key1) + len(key2)
 	historyKey := make([]byte, lk+8)
@@ -234,7 +244,7 @@ func (h *History) collate(step, txFrom, txTo uint64, roTx kv.Tx) (HistoryCollati
 		}
 	}()
 	historyPath := filepath.Join(h.dir, fmt.Sprintf("%s.%d-%d.v", h.filenameBase, step, step+1))
-	if historyComp, err = compress.NewCompressor(context.Background(), "collate history", historyPath, h.dir, compress.MinPatternScore, 1, log.LvlDebug); err != nil {
+	if historyComp, err = compress.NewCompressor(context.Background(), "collate history", historyPath, h.dir, compress.MinPatternScore, h.workers, log.LvlDebug); err != nil {
 		return HistoryCollation{}, fmt.Errorf("create %s history compressor: %w", h.filenameBase, err)
 	}
 	keysCursor, err := roTx.CursorDupSort(h.indexKeysTable)
@@ -372,7 +382,7 @@ func (h *History) buildFiles(step uint64, collation HistoryCollation) (HistoryFi
 	}
 	// Build history ef
 	efHistoryPath := filepath.Join(h.dir, fmt.Sprintf("%s.%d-%d.ef", h.filenameBase, step, step+1))
-	efHistoryComp, err = compress.NewCompressor(context.Background(), "ef history", efHistoryPath, h.dir, compress.MinPatternScore, 1, log.LvlDebug)
+	efHistoryComp, err = compress.NewCompressor(context.Background(), "ef history", efHistoryPath, h.dir, compress.MinPatternScore, h.workers, log.LvlDebug)
 	if err != nil {
 		return HistoryFiles{}, fmt.Errorf("create %s ef history compressor: %w", h.filenameBase, err)
 	}
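With workers defaulting to 1 in both constructors, behaviour is unchanged until a caller raises it via Aggregator22.SetWorkers above; every compressor this History then creates inherits the setting. For reference, a sketch of the call shape used throughout these hunks, with a placeholder label and placeholder paths (imports as in the surrounding files):

	// Argument order matches the calls above: ctx, log prefix, output file,
	// tmp dir, min pattern score, worker count, log level.
	func newParallelCompressor(workers int) (*compress.Compressor, error) {
		return compress.NewCompressor(context.Background(), "example",
			"/tmp/example.ef", "/tmp", compress.MinPatternScore, workers, log.LvlDebug)
	}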
diff --git a/state/inverted_index.go b/state/inverted_index.go
index 1a7061167..b52b63b83 100644
--- a/state/inverted_index.go
+++ b/state/inverted_index.go
@@ -27,6 +27,7 @@ import (
 	"path/filepath"
 	"regexp"
 	"strconv"
+	"strings"
 
 	"github.com/RoaringBitmap/roaring/roaring64"
 	"github.com/google/btree"
@@ -50,6 +51,8 @@ type InvertedIndex struct {
 	txNum      uint64
 	txNumBytes [8]byte
 	files      *btree.BTreeG[*filesItem]
+
+	workers int
 }
 
 func NewInvertedIndex(
@@ -66,6 +69,7 @@ func NewInvertedIndex(
 		filenameBase:   filenameBase,
 		indexKeysTable: indexKeysTable,
 		indexTable:     indexTable,
+		workers:        1,
 	}
 
 	files, err := os.ReadDir(dir)
@@ -123,24 +127,64 @@ func (ii *InvertedIndex) scanStateFiles(files []fs.DirEntry) {
 	}
 }
 
+func (ii *InvertedIndex) BuildMissedIndices() (err error) {
+	var missedIndices []uint64
+	ii.files.Ascend(func(item *filesItem) bool { // don't run slow logic while iterating on btree
+		fromStep, toStep := item.startTxNum/ii.aggregationStep, item.endTxNum/ii.aggregationStep
+		idxPath := filepath.Join(ii.dir, fmt.Sprintf("%s.%d-%d.efi", ii.filenameBase, fromStep, toStep))
+		if !dir.Exist(idxPath) {
+			missedIndices = append(missedIndices, fromStep, toStep)
+		}
+		return true
+	})
+	if len(missedIndices) == 0 {
+		return nil
+	}
+	var logItems []string
+	for i := 0; i < len(missedIndices); i += 2 {
+		fromStep, toStep := missedIndices[i], missedIndices[i+1]
+		logItems = append(logItems, fmt.Sprintf("%s.%d-%d.efi", ii.filenameBase, fromStep, toStep))
+	}
+	log.Info("[snapshots] BuildMissedIndices", "files", strings.Join(logItems, ","))
+
+	for i := 0; i < len(missedIndices); i += 2 {
+		fromStep, toStep := missedIndices[i], missedIndices[i+1]
+		idxPath := filepath.Join(ii.dir, fmt.Sprintf("%s.%d-%d.efi", ii.filenameBase, fromStep, toStep))
+		if dir.Exist(idxPath) {
+			return nil
+		}
+		item, ok := ii.files.Get(&filesItem{startTxNum: fromStep * ii.aggregationStep, endTxNum: toStep * ii.aggregationStep})
+		if !ok {
+			return nil
+		}
+		if _, err := buildIndex(item.decompressor, idxPath, ii.dir, item.decompressor.Count()/2, false /* values */); err != nil {
+			return err
+		}
+	}
+	return ii.openFiles()
+}
+
 func (ii *InvertedIndex) openFiles() error {
 	var err error
 	var totalKeys uint64
 	ii.files.Ascend(func(item *filesItem) bool {
-		datPath := filepath.Join(ii.dir, fmt.Sprintf("%s.%d-%d.ef", ii.filenameBase, item.startTxNum/ii.aggregationStep, item.endTxNum/ii.aggregationStep))
-		if item.decompressor, err = compress.NewDecompressor(datPath); err != nil {
-			log.Debug("InvertedIndex.openFiles: %w, %s", err, datPath)
-			return false
-		}
-		idxPath := filepath.Join(ii.dir, fmt.Sprintf("%s.%d-%d.efi", ii.filenameBase, item.startTxNum/ii.aggregationStep, item.endTxNum/ii.aggregationStep))
-		if !dir.Exist(idxPath) {
-			if _, err = buildIndex(item.decompressor, idxPath, ii.dir, item.decompressor.Count()/2, false /* values */); err != nil {
+		fromStep, toStep := item.startTxNum/ii.aggregationStep, item.endTxNum/ii.aggregationStep
+		if item.decompressor == nil {
+			datPath := filepath.Join(ii.dir, fmt.Sprintf("%s.%d-%d.ef", ii.filenameBase, fromStep, toStep))
+			if item.decompressor, err = compress.NewDecompressor(datPath); err != nil {
+				log.Debug("InvertedIndex.openFiles: %w, %s", err, datPath)
 				return false
 			}
 		}
-		if item.index, err = recsplit.OpenIndex(idxPath); err != nil {
-			log.Debug("InvertedIndex.openFiles: %w, %s", err, datPath)
-			return false
+		if item.index == nil {
+			idxPath := filepath.Join(ii.dir, fmt.Sprintf("%s.%d-%d.efi", ii.filenameBase, fromStep, toStep))
+			if !dir.Exist(idxPath) {
+				return false
+			}
+			if item.index, err = recsplit.OpenIndex(idxPath); err != nil {
+				log.Debug("InvertedIndex.openFiles: %w, %s", err, idxPath)
+				return false
+			}
 		}
 		totalKeys += item.index.KeyCount()
 		return true
@@ -585,7 +629,7 @@ func (ii *InvertedIndex) buildFiles(step uint64, bitmaps map[string]*roaring64.B
 	txNumFrom := step * ii.aggregationStep
 	txNumTo := (step + 1) * ii.aggregationStep
 	datPath := filepath.Join(ii.dir, fmt.Sprintf("%s.%d-%d.ef", ii.filenameBase, txNumFrom/ii.aggregationStep, txNumTo/ii.aggregationStep))
-	comp, err = compress.NewCompressor(context.Background(), "ef", datPath, ii.dir, compress.MinPatternScore, 1, log.LvlDebug)
+	comp, err = compress.NewCompressor(context.Background(), "ef", datPath, ii.dir, compress.MinPatternScore, ii.workers, log.LvlDebug)
 	if err != nil {
 		return InvertedFiles{}, fmt.Errorf("create %s compressor: %w", ii.filenameBase, err)
 	}
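BuildMissedIndices above is deliberately two-phase: the Ascend pass only records which step ranges lack an .efi file (the inline comment says why: no slow work while the btree is being iterated), and index building happens afterwards in a plain loop. The shape, reduced to a skeleton with placeholder helpers needsIndex and buildOne standing in for the dir.Exist check and the buildIndex call:

	// Skeleton of the two-phase pattern used by BuildMissedIndices.
	func buildMissed(tree *btree.BTreeG[*filesItem]) error {
		var missed []uint64 // flattened (fromTxNum, toTxNum) pairs
		tree.Ascend(func(item *filesItem) bool { // phase 1: cheap scan only
			if needsIndex(item) {
				missed = append(missed, item.startTxNum, item.endTxNum)
			}
			return true
		})
		for i := 0; i < len(missed); i += 2 { // phase 2: slow work outside the iteration
			if err := buildOne(missed[i], missed[i+1]); err != nil {
				return err
			}
		}
		return nil
	}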