From a70260de10c8eba10e1c5a32c2132a1e5d59ffcc Mon Sep 17 00:00:00 2001
From: Ignacio Corderi
Date: Mon, 5 Dec 2022 15:54:21 -0300
Subject: [PATCH] ledger: move blockdb into a storage package (#4841)

---
 ledger/accountdb.go                   |  25 ++---
 ledger/accountdb_test.go              |   2 +-
 ledger/archival_test.go               |  17 ++--
 ledger/blockdb_test.go                |  23 ++---
 ledger/blockqueue.go                  |  15 +--
 ledger/catchpointtracker.go           | 124 ++---------------------
 ledger/catchpointtracker_test.go      |  28 +++---
 ledger/catchupaccessor.go             |  17 ++--
 ledger/ledger.go                      |  13 +--
 ledger/msgp_gen.go                    |  54 -----------
 ledger/{ => store/blockdb}/blockdb.go |  59 ++++++-----
 ledger/store/hashing.go               | 135 ++++++++++++++++++++++++++
 ledger/{ => store}/hashkind_string.go |  14 +--
 ledger/trackerdb.go                   |   2 +-
 14 files changed, 260 insertions(+), 268 deletions(-)
 rename ledger/{ => store/blockdb}/blockdb.go (76%)
 create mode 100644 ledger/store/hashing.go
 rename ledger/{ => store}/hashkind_string.go (75%)

diff --git a/ledger/accountdb.go b/ledger/accountdb.go
index c79fc45139..4304dc9885 100644
--- a/ledger/accountdb.go
+++ b/ledger/accountdb.go
@@ -36,6 +36,7 @@ import (
 	"github.com/algorand/go-algorand/data/basics"
 	"github.com/algorand/go-algorand/ledger/ledgercore"
 	"github.com/algorand/go-algorand/ledger/store"
+	"github.com/algorand/go-algorand/ledger/store/blockdb"
 	"github.com/algorand/go-algorand/logging"
 	"github.com/algorand/go-algorand/protocol"
 	"github.com/algorand/go-algorand/util/db"
@@ -286,7 +287,7 @@ func prepareNormalizedBalancesV5(bals []encodedBalanceRecordV5, proto config.Con
 			return nil, err
 		}
 		normalizedAccountBalances[i].AccountHashes = make([][]byte, 1)
-		normalizedAccountBalances[i].AccountHashes[0] = accountHashBuilder(balance.Address, accountDataV5, balance.AccountData)
+		normalizedAccountBalances[i].AccountHashes[0] = store.AccountHashBuilder(balance.Address, accountDataV5, balance.AccountData)
 		if len(resources) > 0 {
 			normalizedAccountBalances[i].Resources = make(map[basics.CreatableIndex]store.ResourcesData, len(resources))
 			normalizedAccountBalances[i].EncodedResources = make(map[basics.CreatableIndex][]byte, len(resources))
@@ -325,7 +326,7 @@ func prepareNormalizedBalancesV6(bals []encodedBalanceRecordV6, proto config.Con
 			normalizedAccountBalances[i].PartialBalance = true
 		} else {
 			normalizedAccountBalances[i].AccountHashes = make([][]byte, 1+len(balance.Resources))
-			normalizedAccountBalances[i].AccountHashes[0] = accountHashBuilderV6(balance.Address, &normalizedAccountBalances[i].AccountData, balance.AccountData)
+			normalizedAccountBalances[i].AccountHashes[0] = store.AccountHashBuilderV6(balance.Address, &normalizedAccountBalances[i].AccountData, balance.AccountData)
 			curHashIdx++
 		}
 		if len(balance.Resources) > 0 {
@@ -337,7 +338,7 @@ func prepareNormalizedBalancesV6(bals []encodedBalanceRecordV6, proto config.Con
 				if err != nil {
 					return nil, err
 				}
-				normalizedAccountBalances[i].AccountHashes[curHashIdx], err = resourcesHashBuilderV6(&resData, balance.Address, basics.CreatableIndex(cidx), resData.UpdateRound, res)
+				normalizedAccountBalances[i].AccountHashes[curHashIdx], err = store.ResourcesHashBuilderV6(&resData, balance.Address, basics.CreatableIndex(cidx), resData.UpdateRound, res)
 				if err != nil {
 					return nil, err
 				}
@@ -1279,11 +1280,11 @@ func performTxTailTableMigration(ctx context.Context, tx *sql.Tx, blockDb db.Acc
 	// when migrating there is only MaxTxnLife blocks in the block DB
 	// since the original txTail.commmittedUpTo preserved only (rnd+1)-MaxTxnLife = 1000 blocks back
 	err = blockDb.Atomic(func(ctx context.Context, blockTx *sql.Tx) error {
-		latestBlockRound, err := blockLatest(blockTx)
+		latestBlockRound, err := blockdb.BlockLatest(blockTx)
 		if err != nil {
 			return fmt.Errorf("latest block number cannot be retrieved : %w", err)
 		}
-		latestHdr, err := blockGetHdr(blockTx, dbRound)
+		latestHdr, err := blockdb.BlockGetHdr(blockTx, dbRound)
 		if err != nil {
 			return fmt.Errorf("latest block header %d cannot be retrieved : %w", dbRound, err)
 		}
@@ -1299,7 +1300,7 @@ func performTxTailTableMigration(ctx context.Context, tx *sql.Tx, blockDb db.Acc
 		if firstRound == basics.Round(0) {
 			firstRound++
 		}
-		if _, err := blockGet(blockTx, firstRound); err != nil {
+		if _, err := blockdb.BlockGet(blockTx, firstRound); err != nil {
 			// looks like not catchpoint but a regular migration, start from maxTxnLife + deeperBlockHistory back
 			firstRound = (latestBlockRound + 1).SubSaturate(maxTxnLife + deeperBlockHistory)
 			if firstRound == basics.Round(0) {
@@ -1308,7 +1309,7 @@ func performTxTailTableMigration(ctx context.Context, tx *sql.Tx, blockDb db.Acc
 		}
 		tailRounds := make([][]byte, 0, maxTxnLife)
 		for rnd := firstRound; rnd <= dbRound; rnd++ {
-			blk, err := blockGet(blockTx, rnd)
+			blk, err := blockdb.BlockGet(blockTx, rnd)
 			if err != nil {
 				return fmt.Errorf("block for round %d ( %d - %d ) cannot be retrieved : %w", rnd, firstRound, dbRound, err)
 			}
@@ -1343,7 +1344,7 @@ func performOnlineRoundParamsTailMigration(ctx context.Context, tx *sql.Tx, bloc
 		currentProto = initProto
 	} else {
 		err = blockDb.Atomic(func(ctx context.Context, blockTx *sql.Tx) error {
-			hdr, err := blockGetHdr(blockTx, rnd)
+			hdr, err := blockdb.BlockGetHdr(blockTx, rnd)
 			if err != nil {
 				return err
 			}
@@ -1514,7 +1515,7 @@ func performOnlineAccountsTableMigration(ctx context.Context, tx *sql.Tx, progre
 		return fmt.Errorf("accountsInitialize was unable to MakeTrie: %v", err)
 	}
 	for addr, state := range acctRehash {
-		deleteHash := accountHashBuilderV6(addr, &state.old, state.oldEnc)
+		deleteHash := store.AccountHashBuilderV6(addr, &state.old, state.oldEnc)
 		deleted, err := trie.Delete(deleteHash)
 		if err != nil {
 			return fmt.Errorf("performOnlineAccountsTableMigration failed to delete hash '%s' from merkle trie for account %v: %w", hex.EncodeToString(deleteHash), addr, err)
@@ -1523,7 +1524,7 @@ func performOnlineAccountsTableMigration(ctx context.Context, tx *sql.Tx, progre
 			log.Warnf("performOnlineAccountsTableMigration failed to delete hash '%s' from merkle trie for account %v", hex.EncodeToString(deleteHash), addr)
 		}
 
-		addHash := accountHashBuilderV6(addr, &state.new, state.newEnc)
+		addHash := store.AccountHashBuilderV6(addr, &state.new, state.newEnc)
 		added, err := trie.Add(addHash)
 		if err != nil {
 			return fmt.Errorf("performOnlineAccountsTableMigration attempted to add duplicate hash '%s' to merkle trie for account %v: %w", hex.EncodeToString(addHash), addr, err)
@@ -2466,7 +2467,7 @@ func (iterator *orderedAccountsIter) Next(ctx context.Context) (acct []accountAd
 	if iterator.step == oaiStepInsertAccountData {
 		var lastAddrID int64
 		baseCb := func(addr basics.Address, rowid int64, accountData *store.BaseAccountData, encodedAccountData []byte) (err error) {
-			hash := accountHashBuilderV6(addr, accountData, encodedAccountData)
+			hash := store.AccountHashBuilderV6(addr, accountData, encodedAccountData)
 			_, err = iterator.insertStmt.ExecContext(ctx, rowid, hash)
 			if err != nil {
 				return
@@ -2477,7 +2478,7 @@ func (iterator *orderedAccountsIter) Next(ctx context.Context) (acct []accountAd
 
 		resCb := func(addr basics.Address, cidx basics.CreatableIndex, resData *store.ResourcesData, encodedResourceData []byte, lastResource bool) error {
 			if resData != nil {
-				hash, err := resourcesHashBuilderV6(resData, addr, cidx, resData.UpdateRound, encodedResourceData)
+				hash, err := store.ResourcesHashBuilderV6(resData, addr, cidx, resData.UpdateRound, encodedResourceData)
 				if err != nil {
 					return err
 				}
diff --git a/ledger/accountdb_test.go b/ledger/accountdb_test.go
index b1a027f161..b109e2b470 100644
--- a/ledger/accountdb_test.go
+++ b/ledger/accountdb_test.go
@@ -3244,7 +3244,7 @@ func TestRemoveOfflineStateProofID(t *testing.T) {
 			if expected && ba.Status != basics.Online {
 				require.Equal(t, merklesignature.Commitment{}, ba.StateProofID)
 			}
-			addHash := accountHashBuilderV6(addr, &ba, encodedAcctData)
+			addHash := store.AccountHashBuilderV6(addr, &ba, encodedAcctData)
 			added, err := trie.Add(addHash)
 			require.NoError(t, err)
 			require.True(t, added)
diff --git a/ledger/archival_test.go b/ledger/archival_test.go
index 6e61ee0abf..50d367d65e 100644
--- a/ledger/archival_test.go
+++ b/ledger/archival_test.go
@@ -40,6 +40,7 @@ import (
 	"github.com/algorand/go-algorand/data/transactions/logic"
 	"github.com/algorand/go-algorand/ledger/internal"
 	"github.com/algorand/go-algorand/ledger/ledgercore"
+	"github.com/algorand/go-algorand/ledger/store/blockdb"
 	"github.com/algorand/go-algorand/logging"
 	"github.com/algorand/go-algorand/protocol"
 	"github.com/algorand/go-algorand/test/partitiontest"
@@ -219,10 +220,10 @@ func TestArchivalRestart(t *testing.T) {
 
 	var latest, earliest basics.Round
 	err = l.blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
-		latest, err = blockLatest(tx)
+		latest, err = blockdb.BlockLatest(tx)
 		require.NoError(t, err)
 
-		earliest, err = blockEarliest(tx)
+		earliest, err = blockdb.BlockEarliest(tx)
 		require.NoError(t, err)
 		return err
 	})
@@ -236,10 +237,10 @@ func TestArchivalRestart(t *testing.T) {
 	defer l.Close()
 
 	err = l.blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
-		latest, err = blockLatest(tx)
+		latest, err = blockdb.BlockLatest(tx)
 		require.NoError(t, err)
 
-		earliest, err = blockEarliest(tx)
+		earliest, err = blockdb.BlockEarliest(tx)
 		require.NoError(t, err)
 		return err
 	})
@@ -754,10 +755,10 @@ func TestArchivalFromNonArchival(t *testing.T) {
 
 	var latest, earliest basics.Round
 	err = l.blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
-		latest, err = blockLatest(tx)
+		latest, err = blockdb.BlockLatest(tx)
 		require.NoError(t, err)
 
-		earliest, err = blockEarliest(tx)
+		earliest, err = blockdb.BlockEarliest(tx)
 		require.NoError(t, err)
 		return err
 	})
@@ -774,10 +775,10 @@ func TestArchivalFromNonArchival(t *testing.T) {
 	defer l.Close()
 
 	err = l.blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
-		latest, err = blockLatest(tx)
+		latest, err = blockdb.BlockLatest(tx)
 		require.NoError(t, err)
 
-		earliest, err = blockEarliest(tx)
+		earliest, err = blockdb.BlockEarliest(tx)
 		require.NoError(t, err)
 		return err
 	})
diff --git a/ledger/blockdb_test.go b/ledger/blockdb_test.go
index 39cf6eaa0a..25fe573c08 100644
--- a/ledger/blockdb_test.go
+++ b/ledger/blockdb_test.go
@@ -26,6 +26,7 @@ import (
 	"github.com/algorand/go-algorand/crypto"
 	"github.com/algorand/go-algorand/data/basics"
 	"github.com/algorand/go-algorand/data/bookkeeping"
+	"github.com/algorand/go-algorand/ledger/store/blockdb"
 	storetesting "github.com/algorand/go-algorand/ledger/store/testing"
 	"github.com/algorand/go-algorand/protocol"
 	"github.com/algorand/go-algorand/test/partitiontest"
@@ -78,11 +79,11 @@ func blockChainBlocks(be []blockEntry) []bookkeeping.Block {
 }
 
 func checkBlockDB(t *testing.T, tx *sql.Tx, blocks []blockEntry) {
-	next, err := blockNext(tx)
+	next, err := blockdb.BlockNext(tx)
 	require.NoError(t, err)
 	require.Equal(t, next, basics.Round(len(blocks)))
 
-	latest, err := blockLatest(tx)
+	latest, err := blockdb.BlockLatest(tx)
 	if len(blocks) == 0 {
 		require.Error(t, err)
 	} else {
@@ -90,7 +91,7 @@ func checkBlockDB(t *testing.T, tx *sql.Tx, blocks []blockEntry) {
 		require.Equal(t, latest, basics.Round(len(blocks))-1)
 	}
 
-	earliest, err := blockEarliest(tx)
+	earliest, err := blockdb.BlockEarliest(tx)
 	if len(blocks) == 0 {
 		require.Error(t, err)
 	} else {
@@ -99,17 +100,17 @@ func checkBlockDB(t *testing.T, tx *sql.Tx, blocks []blockEntry) {
 	}
 
 	for rnd := basics.Round(0); rnd < basics.Round(len(blocks)); rnd++ {
-		blk, err := blockGet(tx, rnd)
+		blk, err := blockdb.BlockGet(tx, rnd)
 		require.NoError(t, err)
 		require.Equal(t, blk, blocks[rnd].block)
 
-		blk, cert, err := blockGetCert(tx, rnd)
+		blk, cert, err := blockdb.BlockGetCert(tx, rnd)
 		require.NoError(t, err)
 		require.Equal(t, blk, blocks[rnd].block)
 		require.Equal(t, cert, blocks[rnd].cert)
 	}
 
-	_, err = blockGet(tx, basics.Round(len(blocks)))
+	_, err = blockdb.BlockGet(tx, basics.Round(len(blocks)))
 	require.Error(t, err)
 }
 
@@ -124,7 +125,7 @@ func TestBlockDBEmpty(t *testing.T) {
 	require.NoError(t, err)
 	defer tx.Rollback()
 
-	err = blockInit(tx, nil)
+	err = blockdb.BlockInit(tx, nil)
 	require.NoError(t, err)
 	checkBlockDB(t, tx, nil)
 }
@@ -142,11 +143,11 @@ func TestBlockDBInit(t *testing.T) {
 
 	blocks := randomInitChain(protocol.ConsensusCurrentVersion, 10)
 
-	err = blockInit(tx, blockChainBlocks(blocks))
+	err = blockdb.BlockInit(tx, blockChainBlocks(blocks))
 	require.NoError(t, err)
 	checkBlockDB(t, tx, blocks)
 
-	err = blockInit(tx, blockChainBlocks(blocks))
+	err = blockdb.BlockInit(tx, blockChainBlocks(blocks))
 	require.NoError(t, err)
 	checkBlockDB(t, tx, blocks)
 }
@@ -164,13 +165,13 @@ func TestBlockDBAppend(t *testing.T) {
 
 	blocks := randomInitChain(protocol.ConsensusCurrentVersion, 10)
 
-	err = blockInit(tx, blockChainBlocks(blocks))
+	err = blockdb.BlockInit(tx, blockChainBlocks(blocks))
 	require.NoError(t, err)
 	checkBlockDB(t, tx, blocks)
 
 	for i := 0; i < 10; i++ {
 		blkent := randomBlock(basics.Round(len(blocks)))
-		err = blockPut(tx, blkent.block, blkent.cert)
+		err = blockdb.BlockPut(tx, blkent.block, blkent.cert)
 		require.NoError(t, err)
 
 		blocks = append(blocks, blkent)
diff --git a/ledger/blockqueue.go b/ledger/blockqueue.go
index cb14d62943..4dfa1ae2cc 100644
--- a/ledger/blockqueue.go
+++ b/ledger/blockqueue.go
@@ -29,6 +29,7 @@ import (
 	"github.com/algorand/go-algorand/data/basics"
 	"github.com/algorand/go-algorand/data/bookkeeping"
 	"github.com/algorand/go-algorand/ledger/ledgercore"
+	"github.com/algorand/go-algorand/ledger/store/blockdb"
 	"github.com/algorand/go-algorand/logging"
 	"github.com/algorand/go-algorand/protocol"
 	"github.com/algorand/go-algorand/util/metrics"
@@ -61,7 +62,7 @@ func bqInit(l *Ledger) (*blockQueue, error) {
 	start := time.Now()
 	err := bq.l.blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
 		var err0 error
-		bq.lastCommitted, err0 = blockLatest(tx)
+		bq.lastCommitted, err0 = blockdb.BlockLatest(tx)
 		return err0
 	})
 	ledgerBlockqInitMicros.AddMicrosecondsSince(start, nil)
@@ -111,7 +112,7 @@ func (bq *blockQueue) syncer() {
 			ledgerSyncBlockputCount.Inc(nil)
 			err := bq.l.blockDBs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
 				for _, e := range workQ {
-					err0 := blockPut(tx, e.block, e.cert)
+					err0 := blockdb.BlockPut(tx, e.block, e.cert)
 					if err0 != nil {
 						return err0
 					}
@@ -146,7 +147,7 @@ func (bq *blockQueue) syncer() {
 			bfstart := time.Now()
 			ledgerSyncBlockforgetCount.Inc(nil)
 			err = bq.l.blockDBs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
-				return blockForgetBefore(tx, minToSave)
+				return blockdb.BlockForgetBefore(tx, minToSave)
 			})
 			ledgerSyncBlockforgetMicros.AddMicrosecondsSince(bfstart, nil)
 			if err != nil {
@@ -261,7 +262,7 @@ func (bq *blockQueue) getBlock(r basics.Round) (blk bookkeeping.Block, err error
 	ledgerGetblockCount.Inc(nil)
 	err = bq.l.blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
 		var err0 error
-		blk, err0 = blockGet(tx, r)
+		blk, err0 = blockdb.BlockGet(tx, r)
 		return err0
 	})
 	ledgerGetblockMicros.AddMicrosecondsSince(start, nil)
@@ -283,7 +284,7 @@ func (bq *blockQueue) getBlockHdr(r basics.Round) (hdr bookkeeping.BlockHeader,
 	ledgerGetblockhdrCount.Inc(nil)
 	err = bq.l.blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
 		var err0 error
-		hdr, err0 = blockGetHdr(tx, r)
+		hdr, err0 = blockdb.BlockGetHdr(tx, r)
 		return err0
 	})
 	ledgerGetblockhdrMicros.AddMicrosecondsSince(start, nil)
@@ -309,7 +310,7 @@ func (bq *blockQueue) getEncodedBlockCert(r basics.Round) (blk []byte, cert []by
 	ledgerGeteblockcertCount.Inc(nil)
 	err = bq.l.blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
 		var err0 error
-		blk, cert, err0 = blockGetEncodedCert(tx, r)
+		blk, cert, err0 = blockdb.BlockGetEncodedCert(tx, r)
 		return err0
 	})
 	ledgerGeteblockcertMicros.AddMicrosecondsSince(start, nil)
@@ -331,7 +332,7 @@ func (bq *blockQueue) getBlockCert(r basics.Round) (blk bookkeeping.Block, cert
 	ledgerGetblockcertCount.Inc(nil)
 	err = bq.l.blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
 		var err0 error
-		blk, cert, err0 = blockGetCert(tx, r)
+		blk, cert, err0 = blockdb.BlockGetCert(tx, r)
 		return err0
 	})
 	ledgerGetblockcertMicros.AddMicrosecondsSince(start, nil)
diff --git a/ledger/catchpointtracker.go b/ledger/catchpointtracker.go
index d4cab44f11..e66d7b6be2 100644
--- a/ledger/catchpointtracker.go
+++ b/ledger/catchpointtracker.go
@@ -22,7 +22,6 @@ import (
 	"compress/gzip"
 	"context"
 	"database/sql"
-	"encoding/binary"
 	"encoding/hex"
 	"errors"
 	"fmt"
@@ -950,7 +949,7 @@ func (ct *catchpointTracker) accountsUpdateBalances(accountsDeltas compactAccoun
 	for i := 0; i < accountsDeltas.len(); i++ {
 		delta := accountsDeltas.getByIdx(i)
 		if !delta.oldAcct.AccountData.IsEmpty() {
-			deleteHash := accountHashBuilderV6(delta.address, &delta.oldAcct.AccountData, protocol.Encode(&delta.oldAcct.AccountData))
+			deleteHash := store.AccountHashBuilderV6(delta.address, &delta.oldAcct.AccountData, protocol.Encode(&delta.oldAcct.AccountData))
 			deleted, err = ct.balancesTrie.Delete(deleteHash)
 			if err != nil {
 				return fmt.Errorf("failed to delete hash '%s' from merkle trie for account %v: %w", hex.EncodeToString(deleteHash), delta.address, err)
@@ -963,7 +962,7 @@ func (ct *catchpointTracker) accountsUpdateBalances(accountsDeltas compactAccoun
 		}
 
 		if !delta.newAcct.IsEmpty() {
-			addHash := accountHashBuilderV6(delta.address, &delta.newAcct, protocol.Encode(&delta.newAcct))
+			addHash := store.AccountHashBuilderV6(delta.address, &delta.newAcct, protocol.Encode(&delta.newAcct))
 			added, err = ct.balancesTrie.Add(addHash)
 			if err != nil {
 				return fmt.Errorf("attempted to add duplicate hash '%s' to merkle trie for account %v: %w", hex.EncodeToString(addHash), delta.address, err)
@@ -980,7 +979,7 @@ func (ct *catchpointTracker) accountsUpdateBalances(accountsDeltas compactAccoun
 		resDelta := resourcesDeltas.getByIdx(i)
 		addr := resDelta.address
 		if !resDelta.oldResource.Data.IsEmpty() {
-			deleteHash, err := resourcesHashBuilderV6(&resDelta.oldResource.Data, addr, resDelta.oldResource.Aidx, resDelta.oldResource.Data.UpdateRound, protocol.Encode(&resDelta.oldResource.Data))
+			deleteHash, err := store.ResourcesHashBuilderV6(&resDelta.oldResource.Data, addr, resDelta.oldResource.Aidx, resDelta.oldResource.Data.UpdateRound, protocol.Encode(&resDelta.oldResource.Data))
 			if err != nil {
 				return err
 			}
@@ -996,7 +995,7 @@ func (ct *catchpointTracker) accountsUpdateBalances(accountsDeltas compactAccoun
 		}
 
 		if !resDelta.newResource.IsEmpty() {
-			addHash, err := resourcesHashBuilderV6(&resDelta.newResource, addr, resDelta.oldResource.Aidx, resDelta.newResource.UpdateRound, protocol.Encode(&resDelta.newResource))
+			addHash, err := store.ResourcesHashBuilderV6(&resDelta.newResource, addr, resDelta.oldResource.Aidx, resDelta.newResource.UpdateRound, protocol.Encode(&resDelta.newResource))
 			if err != nil {
 				return err
 			}
@@ -1021,7 +1020,7 @@ func (ct *catchpointTracker) accountsUpdateBalances(accountsDeltas compactAccoun
 		if mv.data != nil && bytes.Equal(mv.oldData, mv.data) {
 			continue // changed back within the delta span
 		}
-		deleteHash := kvHashBuilderV6(key, mv.oldData)
+		deleteHash := store.KvHashBuilderV6(key, mv.oldData)
 		deleted, err = ct.balancesTrie.Delete(deleteHash)
 		if err != nil {
 			return fmt.Errorf("failed to delete kv hash '%s' from merkle trie for key %v: %w", hex.EncodeToString(deleteHash), key, err)
@@ -1034,7 +1033,7 @@ func (ct *catchpointTracker) accountsUpdateBalances(accountsDeltas compactAccoun
 		}
 
 		if mv.data != nil {
-			addHash := kvHashBuilderV6(key, mv.data)
+			addHash := store.KvHashBuilderV6(key, mv.data)
 			added, err = ct.balancesTrie.Add(addHash)
 			if err != nil {
 				return fmt.Errorf("attempted to add duplicate kv hash '%s' from merkle trie for key %v: %w", hex.EncodeToString(addHash), key, err)
@@ -1421,115 +1420,6 @@ func removeSingleCatchpointFileFromDisk(dbDirectory, fileToDelete string) (err e
 	return nil
 }
 
-func hashBufV6(affinity uint64, kind hashKind) []byte {
-	hash := make([]byte, 4+crypto.DigestSize)
-	// write out the lowest 32 bits of the affinity value. This should improve
-	// the caching of the trie by allowing recent updates to be in-cache, and
-	// "older" nodes will be left alone.
-	for i, prefix := 3, affinity; i >= 0; i, prefix = i-1, prefix>>8 {
-		// the following takes the prefix & 255 -> hash[i]
-		hash[i] = byte(prefix)
-	}
-	hash[hashKindEncodingIndex] = byte(kind)
-	return hash
-}
-
-func finishV6(v6hash []byte, prehash []byte) []byte {
-	entryHash := crypto.Hash(prehash)
-	copy(v6hash[5:], entryHash[1:])
-	return v6hash[:]
-
-}
-
-// accountHashBuilderV6 calculates the hash key used for the trie by combining the account address and the account data
-func accountHashBuilderV6(addr basics.Address, accountData *store.BaseAccountData, encodedAccountData []byte) []byte {
-	hashIntPrefix := accountData.UpdateRound
-	if hashIntPrefix == 0 {
-		hashIntPrefix = accountData.RewardsBase
-	}
-	hash := hashBufV6(hashIntPrefix, accountHK)
-	// write out the lowest 32 bits of the reward base. This should improve the caching of the trie by allowing
-	// recent updated to be in-cache, and "older" nodes will be left alone.
-
-	prehash := make([]byte, crypto.DigestSize+len(encodedAccountData))
-	copy(prehash[:], addr[:])
-	copy(prehash[crypto.DigestSize:], encodedAccountData[:])
-
-	return finishV6(hash, prehash)
-}
-
-// hashKind enumerates the possible data types hashed into a catchpoint merkle
-// trie. Each merkle trie hash includes the hashKind byte at a known-offset.
-// By encoding hashKind at a known-offset, it's possible for hash readers to
-// disambiguate the hashed resource.
-//
-//go:generate stringer -type=hashKind
-type hashKind byte
-
-// Defines known kinds of hashes. Changing an enum ordinal value is a
-// breaking change.
-const (
-	accountHK hashKind = iota
-	assetHK
-	appHK
-	kvHK
-)
-
-// hashKindEncodingIndex defines the []byte offset where the hash kind is
-// encoded.
-const hashKindEncodingIndex = 4
-
-func rdGetCreatableHashKind(rd *store.ResourcesData, a basics.Address, ci basics.CreatableIndex) (hashKind, error) {
-	if rd.IsAsset() {
-		return assetHK, nil
-	} else if rd.IsApp() {
-		return appHK, nil
-	}
-	return accountHK, fmt.Errorf("unknown creatable for addr %s, aidx %d, data %v", a.String(), ci, rd)
-}
-
-// resourcesHashBuilderV6 calculates the hash key used for the trie by combining the creatable's resource data and its index
-func resourcesHashBuilderV6(rd *store.ResourcesData, addr basics.Address, cidx basics.CreatableIndex, updateRound uint64, encodedResourceData []byte) ([]byte, error) {
-	hk, err := rdGetCreatableHashKind(rd, addr, cidx)
-	if err != nil {
-		return nil, err
-	}
-
-	hash := hashBufV6(updateRound, hk)
-
-	prehash := make([]byte, 8+crypto.DigestSize+len(encodedResourceData))
-	copy(prehash[:], addr[:])
-	binary.LittleEndian.PutUint64(prehash[crypto.DigestSize:], uint64(cidx))
-	copy(prehash[crypto.DigestSize+8:], encodedResourceData[:])
-
-	return finishV6(hash, prehash), nil
-}
-
-// kvHashBuilderV6 calculates the hash key used for the trie by combining the key and value
-func kvHashBuilderV6(key string, value []byte) []byte {
-	hash := hashBufV6(0, kvHK)
-
-	prehash := make([]byte, len(key)+len(value))
-	copy(prehash[:], key)
-	copy(prehash[len(key):], value)
-
-	return finishV6(hash, prehash)
-}
-
-// accountHashBuilder calculates the hash key used for the trie by combining the account address and the account data
-func accountHashBuilder(addr basics.Address, accountData basics.AccountData, encodedAccountData []byte) []byte {
-	hash := make([]byte, 4+crypto.DigestSize)
-	// write out the lowest 32 bits of the reward base. This should improve the caching of the trie by allowing
-	// recent updated to be in-cache, and "older" nodes will be left alone.
-	for i, rewards := 3, accountData.RewardsBase; i >= 0; i, rewards = i-1, rewards>>8 {
-		// the following takes the rewards & 255 -> hash[i]
-		hash[i] = byte(rewards)
-	}
-	entryHash := crypto.Hash(append(addr[:], encodedAccountData[:]...))
-	copy(hash[4:], entryHash[:])
-	return hash[:]
-}
-
 func (ct *catchpointTracker) catchpointEnabled() bool {
 	return ct.catchpointInterval != 0
 }
@@ -1659,7 +1549,7 @@ func (ct *catchpointTracker) initializeHashes(ctx context.Context, tx *sql.Tx, r
 		if err != nil {
 			return err
 		}
-		hash := kvHashBuilderV6(string(k), v)
+		hash := store.KvHashBuilderV6(string(k), v)
 		trieHashCount++
 		pendingTrieHashes++
 		added, err := trie.Add(hash)
diff --git a/ledger/catchpointtracker_test.go b/ledger/catchpointtracker_test.go
index ddbdd21917..473d559a05 100644
--- a/ledger/catchpointtracker_test.go
+++ b/ledger/catchpointtracker_test.go
@@ -1362,30 +1362,30 @@ func TestHashContract(t *testing.T) {
 
 	type testCase struct {
 		genHash          func() []byte
 		expectedHex      string
-		expectedHashKind hashKind
+		expectedHashKind store.HashKind
 	}
 
 	accountCase := func(genHash func() []byte, expectedHex string) testCase {
 		return testCase{
-			genHash, expectedHex, accountHK,
+			genHash, expectedHex, store.AccountHK,
 		}
 	}
 
 	resourceAssetCase := func(genHash func() []byte, expectedHex string) testCase {
 		return testCase{
-			genHash, expectedHex, assetHK,
+			genHash, expectedHex, store.AssetHK,
 		}
 	}
 
 	resourceAppCase := func(genHash func() []byte, expectedHex string) testCase {
 		return testCase{
-			genHash, expectedHex, appHK,
+			genHash, expectedHex, store.AppHK,
 		}
 	}
 
 	kvCase := func(genHash func() []byte, expectedHex string) testCase {
 		return testCase{
-			genHash, expectedHex, kvHK,
+			genHash, expectedHex, store.KvHK,
 		}
 	}
 
@@ -1397,7 +1397,7 @@ func TestHashContract(t *testing.T) {
 			b := store.BaseAccountData{
 				UpdateRound: 1024,
 			}
-			return accountHashBuilderV6(a, &b, protocol.Encode(&b))
+			return store.AccountHashBuilderV6(a, &b, protocol.Encode(&b))
 		},
 			"0000040000c3c39a72c146dc6bcb87b499b63ef730145a8fe4a187c96e9a52f74ef17f54",
 		),
@@ -1406,7 +1406,7 @@ func TestHashContract(t *testing.T) {
 			b := store.BaseAccountData{
 				RewardsBase: 10000,
 			}
-			return accountHashBuilderV6(a, &b, protocol.Encode(&b))
+			return store.AccountHashBuilderV6(a, &b, protocol.Encode(&b))
 		},
 			"0000271000804b58bcc81190c3c7343c1db9c737621ff0438104bdd20a25d12aa4e9b6e5",
 		),
@@ -1422,7 +1422,7 @@ func TestHashContract(t *testing.T) {
 				Manager: a,
 			}
 
-			bytes, err := resourcesHashBuilderV6(&r, a, 7, 1024, protocol.Encode(&r))
+			bytes, err := store.ResourcesHashBuilderV6(&r, a, 7, 1024, protocol.Encode(&r))
 			require.NoError(t, err)
 			return bytes
 		},
@@ -1440,7 +1440,7 @@ func TestHashContract(t *testing.T) {
 				GlobalStateSchemaNumUint: 2,
 			}
 
-			bytes, err := resourcesHashBuilderV6(&r, a, 7, 1024, protocol.Encode(&r))
+			bytes, err := store.ResourcesHashBuilderV6(&r, a, 7, 1024, protocol.Encode(&r))
 			require.NoError(t, err)
 			return bytes
 		},
@@ -1451,7 +1451,7 @@ func TestHashContract(t *testing.T) {
 	kvs := []testCase{
 		kvCase(
 			func() []byte {
-				return kvHashBuilderV6("sample key", []byte("sample value"))
+				return store.KvHashBuilderV6("sample key", []byte("sample value"))
 			},
 			"0000000003cca3d1a8d7d724daa445c795ad277a7a64b351b4b9407f738841282f9c348b",
 		),
@@ -1461,12 +1461,12 @@ func TestHashContract(t *testing.T) {
 	for i, tc := range allCases {
 		t.Run(fmt.Sprintf("index=%d", i), func(t *testing.T) {
 			h := tc.genHash()
-			require.Equal(t, byte(tc.expectedHashKind), h[hashKindEncodingIndex])
+			require.Equal(t, byte(tc.expectedHashKind), h[store.HashKindEncodingIndex])
 			require.Equal(t, tc.expectedHex, hex.EncodeToString(h))
 		})
 	}
 
-	hasTestCoverageForKind := func(hk hashKind) bool {
+	hasTestCoverageForKind := func(hk store.HashKind) bool {
 		for _, c := range allCases {
 			if c.expectedHashKind == hk {
 				return true
@@ -1476,8 +1476,8 @@ func TestHashContract(t *testing.T) {
 	}
 
 	for i := byte(0); i < 255; i++ {
-		if !strings.HasPrefix(hashKind(i).String(), "hashKind(") {
-			require.True(t, hasTestCoverageForKind(hashKind(i)), fmt.Sprintf("Missing test coverage for hashKind ordinal value = %d", i))
+		if !strings.HasPrefix(store.HashKind(i).String(), "hashKind(") {
+			require.True(t, hasTestCoverageForKind(store.HashKind(i)), fmt.Sprintf("Missing test coverage for hashKind ordinal value = %d", i))
 		}
 	}
 }
diff --git a/ledger/catchupaccessor.go b/ledger/catchupaccessor.go
index 36d04ad8ac..c642c26b2b 100644
--- a/ledger/catchupaccessor.go
+++ b/ledger/catchupaccessor.go
@@ -33,6 +33,7 @@ import (
 	"github.com/algorand/go-algorand/data/bookkeeping"
 	"github.com/algorand/go-algorand/ledger/ledgercore"
 	"github.com/algorand/go-algorand/ledger/store"
+	"github.com/algorand/go-algorand/ledger/store/blockdb"
 	"github.com/algorand/go-algorand/logging"
 	"github.com/algorand/go-algorand/protocol"
 	"github.com/algorand/go-algorand/util/db"
@@ -121,7 +122,7 @@ func (w *stagingWriterImpl) writeKVs(ctx context.Context, kvrs []encodedKVRecord
 	for i := 0; i < len(kvrs); i++ {
 		keys[i] = kvrs[i].Key
 		values[i] = kvrs[i].Value
-		hashes[i] = kvHashBuilderV6(string(keys[i]), values[i])
+		hashes[i] = store.KvHashBuilderV6(string(keys[i]), values[i])
 	}
 
 	return crw.WriteCatchpointStagingKVs(ctx, keys, values, hashes)
@@ -621,7 +622,7 @@ func (c *catchpointCatchupAccessorImpl) processStagingBalances(ctx context.Conte
 // The function is _not_ a general purpose way to count hashes by hash kind.
 func countHashes(hashes [][]byte) (accountCount, kvCount uint64) {
 	for _, hash := range hashes {
-		if hash[hashKindEncodingIndex] == byte(kvHK) {
+		if hash[store.HashKindEncodingIndex] == byte(store.KvHK) {
 			kvCount++
 		} else {
 			accountCount++
@@ -733,7 +734,7 @@ func (c *catchpointCatchupAccessorImpl) BuildMerkleTrie(ctx context.Context, pro
 			var added bool
 			added, err = trie.Add(hash)
 			if !added {
-				return fmt.Errorf("CatchpointCatchupAccessorImpl::BuildMerkleTrie: The provided catchpoint file contained the same account more than once. hash = '%s' hash kind = %s", hex.EncodeToString(hash), hashKind(hash[hashKindEncodingIndex]))
+				return fmt.Errorf("CatchpointCatchupAccessorImpl::BuildMerkleTrie: The provided catchpoint file contained the same account more than once. hash = '%s' hash kind = %s", hex.EncodeToString(hash), store.HashKind(hash[store.HashKindEncodingIndex]))
 			}
 			if err != nil {
 				return
@@ -915,7 +916,7 @@ func (c *catchpointCatchupAccessorImpl) StoreFirstBlock(ctx context.Context, blk
 	start := time.Now()
 	ledgerStorefirstblockCount.Inc(nil)
 	err = blockDbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
-		return blockStartCatchupStaging(tx, *blk)
+		return blockdb.BlockStartCatchupStaging(tx, *blk)
 	})
 	ledgerStorefirstblockMicros.AddMicrosecondsSince(start, nil)
 	if err != nil {
@@ -930,7 +931,7 @@ func (c *catchpointCatchupAccessorImpl) StoreBlock(ctx context.Context, blk *boo
 	start := time.Now()
 	ledgerCatchpointStoreblockCount.Inc(nil)
 	err = blockDbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
-		return blockPutStaging(tx, *blk)
+		return blockdb.BlockPutStaging(tx, *blk)
 	})
 	ledgerCatchpointStoreblockMicros.AddMicrosecondsSince(start, nil)
 	if err != nil {
@@ -946,10 +947,10 @@ func (c *catchpointCatchupAccessorImpl) FinishBlocks(ctx context.Context, applyC
 	ledgerCatchpointFinishblocksCount.Inc(nil)
 	err = blockDbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
 		if applyChanges {
-			return blockCompleteCatchup(tx)
+			return blockdb.BlockCompleteCatchup(tx)
 		}
 		// TODO: unused, either actually implement cleanup on catchpoint failure, or delete this
-		return blockAbortCatchup(tx)
+		return blockdb.BlockAbortCatchup(tx)
 	})
 	ledgerCatchpointFinishblocksMicros.AddMicrosecondsSince(start, nil)
 	if err != nil {
@@ -964,7 +965,7 @@ func (c *catchpointCatchupAccessorImpl) EnsureFirstBlock(ctx context.Context) (b
 	start := time.Now()
 	ledgerCatchpointEnsureblock1Count.Inc(nil)
 	err = blockDbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
-		blk, err = blockEnsureSingleBlock(tx)
+		blk, err = blockdb.BlockEnsureSingleBlock(tx)
 		return
 	})
 	ledgerCatchpointEnsureblock1Micros.AddMicrosecondsSince(start, nil)
diff --git a/ledger/ledger.go b/ledger/ledger.go
index 09ec1b3cd3..8791f5c9cb 100644
--- a/ledger/ledger.go
+++ b/ledger/ledger.go
@@ -35,6 +35,7 @@ import (
 	"github.com/algorand/go-algorand/ledger/apply"
 	"github.com/algorand/go-algorand/ledger/internal"
 	"github.com/algorand/go-algorand/ledger/ledgercore"
+	"github.com/algorand/go-algorand/ledger/store/blockdb"
 	"github.com/algorand/go-algorand/logging"
 	"github.com/algorand/go-algorand/protocol"
 	"github.com/algorand/go-algorand/util/db"
@@ -251,12 +252,12 @@ func (l *Ledger) verifyMatchingGenesisHash() (err error) {
 	start := time.Now()
 	ledgerVerifygenhashCount.Inc(nil)
 	err = l.blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
-		latest, err := blockLatest(tx)
+		latest, err := blockdb.BlockLatest(tx)
 		if err != nil {
 			return err
 		}
 
-		hdr, err := blockGetHdr(tx, latest)
+		hdr, err := blockdb.BlockGetHdr(tx, latest)
 		if err != nil {
 			return err
 		}
@@ -340,7 +341,7 @@ func (l *Ledger) setSynchronousMode(ctx context.Context, synchronousMode db.Sync
 // - creates and populates it with genesis blocks
 // - ensures DB is in good shape for archival mode and resets it if not
 func initBlocksDB(tx *sql.Tx, l *Ledger, initBlocks []bookkeeping.Block, isArchival bool) (err error) {
-	err = blockInit(tx, initBlocks)
+	err = blockdb.BlockInit(tx, initBlocks)
 	if err != nil {
 		err = fmt.Errorf("initBlocksDB.blockInit %v", err)
 		return err
@@ -348,7 +349,7 @@ func initBlocksDB(tx *sql.Tx, l *Ledger, initBlocks []bookkeeping.Block, isArchi
 
 	// in archival mode check if DB contains all blocks up to the latest
 	if isArchival {
-		earliest, err := blockEarliest(tx)
+		earliest, err := blockdb.BlockEarliest(tx)
 		if err != nil {
 			err = fmt.Errorf("initBlocksDB.blockEarliest %v", err)
 			return err
@@ -358,12 +359,12 @@ func initBlocksDB(tx *sql.Tx, l *Ledger, initBlocks []bookkeeping.Block, isArchi
 		// So reset the DB and init it again
 		if earliest != basics.Round(0) {
 			l.log.Warnf("resetting blocks DB (earliest block is %v)", earliest)
-			err := blockResetDB(tx)
+			err := blockdb.BlockResetDB(tx)
 			if err != nil {
 				err = fmt.Errorf("initBlocksDB.blockResetDB %v", err)
 				return err
 			}
-			err = blockInit(tx, initBlocks)
+			err = blockdb.BlockInit(tx, initBlocks)
 			if err != nil {
 				err = fmt.Errorf("initBlocksDB.blockInit 2 %v", err)
 				return err
diff --git a/ledger/msgp_gen.go b/ledger/msgp_gen.go
index 61f4d02ccf..ddd01a87f5 100644
--- a/ledger/msgp_gen.go
+++ b/ledger/msgp_gen.go
@@ -65,14 +65,6 @@ import (
 //        |-----> (*) Msgsize
 //        |-----> (*) MsgIsZero
 //
-// hashKind
-//        |-----> MarshalMsg
-//        |-----> CanMarshalMsg
-//        |-----> (*) UnmarshalMsg
-//        |-----> (*) CanUnmarshalMsg
-//        |-----> Msgsize
-//        |-----> MsgIsZero
-//
 
 // MarshalMsg implements msgp.Marshaler
 func (z CatchpointCatchupState) MarshalMsg(b []byte) (o []byte) {
@@ -1733,49 +1725,3 @@ func (z *encodedKVRecordV6) Msgsize() (s int) {
 func (z *encodedKVRecordV6) MsgIsZero() bool {
 	return (len((*z).Key) == 0) && (len((*z).Value) == 0)
 }
-
-// MarshalMsg implements msgp.Marshaler
-func (z hashKind) MarshalMsg(b []byte) (o []byte) {
-	o = msgp.Require(b, z.Msgsize())
-	o = msgp.AppendByte(o, byte(z))
-	return
-}
-
-func (_ hashKind) CanMarshalMsg(z interface{}) bool {
-	_, ok := (z).(hashKind)
-	if !ok {
-		_, ok = (z).(*hashKind)
-	}
-	return ok
-}
-
-// UnmarshalMsg implements msgp.Unmarshaler
-func (z *hashKind) UnmarshalMsg(bts []byte) (o []byte, err error) {
-	{
-		var zb0001 byte
-		zb0001, bts, err = msgp.ReadByteBytes(bts)
-		if err != nil {
-			err = msgp.WrapError(err)
-			return
-		}
-		(*z) = hashKind(zb0001)
-	}
-	o = bts
-	return
-}
-
-func (_ *hashKind) CanUnmarshalMsg(z interface{}) bool {
-	_, ok := (z).(*hashKind)
-	return ok
-}
-
-// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
-func (z hashKind) Msgsize() (s int) {
-	s = msgp.ByteSize
-	return
-}
-
-// MsgIsZero returns whether this is a zero value
-func (z hashKind) MsgIsZero() bool {
-	return z == 0
-}
diff --git a/ledger/blockdb.go b/ledger/store/blockdb/blockdb.go
similarity index 76%
rename from ledger/blockdb.go
rename to ledger/store/blockdb/blockdb.go
index 1919fdb80b..6417d84529 100644
--- a/ledger/blockdb.go
+++ b/ledger/store/blockdb/blockdb.go
@@ -14,7 +14,7 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with go-algorand. If not, see <https://www.gnu.org/licenses/>.
 
-package ledger
+package blockdb
 
 import (
 	"database/sql"
@@ -44,7 +44,8 @@ var blockResetExprs = []string{
 	`DROP TABLE IF EXISTS blocks`,
 }
 
-func blockInit(tx *sql.Tx, initBlocks []bookkeeping.Block) error {
+// BlockInit initializes blockdb
+func BlockInit(tx *sql.Tx, initBlocks []bookkeeping.Block) error {
 	for _, tableCreate := range blockSchema {
 		_, err := tx.Exec(tableCreate)
 		if err != nil {
@@ -52,14 +53,14 @@ func blockInit(tx *sql.Tx, initBlocks []bookkeeping.Block) error {
 		}
 	}
 
-	next, err := blockNext(tx)
+	next, err := BlockNext(tx)
 	if err != nil {
 		return err
 	}
 
 	if next == 0 {
 		for _, blk := range initBlocks {
-			err = blockPut(tx, blk, agreement.Certificate{})
+			err = BlockPut(tx, blk, agreement.Certificate{})
 			if err != nil {
 				serr, ok := err.(sqlite3.Error)
 				if ok && serr.Code == sqlite3.ErrConstraint {
@@ -73,7 +74,8 @@ func blockInit(tx *sql.Tx, initBlocks []bookkeeping.Block) error {
 	return nil
 }
 
-func blockResetDB(tx *sql.Tx) error {
+// BlockResetDB resets blockdb
+func BlockResetDB(tx *sql.Tx) error {
 	for _, stmt := range blockResetExprs {
 		_, err := tx.Exec(stmt)
 		if err != nil {
@@ -83,7 +85,8 @@ func blockResetDB(tx *sql.Tx) error {
 	return nil
 }
 
-func blockGet(tx *sql.Tx, rnd basics.Round) (blk bookkeeping.Block, err error) {
+// BlockGet retrieves a block by a round number
+func BlockGet(tx *sql.Tx, rnd basics.Round) (blk bookkeeping.Block, err error) {
 	var buf []byte
 	err = tx.QueryRow("SELECT blkdata FROM blocks WHERE rnd=?", rnd).Scan(&buf)
 	if err != nil {
@@ -98,7 +101,8 @@ func blockGet(tx *sql.Tx, rnd basics.Round) (blk bookkeeping.Block, err error) {
 	return
 }
 
-func blockGetHdr(tx *sql.Tx, rnd basics.Round) (hdr bookkeeping.BlockHeader, err error) {
+// BlockGetHdr retrieves a block header by a round number
+func BlockGetHdr(tx *sql.Tx, rnd basics.Round) (hdr bookkeeping.BlockHeader, err error) {
 	var buf []byte
 	err = tx.QueryRow("SELECT hdrdata FROM blocks WHERE rnd=?", rnd).Scan(&buf)
 	if err != nil {
@@ -113,7 +117,8 @@ func blockGetHdr(tx *sql.Tx, rnd basics.Round) (hdr bookkeeping.BlockHeader, err
 	return
 }
 
-func blockGetEncodedCert(tx *sql.Tx, rnd basics.Round) (blk []byte, cert []byte, err error) {
+// BlockGetEncodedCert retrieves raw block and cert by a round number
+func BlockGetEncodedCert(tx *sql.Tx, rnd basics.Round) (blk []byte, cert []byte, err error) {
 	err = tx.QueryRow("SELECT blkdata, certdata FROM blocks WHERE rnd=?", rnd).Scan(&blk, &cert)
 	if err != nil {
 		if err == sql.ErrNoRows {
@@ -125,8 +130,9 @@ func blockGetEncodedCert(tx *sql.Tx, rnd basics.Round) (blk []byte, cert []byte,
 	return
 }
 
-func blockGetCert(tx *sql.Tx, rnd basics.Round) (blk bookkeeping.Block, cert agreement.Certificate, err error) {
-	blkbuf, certbuf, err := blockGetEncodedCert(tx, rnd)
+// BlockGetCert retrieves block and cert by a round number
+func BlockGetCert(tx *sql.Tx, rnd basics.Round) (blk bookkeeping.Block, cert agreement.Certificate, err error) {
+	blkbuf, certbuf, err := BlockGetEncodedCert(tx, rnd)
 	if err != nil {
 		return
 	}
@@ -145,7 +151,8 @@ func blockGetCert(tx *sql.Tx, rnd basics.Round) (blk bookkeeping.Block, cert agr
 	return
 }
 
-func blockPut(tx *sql.Tx, blk bookkeeping.Block, cert agreement.Certificate) error {
+// BlockPut stores block and certificate
+func BlockPut(tx *sql.Tx, blk bookkeeping.Block, cert agreement.Certificate) error {
 	var max sql.NullInt64
 	err := tx.QueryRow("SELECT MAX(rnd) FROM blocks").Scan(&max)
 	if err != nil {
@@ -174,7 +181,8 @@ func blockPut(tx *sql.Tx, blk bookkeeping.Block, cert agreement.Certificate) err
 	return err
 }
 
-func blockNext(tx *sql.Tx) (basics.Round, error) {
+// BlockNext returns the next expected round number
+func BlockNext(tx *sql.Tx) (basics.Round, error) {
 	var max sql.NullInt64
 	err := tx.QueryRow("SELECT MAX(rnd) FROM blocks").Scan(&max)
 	if err != nil {
@@ -188,7 +196,8 @@ func blockNext(tx *sql.Tx) (basics.Round, error) {
 	return 0, nil
 }
 
-func blockLatest(tx *sql.Tx) (basics.Round, error) {
+// BlockLatest returns the latest persisted round number
+func BlockLatest(tx *sql.Tx) (basics.Round, error) {
 	var max sql.NullInt64
 	err := tx.QueryRow("SELECT MAX(rnd) FROM blocks").Scan(&max)
 	if err != nil {
@@ -202,7 +211,8 @@ func blockLatest(tx *sql.Tx) (basics.Round, error) {
 	return 0, fmt.Errorf("no blocks present")
 }
 
-func blockEarliest(tx *sql.Tx) (basics.Round, error) {
+// BlockEarliest returns the lowest persisted round number
+func BlockEarliest(tx *sql.Tx) (basics.Round, error) {
 	var min sql.NullInt64
 	err := tx.QueryRow("SELECT MIN(rnd) FROM blocks").Scan(&min)
 	if err != nil {
@@ -216,8 +226,9 @@ func blockEarliest(tx *sql.Tx) (basics.Round, error) {
 	return 0, fmt.Errorf("no blocks present")
 }
 
-func blockForgetBefore(tx *sql.Tx, rnd basics.Round) error {
-	next, err := blockNext(tx)
+// BlockForgetBefore removes block entries with round numbers less than the specified round
+func BlockForgetBefore(tx *sql.Tx, rnd basics.Round) error {
+	next, err := BlockNext(tx)
 	if err != nil {
 		return err
 	}
@@ -230,7 +241,8 @@ func blockForgetBefore(tx *sql.Tx, rnd basics.Round) error {
 	return err
 }
 
-func blockStartCatchupStaging(tx *sql.Tx, blk bookkeeping.Block) error {
+// BlockStartCatchupStaging initializes catchup for catchpoint
+func BlockStartCatchupStaging(tx *sql.Tx, blk bookkeeping.Block) error {
 	// delete the old catchpointblocks table, if there is such.
 	for _, stmt := range blockResetExprs {
 		stmt = strings.Replace(stmt, "blocks", "catchpointblocks", 1)
@@ -262,7 +274,8 @@ func blockStartCatchupStaging(tx *sql.Tx, blk bookkeeping.Block) error {
 	return nil
 }
 
-func blockCompleteCatchup(tx *sql.Tx) (err error) {
+// BlockCompleteCatchup applies catchpoint caught up blocks
+func BlockCompleteCatchup(tx *sql.Tx) (err error) {
 	_, err = tx.Exec("ALTER TABLE blocks RENAME TO blocks_old")
 	if err != nil {
 		return err
@@ -278,8 +291,8 @@ func blockCompleteCatchup(tx *sql.Tx) (err error) {
 	return nil
 }
 
-// TODO: unused, either actually implement cleanup on catchpoint failure, or delete this
-func blockAbortCatchup(tx *sql.Tx) error {
+// BlockAbortCatchup TODO: unused, either actually implement cleanup on catchpoint failure, or delete this
+func BlockAbortCatchup(tx *sql.Tx) error {
 	// delete the old catchpointblocks table, if there is such.
 	for _, stmt := range blockResetExprs {
 		stmt = strings.Replace(stmt, "blocks", "catchpointblocks", 1)
@@ -291,7 +304,8 @@ func blockAbortCatchup(tx *sql.Tx) error {
 	return nil
 }
 
-func blockPutStaging(tx *sql.Tx, blk bookkeeping.Block) (err error) {
+// BlockPutStaging stores a block into the catchpoint staging table
+func BlockPutStaging(tx *sql.Tx, blk bookkeeping.Block) (err error) {
 	// insert the new entry
 	_, err = tx.Exec("INSERT INTO catchpointblocks (rnd, proto, hdrdata, blkdata) VALUES (?, ?, ?, ?)",
 		blk.Round(),
@@ -305,7 +319,8 @@ func blockPutStaging(tx *sql.Tx, blk bookkeeping.Block) (err error) {
 	return nil
 }
 
-func blockEnsureSingleBlock(tx *sql.Tx) (blk bookkeeping.Block, err error) {
+// BlockEnsureSingleBlock retains only one (highest) block in the catchpoint staging table
+func BlockEnsureSingleBlock(tx *sql.Tx) (blk bookkeeping.Block, err error) {
 	// delete all the blocks that aren't the latest one.
 	var max sql.NullInt64
 	err = tx.QueryRow("SELECT MAX(rnd) FROM catchpointblocks").Scan(&max)
diff --git a/ledger/store/hashing.go b/ledger/store/hashing.go
new file mode 100644
index 0000000000..aa3f85eb56
--- /dev/null
+++ b/ledger/store/hashing.go
@@ -0,0 +1,135 @@
+// Copyright (C) 2019-2022 Algorand, Inc.
+// This file is part of go-algorand
+//
+// go-algorand is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// go-algorand is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.
+
+package store
+
+import (
+	"encoding/binary"
+	"fmt"
+
+	"github.com/algorand/go-algorand/crypto"
+	"github.com/algorand/go-algorand/data/basics"
+)
+
+// HashKind enumerates the possible data types hashed into a catchpoint merkle
+// trie. Each merkle trie hash includes the HashKind byte at a known-offset.
+// By encoding HashKind at a known-offset, it's possible for hash readers to
+// disambiguate the hashed resource.
+//
+//go:generate stringer -type=HashKind
+//msgp:ignore HashKind
+type HashKind byte
+
+// Defines known kinds of hashes. Changing an enum ordinal value is a
+// breaking change.
+const (
+	AccountHK HashKind = iota
+	AssetHK
+	AppHK
+	KvHK
+)
+
+// HashKindEncodingIndex defines the []byte offset where the hash kind is
+// encoded.
+const HashKindEncodingIndex = 4
+
+// AccountHashBuilder calculates the hash key used for the trie by combining the account address and the account data
+func AccountHashBuilder(addr basics.Address, accountData basics.AccountData, encodedAccountData []byte) []byte {
+	hash := make([]byte, 4+crypto.DigestSize)
+	// write out the lowest 32 bits of the reward base. This should improve the caching of the trie by allowing
+	// recent updates to be in-cache, and "older" nodes will be left alone.
+	for i, rewards := 3, accountData.RewardsBase; i >= 0; i, rewards = i-1, rewards>>8 {
+		// the following takes the rewards & 255 -> hash[i]
+		hash[i] = byte(rewards)
+	}
+	entryHash := crypto.Hash(append(addr[:], encodedAccountData[:]...))
+	copy(hash[4:], entryHash[:])
+	return hash[:]
+}
+
+// AccountHashBuilderV6 calculates the hash key used for the trie by combining the account address and the account data
+func AccountHashBuilderV6(addr basics.Address, accountData *BaseAccountData, encodedAccountData []byte) []byte {
+	hashIntPrefix := accountData.UpdateRound
+	if hashIntPrefix == 0 {
+		hashIntPrefix = accountData.RewardsBase
+	}
+	hash := hashBufV6(hashIntPrefix, AccountHK)
+	// hashBufV6 wrote out the lowest 32 bits of the update round (or the reward base). This should improve the caching of the trie by allowing
+	// recent updates to be in-cache, and "older" nodes will be left alone.
+
+	prehash := make([]byte, crypto.DigestSize+len(encodedAccountData))
+	copy(prehash[:], addr[:])
+	copy(prehash[crypto.DigestSize:], encodedAccountData[:])
+
+	return finishV6(hash, prehash)
+}
+
+// ResourcesHashBuilderV6 calculates the hash key used for the trie by combining the creatable's resource data and its index
+func ResourcesHashBuilderV6(rd *ResourcesData, addr basics.Address, cidx basics.CreatableIndex, updateRound uint64, encodedResourceData []byte) ([]byte, error) {
+	hk, err := rdGetCreatableHashKind(rd, addr, cidx)
+	if err != nil {
+		return nil, err
+	}
+
+	hash := hashBufV6(updateRound, hk)
+
+	prehash := make([]byte, 8+crypto.DigestSize+len(encodedResourceData))
+	copy(prehash[:], addr[:])
+	binary.LittleEndian.PutUint64(prehash[crypto.DigestSize:], uint64(cidx))
+	copy(prehash[crypto.DigestSize+8:], encodedResourceData[:])
+
+	return finishV6(hash, prehash), nil
+}
+
+func rdGetCreatableHashKind(rd *ResourcesData, a basics.Address, ci basics.CreatableIndex) (HashKind, error) {
+	if rd.IsAsset() {
+		return AssetHK, nil
+	} else if rd.IsApp() {
+		return AppHK, nil
+	}
+	return AccountHK, fmt.Errorf("unknown creatable for addr %s, aidx %d, data %v", a.String(), ci, rd)
+}
+
+// KvHashBuilderV6 calculates the hash key used for the trie by combining the key and value
+func KvHashBuilderV6(key string, value []byte) []byte {
+	hash := hashBufV6(0, KvHK)
+
+	prehash := make([]byte, len(key)+len(value))
+	copy(prehash[:], key)
+	copy(prehash[len(key):], value)
+
+	return finishV6(hash, prehash)
+}
+
+func hashBufV6(affinity uint64, kind HashKind) []byte {
+	hash := make([]byte, 4+crypto.DigestSize)
+	// write out the lowest 32 bits of the affinity value. This should improve
+	// the caching of the trie by allowing recent updates to be in-cache, and
+	// "older" nodes will be left alone.
+	for i, prefix := 3, affinity; i >= 0; i, prefix = i-1, prefix>>8 {
+		// the following takes the prefix & 255 -> hash[i]
+		hash[i] = byte(prefix)
+	}
+	hash[HashKindEncodingIndex] = byte(kind)
+	return hash
+}
+
+func finishV6(v6hash []byte, prehash []byte) []byte {
+	entryHash := crypto.Hash(prehash)
+	copy(v6hash[5:], entryHash[1:])
+	return v6hash[:]
+
+}
diff --git a/ledger/hashkind_string.go b/ledger/store/hashkind_string.go
similarity index 75%
rename from ledger/hashkind_string.go
rename to ledger/store/hashkind_string.go
index 6549ae63b6..a20405fc31 100644
--- a/ledger/hashkind_string.go
+++ b/ledger/store/hashkind_string.go
@@ -1,6 +1,6 @@
 // Code generated by "stringer -type=hashKind"; DO NOT EDIT.
 
-package ledger
+package store
 
 import "strconv"
 
@@ -8,18 +8,18 @@ func _() {
 	// An "invalid array index" compiler error signifies that the constant values have changed.
 	// Re-run the stringer command to generate them again.
 	var x [1]struct{}
-	_ = x[accountHK-0]
-	_ = x[assetHK-1]
-	_ = x[appHK-2]
-	_ = x[kvHK-3]
+	_ = x[AccountHK-0]
+	_ = x[AssetHK-1]
+	_ = x[AppHK-2]
+	_ = x[KvHK-3]
 }
 
 const _hashKind_name = "accountHKassetHKappHKkvHK"
 
 var _hashKind_index = [...]uint8{0, 9, 16, 21, 25}
 
-func (i hashKind) String() string {
-	if i >= hashKind(len(_hashKind_index)-1) {
+func (i HashKind) String() string {
+	if i >= HashKind(len(_hashKind_index)-1) {
 		return "hashKind(" + strconv.FormatInt(int64(i), 10) + ")"
 	}
 	return _hashKind_name[_hashKind_index[i]:_hashKind_index[i+1]]
diff --git a/ledger/trackerdb.go b/ledger/trackerdb.go
index 8f4d4b2162..917a14e5f6 100644
--- a/ledger/trackerdb.go
+++ b/ledger/trackerdb.go
@@ -374,7 +374,7 @@ func (tu *trackerDBSchemaInitializer) upgradeDatabaseSchema4(ctx context.Context
 
 	var totalHashesDeleted int
 	for _, addr := range addresses {
-		hash := accountHashBuilder(addr, basics.AccountData{}, []byte{0x80})
+		hash := store.AccountHashBuilder(addr, basics.AccountData{}, []byte{0x80})
 		deleted, err := trie.Delete(hash)
 		if err != nil {
 			tu.log.Errorf("upgradeDatabaseSchema4: failed to delete hash '%s' from merkle trie for account %v: %v", hex.EncodeToString(hash), addr, err)
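
---

Reviewer note (not part of the patch): with this change the block accessors move from package-private helpers in ledger to exported functions in ledger/store/blockdb, so callers reach them through the new import path. A minimal sketch of the call-site pattern, mirroring verifyMatchingGenesisHash in ledger.go above; the package name "example" and the helper latestHeader are hypothetical:

package example

import (
	"context"
	"database/sql"
	"fmt"

	"github.com/algorand/go-algorand/data/bookkeeping"
	"github.com/algorand/go-algorand/ledger/store/blockdb"
	"github.com/algorand/go-algorand/util/db"
)

// latestHeader reads the most recent block header through the relocated API.
// rdb is assumed to be an accessor for an already-initialized block database.
func latestHeader(rdb db.Accessor) (hdr bookkeeping.BlockHeader, err error) {
	err = rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
		latest, err0 := blockdb.BlockLatest(tx) // was: blockLatest(tx)
		if err0 != nil {
			return err0
		}
		hdr, err0 = blockdb.BlockGetHdr(tx, latest) // was: blockGetHdr(tx, latest)
		return err0
	})
	if err != nil {
		return bookkeeping.BlockHeader{}, fmt.Errorf("latestHeader: %w", err)
	}
	return hdr, nil
}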
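Reviewer note (not part of the patch): every V6 trie key built in ledger/store/hashing.go shares the layout fixed by hashBufV6 and finishV6: bytes [0:4] carry the low 32 bits of the affinity value in big-endian order (update round, falling back to rewards base; always 0 for kv entries), byte [4] (HashKindEncodingIndex) carries the HashKind, and bytes [5:36] carry the last 31 bytes of the SHA-512/256 digest of the preimage. A small decoding sketch using only names exported by this patch; the demo program itself is hypothetical:

package main

import (
	"encoding/binary"
	"fmt"

	"github.com/algorand/go-algorand/ledger/store"
)

func main() {
	// Same inputs as the kv vector in TestHashContract above.
	h := store.KvHashBuilderV6("sample key", []byte("sample value"))

	affinity := binary.BigEndian.Uint32(h[:4]) // kv hashes always use affinity 0
	kind := store.HashKind(h[store.HashKindEncodingIndex])

	// Expected (per the test vector): len=36 affinity=0 kind=kvHK digest=cca3d1...
	fmt.Printf("len=%d affinity=%d kind=%s digest=%x\n", len(h), affinity, kind, h[5:])
}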