diff --git a/erigon-lib/state/aggregator.go b/erigon-lib/state/aggregator.go index 908c5a9e340..13efdf24797 100644 --- a/erigon-lib/state/aggregator.go +++ b/erigon-lib/state/aggregator.go @@ -56,7 +56,7 @@ import ( type Aggregator struct { db kv.RoDB d [kv.DomainLen]*Domain - iis map[kv.InvertedIdx]*InvertedIndex + iis []*InvertedIndex dirs datadir.Dirs tmpdir string aggregationStep uint64 @@ -141,7 +141,6 @@ func NewAggregator(ctx context.Context, dirs datadir.Dirs, aggregationStep uint6 logger: logger, collateAndBuildWorkers: 1, mergeWorkers: 1, - iis: make(map[kv.InvertedIdx]*InvertedIndex), commitmentValuesTransform: AggregatorSqueezeCommitmentValues, @@ -215,17 +214,18 @@ func (a *Aggregator) registerII(idx kv.InvertedIdx, salt *uint32, dirs datadir.D keysTable: indexKeysTable, valuesTable: indexTable, compression: seg.CompressNone, + name: idx, } - if _, ok := a.iis[idx]; ok { + if ii := a.searchII(idx); ii != nil { return fmt.Errorf("inverted index %s already registered", idx) } - var err error - a.iis[idx], err = NewInvertedIndex(idxCfg, logger) + ii, err := NewInvertedIndex(idxCfg, logger) if err != nil { return err } + a.iis = append(a.iis, ii) return nil } @@ -493,13 +493,7 @@ func (c AggV3Collation) Close() { type AggV3StaticFiles struct { d [kv.DomainLen]StaticFiles - ivfs map[kv.InvertedIdx]InvertedFiles -} - -func NewAggV3StaticFiles() *AggV3StaticFiles { - return &AggV3StaticFiles{ - ivfs: make(map[kv.InvertedIdx]InvertedFiles), - } + ivfs []InvertedFiles } // CleanupOnError - call it on collation fail. It's closing all files @@ -521,7 +515,7 @@ func (a *Aggregator) buildFiles(ctx context.Context, step uint64) error { txTo = a.FirstTxNumOfStep(step + 1) stepStartedAt = time.Now() - static = NewAggV3StaticFiles() + static = &AggV3StaticFiles{ivfs: make([]InvertedFiles, len(a.iis))} closeCollations = true collListMu = sync.Mutex{} collations = make([]Collation, 0) @@ -762,9 +756,21 @@ func (a *Aggregator) DomainTables(domains ...kv.Domain) (tables []string) { } func (a *Aggregator) InvertedIndexTables(indices ...kv.InvertedIdx) (tables []string) { for _, idx := range indices { - tables = append(tables, a.iis[idx].Tables()...) + if ii := a.searchII(idx); ii != nil { + tables = append(tables, ii.Tables()...) + } } - return tables + + return +} + +func (a *Aggregator) searchII(name kv.InvertedIdx) *InvertedIndex { + for _, ii := range a.iis { + if ii.name == name { + return ii + } + } + return nil } type flusher interface { @@ -1071,7 +1077,7 @@ func (ac *AggregatorRoTx) Prune(ctx context.Context, tx kv.RwTx, limit uint64, l } } - stats := make(map[kv.InvertedIdx]*InvertedIndexPruneStat, len(ac.a.iis)) + stats := make([]*InvertedIndexPruneStat, len(ac.a.iis)) for iikey := range ac.a.iis { stat, err := ac.iis[iikey].Prune(ctx, tx, txFrom, txTo, limit, logEvery, false, nil) if err != nil { @@ -1079,7 +1085,7 @@ func (ac *AggregatorRoTx) Prune(ctx context.Context, tx kv.RwTx, limit uint64, l } stats[iikey] = stat } - for iikey, _ := range ac.a.iis { + for iikey := range ac.a.iis { aggStat.Indices[ac.iis[iikey].ii.filenameBase] = stats[iikey] } @@ -1230,18 +1236,12 @@ func (a *Aggregator) recalcVisibleFilesMinimaxTxNum() { type RangesV3 struct { domain [kv.DomainLen]DomainRanges - invertedIndex map[kv.InvertedIdx]*MergeRange -} - -func NewRangesV3() *RangesV3 { - return &RangesV3{ - invertedIndex: make(map[kv.InvertedIdx]*MergeRange), - } + invertedIndex []*MergeRange } func (r RangesV3) String() string { ss := []string{} - for _, d := range r.domain { + for _, d := range &r.domain { if d.any() { ss = append(ss, fmt.Sprintf("%s(%s)", d.name, d.String())) } @@ -1250,14 +1250,14 @@ func (r RangesV3) String() string { aggStep := r.domain[kv.AccountsDomain].aggStep for p, mr := range r.invertedIndex { if mr != nil && mr.needMerge { - ss = append(ss, mr.String(string(p), aggStep)) + ss = append(ss, mr.String(fmt.Sprintf("idx%d", p), aggStep)) } } return strings.Join(ss, ", ") } func (r RangesV3) any() bool { - for _, d := range r.domain { + for _, d := range &r.domain { if d.any() { return true } @@ -1271,7 +1271,7 @@ func (r RangesV3) any() bool { } func (ac *AggregatorRoTx) findMergeRange(maxEndTxNum, maxSpan uint64) *RangesV3 { - r := NewRangesV3() + r := &RangesV3{invertedIndex: make([]*MergeRange, len(ac.a.iis))} if ac.a.commitmentValuesTransform { lmrAcc := ac.d[kv.AccountsDomain].files.LatestMergedRange() lmrSto := ac.d[kv.StorageDomain].files.LatestMergedRange() @@ -1292,7 +1292,7 @@ func (ac *AggregatorRoTx) findMergeRange(maxEndTxNum, maxSpan uint64) *RangesV3 cr := r.domain[kv.CommitmentDomain] restorePrevRange := false - for k, dr := range r.domain { + for k, dr := range &r.domain { kd := kv.Domain(k) if kd == kv.CommitmentDomain || cr.values.Equal(&dr.values) { continue @@ -1311,7 +1311,7 @@ func (ac *AggregatorRoTx) findMergeRange(maxEndTxNum, maxSpan uint64) *RangesV3 } } if restorePrevRange { - for k, dr := range r.domain { + for k, dr := range &r.domain { r.domain[k].values = MergeRange{} ac.a.logger.Debug("findMergeRange: commitment range is different than accounts or storage, cancel kv merge", ac.d[k].d.filenameBase, dr.values.String("", ac.a.StepSize())) @@ -1333,7 +1333,7 @@ func (ac *AggregatorRoTx) RestrictSubsetFileDeletions(b bool) { } func (ac *AggregatorRoTx) mergeFiles(ctx context.Context, files *SelectedStaticFilesV3, r *RangesV3) (*MergedFilesV3, error) { - mf := NewMergedFilesV3() + mf := &MergedFilesV3{iis: make([]*filesItem, len(ac.a.iis))} g, ctx := errgroup.WithContext(ctx) g.SetLimit(ac.a.mergeWorkers) closeFiles := true @@ -1570,10 +1570,9 @@ func (ac *AggregatorRoTx) IndexRange(name kv.InvertedIdx, k []byte, fromTs, toTs return ac.d[kv.ReceiptDomain].ht.IdxRange(k, fromTs, toTs, asc, limit, tx) default: // check the ii - if v, ok := ac.iis[name]; ok { - return v.IdxRange(k, fromTs, toTs, asc, limit, tx) + if ii := ac.searchII(name); ii != nil { + return ii.IdxRange(k, fromTs, toTs, asc, limit, tx) } - return nil, fmt.Errorf("unexpected history name: %s", name) } } @@ -1622,7 +1621,7 @@ func (ac *AggregatorRoTx) nastyFileRead(name kv.Domain, from, to uint64) (*seg.R type AggregatorRoTx struct { a *Aggregator d [kv.DomainLen]*DomainRoTx - iis map[kv.InvertedIdx]*InvertedIndexRoTx + iis []*InvertedIndexRoTx id uint64 // auto-increment id of ctx for logs _leakID uint64 // set only if TRACE_AGG=true @@ -1633,7 +1632,7 @@ func (a *Aggregator) BeginFilesRo() *AggregatorRoTx { a: a, id: a.aggRoTxAutoIncrement.Add(1), _leakID: a.leakDetector.Add(), - iis: make(map[kv.InvertedIdx]*InvertedIndexRoTx, len(a.iis)), + iis: make([]*InvertedIndexRoTx, len(a.iis)), } a.visibleFilesLock.RLock() diff --git a/erigon-lib/state/aggregator_files.go b/erigon-lib/state/aggregator_files.go index d397a3ef3bb..b129d87cac1 100644 --- a/erigon-lib/state/aggregator_files.go +++ b/erigon-lib/state/aggregator_files.go @@ -24,11 +24,7 @@ type SelectedStaticFilesV3 struct { d [kv.DomainLen][]*filesItem dHist [kv.DomainLen][]*filesItem dIdx [kv.DomainLen][]*filesItem - ii map[kv.InvertedIdx][]*filesItem -} - -func NewSelectedStaticFilesV3() *SelectedStaticFilesV3 { - return &SelectedStaticFilesV3{ii: make(map[kv.InvertedIdx][]*filesItem)} + ii [][]*filesItem } func (sf SelectedStaticFilesV3) Close() { @@ -37,9 +33,7 @@ func (sf SelectedStaticFilesV3) Close() { clist = append(clist, sf.d[id], sf.dIdx[id], sf.dHist[id]) } - for _, i := range sf.ii { - clist = append(clist, i) - } + clist = append(clist, sf.ii...) for _, group := range clist { for _, item := range group { if item != nil { @@ -55,7 +49,7 @@ func (sf SelectedStaticFilesV3) Close() { } func (ac *AggregatorRoTx) staticFilesInRange(r *RangesV3) (*SelectedStaticFilesV3, error) { - sf := NewSelectedStaticFilesV3() + sf := &SelectedStaticFilesV3{ii: make([][]*filesItem, len(r.invertedIndex))} for id := range ac.d { if !r.domain[id].any() { continue @@ -75,11 +69,7 @@ type MergedFilesV3 struct { d [kv.DomainLen]*filesItem dHist [kv.DomainLen]*filesItem dIdx [kv.DomainLen]*filesItem - iis map[kv.InvertedIdx]*filesItem -} - -func NewMergedFilesV3() *MergedFilesV3 { - return &MergedFilesV3{iis: make(map[kv.InvertedIdx]*filesItem)} + iis []*filesItem } func (mf MergedFilesV3) FrozenList() (frozen []string) { @@ -109,10 +99,7 @@ func (mf MergedFilesV3) Close() { for id := range mf.d { clist = append(clist, mf.d[id], mf.dHist[id], mf.dIdx[id]) } - for _, ii := range mf.iis { - clist = append(clist, ii) - } - + clist = append(clist, mf.iis...) for _, item := range clist { if item != nil { if item.decompressor != nil { diff --git a/erigon-lib/state/domain_shared.go b/erigon-lib/state/domain_shared.go index 557bb60e607..ff3195eca21 100644 --- a/erigon-lib/state/domain_shared.go +++ b/erigon-lib/state/domain_shared.go @@ -98,7 +98,7 @@ type SharedDomains struct { storage *btree2.Map[string, dataWithPrevStep] domainWriters [kv.DomainLen]*domainBufferedWriter - iiWriters map[kv.InvertedIdx]*invertedIndexBufferedWriter + iiWriters []*invertedIndexBufferedWriter currentChangesAccumulator *StateChangeSet pastChangesAccumulator map[string]*StateChangeSet @@ -114,12 +114,12 @@ type HasAgg interface { func NewSharedDomains(tx kv.Tx, logger log.Logger) (*SharedDomains, error) { sd := &SharedDomains{ - logger: logger, - storage: btree2.NewMap[string, dataWithPrevStep](128), - iiWriters: map[kv.InvertedIdx]*invertedIndexBufferedWriter{}, + logger: logger, + storage: btree2.NewMap[string, dataWithPrevStep](128), //trace: true, } sd.SetTx(tx) + sd.iiWriters = make([]*invertedIndexBufferedWriter, len(sd.aggTx.iis)) sd.aggTx.a.DiscardHistory(kv.CommitmentDomain) @@ -662,8 +662,10 @@ func (sd *SharedDomains) delAccountStorage(addr, loc []byte, preVal []byte, prev } func (sd *SharedDomains) IndexAdd(table kv.InvertedIdx, key []byte) (err error) { - if writer, ok := sd.iiWriters[table]; ok { - return writer.Add(key) + for _, writer := range sd.iiWriters { + if writer.name == table { + return writer.Add(key) + } } panic(fmt.Errorf("unknown index %s", table)) } diff --git a/erigon-lib/state/domain_test.go b/erigon-lib/state/domain_test.go index cf125bd5728..aa3c4e4d1c3 100644 --- a/erigon-lib/state/domain_test.go +++ b/erigon-lib/state/domain_test.go @@ -1031,6 +1031,7 @@ func emptyTestDomain(aggStep uint64) *Domain { cfg.hist.iiCfg.salt = &salt cfg.hist.iiCfg.dirs = datadir2.New(os.TempDir()) cfg.hist.iiCfg.aggregationStep = aggStep + cfg.hist.iiCfg.name = kv.InvertedIdx("dummy") d, err := NewDomain(cfg, log.New()) if err != nil { diff --git a/erigon-lib/state/integrity.go b/erigon-lib/state/integrity.go index b83b43e157a..27b62037c98 100644 --- a/erigon-lib/state/integrity.go +++ b/erigon-lib/state/integrity.go @@ -60,11 +60,8 @@ func (ac *AggregatorRoTx) IntegrityInvertedIndexAllValuesAreInRange(ctx context. } default: // check the ii - if v, ok := ac.iis[name]; ok { - err := v.IntegrityInvertedIndexAllValuesAreInRange(ctx, failFast, fromStep) - if err != nil { - return err - } + if v := ac.searchII(name); v != nil { + return v.IntegrityInvertedIndexAllValuesAreInRange(ctx, failFast, fromStep) } panic(fmt.Sprintf("unexpected: %s", name)) } diff --git a/erigon-lib/state/inverted_index.go b/erigon-lib/state/inverted_index.go index 99df82db5aa..a400ef1c863 100644 --- a/erigon-lib/state/inverted_index.go +++ b/erigon-lib/state/inverted_index.go @@ -82,6 +82,7 @@ type iiCfg struct { aggregationStep uint64 // amount of transactions inside single aggregation step keysTable string // bucket name for index keys; txnNum_u64 -> key (k+auto_increment) valuesTable string // bucket name for index values; k -> txnNum_u64 , Needs to be table with DupSort + name kv.InvertedIdx withExistence bool // defines if existence index should be built compression seg.FileCompression // compression type for inverted index keys and values @@ -381,6 +382,7 @@ type invertedIndexBufferedWriter struct { txNum uint64 aggregationStep uint64 txNumBytes [8]byte + name kv.InvertedIdx } // loadFunc - is analog of etl.Identity, but it signaling to etl - use .Put instead of .AppendDup - to allow duplicates @@ -455,6 +457,7 @@ func (iit *InvertedIndexRoTx) newWriter(tmpdir string, discard bool) *invertedIn // etl collector doesn't fsync: means if have enough ram, all files produced by all collectors will be in ram indexKeys: etl.NewCollector(iit.ii.filenameBase+".flush.ii.keys", tmpdir, etl.NewSortableBuffer(WALCollectorRAM), iit.ii.logger).LogLvl(log.LvlTrace), index: etl.NewCollector(iit.ii.filenameBase+".flush.ii.vals", tmpdir, etl.NewSortableBuffer(WALCollectorRAM), iit.ii.logger).LogLvl(log.LvlTrace), + name: iit.name, } w.indexKeys.SortAndFlushInBackground(true) w.index.SortAndFlushInBackground(true) @@ -472,6 +475,7 @@ func (ii *InvertedIndex) BeginFilesRo() *InvertedIndexRoTx { ii: ii, visible: ii._visible, files: files, + name: ii.name, } } func (iit *InvertedIndexRoTx) Close() { @@ -503,6 +507,7 @@ func (iit *InvertedIndexRoTx) Close() { } type MergeRange struct { + name string // entity name needMerge bool from uint64 to uint64 @@ -516,7 +521,7 @@ func (mr *MergeRange) String(prefix string, aggStep uint64) string { if prefix != "" { prefix += "=" } - return fmt.Sprintf("%s%d-%d", prefix, mr.from/aggStep, mr.to/aggStep) + return fmt.Sprintf("%s%s%d-%d", prefix, mr.name, mr.from/aggStep, mr.to/aggStep) } func (mr *MergeRange) Equal(other *MergeRange) bool { @@ -525,6 +530,7 @@ func (mr *MergeRange) Equal(other *MergeRange) bool { type InvertedIndexRoTx struct { ii *InvertedIndex + name kv.InvertedIdx files visibleFiles visible *iiVisible getters []*seg.Reader diff --git a/erigon-lib/state/merge.go b/erigon-lib/state/merge.go index d6b6baf2c32..a6484de8c7d 100644 --- a/erigon-lib/state/merge.go +++ b/erigon-lib/state/merge.go @@ -132,7 +132,7 @@ func (dt *DomainRoTx) findMergeRange(maxEndTxNum, maxSpan uint64) DomainRanges { fromTxNum := item.endTxNum - span if fromTxNum < item.startTxNum { if !r.values.needMerge || fromTxNum < r.values.from { - r.values = MergeRange{true, fromTxNum, item.endTxNum} + r.values = MergeRange{"", true, fromTxNum, item.endTxNum} } } } @@ -155,10 +155,10 @@ func (ht *HistoryRoTx) findMergeRange(maxEndTxNum, maxSpan uint64) HistoryRanges foundSuperSet := r.history.from == item.startTxNum && item.endTxNum >= r.history.to if foundSuperSet { - r.history = MergeRange{false, startTxNum, item.endTxNum} + r.history = MergeRange{from: startTxNum, to: item.endTxNum} } else if startTxNum < item.startTxNum { if !r.history.needMerge || startTxNum < r.history.from { - r.history = MergeRange{true, startTxNum, item.endTxNum} + r.history = MergeRange{"", true, startTxNum, item.endTxNum} } } } @@ -211,7 +211,7 @@ func (iit *InvertedIndexRoTx) findMergeRange(maxEndTxNum, maxSpan uint64) *Merge } } } - return &MergeRange{minFound, startTxNum, endTxNum} + return &MergeRange{string(iit.name), minFound, startTxNum, endTxNum} } type HistoryRanges struct { @@ -1082,4 +1082,12 @@ func hasCoverVisibleFile(visibleFiles []visibleFile, item *filesItem) bool { } func (ac *AggregatorRoTx) DbgDomain(idx kv.Domain) *DomainRoTx { return ac.d[idx] } -func (ac *AggregatorRoTx) DbgII(idx kv.InvertedIdx) *InvertedIndexRoTx { return ac.iis[idx] } +func (ac *AggregatorRoTx) DbgII(idx kv.InvertedIdx) *InvertedIndexRoTx { return ac.searchII(idx) } +func (ac *AggregatorRoTx) searchII(idx kv.InvertedIdx) *InvertedIndexRoTx { + for _, iit := range ac.iis { + if iit.name == idx { + return iit + } + } + return nil +} diff --git a/erigon-lib/state/merge_test.go b/erigon-lib/state/merge_test.go index 794514f5f2e..75ba823121a 100644 --- a/erigon-lib/state/merge_test.go +++ b/erigon-lib/state/merge_test.go @@ -80,6 +80,7 @@ func TestFindMergeRangeCornerCases(t *testing.T) { assert.True(t, mr.needMerge) assert.Equal(t, 0, int(mr.from)) assert.Equal(t, 4, int(mr.to)) + assert.Equal(t, string(ii.name), mr.name) idxF := ic.staticFilesInRange(mr.from, mr.to) assert.Equal(t, 3, len(idxF)) @@ -439,6 +440,7 @@ func TestFindMergeRangeCornerCases(t *testing.T) { assert.True(t, mr.needMerge) require.Equal(t, 0, int(mr.from)) require.Equal(t, 4, int(mr.to)) + require.Equal(t, string(ii.name), mr.name) idxFiles := ic.staticFilesInRange(mr.from, mr.to) require.Equal(t, 3, len(idxFiles)) }) diff --git a/erigon-lib/state/squeeze.go b/erigon-lib/state/squeeze.go index fe8cf6a663a..73b11e34f63 100644 --- a/erigon-lib/state/squeeze.go +++ b/erigon-lib/state/squeeze.go @@ -110,25 +110,26 @@ func (ac *AggregatorRoTx) SqueezeCommitmentFiles() error { return nil } - rng := NewRangesV3() - rng.domain = [5]DomainRanges{ - kv.AccountsDomain: { - name: kv.AccountsDomain, - values: MergeRange{true, 0, math.MaxUint64}, - history: HistoryRanges{}, - aggStep: ac.a.StepSize(), - }, - kv.StorageDomain: { - name: kv.StorageDomain, - values: MergeRange{true, 0, math.MaxUint64}, - history: HistoryRanges{}, - aggStep: ac.a.StepSize(), - }, - kv.CommitmentDomain: { - name: kv.CommitmentDomain, - values: MergeRange{true, 0, math.MaxUint64}, - history: HistoryRanges{}, - aggStep: ac.a.StepSize(), + rng := &RangesV3{ + domain: [5]DomainRanges{ + kv.AccountsDomain: { + name: kv.AccountsDomain, + values: MergeRange{"", true, 0, math.MaxUint64}, + history: HistoryRanges{}, + aggStep: ac.a.StepSize(), + }, + kv.StorageDomain: { + name: kv.StorageDomain, + values: MergeRange{"", true, 0, math.MaxUint64}, + history: HistoryRanges{}, + aggStep: ac.a.StepSize(), + }, + kv.CommitmentDomain: { + name: kv.CommitmentDomain, + values: MergeRange{"", true, 0, math.MaxUint64}, + history: HistoryRanges{}, + aggStep: ac.a.StepSize(), + }, }, } sf, err := ac.staticFilesInRange(rng) @@ -320,13 +321,14 @@ func (a *Aggregator) RebuildCommitmentFiles(ctx context.Context, rwDb kv.RwDB, t acRo := a.BeginFilesRo() // this tx is used to read existing domain files and closed in the end defer acRo.Close() - rng := NewRangesV3() - rng.domain = [5]DomainRanges{ - kv.AccountsDomain: { - name: kv.AccountsDomain, - values: MergeRange{true, 0, math.MaxUint64}, - history: HistoryRanges{}, - aggStep: a.StepSize(), + rng := &RangesV3{ + domain: [5]DomainRanges{ + kv.AccountsDomain: { + name: kv.AccountsDomain, + values: MergeRange{"", true, 0, math.MaxUint64}, + history: HistoryRanges{}, + aggStep: a.StepSize(), + }, }, } sf, err := acRo.staticFilesInRange(rng)