diff --git a/erigon-lib/kv/kv_interface.go b/erigon-lib/kv/kv_interface.go index 1399d2a97d0..f6b1077d814 100644 --- a/erigon-lib/kv/kv_interface.go +++ b/erigon-lib/kv/kv_interface.go @@ -155,6 +155,7 @@ const ( ConsensusDB Label = 3 DownloaderDB Label = 4 InMem Label = 5 + HeimdallDB Label = 6 ) func (l Label) String() string { @@ -171,6 +172,8 @@ func (l Label) String() string { return "downloader" case InMem: return "inMem" + case HeimdallDB: + return "heimdall" default: return "unknown" } @@ -189,6 +192,8 @@ func UnmarshalLabel(s string) Label { return DownloaderDB case "inMem": return InMem + case "heimdall": + return HeimdallDB default: panic(fmt.Sprintf("unexpected label: %s", s)) } diff --git a/eth/backend.go b/eth/backend.go index ef7c6518cb5..52e67683958 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -984,6 +984,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger backend.polygonSyncService = polygonsync.NewService( logger, chainConfig, + dirs.DataDir, tmpdir, sentryClient, p2pConfig.MaxPeers, diff --git a/polygon/heimdall/database.go b/polygon/heimdall/database.go new file mode 100644 index 00000000000..c38000d1c1e --- /dev/null +++ b/polygon/heimdall/database.go @@ -0,0 +1,73 @@ +package heimdall + +import ( + "context" + "path/filepath" + "sync" + + "github.com/c2h5oh/datasize" + "github.com/ledgerwatch/log/v3" + + "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon-lib/kv/mdbx" +) + +type Database struct { + db kv.RwDB + + dataDir string + openOnce sync.Once + + logger log.Logger +} + +func NewDatabase( + dataDir string, + logger log.Logger, +) *Database { + return &Database{dataDir: dataDir, logger: logger} +} + +var databaseTablesCfg = kv.TableCfg{ + kv.BorCheckpoints: {}, + kv.BorMilestones: {}, + kv.BorSpans: {}, +} + +func (db *Database) open(ctx context.Context) error { + label := kv.HeimdallDB + dbPath := filepath.Join(db.dataDir, label.String()) + db.logger.Info("Opening Database", "label", label.String(), "path", dbPath) + + var err error + db.db, err = mdbx.NewMDBX(db.logger). + Label(label). + Path(dbPath). + WithTableCfg(func(_ kv.TableCfg) kv.TableCfg { return databaseTablesCfg }). + MapSize(16 * datasize.GB). + GrowthStep(16 * datasize.MB). + Open(ctx) + return err +} + +func (db *Database) OpenOnce(ctx context.Context) error { + var err error + db.openOnce.Do(func() { + err = db.open(ctx) + }) + return err +} + +func (db *Database) Close() { + if db.db != nil { + db.db.Close() + } +} + +func (db *Database) BeginRo(ctx context.Context) (kv.Tx, error) { + return db.db.BeginRo(ctx) +} + +func (db *Database) BeginRw(ctx context.Context) (kv.RwTx, error) { + return db.db.BeginRw(ctx) +} diff --git a/polygon/heimdall/entity_store.go b/polygon/heimdall/entity_store.go index 42d002da0a5..c9d59b37ae3 100644 --- a/polygon/heimdall/entity_store.go +++ b/polygon/heimdall/entity_store.go @@ -22,34 +22,30 @@ type entityStore[TEntity Entity] interface { RangeFromBlockNum(ctx context.Context, startBlockNum uint64) ([]TEntity, error) } +type RangeIndexFactory func(ctx context.Context) (*RangeIndex, error) + type entityStoreImpl[TEntity Entity] struct { - tx kv.RwTx + db *Database table string - makeEntity func() TEntity - getLastEntityId func(ctx context.Context, tx kv.Tx) (uint64, bool, error) - loadEntityBytes func(ctx context.Context, tx kv.Getter, id uint64) ([]byte, error) + makeEntity func() TEntity - blockNumToIdIndexFactory func(ctx context.Context) (*RangeIndex, error) + blockNumToIdIndexFactory RangeIndexFactory blockNumToIdIndex *RangeIndex prepareOnce sync.Once } func newEntityStore[TEntity Entity]( - tx kv.RwTx, + db *Database, table string, makeEntity func() TEntity, - getLastEntityId func(ctx context.Context, tx kv.Tx) (uint64, bool, error), - loadEntityBytes func(ctx context.Context, tx kv.Getter, id uint64) ([]byte, error), - blockNumToIdIndexFactory func(ctx context.Context) (*RangeIndex, error), + blockNumToIdIndexFactory RangeIndexFactory, ) entityStore[TEntity] { return &entityStoreImpl[TEntity]{ - tx: tx, + db: db, table: table, - makeEntity: makeEntity, - getLastEntityId: getLastEntityId, - loadEntityBytes: loadEntityBytes, + makeEntity: makeEntity, blockNumToIdIndexFactory: blockNumToIdIndexFactory, } @@ -58,12 +54,16 @@ func newEntityStore[TEntity Entity]( func (s *entityStoreImpl[TEntity]) Prepare(ctx context.Context) error { var err error s.prepareOnce.Do(func() { + err = s.db.OpenOnce(ctx) + if err != nil { + return + } s.blockNumToIdIndex, err = s.blockNumToIdIndexFactory(ctx) if err != nil { return } - iteratorFactory := func() (iter.KV, error) { return s.tx.Range(s.table, nil, nil) } - err = buildBlockNumToIdIndex(ctx, s.blockNumToIdIndex, iteratorFactory, s.entityUnmarshalJSON) + iteratorFactory := func(tx kv.Tx) (iter.KV, error) { return tx.Range(s.table, nil, nil) } + err = buildBlockNumToIdIndex(ctx, s.blockNumToIdIndex, s.db.BeginRo, iteratorFactory, s.entityUnmarshalJSON) }) return err } @@ -73,7 +73,28 @@ func (s *entityStoreImpl[TEntity]) Close() { } func (s *entityStoreImpl[TEntity]) GetLastEntityId(ctx context.Context) (uint64, bool, error) { - return s.getLastEntityId(ctx, s.tx) + tx, err := s.db.BeginRo(ctx) + if err != nil { + return 0, false, err + } + defer tx.Rollback() + + cursor, err := tx.Cursor(s.table) + if err != nil { + return 0, false, err + } + defer cursor.Close() + + lastKey, _, err := cursor.Last() + if err != nil { + return 0, false, err + } + // not found + if lastKey == nil { + return 0, false, nil + } + + return entityStoreKeyParse(lastKey), true, nil } // Zero value of any type T @@ -102,6 +123,10 @@ func entityStoreKey(id uint64) [8]byte { return key } +func entityStoreKeyParse(key []byte) uint64 { + return binary.BigEndian.Uint64(key) +} + func (s *entityStoreImpl[TEntity]) entityUnmarshalJSON(jsonBytes []byte) (TEntity, error) { entity := s.makeEntity() if err := json.Unmarshal(jsonBytes, entity); err != nil { @@ -111,7 +136,14 @@ func (s *entityStoreImpl[TEntity]) entityUnmarshalJSON(jsonBytes []byte) (TEntit } func (s *entityStoreImpl[TEntity]) GetEntity(ctx context.Context, id uint64) (TEntity, error) { - jsonBytes, err := s.loadEntityBytes(ctx, s.tx, id) + tx, err := s.db.BeginRo(ctx) + if err != nil { + return Zero[TEntity](), err + } + defer tx.Rollback() + + key := entityStoreKey(id) + jsonBytes, err := tx.GetOne(s.table, key[:]) if err != nil { return Zero[TEntity](), err } @@ -124,14 +156,22 @@ func (s *entityStoreImpl[TEntity]) GetEntity(ctx context.Context, id uint64) (TE } func (s *entityStoreImpl[TEntity]) PutEntity(ctx context.Context, id uint64, entity TEntity) error { + tx, err := s.db.BeginRw(ctx) + if err != nil { + return err + } + defer tx.Rollback() + jsonBytes, err := json.Marshal(entity) if err != nil { return err } key := entityStoreKey(id) - err = s.tx.Put(s.table, key[:], jsonBytes) - if err != nil { + if err = tx.Put(s.table, key[:], jsonBytes); err != nil { + return err + } + if err = tx.Commit(); err != nil { return err } @@ -152,9 +192,15 @@ func (s *entityStoreImpl[TEntity]) FindByBlockNum(ctx context.Context, blockNum return s.GetEntity(ctx, id) } -func (s *entityStoreImpl[TEntity]) RangeFromId(_ context.Context, startId uint64) ([]TEntity, error) { +func (s *entityStoreImpl[TEntity]) RangeFromId(ctx context.Context, startId uint64) ([]TEntity, error) { + tx, err := s.db.BeginRo(ctx) + if err != nil { + return nil, err + } + defer tx.Rollback() + startKey := entityStoreKey(startId) - it, err := s.tx.Range(s.table, startKey[:], nil) + it, err := tx.Range(s.table, startKey[:], nil) if err != nil { return nil, err } @@ -191,10 +237,17 @@ func (s *entityStoreImpl[TEntity]) RangeFromBlockNum(ctx context.Context, startB func buildBlockNumToIdIndex[TEntity Entity]( ctx context.Context, index *RangeIndex, - iteratorFactory func() (iter.KV, error), + txFactory func(context.Context) (kv.Tx, error), + iteratorFactory func(tx kv.Tx) (iter.KV, error), entityUnmarshalJSON func([]byte) (TEntity, error), ) error { - it, err := iteratorFactory() + tx, err := txFactory(ctx) + if err != nil { + return err + } + defer tx.Rollback() + + it, err := iteratorFactory(tx) if err != nil { return err } diff --git a/polygon/heimdall/service.go b/polygon/heimdall/service.go index 5fc6a4a7381..ae53d269509 100644 --- a/polygon/heimdall/service.go +++ b/polygon/heimdall/service.go @@ -10,7 +10,6 @@ import ( libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/polygon/polygoncommon" - "github.com/ledgerwatch/erigon/turbo/services" ) type Service interface { @@ -21,55 +20,31 @@ type Service interface { type service struct { scraper *Scraper + db *Database checkpointStore entityStore[*Checkpoint] milestoneStore entityStore[*Milestone] spanStore entityStore[*Span] } -func newCheckpointStore(tx kv.RwTx, reader services.BorCheckpointReader, blockNumToIdIndexFactory func(context.Context) (*RangeIndex, error)) entityStore[*Checkpoint] { - makeEntity := func() *Checkpoint { return new(Checkpoint) } - return newEntityStore(tx, kv.BorCheckpoints, makeEntity, reader.LastCheckpointId, reader.Checkpoint, blockNumToIdIndexFactory) -} - -func newMilestoneStore(tx kv.RwTx, reader services.BorMilestoneReader, blockNumToIdIndexFactory func(context.Context) (*RangeIndex, error)) entityStore[*Milestone] { - makeEntity := func() *Milestone { return new(Milestone) } - return newEntityStore(tx, kv.BorMilestones, makeEntity, reader.LastMilestoneId, reader.Milestone, blockNumToIdIndexFactory) -} - -func newSpanStore(tx kv.RwTx, reader services.BorSpanReader, blockNumToIdIndexFactory func(context.Context) (*RangeIndex, error)) entityStore[*Span] { - makeEntity := func() *Span { return new(Span) } - return newEntityStore(tx, kv.BorSpans, makeEntity, reader.LastSpanId, reader.Span, blockNumToIdIndexFactory) +func makeType[T any]() *T { + return new(T) } func NewService( heimdallUrl string, + dataDir string, tmpDir string, logger log.Logger, ) Service { - // TODO: implementing these is an upcoming task - txProvider := func() kv.RwTx { /* TODO */ return nil } - readerProvider := func() reader { /* TODO */ return nil } - - tx := txProvider() - if tx == nil { - // TODO: implement and remove - logger.Warn("heimdall.Service txProvider is not implemented yet") - return nil - } - reader := readerProvider() - if reader == nil { - // TODO: implement and remove - logger.Warn("heimdall.Service readerProvider is not implemented yet") - return nil - } + db := NewDatabase(dataDir, logger) blockNumToIdIndexFactory := func(ctx context.Context) (*RangeIndex, error) { return NewRangeIndex(ctx, tmpDir, logger) } - checkpointStore := newCheckpointStore(tx, reader, blockNumToIdIndexFactory) - milestoneStore := newMilestoneStore(tx, reader, blockNumToIdIndexFactory) - spanStore := newSpanStore(tx, reader, blockNumToIdIndexFactory) + checkpointStore := newEntityStore(db, kv.BorCheckpoints, makeType[Checkpoint], blockNumToIdIndexFactory) + milestoneStore := newEntityStore(db, kv.BorMilestones, makeType[Milestone], blockNumToIdIndexFactory) + spanStore := newEntityStore(db, kv.BorSpans, makeType[Span], blockNumToIdIndexFactory) client := NewHeimdallClient(heimdallUrl, logger) scraper := NewScraper( @@ -84,6 +59,7 @@ func NewService( return &service{ scraper: scraper, + db: db, checkpointStore: checkpointStore, milestoneStore: milestoneStore, spanStore: spanStore, @@ -132,6 +108,7 @@ func (s *service) RegisterSpanObserver(callback func(*Span)) polygoncommon.Unreg } func (s *service) Run(ctx context.Context) error { + defer s.db.Close() defer s.checkpointStore.Close() defer s.milestoneStore.Close() defer s.spanStore.Close() diff --git a/polygon/sync/service.go b/polygon/sync/service.go index 64aaf1fb566..0fa767c69c1 100644 --- a/polygon/sync/service.go +++ b/polygon/sync/service.go @@ -36,6 +36,7 @@ type service struct { func NewService( logger log.Logger, chainConfig *chain.Config, + dataDir string, tmpDir string, sentryClient direct.SentryClient, maxPeers int, @@ -53,6 +54,7 @@ func NewService( heimdallService := heimdall.NewHeimdall(heimdallClient, logger) heimdallServiceV2 := heimdall.NewService( heimdallUrl, + dataDir, tmpDir, logger, )