Skip to content

Commit

Permalink
implement tracking of top-N online accounts in the ledger (#1361)
Browse files Browse the repository at this point in the history
Implement tracking of top-N online accounts in the ledger
  • Loading branch information
zeldovich authored Sep 9, 2020
1 parent 6bc1c01 commit 4849f51
Show file tree
Hide file tree
Showing 12 changed files with 549 additions and 38 deletions.
19 changes: 19 additions & 0 deletions data/basics/overflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package basics

import (
"math"
"math/big"
)

// OverflowTracker is used to track when an operation causes an overflow
Expand Down Expand Up @@ -163,3 +164,21 @@ func (t *OverflowTracker) SubR(a Round, b Round) Round {
func (t *OverflowTracker) ScalarMulA(a MicroAlgos, b uint64) MicroAlgos {
return MicroAlgos{Raw: t.Mul(a.Raw, b)}
}

// Muldiv computes a*b/c. The overflow flag indicates that
// the result was 2^64 or greater.
func Muldiv(a uint64, b uint64, c uint64) (res uint64, overflow bool) {
var aa big.Int
aa.SetUint64(a)

var bb big.Int
bb.SetUint64(b)

var cc big.Int
cc.SetUint64(c)

aa.Mul(&aa, &bb)
aa.Div(&aa, &cc)

return aa.Uint64(), !aa.IsUint64()
}
43 changes: 43 additions & 0 deletions data/basics/userBalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,49 @@ func (u AccountData) IsZero() bool {
return reflect.DeepEqual(u, AccountData{})
}

// NormalizedOnlineBalance returns a ``normalized'' balance for this account.
//
// The normalization compensates for rewards that have not yet been applied,
// by computing a balance normalized to round 0. To normalize, we estimate
// the microalgo balance that an account should have had at round 0, in order
// to end up at its current balance when rewards are included.
//
// The benefit of the normalization procedure is that an account's normalized
// balance does not change over time (unlike the actual algo balance that includes
// rewards). This makes it possible to compare normalized balances between two
// accounts, to sort them, and get results that are close to what we would get
// if we computed the exact algo balance of the accounts at a given round number.
//
// The normalization can lead to some inconsistencies in comparisons between
// account balances, because the growth rate of rewards for accounts depends
// on how recently the account has been touched (our rewards do not implement
// compounding). However, online accounts have to periodically renew
// participation keys, so the scale of the inconsistency is small.
func (u AccountData) NormalizedOnlineBalance(proto config.ConsensusParams) uint64 {
if u.Status != Online {
return 0
}

// If this account had one RewardUnit of microAlgos in round 0, it would
// have perRewardUnit microAlgos at the account's current rewards level.
perRewardUnit := u.RewardsBase + proto.RewardUnit

// To normalize, we compute, mathematically,
// u.MicroAlgos / perRewardUnit * proto.RewardUnit, as
// (u.MicroAlgos * proto.RewardUnit) / perRewardUnit.
norm, overflowed := Muldiv(u.MicroAlgos.ToUint64(), proto.RewardUnit, perRewardUnit)

// Mathematically should be impossible to overflow
// because perRewardUnit >= proto.RewardUnit, as long
// as u.RewardBase isn't huge enough to cause overflow..
if overflowed {
logging.Base().Panicf("overflow computing normalized balance %d * %d / (%d + %d)",
u.MicroAlgos.ToUint64(), proto.RewardUnit, u.RewardsBase, proto.RewardUnit)
}

return norm
}

// BalanceRecord pairs an account's address with its associated data.
type BalanceRecord struct {
_struct struct{} `codec:",omitempty,omitemptyarray"`
Expand Down
223 changes: 192 additions & 31 deletions ledger/accountdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,18 @@ var creatablesMigration = []string{
`ALTER TABLE assetcreators ADD COLUMN ctype INTEGER DEFAULT 0`,
}

func createNormalizedOnlineBalanceIndex(idxname string, tablename string) string {
return fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s
ON %s ( normalizedonlinebalance, address, data )
WHERE normalizedonlinebalance>0`, idxname, tablename)
}

var createOnlineAccountIndex = []string{
`ALTER TABLE accountbase
ADD COLUMN normalizedonlinebalance INTEGER`,
createNormalizedOnlineBalanceIndex("onlineaccountbals", "accountbase"),
}

var accountsResetExprs = []string{
`DROP TABLE IF EXISTS acctrounds`,
`DROP TABLE IF EXISTS accounttotals`,
Expand All @@ -100,7 +112,7 @@ var accountsResetExprs = []string{
// accountDBVersion is the database version that this binary would know how to support and how to upgrade to.
// details about the content of each of the versions can be found in the upgrade functions upgradeDatabaseSchemaXXXX
// and their descriptions.
var accountDBVersion = int32(3)
var accountDBVersion = int32(4)

type accountDelta struct {
old basics.AccountData
Expand Down Expand Up @@ -136,60 +148,95 @@ func writeCatchpointStagingCreatable(ctx context.Context, tx *sql.Tx, addr basic
return nil
}

func writeCatchpointStagingBalances(ctx context.Context, tx *sql.Tx, bals []encodedBalanceRecord) error {
insertStmt, err := tx.PrepareContext(ctx, "INSERT INTO catchpointbalances(address, data) VALUES(?, ?)")
func writeCatchpointStagingBalances(ctx context.Context, tx *sql.Tx, bals []encodedBalanceRecord, proto config.ConsensusParams) error {
insertStmt, err := tx.PrepareContext(ctx, "INSERT INTO catchpointbalances(address, normalizedonlinebalance, data) VALUES(?, ?, ?)")
if err != nil {
return err
}

for _, balance := range bals {
result, err := insertStmt.ExecContext(ctx, balance.Address[:], balance.AccountData)
var data basics.AccountData
err = protocol.Decode(balance.AccountData, &data)
if err != nil {
return err
}

normBalance := data.NormalizedOnlineBalance(proto)

result, err := insertStmt.ExecContext(ctx, balance.Address[:], normBalance, balance.AccountData)
if err != nil {
return err
}

aff, err := result.RowsAffected()
if err != nil {
return err
}
if aff != 1 {
return fmt.Errorf("number of affected record in insert was expected to be one, but was %d", aff)
}

}
return nil
}

func resetCatchpointStagingBalances(ctx context.Context, tx *sql.Tx, newCatchup bool) (err error) {
s := "DROP TABLE IF EXISTS catchpointbalances;"
s += "DROP TABLE IF EXISTS catchpointassetcreators;"
s += "DROP TABLE IF EXISTS catchpointaccounthashes;"
s += "DELETE FROM accounttotals where id='catchpointStaging';"
s := []string{
"DROP TABLE IF EXISTS catchpointbalances",
"DROP TABLE IF EXISTS catchpointassetcreators",
"DROP TABLE IF EXISTS catchpointaccounthashes",
"DELETE FROM accounttotals where id='catchpointStaging'",
}

if newCatchup {
s += "CREATE TABLE IF NOT EXISTS catchpointassetcreators(asset integer primary key, creator blob, ctype integer);"
s += "CREATE TABLE IF NOT EXISTS catchpointbalances(address blob primary key, data blob);"
s += "CREATE TABLE IF NOT EXISTS catchpointaccounthashes(id integer primary key, data blob);"
// SQLite has no way to rename an existing index. So, we need
// to cook up a fresh name for the index, which will be kept
// around after we rename the table from "catchpointbalances"
// to "accountbase". To construct a unique index name, we
// use the current time.
idxname := fmt.Sprintf("onlineaccountbals%d", time.Now().UnixNano())

s = append(s,
"CREATE TABLE IF NOT EXISTS catchpointassetcreators (asset integer primary key, creator blob, ctype integer)",
"CREATE TABLE IF NOT EXISTS catchpointbalances (address blob primary key, data blob, normalizedonlinebalance integer)",
"CREATE TABLE IF NOT EXISTS catchpointaccounthashes (id integer primary key, data blob)",
createNormalizedOnlineBalanceIndex(idxname, "catchpointbalances"),
)
}
_, err = tx.Exec(s)
return err

for _, stmt := range s {
_, err = tx.Exec(stmt)
if err != nil {
return err
}
}

return nil
}

// applyCatchpointStagingBalances switches the staged catchpoint catchup tables onto the actual
// tables and update the correct balance round. This is the final step in switching onto the new catchpoint round.
func applyCatchpointStagingBalances(ctx context.Context, tx *sql.Tx, balancesRound basics.Round) (err error) {
s := "ALTER TABLE accountbase RENAME TO accountbase_old;"
s += "ALTER TABLE assetcreators RENAME TO assetcreators_old;"
s += "ALTER TABLE accounthashes RENAME TO accounthashes_old;"
s += "ALTER TABLE catchpointbalances RENAME TO accountbase;"
s += "ALTER TABLE catchpointassetcreators RENAME TO assetcreators;"
s += "ALTER TABLE catchpointaccounthashes RENAME TO accounthashes;"
s += "DROP TABLE IF EXISTS accountbase_old;"
s += "DROP TABLE IF EXISTS assetcreators_old;"
s += "DROP TABLE IF EXISTS accounthashes_old;"

_, err = tx.Exec(s)
if err != nil {
return err
stmts := []string{
"ALTER TABLE accountbase RENAME TO accountbase_old",
"ALTER TABLE assetcreators RENAME TO assetcreators_old",
"ALTER TABLE accounthashes RENAME TO accounthashes_old",

"ALTER TABLE catchpointbalances RENAME TO accountbase",
"ALTER TABLE catchpointassetcreators RENAME TO assetcreators",
"ALTER TABLE catchpointaccounthashes RENAME TO accounthashes",

"DROP TABLE IF EXISTS accountbase_old",
"DROP TABLE IF EXISTS assetcreators_old",
"DROP TABLE IF EXISTS accounthashes_old",
}

for _, stmt := range stmts {
_, err = tx.Exec(stmt)
if err != nil {
return err
}
}

_, err = tx.Exec("INSERT OR REPLACE INTO acctrounds(id, rnd) VALUES('acctbase', ?)", balancesRound)
if err != nil {
return err
Expand Down Expand Up @@ -269,6 +316,76 @@ func accountsInit(tx *sql.Tx, initAccounts map[basics.Address]basics.AccountData
return nil
}

// accountsAddNormalizedBalance adds the normalizedonlinebalance column
// to the accountbase table.
func accountsAddNormalizedBalance(tx *sql.Tx, proto config.ConsensusParams) error {
var exists bool
err := tx.QueryRow("SELECT 1 FROM pragma_table_info('accountbase') WHERE name='normalizedonlinebalance'").Scan(&exists)
if err == nil {
// Already exists.
return nil
}
if err != sql.ErrNoRows {
return err
}

for _, stmt := range createOnlineAccountIndex {
_, err := tx.Exec(stmt)
if err != nil {
return err
}
}

rows, err := tx.Query("SELECT address, data FROM accountbase")
if err != nil {
return err
}
defer rows.Close()

for rows.Next() {
var addrbuf []byte
var buf []byte
err = rows.Scan(&addrbuf, &buf)
if err != nil {
return err
}

var data basics.AccountData
err = protocol.Decode(buf, &data)
if err != nil {
return err
}

normBalance := data.NormalizedOnlineBalance(proto)
if normBalance > 0 {
_, err = tx.Exec("UPDATE accountbase SET normalizedonlinebalance=? WHERE address=?", normBalance, addrbuf)
if err != nil {
return err
}
}
}

return rows.Err()
}

// accountDataToOnline returns the part of the AccountData that matters
// for online accounts (to answer top-N queries). We store a subset of
// the full AccountData because we need to store a large number of these
// in memory (say, 1M), and storing that many AccountData could easily
// cause us to run out of memory.
func accountDataToOnline(address basics.Address, ad *basics.AccountData, proto config.ConsensusParams) *onlineAccount {
return &onlineAccount{
Address: address,
MicroAlgos: ad.MicroAlgos,
RewardsBase: ad.RewardsBase,
NormalizedOnlineBalance: ad.NormalizedOnlineBalance(proto),
VoteID: ad.VoteID,
VoteFirstValid: ad.VoteFirstValid,
VoteLastValid: ad.VoteLastValid,
VoteKeyDilution: ad.VoteKeyDilution,
}
}

func resetAccountHashes(tx *sql.Tx) (err error) {
_, err = tx.Exec(`DELETE FROM accounthashes`)
return
Expand Down Expand Up @@ -554,6 +671,50 @@ func (qs *accountsDbQueries) close() {
}
}

// accountsOnlineTop returns the top n online accounts starting at position offset
// (that is, the top offset'th account through the top offset+n-1'th account).
//
// The accounts are sorted by their normalized balance and address. The normalized
// balance has to do with the reward parts of online account balances. See the
// normalization procedure in AccountData.NormalizedOnlineBalance().
//
// Note that this does not check if the accounts have a vote key valid for any
// particular round (past, present, or future).
func accountsOnlineTop(tx *sql.Tx, offset, n uint64, proto config.ConsensusParams) (map[basics.Address]*onlineAccount, error) {
rows, err := tx.Query("SELECT address, data FROM accountbase WHERE normalizedonlinebalance>0 ORDER BY normalizedonlinebalance DESC, address DESC LIMIT ? OFFSET ?", n, offset)
if err != nil {
return nil, err
}
defer rows.Close()

res := make(map[basics.Address]*onlineAccount, n)
for rows.Next() {
var addrbuf []byte
var buf []byte
err = rows.Scan(&addrbuf, &buf)
if err != nil {
return nil, err
}

var data basics.AccountData
err = protocol.Decode(buf, &data)
if err != nil {
return nil, err
}

var addr basics.Address
if len(addrbuf) != len(addr) {
err = fmt.Errorf("Account DB address length mismatch: %d != %d", len(addrbuf), len(addr))
return nil, err
}

copy(addr[:], addrbuf)
res[addr] = accountDataToOnline(addr, &data, proto)
}

return res, rows.Err()
}

func accountsAll(tx *sql.Tx) (bals map[basics.Address]basics.AccountData, err error) {
rows, err := tx.Query("SELECT address, data FROM accountbase")
if err != nil {
Expand Down Expand Up @@ -619,7 +780,7 @@ func accountsPutTotals(tx *sql.Tx, totals AccountTotals, catchpointStaging bool)
}

// accountsNewRound updates the accountbase and assetcreators by applying the provided deltas to the accounts / creatables.
func accountsNewRound(tx *sql.Tx, updates map[basics.Address]accountDelta, creatables map[basics.CreatableIndex]modifiedCreatable) (err error) {
func accountsNewRound(tx *sql.Tx, updates map[basics.Address]accountDelta, creatables map[basics.CreatableIndex]modifiedCreatable, proto config.ConsensusParams) (err error) {

var insertCreatableIdxStmt, deleteCreatableIdxStmt, deleteStmt, replaceStmt *sql.Stmt

Expand All @@ -629,7 +790,7 @@ func accountsNewRound(tx *sql.Tx, updates map[basics.Address]accountDelta, creat
}
defer deleteStmt.Close()

replaceStmt, err = tx.Prepare("REPLACE INTO accountbase (address, data) VALUES (?, ?)")
replaceStmt, err = tx.Prepare("REPLACE INTO accountbase (address, normalizedonlinebalance, data) VALUES (?, ?, ?)")
if err != nil {
return
}
Expand All @@ -640,12 +801,12 @@ func accountsNewRound(tx *sql.Tx, updates map[basics.Address]accountDelta, creat
// prune empty accounts
_, err = deleteStmt.Exec(addr[:])
} else {
_, err = replaceStmt.Exec(addr[:], protocol.Encode(&data.new))
normBalance := data.new.NormalizedOnlineBalance(proto)
_, err = replaceStmt.Exec(addr[:], normBalance, protocol.Encode(&data.new))
}
if err != nil {
return
}

}

if len(creatables) > 0 {
Expand Down
Loading

0 comments on commit 4849f51

Please sign in to comment.