Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement tracking of top-N online accounts in the ledger #1361

Merged
merged 13 commits into from
Sep 9, 2020
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions data/basics/overflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package basics

import (
"math"
"math/big"
)

// OverflowTracker is used to track when an operation causes an overflow
Expand Down Expand Up @@ -163,3 +164,21 @@ func (t *OverflowTracker) SubR(a Round, b Round) Round {
func (t *OverflowTracker) ScalarMulA(a MicroAlgos, b uint64) MicroAlgos {
return MicroAlgos{Raw: t.Mul(a.Raw, b)}
}

// Muldiv computes a*b/c. The overflow flag indicates that
// the result was 2^64 or greater.
func Muldiv(a uint64, b uint64, c uint64) (res uint64, overflow bool) {
var aa big.Int
aa.SetUint64(a)

var bb big.Int
bb.SetUint64(b)

var cc big.Int
cc.SetUint64(c)

aa.Mul(&aa, &bb)
aa.Div(&aa, &cc)

return aa.Uint64(), !aa.IsUint64()
}
43 changes: 43 additions & 0 deletions data/basics/userBalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,49 @@ func (u AccountData) IsZero() bool {
return reflect.DeepEqual(u, AccountData{})
}

// NormalizedOnlineBalance returns a ``normalized'' balance for this account.
//
// The normalization compensates for rewards that have not yet been applied,
// by computing a balance normalized to round 0. To normalize, we estimate
// the microalgo balance that an account should have had at round 0, in order
// to end up at its current balance when rewards are included.
//
// The benefit of the normalization procedure is that an account's normalized
// balance does not change over time (unlike the actual algo balance that includes
// rewards). This makes it possible to compare normalized balances between two
// accounts, to sort them, and get results that are close to what we would get
// if we computed the exact algo balance of the accounts at a given round number.
//
// The normalization can lead to some inconsistencies in comparisons between
// account balances, because the growth rate of rewards for accounts depends
// on how recently the account has been touched (our rewards do not implement
// compounding). However, online accounts have to periodically renew
// participation keys, so the scale of the inconsistency is small.
func (u AccountData) NormalizedOnlineBalance(proto config.ConsensusParams) uint64 {
if u.Status != Online {
return 0
}

// If this account had one RewardUnit of microAlgos in round 0, it would
// have perRewardUnit microAlgos at the account's current rewards level.
perRewardUnit := u.RewardsBase + proto.RewardUnit

// To normalize, we compute, mathematically,
// u.MicroAlgos / perRewardUnit * proto.RewardUnit, as
// (u.MicroAlgos * proto.RewardUnit) / perRewardUnit.
norm, overflowed := Muldiv(u.MicroAlgos.ToUint64(), proto.RewardUnit, perRewardUnit)

// Mathematically should be impossible to overflow
// because perRewardUnit >= proto.RewardUnit, as long
// as u.RewardBase isn't huge enough to cause overflow..
if overflowed {
logging.Base().Panicf("overflow computing normalized balance %d * %d / (%d + %d)",
u.MicroAlgos.ToUint64(), proto.RewardUnit, u.RewardsBase, proto.RewardUnit)
}

return norm
}

// BalanceRecord pairs an account's address with its associated data.
type BalanceRecord struct {
_struct struct{} `codec:",omitempty,omitemptyarray"`
Expand Down
223 changes: 192 additions & 31 deletions ledger/accountdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,18 @@ var creatablesMigration = []string{
`ALTER TABLE assetcreators ADD COLUMN ctype INTEGER DEFAULT 0`,
}

func createNormalizedOnlineBalanceIndex(idxname string, tablename string) string {
return fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s
ON %s ( normalizedonlinebalance, address, data )
WHERE normalizedonlinebalance>0`, idxname, tablename)
}

var createOnlineAccountIndex = []string{
`ALTER TABLE accountbase
ADD COLUMN normalizedonlinebalance INTEGER`,
createNormalizedOnlineBalanceIndex("onlineaccountbals", "accountbase"),
}

var accountsResetExprs = []string{
`DROP TABLE IF EXISTS acctrounds`,
`DROP TABLE IF EXISTS accounttotals`,
Expand All @@ -100,7 +112,7 @@ var accountsResetExprs = []string{
// accountDBVersion is the database version that this binary would know how to support and how to upgrade to.
// details about the content of each of the versions can be found in the upgrade functions upgradeDatabaseSchemaXXXX
// and their descriptions.
var accountDBVersion = int32(2)
var accountDBVersion = int32(3)

type accountDelta struct {
old basics.AccountData
Expand Down Expand Up @@ -136,60 +148,95 @@ func writeCatchpointStagingCreatable(ctx context.Context, tx *sql.Tx, addr basic
return nil
}

func writeCatchpointStagingBalances(ctx context.Context, tx *sql.Tx, bals []encodedBalanceRecord) error {
insertStmt, err := tx.PrepareContext(ctx, "INSERT INTO catchpointbalances(address, data) VALUES(?, ?)")
func writeCatchpointStagingBalances(ctx context.Context, tx *sql.Tx, bals []encodedBalanceRecord, proto config.ConsensusParams) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm pretty confident that this isn't what you want to do:
Using the genesis config.ConsensusParams here means that accounts that would be using fast catchup would calculate the normalized online balances based on the genesis params and not by the most recent params.

I would suggest you'll:

  1. add the calculated normalized online balances to the catchpoint file
  2. "restore" it in the same way, without recalculating anything.
  3. add the calculated normalized online balances to the hashing of the account, so that the merkletrie would include that.
  4. update the version of the catchpoint file, so that it would "break" compatibility with older files.

I have yet had the need to perform #4, so I don't know how hard it would be. ( shouldn't be too hard ).

I will be available to help you with this one if you'd like next week.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The proto is used only for the value of proto.RewardUnit, which cannot change in a given network, so it's OK to pass the proto for the genesis block or the proto for a recent block.

Copy link
Contributor

@tsachiherman tsachiherman Aug 20, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that case, we should always use the genesis round proto in the call to
err = accountsNewRound(tx, deltas[i], creatableDeltas[i], protos[i+1]) as well.

since we don't want to make the assumption that "it won't change"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That seems like a reasonable idea; thanks! Pushed.

insertStmt, err := tx.PrepareContext(ctx, "INSERT INTO catchpointbalances(address, normalizedonlinebalance, data) VALUES(?, ?, ?)")
if err != nil {
return err
}

for _, balance := range bals {
result, err := insertStmt.ExecContext(ctx, balance.Address[:], balance.AccountData)
var data basics.AccountData
err = protocol.Decode(balance.AccountData, &data)
if err != nil {
return err
}

normBalance := data.NormalizedOnlineBalance(proto)

result, err := insertStmt.ExecContext(ctx, balance.Address[:], normBalance, balance.AccountData)
if err != nil {
return err
}

aff, err := result.RowsAffected()
if err != nil {
return err
}
if aff != 1 {
return fmt.Errorf("number of affected record in insert was expected to be one, but was %d", aff)
}

}
return nil
}

func resetCatchpointStagingBalances(ctx context.Context, tx *sql.Tx, newCatchup bool) (err error) {
s := "DROP TABLE IF EXISTS catchpointbalances;"
s += "DROP TABLE IF EXISTS catchpointassetcreators;"
s += "DROP TABLE IF EXISTS catchpointaccounthashes;"
s += "DELETE FROM accounttotals where id='catchpointStaging';"
s := []string{
"DROP TABLE IF EXISTS catchpointbalances",
"DROP TABLE IF EXISTS catchpointassetcreators",
"DROP TABLE IF EXISTS catchpointaccounthashes",
"DELETE FROM accounttotals where id='catchpointStaging'",
}

if newCatchup {
s += "CREATE TABLE IF NOT EXISTS catchpointassetcreators(asset integer primary key, creator blob, ctype integer);"
s += "CREATE TABLE IF NOT EXISTS catchpointbalances(address blob primary key, data blob);"
s += "CREATE TABLE IF NOT EXISTS catchpointaccounthashes(id integer primary key, data blob);"
// SQLite has no way to rename an existing index. So, we need
// to cook up a fresh name for the index, which will be kept
// around after we rename the table from "catchpointbalances"
// to "accountbase". To construct a unique index name, we
// use the current time.
idxname := fmt.Sprintf("onlineaccountbals%d", time.Now().UnixNano())

s = append(s,
"CREATE TABLE IF NOT EXISTS catchpointassetcreators (asset integer primary key, creator blob, ctype integer)",
"CREATE TABLE IF NOT EXISTS catchpointbalances (address blob primary key, data blob, normalizedonlinebalance integer)",
"CREATE TABLE IF NOT EXISTS catchpointaccounthashes (id integer primary key, data blob)",
createNormalizedOnlineBalanceIndex(idxname, "catchpointbalances"),
)
}
_, err = tx.Exec(s)
return err

for _, stmt := range s {
_, err = tx.Exec(stmt)
if err != nil {
return err
}
}

return nil
}

// applyCatchpointStagingBalances switches the staged catchpoint catchup tables onto the actual
// tables and update the correct balance round. This is the final step in switching onto the new catchpoint round.
func applyCatchpointStagingBalances(ctx context.Context, tx *sql.Tx, balancesRound basics.Round) (err error) {
s := "ALTER TABLE accountbase RENAME TO accountbase_old;"
s += "ALTER TABLE assetcreators RENAME TO assetcreators_old;"
s += "ALTER TABLE accounthashes RENAME TO accounthashes_old;"
s += "ALTER TABLE catchpointbalances RENAME TO accountbase;"
s += "ALTER TABLE catchpointassetcreators RENAME TO assetcreators;"
s += "ALTER TABLE catchpointaccounthashes RENAME TO accounthashes;"
s += "DROP TABLE IF EXISTS accountbase_old;"
s += "DROP TABLE IF EXISTS assetcreators_old;"
s += "DROP TABLE IF EXISTS accounthashes_old;"

_, err = tx.Exec(s)
if err != nil {
return err
stmts := []string{
"ALTER TABLE accountbase RENAME TO accountbase_old",
"ALTER TABLE assetcreators RENAME TO assetcreators_old",
"ALTER TABLE accounthashes RENAME TO accounthashes_old",

"ALTER TABLE catchpointbalances RENAME TO accountbase",
"ALTER TABLE catchpointassetcreators RENAME TO assetcreators",
"ALTER TABLE catchpointaccounthashes RENAME TO accounthashes",

"DROP TABLE IF EXISTS accountbase_old",
"DROP TABLE IF EXISTS assetcreators_old",
"DROP TABLE IF EXISTS accounthashes_old",
}

for _, stmt := range stmts {
_, err = tx.Exec(stmt)
if err != nil {
return err
}
}

_, err = tx.Exec("INSERT OR REPLACE INTO acctrounds(id, rnd) VALUES('acctbase', ?)", balancesRound)
if err != nil {
return err
Expand Down Expand Up @@ -269,6 +316,76 @@ func accountsInit(tx *sql.Tx, initAccounts map[basics.Address]basics.AccountData
return nil
}

// accountsAddNormalizedBalance adds the normalizedonlinebalance column
// to the accountbase table.
func accountsAddNormalizedBalance(tx *sql.Tx, proto config.ConsensusParams) error {
var exists bool
err := tx.QueryRow("SELECT 1 FROM pragma_table_info('accountbase') WHERE name='normalizedonlinebalance'").Scan(&exists)
if err == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the database versioning, you dont really need to verify that the column present/missing, but it wouldn't be incorrect either.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It turns out to be convenient because in the accountdb test code, I don't want to be invoking acctupdates code, but I still need some way to add the normalizedonlinebalance column. Without this check, I would need some other way to determine if this column has been added or not.

// Already exists.
return nil
}
if err != sql.ErrNoRows {
return err
}

for _, stmt := range createOnlineAccountIndex {
_, err := tx.Exec(stmt)
if err != nil {
return err
}
}

rows, err := tx.Query("SELECT address, data FROM accountbase")
if err != nil {
return err
}
defer rows.Close()

for rows.Next() {
var addrbuf []byte
var buf []byte
err = rows.Scan(&addrbuf, &buf)
if err != nil {
return err
}

var data basics.AccountData
err = protocol.Decode(buf, &data)
if err != nil {
return err
}

normBalance := data.NormalizedOnlineBalance(proto)
if normBalance > 0 {
_, err = tx.Exec("UPDATE accountbase SET normalizedonlinebalance=? WHERE address=?", normBalance, addrbuf)
if err != nil {
return err
}
}
}

return rows.Err()
}

// accountDataToOnline returns the part of the AccountData that matters
// for online accounts (to answer top-N queries). We store a subset of
// the full AccountData because we need to store a large number of these
// in memory (say, 1M), and storing that many AccountData could easily
// cause us to run out of memory.
func accountDataToOnline(address basics.Address, ad *basics.AccountData, proto config.ConsensusParams) *onlineAccount {
return &onlineAccount{
Address: address,
MicroAlgos: ad.MicroAlgos,
RewardsBase: ad.RewardsBase,
NormalizedOnlineBalance: ad.NormalizedOnlineBalance(proto),
VoteID: ad.VoteID,
VoteFirstValid: ad.VoteFirstValid,
VoteLastValid: ad.VoteLastValid,
VoteKeyDilution: ad.VoteKeyDilution,
}
}

func resetAccountHashes(tx *sql.Tx) (err error) {
_, err = tx.Exec(`DELETE FROM accounthashes`)
return
Expand Down Expand Up @@ -554,6 +671,50 @@ func (qs *accountsDbQueries) close() {
}
}

// accountsOnlineTop returns the top n online accounts starting at position offset
// (that is, the top offset'th account through the top offset+n-1'th account).
//
// The accounts are sorted by their normalized balance and address. The normalized
// balance has to do with the reward parts of online account balances. See the
// normalization procedure in AccountData.NormalizedOnlineBalance().
//
// Note that this does not check if the accounts have a vote key valid for any
// particular round (past, present, or future).
func accountsOnlineTop(tx *sql.Tx, offset, n uint64, proto config.ConsensusParams) (map[basics.Address]*onlineAccount, error) {
rows, err := tx.Query("SELECT address, data FROM accountbase WHERE normalizedonlinebalance>0 ORDER BY normalizedonlinebalance DESC, address DESC LIMIT ? OFFSET ?", n, offset)
if err != nil {
return nil, err
}
defer rows.Close()

res := make(map[basics.Address]*onlineAccount, n)
for rows.Next() {
var addrbuf []byte
var buf []byte
err = rows.Scan(&addrbuf, &buf)
if err != nil {
return nil, err
}

var data basics.AccountData
err = protocol.Decode(buf, &data)
if err != nil {
return nil, err
}

var addr basics.Address
if len(addrbuf) != len(addr) {
err = fmt.Errorf("Account DB address length mismatch: %d != %d", len(addrbuf), len(addr))
return nil, err
}

copy(addr[:], addrbuf)
res[addr] = accountDataToOnline(addr, &data, proto)
}

return res, rows.Err()
}

func accountsAll(tx *sql.Tx) (bals map[basics.Address]basics.AccountData, err error) {
rows, err := tx.Query("SELECT address, data FROM accountbase")
if err != nil {
Expand Down Expand Up @@ -619,7 +780,7 @@ func accountsPutTotals(tx *sql.Tx, totals AccountTotals, catchpointStaging bool)
}

// accountsNewRound updates the accountbase and assetcreators by applying the provided deltas to the accounts / creatables.
func accountsNewRound(tx *sql.Tx, updates map[basics.Address]accountDelta, creatables map[basics.CreatableIndex]modifiedCreatable) (err error) {
func accountsNewRound(tx *sql.Tx, updates map[basics.Address]accountDelta, creatables map[basics.CreatableIndex]modifiedCreatable, proto config.ConsensusParams) (err error) {

var insertCreatableIdxStmt, deleteCreatableIdxStmt, deleteStmt, replaceStmt *sql.Stmt

Expand All @@ -629,7 +790,7 @@ func accountsNewRound(tx *sql.Tx, updates map[basics.Address]accountDelta, creat
}
defer deleteStmt.Close()

replaceStmt, err = tx.Prepare("REPLACE INTO accountbase (address, data) VALUES (?, ?)")
replaceStmt, err = tx.Prepare("REPLACE INTO accountbase (address, normalizedonlinebalance, data) VALUES (?, ?, ?)")
if err != nil {
return
}
Expand All @@ -640,12 +801,12 @@ func accountsNewRound(tx *sql.Tx, updates map[basics.Address]accountDelta, creat
// prune empty accounts
_, err = deleteStmt.Exec(addr[:])
} else {
_, err = replaceStmt.Exec(addr[:], protocol.Encode(&data.new))
normBalance := data.new.NormalizedOnlineBalance(proto)
_, err = replaceStmt.Exec(addr[:], normBalance, protocol.Encode(&data.new))
}
if err != nil {
return
}

}

if len(creatables) > 0 {
Expand Down
Loading