ledger: move blockdb into a storage package (#4841)
icorderi authored Dec 5, 2022
1 parent 6acecbd commit a70260d
Showing 14 changed files with 260 additions and 268 deletions.
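The renaming is mechanical: helpers that were unexported functions in package ledger (blockLatest, blockGet, blockPut, ...) are now exported from ledger/store/blockdb, and each caller gains an import of the new package. Below is a minimal sketch of the new call shape, assuming only the signatures visible in the hunks that follow; import paths are copied as they render on this page, and the readLatestBlock wrapper itself is hypothetical.

```go
// Hypothetical caller illustrating the rename in this commit: block DB helpers
// move from package ledger (blockLatest, blockGet, ...) into the exported
// ledger/store/blockdb package (blockdb.BlockLatest, blockdb.BlockGet, ...).
package example

import (
	"database/sql"

	"github.com/algorand/go-algorand/data/bookkeeping"
	"github.com/algorand/go-algorand/ledger/store/blockdb"
)

func readLatestBlock(tx *sql.Tx) (bookkeeping.Block, error) {
	latest, err := blockdb.BlockLatest(tx) // previously blockLatest(tx)
	if err != nil {
		return bookkeeping.Block{}, err
	}
	return blockdb.BlockGet(tx, latest) // previously blockGet(tx, latest)
}
```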
25 changes: 13 additions & 12 deletions ledger/accountdb.go
@@ -36,6 +36,7 @@ import (
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/go-algorand/ledger/store"
"github.com/algorand/go-algorand/ledger/store/blockdb"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/util/db"
@@ -286,7 +287,7 @@ func prepareNormalizedBalancesV5(bals []encodedBalanceRecordV5, proto config.Con
return nil, err
}
normalizedAccountBalances[i].AccountHashes = make([][]byte, 1)
normalizedAccountBalances[i].AccountHashes[0] = accountHashBuilder(balance.Address, accountDataV5, balance.AccountData)
normalizedAccountBalances[i].AccountHashes[0] = store.AccountHashBuilder(balance.Address, accountDataV5, balance.AccountData)
if len(resources) > 0 {
normalizedAccountBalances[i].Resources = make(map[basics.CreatableIndex]store.ResourcesData, len(resources))
normalizedAccountBalances[i].EncodedResources = make(map[basics.CreatableIndex][]byte, len(resources))
@@ -325,7 +326,7 @@ func prepareNormalizedBalancesV6(bals []encodedBalanceRecordV6, proto config.Con
normalizedAccountBalances[i].PartialBalance = true
} else {
normalizedAccountBalances[i].AccountHashes = make([][]byte, 1+len(balance.Resources))
normalizedAccountBalances[i].AccountHashes[0] = accountHashBuilderV6(balance.Address, &normalizedAccountBalances[i].AccountData, balance.AccountData)
normalizedAccountBalances[i].AccountHashes[0] = store.AccountHashBuilderV6(balance.Address, &normalizedAccountBalances[i].AccountData, balance.AccountData)
curHashIdx++
}
if len(balance.Resources) > 0 {
@@ -337,7 +338,7 @@ func prepareNormalizedBalancesV6(bals []encodedBalanceRecordV6, proto config.Con
if err != nil {
return nil, err
}
normalizedAccountBalances[i].AccountHashes[curHashIdx], err = resourcesHashBuilderV6(&resData, balance.Address, basics.CreatableIndex(cidx), resData.UpdateRound, res)
normalizedAccountBalances[i].AccountHashes[curHashIdx], err = store.ResourcesHashBuilderV6(&resData, balance.Address, basics.CreatableIndex(cidx), resData.UpdateRound, res)
if err != nil {
return nil, err
}
@@ -1279,11 +1280,11 @@ func performTxTailTableMigration(ctx context.Context, tx *sql.Tx, blockDb db.Acc
// when migrating there is only MaxTxnLife blocks in the block DB
// since the original txTail.commmittedUpTo preserved only (rnd+1)-MaxTxnLife = 1000 blocks back
err = blockDb.Atomic(func(ctx context.Context, blockTx *sql.Tx) error {
latestBlockRound, err := blockLatest(blockTx)
latestBlockRound, err := blockdb.BlockLatest(blockTx)
if err != nil {
return fmt.Errorf("latest block number cannot be retrieved : %w", err)
}
latestHdr, err := blockGetHdr(blockTx, dbRound)
latestHdr, err := blockdb.BlockGetHdr(blockTx, dbRound)
if err != nil {
return fmt.Errorf("latest block header %d cannot be retrieved : %w", dbRound, err)
}
@@ -1299,7 +1300,7 @@ func performTxTailTableMigration(ctx context.Context, tx *sql.Tx, blockDb db.Acc
if firstRound == basics.Round(0) {
firstRound++
}
if _, err := blockGet(blockTx, firstRound); err != nil {
if _, err := blockdb.BlockGet(blockTx, firstRound); err != nil {
// looks like not catchpoint but a regular migration, start from maxTxnLife + deeperBlockHistory back
firstRound = (latestBlockRound + 1).SubSaturate(maxTxnLife + deeperBlockHistory)
if firstRound == basics.Round(0) {
@@ -1308,7 +1309,7 @@ func performTxTailTableMigration(ctx context.Context, tx *sql.Tx, blockDb db.Acc
}
tailRounds := make([][]byte, 0, maxTxnLife)
for rnd := firstRound; rnd <= dbRound; rnd++ {
blk, err := blockGet(blockTx, rnd)
blk, err := blockdb.BlockGet(blockTx, rnd)
if err != nil {
return fmt.Errorf("block for round %d ( %d - %d ) cannot be retrieved : %w", rnd, firstRound, dbRound, err)
}
@@ -1343,7 +1344,7 @@ func performOnlineRoundParamsTailMigration(ctx context.Context, tx *sql.Tx, bloc
currentProto = initProto
} else {
err = blockDb.Atomic(func(ctx context.Context, blockTx *sql.Tx) error {
hdr, err := blockGetHdr(blockTx, rnd)
hdr, err := blockdb.BlockGetHdr(blockTx, rnd)
if err != nil {
return err
}
@@ -1514,7 +1515,7 @@ func performOnlineAccountsTableMigration(ctx context.Context, tx *sql.Tx, progre
return fmt.Errorf("accountsInitialize was unable to MakeTrie: %v", err)
}
for addr, state := range acctRehash {
deleteHash := accountHashBuilderV6(addr, &state.old, state.oldEnc)
deleteHash := store.AccountHashBuilderV6(addr, &state.old, state.oldEnc)
deleted, err := trie.Delete(deleteHash)
if err != nil {
return fmt.Errorf("performOnlineAccountsTableMigration failed to delete hash '%s' from merkle trie for account %v: %w", hex.EncodeToString(deleteHash), addr, err)
@@ -1523,7 +1524,7 @@ func performOnlineAccountsTableMigration(ctx context.Context, tx *sql.Tx, progre
log.Warnf("performOnlineAccountsTableMigration failed to delete hash '%s' from merkle trie for account %v", hex.EncodeToString(deleteHash), addr)
}

addHash := accountHashBuilderV6(addr, &state.new, state.newEnc)
addHash := store.AccountHashBuilderV6(addr, &state.new, state.newEnc)
added, err := trie.Add(addHash)
if err != nil {
return fmt.Errorf("performOnlineAccountsTableMigration attempted to add duplicate hash '%s' to merkle trie for account %v: %w", hex.EncodeToString(addHash), addr, err)
@@ -2466,7 +2467,7 @@ func (iterator *orderedAccountsIter) Next(ctx context.Context) (acct []accountAd
if iterator.step == oaiStepInsertAccountData {
var lastAddrID int64
baseCb := func(addr basics.Address, rowid int64, accountData *store.BaseAccountData, encodedAccountData []byte) (err error) {
hash := accountHashBuilderV6(addr, accountData, encodedAccountData)
hash := store.AccountHashBuilderV6(addr, accountData, encodedAccountData)
_, err = iterator.insertStmt.ExecContext(ctx, rowid, hash)
if err != nil {
return
@@ -2477,7 +2478,7 @@ func (iterator *orderedAccountsIter) Next(ctx context.Context) (acct []accountAd

resCb := func(addr basics.Address, cidx basics.CreatableIndex, resData *store.ResourcesData, encodedResourceData []byte, lastResource bool) error {
if resData != nil {
hash, err := resourcesHashBuilderV6(resData, addr, cidx, resData.UpdateRound, encodedResourceData)
hash, err := store.ResourcesHashBuilderV6(resData, addr, cidx, resData.UpdateRound, encodedResourceData)
if err != nil {
return err
}
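The account-hash helpers used throughout this file follow the same pattern: accountHashBuilderV6 and resourcesHashBuilderV6 become store.AccountHashBuilderV6 and store.ResourcesHashBuilderV6. Below is a minimal sketch of the rehash step from performOnlineAccountsTableMigration, assuming only the signatures visible in the hunks above; the trieWriter interface and rehashAccount helper are hypothetical stand-ins for the migration's merkle trie and loop.

```go
// Hedged sketch of the migration's rehash step using the exported hash builder.
package example

import (
	"github.com/algorand/go-algorand/data/basics"
	"github.com/algorand/go-algorand/ledger/store"
)

// trieWriter captures the two merkle trie operations the migration uses.
type trieWriter interface {
	Delete(hash []byte) (bool, error)
	Add(hash []byte) (bool, error)
}

// rehashAccount swaps an account's trie leaf: it deletes the hash of the old
// encoding and adds the hash of the new one, as the migration above does.
func rehashAccount(trie trieWriter, addr basics.Address,
	oldData, newData *store.BaseAccountData, oldEnc, newEnc []byte) error {
	if _, err := trie.Delete(store.AccountHashBuilderV6(addr, oldData, oldEnc)); err != nil {
		return err
	}
	_, err := trie.Add(store.AccountHashBuilderV6(addr, newData, newEnc))
	return err
}
```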
2 changes: 1 addition & 1 deletion ledger/accountdb_test.go
@@ -3244,7 +3244,7 @@ func TestRemoveOfflineStateProofID(t *testing.T) {
if expected && ba.Status != basics.Online {
require.Equal(t, merklesignature.Commitment{}, ba.StateProofID)
}
addHash := accountHashBuilderV6(addr, &ba, encodedAcctData)
addHash := store.AccountHashBuilderV6(addr, &ba, encodedAcctData)
added, err := trie.Add(addHash)
require.NoError(t, err)
require.True(t, added)
17 changes: 9 additions & 8 deletions ledger/archival_test.go
@@ -40,6 +40,7 @@ import (
"github.com/algorand/go-algorand/data/transactions/logic"
"github.com/algorand/go-algorand/ledger/internal"
"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/go-algorand/ledger/store/blockdb"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/test/partitiontest"
@@ -219,10 +220,10 @@ func TestArchivalRestart(t *testing.T) {

var latest, earliest basics.Round
err = l.blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
latest, err = blockLatest(tx)
latest, err = blockdb.BlockLatest(tx)
require.NoError(t, err)

earliest, err = blockEarliest(tx)
earliest, err = blockdb.BlockEarliest(tx)
require.NoError(t, err)
return err
})
@@ -236,10 +237,10 @@ func TestArchivalRestart(t *testing.T) {
defer l.Close()

err = l.blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
latest, err = blockLatest(tx)
latest, err = blockdb.BlockLatest(tx)
require.NoError(t, err)

earliest, err = blockEarliest(tx)
earliest, err = blockdb.BlockEarliest(tx)
require.NoError(t, err)
return err
})
@@ -754,10 +755,10 @@ func TestArchivalFromNonArchival(t *testing.T) {

var latest, earliest basics.Round
err = l.blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
latest, err = blockLatest(tx)
latest, err = blockdb.BlockLatest(tx)
require.NoError(t, err)

earliest, err = blockEarliest(tx)
earliest, err = blockdb.BlockEarliest(tx)
require.NoError(t, err)
return err
})
@@ -774,10 +775,10 @@ func TestArchivalFromNonArchival(t *testing.T) {
defer l.Close()

err = l.blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
latest, err = blockLatest(tx)
latest, err = blockdb.BlockLatest(tx)
require.NoError(t, err)

earliest, err = blockEarliest(tx)
earliest, err = blockdb.BlockEarliest(tx)
require.NoError(t, err)
return err
})
23 changes: 12 additions & 11 deletions ledger/blockdb_test.go
@@ -26,6 +26,7 @@ import (
"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/ledger/store/blockdb"
storetesting "github.com/algorand/go-algorand/ledger/store/testing"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/test/partitiontest"
@@ -78,19 +79,19 @@ func blockChainBlocks(be []blockEntry) []bookkeeping.Block {
}

func checkBlockDB(t *testing.T, tx *sql.Tx, blocks []blockEntry) {
next, err := blockNext(tx)
next, err := blockdb.BlockNext(tx)
require.NoError(t, err)
require.Equal(t, next, basics.Round(len(blocks)))

latest, err := blockLatest(tx)
latest, err := blockdb.BlockLatest(tx)
if len(blocks) == 0 {
require.Error(t, err)
} else {
require.NoError(t, err)
require.Equal(t, latest, basics.Round(len(blocks))-1)
}

earliest, err := blockEarliest(tx)
earliest, err := blockdb.BlockEarliest(tx)
if len(blocks) == 0 {
require.Error(t, err)
} else {
@@ -99,17 +100,17 @@ func checkBlockDB(t *testing.T, tx *sql.Tx, blocks []blockEntry) {
}

for rnd := basics.Round(0); rnd < basics.Round(len(blocks)); rnd++ {
blk, err := blockGet(tx, rnd)
blk, err := blockdb.BlockGet(tx, rnd)
require.NoError(t, err)
require.Equal(t, blk, blocks[rnd].block)

blk, cert, err := blockGetCert(tx, rnd)
blk, cert, err := blockdb.BlockGetCert(tx, rnd)
require.NoError(t, err)
require.Equal(t, blk, blocks[rnd].block)
require.Equal(t, cert, blocks[rnd].cert)
}

_, err = blockGet(tx, basics.Round(len(blocks)))
_, err = blockdb.BlockGet(tx, basics.Round(len(blocks)))
require.Error(t, err)
}

@@ -124,7 +125,7 @@ func TestBlockDBEmpty(t *testing.T) {
require.NoError(t, err)
defer tx.Rollback()

err = blockInit(tx, nil)
err = blockdb.BlockInit(tx, nil)
require.NoError(t, err)
checkBlockDB(t, tx, nil)
}
@@ -142,11 +143,11 @@ func TestBlockDBInit(t *testing.T) {

blocks := randomInitChain(protocol.ConsensusCurrentVersion, 10)

err = blockInit(tx, blockChainBlocks(blocks))
err = blockdb.BlockInit(tx, blockChainBlocks(blocks))
require.NoError(t, err)
checkBlockDB(t, tx, blocks)

err = blockInit(tx, blockChainBlocks(blocks))
err = blockdb.BlockInit(tx, blockChainBlocks(blocks))
require.NoError(t, err)
checkBlockDB(t, tx, blocks)
}
@@ -164,13 +165,13 @@ func TestBlockDBAppend(t *testing.T) {

blocks := randomInitChain(protocol.ConsensusCurrentVersion, 10)

err = blockInit(tx, blockChainBlocks(blocks))
err = blockdb.BlockInit(tx, blockChainBlocks(blocks))
require.NoError(t, err)
checkBlockDB(t, tx, blocks)

for i := 0; i < 10; i++ {
blkent := randomBlock(basics.Round(len(blocks)))
err = blockPut(tx, blkent.block, blkent.cert)
err = blockdb.BlockPut(tx, blkent.block, blkent.cert)
require.NoError(t, err)

blocks = append(blocks, blkent)
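These tests drive the exported package exactly as the old unexported helpers were driven. Below is a minimal sketch of the append round trip from TestBlockDBAppend, assuming only the signatures shown above; appendOneBlock is hypothetical, and the agreement.Certificate type for cert is an assumption (the tests route it through a local blockEntry struct).

```go
// Hedged sketch of seeding the block table and appending a single block.
package example

import (
	"database/sql"
	"fmt"

	"github.com/algorand/go-algorand/agreement"
	"github.com/algorand/go-algorand/data/basics"
	"github.com/algorand/go-algorand/data/bookkeeping"
	"github.com/algorand/go-algorand/ledger/store/blockdb"
)

func appendOneBlock(tx *sql.Tx, chain []bookkeeping.Block, blk bookkeeping.Block, cert agreement.Certificate) error {
	// BlockInit tolerates an already-initialized table; the tests above call it twice.
	if err := blockdb.BlockInit(tx, chain); err != nil {
		return err
	}
	if err := blockdb.BlockPut(tx, blk, cert); err != nil {
		return err
	}
	next, err := blockdb.BlockNext(tx)
	if err != nil {
		return err
	}
	if next != basics.Round(len(chain))+1 {
		return fmt.Errorf("unexpected next round %d", next)
	}
	return nil
}
```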
15 changes: 8 additions & 7 deletions ledger/blockqueue.go
@@ -29,6 +29,7 @@ import (
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/go-algorand/ledger/store/blockdb"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/util/metrics"
@@ -61,7 +62,7 @@ func bqInit(l *Ledger) (*blockQueue, error) {
start := time.Now()
err := bq.l.blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
var err0 error
bq.lastCommitted, err0 = blockLatest(tx)
bq.lastCommitted, err0 = blockdb.BlockLatest(tx)
return err0
})
ledgerBlockqInitMicros.AddMicrosecondsSince(start, nil)
@@ -111,7 +112,7 @@ func (bq *blockQueue) syncer() {
ledgerSyncBlockputCount.Inc(nil)
err := bq.l.blockDBs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
for _, e := range workQ {
err0 := blockPut(tx, e.block, e.cert)
err0 := blockdb.BlockPut(tx, e.block, e.cert)
if err0 != nil {
return err0
}
@@ -146,7 +147,7 @@ func (bq *blockQueue) syncer() {
bfstart := time.Now()
ledgerSyncBlockforgetCount.Inc(nil)
err = bq.l.blockDBs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
return blockForgetBefore(tx, minToSave)
return blockdb.BlockForgetBefore(tx, minToSave)
})
ledgerSyncBlockforgetMicros.AddMicrosecondsSince(bfstart, nil)
if err != nil {
@@ -261,7 +262,7 @@ func (bq *blockQueue) getBlock(r basics.Round) (blk bookkeeping.Block, err error
ledgerGetblockCount.Inc(nil)
err = bq.l.blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
var err0 error
blk, err0 = blockGet(tx, r)
blk, err0 = blockdb.BlockGet(tx, r)
return err0
})
ledgerGetblockMicros.AddMicrosecondsSince(start, nil)
@@ -283,7 +284,7 @@ func (bq *blockQueue) getBlockHdr(r basics.Round) (hdr bookkeeping.BlockHeader,
ledgerGetblockhdrCount.Inc(nil)
err = bq.l.blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
var err0 error
hdr, err0 = blockGetHdr(tx, r)
hdr, err0 = blockdb.BlockGetHdr(tx, r)
return err0
})
ledgerGetblockhdrMicros.AddMicrosecondsSince(start, nil)
@@ -309,7 +310,7 @@ func (bq *blockQueue) getEncodedBlockCert(r basics.Round) (blk []byte, cert []by
ledgerGeteblockcertCount.Inc(nil)
err = bq.l.blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
var err0 error
blk, cert, err0 = blockGetEncodedCert(tx, r)
blk, cert, err0 = blockdb.BlockGetEncodedCert(tx, r)
return err0
})
ledgerGeteblockcertMicros.AddMicrosecondsSince(start, nil)
@@ -331,7 +332,7 @@ func (bq *blockQueue) getBlockCert(r basics.Round) (blk bookkeeping.Block, cert
ledgerGetblockcertCount.Inc(nil)
err = bq.l.blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
var err0 error
blk, cert, err0 = blockGetCert(tx, r)
blk, cert, err0 = blockdb.BlockGetCert(tx, r)
return err0
})
ledgerGetblockcertMicros.AddMicrosecondsSince(start, nil)
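Every blockdb call in blockqueue.go stays wrapped in an Atomic transaction on the appropriate handle (Rdb for reads, Wdb for writes). Below is a minimal sketch of the write-side pattern, assuming the db.Accessor and Atomic signature visible in the hunks above; pruneBlocks is hypothetical and mirrors the syncer's BlockForgetBefore call.

```go
// Hedged sketch: blockdb writes run inside an Atomic transaction on a
// db.Accessor write handle, as blockQueue.syncer does above.
package example

import (
	"context"
	"database/sql"

	"github.com/algorand/go-algorand/data/basics"
	"github.com/algorand/go-algorand/ledger/store/blockdb"
	"github.com/algorand/go-algorand/util/db"
)

// pruneBlocks drops all blocks before minToSave, the minimum round the
// trackers still need.
func pruneBlocks(wdb db.Accessor, minToSave basics.Round) error {
	return wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
		return blockdb.BlockForgetBefore(tx, minToSave)
	})
}
```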
