Skip to content

Commit

Permalink
polygon/heimdall: database (#10377)
Browse files Browse the repository at this point in the history
Create a separate mdbx database in `<datadir>/heimdall`.
Rewrite entityStore to use it.
  • Loading branch information
battlmonstr authored May 21, 2024
1 parent 1d8dc37 commit 4cae699
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 56 deletions.
5 changes: 5 additions & 0 deletions erigon-lib/kv/kv_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ const (
ConsensusDB Label = 3
DownloaderDB Label = 4
InMem Label = 5
HeimdallDB Label = 6
)

func (l Label) String() string {
Expand All @@ -171,6 +172,8 @@ func (l Label) String() string {
return "downloader"
case InMem:
return "inMem"
case HeimdallDB:
return "heimdall"
default:
return "unknown"
}
Expand All @@ -189,6 +192,8 @@ func UnmarshalLabel(s string) Label {
return DownloaderDB
case "inMem":
return InMem
case "heimdall":
return HeimdallDB
default:
panic(fmt.Sprintf("unexpected label: %s", s))
}
Expand Down
1 change: 1 addition & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
backend.polygonSyncService = polygonsync.NewService(
logger,
chainConfig,
dirs.DataDir,
tmpdir,
sentryClient,
p2pConfig.MaxPeers,
Expand Down
73 changes: 73 additions & 0 deletions polygon/heimdall/database.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package heimdall

import (
"context"
"path/filepath"
"sync"

"github.com/c2h5oh/datasize"
"github.com/ledgerwatch/log/v3"

"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/mdbx"
)

type Database struct {
db kv.RwDB

dataDir string
openOnce sync.Once

logger log.Logger
}

func NewDatabase(
dataDir string,
logger log.Logger,
) *Database {
return &Database{dataDir: dataDir, logger: logger}
}

var databaseTablesCfg = kv.TableCfg{
kv.BorCheckpoints: {},
kv.BorMilestones: {},
kv.BorSpans: {},
}

func (db *Database) open(ctx context.Context) error {
label := kv.HeimdallDB
dbPath := filepath.Join(db.dataDir, label.String())
db.logger.Info("Opening Database", "label", label.String(), "path", dbPath)

var err error
db.db, err = mdbx.NewMDBX(db.logger).
Label(label).
Path(dbPath).
WithTableCfg(func(_ kv.TableCfg) kv.TableCfg { return databaseTablesCfg }).
MapSize(16 * datasize.GB).
GrowthStep(16 * datasize.MB).
Open(ctx)
return err
}

func (db *Database) OpenOnce(ctx context.Context) error {
var err error
db.openOnce.Do(func() {
err = db.open(ctx)
})
return err
}

func (db *Database) Close() {
if db.db != nil {
db.db.Close()
}
}

func (db *Database) BeginRo(ctx context.Context) (kv.Tx, error) {
return db.db.BeginRo(ctx)
}

func (db *Database) BeginRw(ctx context.Context) (kv.RwTx, error) {
return db.db.BeginRw(ctx)
}
99 changes: 76 additions & 23 deletions polygon/heimdall/entity_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,34 +22,30 @@ type entityStore[TEntity Entity] interface {
RangeFromBlockNum(ctx context.Context, startBlockNum uint64) ([]TEntity, error)
}

type RangeIndexFactory func(ctx context.Context) (*RangeIndex, error)

type entityStoreImpl[TEntity Entity] struct {
tx kv.RwTx
db *Database
table string

makeEntity func() TEntity
getLastEntityId func(ctx context.Context, tx kv.Tx) (uint64, bool, error)
loadEntityBytes func(ctx context.Context, tx kv.Getter, id uint64) ([]byte, error)
makeEntity func() TEntity

blockNumToIdIndexFactory func(ctx context.Context) (*RangeIndex, error)
blockNumToIdIndexFactory RangeIndexFactory
blockNumToIdIndex *RangeIndex
prepareOnce sync.Once
}

func newEntityStore[TEntity Entity](
tx kv.RwTx,
db *Database,
table string,
makeEntity func() TEntity,
getLastEntityId func(ctx context.Context, tx kv.Tx) (uint64, bool, error),
loadEntityBytes func(ctx context.Context, tx kv.Getter, id uint64) ([]byte, error),
blockNumToIdIndexFactory func(ctx context.Context) (*RangeIndex, error),
blockNumToIdIndexFactory RangeIndexFactory,
) entityStore[TEntity] {
return &entityStoreImpl[TEntity]{
tx: tx,
db: db,
table: table,

makeEntity: makeEntity,
getLastEntityId: getLastEntityId,
loadEntityBytes: loadEntityBytes,
makeEntity: makeEntity,

blockNumToIdIndexFactory: blockNumToIdIndexFactory,
}
Expand All @@ -58,12 +54,16 @@ func newEntityStore[TEntity Entity](
func (s *entityStoreImpl[TEntity]) Prepare(ctx context.Context) error {
var err error
s.prepareOnce.Do(func() {
err = s.db.OpenOnce(ctx)
if err != nil {
return
}
s.blockNumToIdIndex, err = s.blockNumToIdIndexFactory(ctx)
if err != nil {
return
}
iteratorFactory := func() (iter.KV, error) { return s.tx.Range(s.table, nil, nil) }
err = buildBlockNumToIdIndex(ctx, s.blockNumToIdIndex, iteratorFactory, s.entityUnmarshalJSON)
iteratorFactory := func(tx kv.Tx) (iter.KV, error) { return tx.Range(s.table, nil, nil) }
err = buildBlockNumToIdIndex(ctx, s.blockNumToIdIndex, s.db.BeginRo, iteratorFactory, s.entityUnmarshalJSON)
})
return err
}
Expand All @@ -73,7 +73,28 @@ func (s *entityStoreImpl[TEntity]) Close() {
}

func (s *entityStoreImpl[TEntity]) GetLastEntityId(ctx context.Context) (uint64, bool, error) {
return s.getLastEntityId(ctx, s.tx)
tx, err := s.db.BeginRo(ctx)
if err != nil {
return 0, false, err
}
defer tx.Rollback()

cursor, err := tx.Cursor(s.table)
if err != nil {
return 0, false, err
}
defer cursor.Close()

lastKey, _, err := cursor.Last()
if err != nil {
return 0, false, err
}
// not found
if lastKey == nil {
return 0, false, nil
}

return entityStoreKeyParse(lastKey), true, nil
}

// Zero value of any type T
Expand Down Expand Up @@ -102,6 +123,10 @@ func entityStoreKey(id uint64) [8]byte {
return key
}

func entityStoreKeyParse(key []byte) uint64 {
return binary.BigEndian.Uint64(key)
}

func (s *entityStoreImpl[TEntity]) entityUnmarshalJSON(jsonBytes []byte) (TEntity, error) {
entity := s.makeEntity()
if err := json.Unmarshal(jsonBytes, entity); err != nil {
Expand All @@ -111,7 +136,14 @@ func (s *entityStoreImpl[TEntity]) entityUnmarshalJSON(jsonBytes []byte) (TEntit
}

func (s *entityStoreImpl[TEntity]) GetEntity(ctx context.Context, id uint64) (TEntity, error) {
jsonBytes, err := s.loadEntityBytes(ctx, s.tx, id)
tx, err := s.db.BeginRo(ctx)
if err != nil {
return Zero[TEntity](), err
}
defer tx.Rollback()

key := entityStoreKey(id)
jsonBytes, err := tx.GetOne(s.table, key[:])
if err != nil {
return Zero[TEntity](), err
}
Expand All @@ -124,14 +156,22 @@ func (s *entityStoreImpl[TEntity]) GetEntity(ctx context.Context, id uint64) (TE
}

func (s *entityStoreImpl[TEntity]) PutEntity(ctx context.Context, id uint64, entity TEntity) error {
tx, err := s.db.BeginRw(ctx)
if err != nil {
return err
}
defer tx.Rollback()

jsonBytes, err := json.Marshal(entity)
if err != nil {
return err
}

key := entityStoreKey(id)
err = s.tx.Put(s.table, key[:], jsonBytes)
if err != nil {
if err = tx.Put(s.table, key[:], jsonBytes); err != nil {
return err
}
if err = tx.Commit(); err != nil {
return err
}

Expand All @@ -152,9 +192,15 @@ func (s *entityStoreImpl[TEntity]) FindByBlockNum(ctx context.Context, blockNum
return s.GetEntity(ctx, id)
}

func (s *entityStoreImpl[TEntity]) RangeFromId(_ context.Context, startId uint64) ([]TEntity, error) {
func (s *entityStoreImpl[TEntity]) RangeFromId(ctx context.Context, startId uint64) ([]TEntity, error) {
tx, err := s.db.BeginRo(ctx)
if err != nil {
return nil, err
}
defer tx.Rollback()

startKey := entityStoreKey(startId)
it, err := s.tx.Range(s.table, startKey[:], nil)
it, err := tx.Range(s.table, startKey[:], nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -191,10 +237,17 @@ func (s *entityStoreImpl[TEntity]) RangeFromBlockNum(ctx context.Context, startB
func buildBlockNumToIdIndex[TEntity Entity](
ctx context.Context,
index *RangeIndex,
iteratorFactory func() (iter.KV, error),
txFactory func(context.Context) (kv.Tx, error),
iteratorFactory func(tx kv.Tx) (iter.KV, error),
entityUnmarshalJSON func([]byte) (TEntity, error),
) error {
it, err := iteratorFactory()
tx, err := txFactory(ctx)
if err != nil {
return err
}
defer tx.Rollback()

it, err := iteratorFactory(tx)
if err != nil {
return err
}
Expand Down
43 changes: 10 additions & 33 deletions polygon/heimdall/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/polygon/polygoncommon"
"github.com/ledgerwatch/erigon/turbo/services"
)

type Service interface {
Expand All @@ -21,55 +20,31 @@ type Service interface {
type service struct {
scraper *Scraper

db *Database
checkpointStore entityStore[*Checkpoint]
milestoneStore entityStore[*Milestone]
spanStore entityStore[*Span]
}

func newCheckpointStore(tx kv.RwTx, reader services.BorCheckpointReader, blockNumToIdIndexFactory func(context.Context) (*RangeIndex, error)) entityStore[*Checkpoint] {
makeEntity := func() *Checkpoint { return new(Checkpoint) }
return newEntityStore(tx, kv.BorCheckpoints, makeEntity, reader.LastCheckpointId, reader.Checkpoint, blockNumToIdIndexFactory)
}

func newMilestoneStore(tx kv.RwTx, reader services.BorMilestoneReader, blockNumToIdIndexFactory func(context.Context) (*RangeIndex, error)) entityStore[*Milestone] {
makeEntity := func() *Milestone { return new(Milestone) }
return newEntityStore(tx, kv.BorMilestones, makeEntity, reader.LastMilestoneId, reader.Milestone, blockNumToIdIndexFactory)
}

func newSpanStore(tx kv.RwTx, reader services.BorSpanReader, blockNumToIdIndexFactory func(context.Context) (*RangeIndex, error)) entityStore[*Span] {
makeEntity := func() *Span { return new(Span) }
return newEntityStore(tx, kv.BorSpans, makeEntity, reader.LastSpanId, reader.Span, blockNumToIdIndexFactory)
func makeType[T any]() *T {
return new(T)
}

func NewService(
heimdallUrl string,
dataDir string,
tmpDir string,
logger log.Logger,
) Service {
// TODO: implementing these is an upcoming task
txProvider := func() kv.RwTx { /* TODO */ return nil }
readerProvider := func() reader { /* TODO */ return nil }

tx := txProvider()
if tx == nil {
// TODO: implement and remove
logger.Warn("heimdall.Service txProvider is not implemented yet")
return nil
}
reader := readerProvider()
if reader == nil {
// TODO: implement and remove
logger.Warn("heimdall.Service readerProvider is not implemented yet")
return nil
}
db := NewDatabase(dataDir, logger)

blockNumToIdIndexFactory := func(ctx context.Context) (*RangeIndex, error) {
return NewRangeIndex(ctx, tmpDir, logger)
}

checkpointStore := newCheckpointStore(tx, reader, blockNumToIdIndexFactory)
milestoneStore := newMilestoneStore(tx, reader, blockNumToIdIndexFactory)
spanStore := newSpanStore(tx, reader, blockNumToIdIndexFactory)
checkpointStore := newEntityStore(db, kv.BorCheckpoints, makeType[Checkpoint], blockNumToIdIndexFactory)
milestoneStore := newEntityStore(db, kv.BorMilestones, makeType[Milestone], blockNumToIdIndexFactory)
spanStore := newEntityStore(db, kv.BorSpans, makeType[Span], blockNumToIdIndexFactory)

client := NewHeimdallClient(heimdallUrl, logger)
scraper := NewScraper(
Expand All @@ -84,6 +59,7 @@ func NewService(
return &service{
scraper: scraper,

db: db,
checkpointStore: checkpointStore,
milestoneStore: milestoneStore,
spanStore: spanStore,
Expand Down Expand Up @@ -132,6 +108,7 @@ func (s *service) RegisterSpanObserver(callback func(*Span)) polygoncommon.Unreg
}

func (s *service) Run(ctx context.Context) error {
defer s.db.Close()
defer s.checkpointStore.Close()
defer s.milestoneStore.Close()
defer s.spanStore.Close()
Expand Down
2 changes: 2 additions & 0 deletions polygon/sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type service struct {
func NewService(
logger log.Logger,
chainConfig *chain.Config,
dataDir string,
tmpDir string,
sentryClient direct.SentryClient,
maxPeers int,
Expand All @@ -53,6 +54,7 @@ func NewService(
heimdallService := heimdall.NewHeimdall(heimdallClient, logger)
heimdallServiceV2 := heimdall.NewService(
heimdallUrl,
dataDir,
tmpDir,
logger,
)
Expand Down

0 comments on commit 4cae699

Please sign in to comment.