diff --git a/README.md b/README.md index ee2e9d78c..99e84347a 100644 --- a/README.md +++ b/README.md @@ -195,6 +195,7 @@ Settings can be provided from the command line, a configuration file, or an envi | default-balances-limit | | default-balances-limit | INDEXER_DEFAULT_BALANCES_LIMIT | | max-applications-limit | | max-applications-limit | INDEXER_MAX_APPLICATIONS_LIMIT | | default-applications-limit | | default-applications-limit | INDEXER_DEFAULT_APPLICATIONS_LIMIT | +| data-dir | i | data | INDEXER_DATA | ## Command line diff --git a/api/handlers_e2e_test.go b/api/handlers_e2e_test.go index 02ec85027..6cabbc454 100644 --- a/api/handlers_e2e_test.go +++ b/api/handlers_e2e_test.go @@ -13,15 +13,19 @@ import ( "time" "github.com/algorand/go-algorand/crypto" + "github.com/algorand/go-algorand/crypto/merklesignature" + "github.com/algorand/go-algorand/data/basics" + "github.com/algorand/go-algorand/data/bookkeeping" + "github.com/algorand/go-algorand/ledger" + "github.com/algorand/go-algorand/rpcs" + "github.com/algorand/indexer/processor" + "github.com/algorand/indexer/processor/blockprocessor" "github.com/labstack/echo/v4" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/algorand/go-algorand-sdk/encoding/json" - "github.com/algorand/go-algorand/crypto/merklesignature" - "github.com/algorand/go-algorand/data/basics" - "github.com/algorand/go-algorand/data/bookkeeping" "github.com/algorand/go-algorand/data/transactions" "github.com/algorand/indexer/api/generated/v2" @@ -56,7 +60,7 @@ func testServerImplementation(db idb.IndexerDb) *ServerImplementation { return &ServerImplementation{db: db, timeout: 30 * time.Second, opts: defaultOpts} } -func setupIdb(t *testing.T, genesis bookkeeping.Genesis, genesisBlock bookkeeping.Block) (*postgres.IndexerDb /*db*/, func() /*shutdownFunc*/) { +func setupIdb(t *testing.T, genesis bookkeeping.Genesis) (*postgres.IndexerDb, func(), processor.Processor, *ledger.Ledger) { _, connStr, shutdownFunc := pgtest.SetupPostgres(t) db, _, err := postgres.OpenPostgres(connStr, idb.IndexerDbOptions{}, nil) @@ -70,15 +74,16 @@ func setupIdb(t *testing.T, genesis bookkeeping.Genesis, genesisBlock bookkeepin err = db.LoadGenesis(genesis) require.NoError(t, err) - err = db.AddBlock(&genesisBlock) - require.NoError(t, err) - - return db, newShutdownFunc + l := test.MakeTestLedger("ledger") + proc, err := blockprocessor.MakeProcessorWithLedger(l, db.AddBlock) + require.NoError(t, err, "failed to open ledger") + return db, newShutdownFunc, proc, l } func TestApplicationHandlers(t *testing.T) { - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) + db, shutdownFunc, proc, l := setupIdb(t, test.MakeGenesis()) defer shutdownFunc() + defer l.Close() /////////// // Given // A block containing an app call txn with ExtraProgramPages, that the creator and another account have opted into @@ -112,8 +117,8 @@ func TestApplicationHandlers(t *testing.T) { block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader, &txn, &optInTxnA, &optInTxnB) require.NoError(t, err) - err = db.AddBlock(&block) - require.NoError(t, err, "failed to commit") + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) + require.NoError(t, err) ////////// // When // We query the app @@ -218,8 +223,9 @@ func TestApplicationHandlers(t *testing.T) { } func TestAccountExcludeParameters(t *testing.T) { - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) + db, shutdownFunc, proc, l := setupIdb(t, test.MakeGenesis()) defer shutdownFunc() + defer l.Close() /////////// // Given // A block containing a creator of an app, an asset, who also holds and has opted-into those apps. @@ -238,8 +244,8 @@ func TestAccountExcludeParameters(t *testing.T) { &appOptInTxnA, &appOptInTxnB, &assetOptInTxnA, &assetOptInTxnB) require.NoError(t, err) - err = db.AddBlock(&block) - require.NoError(t, err, "failed to commit") + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) + require.NoError(t, err) ////////// // When // We look up the address using various exclude parameters. @@ -389,8 +395,9 @@ type accountsErrorResponse struct { } func TestAccountMaxResultsLimit(t *testing.T) { - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) + db, shutdownFunc, proc, l := setupIdb(t, test.MakeGenesis()) defer shutdownFunc() + defer l.Close() /////////// // Given // A block containing an address that has created 10 apps, deleted 5 apps, and created 10 assets, @@ -443,8 +450,8 @@ func TestAccountMaxResultsLimit(t *testing.T) { block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader, ptxns...) require.NoError(t, err) - err = db.AddBlock(&block) - require.NoError(t, err, "failed to commit") + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) + require.NoError(t, err) ////////// // When // We look up the address using a ServerImplementation with a maxAccountsAPIResults limit set, @@ -768,8 +775,9 @@ func TestAccountMaxResultsLimit(t *testing.T) { } func TestBlockNotFound(t *testing.T) { - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) + db, shutdownFunc, _, l := setupIdb(t, test.MakeGenesis()) defer shutdownFunc() + defer l.Close() /////////// // Given // An empty database. @@ -833,8 +841,9 @@ func TestInnerTxn(t *testing.T) { }, } - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) + db, shutdownFunc, proc, l := setupIdb(t, test.MakeGenesis()) defer shutdownFunc() + defer l.Close() /////////// // Given // a DB with some inner txns in it. @@ -845,8 +854,8 @@ func TestInnerTxn(t *testing.T) { block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader, &appCall) require.NoError(t, err) - err = db.AddBlock(&block) - require.NoError(t, err, "failed to commit") + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) + require.NoError(t, err) for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { @@ -881,8 +890,9 @@ func TestInnerTxn(t *testing.T) { // transaction group does not allow the root transaction to be returned on both // pages. func TestPagingRootTxnDeduplication(t *testing.T) { - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) + db, shutdownFunc, proc, l := setupIdb(t, test.MakeGenesis()) defer shutdownFunc() + defer l.Close() /////////// // Given // a DB with some inner txns in it. @@ -897,8 +907,8 @@ func TestPagingRootTxnDeduplication(t *testing.T) { block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader, &appCall) require.NoError(t, err) - err = db.AddBlock(&block) - require.NoError(t, err, "failed to commit") + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) + require.NoError(t, err) testcases := []struct { name string @@ -1004,8 +1014,9 @@ func TestPagingRootTxnDeduplication(t *testing.T) { } func TestKeyregTransactionWithStateProofKeys(t *testing.T) { - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) + db, shutdownFunc, proc, l := setupIdb(t, test.MakeGenesis()) defer shutdownFunc() + defer l.Close() /////////// // Given // A block containing a key reg txn with state proof key @@ -1044,8 +1055,8 @@ func TestKeyregTransactionWithStateProofKeys(t *testing.T) { block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader, &txn) require.NoError(t, err) - err = db.AddBlock(&block) - require.NoError(t, err, "failed to commit") + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) + require.NoError(t, err) e := echo.New() { @@ -1098,8 +1109,9 @@ func TestVersion(t *testing.T) { /////////// // Given // An API and context /////////// - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) + db, shutdownFunc, _, l := setupIdb(t, test.MakeGenesis()) defer shutdownFunc() + defer l.Close() api := testServerImplementation(db) e := echo.New() @@ -1127,8 +1139,9 @@ func TestVersion(t *testing.T) { } func TestAccountClearsNonUTF8(t *testing.T) { - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) + db, shutdownFunc, proc, l := setupIdb(t, test.MakeGenesis()) defer shutdownFunc() + defer l.Close() /////////// // Given // a DB with some inner txns in it. @@ -1147,8 +1160,8 @@ func TestAccountClearsNonUTF8(t *testing.T) { block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader, &createAsset) require.NoError(t, err) - err = db.AddBlock(&block) - require.NoError(t, err, "failed to commit") + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) + require.NoError(t, err) verify := func(params generated.AssetParams) { compareB64 := func(expected string, actual *[]byte) { @@ -1257,8 +1270,9 @@ func TestLookupInnerLogs(t *testing.T) { }, } - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) + db, shutdownFunc, proc, l := setupIdb(t, test.MakeGenesis()) defer shutdownFunc() + defer l.Close() /////////// // Given // a DB with some inner txns in it. @@ -1268,8 +1282,8 @@ func TestLookupInnerLogs(t *testing.T) { block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader, &appCall) require.NoError(t, err) - err = db.AddBlock(&block) - require.NoError(t, err, "failed to commit") + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) + require.NoError(t, err) for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { @@ -1355,8 +1369,9 @@ func TestLookupMultiInnerLogs(t *testing.T) { }, } - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) + db, shutdownFunc, proc, l := setupIdb(t, test.MakeGenesis()) defer shutdownFunc() + defer l.Close() /////////// // Given // a DB with some inner txns in it. @@ -1366,8 +1381,8 @@ func TestLookupMultiInnerLogs(t *testing.T) { block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader, &appCall) require.NoError(t, err) - err = db.AddBlock(&block) - require.NoError(t, err, "failed to commit") + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) + require.NoError(t, err) for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { diff --git a/cmd/algorand-indexer/daemon.go b/cmd/algorand-indexer/daemon.go index b105177bb..8913e8325 100644 --- a/cmd/algorand-indexer/daemon.go +++ b/cmd/algorand-indexer/daemon.go @@ -3,6 +3,8 @@ package main import ( "context" "fmt" + "io" + "io/ioutil" "os" "os/signal" "strings" @@ -10,9 +12,9 @@ import ( "syscall" "time" - "github.com/spf13/cobra" - "github.com/spf13/viper" - + "github.com/algorand/go-algorand-sdk/client/v2/algod" + "github.com/algorand/go-algorand/data/bookkeeping" + "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/rpcs" "github.com/algorand/indexer/api" "github.com/algorand/indexer/api/generated/v2" @@ -20,7 +22,11 @@ import ( "github.com/algorand/indexer/fetcher" "github.com/algorand/indexer/idb" "github.com/algorand/indexer/importer" + "github.com/algorand/indexer/processor" + "github.com/algorand/indexer/processor/blockprocessor" "github.com/algorand/indexer/util/metrics" + "github.com/spf13/cobra" + "github.com/spf13/viper" ) var ( @@ -76,10 +82,6 @@ var daemonCmd = &cobra.Command{ algodDataDir = os.Getenv("ALGORAND_DATA") } - if indexerDataDir == "" { - indexerDataDir = os.Getenv("INDEXER_DATA") - } - if indexerDataDir != "" { if _, err := os.Stat(indexerDataDir); os.IsNotExist(err) { err := os.Mkdir(indexerDataDir, 0755) @@ -123,6 +125,10 @@ var daemonCmd = &cobra.Command{ defer db.Close() var wg sync.WaitGroup if bot != nil { + if indexerDataDir == "" { + fmt.Fprint(os.Stderr, "missing indexer data directory") + os.Exit(1) + } wg.Add(1) go func() { defer wg.Done() @@ -135,13 +141,22 @@ var daemonCmd = &cobra.Command{ _, err := importer.EnsureInitialImport(db, genesisReader, logger) maybeFail(err, "importer.EnsureInitialImport() error") logger.Info("Initializing block import handler.") + imp := importer.NewImporter(db) - nextRound, err := db.GetNextRoundToAccount() - maybeFail(err, "failed to get next round, %v", err) - bot.SetNextRound(nextRound) + logger.Info("Initializing local ledger.") + genesisReader = importer.GetGenesisFile(genesisJSONPath, bot.Algod(), logger) + genesis, err := readGenesis(genesisReader) + maybeFail(err, "Error reading genesis file") + genesisBlock, err := getGenesisBlock(bot.Algod()) + maybeFail(err, "Error getting genesis block") - imp := importer.NewImporter(db) - handler := blockHandler(imp, 1*time.Second) + proc, err := blockprocessor.MakeProcessor(&genesis, &genesisBlock, indexerDataDir, imp.ImportBlock) + if err != nil { + maybeFail(err, "blockprocessor.MakeProcessor() err %v", err) + } + + bot.SetNextRound(proc.NextRoundToProcess()) + handler := blockHandler(proc, 1*time.Second) bot.SetBlockHandler(handler) logger.Info("Starting block importer.") @@ -203,6 +218,8 @@ func init() { viper.RegisterAlias("algod-net", "algod-address") viper.RegisterAlias("server", "server-address") viper.RegisterAlias("token", "api-token") + viper.RegisterAlias("data-dir", "data") + } // makeOptions converts CLI options to server options @@ -265,10 +282,10 @@ func makeOptions() (options api.ExtraOptions) { // blockHandler creates a handler complying to the fetcher block handler interface. In case of a failure it keeps // attempting to add the block until the fetcher shuts down. -func blockHandler(imp importer.Importer, retryDelay time.Duration) func(context.Context, *rpcs.EncodedBlockCert) error { +func blockHandler(proc processor.Processor, retryDelay time.Duration) func(context.Context, *rpcs.EncodedBlockCert) error { return func(ctx context.Context, block *rpcs.EncodedBlockCert) error { for { - err := handleBlock(block, imp) + err := handleBlock(block, proc) if err == nil { // return on success. return nil @@ -285,12 +302,12 @@ func blockHandler(imp importer.Importer, retryDelay time.Duration) func(context. } } -func handleBlock(block *rpcs.EncodedBlockCert, imp importer.Importer) error { +func handleBlock(block *rpcs.EncodedBlockCert, proc processor.Processor) error { start := time.Now() - err := imp.ImportBlock(block) + err := proc.Process(block) if err != nil { logger.WithError(err).Errorf( - "adding block %d to database failed", block.Block.Round()) + "block %d import failed", block.Block.Round()) return fmt.Errorf("handleBlock() err: %w", err) } dt := time.Since(start) @@ -313,3 +330,34 @@ func handleBlock(block *rpcs.EncodedBlockCert, imp importer.Importer) error { return nil } + +func readGenesis(reader io.Reader) (bookkeeping.Genesis, error) { + var genesis bookkeeping.Genesis + if reader == nil { + return bookkeeping.Genesis{}, fmt.Errorf("readGenesis() err: reader is nil") + } + gbytes, err := ioutil.ReadAll(reader) + if err != nil { + return bookkeeping.Genesis{}, fmt.Errorf("readGenesis() err: %w", err) + } + err = protocol.DecodeJSON(gbytes, &genesis) + if err != nil { + return bookkeeping.Genesis{}, fmt.Errorf("readGenesis() err: %w", err) + } + return genesis, nil +} + +func getGenesisBlock(client *algod.Client) (bookkeeping.Block, error) { + data, err := client.BlockRaw(0).Do(context.Background()) + if err != nil { + return bookkeeping.Block{}, fmt.Errorf("getGenesisBlock() client err: %w", err) + } + + var block rpcs.EncodedBlockCert + err = protocol.Decode(data, &block) + if err != nil { + return bookkeeping.Block{}, fmt.Errorf("getGenesisBlock() decode err: %w", err) + } + + return block.Block, nil +} diff --git a/cmd/algorand-indexer/daemon_test.go b/cmd/algorand-indexer/daemon_test.go index d83c33dbb..4669e04d4 100644 --- a/cmd/algorand-indexer/daemon_test.go +++ b/cmd/algorand-indexer/daemon_test.go @@ -3,12 +3,18 @@ package main import ( "context" "errors" + "io" + "strings" "sync" "testing" "time" + "github.com/algorand/go-algorand-sdk/encoding/json" "github.com/algorand/go-algorand/data/bookkeeping" + "github.com/algorand/go-algorand/ledger/ledgercore" "github.com/algorand/go-algorand/rpcs" + "github.com/algorand/indexer/processor/blockprocessor" + itest "github.com/algorand/indexer/util/test" "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" ) @@ -16,10 +22,10 @@ import ( type mockImporter struct { } -var errMockImportBlock = errors.New("mock import block error") +var errMockImportBlock = errors.New("Process() invalid round blockCert.Block.Round(): 1234 nextRoundToProcess: 1") -func (imp *mockImporter) ImportBlock(blockContainer *rpcs.EncodedBlockCert) error { - return errMockImportBlock +func (imp *mockImporter) ImportBlock(vb *ledgercore.ValidatedBlock) error { + return nil } func TestImportRetryAndCancel(t *testing.T) { @@ -33,7 +39,12 @@ func TestImportRetryAndCancel(t *testing.T) { // create handler with mock importer and start, it should generate errors until cancelled. imp := &mockImporter{} - handler := blockHandler(imp, 50*time.Millisecond) + l := itest.MakeTestLedger("ledger") + defer l.Close() + proc, err := blockprocessor.MakeProcessorWithLedger(l, nil) + assert.Nil(t, err) + proc.SetHandler(imp.ImportBlock) + handler := blockHandler(proc, 50*time.Millisecond) var wg sync.WaitGroup wg.Add(1) go func() { @@ -54,7 +65,7 @@ func TestImportRetryAndCancel(t *testing.T) { } for _, entry := range hook.Entries { - assert.Equal(t, entry.Message, "adding block 1234 to database failed") + assert.Equal(t, entry.Message, "block 1234 import failed") assert.Equal(t, entry.Data["error"], errMockImportBlock) } @@ -62,3 +73,32 @@ func TestImportRetryAndCancel(t *testing.T) { cancel() wg.Wait() } + +func TestReadGenesis(t *testing.T) { + var reader io.Reader + // nil reader + _, err := readGenesis(reader) + assert.Contains(t, err.Error(), "readGenesis() err: reader is nil") + // no match struct field + genesisStr := "{\"version\": 2}" + reader = strings.NewReader(genesisStr) + _, err = readGenesis(reader) + assert.Contains(t, err.Error(), "json decode error") + + genesis := bookkeeping.Genesis{ + SchemaID: "1", + Network: "test", + Proto: "test", + RewardsPool: "AAAA", + FeeSink: "AAAA", + } + + // read and decode genesis + reader = strings.NewReader(string(json.Encode(genesis))) + _, err = readGenesis(reader) + assert.Nil(t, err) + // read from empty reader + _, err = readGenesis(reader) + assert.Contains(t, err.Error(), "readGenesis() err: EOF") + +} diff --git a/cmd/import-validator/core/service.go b/cmd/import-validator/core/service.go index 075c7d46b..e37e1d82f 100644 --- a/cmd/import-validator/core/service.go +++ b/cmd/import-validator/core/service.go @@ -25,6 +25,7 @@ import ( "github.com/algorand/indexer/fetcher" "github.com/algorand/indexer/idb" "github.com/algorand/indexer/idb/postgres" + "github.com/algorand/indexer/processor/blockprocessor" "github.com/algorand/indexer/util" ) @@ -92,7 +93,8 @@ func openIndexerDb(postgresConnStr string, genesis *bookkeeping.Genesis, genesis } if nextRound == 0 { - err = db.AddBlock(genesisBlock) + vb := ledgercore.MakeValidatedBlock(*genesisBlock, ledgercore.StateDelta{}) + err = db.AddBlock(&vb) if err != nil { return nil, fmt.Errorf("openIndexerDb() err: %w", err) } @@ -119,13 +121,13 @@ func openLedger(ledgerPath string, genesis *bookkeeping.Genesis, genesisBlock *b GenesisHash: genesisBlock.GenesisHash(), } - ledger, err := ledger.OpenLedger( + l, err := ledger.OpenLedger( logger, path.Join(ledgerPath, "ledger"), false, initState, config.GetDefaultLocal()) if err != nil { return nil, fmt.Errorf("openLedger() open err: %w", err) } - return ledger, nil + return l, nil } func getModifiedState(l *ledger.Ledger, block *bookkeeping.Block) (map[basics.Address]struct{}, map[basics.Address]map[ledger.Creatable]struct{}, error) { @@ -349,9 +351,14 @@ func catchup(db *postgres.IndexerDb, l *ledger.Ledger, bot fetcher.Fetcher, logg if nextRoundLedger >= nextRoundIndexer { wg.Add(1) + prc, err := blockprocessor.MakeProcessorWithLedger(l, db.AddBlock) + if err != nil { + return fmt.Errorf("catchup() err: %w", err) + } + blockCert := rpcs.EncodedBlockCert{Block: block.Block, Certificate: block.Certificate} go func() { start := time.Now() - err1 = db.AddBlock(&block.Block) + err1 = prc.Process(&blockCert) fmt.Printf( "%d transactions imported in %v\n", len(block.Block.Payset), time.Since(start)) diff --git a/idb/dummy/dummy.go b/idb/dummy/dummy.go index d4cc71980..7e7c5b842 100644 --- a/idb/dummy/dummy.go +++ b/idb/dummy/dummy.go @@ -5,6 +5,7 @@ import ( "github.com/algorand/go-algorand/data/bookkeeping" "github.com/algorand/go-algorand/data/transactions" + "github.com/algorand/go-algorand/ledger/ledgercore" log "github.com/sirupsen/logrus" "github.com/algorand/indexer/idb" @@ -22,7 +23,7 @@ func IndexerDb() idb.IndexerDb { func (db *dummyIndexerDb) Close() { } -func (db *dummyIndexerDb) AddBlock(block *bookkeeping.Block) error { +func (db *dummyIndexerDb) AddBlock(block *ledgercore.ValidatedBlock) error { db.log.Printf("AddBlock") return nil } diff --git a/idb/idb.go b/idb/idb.go index 82901ed1c..9d7ea63e3 100644 --- a/idb/idb.go +++ b/idb/idb.go @@ -13,6 +13,7 @@ import ( "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/data/bookkeeping" "github.com/algorand/go-algorand/data/transactions" + "github.com/algorand/go-algorand/ledger/ledgercore" models "github.com/algorand/indexer/api/generated/v2" ) @@ -161,7 +162,7 @@ type IndexerDb interface { Close() // Import a block and do the accounting. - AddBlock(block *bookkeeping.Block) error + AddBlock(block *ledgercore.ValidatedBlock) error LoadGenesis(genesis bookkeeping.Genesis) (err error) diff --git a/idb/mocks/IndexerDb.go b/idb/mocks/IndexerDb.go index 0de088c21..b9c104750 100644 --- a/idb/mocks/IndexerDb.go +++ b/idb/mocks/IndexerDb.go @@ -9,6 +9,8 @@ import ( idb "github.com/algorand/indexer/idb" + ledgercore "github.com/algorand/go-algorand/ledger/ledgercore" + mock "github.com/stretchr/testify/mock" transactions "github.com/algorand/go-algorand/data/transactions" @@ -20,11 +22,11 @@ type IndexerDb struct { } // AddBlock provides a mock function with given fields: block -func (_m *IndexerDb) AddBlock(block *bookkeeping.Block) error { +func (_m *IndexerDb) AddBlock(block *ledgercore.ValidatedBlock) error { ret := _m.Called(block) var r0 error - if rf, ok := ret.Get(0).(func(*bookkeeping.Block) error); ok { + if rf, ok := ret.Get(0).(func(*ledgercore.ValidatedBlock) error); ok { r0 = rf(block) } else { r0 = ret.Error(0) diff --git a/idb/postgres/postgres.go b/idb/postgres/postgres.go index 1e3e490de..0fca17ce6 100644 --- a/idb/postgres/postgres.go +++ b/idb/postgres/postgres.go @@ -39,7 +39,6 @@ import ( pgutil "github.com/algorand/indexer/idb/postgres/internal/util" "github.com/algorand/indexer/idb/postgres/internal/writer" "github.com/algorand/indexer/util" - "github.com/algorand/indexer/util/metrics" ) var serializable = pgx.TxOptions{IsoLevel: pgx.Serializable} // be a real ACID database @@ -251,7 +250,8 @@ func prepareEvalResources(l *ledger_for_evaluator.LedgerForEvaluator, block *boo } // AddBlock is part of idb.IndexerDb. -func (db *IndexerDb) AddBlock(block *bookkeeping.Block) error { +func (db *IndexerDb) AddBlock(vb *ledgercore.ValidatedBlock) error { + block := vb.Block() db.log.Printf("adding block %d", block.Round()) db.accountingLock.Lock() @@ -263,115 +263,96 @@ func (db *IndexerDb) AddBlock(block *bookkeeping.Block) error { if err != nil { return fmt.Errorf("AddBlock() err: %w", err) } - if block.Round() != basics.Round(importstate.NextRoundToAccount) { + if block.Round() > basics.Round(importstate.NextRoundToAccount) { return fmt.Errorf( "AddBlock() adding block round %d but next round to account is %d", block.Round(), importstate.NextRoundToAccount) - } - importstate.NextRoundToAccount++ - err = db.setImportState(tx, &importstate) - if err != nil { - return fmt.Errorf("AddBlock() err: %w", err) - } - - w, err := writer.MakeWriter(tx) - if err != nil { - return fmt.Errorf("AddBlock() err: %w", err) - } - defer w.Close() - - if block.Round() == basics.Round(0) { - // Block 0 is special, we cannot run the evaluator on it. - err := w.AddBlock0(block) + } else if block.Round() == basics.Round(importstate.NextRoundToAccount) { + importstate.NextRoundToAccount++ + err = db.setImportState(tx, &importstate) if err != nil { return fmt.Errorf("AddBlock() err: %w", err) } - } else { - proto, ok := config.Consensus[block.BlockHeader.CurrentProtocol] - if !ok { - return fmt.Errorf( - "AddBlock() cannot find proto version %s", block.BlockHeader.CurrentProtocol) - } - protoChanged := !proto.EnableAssetCloseAmount - proto.EnableAssetCloseAmount = true - - var wg sync.WaitGroup - defer wg.Wait() - - // Write transaction participation and possibly transactions in a parallel db - // transaction. If `proto.EnableAssetCloseAmount` is already true, we can start - // writing transactions contained in the block early. - var err0 error - wg.Add(1) - go func() { - defer wg.Done() - - f := func(tx pgx.Tx) error { - if !protoChanged { - err := writer.AddTransactions(block, block.Payset, tx) - if err != nil { - return err - } - } - return writer.AddTransactionParticipation(block, tx) - } - err0 = db.txWithRetry(serializable, f) - }() - ledgerForEval, err := - ledger_for_evaluator.MakeLedgerForEvaluator(tx, block.Round()-1) + w, err := writer.MakeWriter(tx) if err != nil { return fmt.Errorf("AddBlock() err: %w", err) } - defer ledgerForEval.Close() + defer w.Close() - resources, err := prepareEvalResources(&ledgerForEval, block) - if err != nil { - return fmt.Errorf("AddBlock() eval err: %w", err) - } + if block.Round() == basics.Round(0) { + // Block 0 is special, we cannot run the evaluator on it. + err := w.AddBlock0(&block) + if err != nil { + return fmt.Errorf("AddBlock() err: %w", err) + } + } else { + proto, ok := config.Consensus[block.BlockHeader.CurrentProtocol] + if !ok { + return fmt.Errorf( + "AddBlock() cannot find proto version %s", block.BlockHeader.CurrentProtocol) + } + protoChanged := !proto.EnableAssetCloseAmount + proto.EnableAssetCloseAmount = true - start := time.Now() - delta, modifiedTxns, err := - ledger.EvalForIndexer(ledgerForEval, block, proto, resources) - if err != nil { - return fmt.Errorf("AddBlock() eval err: %w", err) - } - metrics.PostgresEvalTimeSeconds.Observe(time.Since(start).Seconds()) + var wg sync.WaitGroup + defer wg.Wait() - var err1 error - // Skip if transaction writing has already started. - if protoChanged { - // Write transactions in a parallel db transaction. + // Write transaction participation and possibly transactions in a parallel db + // transaction. If `proto.EnableAssetCloseAmount` is already true, we can start + // writing transactions contained in the block early. + var err0 error wg.Add(1) go func() { defer wg.Done() f := func(tx pgx.Tx) error { - return writer.AddTransactions(block, modifiedTxns, tx) + if !protoChanged { + err := writer.AddTransactions(&block, block.Payset, tx) + if err != nil { + return err + } + } + return writer.AddTransactionParticipation(&block, tx) } - err1 = db.txWithRetry(serializable, f) + err0 = db.txWithRetry(serializable, f) }() - } - err = w.AddBlock(block, modifiedTxns, delta) - if err != nil { - return fmt.Errorf("AddBlock() err: %w", err) - } + var err1 error + // Skip if transaction writing has already started. + if protoChanged { + // Write transactions in a parallel db transaction. + wg.Add(1) + go func() { + defer wg.Done() - // Wait for goroutines to finish and check for errors. If there is an error, we - // return our own error so that the main transaction does not commit. Hence, - // `txn` and `txn_participation` tables can only be ahead but not behind - // the other state. - wg.Wait() - isUniqueViolationFunc := func(err error) bool { - var pgerr *pgconn.PgError - return errors.As(err, &pgerr) && (pgerr.Code == pgerrcode.UniqueViolation) - } - if (err0 != nil) && !isUniqueViolationFunc(err0) { - return fmt.Errorf("AddBlock() err0: %w", err0) - } - if (err1 != nil) && !isUniqueViolationFunc(err1) { - return fmt.Errorf("AddBlock() err1: %w", err1) + f := func(tx pgx.Tx) error { + return writer.AddTransactions(&block, block.Payset, tx) + } + err1 = db.txWithRetry(serializable, f) + }() + } + + err = w.AddBlock(&block, block.Payset, vb.Delta()) + if err != nil { + return fmt.Errorf("AddBlock() err: %w", err) + } + + // Wait for goroutines to finish and check for errors. If there is an error, we + // return our own error so that the main transaction does not commit. Hence, + // `txn` and `txn_participation` tables can only be ahead but not behind + // the other state. + wg.Wait() + isUniqueViolationFunc := func(err error) bool { + var pgerr *pgconn.PgError + return errors.As(err, &pgerr) && (pgerr.Code == pgerrcode.UniqueViolation) + } + if (err0 != nil) && !isUniqueViolationFunc(err0) { + return fmt.Errorf("AddBlock() err0: %w", err0) + } + if (err1 != nil) && !isUniqueViolationFunc(err1) { + return fmt.Errorf("AddBlock() err1: %w", err1) + } } } diff --git a/idb/postgres/postgres_integration_common_test.go b/idb/postgres/postgres_integration_common_test.go index 4b4efa3cd..f8740eba4 100644 --- a/idb/postgres/postgres_integration_common_test.go +++ b/idb/postgres/postgres_integration_common_test.go @@ -5,6 +5,11 @@ import ( "testing" "github.com/algorand/go-algorand/data/bookkeeping" + "github.com/algorand/go-algorand/ledger" + "github.com/algorand/go-algorand/ledger/ledgercore" + "github.com/algorand/indexer/processor" + "github.com/algorand/indexer/processor/blockprocessor" + "github.com/algorand/indexer/util/test" "github.com/jackc/pgx/v4/pgxpool" "github.com/stretchr/testify/require" @@ -19,13 +24,14 @@ func setupIdbWithConnectionString(t *testing.T, connStr string, genesis bookkeep err = idb.LoadGenesis(genesis) require.NoError(t, err) - err = idb.AddBlock(&genesisBlock) + vb := ledgercore.MakeValidatedBlock(genesisBlock, ledgercore.StateDelta{}) + err = idb.AddBlock(&vb) require.NoError(t, err) return idb } -func setupIdb(t *testing.T, genesis bookkeeping.Genesis, genesisBlock bookkeeping.Block) (*IndexerDb /*db*/, func() /*shutdownFunc*/) { +func setupIdb(t *testing.T, genesis bookkeeping.Genesis, genesisBlock bookkeeping.Block) (*IndexerDb /*db*/, func() /*shutdownFunc*/, processor.Processor, *ledger.Ledger) { _, connStr, shutdownFunc := pgtest.SetupPostgres(t) db := setupIdbWithConnectionString(t, connStr, genesis, genesisBlock) @@ -34,7 +40,11 @@ func setupIdb(t *testing.T, genesis bookkeeping.Genesis, genesisBlock bookkeepin shutdownFunc() } - return db, newShutdownFunc + l := test.MakeTestLedger("ledger") + proc, err := blockprocessor.MakeProcessorWithLedger(l, db.AddBlock) + require.NoError(t, err, "failed to open ledger") + + return db, newShutdownFunc, proc, l } // Helper to execute a query returning an integer, for example COUNT(*). Returns -1 on an error. diff --git a/idb/postgres/postgres_integration_test.go b/idb/postgres/postgres_integration_test.go index 51dc07de8..a77d218f8 100644 --- a/idb/postgres/postgres_integration_test.go +++ b/idb/postgres/postgres_integration_test.go @@ -14,20 +14,22 @@ import ( "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/data/transactions" "github.com/algorand/go-algorand/protocol" + "github.com/algorand/go-algorand/rpcs" "github.com/algorand/go-codec/codec" + "github.com/algorand/indexer/api/generated/v2" + "github.com/algorand/indexer/idb/postgres/internal/encoding" + "github.com/algorand/indexer/idb/postgres/internal/schema" + pgutil "github.com/algorand/indexer/idb/postgres/internal/util" "github.com/algorand/indexer/importer" + "github.com/algorand/indexer/processor/blockprocessor" "github.com/jackc/pgx/v4" "github.com/jackc/pgx/v4/pgxpool" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/algorand/indexer/api/generated/v2" "github.com/algorand/indexer/idb" - "github.com/algorand/indexer/idb/postgres/internal/encoding" - "github.com/algorand/indexer/idb/postgres/internal/schema" pgtest "github.com/algorand/indexer/idb/postgres/internal/testing" - pgutil "github.com/algorand/indexer/idb/postgres/internal/util" "github.com/algorand/indexer/util/test" ) @@ -147,8 +149,9 @@ func assertAccountAsset(t *testing.T, db *pgxpool.Pool, addr basics.Address, ass // TestAssetCloseReopenTransfer tests a scenario that requires asset subround accounting func TestAssetCloseReopenTransfer(t *testing.T) { - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) + db, shutdownFunc, proc, l := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) defer shutdownFunc() + defer l.Close() assetid := uint64(1) amt := uint64(10000) @@ -157,17 +160,13 @@ func TestAssetCloseReopenTransfer(t *testing.T) { /////////// // Given // A round scenario requiring subround accounting: AccountA is funded, closed, opts back, and funded again. /////////// - createAsset := test.MakeAssetConfigTxn( - 0, total, uint64(6), false, "mcn", "my coin", "http://antarctica.com", test.AccountD) + createAsset := test.MakeAssetConfigTxn(0, total, uint64(6), false, "mcn", "my coin", "http://antarctica.com", test.AccountD) optInA := test.MakeAssetOptInTxn(assetid, test.AccountA) - fundA := test.MakeAssetTransferTxn( - assetid, amt, test.AccountD, test.AccountA, basics.Address{}) + fundA := test.MakeAssetTransferTxn(assetid, amt, test.AccountD, test.AccountA, basics.Address{}) optInB := test.MakeAssetOptInTxn(assetid, test.AccountB) optInC := test.MakeAssetOptInTxn(assetid, test.AccountC) - closeA := test.MakeAssetTransferTxn( - assetid, 1000, test.AccountA, test.AccountB, test.AccountC) - payMain := test.MakeAssetTransferTxn( - assetid, amt, test.AccountD, test.AccountA, basics.Address{}) + closeA := test.MakeAssetTransferTxn(assetid, 1000, test.AccountA, test.AccountB, test.AccountC) + payMain := test.MakeAssetTransferTxn(assetid, amt, test.AccountD, test.AccountA, basics.Address{}) block, err := test.MakeBlockForTxns( test.MakeGenesisBlock().BlockHeader, &createAsset, &optInA, &fundA, &optInB, @@ -177,7 +176,7 @@ func TestAssetCloseReopenTransfer(t *testing.T) { ////////// // When // We commit the block to the database ////////// - err = db.AddBlock(&block) + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) require.NoError(t, err) ////////// @@ -195,8 +194,9 @@ func TestAssetCloseReopenTransfer(t *testing.T) { // TestReCreateAssetHolding checks the optin value of a defunct func TestReCreateAssetHolding(t *testing.T) { - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) + db, shutdownFunc, proc, l := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) defer shutdownFunc() + defer l.Close() total := uint64(1000000) @@ -226,7 +226,7 @@ func TestReCreateAssetHolding(t *testing.T) { ////////// // When // We commit the round accounting to the database. ////////// - err = db.AddBlock(&block) + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) require.NoError(t, err) ////////// @@ -238,8 +238,9 @@ func TestReCreateAssetHolding(t *testing.T) { // TestMultipleAssetOptins make sure no-op transactions don't reset the default frozen value. func TestNoopOptins(t *testing.T) { - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) + db, shutdownFunc, proc, l := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) defer shutdownFunc() + defer l.Close() /////////// // Given // @@ -261,7 +262,7 @@ func TestNoopOptins(t *testing.T) { ////////// // When // We commit the round accounting to the database. ////////// - err = db.AddBlock(&block) + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) require.NoError(t, err) ////////// @@ -272,8 +273,9 @@ func TestNoopOptins(t *testing.T) { // TestMultipleWriters tests that accounting cannot be double committed. func TestMultipleWriters(t *testing.T) { - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) + db, shutdownFunc, proc, l := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) defer shutdownFunc() + defer l.Close() amt := uint64(10000) @@ -299,7 +301,7 @@ func TestMultipleWriters(t *testing.T) { go func() { defer wg.Done() <-start - errors <- db.AddBlock(&block) + errors <- proc.Process(&rpcs.EncodedBlockCert{Block: block}) }() } close(start) @@ -328,8 +330,9 @@ func TestMultipleWriters(t *testing.T) { // TestBlockWithTransactions tests that the block with transactions endpoint works. func TestBlockWithTransactions(t *testing.T) { - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) + db, shutdownFunc, proc, l := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) defer shutdownFunc() + defer l.Close() round := uint64(1) assetid := uint64(1) @@ -358,7 +361,8 @@ func TestBlockWithTransactions(t *testing.T) { block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader, txns...) require.NoError(t, err) - err = db.AddBlock(&block) + + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) require.NoError(t, err) ////////// @@ -387,8 +391,9 @@ func TestBlockWithTransactions(t *testing.T) { } func TestRekeyBasic(t *testing.T) { - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) + db, shutdownFunc, proc, l := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) defer shutdownFunc() + defer l.Close() /////////// // Given // Send rekey transaction @@ -398,7 +403,7 @@ func TestRekeyBasic(t *testing.T) { block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader, &txn) require.NoError(t, err) - err = db.AddBlock(&block) + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) require.NoError(t, err) ////////// @@ -415,8 +420,9 @@ func TestRekeyBasic(t *testing.T) { } func TestRekeyToItself(t *testing.T) { - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) + db, shutdownFunc, proc, l := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) defer shutdownFunc() + defer l.Close() /////////// // Given // Send rekey transactions @@ -426,7 +432,7 @@ func TestRekeyToItself(t *testing.T) { block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader, &txn) require.NoError(t, err) - err = db.AddBlock(&block) + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) require.NoError(t, err) txn = test.MakePaymentTxn( @@ -435,7 +441,7 @@ func TestRekeyToItself(t *testing.T) { block, err = test.MakeBlockForTxns(block.BlockHeader, &txn) require.NoError(t, err) - err = db.AddBlock(&block) + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) require.NoError(t, err) ////////// @@ -452,8 +458,9 @@ func TestRekeyToItself(t *testing.T) { } func TestRekeyThreeTimesInSameRound(t *testing.T) { - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) + db, shutdownFunc, proc, l := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) defer shutdownFunc() + defer l.Close() /////////// // Given // Send rekey transaction @@ -466,11 +473,12 @@ func TestRekeyThreeTimesInSameRound(t *testing.T) { basics.Address{}) txn2 := test.MakePaymentTxn( 1000, 0, 0, 0, 0, 0, test.AccountA, test.AccountA, basics.Address{}, test.AccountC) + block, err := test.MakeBlockForTxns( test.MakeGenesisBlock().BlockHeader, &txn0, &txn1, &txn2) require.NoError(t, err) - err = db.AddBlock(&block) + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) require.NoError(t, err) ////////// @@ -487,8 +495,9 @@ func TestRekeyThreeTimesInSameRound(t *testing.T) { } func TestRekeyToItselfHasNotBeenRekeyed(t *testing.T) { - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) + _, shutdownFunc, proc, l := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) defer shutdownFunc() + defer l.Close() /////////// // Given // Send rekey transaction @@ -502,14 +511,15 @@ func TestRekeyToItselfHasNotBeenRekeyed(t *testing.T) { ////////// // Then // No error when committing to the DB. ////////// - err = db.AddBlock(&block) + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) require.NoError(t, err) } // TestIgnoreDefaultFrozenConfigUpdate the creator asset holding should ignore default-frozen = true. func TestIgnoreDefaultFrozenConfigUpdate(t *testing.T) { - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) + db, shutdownFunc, proc, l := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) defer shutdownFunc() + defer l.Close() assetid := uint64(1) total := uint64(1000000) @@ -533,7 +543,7 @@ func TestIgnoreDefaultFrozenConfigUpdate(t *testing.T) { ////////// // When // We commit the round accounting to the database. ////////// - err = db.AddBlock(&block) + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) require.NoError(t, err) ////////// @@ -546,8 +556,9 @@ func TestIgnoreDefaultFrozenConfigUpdate(t *testing.T) { // TestZeroTotalAssetCreate tests that the asset holding with total of 0 is created. func TestZeroTotalAssetCreate(t *testing.T) { - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) + db, shutdownFunc, proc, l := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) defer shutdownFunc() + defer l.Close() assetid := uint64(1) total := uint64(0) @@ -564,7 +575,7 @@ func TestZeroTotalAssetCreate(t *testing.T) { ////////// // When // We commit the round accounting to the database. ////////// - err = db.AddBlock(&block) + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) require.NoError(t, err) ////////// @@ -608,8 +619,9 @@ func assertAssetHoldingDates(t *testing.T, db *pgxpool.Pool, address basics.Addr } func TestDestroyAssetBasic(t *testing.T) { - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) + db, shutdownFunc, proc, l := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) defer shutdownFunc() + defer l.Close() assetID := uint64(1) @@ -618,7 +630,7 @@ func TestDestroyAssetBasic(t *testing.T) { block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader, &txn) require.NoError(t, err) - err = db.AddBlock(&block) + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) require.NoError(t, err) // Destroy an asset. @@ -626,7 +638,7 @@ func TestDestroyAssetBasic(t *testing.T) { block, err = test.MakeBlockForTxns(block.BlockHeader, &txn) require.NoError(t, err) - err = db.AddBlock(&block) + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) require.NoError(t, err) // Check that the asset is deleted. @@ -643,8 +655,9 @@ func TestDestroyAssetBasic(t *testing.T) { } func TestDestroyAssetZeroSupply(t *testing.T) { - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) + db, shutdownFunc, proc, l := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) defer shutdownFunc() + defer l.Close() assetID := uint64(1) @@ -654,7 +667,7 @@ func TestDestroyAssetZeroSupply(t *testing.T) { block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader, &txn0, &txn1) require.NoError(t, err) - err = db.AddBlock(&block) + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) require.NoError(t, err) // Check that the asset is deleted. @@ -671,9 +684,11 @@ func TestDestroyAssetZeroSupply(t *testing.T) { } func TestDestroyAssetDeleteCreatorsHolding(t *testing.T) { - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) + db, shutdownFunc, proc, l := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) defer shutdownFunc() + defer l.Close() + assetID := uint64(1) // Create an asset. Create a transaction where all special addresses are different @@ -708,7 +723,7 @@ func TestDestroyAssetDeleteCreatorsHolding(t *testing.T) { block, err := test.MakeBlockForTxns( test.MakeGenesisBlock().BlockHeader, &txn0, &txn1, &txn2) require.NoError(t, err) - err = db.AddBlock(&block) + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) require.NoError(t, err) // Check that the creator's asset holding is deleted. @@ -731,9 +746,11 @@ func TestDestroyAssetDeleteCreatorsHolding(t *testing.T) { // Test that block import adds the freeze/sender accounts to txn_participation. func TestAssetFreezeTxnParticipation(t *testing.T) { - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) + db, shutdownFunc, proc, l := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) defer shutdownFunc() + defer l.Close() + /////////// // Given // A block containing an asset freeze txn /////////// @@ -754,7 +771,7 @@ func TestAssetFreezeTxnParticipation(t *testing.T) { ////////// // When // We import the block. ////////// - err = db.AddBlock(&block) + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) require.NoError(t, err) ////////// @@ -774,9 +791,11 @@ func TestAssetFreezeTxnParticipation(t *testing.T) { // Test that block import adds accounts from inner txns to txn_participation. func TestInnerTxnParticipation(t *testing.T) { - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) + db, shutdownFunc, proc, l := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) defer shutdownFunc() + defer l.Close() + /////////// // Given // A block containing an app call txn with inners /////////// @@ -796,7 +815,7 @@ func TestInnerTxnParticipation(t *testing.T) { ////////// // When // We import the block. ////////// - err = db.AddBlock(&block) + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) require.NoError(t, err) ////////// @@ -819,9 +838,11 @@ func TestInnerTxnParticipation(t *testing.T) { } func TestAppExtraPages(t *testing.T) { - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) + db, shutdownFunc, proc, l := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) defer shutdownFunc() + defer l.Close() + // Create an app. // Create a transaction with ExtraProgramPages field set to 1 @@ -847,8 +868,8 @@ func TestAppExtraPages(t *testing.T) { block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader, &txn) require.NoError(t, err) - err = db.AddBlock(&block) - require.NoError(t, err, "failed to commit") + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) + require.NoError(t, err) row := db.db.QueryRow(context.Background(), "SELECT index, params FROM app WHERE creator = $1", test.AccountA[:]) @@ -910,9 +931,11 @@ func assertKeytype(t *testing.T, db *IndexerDb, address basics.Address, keytype func TestKeytypeBasic(t *testing.T) { block := test.MakeGenesisBlock() - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), block) + db, shutdownFunc, proc, l := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) defer shutdownFunc() + defer l.Close() + assertKeytype(t, db, test.AccountA, nil) // Sig @@ -921,7 +944,7 @@ func TestKeytypeBasic(t *testing.T) { block, err := test.MakeBlockForTxns(block.BlockHeader, &txn) require.NoError(t, err) - err = db.AddBlock(&block) + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) require.NoError(t, err) keytype := "sig" @@ -935,7 +958,7 @@ func TestKeytypeBasic(t *testing.T) { block, err = test.MakeBlockForTxns(block.BlockHeader, &txn) require.NoError(t, err) - err = db.AddBlock(&block) + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) require.NoError(t, err) keytype = "msig" @@ -945,16 +968,18 @@ func TestKeytypeBasic(t *testing.T) { // Test that asset amount >= 2^63 is handled correctly. Due to the specifics of // postgres it might be a problem. func TestLargeAssetAmount(t *testing.T) { - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) + db, shutdownFunc, proc, l := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) defer shutdownFunc() + defer l.Close() + assetid := uint64(1) txn := test.MakeAssetConfigTxn( 0, math.MaxUint64, 0, false, "mc", "mycoin", "", test.AccountA) block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader, &txn) require.NoError(t, err) - err = db.AddBlock(&block) + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) require.NoError(t, err) { @@ -1086,9 +1111,11 @@ func TestNonDisplayableUTF8(t *testing.T) { url := testcase.AssetURL t.Run(testcase.Name, func(t *testing.T) { - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) + db, shutdownFunc, proc, l := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) defer shutdownFunc() + defer l.Close() + txn := test.MakeAssetConfigTxn( 0, math.MaxUint64, 0, false, unit, name, url, test.AccountA) // Try to add cheeky inner txns lazily by adding an AD to the acfg txn @@ -1101,7 +1128,7 @@ func TestNonDisplayableUTF8(t *testing.T) { require.NoError(t, err) // Test 1: import/accounting should work. - err = db.AddBlock(&block) + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) require.NoError(t, err) // Test 2: asset results properly serialized @@ -1168,9 +1195,11 @@ func TestNonDisplayableUTF8(t *testing.T) { // TestReconfigAsset make sure we properly handle asset param merges. func TestReconfigAsset(t *testing.T) { - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) + db, shutdownFunc, proc, l := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) defer shutdownFunc() + defer l.Close() + unit := "co\000in" name := "algo" url := "https://algorand.com" @@ -1180,7 +1209,7 @@ func TestReconfigAsset(t *testing.T) { 0, math.MaxUint64, 0, false, unit, name, url, test.AccountA) block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader, &txn) require.NoError(t, err) - err = db.AddBlock(&block) + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) require.NoError(t, err) txn = transactions.SignedTxnWithAD{ @@ -1205,7 +1234,7 @@ func TestReconfigAsset(t *testing.T) { } block, err = test.MakeBlockForTxns(block.BlockHeader, &txn) require.NoError(t, err) - err = db.AddBlock(&block) + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) require.NoError(t, err) // Test 2: asset results properly serialized @@ -1229,16 +1258,18 @@ func TestReconfigAsset(t *testing.T) { func TestKeytypeResetsOnRekey(t *testing.T) { block := test.MakeGenesisBlock() - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), block) + db, shutdownFunc, proc, l := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) defer shutdownFunc() + defer l.Close() + // Sig txn := test.MakePaymentTxn( 0, 0, 0, 0, 0, 0, test.AccountA, test.AccountA, basics.Address{}, basics.Address{}) block, err := test.MakeBlockForTxns(block.BlockHeader, &txn) require.NoError(t, err) - err = db.AddBlock(&block) + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) require.NoError(t, err) keytype := "sig" @@ -1250,7 +1281,7 @@ func TestKeytypeResetsOnRekey(t *testing.T) { block, err = test.MakeBlockForTxns(block.BlockHeader, &txn) require.NoError(t, err) - err = db.AddBlock(&block) + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) require.NoError(t, err) assertKeytype(t, db, test.AccountA, nil) @@ -1264,7 +1295,7 @@ func TestKeytypeResetsOnRekey(t *testing.T) { block, err = test.MakeBlockForTxns(block.BlockHeader, &txn) require.NoError(t, err) - err = db.AddBlock(&block) + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) require.NoError(t, err) keytype = "msig" @@ -1274,9 +1305,11 @@ func TestKeytypeResetsOnRekey(t *testing.T) { // Test that after closing the account, keytype will be correctly set. func TestKeytypeDeletedAccount(t *testing.T) { block := test.MakeGenesisBlock() - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), block) + db, shutdownFunc, proc, l := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) defer shutdownFunc() + defer l.Close() + assertKeytype(t, db, test.AccountA, nil) closeTxn := test.MakePaymentTxn( @@ -1284,7 +1317,7 @@ func TestKeytypeDeletedAccount(t *testing.T) { block, err := test.MakeBlockForTxns(block.BlockHeader, &closeTxn) require.NoError(t, err) - err = db.AddBlock(&block) + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) require.NoError(t, err) keytype := "sig" @@ -1293,8 +1326,9 @@ func TestKeytypeDeletedAccount(t *testing.T) { // TestAddBlockGenesis tests that adding block 0 is successful. func TestAddBlockGenesis(t *testing.T) { - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) + db, shutdownFunc, _, l := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) defer shutdownFunc() + defer l.Close() opts := idb.GetBlockOptions{ Transactions: true, @@ -1318,8 +1352,9 @@ func TestAddBlockAssetCloseAmountInTxnExtra(t *testing.T) { block := test.MakeGenesisBlock() block.UpgradeState.CurrentProtocol = protocol.ConsensusV24 - db, shutdownFunc := setupIdb(t, genesis, block) + db, shutdownFunc, proc, l := setupIdb(t, genesis, block) defer shutdownFunc() + defer l.Close() assetid := uint64(1) @@ -1339,7 +1374,7 @@ func TestAddBlockAssetCloseAmountInTxnExtra(t *testing.T) { &optinC, &closeB) require.NoError(t, err) - err = db.AddBlock(&block) + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) require.NoError(t, err) // Check asset close amount in the `closeB` transaction. @@ -1364,6 +1399,7 @@ func TestAddBlockAssetCloseAmountInTxnExtra(t *testing.T) { func TestAddBlockIncrementsMaxRoundAccounted(t *testing.T) { _, connStr, shutdownFunc := pgtest.SetupPostgres(t) defer shutdownFunc() + db, _, err := OpenPostgres(connStr, idb.IndexerDbOptions{}, nil) require.NoError(t, err) defer db.Close() @@ -1375,17 +1411,19 @@ func TestAddBlockIncrementsMaxRoundAccounted(t *testing.T) { require.NoError(t, err) assert.Equal(t, uint64(0), round) - block := test.MakeGenesisBlock() - err = db.AddBlock(&block) - require.NoError(t, err) + l := test.MakeTestLedger("ledger") + defer l.Close() + proc, err := blockprocessor.MakeProcessorWithLedger(l, db.AddBlock) + require.NoError(t, err, "failed to open ledger") round, err = db.GetNextRoundToAccount() require.NoError(t, err) assert.Equal(t, uint64(1), round) - block, err = test.MakeBlockForTxns(block.BlockHeader) + block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader) require.NoError(t, err) - err = db.AddBlock(&block) + blockCert := rpcs.EncodedBlockCert{Block: block} + err = proc.Process(&blockCert) require.NoError(t, err) round, err = db.GetNextRoundToAccount() @@ -1394,7 +1432,8 @@ func TestAddBlockIncrementsMaxRoundAccounted(t *testing.T) { block, err = test.MakeBlockForTxns(block.BlockHeader) require.NoError(t, err) - err = db.AddBlock(&block) + blockCert = rpcs.EncodedBlockCert{Block: block} + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) require.NoError(t, err) round, err = db.GetNextRoundToAccount() @@ -1405,9 +1444,11 @@ func TestAddBlockIncrementsMaxRoundAccounted(t *testing.T) { // Test that AddBlock makes a record of an account that gets created and deleted in // the same round. func TestAddBlockCreateDeleteAccountSameRound(t *testing.T) { - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) + db, shutdownFunc, proc, l := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) defer shutdownFunc() + defer l.Close() + createTxn := test.MakePaymentTxn( 0, 5, 0, 0, 0, 0, test.AccountA, test.AccountE, basics.Address{}, basics.Address{}) deleteTxn := test.MakePaymentTxn( @@ -1416,7 +1457,7 @@ func TestAddBlockCreateDeleteAccountSameRound(t *testing.T) { test.MakeGenesisBlock().BlockHeader, &createTxn, &deleteTxn) require.NoError(t, err) - err = db.AddBlock(&block) + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) require.NoError(t, err) opts := idb.AccountQueryOptions{ @@ -1439,9 +1480,11 @@ func TestAddBlockCreateDeleteAccountSameRound(t *testing.T) { // Test that AddBlock makes a record of an asset that is created and deleted in // the same round. func TestAddBlockCreateDeleteAssetSameRound(t *testing.T) { - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) + db, shutdownFunc, proc, l := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) defer shutdownFunc() + defer l.Close() + assetid := uint64(1) createTxn := test.MakeAssetConfigTxn(0, 3, 0, false, "", "", "", test.AccountA) deleteTxn := test.MakeAssetDestroyTxn(assetid, test.AccountA) @@ -1449,7 +1492,7 @@ func TestAddBlockCreateDeleteAssetSameRound(t *testing.T) { test.MakeGenesisBlock().BlockHeader, &createTxn, &deleteTxn) require.NoError(t, err) - err = db.AddBlock(&block) + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) require.NoError(t, err) // Asset global state. @@ -1494,9 +1537,11 @@ func TestAddBlockCreateDeleteAssetSameRound(t *testing.T) { // Test that AddBlock makes a record of an app that is created and deleted in // the same round. func TestAddBlockCreateDeleteAppSameRound(t *testing.T) { - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) + db, shutdownFunc, proc, l := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) defer shutdownFunc() + defer l.Close() + appid := uint64(1) createTxn := test.MakeCreateAppTxn(test.AccountA) deleteTxn := test.MakeAppDestroyTxn(appid, test.AccountA) @@ -1504,7 +1549,7 @@ func TestAddBlockCreateDeleteAppSameRound(t *testing.T) { test.MakeGenesisBlock().BlockHeader, &createTxn, &deleteTxn) require.NoError(t, err) - err = db.AddBlock(&block) + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) require.NoError(t, err) opts := idb.ApplicationQuery{ @@ -1527,9 +1572,11 @@ func TestAddBlockCreateDeleteAppSameRound(t *testing.T) { // Test that AddBlock makes a record of an app that is created and deleted in // the same round. func TestAddBlockAppOptInOutSameRound(t *testing.T) { - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) + db, shutdownFunc, proc, l := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) defer shutdownFunc() + defer l.Close() + appid := uint64(1) createTxn := test.MakeCreateAppTxn(test.AccountA) optInTxn := test.MakeAppOptInTxn(appid, test.AccountB) @@ -1538,7 +1585,7 @@ func TestAddBlockAppOptInOutSameRound(t *testing.T) { test.MakeGenesisBlock().BlockHeader, &createTxn, &optInTxn, &optOutTxn) require.NoError(t, err) - err = db.AddBlock(&block) + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) require.NoError(t, err) opts := idb.AccountQueryOptions{ @@ -1649,6 +1696,7 @@ func TestSearchForInnerTransactionReturnsRootTransaction(t *testing.T) { // Given: A DB with one transaction containing inner transactions [app -> pay -> xfer] pdb, connStr, shutdownFunc := pgtest.SetupPostgres(t) defer shutdownFunc() + db := setupIdbWithConnectionString( t, connStr, test.MakeGenesis(), test.MakeGenesisBlock()) defer db.Close() @@ -1660,7 +1708,12 @@ func TestSearchForInnerTransactionReturnsRootTransaction(t *testing.T) { rootTxid := appCall.Txn.ID() err = pgutil.TxWithRetry(pdb, serializable, func(tx pgx.Tx) error { - return db.AddBlock(&block) + l := test.MakeTestLedger("ledger") + defer l.Close() + proc, err := blockprocessor.MakeProcessorWithLedger(l, db.AddBlock) + require.NoError(t, err, "failed to open ledger") + blockCert := rpcs.EncodedBlockCert{Block: block} + return proc.Process(&blockCert) }, nil) require.NoError(t, err) @@ -1732,9 +1785,11 @@ func TestNonUTF8Logs(t *testing.T) { testcase := testcase t.Run(testcase.Name, func(t *testing.T) { - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) + db, shutdownFunc, proc, l := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) defer shutdownFunc() + defer l.Close() + createAppTxn := test.MakeCreateAppTxn(test.AccountA) createAppTxn.ApplyData.EvalDelta = transactions.EvalDelta{ Logs: testcase.Logs, @@ -1766,7 +1821,7 @@ func TestNonUTF8Logs(t *testing.T) { require.NoError(t, err) // Test 1: import/accounting should work. - err = db.AddBlock(&block) + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) require.NoError(t, err) // Test 2: transaction results properly serialized @@ -1790,6 +1845,7 @@ func TestNonUTF8Logs(t *testing.T) { func TestLoadGenesisAccountTotals(t *testing.T) { _, connStr, shutdownFunc := pgtest.SetupPostgres(t) defer shutdownFunc() + db, _, err := OpenPostgres(connStr, idb.IndexerDbOptions{}, nil) require.NoError(t, err) defer db.Close() @@ -1808,9 +1864,11 @@ func TestLoadGenesisAccountTotals(t *testing.T) { } func TestTxnAssetID(t *testing.T) { - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) + db, shutdownFunc, proc, l := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) defer shutdownFunc() + defer l.Close() + assetid := uint64(1) createAssetTxn := test.MakeAssetConfigTxn( 0, 0, 0, false, "myasset", "ma", "", test.AccountA) @@ -1825,7 +1883,7 @@ func TestTxnAssetID(t *testing.T) { &createAppTxn, &destroyAppTxn) require.NoError(t, err) - err = db.AddBlock(&block) + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) require.NoError(t, err) txnRowsCh, _ := db.Transactions(context.Background(), idb.TransactionFilter{}) @@ -1844,13 +1902,15 @@ func TestTxnAssetID(t *testing.T) { } func TestBadTxnJsonEncoding(t *testing.T) { - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) + db, shutdownFunc, proc, l := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) defer shutdownFunc() + defer l.Close() + // Need to import a block header because the transactions query joins on it. block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader) require.NoError(t, err) - err = db.AddBlock(&block) + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) require.NoError(t, err) rootTxid := "abc" @@ -1907,9 +1967,11 @@ func TestBadTxnJsonEncoding(t *testing.T) { func TestKeytypeDoNotResetReceiver(t *testing.T) { block := test.MakeGenesisBlock() - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), block) + db, shutdownFunc, proc, l := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) defer shutdownFunc() + defer l.Close() + assertKeytype(t, db, test.AccountA, nil) // Sigtype of account B becomes "sig". @@ -1917,7 +1979,7 @@ func TestKeytypeDoNotResetReceiver(t *testing.T) { 0, 0, 0, 0, 0, 0, test.AccountB, test.AccountB, basics.Address{}, basics.Address{}) block, err := test.MakeBlockForTxns(block.BlockHeader, &txn) require.NoError(t, err) - err = db.AddBlock(&block) + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) require.NoError(t, err) // Sigtype of account A becomes "sig" and B remains the same. @@ -1925,7 +1987,7 @@ func TestKeytypeDoNotResetReceiver(t *testing.T) { 0, 0, 0, 0, 0, 0, test.AccountA, test.AccountB, basics.Address{}, basics.Address{}) block, err = test.MakeBlockForTxns(block.BlockHeader, &txn) require.NoError(t, err) - err = db.AddBlock(&block) + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) require.NoError(t, err) keytype := "sig" @@ -1937,9 +1999,11 @@ func TestKeytypeDoNotResetReceiver(t *testing.T) { // the current round, AddBlock() still runs successfully. func TestAddBlockTxnTxnParticipationAhead(t *testing.T) { block := test.MakeGenesisBlock() - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), block) + db, shutdownFunc, proc, l := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) defer shutdownFunc() + defer l.Close() + { query := `INSERT INTO txn (round, intra, typeenum, asset, txn, extra) VALUES (1, 0, 0, 0, 'null'::jsonb, 'null'::jsonb)` @@ -1957,21 +2021,23 @@ func TestAddBlockTxnTxnParticipationAhead(t *testing.T) { 0, 0, 0, 0, 0, 0, test.AccountA, test.AccountA, basics.Address{}, basics.Address{}) block, err := test.MakeBlockForTxns(block.BlockHeader, &txn) require.NoError(t, err) - err = db.AddBlock(&block) + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) require.NoError(t, err) } // Test that AddBlock() writes to `txn_participation` table. func TestAddBlockTxnParticipationAdded(t *testing.T) { block := test.MakeGenesisBlock() - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), block) + db, shutdownFunc, proc, l := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) defer shutdownFunc() + defer l.Close() + txn := test.MakePaymentTxn( 0, 0, 0, 0, 0, 0, test.AccountA, test.AccountA, basics.Address{}, basics.Address{}) block, err := test.MakeBlockForTxns(block.BlockHeader, &txn) require.NoError(t, err) - err = db.AddBlock(&block) + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) require.NoError(t, err) tf := idb.TransactionFilter{ @@ -1990,8 +2056,9 @@ func TestAddBlockTxnParticipationAdded(t *testing.T) { // Transactions() doesn't return the rows ahead of the state. func TestTransactionsTxnAhead(t *testing.T) { block := test.MakeGenesisBlock() - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), block) + db, shutdownFunc, proc, l := setupIdb(t, test.MakeGenesis(), block) defer shutdownFunc() + defer l.Close() // Insert a transaction row at round 1 and check that Transactions() does not return // it. @@ -2012,7 +2079,7 @@ func TestTransactionsTxnAhead(t *testing.T) { { block, err := test.MakeBlockForTxns(block.BlockHeader) require.NoError(t, err) - err = db.AddBlock(&block) + err = proc.Process(&rpcs.EncodedBlockCert{Block: block}) require.NoError(t, err) } { @@ -2028,6 +2095,7 @@ func TestTransactionsTxnAhead(t *testing.T) { func TestGenesisHashCheckAtDBSetup(t *testing.T) { _, connStr, shutdownFunc := pgtest.SetupPostgres(t) defer shutdownFunc() + genesis := test.MakeGenesis() db := setupIdbWithConnectionString( t, connStr, genesis, test.MakeGenesisBlock()) @@ -2057,6 +2125,7 @@ type ImportState struct { func TestGenesisHashCheckAtInitialImport(t *testing.T) { _, connStr, shutdownFunc := pgtest.SetupPostgres(t) defer shutdownFunc() + genesis := test.MakeGenesis() db, _, err := OpenPostgres(connStr, idb.IndexerDbOptions{}, nil) require.NoError(t, err) @@ -2107,8 +2176,9 @@ func getResults(ctx context.Context, rows <-chan idb.AccountRow) (result []idb.A } func TestIndexerDb_GetAccounts(t *testing.T) { - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) + db, shutdownFunc, _, l := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) defer shutdownFunc() + defer l.Close() ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() diff --git a/idb/postgres/postgres_rand_test.go b/idb/postgres/postgres_rand_test.go index e4fd44be8..826bc37c2 100644 --- a/idb/postgres/postgres_rand_test.go +++ b/idb/postgres/postgres_rand_test.go @@ -49,8 +49,9 @@ func generateAccountData() ledgercore.AccountData { // and that there are no problems around passing account address pointers to the postgres // driver which could be the same pointer if we are not careful. func TestWriteReadAccountData(t *testing.T) { - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) + db, shutdownFunc, _, ld := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) defer shutdownFunc() + defer ld.Close() addresses := make(map[basics.Address]struct{}) var delta ledgercore.StateDelta @@ -213,8 +214,9 @@ func generateAppLocalStateDelta(t *testing.T) ledgercore.AppLocalStateDelta { // and that there are no problems around passing account address pointers to the postgres // driver which could be the same pointer if we are not careful. func TestWriteReadResources(t *testing.T) { - db, shutdownFunc := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) + db, shutdownFunc, _, ld := setupIdb(t, test.MakeGenesis(), test.MakeGenesisBlock()) defer shutdownFunc() + defer ld.Close() resources := make(map[basics.Address]map[ledger.Creatable]struct{}) var delta ledgercore.StateDelta diff --git a/importer/helper.go b/importer/helper.go index 7f1001373..f67339b75 100644 --- a/importer/helper.go +++ b/importer/helper.go @@ -16,10 +16,15 @@ import ( "time" "github.com/algorand/go-algorand-sdk/client/v2/algod" + "github.com/algorand/go-algorand/config" "github.com/algorand/go-algorand/crypto" "github.com/algorand/go-algorand/data/bookkeeping" + "github.com/algorand/go-algorand/ledger" + "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/rpcs" + "github.com/algorand/indexer/processor/blockprocessor" + "github.com/algorand/indexer/util" log "github.com/sirupsen/logrus" "github.com/algorand/indexer/idb" @@ -64,13 +69,13 @@ func (h *ImportHelper) Import(db idb.IndexerDb, args []string) { pathsSorted = pathsSorted[:h.BlockFileLimit] } for _, gfname := range pathsSorted { - fb, ft := importFile(gfname, imp, h.Log) + fb, ft := importFile(gfname, imp, h.Log, h.GenesisJSONPath) blocks += fb txCount += ft } } else { // try without passing throug glob - fb, ft := importFile(fname, imp, h.Log) + fb, ft := importFile(fname, imp, h.Log, h.GenesisJSONPath) blocks += fb txCount += ft } @@ -90,7 +95,7 @@ func maybeFail(err error, l *log.Logger, errfmt string, params ...interface{}) { os.Exit(1) } -func importTar(imp Importer, tarfile io.Reader, l *log.Logger) (blockCount, txCount int, err error) { +func importTar(imp Importer, tarfile io.Reader, l *log.Logger, genesisReader io.Reader) (blockCount, txCount int, err error) { tf := tar.NewReader(tarfile) var header *tar.Header header, err = tf.Next() @@ -126,8 +131,27 @@ func importTar(imp Importer, tarfile io.Reader, l *log.Logger) (blockCount, txCo } sort.Slice(blocks, less) - for _, blockContainer := range blocks { - err = imp.ImportBlock(&blockContainer) + var genesis bookkeeping.Genesis + gbytes, err := ioutil.ReadAll(genesisReader) + if err != nil { + maybeFail(err, l, "error reading genesis, %v", err) + } + err = protocol.DecodeJSON(gbytes, &genesis) + if err != nil { + maybeFail(err, l, "error decoding genesis, %v", err) + } + genesisBlock := blocks[0] + initState, err := util.CreateInitState(&genesis, &genesisBlock.Block) + maybeFail(err, l, "Error getting genesis block") + + ld, err := ledger.OpenLedger(logging.NewLogger(), "ledger", true, initState, config.GetDefaultLocal()) + maybeFail(err, l, "Cannot open ledger") + + proc, err := blockprocessor.MakeProcessorWithLedger(ld, imp.ImportBlock) + maybeFail(err, l, "Error creating processor") + + for _, blockContainer := range blocks[1:] { + err = proc.Process(&blockContainer) if err != nil { return } @@ -136,15 +160,16 @@ func importTar(imp Importer, tarfile io.Reader, l *log.Logger) (blockCount, txCo return } -func importFile(fname string, imp Importer, l *log.Logger) (blocks, txCount int) { +func importFile(fname string, imp Importer, l *log.Logger, genesisPath string) (blocks, txCount int) { blocks = 0 txCount = 0 l.Infof("importing %s ...", fname) + genesisReader := GetGenesisFile(genesisPath, nil, l) if strings.HasSuffix(fname, ".tar") { fin, err := os.Open(fname) maybeFail(err, l, "%s: %v", fname, err) defer fin.Close() - tblocks, btxns, err := importTar(imp, fin, l) + tblocks, btxns, err := importTar(imp, fin, l, genesisReader) maybeFail(err, l, "%s: %v", fname, err) blocks += tblocks txCount += btxns @@ -153,21 +178,13 @@ func importFile(fname string, imp Importer, l *log.Logger) (blocks, txCount int) maybeFail(err, l, "%s: %v", fname, err) defer fin.Close() bzin := bzip2.NewReader(fin) - tblocks, btxns, err := importTar(imp, bzin, l) + tblocks, btxns, err := importTar(imp, bzin, l, genesisReader) maybeFail(err, l, "%s: %v", fname, err) blocks += tblocks txCount += btxns } else { - // assume a standalone block msgpack blob - blockbytes, err := ioutil.ReadFile(fname) - maybeFail(err, l, "%s: could not read, %v", fname, err) - var blockContainer rpcs.EncodedBlockCert - err = protocol.Decode(blockbytes, &blockContainer) - maybeFail(err, l, "cannot decode blockbytes err: %v", err) - err = imp.ImportBlock(&blockContainer) - maybeFail(err, l, "cannot import block err: %v", err) - blocks++ - txCount += len(blockContainer.Block.Payset) + //assume a standalone block msgpack blob + maybeFail(errors.New("cannot import a standalone block"), l, "not supported") } return } diff --git a/importer/importer.go b/importer/importer.go index 70532dd34..f338da2b8 100644 --- a/importer/importer.go +++ b/importer/importer.go @@ -4,14 +4,13 @@ import ( "fmt" "github.com/algorand/go-algorand/config" - "github.com/algorand/go-algorand/rpcs" - + "github.com/algorand/go-algorand/ledger/ledgercore" "github.com/algorand/indexer/idb" ) // Importer is used to import blocks into an idb.IndexerDb object. type Importer interface { - ImportBlock(blockContainer *rpcs.EncodedBlockCert) error + ImportBlock(vb *ledgercore.ValidatedBlock) error } type importerImpl struct { @@ -19,14 +18,14 @@ type importerImpl struct { } // ImportBlock processes a block and adds it to the IndexerDb -func (imp *importerImpl) ImportBlock(blockContainer *rpcs.EncodedBlockCert) error { - block := &blockContainer.Block +func (imp *importerImpl) ImportBlock(vb *ledgercore.ValidatedBlock) error { + block := vb.Block() _, ok := config.Consensus[block.CurrentProtocol] if !ok { return fmt.Errorf("protocol %s not found", block.CurrentProtocol) } - return imp.db.AddBlock(&blockContainer.Block) + return imp.db.AddBlock(vb) } // NewImporter creates a new importer object. diff --git a/misc/Dockerfile b/misc/Dockerfile index 8499c7ed2..6c5e5c5f4 100644 --- a/misc/Dockerfile +++ b/misc/Dockerfile @@ -25,5 +25,6 @@ RUN rm /opt/go/indexer/cmd/algorand-indexer/algorand-indexer RUN make RUN pip3 install -r misc/requirements.txt +ENV INDEXER_DATA="${HOME}/indexer/" # Run test script ENTRYPOINT ["/bin/bash", "-c", "sleep 5 && python3 misc/e2elive.py --connection-string \"$CONNECTION_STRING\" --indexer-bin /opt/go/indexer/cmd/algorand-indexer/algorand-indexer --indexer-port 9890"] diff --git a/processor/blockprocessor/block_processor.go b/processor/blockprocessor/block_processor.go index 6dd5d9a67..cea421566 100644 --- a/processor/blockprocessor/block_processor.go +++ b/processor/blockprocessor/block_processor.go @@ -2,89 +2,203 @@ package blockprocessor import ( "fmt" + "path" + "path/filepath" + "github.com/algorand/go-algorand/config" + algodConfig "github.com/algorand/go-algorand/config" + "github.com/algorand/go-algorand/data/basics" + "github.com/algorand/go-algorand/data/bookkeeping" + "github.com/algorand/go-algorand/data/transactions" "github.com/algorand/go-algorand/ledger" "github.com/algorand/go-algorand/ledger/ledgercore" + "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/rpcs" + "github.com/algorand/indexer/accounting" "github.com/algorand/indexer/processor" + iledger "github.com/algorand/indexer/processor/eval" + "github.com/algorand/indexer/util" ) type blockProcessor struct { - handler func(block *ledgercore.ValidatedBlock) error - nextRoundToProcess uint64 - ledger *ledger.Ledger + handler func(block *ledgercore.ValidatedBlock) error + ledger *ledger.Ledger +} + +// MakeProcessorWithLedger creates a block processor with a given ledger +func MakeProcessorWithLedger(l *ledger.Ledger, handler func(block *ledgercore.ValidatedBlock) error) (processor.Processor, error) { + if l == nil { + return nil, fmt.Errorf("MakeProcessorWithLedger() err: local ledger not initialized") + } + err := addGenesisBlock(l, handler) + if err != nil { + return nil, fmt.Errorf("MakeProcessorWithLedger() err: %w", err) + } + return &blockProcessor{ledger: l, handler: handler}, nil } // MakeProcessor creates a block processor -func MakeProcessor(ledger *ledger.Ledger, handler func(block *ledgercore.ValidatedBlock) error) (processor.Processor, error) { - if ledger == nil { - return nil, fmt.Errorf("MakeProcessor(): local ledger not initialized") +func MakeProcessor(genesis *bookkeeping.Genesis, genesisBlock *bookkeeping.Block, datadir string, handler func(block *ledgercore.ValidatedBlock) error) (processor.Processor, error) { + initState, err := util.CreateInitState(genesis, genesisBlock) + if err != nil { + return nil, fmt.Errorf("MakeProcessor() err: %w", err) + } + l, err := ledger.OpenLedger(logging.NewLogger(), filepath.Join(path.Dir(datadir), "ledger"), false, initState, algodConfig.GetDefaultLocal()) + if err != nil { + return nil, fmt.Errorf("MakeProcessor() err: %w", err) } - if handler != nil && ledger.Latest() == 0 { - blk, err := ledger.Block(0) + return MakeProcessorWithLedger(l, handler) +} + +func addGenesisBlock(l *ledger.Ledger, handler func(block *ledgercore.ValidatedBlock) error) error { + if handler != nil && uint64(l.Latest()) == 0 { + blk, err := l.Block(0) if err != nil { - return nil, fmt.Errorf("MakeProcessor() err: %w", err) + return fmt.Errorf("addGenesisBlock() err: %w", err) } vb := ledgercore.MakeValidatedBlock(blk, ledgercore.StateDelta{}) err = handler(&vb) if err != nil { - return nil, fmt.Errorf("MakeProcessor() handler err: %w", err) + return fmt.Errorf("addGenesisBlock() handler err: %w", err) } } - return &blockProcessor{ledger: ledger, nextRoundToProcess: uint64(ledger.Latest() + 1), handler: handler}, nil + return nil } // Process a raw algod block -func (processor *blockProcessor) Process(blockCert *rpcs.EncodedBlockCert) error { +func (proc *blockProcessor) Process(blockCert *rpcs.EncodedBlockCert) error { + if blockCert == nil { return fmt.Errorf("Process(): cannot process a nil block") } - if uint64(blockCert.Block.Round()) != processor.nextRoundToProcess { - return fmt.Errorf("Process() invalid round blockCert.Block.Round(): %d processor.nextRoundToProcess: %d", blockCert.Block.Round(), processor.nextRoundToProcess) + if uint64(blockCert.Block.Round()) != uint64(proc.ledger.Latest())+1 { + return fmt.Errorf("Process() invalid round blockCert.Block.Round(): %d nextRoundToProcess: %d", blockCert.Block.Round(), uint64(proc.ledger.Latest())+1) } - blkeval, err := processor.ledger.StartEvaluator(blockCert.Block.BlockHeader, len(blockCert.Block.Payset), 0) - if err != nil { - return fmt.Errorf("Process() block eval err: %w", err) + proto, ok := config.Consensus[blockCert.Block.BlockHeader.CurrentProtocol] + if !ok { + return fmt.Errorf( + "Process() cannot find proto version %s", blockCert.Block.BlockHeader.CurrentProtocol) } + protoChanged := !proto.EnableAssetCloseAmount + proto.EnableAssetCloseAmount = true - paysetgroups, err := blockCert.Block.DecodePaysetGroups() + ledgerForEval, err := iledger.MakeLedgerForEvaluator(proc.ledger) if err != nil { - return fmt.Errorf("Process() decode payset groups err: %w", err) + return fmt.Errorf("Process() err: %w", err) } - - for _, group := range paysetgroups { - err = blkeval.TransactionGroup(group) - if err != nil { - return fmt.Errorf("Process() apply transaction group err: %w", err) - } + resources, err := prepareEvalResources(&ledgerForEval, &blockCert.Block) + if err != nil { + panic(fmt.Errorf("Process() resources err: %w", err)) } - // validated block - vb, err := blkeval.GenerateBlock() + delta, modifiedTxns, err := + ledger.EvalForIndexer(ledgerForEval, &blockCert.Block, proto, resources) if err != nil { - return fmt.Errorf("Process() validated block err: %w", err) + return fmt.Errorf("Process() eval err: %w", err) + } + // validated block + var vb ledgercore.ValidatedBlock + vb = ledgercore.MakeValidatedBlock(blockCert.Block, delta) + if protoChanged { + block := bookkeeping.Block{ + BlockHeader: blockCert.Block.BlockHeader, + Payset: modifiedTxns, + } + vb = ledgercore.MakeValidatedBlock(block, delta) } + // execute handler before writing to local ledger - if processor.handler != nil { - err = processor.handler(vb) + if proc.handler != nil { + err = proc.handler(&vb) if err != nil { return fmt.Errorf("Process() handler err: %w", err) } } // write to ledger - err = processor.ledger.AddValidatedBlock(*vb, blockCert.Certificate) + err = proc.ledger.AddValidatedBlock(vb, blockCert.Certificate) if err != nil { return fmt.Errorf("Process() add validated block err: %w", err) } - processor.nextRoundToProcess = uint64(processor.ledger.Latest()) + 1 return nil } -func (processor *blockProcessor) SetHandler(handler func(block *ledgercore.ValidatedBlock) error) { - processor.handler = handler +func (proc *blockProcessor) SetHandler(handler func(block *ledgercore.ValidatedBlock) error) { + proc.handler = handler +} + +func (proc *blockProcessor) NextRoundToProcess() uint64 { + return uint64(proc.ledger.Latest()) + 1 } -func (processor *blockProcessor) NextRoundToProcess() uint64 { - return processor.nextRoundToProcess +// Preload all resources (account data, account resources, asset/app creators) for the +// evaluator. +func prepareEvalResources(l *iledger.LedgerForEvaluator, block *bookkeeping.Block) (ledger.EvalForIndexerResources, error) { + assetCreators, appCreators, err := prepareCreators(l, block.Payset) + if err != nil { + return ledger.EvalForIndexerResources{}, + fmt.Errorf("prepareEvalResources() err: %w", err) + } + + res := ledger.EvalForIndexerResources{ + Accounts: nil, + Resources: nil, + Creators: make(map[ledger.Creatable]ledger.FoundAddress), + } + + for index, foundAddress := range assetCreators { + creatable := ledger.Creatable{ + Index: basics.CreatableIndex(index), + Type: basics.AssetCreatable, + } + res.Creators[creatable] = foundAddress + } + for index, foundAddress := range appCreators { + creatable := ledger.Creatable{ + Index: basics.CreatableIndex(index), + Type: basics.AppCreatable, + } + res.Creators[creatable] = foundAddress + } + + res.Accounts, res.Resources, err = prepareAccountsResources(l, block.Payset, assetCreators, appCreators) + if err != nil { + return ledger.EvalForIndexerResources{}, + fmt.Errorf("prepareEvalResources() err: %w", err) + } + + return res, nil +} + +// Preload asset and app creators. +func prepareCreators(l *iledger.LedgerForEvaluator, payset transactions.Payset) (map[basics.AssetIndex]ledger.FoundAddress, map[basics.AppIndex]ledger.FoundAddress, error) { + assetsReq, appsReq := accounting.MakePreloadCreatorsRequest(payset) + + assets, err := l.GetAssetCreator(assetsReq) + if err != nil { + return nil, nil, fmt.Errorf("prepareCreators() err: %w", err) + } + apps, err := l.GetAppCreator(appsReq) + if err != nil { + return nil, nil, fmt.Errorf("prepareCreators() err: %w", err) + } + + return assets, apps, nil +} + +// Preload account data and account resources. +func prepareAccountsResources(l *iledger.LedgerForEvaluator, payset transactions.Payset, assetCreators map[basics.AssetIndex]ledger.FoundAddress, appCreators map[basics.AppIndex]ledger.FoundAddress) (map[basics.Address]*ledgercore.AccountData, map[basics.Address]map[ledger.Creatable]ledgercore.AccountResource, error) { + addressesReq, resourcesReq := + accounting.MakePreloadAccountsResourcesRequest(payset, assetCreators, appCreators) + + accounts, err := l.LookupWithoutRewards(addressesReq) + if err != nil { + return nil, nil, fmt.Errorf("prepareAccountsResources() err: %w", err) + } + resources, err := l.LookupResources(resourcesReq) + if err != nil { + return nil, nil, fmt.Errorf("prepareAccountsResources() err: %w", err) + } + + return accounts, resources, nil } diff --git a/processor/blockprocessor/block_processor_test.go b/processor/blockprocessor/block_processor_test.go index a80a82999..2ff7f9471 100644 --- a/processor/blockprocessor/block_processor_test.go +++ b/processor/blockprocessor/block_processor_test.go @@ -2,33 +2,29 @@ package blockprocessor_test import ( "fmt" - "log" "testing" "github.com/algorand/go-algorand/agreement" - "github.com/algorand/go-algorand/config" "github.com/algorand/go-algorand/data/basics" - "github.com/algorand/go-algorand/ledger" "github.com/algorand/go-algorand/ledger/ledgercore" - "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/rpcs" block_processor "github.com/algorand/indexer/processor/blockprocessor" - "github.com/algorand/indexer/util" "github.com/algorand/indexer/util/test" "github.com/stretchr/testify/assert" ) func TestProcess(t *testing.T) { - l := makeTestLedger(t, "local_ledger") + l := test.MakeTestLedger("local_ledger") + defer l.Close() genesisBlock, err := l.Block(basics.Round(0)) assert.Nil(t, err) // create processor handler := func(vb *ledgercore.ValidatedBlock) error { return nil } - pr, _ := block_processor.MakeProcessor(l, handler) + pr, _ := block_processor.MakeProcessorWithLedger(l, handler) prevHeader := genesisBlock.BlockHeader - + assert.Equal(t, basics.Round(0), l.Latest()) // create a few rounds for i := 1; i <= 3; i++ { txn := test.MakePaymentTxn(0, uint64(i), 0, 1, 1, 0, test.AccountA, test.AccountA, basics.Address{}, basics.Address{}) @@ -50,11 +46,12 @@ func TestProcess(t *testing.T) { } func TestFailedProcess(t *testing.T) { - l := makeTestLedger(t, "local_ledger2") + l := test.MakeTestLedger("local_ledger") + defer l.Close() // invalid processor - pr, err := block_processor.MakeProcessor(nil, nil) - assert.Contains(t, err.Error(), "MakeProcessor(): local ledger not initialized") - pr, err = block_processor.MakeProcessor(l, nil) + pr, err := block_processor.MakeProcessorWithLedger(nil, nil) + assert.Contains(t, err.Error(), "MakeProcessorWithLedger() err: local ledger not initialized") + pr, err = block_processor.MakeProcessorWithLedger(l, nil) assert.Nil(t, err) err = pr.Process(nil) assert.Contains(t, err.Error(), "Process(): cannot process a nil block") @@ -76,7 +73,7 @@ func TestFailedProcess(t *testing.T) { assert.Nil(t, err) rawBlock = rpcs.EncodedBlockCert{Block: block, Certificate: agreement.Certificate{}} err = pr.Process(&rawBlock) - assert.Contains(t, err.Error(), "Process() apply transaction group") + assert.Contains(t, err.Error(), "ProcessBlockForIndexer() err") // stxn GenesisID not empty txn = test.MakePaymentTxn(0, 10, 0, 1, 1, 0, test.AccountA, test.AccountA, basics.Address{}, basics.Address{}) @@ -85,7 +82,7 @@ func TestFailedProcess(t *testing.T) { block.Payset[0].Txn.GenesisID = "genesisID" rawBlock = rpcs.EncodedBlockCert{Block: block, Certificate: agreement.Certificate{}} err = pr.Process(&rawBlock) - assert.Contains(t, err.Error(), "Process() decode payset groups err") + assert.Contains(t, err.Error(), "ProcessBlockForIndexer() err") // eval error: concensus protocol not supported txn = test.MakePaymentTxn(0, 10, 0, 1, 1, 0, test.AccountA, test.AccountA, basics.Address{}, basics.Address{}) @@ -94,15 +91,15 @@ func TestFailedProcess(t *testing.T) { assert.Nil(t, err) rawBlock = rpcs.EncodedBlockCert{Block: block, Certificate: agreement.Certificate{}} err = pr.Process(&rawBlock) - assert.Contains(t, err.Error(), "Process() block eval err") + assert.Contains(t, err.Error(), "Process() cannot find proto version testing") // handler error handler := func(vb *ledgercore.ValidatedBlock) error { return fmt.Errorf("handler error") } - _, err = block_processor.MakeProcessor(l, handler) - assert.Contains(t, err.Error(), "MakeProcessor() handler err") - pr, _ = block_processor.MakeProcessor(l, nil) + _, err = block_processor.MakeProcessorWithLedger(l, handler) + assert.Contains(t, err.Error(), "handler error") + pr, _ = block_processor.MakeProcessorWithLedger(l, nil) txn = test.MakePaymentTxn(0, 10, 0, 1, 1, 0, test.AccountA, test.AccountA, basics.Address{}, basics.Address{}) block, err = test.MakeBlockForTxns(genesisBlock.BlockHeader, &txn) assert.Nil(t, err) @@ -111,19 +108,3 @@ func TestFailedProcess(t *testing.T) { err = pr.Process(&rawBlock) assert.Contains(t, err.Error(), "Process() handler err") } - -func makeTestLedger(t *testing.T, prefix string) *ledger.Ledger { - // initialize local ledger - genesis := test.MakeGenesis() - genesisBlock := test.MakeGenesisBlock() - initState, err := util.CreateInitState(&genesis, &genesisBlock) - if err != nil { - log.Panicf("test init err: %v", err) - } - logger := logging.NewLogger() - l, err := ledger.OpenLedger(logger, prefix, true, initState, config.GetDefaultLocal()) - if err != nil { - log.Panicf("test init err: %v", err) - } - return l -} diff --git a/processor/eval/ledger_for_evaluator.go b/processor/eval/ledger_for_evaluator.go new file mode 100644 index 000000000..72517c601 --- /dev/null +++ b/processor/eval/ledger_for_evaluator.go @@ -0,0 +1,146 @@ +package eval + +import ( + "fmt" + + "github.com/algorand/go-algorand/data/basics" + "github.com/algorand/go-algorand/data/bookkeeping" + "github.com/algorand/go-algorand/ledger" + "github.com/algorand/go-algorand/ledger/ledgercore" +) + +// LedgerForEvaluator implements the indexerLedgerForEval interface from +// go-algorand ledger/evalindexer.go and is used for accounting. +type LedgerForEvaluator struct { + Ledger *ledger.Ledger +} + +// MakeLedgerForEvaluator creates a LedgerForEvaluator object. +func MakeLedgerForEvaluator(ld *ledger.Ledger) (LedgerForEvaluator, error) { + l := LedgerForEvaluator{ + Ledger: ld, + } + return l, nil +} + +// Close shuts down LedgerForEvaluator. +func (l *LedgerForEvaluator) Close() { + l.Ledger.Close() +} + +// LatestBlockHdr is part of go-algorand's indexerLedgerForEval interface. +func (l LedgerForEvaluator) LatestBlockHdr() (bookkeeping.BlockHeader, error) { + return l.Ledger.BlockHdr(l.Ledger.Latest()) +} + +// LookupWithoutRewards is part of go-algorand's indexerLedgerForEval interface. +func (l LedgerForEvaluator) LookupWithoutRewards(addresses map[basics.Address]struct{}) (map[basics.Address]*ledgercore.AccountData, error) { + + addressesArr := make([]basics.Address, 0, len(addresses)) + for address := range addresses { + addressesArr = append(addressesArr, address) + } + + res := make(map[basics.Address]*ledgercore.AccountData, len(addresses)) + for _, address := range addressesArr { + acctData, _, err := l.Ledger.LookupWithoutRewards(l.Ledger.Latest(), address) + if err != nil { + return nil, err + } + empty := ledgercore.AccountData{} + if acctData == empty { + res[address] = nil + } else { + res[address] = &acctData + } + } + + return res, nil +} + +// LookupResources is part of go-algorand's indexerLedgerForEval interface. +func (l LedgerForEvaluator) LookupResources(input map[basics.Address]map[ledger.Creatable]struct{}) (map[basics.Address]map[ledger.Creatable]ledgercore.AccountResource, error) { + // Initialize the result `res` with the same structure as `input`. + res := make( + map[basics.Address]map[ledger.Creatable]ledgercore.AccountResource, len(input)) + for address, creatables := range input { + creatablesOutput := + make(map[ledger.Creatable]ledgercore.AccountResource, len(creatables)) + res[address] = creatablesOutput + for creatable := range creatables { + creatablesOutput[creatable] = ledgercore.AccountResource{} + } + } + + for address, creatables := range input { + for creatable := range creatables { + switch creatable.Type { + case basics.AssetCreatable: + resource, err := l.Ledger.LookupAsset(l.Ledger.Latest(), address, basics.AssetIndex(creatable.Index)) + if err != nil { + return nil, fmt.Errorf( + "LookupResources() %d failed", creatable.Type) + } + res[address][creatable] = ledgercore.AccountResource{AssetHolding: resource.AssetHolding, AssetParams: resource.AssetParams} + case basics.AppCreatable: + resource, err := l.Ledger.LookupApplication(l.Ledger.Latest(), address, basics.AppIndex(creatable.Index)) + if err != nil { + return nil, fmt.Errorf( + "LookupResources() %d failed", creatable.Type) + } + res[address][creatable] = ledgercore.AccountResource{AppLocalState: resource.AppLocalState, AppParams: resource.AppParams} + default: + return nil, fmt.Errorf( + "LookupResources() unknown creatable type %d", creatable.Type) + } + } + + } + return res, nil +} + +// GetAssetCreator is part of go-algorand's indexerLedgerForEval interface. +func (l LedgerForEvaluator) GetAssetCreator(indices map[basics.AssetIndex]struct{}) (map[basics.AssetIndex]ledger.FoundAddress, error) { + indicesArr := make([]basics.AssetIndex, 0, len(indices)) + for index := range indices { + indicesArr = append(indicesArr, index) + } + + res := make(map[basics.AssetIndex]ledger.FoundAddress, len(indices)) + for _, index := range indicesArr { + cidx := basics.CreatableIndex(index) + address, exists, err := l.Ledger.GetCreatorForRound(l.Ledger.Latest(), cidx, basics.AssetCreatable) + if err != nil { + return nil, fmt.Errorf("GetAssetCreator() err: %w", err) + } + res[index] = ledger.FoundAddress{Address: address, Exists: exists} + } + + return res, nil +} + +// GetAppCreator is part of go-algorand's indexerLedgerForEval interface. +func (l LedgerForEvaluator) GetAppCreator(indices map[basics.AppIndex]struct{}) (map[basics.AppIndex]ledger.FoundAddress, error) { + indicesArr := make([]basics.AppIndex, 0, len(indices)) + for index := range indices { + indicesArr = append(indicesArr, index) + } + + res := make(map[basics.AppIndex]ledger.FoundAddress, len(indices)) + for _, index := range indicesArr { + cidx := basics.CreatableIndex(index) + address, exists, err := l.Ledger.GetCreatorForRound(l.Ledger.Latest(), cidx, basics.AppCreatable) + if err != nil { + return nil, fmt.Errorf("GetAppCreator() err: %w", err) + } + res[index] = ledger.FoundAddress{Address: address, Exists: exists} + } + + return res, nil +} + +// LatestTotals is part of go-algorand's indexerLedgerForEval interface. +func (l LedgerForEvaluator) LatestTotals() (ledgercore.AccountTotals, error) { + _, totals, err := l.Ledger.LatestTotals() + return totals, err +} diff --git a/processor/eval/ledger_for_evaluator_test.go b/processor/eval/ledger_for_evaluator_test.go new file mode 100644 index 000000000..73b51b249 --- /dev/null +++ b/processor/eval/ledger_for_evaluator_test.go @@ -0,0 +1,549 @@ +package eval_test + +import ( + "crypto/rand" + "testing" + + "github.com/algorand/go-algorand/agreement" + "github.com/algorand/go-algorand/data/basics" + "github.com/algorand/go-algorand/ledger" + "github.com/algorand/go-algorand/ledger/ledgercore" + "github.com/algorand/go-algorand/rpcs" + block_processor "github.com/algorand/indexer/processor/blockprocessor" + indxLeder "github.com/algorand/indexer/processor/eval" + "github.com/algorand/indexer/util/test" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestLedgerForEvaluatorLatestBlockHdr(t *testing.T) { + + l := test.MakeTestLedger("ledger") + defer l.Close() + pr, _ := block_processor.MakeProcessorWithLedger(l, nil) + txn := test.MakePaymentTxn(0, 100, 0, 1, 1, + 0, test.AccountA, test.AccountA, basics.Address{}, basics.Address{}) + block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader, &txn) + assert.Nil(t, err) + rawBlock := rpcs.EncodedBlockCert{Block: block, Certificate: agreement.Certificate{}} + err = pr.Process(&rawBlock) + assert.Nil(t, err) + + ld, err := indxLeder.MakeLedgerForEvaluator(l) + require.NoError(t, err) + defer ld.Close() + + ret, err := ld.LatestBlockHdr() + require.NoError(t, err) + + assert.Equal(t, block.BlockHeader, ret) +} + +func TestLedgerForEvaluatorAccountDataBasic(t *testing.T) { + l := test.MakeTestLedger("ledger") + defer l.Close() + block_processor.MakeProcessorWithLedger(l, nil) + accountData, _, err := l.LookupWithoutRewards(0, test.AccountB) + require.NoError(t, err) + + ld, err := indxLeder.MakeLedgerForEvaluator(l) + require.NoError(t, err) + defer ld.Close() + + ret, err := + ld.LookupWithoutRewards(map[basics.Address]struct{}{test.AccountB: {}}) + require.NoError(t, err) + + accountDataRet := ret[test.AccountB] + require.NotNil(t, accountDataRet) + assert.Equal(t, accountData, *accountDataRet) +} + +func TestLedgerForEvaluatorAccountDataMissingAccount(t *testing.T) { + l := test.MakeTestLedger("ledger") + defer l.Close() + ld, err := indxLeder.MakeLedgerForEvaluator(l) + require.NoError(t, err) + defer ld.Close() + + var addr basics.Address + _, err = rand.Read(addr[:]) + ret, err := + ld.LookupWithoutRewards(map[basics.Address]struct{}{addr: {}}) + require.NoError(t, err) + + accountDataRet := ret[addr] + assert.Nil(t, accountDataRet) +} + +func TestLedgerForEvaluatorAsset(t *testing.T) { + l := test.MakeTestLedger("ledger") + defer l.Close() + pr, _ := block_processor.MakeProcessorWithLedger(l, nil) + + txn0 := test.MakeAssetConfigTxn(0, 2, 0, false, "", "", "", test.AccountA) + txn1 := test.MakeAssetConfigTxn(0, 4, 0, false, "", "", "", test.AccountA) + txn2 := test.MakeAssetConfigTxn(0, 6, 0, false, "", "", "", test.AccountA) + txn3 := test.MakeAssetConfigTxn(0, 8, 0, false, "", "", "", test.AccountB) + txn4 := test.MakeAssetDestroyTxn(1, test.AccountA) + + block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader, &txn0, &txn1, &txn2, &txn3, &txn4) + assert.Nil(t, err) + rawBlock := rpcs.EncodedBlockCert{Block: block, Certificate: agreement.Certificate{}} + err = pr.Process(&rawBlock) + assert.Nil(t, err) + + ld, err := indxLeder.MakeLedgerForEvaluator(l) + require.NoError(t, err) + defer ld.Close() + + ret, err := + ld.LookupResources(map[basics.Address]map[ledger.Creatable]struct{}{ + test.AccountA: { + {Index: 1, Type: basics.AssetCreatable}: {}, + {Index: 2, Type: basics.AssetCreatable}: {}, + {Index: 3, Type: basics.AssetCreatable}: {}, + }, + test.AccountB: { + {Index: 4, Type: basics.AssetCreatable}: {}, + }, + }) + require.NoError(t, err) + + expected := map[basics.Address]map[ledger.Creatable]ledgercore.AccountResource{ + test.AccountA: { + ledger.Creatable{Index: 1, Type: basics.AssetCreatable}: {}, + ledger.Creatable{Index: 2, Type: basics.AssetCreatable}: { + AssetHolding: &basics.AssetHolding{ + Amount: txn1.Txn.AssetParams.Total, + Frozen: txn1.Txn.AssetFrozen, + }, + AssetParams: &txn1.Txn.AssetParams, + }, + ledger.Creatable{Index: 3, Type: basics.AssetCreatable}: { + AssetHolding: &basics.AssetHolding{ + Amount: txn2.Txn.AssetParams.Total, + Frozen: txn2.Txn.AssetFrozen, + }, + AssetParams: &txn2.Txn.AssetParams, + }, + }, + test.AccountB: { + ledger.Creatable{Index: 4, Type: basics.AssetCreatable}: { + AssetHolding: &basics.AssetHolding{ + Amount: txn3.Txn.AssetParams.Total, + Frozen: txn3.Txn.AssetFrozen, + }, + AssetParams: &txn3.Txn.AssetParams, + }, + }, + } + assert.Equal(t, expected, ret) +} + +func TestLedgerForEvaluatorApp(t *testing.T) { + l := test.MakeTestLedger("ledger") + defer l.Close() + pr, _ := block_processor.MakeProcessorWithLedger(l, nil) + + txn0 := test.MakeAppCallTxn(0, test.AccountA) + txn1 := test.MakeAppCallTxnWithLogs(0, test.AccountA, []string{"testing"}) + txn2 := test.MakeAppCallWithInnerTxn(test.AccountA, test.AccountA, test.AccountB, basics.Address{}, basics.Address{}) + txn3 := test.MakeAppCallWithMultiLogs(test.AccountA) + txn4 := test.MakeAppDestroyTxn(1, test.AccountA) + txn5 := test.MakeAppCallTxn(0, test.AccountB) + + block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader, &txn0, &txn1, &txn2, &txn3, &txn4, &txn5) + assert.Nil(t, err) + rawBlock := rpcs.EncodedBlockCert{Block: block, Certificate: agreement.Certificate{}} + err = pr.Process(&rawBlock) + assert.Nil(t, err) + + ld, err := indxLeder.MakeLedgerForEvaluator(l) + require.NoError(t, err) + defer ld.Close() + + ret, err := + ld.LookupResources(map[basics.Address]map[ledger.Creatable]struct{}{ + test.AccountA: { + {Index: 1, Type: basics.AppCreatable}: {}, + {Index: 2, Type: basics.AppCreatable}: {}, + {Index: 3, Type: basics.AppCreatable}: {}, + {Index: 4, Type: basics.AppCreatable}: {}, + }, + test.AccountB: { + {Index: 6, Type: basics.AppCreatable}: {}, + }, + }) + require.NoError(t, err) + + expected := map[basics.Address]map[ledger.Creatable]ledgercore.AccountResource{ + test.AccountA: { + ledger.Creatable{Index: 1, Type: basics.AppCreatable}: {}, + ledger.Creatable{Index: 2, Type: basics.AppCreatable}: { + AppParams: &basics.AppParams{ + ApprovalProgram: txn1.Txn.ApprovalProgram, + ClearStateProgram: txn1.Txn.ClearStateProgram, + GlobalState: nil, + StateSchemas: basics.StateSchemas{}, + ExtraProgramPages: txn1.Txn.ExtraProgramPages, + }, + }, + ledger.Creatable{Index: 3, Type: basics.AppCreatable}: { + AppParams: &basics.AppParams{ + ApprovalProgram: txn2.Txn.ApprovalProgram, + ClearStateProgram: txn2.Txn.ClearStateProgram, + GlobalState: nil, + StateSchemas: basics.StateSchemas{}, + ExtraProgramPages: txn2.Txn.ExtraProgramPages, + }, + }, + ledger.Creatable{Index: 4, Type: basics.AppCreatable}: { + AppParams: &basics.AppParams{ + ApprovalProgram: txn3.Txn.ApprovalProgram, + ClearStateProgram: txn3.Txn.ClearStateProgram, + GlobalState: nil, + StateSchemas: basics.StateSchemas{}, + ExtraProgramPages: txn3.Txn.ExtraProgramPages, + }, + }, + }, + test.AccountB: { + ledger.Creatable{Index: 6, Type: basics.AppCreatable}: { + AppParams: &basics.AppParams{ + ApprovalProgram: txn5.Txn.ApprovalProgram, + ClearStateProgram: txn5.Txn.ClearStateProgram, + GlobalState: nil, + StateSchemas: basics.StateSchemas{}, + ExtraProgramPages: txn5.Txn.ExtraProgramPages, + }, + }, + }, + } + assert.Equal(t, expected, ret) +} + +func TestLedgerForEvaluatorFetchAllResourceTypes(t *testing.T) { + l := test.MakeTestLedger("ledger") + defer l.Close() + pr, _ := block_processor.MakeProcessorWithLedger(l, nil) + + txn0 := test.MakeAppCallTxn(0, test.AccountA) + txn1 := test.MakeAssetConfigTxn(0, 2, 0, false, "", "", "", test.AccountA) + + block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader, &txn0, &txn1) + assert.Nil(t, err) + rawBlock := rpcs.EncodedBlockCert{Block: block, Certificate: agreement.Certificate{}} + err = pr.Process(&rawBlock) + assert.Nil(t, err) + + ld, err := indxLeder.MakeLedgerForEvaluator(l) + require.NoError(t, err) + defer ld.Close() + + ret, err := + ld.LookupResources(map[basics.Address]map[ledger.Creatable]struct{}{ + test.AccountA: { + {Index: 1, Type: basics.AppCreatable}: {}, + {Index: 2, Type: basics.AssetCreatable}: {}, + }, + }) + require.NoError(t, err) + + expected := map[basics.Address]map[ledger.Creatable]ledgercore.AccountResource{ + test.AccountA: { + ledger.Creatable{Index: 1, Type: basics.AppCreatable}: { + AppParams: &basics.AppParams{ + ApprovalProgram: txn0.Txn.ApprovalProgram, + ClearStateProgram: txn0.Txn.ClearStateProgram, + GlobalState: nil, + StateSchemas: basics.StateSchemas{}, + ExtraProgramPages: txn0.Txn.ExtraProgramPages, + }, + }, + ledger.Creatable{Index: 2, Type: basics.AssetCreatable}: { + AssetHolding: &basics.AssetHolding{ + Amount: 2, + Frozen: false, + }, + AssetParams: &txn1.Txn.AssetParams, + }, + }, + } + assert.Equal(t, expected, ret) +} + +func TestLedgerForEvaluatorLookupMultipleAccounts(t *testing.T) { + l := test.MakeTestLedger("ledger") + defer l.Close() + block_processor.MakeProcessorWithLedger(l, nil) + + addresses := []basics.Address{ + test.AccountA, test.AccountB, test.AccountC, test.AccountD} + + addressesMap := make(map[basics.Address]struct{}) + for _, address := range addresses { + addressesMap[address] = struct{}{} + } + addressesMap[test.FeeAddr] = struct{}{} + addressesMap[test.RewardAddr] = struct{}{} + + ld, err := indxLeder.MakeLedgerForEvaluator(l) + require.NoError(t, err) + defer ld.Close() + + ret, err := + ld.LookupWithoutRewards(addressesMap) + require.NoError(t, err) + + for _, address := range addresses { + accountData, _ := ret[address] + require.NotNil(t, accountData) + } +} + +func TestLedgerForEvaluatorAssetCreatorBasic(t *testing.T) { + l := test.MakeTestLedger("ledger") + defer l.Close() + pr, _ := block_processor.MakeProcessorWithLedger(l, nil) + + txn0 := test.MakeAssetConfigTxn(0, 2, 0, false, "", "", "", test.AccountA) + + block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader, &txn0) + assert.Nil(t, err) + rawBlock := rpcs.EncodedBlockCert{Block: block, Certificate: agreement.Certificate{}} + err = pr.Process(&rawBlock) + assert.Nil(t, err) + + ld, err := indxLeder.MakeLedgerForEvaluator(l) + require.NoError(t, err) + defer ld.Close() + + ret, err := ld.GetAssetCreator( + map[basics.AssetIndex]struct{}{basics.AssetIndex(1): {}}) + require.NoError(t, err) + + foundAddress, ok := ret[basics.AssetIndex(1)] + require.True(t, ok) + + expected := ledger.FoundAddress{ + Address: test.AccountA, + Exists: true, + } + assert.Equal(t, expected, foundAddress) +} + +func TestLedgerForEvaluatorAssetCreatorDeleted(t *testing.T) { + l := test.MakeTestLedger("ledger") + defer l.Close() + pr, _ := block_processor.MakeProcessorWithLedger(l, nil) + + txn0 := test.MakeAssetConfigTxn(0, 2, 0, false, "", "", "", test.AccountA) + txn1 := test.MakeAssetDestroyTxn(1, test.AccountA) + + block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader, &txn0, &txn1) + assert.Nil(t, err) + rawBlock := rpcs.EncodedBlockCert{Block: block, Certificate: agreement.Certificate{}} + err = pr.Process(&rawBlock) + assert.Nil(t, err) + + ld, err := indxLeder.MakeLedgerForEvaluator(l) + require.NoError(t, err) + defer ld.Close() + + ret, err := ld.GetAssetCreator( + map[basics.AssetIndex]struct{}{basics.AssetIndex(1): {}}) + require.NoError(t, err) + + foundAddress, ok := ret[basics.AssetIndex(1)] + require.True(t, ok) + + assert.False(t, foundAddress.Exists) +} + +func TestLedgerForEvaluatorAssetCreatorMultiple(t *testing.T) { + + l := test.MakeTestLedger("ledger") + defer l.Close() + pr, _ := block_processor.MakeProcessorWithLedger(l, nil) + + txn0 := test.MakeAssetConfigTxn(0, 2, 0, false, "", "", "", test.AccountA) + txn1 := test.MakeAssetConfigTxn(0, 2, 0, false, "", "", "", test.AccountB) + txn2 := test.MakeAssetConfigTxn(0, 2, 0, false, "", "", "", test.AccountC) + txn3 := test.MakeAssetConfigTxn(0, 2, 0, false, "", "", "", test.AccountD) + + block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader, &txn0, &txn1, &txn2, &txn3) + assert.Nil(t, err) + rawBlock := rpcs.EncodedBlockCert{Block: block, Certificate: agreement.Certificate{}} + err = pr.Process(&rawBlock) + assert.Nil(t, err) + + ld, err := indxLeder.MakeLedgerForEvaluator(l) + require.NoError(t, err) + defer ld.Close() + + indices := map[basics.AssetIndex]struct{}{ + 1: {}, 2: {}, 3: {}, 4: {}, 5: {}, 6: {}, 7: {}, 8: {}} + ret, err := ld.GetAssetCreator(indices) + require.NoError(t, err) + + creatorsMap := map[basics.AssetIndex]basics.Address{ + 1: test.AccountA, + 2: test.AccountB, + 3: test.AccountC, + 4: test.AccountD, + } + + for i := 1; i <= 4; i++ { + index := basics.AssetIndex(i) + + foundAddress, ok := ret[index] + require.True(t, ok) + + expected := ledger.FoundAddress{ + Address: creatorsMap[index], + Exists: true, + } + assert.Equal(t, expected, foundAddress) + } + for i := 5; i <= 8; i++ { + index := basics.AssetIndex(i) + + foundAddress, ok := ret[index] + require.True(t, ok) + + assert.False(t, foundAddress.Exists) + } +} + +func TestLedgerForEvaluatorAppCreatorBasic(t *testing.T) { + l := test.MakeTestLedger("ledger") + defer l.Close() + pr, _ := block_processor.MakeProcessorWithLedger(l, nil) + + txn0 := test.MakeAppCallTxn(0, test.AccountA) + + block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader, &txn0) + assert.Nil(t, err) + rawBlock := rpcs.EncodedBlockCert{Block: block, Certificate: agreement.Certificate{}} + err = pr.Process(&rawBlock) + assert.Nil(t, err) + + ld, err := indxLeder.MakeLedgerForEvaluator(l) + require.NoError(t, err) + defer ld.Close() + + ret, err := ld.GetAppCreator( + map[basics.AppIndex]struct{}{basics.AppIndex(1): {}}) + require.NoError(t, err) + + foundAddress, ok := ret[basics.AppIndex(1)] + require.True(t, ok) + + expected := ledger.FoundAddress{ + Address: test.AccountA, + Exists: true, + } + assert.Equal(t, expected, foundAddress) + +} + +func TestLedgerForEvaluatorAppCreatorDeleted(t *testing.T) { + l := test.MakeTestLedger("ledger") + defer l.Close() + pr, _ := block_processor.MakeProcessorWithLedger(l, nil) + + txn0 := test.MakeAppCallTxn(0, test.AccountA) + txn1 := test.MakeAppDestroyTxn(1, test.AccountA) + + block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader, &txn0, &txn1) + assert.Nil(t, err) + rawBlock := rpcs.EncodedBlockCert{Block: block, Certificate: agreement.Certificate{}} + err = pr.Process(&rawBlock) + assert.Nil(t, err) + + ld, err := indxLeder.MakeLedgerForEvaluator(l) + require.NoError(t, err) + defer ld.Close() + + ret, err := ld.GetAppCreator( + map[basics.AppIndex]struct{}{basics.AppIndex(1): {}}) + require.NoError(t, err) + + foundAddress, ok := ret[basics.AppIndex(1)] + require.True(t, ok) + + assert.False(t, foundAddress.Exists) +} + +func TestLedgerForEvaluatorAppCreatorMultiple(t *testing.T) { + + l := test.MakeTestLedger("ledger") + defer l.Close() + pr, _ := block_processor.MakeProcessorWithLedger(l, nil) + + txn0 := test.MakeAppCallTxn(0, test.AccountA) + txn1 := test.MakeAppCallTxn(0, test.AccountB) + txn2 := test.MakeAppCallTxn(0, test.AccountC) + txn3 := test.MakeAppCallTxn(0, test.AccountD) + + block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader, &txn0, &txn1, &txn2, &txn3) + assert.Nil(t, err) + rawBlock := rpcs.EncodedBlockCert{Block: block, Certificate: agreement.Certificate{}} + err = pr.Process(&rawBlock) + assert.Nil(t, err) + + ld, err := indxLeder.MakeLedgerForEvaluator(l) + require.NoError(t, err) + defer ld.Close() + + creatorsMap := map[basics.AppIndex]basics.Address{ + 1: test.AccountA, + 2: test.AccountB, + 3: test.AccountC, + 4: test.AccountD, + } + + indices := map[basics.AppIndex]struct{}{ + 1: {}, 2: {}, 3: {}, 4: {}, 5: {}, 6: {}, 7: {}, 8: {}} + ret, err := ld.GetAppCreator(indices) + require.NoError(t, err) + + assert.Equal(t, len(indices), len(ret)) + for i := 1; i <= 4; i++ { + index := basics.AppIndex(i) + + foundAddress, ok := ret[index] + require.True(t, ok) + + expected := ledger.FoundAddress{ + Address: creatorsMap[index], + Exists: true, + } + assert.Equal(t, expected, foundAddress) + } + for i := 5; i <= 8; i++ { + index := basics.AppIndex(i) + + foundAddress, ok := ret[index] + require.True(t, ok) + + assert.False(t, foundAddress.Exists) + } +} + +func TestLedgerForEvaluatorAccountTotals(t *testing.T) { + l := test.MakeTestLedger("ledger") + defer l.Close() + + ld, err := indxLeder.MakeLedgerForEvaluator(l) + require.NoError(t, err) + defer ld.Close() + + accountTotalsRead, err := ld.LatestTotals() + require.NoError(t, err) + + _, total, _ := l.LatestTotals() + assert.Equal(t, total, accountTotalsRead) + +} diff --git a/util/test/account_testutil.go b/util/test/account_testutil.go index dd1ae560d..17409a8a6 100644 --- a/util/test/account_testutil.go +++ b/util/test/account_testutil.go @@ -215,8 +215,10 @@ func MakeAppDestroyTxn(appid uint64, sender basics.Address) transactions.SignedT GenesisHash: GenesisHash, }, ApplicationCallTxnFields: transactions.ApplicationCallTxnFields{ - ApplicationID: basics.AppIndex(appid), - OnCompletion: transactions.DeleteApplicationOC, + ApplicationID: basics.AppIndex(appid), + OnCompletion: transactions.DeleteApplicationOC, + ApprovalProgram: []byte{0x02, 0x20, 0x01, 0x01, 0x22}, + ClearStateProgram: []byte{0x02, 0x20, 0x01, 0x01, 0x22}, }, }, Sig: Signature, @@ -275,8 +277,10 @@ func MakeAppCallTxn(appid uint64, sender basics.Address) transactions.SignedTxnW GenesisHash: GenesisHash, }, ApplicationCallTxnFields: transactions.ApplicationCallTxnFields{ - ApplicationID: basics.AppIndex(appid), - OnCompletion: transactions.NoOpOC, + ApplicationID: basics.AppIndex(appid), + OnCompletion: transactions.NoOpOC, + ApprovalProgram: []byte{0x02, 0x20, 0x01, 0x01, 0x22}, + ClearStateProgram: []byte{0x02, 0x20, 0x01, 0x01, 0x22}, }, }, Sig: Signature, @@ -327,8 +331,10 @@ func MakeAppCallWithInnerTxn(appSender, paymentSender, paymentReceiver, assetSen Sender: assetSender, }, ApplicationCallTxnFields: transactions.ApplicationCallTxnFields{ - ApplicationID: 789, - OnCompletion: transactions.NoOpOC, + ApplicationID: 789, + OnCompletion: transactions.NoOpOC, + ApprovalProgram: []byte{0x02, 0x20, 0x01, 0x01, 0x22}, + ClearStateProgram: []byte{0x02, 0x20, 0x01, 0x01, 0x22}, }, }, }, diff --git a/util/test/testutil.go b/util/test/testutil.go index d97f731ed..085e21f9b 100644 --- a/util/test/testutil.go +++ b/util/test/testutil.go @@ -7,10 +7,13 @@ import ( "os" "runtime" + "github.com/algorand/go-algorand/config" "github.com/algorand/go-algorand/data/basics" - + "github.com/algorand/go-algorand/ledger" + "github.com/algorand/go-algorand/logging" "github.com/algorand/indexer/idb" "github.com/algorand/indexer/util" + "github.com/sirupsen/logrus" ) var quiet = false @@ -114,3 +117,18 @@ func PrintTxnQuery(db idb.IndexerDb, q idb.TransactionFilter) { exitValue = 1 } } + +// MakeTestLedger creates an in-memory local ledger +func MakeTestLedger(prefix string) *ledger.Ledger { + genesis := MakeGenesis() + genesisBlock := MakeGenesisBlock() + initState, err := util.CreateInitState(&genesis, &genesisBlock) + if err != nil { + logrus.Panicf("test init err: %v", err) + } + l, err := ledger.OpenLedger(logging.NewLogger(), prefix, true, initState, config.GetDefaultLocal()) + if err != nil { + logrus.Panicf("test init err: %v", err) + } + return l +}