Skip to content

Commit

Permalink
Create RFC for processor (#1140)
Browse files Browse the repository at this point in the history
* Added Processor

Added processor functionality

* pr updates

* Pr

* PR comments

* Reduce proto checks

* Linting

* Pr

* PR comments

* PR comments
  • Loading branch information
AlgoStephenAkiki authored Aug 5, 2022
1 parent 595d3bf commit 3d2720c
Show file tree
Hide file tree
Showing 25 changed files with 1,038 additions and 400 deletions.
33 changes: 18 additions & 15 deletions api/handlers_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/base64"
"fmt"
test2 "github.com/sirupsen/logrus/hooks/test"
"io/ioutil"
"net/http"
"net/http/httptest"
Expand All @@ -14,7 +15,6 @@ import (

"github.com/labstack/echo/v4"
"github.com/sirupsen/logrus"
test2 "github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand All @@ -26,13 +26,12 @@ import (
"github.com/algorand/go-algorand/data/transactions"
"github.com/algorand/go-algorand/ledger"
"github.com/algorand/go-algorand/rpcs"
"github.com/algorand/indexer/processor"

"github.com/algorand/indexer/api/generated/v2"
"github.com/algorand/indexer/idb"
"github.com/algorand/indexer/idb/postgres"
pgtest "github.com/algorand/indexer/idb/postgres/testing"
"github.com/algorand/indexer/processor/blockprocessor"
"github.com/algorand/indexer/processors/blockprocessor"
"github.com/algorand/indexer/util/test"
)

Expand Down Expand Up @@ -61,7 +60,7 @@ func testServerImplementation(db idb.IndexerDb) *ServerImplementation {
return &ServerImplementation{db: db, timeout: 30 * time.Second, opts: defaultOpts}
}

func setupIdb(t *testing.T, genesis bookkeeping.Genesis) (*postgres.IndexerDb, func(), processor.Processor, *ledger.Ledger) {
func setupIdb(t *testing.T, genesis bookkeeping.Genesis) (*postgres.IndexerDb, func(), func(cert *rpcs.EncodedBlockCert) error, *ledger.Ledger) {
_, connStr, shutdownFunc := pgtest.SetupPostgres(t)

db, _, err := postgres.OpenPostgres(connStr, idb.IndexerDbOptions{}, nil)
Expand All @@ -78,9 +77,12 @@ func setupIdb(t *testing.T, genesis bookkeeping.Genesis) (*postgres.IndexerDb, f
logger, _ := test2.NewNullLogger()
l, err := test.MakeTestLedger(logger)
require.NoError(t, err)
proc, err := blockprocessor.MakeProcessorWithLedger(logger, l, db.AddBlock)
proc, err := blockprocessor.MakeBlockProcessorWithLedger(logger, l, db.AddBlock)
require.NoError(t, err, "failed to open ledger")
return db, newShutdownFunc, proc, l

f := blockprocessor.MakeBlockProcessorHandlerAdapter(&proc, db.AddBlock)

return db, newShutdownFunc, f, l
}

func TestApplicationHandlers(t *testing.T) {
Expand Down Expand Up @@ -120,7 +122,8 @@ func TestApplicationHandlers(t *testing.T) {
block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader, &txn, &optInTxnA, &optInTxnB)
require.NoError(t, err)

err = proc.Process(&rpcs.EncodedBlockCert{Block: block})
err = proc(&rpcs.EncodedBlockCert{Block: block})

require.NoError(t, err)

//////////
Expand Down Expand Up @@ -247,7 +250,7 @@ func TestAccountExcludeParameters(t *testing.T) {
&appOptInTxnA, &appOptInTxnB, &assetOptInTxnA, &assetOptInTxnB)
require.NoError(t, err)

err = proc.Process(&rpcs.EncodedBlockCert{Block: block})
err = proc(&rpcs.EncodedBlockCert{Block: block})
require.NoError(t, err)

//////////
Expand Down Expand Up @@ -453,7 +456,7 @@ func TestAccountMaxResultsLimit(t *testing.T) {
block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader, ptxns...)
require.NoError(t, err)

err = proc.Process(&rpcs.EncodedBlockCert{Block: block})
err = proc(&rpcs.EncodedBlockCert{Block: block})
require.NoError(t, err)

//////////
Expand Down Expand Up @@ -857,7 +860,7 @@ func TestInnerTxn(t *testing.T) {
block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader, &appCall)
require.NoError(t, err)

err = proc.Process(&rpcs.EncodedBlockCert{Block: block})
err = proc(&rpcs.EncodedBlockCert{Block: block})
require.NoError(t, err)

for _, tc := range testcases {
Expand Down Expand Up @@ -910,7 +913,7 @@ func TestPagingRootTxnDeduplication(t *testing.T) {
block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader, &appCall)
require.NoError(t, err)

err = proc.Process(&rpcs.EncodedBlockCert{Block: block})
err = proc(&rpcs.EncodedBlockCert{Block: block})
require.NoError(t, err)

testcases := []struct {
Expand Down Expand Up @@ -1058,7 +1061,7 @@ func TestKeyregTransactionWithStateProofKeys(t *testing.T) {
block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader, &txn)
require.NoError(t, err)

err = proc.Process(&rpcs.EncodedBlockCert{Block: block})
err = proc(&rpcs.EncodedBlockCert{Block: block})
require.NoError(t, err)

e := echo.New()
Expand Down Expand Up @@ -1163,7 +1166,7 @@ func TestAccountClearsNonUTF8(t *testing.T) {
block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader, &createAsset)
require.NoError(t, err)

err = proc.Process(&rpcs.EncodedBlockCert{Block: block})
err = proc(&rpcs.EncodedBlockCert{Block: block})
require.NoError(t, err)

verify := func(params generated.AssetParams) {
Expand Down Expand Up @@ -1285,7 +1288,7 @@ func TestLookupInnerLogs(t *testing.T) {
block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader, &appCall)
require.NoError(t, err)

err = proc.Process(&rpcs.EncodedBlockCert{Block: block})
err = proc(&rpcs.EncodedBlockCert{Block: block})
require.NoError(t, err)

for _, tc := range testcases {
Expand Down Expand Up @@ -1384,7 +1387,7 @@ func TestLookupMultiInnerLogs(t *testing.T) {
block, err := test.MakeBlockForTxns(test.MakeGenesisBlock().BlockHeader, &appCall)
require.NoError(t, err)

err = proc.Process(&rpcs.EncodedBlockCert{Block: block})
err = proc(&rpcs.EncodedBlockCert{Block: block})
require.NoError(t, err)

for _, tc := range testcases {
Expand Down
28 changes: 19 additions & 9 deletions cmd/algorand-indexer/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ import (
"github.com/algorand/go-algorand/rpcs"
"github.com/algorand/go-algorand/util"

"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/indexer/api"
"github.com/algorand/indexer/api/generated/v2"
"github.com/algorand/indexer/config"
"github.com/algorand/indexer/fetcher"
"github.com/algorand/indexer/idb"
"github.com/algorand/indexer/importer"
"github.com/algorand/indexer/processor"
"github.com/algorand/indexer/processor/blockprocessor"
"github.com/algorand/indexer/processors"
"github.com/algorand/indexer/processors/blockprocessor"
iutil "github.com/algorand/indexer/util"
"github.com/algorand/indexer/util/metrics"
)
Expand Down Expand Up @@ -368,14 +369,22 @@ func runBlockImporter(ctx context.Context, cfg *daemonConfig, wg *sync.WaitGroup
logger.Info("Initializing block import handler.")
imp := importer.NewImporter(db)

processorOpts := processors.BlockProcessorConfig{
Catchpoint: cfg.catchpoint,
IndexerDatadir: opts.IndexerDatadir,
AlgodDataDir: opts.AlgodDataDir,
AlgodToken: opts.AlgodToken,
AlgodAddr: opts.AlgodAddr,
}

logger.Info("Initializing local ledger.")
proc, err := blockprocessor.MakeProcessorWithLedgerInit(ctx, logger, cfg.catchpoint, &genesis, nextDBRound, opts, imp.ImportBlock)
proc, err := blockprocessor.MakeBlockProcessorWithLedgerInit(ctx, logger, nextDBRound, &genesis, processorOpts, imp.ImportBlock)
if err != nil {
maybeFail(err, "blockprocessor.MakeProcessor() err %v", err)
maybeFail(err, "blockprocessor.MakeBlockProcessor() err %v", err)
}

bot.SetNextRound(proc.NextRoundToProcess())
handler := blockHandler(proc, 1*time.Second)
handler := blockHandler(&proc, imp.ImportBlock, 1*time.Second)
bot.SetBlockHandler(handler)

logger.Info("Starting block importer.")
Expand Down Expand Up @@ -456,10 +465,10 @@ func makeOptions(daemonConfig *daemonConfig) (options api.ExtraOptions) {

// blockHandler creates a handler complying to the fetcher block handler interface. In case of a failure it keeps
// attempting to add the block until the fetcher shuts down.
func blockHandler(proc processor.Processor, retryDelay time.Duration) func(context.Context, *rpcs.EncodedBlockCert) error {
func blockHandler(proc *blockprocessor.BlockProcessor, blockHandler func(block *ledgercore.ValidatedBlock) error, retryDelay time.Duration) func(context.Context, *rpcs.EncodedBlockCert) error {
return func(ctx context.Context, block *rpcs.EncodedBlockCert) error {
for {
err := handleBlock(block, proc)
err := handleBlock(block, proc, blockHandler)
if err == nil {
// return on success.
return nil
Expand All @@ -476,9 +485,10 @@ func blockHandler(proc processor.Processor, retryDelay time.Duration) func(conte
}
}

func handleBlock(block *rpcs.EncodedBlockCert, proc processor.Processor) error {
func handleBlock(block *rpcs.EncodedBlockCert, proc *blockprocessor.BlockProcessor, handler func(block *ledgercore.ValidatedBlock) error) error {
start := time.Now()
err := proc.Process(block)
f := blockprocessor.MakeBlockProcessorHandlerAdapter(proc, handler)
err := f(block)
if err != nil {
logger.WithError(err).Errorf(
"block %d import failed", block.Block.Round())
Expand Down
13 changes: 7 additions & 6 deletions cmd/algorand-indexer/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ import (
"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/go-algorand/rpcs"

"github.com/algorand/indexer/processor/blockprocessor"
"github.com/algorand/indexer/processors/blockprocessor"
itest "github.com/algorand/indexer/util/test"
)

type mockImporter struct {
}

var errMockImportBlock = errors.New("Process() invalid round blockCert.Block.Round(): 1234 nextRoundToProcess: 1")
var errMockImportBlock = errors.New("invalid round blockCert.Block.Round(): 1234 nextRoundToProcess: 1")

func (imp *mockImporter) ImportBlock(vb *ledgercore.ValidatedBlock) error {
return nil
Expand All @@ -49,10 +49,9 @@ func TestImportRetryAndCancel(t *testing.T) {
l, err := itest.MakeTestLedger(ledgerLogger)
assert.NoError(t, err)
defer l.Close()
proc, err := blockprocessor.MakeProcessorWithLedger(logger, l, nil)
proc, err := blockprocessor.MakeBlockProcessorWithLedger(logger, l, imp.ImportBlock)
assert.Nil(t, err)
proc.SetHandler(imp.ImportBlock)
handler := blockHandler(proc, 50*time.Millisecond)
handler := blockHandler(&proc, imp.ImportBlock, 50*time.Millisecond)
var wg sync.WaitGroup
wg.Add(1)
go func() {
Expand All @@ -74,7 +73,9 @@ func TestImportRetryAndCancel(t *testing.T) {

for _, entry := range hook.Entries {
assert.Equal(t, entry.Message, "block 1234 import failed")
assert.Equal(t, entry.Data["error"], errMockImportBlock)

tmpStr := entry.Data["error"].(error).Error()
assert.Contains(t, tmpStr, errMockImportBlock.Error())
}

// Wait for handler to exit.
Expand Down
76 changes: 76 additions & 0 deletions data/block_export_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package data

import (
"github.com/algorand/go-algorand/agreement"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/data/transactions"
"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/go-algorand/rpcs"
)

// RoundProvider is the interface which all data types sent to Exporters should implement
Expand All @@ -13,6 +15,13 @@ type RoundProvider interface {
Empty() bool
}

// InitProvider is the interface that can be used when initializing to get common algod related
// variables
type InitProvider interface {
Genesis() *bookkeeping.Genesis
NextDBRound() basics.Round
}

// BlockData is provided to the Exporter on each round.
type BlockData struct {

Expand All @@ -29,6 +38,73 @@ type BlockData struct {
Certificate *agreement.Certificate
}

// MakeBlockDataFromValidatedBlock makes BlockData from agreement.ValidatedBlock
func MakeBlockDataFromValidatedBlock(input ledgercore.ValidatedBlock) BlockData {
blockData := BlockData{}
blockData.UpdateFromValidatedBlock(input)
return blockData
}

// UpdateFromValidatedBlock updates BlockData from ValidatedBlock input
func (blkData *BlockData) UpdateFromValidatedBlock(input ledgercore.ValidatedBlock) {
blkData.BlockHeader = input.Block().BlockHeader
blkData.Payset = input.Block().Payset
delta := input.Delta()
blkData.Delta = &delta
}

// UpdateFromEncodedBlockCertificate updates BlockData from EncodedBlockCert info
func (blkData *BlockData) UpdateFromEncodedBlockCertificate(input *rpcs.EncodedBlockCert) {
if input == nil {
return
}

blkData.BlockHeader = input.Block.BlockHeader
blkData.Payset = input.Block.Payset

cert := input.Certificate
blkData.Certificate = &cert
}

// MakeBlockDataFromEncodedBlockCertificate makes BlockData from rpcs.EncodedBlockCert
func MakeBlockDataFromEncodedBlockCertificate(input *rpcs.EncodedBlockCert) BlockData {
blkData := BlockData{}
blkData.UpdateFromEncodedBlockCertificate(input)
return blkData
}

// ValidatedBlock returns a validated block from the BlockData object
func (blkData BlockData) ValidatedBlock() ledgercore.ValidatedBlock {
tmpBlock := bookkeeping.Block{
BlockHeader: blkData.BlockHeader,
Payset: blkData.Payset,
}

tmpDelta := ledgercore.StateDelta{}
if blkData.Delta != nil {
tmpDelta = *blkData.Delta
}
return ledgercore.MakeValidatedBlock(tmpBlock, tmpDelta)
}

// EncodedBlockCertificate returns an encoded block certificate from the BlockData object
func (blkData BlockData) EncodedBlockCertificate() rpcs.EncodedBlockCert {

tmpBlock := bookkeeping.Block{
BlockHeader: blkData.BlockHeader,
Payset: blkData.Payset,
}

tmpCert := agreement.Certificate{}
if blkData.Certificate != nil {
tmpCert = *blkData.Certificate
}
return rpcs.EncodedBlockCert{
Block: tmpBlock,
Certificate: tmpCert,
}
}

// Round returns the round to which the BlockData corresponds
func (blkData BlockData) Round() uint64 {
return uint64(blkData.BlockHeader.Round)
Expand Down
Loading

0 comments on commit 3d2720c

Please sign in to comment.