diff --git a/data/basics/overflow.go b/data/basics/overflow.go index 3e28d17aa4..e344a2ada0 100644 --- a/data/basics/overflow.go +++ b/data/basics/overflow.go @@ -18,6 +18,7 @@ package basics import ( "math" + "math/big" ) // OverflowTracker is used to track when an operation causes an overflow @@ -163,3 +164,21 @@ func (t *OverflowTracker) SubR(a Round, b Round) Round { func (t *OverflowTracker) ScalarMulA(a MicroAlgos, b uint64) MicroAlgos { return MicroAlgos{Raw: t.Mul(a.Raw, b)} } + +// Muldiv computes a*b/c. The overflow flag indicates that +// the result was 2^64 or greater. +func Muldiv(a uint64, b uint64, c uint64) (res uint64, overflow bool) { + var aa big.Int + aa.SetUint64(a) + + var bb big.Int + bb.SetUint64(b) + + var cc big.Int + cc.SetUint64(c) + + aa.Mul(&aa, &bb) + aa.Div(&aa, &cc) + + return aa.Uint64(), !aa.IsUint64() +} diff --git a/data/basics/userBalance.go b/data/basics/userBalance.go index 1f56369d20..7ec30b836a 100644 --- a/data/basics/userBalance.go +++ b/data/basics/userBalance.go @@ -462,6 +462,49 @@ func (u AccountData) IsZero() bool { return reflect.DeepEqual(u, AccountData{}) } +// NormalizedOnlineBalance returns a ``normalized'' balance for this account. +// +// The normalization compensates for rewards that have not yet been applied, +// by computing a balance normalized to round 0. To normalize, we estimate +// the microalgo balance that an account should have had at round 0, in order +// to end up at its current balance when rewards are included. +// +// The benefit of the normalization procedure is that an account's normalized +// balance does not change over time (unlike the actual algo balance that includes +// rewards). This makes it possible to compare normalized balances between two +// accounts, to sort them, and get results that are close to what we would get +// if we computed the exact algo balance of the accounts at a given round number. +// +// The normalization can lead to some inconsistencies in comparisons between +// account balances, because the growth rate of rewards for accounts depends +// on how recently the account has been touched (our rewards do not implement +// compounding). However, online accounts have to periodically renew +// participation keys, so the scale of the inconsistency is small. +func (u AccountData) NormalizedOnlineBalance(proto config.ConsensusParams) uint64 { + if u.Status != Online { + return 0 + } + + // If this account had one RewardUnit of microAlgos in round 0, it would + // have perRewardUnit microAlgos at the account's current rewards level. + perRewardUnit := u.RewardsBase + proto.RewardUnit + + // To normalize, we compute, mathematically, + // u.MicroAlgos / perRewardUnit * proto.RewardUnit, as + // (u.MicroAlgos * proto.RewardUnit) / perRewardUnit. + norm, overflowed := Muldiv(u.MicroAlgos.ToUint64(), proto.RewardUnit, perRewardUnit) + + // Mathematically should be impossible to overflow + // because perRewardUnit >= proto.RewardUnit, as long + // as u.RewardBase isn't huge enough to cause overflow.. + if overflowed { + logging.Base().Panicf("overflow computing normalized balance %d * %d / (%d + %d)", + u.MicroAlgos.ToUint64(), proto.RewardUnit, u.RewardsBase, proto.RewardUnit) + } + + return norm +} + // BalanceRecord pairs an account's address with its associated data. type BalanceRecord struct { _struct struct{} `codec:",omitempty,omitemptyarray"` diff --git a/ledger/accountdb.go b/ledger/accountdb.go index c6f148dacb..9a98c0b54b 100644 --- a/ledger/accountdb.go +++ b/ledger/accountdb.go @@ -87,6 +87,18 @@ var creatablesMigration = []string{ `ALTER TABLE assetcreators ADD COLUMN ctype INTEGER DEFAULT 0`, } +func createNormalizedOnlineBalanceIndex(idxname string, tablename string) string { + return fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s + ON %s ( normalizedonlinebalance, address, data ) + WHERE normalizedonlinebalance>0`, idxname, tablename) +} + +var createOnlineAccountIndex = []string{ + `ALTER TABLE accountbase + ADD COLUMN normalizedonlinebalance INTEGER`, + createNormalizedOnlineBalanceIndex("onlineaccountbals", "accountbase"), +} + var accountsResetExprs = []string{ `DROP TABLE IF EXISTS acctrounds`, `DROP TABLE IF EXISTS accounttotals`, @@ -100,7 +112,7 @@ var accountsResetExprs = []string{ // accountDBVersion is the database version that this binary would know how to support and how to upgrade to. // details about the content of each of the versions can be found in the upgrade functions upgradeDatabaseSchemaXXXX // and their descriptions. -var accountDBVersion = int32(3) +var accountDBVersion = int32(4) type accountDelta struct { old basics.AccountData @@ -136,17 +148,26 @@ func writeCatchpointStagingCreatable(ctx context.Context, tx *sql.Tx, addr basic return nil } -func writeCatchpointStagingBalances(ctx context.Context, tx *sql.Tx, bals []encodedBalanceRecord) error { - insertStmt, err := tx.PrepareContext(ctx, "INSERT INTO catchpointbalances(address, data) VALUES(?, ?)") +func writeCatchpointStagingBalances(ctx context.Context, tx *sql.Tx, bals []encodedBalanceRecord, proto config.ConsensusParams) error { + insertStmt, err := tx.PrepareContext(ctx, "INSERT INTO catchpointbalances(address, normalizedonlinebalance, data) VALUES(?, ?, ?)") if err != nil { return err } for _, balance := range bals { - result, err := insertStmt.ExecContext(ctx, balance.Address[:], balance.AccountData) + var data basics.AccountData + err = protocol.Decode(balance.AccountData, &data) if err != nil { return err } + + normBalance := data.NormalizedOnlineBalance(proto) + + result, err := insertStmt.ExecContext(ctx, balance.Address[:], normBalance, balance.AccountData) + if err != nil { + return err + } + aff, err := result.RowsAffected() if err != nil { return err @@ -154,42 +175,68 @@ func writeCatchpointStagingBalances(ctx context.Context, tx *sql.Tx, bals []enco if aff != 1 { return fmt.Errorf("number of affected record in insert was expected to be one, but was %d", aff) } - } return nil } func resetCatchpointStagingBalances(ctx context.Context, tx *sql.Tx, newCatchup bool) (err error) { - s := "DROP TABLE IF EXISTS catchpointbalances;" - s += "DROP TABLE IF EXISTS catchpointassetcreators;" - s += "DROP TABLE IF EXISTS catchpointaccounthashes;" - s += "DELETE FROM accounttotals where id='catchpointStaging';" + s := []string{ + "DROP TABLE IF EXISTS catchpointbalances", + "DROP TABLE IF EXISTS catchpointassetcreators", + "DROP TABLE IF EXISTS catchpointaccounthashes", + "DELETE FROM accounttotals where id='catchpointStaging'", + } + if newCatchup { - s += "CREATE TABLE IF NOT EXISTS catchpointassetcreators(asset integer primary key, creator blob, ctype integer);" - s += "CREATE TABLE IF NOT EXISTS catchpointbalances(address blob primary key, data blob);" - s += "CREATE TABLE IF NOT EXISTS catchpointaccounthashes(id integer primary key, data blob);" + // SQLite has no way to rename an existing index. So, we need + // to cook up a fresh name for the index, which will be kept + // around after we rename the table from "catchpointbalances" + // to "accountbase". To construct a unique index name, we + // use the current time. + idxname := fmt.Sprintf("onlineaccountbals%d", time.Now().UnixNano()) + + s = append(s, + "CREATE TABLE IF NOT EXISTS catchpointassetcreators (asset integer primary key, creator blob, ctype integer)", + "CREATE TABLE IF NOT EXISTS catchpointbalances (address blob primary key, data blob, normalizedonlinebalance integer)", + "CREATE TABLE IF NOT EXISTS catchpointaccounthashes (id integer primary key, data blob)", + createNormalizedOnlineBalanceIndex(idxname, "catchpointbalances"), + ) } - _, err = tx.Exec(s) - return err + + for _, stmt := range s { + _, err = tx.Exec(stmt) + if err != nil { + return err + } + } + + return nil } // applyCatchpointStagingBalances switches the staged catchpoint catchup tables onto the actual // tables and update the correct balance round. This is the final step in switching onto the new catchpoint round. func applyCatchpointStagingBalances(ctx context.Context, tx *sql.Tx, balancesRound basics.Round) (err error) { - s := "ALTER TABLE accountbase RENAME TO accountbase_old;" - s += "ALTER TABLE assetcreators RENAME TO assetcreators_old;" - s += "ALTER TABLE accounthashes RENAME TO accounthashes_old;" - s += "ALTER TABLE catchpointbalances RENAME TO accountbase;" - s += "ALTER TABLE catchpointassetcreators RENAME TO assetcreators;" - s += "ALTER TABLE catchpointaccounthashes RENAME TO accounthashes;" - s += "DROP TABLE IF EXISTS accountbase_old;" - s += "DROP TABLE IF EXISTS assetcreators_old;" - s += "DROP TABLE IF EXISTS accounthashes_old;" - - _, err = tx.Exec(s) - if err != nil { - return err + stmts := []string{ + "ALTER TABLE accountbase RENAME TO accountbase_old", + "ALTER TABLE assetcreators RENAME TO assetcreators_old", + "ALTER TABLE accounthashes RENAME TO accounthashes_old", + + "ALTER TABLE catchpointbalances RENAME TO accountbase", + "ALTER TABLE catchpointassetcreators RENAME TO assetcreators", + "ALTER TABLE catchpointaccounthashes RENAME TO accounthashes", + + "DROP TABLE IF EXISTS accountbase_old", + "DROP TABLE IF EXISTS assetcreators_old", + "DROP TABLE IF EXISTS accounthashes_old", + } + + for _, stmt := range stmts { + _, err = tx.Exec(stmt) + if err != nil { + return err + } } + _, err = tx.Exec("INSERT OR REPLACE INTO acctrounds(id, rnd) VALUES('acctbase', ?)", balancesRound) if err != nil { return err @@ -269,6 +316,76 @@ func accountsInit(tx *sql.Tx, initAccounts map[basics.Address]basics.AccountData return nil } +// accountsAddNormalizedBalance adds the normalizedonlinebalance column +// to the accountbase table. +func accountsAddNormalizedBalance(tx *sql.Tx, proto config.ConsensusParams) error { + var exists bool + err := tx.QueryRow("SELECT 1 FROM pragma_table_info('accountbase') WHERE name='normalizedonlinebalance'").Scan(&exists) + if err == nil { + // Already exists. + return nil + } + if err != sql.ErrNoRows { + return err + } + + for _, stmt := range createOnlineAccountIndex { + _, err := tx.Exec(stmt) + if err != nil { + return err + } + } + + rows, err := tx.Query("SELECT address, data FROM accountbase") + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var addrbuf []byte + var buf []byte + err = rows.Scan(&addrbuf, &buf) + if err != nil { + return err + } + + var data basics.AccountData + err = protocol.Decode(buf, &data) + if err != nil { + return err + } + + normBalance := data.NormalizedOnlineBalance(proto) + if normBalance > 0 { + _, err = tx.Exec("UPDATE accountbase SET normalizedonlinebalance=? WHERE address=?", normBalance, addrbuf) + if err != nil { + return err + } + } + } + + return rows.Err() +} + +// accountDataToOnline returns the part of the AccountData that matters +// for online accounts (to answer top-N queries). We store a subset of +// the full AccountData because we need to store a large number of these +// in memory (say, 1M), and storing that many AccountData could easily +// cause us to run out of memory. +func accountDataToOnline(address basics.Address, ad *basics.AccountData, proto config.ConsensusParams) *onlineAccount { + return &onlineAccount{ + Address: address, + MicroAlgos: ad.MicroAlgos, + RewardsBase: ad.RewardsBase, + NormalizedOnlineBalance: ad.NormalizedOnlineBalance(proto), + VoteID: ad.VoteID, + VoteFirstValid: ad.VoteFirstValid, + VoteLastValid: ad.VoteLastValid, + VoteKeyDilution: ad.VoteKeyDilution, + } +} + func resetAccountHashes(tx *sql.Tx) (err error) { _, err = tx.Exec(`DELETE FROM accounthashes`) return @@ -554,6 +671,50 @@ func (qs *accountsDbQueries) close() { } } +// accountsOnlineTop returns the top n online accounts starting at position offset +// (that is, the top offset'th account through the top offset+n-1'th account). +// +// The accounts are sorted by their normalized balance and address. The normalized +// balance has to do with the reward parts of online account balances. See the +// normalization procedure in AccountData.NormalizedOnlineBalance(). +// +// Note that this does not check if the accounts have a vote key valid for any +// particular round (past, present, or future). +func accountsOnlineTop(tx *sql.Tx, offset, n uint64, proto config.ConsensusParams) (map[basics.Address]*onlineAccount, error) { + rows, err := tx.Query("SELECT address, data FROM accountbase WHERE normalizedonlinebalance>0 ORDER BY normalizedonlinebalance DESC, address DESC LIMIT ? OFFSET ?", n, offset) + if err != nil { + return nil, err + } + defer rows.Close() + + res := make(map[basics.Address]*onlineAccount, n) + for rows.Next() { + var addrbuf []byte + var buf []byte + err = rows.Scan(&addrbuf, &buf) + if err != nil { + return nil, err + } + + var data basics.AccountData + err = protocol.Decode(buf, &data) + if err != nil { + return nil, err + } + + var addr basics.Address + if len(addrbuf) != len(addr) { + err = fmt.Errorf("Account DB address length mismatch: %d != %d", len(addrbuf), len(addr)) + return nil, err + } + + copy(addr[:], addrbuf) + res[addr] = accountDataToOnline(addr, &data, proto) + } + + return res, rows.Err() +} + func accountsAll(tx *sql.Tx) (bals map[basics.Address]basics.AccountData, err error) { rows, err := tx.Query("SELECT address, data FROM accountbase") if err != nil { @@ -619,7 +780,7 @@ func accountsPutTotals(tx *sql.Tx, totals AccountTotals, catchpointStaging bool) } // accountsNewRound updates the accountbase and assetcreators by applying the provided deltas to the accounts / creatables. -func accountsNewRound(tx *sql.Tx, updates map[basics.Address]accountDelta, creatables map[basics.CreatableIndex]modifiedCreatable) (err error) { +func accountsNewRound(tx *sql.Tx, updates map[basics.Address]accountDelta, creatables map[basics.CreatableIndex]modifiedCreatable, proto config.ConsensusParams) (err error) { var insertCreatableIdxStmt, deleteCreatableIdxStmt, deleteStmt, replaceStmt *sql.Stmt @@ -629,7 +790,7 @@ func accountsNewRound(tx *sql.Tx, updates map[basics.Address]accountDelta, creat } defer deleteStmt.Close() - replaceStmt, err = tx.Prepare("REPLACE INTO accountbase (address, data) VALUES (?, ?)") + replaceStmt, err = tx.Prepare("REPLACE INTO accountbase (address, normalizedonlinebalance, data) VALUES (?, ?, ?)") if err != nil { return } @@ -640,12 +801,12 @@ func accountsNewRound(tx *sql.Tx, updates map[basics.Address]accountDelta, creat // prune empty accounts _, err = deleteStmt.Exec(addr[:]) } else { - _, err = replaceStmt.Exec(addr[:], protocol.Encode(&data.new)) + normBalance := data.new.NormalizedOnlineBalance(proto) + _, err = replaceStmt.Exec(addr[:], normBalance, protocol.Encode(&data.new)) } if err != nil { return } - } if len(creatables) > 0 { diff --git a/ledger/accountdb_test.go b/ledger/accountdb_test.go index 9d9374847b..9e73b4b106 100644 --- a/ledger/accountdb_test.go +++ b/ledger/accountdb_test.go @@ -17,9 +17,11 @@ package ledger import ( + "bytes" "context" "database/sql" "fmt" + "sort" "testing" "github.com/stretchr/testify/require" @@ -52,7 +54,8 @@ func randomAccountData(rewardsLevel uint64) basics.AccountData { } data.RewardsBase = rewardsLevel - + data.VoteFirstValid = 0 + data.VoteLastValid = 1000 return data } @@ -335,6 +338,10 @@ func checkAccounts(t *testing.T, tx *sql.Tx, rnd basics.Round, accts map[basics. require.NoError(t, err) defer aq.close() + proto := config.Consensus[protocol.ConsensusCurrentVersion] + err = accountsAddNormalizedBalance(tx, proto) + require.NoError(t, err) + var totalOnline, totalOffline, totalNotPart uint64 for addr, data := range accts { @@ -369,6 +376,46 @@ func checkAccounts(t *testing.T, tx *sql.Tx, rnd basics.Round, accts map[basics. d, err := aq.lookup(randomAddress()) require.NoError(t, err) require.Equal(t, d, basics.AccountData{}) + + onlineAccounts := make(map[basics.Address]*onlineAccount) + for addr, data := range accts { + if data.Status == basics.Online { + onlineAccounts[addr] = accountDataToOnline(addr, &data, proto) + } + } + + for i := 0; i < len(onlineAccounts); i++ { + dbtop, err := accountsOnlineTop(tx, 0, uint64(i), proto) + require.NoError(t, err) + require.Equal(t, i, len(dbtop)) + + // Compute the top-N accounts ourselves + var testtop []onlineAccount + for _, data := range onlineAccounts { + testtop = append(testtop, *data) + } + + sort.Slice(testtop, func(i, j int) bool { + ibal := testtop[i].NormalizedOnlineBalance + jbal := testtop[j].NormalizedOnlineBalance + if ibal > jbal { + return true + } + if ibal < jbal { + return false + } + return bytes.Compare(testtop[i].Address[:], testtop[j].Address[:]) > 0 + }) + + for j := 0; j < i; j++ { + _, ok := dbtop[testtop[j].Address] + require.True(t, ok) + } + } + + top, err := accountsOnlineTop(tx, 0, uint64(len(onlineAccounts)+1), proto) + require.NoError(t, err) + require.Equal(t, len(top), len(onlineAccounts)) } func TestAccountDBInit(t *testing.T) { @@ -475,7 +522,7 @@ func TestAccountDBRound(t *testing.T) { accts = newaccts ctbsWithDeletes := randomCreatableSampling(i, ctbsList, randomCtbs, expectedDbImage, numElementsPerSegement) - err = accountsNewRound(tx, updates, ctbsWithDeletes) + err = accountsNewRound(tx, updates, ctbsWithDeletes, proto) require.NoError(t, err) err = totalsNewRounds(tx, []map[basics.Address]accountDelta{updates}, []AccountTotals{{}}, []config.ConsensusParams{proto}) require.NoError(t, err) diff --git a/ledger/acctupdates.go b/ledger/acctupdates.go index fc2928c036..78d05fe8b6 100644 --- a/ledger/acctupdates.go +++ b/ledger/acctupdates.go @@ -17,6 +17,7 @@ package ledger import ( + "container/heap" "context" "database/sql" "encoding/hex" @@ -400,6 +401,109 @@ func (au *accountUpdates) listCreatables(maxCreatableIdx basics.CreatableIndex, return res, nil } +// onlineTop returns the top n online accounts, sorted by their normalized +// balance and address, whose voting keys are valid in voteRnd. See the +// normalization description in AccountData.NormalizedOnlineBalance(). +func (au *accountUpdates) onlineTop(rnd basics.Round, voteRnd basics.Round, n uint64) ([]*onlineAccount, error) { + au.accountsMu.RLock() + defer au.accountsMu.RUnlock() + offset, err := au.roundOffset(rnd) + if err != nil { + return nil, err + } + + proto := au.protos[offset] + + // Determine how many accounts have been modified in-memory, + // so that we obtain enough top accounts from disk (accountdb). + // If the *onlineAccount is nil, that means the account is offline + // as of the most recent change to that account, or its vote key + // is not valid in voteRnd. Otherwise, the *onlineAccount is the + // representation of the most recent state of the account, and it + // is online and can vote in voteRnd. + modifiedAccounts := make(map[basics.Address]*onlineAccount) + for o := uint64(0); o < offset; o++ { + for addr, d := range au.deltas[o] { + if d.new.Status != basics.Online { + modifiedAccounts[addr] = nil + continue + } + + if !(d.new.VoteFirstValid <= voteRnd && voteRnd <= d.new.VoteLastValid) { + modifiedAccounts[addr] = nil + continue + } + + modifiedAccounts[addr] = accountDataToOnline(addr, &d.new, proto) + } + } + + // Build up a set of candidate accounts. Start by loading the + // top N + len(modifiedAccounts) accounts from disk (accountdb). + // This ensures that, even if the worst case if all in-memory + // changes are deleting the top accounts in accountdb, we still + // will have top N left. + // + // Keep asking for more accounts until we get the desired number, + // or there are no more accounts left. + candidates := make(map[basics.Address]*onlineAccount) + batchOffset := uint64(0) + batchSize := uint64(1024) + for uint64(len(candidates)) < n+uint64(len(modifiedAccounts)) { + var accts map[basics.Address]*onlineAccount + err = au.dbs.rdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { + accts, err = accountsOnlineTop(tx, batchOffset, batchSize, proto) + return + }) + if err != nil { + return nil, err + } + + for addr, data := range accts { + if !(data.VoteFirstValid <= voteRnd && voteRnd <= data.VoteLastValid) { + continue + } + candidates[addr] = data + } + + // If we got fewer than batchSize accounts, there are no + // more accounts to look at. + if uint64(len(accts)) < batchSize { + break + } + + batchOffset += batchSize + } + + // Now update the candidates based on the in-memory deltas. + for addr, oa := range modifiedAccounts { + if oa == nil { + delete(candidates, addr) + } else { + candidates[addr] = oa + } + } + + // Get the top N accounts from the candidate set, by inserting all of + // the accounts into a heap and then pulling out N elements from the + // heap. + topHeap := &onlineTopHeap{ + accts: nil, + } + + for _, data := range candidates { + heap.Push(topHeap, data) + } + + var res []*onlineAccount + for topHeap.Len() > 0 && uint64(len(res)) < n { + acct := heap.Pop(topHeap).(*onlineAccount) + res = append(res, acct) + } + + return res, nil +} + // GetLastCatchpointLabel retrieves the last catchpoint label that was stored to the database. func (au *accountUpdates) GetLastCatchpointLabel() string { au.accountsMu.RLock() @@ -838,6 +942,12 @@ func (au *accountUpdates) accountsInitialize(ctx context.Context, tx *sql.Tx) (b au.log.Warnf("accountsInitialize failed to upgrade accounts database (ledger.tracker.sqlite) from schema 2 : %v", err) return 0, err } + case 3: + dbVersion, err = au.upgradeDatabaseSchema3(ctx, tx) + if err != nil { + au.log.Warnf("accountsInitialize failed to upgrade accounts database (ledger.tracker.sqlite) from schema 3 : %v", err) + return 0, err + } default: return 0, fmt.Errorf("accountsInitialize unable to upgrade database from schema version %d", dbVersion) } @@ -1074,6 +1184,22 @@ func (au *accountUpdates) upgradeDatabaseSchema2(ctx context.Context, tx *sql.Tx return 3, nil } +// upgradeDatabaseSchema3 upgrades the database schema from version 3 to version 4, +// adding the normalizedonlinebalance column to the accountbase table. +func (au *accountUpdates) upgradeDatabaseSchema3(ctx context.Context, tx *sql.Tx) (updatedDBVersion int32, err error) { + err = accountsAddNormalizedBalance(tx, au.ledger.GenesisProto()) + if err != nil { + return 0, err + } + + // update version + _, err = db.SetUserVersion(ctx, tx, 4) + if err != nil { + return 0, fmt.Errorf("accountsInitialize unable to update database schema version from 3 to 4: %v", err) + } + return 4, nil +} + // deleteStoredCatchpoints iterates over the storedcatchpoints table and deletes all the files stored on disk. // once all the files have been deleted, it would go ahead and remove the entries from the table. func (au *accountUpdates) deleteStoredCatchpoints(ctx context.Context, dbQueries *accountsDbQueries) (err error) { @@ -1418,6 +1544,8 @@ func (au *accountUpdates) commitRound(offset uint64, dbRound basics.Round, lookb beforeUpdatingBalancesTime := time.Now() var trieBalancesHash crypto.Digest + genesisProto := au.ledger.GenesisProto() + err := au.dbs.wdb.AtomicCommitWriteLock(func(ctx context.Context, tx *sql.Tx) (err error) { treeTargetRound := basics.Round(0) if au.catchpointInterval > 0 { @@ -1438,7 +1566,7 @@ func (au *accountUpdates) commitRound(offset uint64, dbRound basics.Round, lookb treeTargetRound = dbRound + basics.Round(offset) } for i := uint64(0); i < offset; i++ { - err = accountsNewRound(tx, deltas[i], creatableDeltas[i]) + err = accountsNewRound(tx, deltas[i], creatableDeltas[i], genesisProto) if err != nil { return err } diff --git a/ledger/acctupdates_test.go b/ledger/acctupdates_test.go index ff198f30f1..fc0c7789d3 100644 --- a/ledger/acctupdates_test.go +++ b/ledger/acctupdates_test.go @@ -110,6 +110,13 @@ func (ml *mockLedgerForTracker) GenesisHash() crypto.Digest { return crypto.Digest{} } +func (ml *mockLedgerForTracker) GenesisProto() config.ConsensusParams { + if len(ml.blocks) > 0 { + return config.Consensus[ml.blocks[0].block.CurrentProtocol] + } + return config.Consensus[protocol.ConsensusCurrentVersion] +} + // this function used to be in acctupdates.go, but we were never using it for production purposes. This // function has a conceptual flaw in that it attempts to load the entire balances into memory. This might // not work if we have large number of balances. On these unit testing, however, it's not the case, and it's @@ -978,6 +985,9 @@ func TestListCreatables(t *testing.T) { err = accountsInit(tx, accts, proto) require.NoError(t, err) + err = accountsAddNormalizedBalance(tx, proto) + require.NoError(t, err) + au := &accountUpdates{} au.accountsq, err = accountsDbInit(tx, tx) require.NoError(t, err) @@ -997,7 +1007,7 @@ func TestListCreatables(t *testing.T) { // ******* No deletes ******* // sync with the database var updates map[basics.Address]accountDelta - err = accountsNewRound(tx, updates, ctbsWithDeletes) + err = accountsNewRound(tx, updates, ctbsWithDeletes, proto) require.NoError(t, err) // nothing left in cache au.creatables = make(map[basics.CreatableIndex]modifiedCreatable) @@ -1013,7 +1023,7 @@ func TestListCreatables(t *testing.T) { // ******* Results are obtained from the database and from the cache ******* // ******* Deletes are in the database and in the cache ******* // sync with the database. This has deletes synced to the database. - err = accountsNewRound(tx, updates, au.creatables) + err = accountsNewRound(tx, updates, au.creatables, proto) require.NoError(t, err) // get new creatables in the cache. There will be deletes in the cache from the previous batch. au.creatables = randomCreatableSampling(3, ctbsList, randomCtbs, diff --git a/ledger/archival_test.go b/ledger/archival_test.go index ddb32b4957..beb9d51d78 100644 --- a/ledger/archival_test.go +++ b/ledger/archival_test.go @@ -89,6 +89,10 @@ func (wl *wrappedLedger) GenesisHash() crypto.Digest { return wl.l.GenesisHash() } +func (wl *wrappedLedger) GenesisProto() config.ConsensusParams { + return wl.l.GenesisProto() +} + func getInitState() (genesisInitState InitState) { blk := bookkeeping.Block{} blk.CurrentProtocol = protocol.ConsensusCurrentVersion diff --git a/ledger/catchpointwriter_test.go b/ledger/catchpointwriter_test.go index b9672da107..41d0bb5ed6 100644 --- a/ledger/catchpointwriter_test.go +++ b/ledger/catchpointwriter_test.go @@ -269,7 +269,9 @@ func TestFullCatchpointWriter(t *testing.T) { require.NoError(t, err) // create a ledger. - l, err := OpenLedger(ml.log, "TestFullCatchpointWriter", true, InitState{}, conf) + var initState InitState + initState.Block.CurrentProtocol = protocol.ConsensusCurrentVersion + l, err := OpenLedger(ml.log, "TestFullCatchpointWriter", true, initState, conf) require.NoError(t, err) defer l.Close() accessor := MakeCatchpointCatchupAccessor(l, l.log) diff --git a/ledger/catchupaccessor.go b/ledger/catchupaccessor.go index af7236b180..7c5e2be602 100644 --- a/ledger/catchupaccessor.go +++ b/ledger/catchupaccessor.go @@ -290,6 +290,7 @@ func (c *CatchpointCatchupAccessorImpl) processStagingBalances(ctx context.Conte return fmt.Errorf("processStagingBalances received a chunk with no accounts") } + proto := c.ledger.GenesisProto() wdb := c.ledger.trackerDB().wdb err = wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { // create the merkle trie for the balances @@ -308,7 +309,7 @@ func (c *CatchpointCatchupAccessorImpl) processStagingBalances(ctx context.Conte progress.cachedTrie.SetCommitter(mc) } - err = writeCatchpointStagingBalances(ctx, tx, balances.Balances) + err = writeCatchpointStagingBalances(ctx, tx, balances.Balances, proto) if err != nil { return } diff --git a/ledger/ledger.go b/ledger/ledger.go index 8c6181c93a..f7d6c82bb4 100644 --- a/ledger/ledger.go +++ b/ledger/ledger.go @@ -546,6 +546,11 @@ func (l *Ledger) GenesisHash() crypto.Digest { return l.genesisHash } +// GenesisProto returns the initial protocol for this ledger. +func (l *Ledger) GenesisProto() config.ConsensusParams { + return l.genesisProto +} + // GetCatchpointCatchupState returns the current state of the catchpoint catchup. func (l *Ledger) GetCatchpointCatchupState(ctx context.Context) (state CatchpointCatchupState, err error) { return MakeCatchpointCatchupAccessor(l, l.log).GetState(ctx) diff --git a/ledger/onlineacct.go b/ledger/onlineacct.go new file mode 100644 index 0000000000..489d8624cc --- /dev/null +++ b/ledger/onlineacct.go @@ -0,0 +1,89 @@ +// Copyright (C) 2019-2020 Algorand, Inc. +// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see . + +package ledger + +import ( + "bytes" + + "github.com/algorand/go-algorand/crypto" + "github.com/algorand/go-algorand/data/basics" +) + +// An onlineAccount corresponds to an account whose AccountData.Status +// is Online. This is used for a Merkle tree commitment of online +// accounts, which is subsequently used to validate participants for +// a compact certificate. +type onlineAccount struct { + // These are a subset of the fields from the corresponding AccountData. + Address basics.Address + MicroAlgos basics.MicroAlgos + RewardsBase uint64 + NormalizedOnlineBalance uint64 + VoteID crypto.OneTimeSignatureVerifier + VoteFirstValid basics.Round + VoteLastValid basics.Round + VoteKeyDilution uint64 +} + +// onlineTopHeap implements heap.Interface for tracking top N online accounts. +type onlineTopHeap struct { + accts []*onlineAccount +} + +// Len implements sort.Interface +func (h *onlineTopHeap) Len() int { + return len(h.accts) +} + +// Less implements sort.Interface +func (h *onlineTopHeap) Less(i, j int) bool { + // For the heap, "less" means the element is returned earlier by Pop(), + // so we actually implement "greater-than" here. + ibal := h.accts[i].NormalizedOnlineBalance + jbal := h.accts[j].NormalizedOnlineBalance + + if ibal > jbal { + return true + } + if ibal < jbal { + return false + } + + bcmp := bytes.Compare(h.accts[i].Address[:], h.accts[j].Address[:]) + if bcmp > 0 { + return true + } + + return false +} + +// Swap implements sort.Interface +func (h *onlineTopHeap) Swap(i, j int) { + h.accts[i], h.accts[j] = h.accts[j], h.accts[i] +} + +// Push implements heap.Interface +func (h *onlineTopHeap) Push(x interface{}) { + h.accts = append(h.accts, x.(*onlineAccount)) +} + +// Pop implements heap.Interface +func (h *onlineTopHeap) Pop() interface{} { + res := h.accts[len(h.accts)-1] + h.accts = h.accts[:len(h.accts)-1] + return res +} diff --git a/ledger/tracker.go b/ledger/tracker.go index 91cabfe260..f756139217 100644 --- a/ledger/tracker.go +++ b/ledger/tracker.go @@ -20,6 +20,7 @@ import ( "fmt" "reflect" + "github.com/algorand/go-algorand/config" "github.com/algorand/go-algorand/crypto" "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/data/bookkeeping" @@ -92,6 +93,7 @@ type ledgerForTracker interface { Block(basics.Round) (bookkeeping.Block, error) BlockHdr(basics.Round) (bookkeeping.BlockHeader, error) GenesisHash() crypto.Digest + GenesisProto() config.ConsensusParams } type trackerRegistry struct {