From 724633a6f647e6751aae6aefe041f950dd3c4a48 Mon Sep 17 00:00:00 2001 From: Will Winder Date: Thu, 19 Jan 2023 14:19:59 -0500 Subject: [PATCH] Remove topAccountListener (#5027) --- logging/telemetryspec/event.go | 11 - node/node.go | 5 - node/topAccountListener.go | 212 ------------------ node/topAccountListener_test.go | 368 -------------------------------- 4 files changed, 596 deletions(-) delete mode 100644 node/topAccountListener.go delete mode 100644 node/topAccountListener_test.go diff --git a/logging/telemetryspec/event.go b/logging/telemetryspec/event.go index a212037500..34d161bce5 100644 --- a/logging/telemetryspec/event.go +++ b/logging/telemetryspec/event.go @@ -99,17 +99,6 @@ type BlockAcceptedEventDetails struct { VoteBufLen uint64 } -// TopAccountsEvent event -const TopAccountsEvent Event = "TopAccounts" - -// TopAccountEventDetails contains details for the BlockAcceptedEvent -type TopAccountEventDetails struct { - Round uint64 - OnlineAccounts []map[string]interface{} - OnlineCirculation uint64 - OfflineCirculation uint64 -} - // AccountRegisteredEvent event const AccountRegisteredEvent Event = "AccountRegistered" diff --git a/node/node.go b/node/node.go index 45e1e20ab1..c4275a6e6f 100644 --- a/node/node.go +++ b/node/node.go @@ -198,8 +198,6 @@ func MakeFull(log logging.Logger, rootDir string, cfg config.Local, phonebookAdd p2pNode.SetPrioScheme(node) node.net = p2pNode - accountListener := makeTopAccountListener(log) - // load stored data genesisDir := filepath.Join(rootDir, genesis.ID()) ledgerPathnamePrefix := filepath.Join(genesisDir, config.LedgerFilenamePrefix) @@ -232,9 +230,6 @@ func MakeFull(log logging.Logger, rootDir string, cfg config.Local, phonebookAdd node, } - if node.config.EnableTopAccountsReporting { - blockListeners = append(blockListeners, &accountListener) - } node.ledger.RegisterBlockListeners(blockListeners) txHandlerOpts := data.TxHandlerOpts{ TxPool: node.transactionPool, diff --git a/node/topAccountListener.go b/node/topAccountListener.go deleted file mode 100644 index 4b1e50ac69..0000000000 --- a/node/topAccountListener.go +++ /dev/null @@ -1,212 +0,0 @@ -// Copyright (C) 2019-2023 Algorand, Inc. -// This file is part of go-algorand -// -// go-algorand is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// go-algorand is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with go-algorand. If not, see . - -package node - -import ( - "sort" - - "github.com/algorand/go-algorand/data/basics" - "github.com/algorand/go-algorand/data/bookkeeping" - "github.com/algorand/go-algorand/ledger/ledgercore" - "github.com/algorand/go-algorand/logging" - "github.com/algorand/go-algorand/logging/telemetryspec" - "github.com/algorand/go-algorand/protocol" -) - -const numTopAccounts = 20 - -type topAccountListener struct { - log logging.Logger - - round basics.Round - - onlineCirculation basics.MicroAlgos - - totalCirculation basics.MicroAlgos - - // Cached between rounds to optimize ledger lookups. - accounts []basics.AccountDetail -} - -func makeTopAccountListener(log logging.Logger) topAccountListener { - return topAccountListener{ - log: log, - // TODO: If needed, increase size of this slice to buffer some accounts beyond the TopN. - accounts: make([]basics.AccountDetail, 0, numTopAccounts), - } -} - -func (t *topAccountListener) init(balances basics.BalanceDetail) { - t.round = balances.Round - t.onlineCirculation = balances.OnlineMoney - t.totalCirculation = balances.TotalMoney - t.accounts = t.accounts[:0] - - // TODO: After ledger refactor this might be replaced with a loop processing pages of results from a SQL command. - t.accounts = updateTopAccounts(t.accounts, balances.Accounts) -} - -// BlockListener event, triggered when the ledger writes a new block. -func (t *topAccountListener) OnNewBlock(block bookkeeping.Block, delta ledgercore.StateDelta) { - // XXX revise for new ledger API - // t.update(block, balances) - - // If number of accounts after update is insufficient, do a full re-init - if len(t.accounts) < numTopAccounts { - // XXX revise for new ledger API - // t.init(balances) - } - - t.sendEvent() -} - -// Account cache update logic here. -func (t *topAccountListener) update(b bookkeeping.Block, balances basics.BalanceDetail) { - lastRound := t.round - - // Update metadata. - t.round = balances.Round - t.onlineCirculation = balances.OnlineMoney - t.totalCirculation = balances.TotalMoney - - // Invalidate accounts if a round is missed (this also causes the accounts to be lazily initialized). - if lastRound+1 != balances.Round { - t.accounts = t.accounts[:0] - return - } - - // No transactions to update. - if len(balances.Accounts) == 0 { - return - } - - // Lookup map for updated accounts. - accountSet := make(map[basics.Address]bool) - - payset, err := b.DecodePaysetFlat() - if err != nil { - return - } - - for _, txad := range payset { - tx := txad.SignedTxn - if tx.Txn.Type == protocol.PaymentTx { - accountSet[tx.Txn.Receiver] = true - if tx.Txn.CloseRemainderTo != (basics.Address{}) { - accountSet[tx.Txn.CloseRemainderTo] = true - } - } - accountSet[tx.Txn.Src()] = true - } - - // TODO: This loop may not be needed with the ledger refactor. - // Since the balance list currently is unrelated to the transaction list, must iterate balances. - for _, tx := range balances.Accounts { - accountSet[tx.Address] = true - } - - // Remove any accounts in the updated accountSet (they'll be merged back if necessary) - t.accounts = removeSome(t.accounts, func(addr basics.AccountDetail) bool { return accountSet[addr.Address] }) - - // Grab the smallest record after removing modified accounts - smallestAccountSize := basics.MicroAlgos{Raw: 0} - if len(t.accounts) != 0 { - smallestAccountSize = t.accounts[len(t.accounts)-1].Algos - } - - t.accounts = updateTopAccounts(t.accounts, balances.Accounts) - - // Truncate any accounts after the smallest balance. - // This triggers a full re-init if the length falls below 'numTopAccounts' - for i, acct := range t.accounts { - if acct.Algos.LessThan(smallestAccountSize) { - t.accounts = t.accounts[:i] - return - } - } -} - -// Helper method to defragment a slice using a predicate to identify stale entries. -func removeSome(slice []basics.AccountDetail, predicate func(basics.AccountDetail) bool) []basics.AccountDetail { - // Remove updated accounts (they'll be merged back in as necessary) - next, end := 0, 0 - for (next + end) < len(slice) { - if predicate(slice[next+end]) { - end++ - } else { - slice[next] = slice[next+end] - next++ - } - } - - return slice[:next] -} - -// Merge largest accounts from balances into topN, removing values from topN as necessary. -// The underlying capacity will not be modified, but the length may increase. -// Note: Doesn't check for duplicates. -func updateTopAccounts(topN []basics.AccountDetail, balances []basics.AccountDetail) []basics.AccountDetail { - for _, account := range balances { - balance := account.Algos - - // Quick check for topN if capacity is reached. - if account.Status != basics.Online || len(topN) != 0 && len(topN) == cap(topN) && balance.Raw <= topN[len(topN)-1].Algos.Raw { - continue - } - - // Find insertion point. - pos := sort.Search(len(topN), func(i int) bool { - return topN[i].Algos.LessThan(balance) - }) - - // Increase capacity if more space is available. - if len(topN) < cap(topN) { - topN = topN[:len(topN)+1] - } - - // Shift upper elements and insert - if pos < len(topN) { - copy(topN[pos+1:], topN[pos:]) - topN[pos] = account - } - } - - return topN -} - -// Compile current top account state into a telemetry event, and send it. -func (t *topAccountListener) sendEvent() { - // Build accounts object. - payload := make([]map[string]interface{}, 0) - fCirculation := float64(t.onlineCirculation.ToUint64()) - for _, account := range t.accounts[:] { - entry := make(map[string]interface{}) - entry["address"] = account.Address.String() - entry["balance"] = account.Algos.ToUint64() - entry["stake"] = float64(account.Algos.ToUint64()) / fCirculation - payload = append(payload, entry) - } - - // Send it out - t.log.EventWithDetails(telemetryspec.Accounts, telemetryspec.TopAccountsEvent, - telemetryspec.TopAccountEventDetails{ - Round: uint64(t.round), - OnlineAccounts: payload, - OnlineCirculation: t.onlineCirculation.ToUint64(), - OfflineCirculation: t.totalCirculation.ToUint64() - t.onlineCirculation.ToUint64(), - }) -} diff --git a/node/topAccountListener_test.go b/node/topAccountListener_test.go deleted file mode 100644 index 166baab72b..0000000000 --- a/node/topAccountListener_test.go +++ /dev/null @@ -1,368 +0,0 @@ -// Copyright (C) 2019-2023 Algorand, Inc. -// This file is part of go-algorand -// -// go-algorand is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// go-algorand is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with go-algorand. If not, see . - -package node - -import ( - "fmt" - "testing" - - "github.com/algorand/go-algorand/crypto" - "github.com/algorand/go-algorand/data/basics" - "github.com/algorand/go-algorand/data/bookkeeping" - "github.com/algorand/go-algorand/data/transactions" - "github.com/algorand/go-algorand/logging" - "github.com/algorand/go-algorand/protocol" - "github.com/algorand/go-algorand/test/partitiontest" -) - -// errorString is a trivial implementation of error. -type errorString struct { - s string -} - -func (e *errorString) Error() string { - return e.s -} - -func TestUpdateTopAccounts(t *testing.T) { - partitiontest.PartitionTest(t) - - var topN []basics.AccountDetail - var input []basics.AccountDetail - - // Empty target array. - topN = []basics.AccountDetail{} - input = []basics.AccountDetail{onlineDetail(byte(0), 1), onlineDetail(byte(1), 10)} - topN = updateTopAccounts(topN, input) - - if len(topN) != 0 { - t.Errorf("Target slice not 0: len(topN) == %d", len(topN)) - } - - // Extra space available - topN = make([]basics.AccountDetail, 0, 20) - input = []basics.AccountDetail{onlineDetail(byte(0), 1), onlineDetail(byte(1), 10)} - topN = updateTopAccounts(topN, input) - - if err := verifyAccountBalances([]uint64{10, 1}, topN); err != nil { - t.Error(err) - } - - // Overflow, unmodified - topN = make([]basics.AccountDetail, 0, 4) - input = []basics.AccountDetail{ - onlineDetail(byte(0), 11), - onlineDetail(byte(1), 12), - onlineDetail(byte(2), 13), - onlineDetail(byte(3), 14), - onlineDetail(byte(4), 1), - } - topN = updateTopAccounts(topN, input) - - if err := verifyAccountBalances([]uint64{14, 13, 12, 11}, topN); err != nil { - t.Error(err) - } - - // Overflow, insert front - topN = make([]basics.AccountDetail, 0, 4) - input = []basics.AccountDetail{ - onlineDetail(byte(1), 11), - onlineDetail(byte(2), 12), - onlineDetail(byte(3), 13), - onlineDetail(byte(4), 14), - onlineDetail(byte(5), 15), - } - topN = updateTopAccounts(topN, input) - - if err := verifyAccountBalances([]uint64{15, 14, 13, 12}, topN); err != nil { - t.Error(err) - } - - // Overflow, insert middle - topN = make([]basics.AccountDetail, 0, 4) - input = []basics.AccountDetail{ - onlineDetail(byte(1), 11), - onlineDetail(byte(2), 12), - onlineDetail(byte(3), 13), - onlineDetail(byte(4), 15), - onlineDetail(byte(5), 14), - } - topN = updateTopAccounts(topN, input) - - if err := verifyAccountBalances([]uint64{15, 14, 13, 12}, topN); err != nil { - t.Error(err) - } - - // Overflow, insert end - topN = make([]basics.AccountDetail, 0, 4) - input = []basics.AccountDetail{ - onlineDetail(byte(1), 11), - onlineDetail(byte(2), 13), - onlineDetail(byte(3), 14), - onlineDetail(byte(4), 15), - onlineDetail(byte(5), 12), - } - topN = updateTopAccounts(topN, input) - - if err := verifyAccountBalances([]uint64{15, 14, 13, 12}, topN); err != nil { - t.Error(err) - } - - // Ignore offline account, shouldn't change topN - topN = updateTopAccounts(topN, []basics.AccountDetail{detail(byte(6), 200, false)}) - topN = make([]basics.AccountDetail, 0, 4) - input = []basics.AccountDetail{ - onlineDetail(byte(1), 12), - onlineDetail(byte(2), 13), - onlineDetail(byte(3), 14), - onlineDetail(byte(4), 15), - detail(byte(5), 200, false), - } - topN = updateTopAccounts(topN, input) - - if err := verifyAccountBalances([]uint64{15, 14, 13, 12}, topN); err != nil { - t.Error(err) - } -} - -func TestRemoveSome(t *testing.T) { - partitiontest.PartitionTest(t) - - // Initialize slice with 100 accounts - var accountsSlice []basics.AccountDetail - for i := 0; i <= 100; i++ { - accountsSlice = append(accountsSlice, onlineDetail(byte(i), 10)) - } - - // Remove accounts where the first byte is divisible by 10 (which includes the first and last index - remove10s := func(details basics.AccountDetail) bool { - return getInt(details)%10 == 0 - } - - accountsSlice = removeSome(accountsSlice, remove10s) - - if len(accountsSlice) != 90 { - t.Errorf("Unexpected size found after removeSome/remove10s: 90 != %d", len(accountsSlice)) - } - for _, d := range accountsSlice { - if getInt(d)%10 == 0 { - t.Errorf("Unexpected value found after removeSome/remove10s: %d", getInt(d)) - } - } - - // Remove remaining accounts where the first byte is even - removeEven := func(details basics.AccountDetail) bool { - return getInt(details)%2 == 0 - } - - accountsSlice = removeSome(accountsSlice, removeEven) - - if len(accountsSlice) != 50 { - t.Errorf("Unexpected size found after removeSome/removeEven: 50 != %d", len(accountsSlice)) - } - for _, d := range accountsSlice { - if getInt(d)%2 == 0 { - t.Errorf("Unexpected value found after removeSome/removeEven: %d", getInt(d)) - } - } -} - -func TestUpdate(t *testing.T) { - partitiontest.PartitionTest(t) - - listener := topAccountListener{ - accounts: []basics.AccountDetail{}, - round: 1, - totalCirculation: basics.MicroAlgos{Raw: 100}, - onlineCirculation: basics.MicroAlgos{Raw: 100}, - } - - balanceUpdate := basics.BalanceDetail{ - Accounts: []basics.AccountDetail{}, - Round: 2, - OnlineMoney: basics.MicroAlgos{Raw: 100000}, - TotalMoney: basics.MicroAlgos{Raw: 1000000}, - } - - // Update when accounts is empty. - listener.update(bookkeeping.Block{}, balanceUpdate) - if err := verifyListener(listener, []uint64{}, 100000, 1000000, 2); err != nil { - t.Error(err) - } - - // Transactions causing acct 1 to increase reorders the TopN. - listener.accounts = []basics.AccountDetail{ - onlineDetail(byte(0), 15), - onlineDetail(byte(1), 10), - onlineDetail(byte(2), 5), - } - balanceUpdate.Accounts = []basics.AccountDetail{onlineDetail(byte(1), 100)} - balanceUpdate.Round++ - block := makeBlockWithTxnFor([]byte{3}, []byte{1}) - - listener.update(block, balanceUpdate) - - // 10 -> 100. - if err := verifyListener(listener, []uint64{100, 15, 5}, 100000, 1000000, 3); err != nil { - t.Error(err) - } - - // Transactions causing acct 1 to decrease and falls off topN truncates result. - listener.accounts = []basics.AccountDetail{ - onlineDetail(byte(0), 15), - onlineDetail(byte(1), 10), - onlineDetail(byte(2), 5), - } - balanceUpdate.Round++ - balanceUpdate.TotalMoney = basics.MicroAlgos{Raw: 99999999} - balanceUpdate.Accounts = []basics.AccountDetail{onlineDetail(byte(1), 1)} - block = makeBlockWithTxnFor([]byte{3}, []byte{1}) - listener.update(block, balanceUpdate) - - if err := verifyListener(listener, []uint64{15, 5}, 100000, 99999999, 4); err != nil { - t.Error(err) - } - - // Transactions causing adding a balance to a new account are not reflected in TopN, because they are smaller than - // the smallest value in TopN (even though there is capacity for it). - listener.accounts = make([]basics.AccountDetail, 3, 10) - listener.accounts[0] = onlineDetail(byte(0), 15) - listener.accounts[1] = onlineDetail(byte(1), 10) - listener.accounts[2] = onlineDetail(byte(2), 5) - - balanceUpdate.Round++ - balanceUpdate.Accounts = []basics.AccountDetail{onlineDetail(byte(3), 1)} - block = makeBlockWithTxnFor([]byte{5}, []byte{3}) - listener.update(block, balanceUpdate) - - if err := verifyListener(listener, []uint64{15, 10, 5}, 100000, 99999999, 5); err != nil { - t.Error(err) - } - - // Invalid round truncates accounts slice - listener.update(block, balanceUpdate) - if len(listener.accounts) != 0 { - t.Errorf("Accounts should be truncated to zero after unexpected round: len(topN) = %d", len(listener.accounts)) - } -} - -func TestInit(t *testing.T) { - partitiontest.PartitionTest(t) - - listener := makeTopAccountListener(logging.Base()) - - // "init" should remove existing values before adding new ones. - balanceUpdate := basics.BalanceDetail{ - Accounts: make([]basics.AccountDetail, 0, 10), - Round: 2, - OnlineMoney: basics.MicroAlgos{Raw: 100}, - TotalMoney: basics.MicroAlgos{Raw: 100}, - } - - listener.accounts = append(listener.accounts, onlineDetail(byte(10), 100)) - balanceUpdate.Accounts = []basics.AccountDetail{onlineDetail(byte(1), 1)} - - listener.init(balanceUpdate) - - if err := verifyListener(listener, []uint64{1}, 100, 100, 2); err != nil { - t.Error(err) - } -} - -func makeBlockWithTxnFor(senders []byte, receivers []byte) bookkeeping.Block { - var blk bookkeeping.Block - blk.BlockHeader.GenesisID = "foo" - crypto.RandBytes(blk.BlockHeader.GenesisHash[:]) - blk.CurrentProtocol = protocol.ConsensusFuture - - paysets := make([]transactions.SignedTxnInBlock, 0, len(receivers)) - for i, b := range receivers { - txib, err := blk.EncodeSignedTxn(transactions.SignedTxn{ - Txn: transactions.Transaction{ - Type: protocol.PaymentTx, - Header: transactions.Header{ - Sender: basics.Address{senders[i]}, - GenesisID: blk.BlockHeader.GenesisID, - GenesisHash: blk.BlockHeader.GenesisHash, - }, - PaymentTxnFields: transactions.PaymentTxnFields{ - Receiver: basics.Address{b}, - // If this ends up being used by topAccountListener, add it here. - // Amount: basics.MicroAlgos{123}, - }, - }}, transactions.ApplyData{}) - if err != nil { - panic(err) - } - - paysets = append(paysets, txib) - } - - blk.Payset = paysets - return blk -} - -// Helpers for working with data objects. -func onlineDetail(b byte, bal uint64) basics.AccountDetail { - return detail(b, bal, true) -} - -func detail(b byte, bal uint64, isOnline bool) basics.AccountDetail { - state := basics.Offline - if isOnline { - state = basics.Online - } - return basics.AccountDetail{ - Address: basics.Address{b}, - Algos: basics.MicroAlgos{Raw: bal}, - Status: state, - } -} - -func getInt(detail basics.AccountDetail) uint64 { - return uint64([32]byte(detail.Address)[0]) -} - -func verifyAccountBalances(expected []uint64, actual []basics.AccountDetail) error { - if len(expected) != len(actual) { - return &errorString{fmt.Sprintf("Lengths do not equal: expected(%d) != actual(%d)", len(expected), len(actual))} - } - - for i, a := range actual { - if expected[i] != a.Algos.Raw { - return &errorString{fmt.Sprintf("Unexpected result at actual[%d]: expected(%d) != actual(%d)", i, expected[i], a.Algos.Raw)} - } - } - - return nil -} - -func verifyListener(listener topAccountListener, expected []uint64, online uint64, total uint64, round uint64) error { - if listener.round != basics.Round(round) { - return &errorString{fmt.Sprintf("Unexpected round: actual(%d) != expected(%d)", uint64(listener.round), round)} - } - - if listener.onlineCirculation.Raw != online { - return &errorString{fmt.Sprintf("Unexpected online circulation: actual(%d) != expected(%d)", listener.onlineCirculation.Raw, online)} - } - - if listener.totalCirculation.Raw != total { - return &errorString{fmt.Sprintf("Unexpected total circulation: actual(%d) != expected(%d)", listener.totalCirculation.Raw, total)} - } - - return verifyAccountBalances(expected, listener.accounts) -}