This repository has been archived by the owner on Sep 23, 2023. It is now read-only.

Commit 7790688

erigon3: build .efi after download #654

AskAlexSharov authored Sep 26, 2022
1 parent 417cea6, commit 7790688
Showing 4 changed files with 118 additions and 15 deletions.
2 changes: 1 addition & 1 deletion recsplit/recsplit.go
@@ -127,7 +127,7 @@ type RecSplitArgs struct {
// are likely to use different hash functions, so collision attacks are unlikely to slow down any meaningful number of nodes at the same time
func NewRecSplit(args RecSplitArgs) (*RecSplit, error) {
bucketCount := (args.KeyCount + args.BucketSize - 1) / args.BucketSize
rs := &RecSplit{bucketSize: args.BucketSize, keyExpectedCount: uint64(args.KeyCount), bucketCount: uint64(bucketCount)}
rs := &RecSplit{bucketSize: args.BucketSize, keyExpectedCount: uint64(args.KeyCount), bucketCount: uint64(bucketCount), lvl: log.LvlDebug}
if len(args.StartSeed) == 0 {
args.StartSeed = []uint64{0x106393c187cae21a, 0x6453cec3f7376937, 0x643e521ddbd2be98, 0x3740c6412f6572cb, 0x717d47562f1ce470, 0x4cd6eb4c63befb7c, 0x9bfd8c5e18c8da73,
0x082f20e10092a9a3, 0x2ada2ce68d21defc, 0xe33cb4f3e7c6466b, 0x3980be458c509c59, 0xc466fd9584828e8c, 0x45f0aabe1a61ede6, 0xf6e7b8b33ad9b98d,
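
For readers skimming the hunk above: the bucketCount expression is a ceiling division of KeyCount by BucketSize. A minimal standalone illustration (the values are made up):

package main

import "fmt"

func main() {
	// Same ceiling-division shape as in NewRecSplit above:
	// bucketCount = ceil(keyCount / bucketSize).
	keyCount, bucketSize := 10, 4
	bucketCount := (keyCount + bucketSize - 1) / bucketSize
	fmt.Println(bucketCount) // 3
}
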
49 changes: 49 additions & 0 deletions state/aggregator22.go
@@ -79,6 +79,16 @@ func (a *Aggregator22) Close() {
a.closeFiles()
}

func (a *Aggregator22) SetWorkers(i int) {
a.accounts.workers = i
a.storage.workers = i
a.code.workers = i
a.logAddrs.workers = i
a.logTopics.workers = i
a.tracesFrom.workers = i
a.tracesTo.workers = i
}

func (a *Aggregator22) Files() (res []string) {
res = append(res, a.accounts.Files()...)
res = append(res, a.storage.Files()...)
@@ -114,6 +124,45 @@ func (a *Aggregator22) closeFiles() {
}
}

func (a *Aggregator22) BuildMissedIndices() error {
if a.accounts != nil {
if err := a.accounts.BuildMissedIndices(); err != nil {
return err
}
}
if a.storage != nil {
if err := a.storage.BuildMissedIndices(); err != nil {
return err
}
}
if a.code != nil {
if err := a.code.BuildMissedIndices(); err != nil {
return err
}
}
if a.logAddrs != nil {
if err := a.logAddrs.BuildMissedIndices(); err != nil {
return err
}
}
if a.logTopics != nil {
if err := a.logTopics.BuildMissedIndices(); err != nil {
return err
}
}
if a.tracesFrom != nil {
if err := a.tracesFrom.BuildMissedIndices(); err != nil {
return err
}
}
if a.tracesTo != nil {
if err := a.tracesTo.BuildMissedIndices(); err != nil {
return err
}
}
return nil
}

func (a *Aggregator22) SetLogPrefix(v string) { a.logPrefix = v }

func (a *Aggregator22) SetTx(tx kv.RwTx) {
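
A hypothetical caller sketch, not part of this commit, showing how the two new Aggregator22 methods fit the commit's intent of building .efi files after a snapshot download. The package name, module path, and the choice of runtime.NumCPU() are assumptions; only SetWorkers and BuildMissedIndices come from the diff above.

package aggusage

import (
	"runtime"

	"github.com/ledgerwatch/erigon-lib/state" // module path assumed
)

// prepareAfterDownload: once .ef/.v snapshot files have been downloaded,
// raise the worker count and rebuild any missing .efi indices before use.
func prepareAfterDownload(agg *state.Aggregator22) error {
	agg.SetWorkers(runtime.NumCPU()) // fans the worker count out to every History/InvertedIndex
	return agg.BuildMissedIndices()  // builds .efi files missing next to already-downloaded .ef files
}
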
14 changes: 12 additions & 2 deletions state/history.go
@@ -45,6 +45,7 @@ type History struct {

files *btree.BTreeG[*filesItem]
compressVals bool
workers int
}

func NewHistory(
@@ -62,6 +63,7 @@ func NewHistory(
historyValsTable: historyValsTable,
settingsTable: settingsTable,
compressVals: compressVals,
workers: 1,
}
var err error
h.InvertedIndex, err = NewInvertedIndex(dir, aggregationStep, filenameBase, indexKeysTable, indexTable)
@@ -178,6 +180,14 @@ func (h *History) Files() (res []string) {
return res
}

func (h *History) BuildMissedIndices() (err error) {
if err := h.InvertedIndex.BuildMissedIndices(); err != nil {
return err
}
//TODO: build .vi
return nil
}

func (h *History) AddPrevValue(key1, key2, original []byte) error {
lk := len(key1) + len(key2)
historyKey := make([]byte, lk+8)
@@ -234,7 +244,7 @@ func (h *History) collate(step, txFrom, txTo uint64, roTx kv.Tx) (HistoryCollati
}
}()
historyPath := filepath.Join(h.dir, fmt.Sprintf("%s.%d-%d.v", h.filenameBase, step, step+1))
if historyComp, err = compress.NewCompressor(context.Background(), "collate history", historyPath, h.dir, compress.MinPatternScore, 1, log.LvlDebug); err != nil {
if historyComp, err = compress.NewCompressor(context.Background(), "collate history", historyPath, h.dir, compress.MinPatternScore, h.workers, log.LvlDebug); err != nil {
return HistoryCollation{}, fmt.Errorf("create %s history compressor: %w", h.filenameBase, err)
}
keysCursor, err := roTx.CursorDupSort(h.indexKeysTable)
@@ -372,7 +382,7 @@ func (h *History) buildFiles(step uint64, collation HistoryCollation) (HistoryFi
}
// Build history ef
efHistoryPath := filepath.Join(h.dir, fmt.Sprintf("%s.%d-%d.ef", h.filenameBase, step, step+1))
efHistoryComp, err = compress.NewCompressor(context.Background(), "ef history", efHistoryPath, h.dir, compress.MinPatternScore, 1, log.LvlDebug)
efHistoryComp, err = compress.NewCompressor(context.Background(), "ef history", efHistoryPath, h.dir, compress.MinPatternScore, h.workers, log.LvlDebug)
if err != nil {
return HistoryFiles{}, fmt.Errorf("create %s ef history compressor: %w", h.filenameBase, err)
}
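
A minimal standalone sketch of the step-based naming scheme the hunks above rely on: a "step" covers aggregationStep transactions, and each step produces .v, .ef, and .efi files. The aggregationStep value below is an assumption; the real one comes from configuration.

package main

import "fmt"

func main() {
	const aggregationStep uint64 = 3_125_000 // assumed default, configurable in practice
	filenameBase, step := "accounts", uint64(7)
	fmt.Printf("step %d covers txNums [%d, %d)\n", step, step*aggregationStep, (step+1)*aggregationStep)
	fmt.Printf("%s.%d-%d.v\n", filenameBase, step, step+1)   // history values file
	fmt.Printf("%s.%d-%d.ef\n", filenameBase, step, step+1)  // inverted-index (Elias-Fano) file
	fmt.Printf("%s.%d-%d.efi\n", filenameBase, step, step+1) // recsplit index built over the .ef file
}
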
68 changes: 56 additions & 12 deletions state/inverted_index.go
@@ -27,6 +27,7 @@ import (
"path/filepath"
"regexp"
"strconv"
"strings"

"github.com/RoaringBitmap/roaring/roaring64"
"github.com/google/btree"
@@ -50,6 +51,8 @@ type InvertedIndex struct {
txNum uint64
txNumBytes [8]byte
files *btree.BTreeG[*filesItem]

workers int
}

func NewInvertedIndex(
@@ -66,6 +69,7 @@
filenameBase: filenameBase,
indexKeysTable: indexKeysTable,
indexTable: indexTable,
workers: 1,
}

files, err := os.ReadDir(dir)
@@ -123,24 +127,64 @@ func (ii *InvertedIndex) scanStateFiles(files []fs.DirEntry) {
}
}

func (ii *InvertedIndex) BuildMissedIndices() (err error) {
var missedIndices []uint64
ii.files.Ascend(func(item *filesItem) bool { // don't run slow logic while iterating on btree
fromStep, toStep := item.startTxNum/ii.aggregationStep, item.endTxNum/ii.aggregationStep
idxPath := filepath.Join(ii.dir, fmt.Sprintf("%s.%d-%d.efi", ii.filenameBase, fromStep, toStep))
if !dir.Exist(idxPath) {
missedIndices = append(missedIndices, fromStep, toStep)
}
return true
})
if len(missedIndices) == 0 {
return nil
}
var logItems []string
for i := 0; i < len(missedIndices); i += 2 {
fromStep, toStep := missedIndices[i], missedIndices[i+1]
logItems = append(logItems, fmt.Sprintf("%s.%d-%d.efi", ii.filenameBase, fromStep, toStep))
}
log.Info("[snapshots] BuildMissedIndices", "files", strings.Join(logItems, ","))

for i := 0; i < len(missedIndices); i += 2 {
fromStep, toStep := missedIndices[i], missedIndices[i+1]
idxPath := filepath.Join(ii.dir, fmt.Sprintf("%s.%d-%d.efi", ii.filenameBase, fromStep, toStep))
if dir.Exist(idxPath) {
return nil
}
item, ok := ii.files.Get(&filesItem{startTxNum: fromStep * ii.aggregationStep, endTxNum: toStep * ii.aggregationStep})
if !ok {
return nil
}
if _, err := buildIndex(item.decompressor, idxPath, ii.dir, item.decompressor.Count()/2, false /* values */); err != nil {
return err
}
}
return ii.openFiles()
}

func (ii *InvertedIndex) openFiles() error {
var err error
var totalKeys uint64
ii.files.Ascend(func(item *filesItem) bool {
datPath := filepath.Join(ii.dir, fmt.Sprintf("%s.%d-%d.ef", ii.filenameBase, item.startTxNum/ii.aggregationStep, item.endTxNum/ii.aggregationStep))
if item.decompressor, err = compress.NewDecompressor(datPath); err != nil {
log.Debug("InvertedIndex.openFiles: %w, %s", err, datPath)
return false
}
idxPath := filepath.Join(ii.dir, fmt.Sprintf("%s.%d-%d.efi", ii.filenameBase, item.startTxNum/ii.aggregationStep, item.endTxNum/ii.aggregationStep))
if !dir.Exist(idxPath) {
if _, err = buildIndex(item.decompressor, idxPath, ii.dir, item.decompressor.Count()/2, false /* values */); err != nil {
fromStep, toStep := item.startTxNum/ii.aggregationStep, item.endTxNum/ii.aggregationStep
if item.decompressor == nil {
datPath := filepath.Join(ii.dir, fmt.Sprintf("%s.%d-%d.ef", ii.filenameBase, fromStep, toStep))
if item.decompressor, err = compress.NewDecompressor(datPath); err != nil {
log.Debug("InvertedIndex.openFiles: %w, %s", err, datPath)
return false
}
}
if item.index, err = recsplit.OpenIndex(idxPath); err != nil {
log.Debug("InvertedIndex.openFiles: %w, %s", err, datPath)
return false
if item.index == nil {
idxPath := filepath.Join(ii.dir, fmt.Sprintf("%s.%d-%d.efi", ii.filenameBase, fromStep, toStep))
if !dir.Exist(idxPath) {
return false
}
if item.index, err = recsplit.OpenIndex(idxPath); err != nil {
log.Debug("InvertedIndex.openFiles: %w, %s", err, idxPath)
return false
}
}
totalKeys += item.index.KeyCount()
return true
@@ -585,7 +629,7 @@ func (ii *InvertedIndex) buildFiles(step uint64, bitmaps map[string]*roaring64.B
txNumFrom := step * ii.aggregationStep
txNumTo := (step + 1) * ii.aggregationStep
datPath := filepath.Join(ii.dir, fmt.Sprintf("%s.%d-%d.ef", ii.filenameBase, txNumFrom/ii.aggregationStep, txNumTo/ii.aggregationStep))
comp, err = compress.NewCompressor(context.Background(), "ef", datPath, ii.dir, compress.MinPatternScore, 1, log.LvlDebug)
comp, err = compress.NewCompressor(context.Background(), "ef", datPath, ii.dir, compress.MinPatternScore, ii.workers, log.LvlDebug)
if err != nil {
return InvertedFiles{}, fmt.Errorf("create %s compressor: %w", ii.filenameBase, err)
}
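
To sanity-check a rebuilt index, one could open an .efi file directly with the same recsplit API the hunks above use. This is an illustrative snippet, not part of the commit; the file path and module path are assumptions, and idx.Close() is assumed to exist alongside the OpenIndex/KeyCount calls shown in the diff.

package main

import (
	"fmt"
	"log"

	"github.com/ledgerwatch/erigon-lib/recsplit" // module path assumed
)

func main() {
	idx, err := recsplit.OpenIndex("snapshots/history/accounts.7-8.efi") // illustrative path
	if err != nil {
		log.Fatal(err)
	}
	defer idx.Close()
	fmt.Println("keys in index:", idx.KeyCount())
}
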
