-
Notifications
You must be signed in to change notification settings - Fork 493
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
implement tracking of top-N online accounts in the ledger #1361
Changes from 9 commits
4aae498
2f1991a
2b120c0
0a5fa87
1bf9b5a
eca678d
e9bdcde
06b8a9e
5934dee
db1494e
43f397d
111be87
153d7b4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -87,6 +87,18 @@ var creatablesMigration = []string{ | |
`ALTER TABLE assetcreators ADD COLUMN ctype INTEGER DEFAULT 0`, | ||
} | ||
|
||
func createNormalizedOnlineBalanceIndex(idxname string, tablename string) string { | ||
return fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s | ||
ON %s ( normalizedonlinebalance, address, data ) | ||
WHERE normalizedonlinebalance>0`, idxname, tablename) | ||
} | ||
|
||
var createOnlineAccountIndex = []string{ | ||
`ALTER TABLE accountbase | ||
ADD COLUMN normalizedonlinebalance INTEGER`, | ||
createNormalizedOnlineBalanceIndex("onlineaccountbals", "accountbase"), | ||
} | ||
|
||
var accountsResetExprs = []string{ | ||
`DROP TABLE IF EXISTS acctrounds`, | ||
`DROP TABLE IF EXISTS accounttotals`, | ||
|
@@ -100,7 +112,7 @@ var accountsResetExprs = []string{ | |
// accountDBVersion is the database version that this binary would know how to support and how to upgrade to. | ||
// details about the content of each of the versions can be found in the upgrade functions upgradeDatabaseSchemaXXXX | ||
// and their descriptions. | ||
var accountDBVersion = int32(2) | ||
var accountDBVersion = int32(3) | ||
|
||
type accountDelta struct { | ||
old basics.AccountData | ||
|
@@ -136,60 +148,95 @@ func writeCatchpointStagingCreatable(ctx context.Context, tx *sql.Tx, addr basic | |
return nil | ||
} | ||
|
||
func writeCatchpointStagingBalances(ctx context.Context, tx *sql.Tx, bals []encodedBalanceRecord) error { | ||
insertStmt, err := tx.PrepareContext(ctx, "INSERT INTO catchpointbalances(address, data) VALUES(?, ?)") | ||
func writeCatchpointStagingBalances(ctx context.Context, tx *sql.Tx, bals []encodedBalanceRecord, proto config.ConsensusParams) error { | ||
insertStmt, err := tx.PrepareContext(ctx, "INSERT INTO catchpointbalances(address, normalizedonlinebalance, data) VALUES(?, ?, ?)") | ||
if err != nil { | ||
return err | ||
} | ||
|
||
for _, balance := range bals { | ||
result, err := insertStmt.ExecContext(ctx, balance.Address[:], balance.AccountData) | ||
var data basics.AccountData | ||
err = protocol.Decode(balance.AccountData, &data) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
normBalance := data.NormalizedOnlineBalance(proto) | ||
|
||
result, err := insertStmt.ExecContext(ctx, balance.Address[:], normBalance, balance.AccountData) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
aff, err := result.RowsAffected() | ||
if err != nil { | ||
return err | ||
} | ||
if aff != 1 { | ||
return fmt.Errorf("number of affected record in insert was expected to be one, but was %d", aff) | ||
} | ||
|
||
} | ||
return nil | ||
} | ||
|
||
func resetCatchpointStagingBalances(ctx context.Context, tx *sql.Tx, newCatchup bool) (err error) { | ||
s := "DROP TABLE IF EXISTS catchpointbalances;" | ||
s += "DROP TABLE IF EXISTS catchpointassetcreators;" | ||
s += "DROP TABLE IF EXISTS catchpointaccounthashes;" | ||
s += "DELETE FROM accounttotals where id='catchpointStaging';" | ||
s := []string{ | ||
"DROP TABLE IF EXISTS catchpointbalances", | ||
"DROP TABLE IF EXISTS catchpointassetcreators", | ||
"DROP TABLE IF EXISTS catchpointaccounthashes", | ||
"DELETE FROM accounttotals where id='catchpointStaging'", | ||
} | ||
|
||
if newCatchup { | ||
s += "CREATE TABLE IF NOT EXISTS catchpointassetcreators(asset integer primary key, creator blob, ctype integer);" | ||
s += "CREATE TABLE IF NOT EXISTS catchpointbalances(address blob primary key, data blob);" | ||
s += "CREATE TABLE IF NOT EXISTS catchpointaccounthashes(id integer primary key, data blob);" | ||
// SQLite has no way to rename an existing index. So, we need | ||
// to cook up a fresh name for the index, which will be kept | ||
// around after we rename the table from "catchpointbalances" | ||
// to "accountbase". To construct a unique index name, we | ||
// use the current time. | ||
idxname := fmt.Sprintf("onlineaccountbals%d", time.Now().UnixNano()) | ||
|
||
s = append(s, | ||
"CREATE TABLE IF NOT EXISTS catchpointassetcreators (asset integer primary key, creator blob, ctype integer)", | ||
"CREATE TABLE IF NOT EXISTS catchpointbalances (address blob primary key, data blob, normalizedonlinebalance integer)", | ||
"CREATE TABLE IF NOT EXISTS catchpointaccounthashes (id integer primary key, data blob)", | ||
createNormalizedOnlineBalanceIndex(idxname, "catchpointbalances"), | ||
) | ||
} | ||
_, err = tx.Exec(s) | ||
return err | ||
|
||
for _, stmt := range s { | ||
_, err = tx.Exec(stmt) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// applyCatchpointStagingBalances switches the staged catchpoint catchup tables onto the actual | ||
// tables and update the correct balance round. This is the final step in switching onto the new catchpoint round. | ||
func applyCatchpointStagingBalances(ctx context.Context, tx *sql.Tx, balancesRound basics.Round) (err error) { | ||
s := "ALTER TABLE accountbase RENAME TO accountbase_old;" | ||
s += "ALTER TABLE assetcreators RENAME TO assetcreators_old;" | ||
s += "ALTER TABLE accounthashes RENAME TO accounthashes_old;" | ||
s += "ALTER TABLE catchpointbalances RENAME TO accountbase;" | ||
s += "ALTER TABLE catchpointassetcreators RENAME TO assetcreators;" | ||
s += "ALTER TABLE catchpointaccounthashes RENAME TO accounthashes;" | ||
s += "DROP TABLE IF EXISTS accountbase_old;" | ||
s += "DROP TABLE IF EXISTS assetcreators_old;" | ||
s += "DROP TABLE IF EXISTS accounthashes_old;" | ||
|
||
_, err = tx.Exec(s) | ||
if err != nil { | ||
return err | ||
stmts := []string{ | ||
"ALTER TABLE accountbase RENAME TO accountbase_old", | ||
"ALTER TABLE assetcreators RENAME TO assetcreators_old", | ||
"ALTER TABLE accounthashes RENAME TO accounthashes_old", | ||
|
||
"ALTER TABLE catchpointbalances RENAME TO accountbase", | ||
"ALTER TABLE catchpointassetcreators RENAME TO assetcreators", | ||
"ALTER TABLE catchpointaccounthashes RENAME TO accounthashes", | ||
|
||
"DROP TABLE IF EXISTS accountbase_old", | ||
"DROP TABLE IF EXISTS assetcreators_old", | ||
"DROP TABLE IF EXISTS accounthashes_old", | ||
} | ||
|
||
for _, stmt := range stmts { | ||
_, err = tx.Exec(stmt) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
|
||
_, err = tx.Exec("INSERT OR REPLACE INTO acctrounds(id, rnd) VALUES('acctbase', ?)", balancesRound) | ||
if err != nil { | ||
return err | ||
|
@@ -269,6 +316,76 @@ func accountsInit(tx *sql.Tx, initAccounts map[basics.Address]basics.AccountData | |
return nil | ||
} | ||
|
||
// accountsAddNormalizedBalance adds the normalizedonlinebalance column | ||
// to the accountbase table. | ||
func accountsAddNormalizedBalance(tx *sql.Tx, proto config.ConsensusParams) error { | ||
var exists bool | ||
err := tx.QueryRow("SELECT 1 FROM pragma_table_info('accountbase') WHERE name='normalizedonlinebalance'").Scan(&exists) | ||
if err == nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. With the database versioning, you dont really need to verify that the column present/missing, but it wouldn't be incorrect either. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It turns out to be convenient because in the accountdb test code, I don't want to be invoking acctupdates code, but I still need some way to add the normalizedonlinebalance column. Without this check, I would need some other way to determine if this column has been added or not. |
||
// Already exists. | ||
return nil | ||
} | ||
if err != sql.ErrNoRows { | ||
return err | ||
} | ||
|
||
for _, stmt := range createOnlineAccountIndex { | ||
_, err := tx.Exec(stmt) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
|
||
rows, err := tx.Query("SELECT address, data FROM accountbase") | ||
if err != nil { | ||
return err | ||
} | ||
defer rows.Close() | ||
|
||
for rows.Next() { | ||
var addrbuf []byte | ||
var buf []byte | ||
err = rows.Scan(&addrbuf, &buf) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
var data basics.AccountData | ||
err = protocol.Decode(buf, &data) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
normBalance := data.NormalizedOnlineBalance(proto) | ||
if normBalance > 0 { | ||
_, err = tx.Exec("UPDATE accountbase SET normalizedonlinebalance=? WHERE address=?", normBalance, addrbuf) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
} | ||
|
||
return rows.Err() | ||
} | ||
|
||
// accountDataToOnline returns the part of the AccountData that matters | ||
// for online accounts (to answer top-N queries). We store a subset of | ||
// the full AccountData because we need to store a large number of these | ||
// in memory (say, 1M), and storing that many AccountData could easily | ||
// cause us to run out of memory. | ||
func accountDataToOnline(address basics.Address, ad *basics.AccountData, proto config.ConsensusParams) *onlineAccount { | ||
return &onlineAccount{ | ||
Address: address, | ||
MicroAlgos: ad.MicroAlgos, | ||
RewardsBase: ad.RewardsBase, | ||
NormalizedOnlineBalance: ad.NormalizedOnlineBalance(proto), | ||
VoteID: ad.VoteID, | ||
VoteFirstValid: ad.VoteFirstValid, | ||
VoteLastValid: ad.VoteLastValid, | ||
VoteKeyDilution: ad.VoteKeyDilution, | ||
} | ||
} | ||
|
||
func resetAccountHashes(tx *sql.Tx) (err error) { | ||
_, err = tx.Exec(`DELETE FROM accounthashes`) | ||
return | ||
|
@@ -554,6 +671,50 @@ func (qs *accountsDbQueries) close() { | |
} | ||
} | ||
|
||
// accountsOnlineTop returns the top n online accounts starting at position offset | ||
// (that is, the top offset'th account through the top offset+n-1'th account). | ||
// | ||
// The accounts are sorted by their normalized balance and address. The normalized | ||
// balance has to do with the reward parts of online account balances. See the | ||
// normalization procedure in AccountData.NormalizedOnlineBalance(). | ||
// | ||
// Note that this does not check if the accounts have a vote key valid for any | ||
// particular round (past, present, or future). | ||
func accountsOnlineTop(tx *sql.Tx, offset, n uint64, proto config.ConsensusParams) (map[basics.Address]*onlineAccount, error) { | ||
rows, err := tx.Query("SELECT address, data FROM accountbase WHERE normalizedonlinebalance>0 ORDER BY normalizedonlinebalance DESC, address DESC LIMIT ? OFFSET ?", n, offset) | ||
if err != nil { | ||
return nil, err | ||
} | ||
defer rows.Close() | ||
|
||
res := make(map[basics.Address]*onlineAccount, n) | ||
for rows.Next() { | ||
var addrbuf []byte | ||
var buf []byte | ||
err = rows.Scan(&addrbuf, &buf) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
var data basics.AccountData | ||
err = protocol.Decode(buf, &data) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
var addr basics.Address | ||
if len(addrbuf) != len(addr) { | ||
err = fmt.Errorf("Account DB address length mismatch: %d != %d", len(addrbuf), len(addr)) | ||
return nil, err | ||
} | ||
|
||
copy(addr[:], addrbuf) | ||
res[addr] = accountDataToOnline(addr, &data, proto) | ||
} | ||
|
||
return res, rows.Err() | ||
} | ||
|
||
func accountsAll(tx *sql.Tx) (bals map[basics.Address]basics.AccountData, err error) { | ||
rows, err := tx.Query("SELECT address, data FROM accountbase") | ||
if err != nil { | ||
|
@@ -619,7 +780,7 @@ func accountsPutTotals(tx *sql.Tx, totals AccountTotals, catchpointStaging bool) | |
} | ||
|
||
// accountsNewRound updates the accountbase and assetcreators by applying the provided deltas to the accounts / creatables. | ||
func accountsNewRound(tx *sql.Tx, updates map[basics.Address]accountDelta, creatables map[basics.CreatableIndex]modifiedCreatable) (err error) { | ||
func accountsNewRound(tx *sql.Tx, updates map[basics.Address]accountDelta, creatables map[basics.CreatableIndex]modifiedCreatable, proto config.ConsensusParams) (err error) { | ||
|
||
var insertCreatableIdxStmt, deleteCreatableIdxStmt, deleteStmt, replaceStmt *sql.Stmt | ||
|
||
|
@@ -629,7 +790,7 @@ func accountsNewRound(tx *sql.Tx, updates map[basics.Address]accountDelta, creat | |
} | ||
defer deleteStmt.Close() | ||
|
||
replaceStmt, err = tx.Prepare("REPLACE INTO accountbase (address, data) VALUES (?, ?)") | ||
replaceStmt, err = tx.Prepare("REPLACE INTO accountbase (address, normalizedonlinebalance, data) VALUES (?, ?, ?)") | ||
if err != nil { | ||
return | ||
} | ||
|
@@ -640,12 +801,12 @@ func accountsNewRound(tx *sql.Tx, updates map[basics.Address]accountDelta, creat | |
// prune empty accounts | ||
_, err = deleteStmt.Exec(addr[:]) | ||
} else { | ||
_, err = replaceStmt.Exec(addr[:], protocol.Encode(&data.new)) | ||
normBalance := data.new.NormalizedOnlineBalance(proto) | ||
_, err = replaceStmt.Exec(addr[:], normBalance, protocol.Encode(&data.new)) | ||
} | ||
if err != nil { | ||
return | ||
} | ||
|
||
} | ||
|
||
if len(creatables) > 0 { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm pretty confident that this isn't what you want to do:
Using the genesis config.ConsensusParams here means that accounts that would be using fast catchup would calculate the normalized online balances based on the genesis params and not by the most recent params.
I would suggest you'll:
I have yet had the need to perform #4, so I don't know how hard it would be. ( shouldn't be too hard ).
I will be available to help you with this one if you'd like next week.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
proto
is used only for the value ofproto.RewardUnit
, which cannot change in a given network, so it's OK to pass the proto for the genesis block or the proto for a recent block.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In that case, we should always use the genesis round proto in the call to
err = accountsNewRound(tx, deltas[i], creatableDeltas[i], protos[i+1])
as well.since we don't want to make the assumption that "it won't change"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That seems like a reasonable idea; thanks! Pushed.