diff --git a/erigon-lib/kv/kv_interface.go b/erigon-lib/kv/kv_interface.go index 0a5327c6458..f8832cbbb02 100644 --- a/erigon-lib/kv/kv_interface.go +++ b/erigon-lib/kv/kv_interface.go @@ -229,6 +229,11 @@ type Closer interface { Close() } +type OnFreezeFunc func(frozenFileNames []string) +type SnapshotNotifier interface { + OnFreeze(f OnFreezeFunc) +} + // RoDB - Read-only version of KV. type RoDB interface { Closer @@ -516,6 +521,7 @@ type TemporalPutDel interface { type TemporalRoDB interface { RoDB + SnapshotNotifier ViewTemporal(ctx context.Context, f func(tx TemporalTx) error) error BeginTemporalRo(ctx context.Context) (TemporalTx, error) } diff --git a/erigon-lib/kv/remotedb/kv_remote.go b/erigon-lib/kv/remotedb/kv_remote.go index 6e64863ebb4..43a3d34cccf 100644 --- a/erigon-lib/kv/remotedb/kv_remote.go +++ b/erigon-lib/kv/remotedb/kv_remote.go @@ -228,6 +228,7 @@ func (db *DB) Update(ctx context.Context, f func(tx kv.RwTx) error) (err error) func (db *DB) UpdateNosync(ctx context.Context, f func(tx kv.RwTx) error) (err error) { return errors.New("remote db provider doesn't support .UpdateNosync method") } +func (db *DB) OnFreeze(f kv.OnFreezeFunc) { panic("not implemented") } func (tx *tx) ViewID() uint64 { return tx.viewID } func (tx *tx) CollectMetrics() {} diff --git a/erigon-lib/kv/temporal/kv_temporal.go b/erigon-lib/kv/temporal/kv_temporal.go index ebe1367dd0b..399af03919e 100644 --- a/erigon-lib/kv/temporal/kv_temporal.go +++ b/erigon-lib/kv/temporal/kv_temporal.go @@ -152,6 +152,13 @@ func (db *DB) UpdateNosync(ctx context.Context, f func(tx kv.RwTx) error) error return tx.Commit() } +func (db *DB) Close() { + db.RwDB.Close() + db.agg.Close() +} + +func (db *DB) OnFreeze(f kv.OnFreezeFunc) { db.agg.OnFreeze(f) } + type Tx struct { *mdbx.MdbxTx db *DB diff --git a/erigon-lib/state/aggregator.go b/erigon-lib/state/aggregator.go index fac019852f5..ac1bccda2d1 100644 --- a/erigon-lib/state/aggregator.go +++ b/erigon-lib/state/aggregator.go @@ -82,7 +82,7 @@ type Aggregator struct { wg sync.WaitGroup // goroutines spawned by Aggregator, to ensure all of them are finish at agg.Close - onFreeze OnFreezeFunc + onFreeze kv.OnFreezeFunc ps *background.ProgressSet @@ -95,8 +95,6 @@ type Aggregator struct { produce bool } -type OnFreezeFunc func(frozenFileNames []string) - const AggregatorSqueezeCommitmentValues = true const MaxNonFuriousDirtySpacePerTx = 64 * datasize.MB @@ -377,8 +375,8 @@ func (a *Aggregator) registerII(idx kv.InvertedIdxPos, salt *uint32, dirs datadi return nil } -func (a *Aggregator) StepSize() uint64 { return a.aggregationStep } -func (a *Aggregator) OnFreeze(f OnFreezeFunc) { a.onFreeze = f } +func (a *Aggregator) StepSize() uint64 { return a.aggregationStep } +func (a *Aggregator) OnFreeze(f kv.OnFreezeFunc) { a.onFreeze = f } func (a *Aggregator) DisableFsync() { for _, d := range a.d { d.DisableFsync() diff --git a/eth/backend.go b/eth/backend.go index 55677cc86f7..e1b1edcb6d2 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -204,7 +204,6 @@ type Ethereum struct { forkValidator *engine_helpers.ForkValidator downloader *downloader.Downloader - agg *libstate.Aggregator blockSnapshots *freezeblocks.RoSnapshots blockReader services.FullBlockReader blockWriter *blockio.BlockWriter @@ -343,7 +342,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger return nil, err } - backend.agg, backend.blockSnapshots, backend.blockReader, backend.blockWriter = agg, allSnapshots, blockReader, blockWriter + backend.blockSnapshots, backend.blockReader, backend.blockWriter = allSnapshots, blockReader, blockWriter backend.chainDB, err = temporal.New(rawChainDB, agg) if err != nil { @@ -1420,7 +1419,7 @@ func (s *Ethereum) setUpSnapDownloader(ctx context.Context, downloaderCfg *downl s.downloaderClient = direct.NewDownloaderClient(bittorrentServer) } - s.agg.OnFreeze(func(frozenFileNames []string) { + s.chainDB.OnFreeze(func(frozenFileNames []string) { events := s.notifications.Events events.OnNewSnapshot() if s.downloaderClient != nil { @@ -1643,9 +1642,6 @@ func (s *Ethereum) Stop() error { if s.txPoolDB != nil { s.txPoolDB.Close() } - if s.agg != nil { - s.agg.Close() - } s.chainDB.Close() if s.silkwormRPCDaemonService != nil {