Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prefetcher: improve error codes #3815

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions ledger/internal/prefetcher/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright (C) 2019-2022 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

package prefetcher

import (
"fmt"

"github.com/algorand/go-algorand/data/basics"
)

// GroupTaskError indicates the group index of the unfulfilled resource
type GroupTaskError struct {
err error
GroupIdx int64
Address *basics.Address
CreatableIndex basics.CreatableIndex
CreatableType basics.CreatableType
}

// Error satisfies builtin interface `error`
func (err *GroupTaskError) Error() string {
return fmt.Sprintf("prefetch failed for groupIdx %d, address: %s, creatableIndex %d, creatableType %d, cause: %v",
err.GroupIdx, err.Address, err.CreatableIndex, err.CreatableType, err.err)
}

// Unwrap provides access to the underlying error
func (err *GroupTaskError) Unwrap() error {
return err.err
}
21 changes: 16 additions & 5 deletions ledger/internal/prefetcher/prefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type LoadedTransactionGroup struct {

// Err indicates whether any of the balances in this structure have failed to load. In case of an error, at least
// one of the entries in the balances would be uninitialized.
Err error
Err *GroupTaskError
}

// accountPrefetcher used to prefetch accounts balances and resources before the evaluator is being called.
Expand Down Expand Up @@ -146,6 +146,7 @@ type preloaderTaskQueue struct {
type groupTaskDone struct {
groupIdx int64
err error
task *preloaderTask
}

func allocPreloaderQueue(count int, maxTxnGroupEntries int) preloaderTaskQueue {
Expand Down Expand Up @@ -408,7 +409,13 @@ func (p *accountPrefetcher) prefetch(ctx context.Context) {
if done.err != nil {
// if there is an error, report the error to the output channel.
p.outChan <- LoadedTransactionGroup{
Err: done.err,
Err: &GroupTaskError{
err: done.err,
GroupIdx: done.groupIdx,
Address: done.task.address,
CreatableIndex: done.task.creatableIndex,
CreatableType: done.task.creatableType,
},
}
return
}
Expand Down Expand Up @@ -463,14 +470,18 @@ func (gt *groupTask) markCompletionResource(idx int, res LoadedResourcesEntry, g
}
}

func (gt *groupTask) markCompletionAcctError(err error, groupDoneCh chan groupTaskDone) {
func (gt *groupTask) markCompletionAcctError(err error, task *preloaderTask, groupDoneCh chan groupTaskDone) {
for {
curVal := atomic.LoadInt64(&gt.incompleteCount)
if curVal <= 0 {
return
}
if atomic.CompareAndSwapInt64(&gt.incompleteCount, curVal, 0) {
groupDoneCh <- groupTaskDone{groupIdx: gt.groupTaskIndex, err: err}
groupDoneCh <- groupTaskDone{
groupIdx: gt.groupTaskIndex,
err: err,
task: task,
}
return
}
}
Expand Down Expand Up @@ -558,6 +569,6 @@ func (p *accountPrefetcher) asyncPrefetchRoutine(queue *preloaderTaskQueue, task
// in every case we get here, the task is gurenteed to be a non-nil.
for _, wt := range task.groups {
// notify the channel of the error.
wt.markCompletionAcctError(err, groupDoneCh)
wt.markCompletionAcctError(err, task, groupDoneCh)
}
}
2 changes: 1 addition & 1 deletion ledger/internal/prefetcher/prefetcher_alignment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func prefetch(t *testing.T, l prefetcher.Ledger, txn transactions.Transaction) l
loaded, ok := <-ch
require.True(t, ok)

require.NoError(t, loaded.Err)
require.Nil(t, loaded.Err)
require.Equal(t, group, loaded.TxnGroup)

_, ok = <-ch
Expand Down
203 changes: 190 additions & 13 deletions ledger/internal/prefetcher/prefetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package prefetcher_test

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -49,19 +50,44 @@ func makeAddress(addressSeed int) (o basics.Address) {

const proto = protocol.ConsensusCurrentVersion

type lookupError struct{}

func (le lookupError) Error() string {
return "lookup error"
}

type assetLookupError struct{}

func (ale assetLookupError) Error() string {
return "asset lookup error"
}

type getCreatorError struct{}

func (gce getCreatorError) Error() string {
return "get creator error"
}

type prefetcherTestLedger struct {
round basics.Round
balances map[basics.Address]ledgercore.AccountData
creators map[basics.CreatableIndex]basics.Address
round basics.Round
balances map[basics.Address]ledgercore.AccountData
creators map[basics.CreatableIndex]basics.Address
errorTriggerAddress map[basics.Address]bool
}

const errorTriggerCreatableIndex = 1000001
const errorTriggerAssetIndex = 1000002

func (l *prefetcherTestLedger) BlockHdr(basics.Round) (bookkeeping.BlockHeader, error) {
return bookkeeping.BlockHeader{}, nil
}
func (l *prefetcherTestLedger) CheckDup(config.ConsensusParams, basics.Round, basics.Round, basics.Round, transactions.Txid, ledgercore.Txlease) error {
return nil
}
func (l *prefetcherTestLedger) LookupWithoutRewards(_ basics.Round, addr basics.Address) (ledgercore.AccountData, basics.Round, error) {
func (l *prefetcherTestLedger) LookupWithoutRewards(rnd basics.Round, addr basics.Address) (ledgercore.AccountData, basics.Round, error) {
if _, has := l.errorTriggerAddress[addr]; has {
return ledgercore.AccountData{}, l.round, lookupError{}
}
if data, has := l.balances[addr]; has {
return data, l.round, nil
}
Expand All @@ -71,9 +97,15 @@ func (l *prefetcherTestLedger) LookupApplication(rnd basics.Round, addr basics.A
return ledgercore.AppResource{}, nil
}
func (l *prefetcherTestLedger) LookupAsset(rnd basics.Round, addr basics.Address, aidx basics.AssetIndex) (ledgercore.AssetResource, error) {
if aidx == errorTriggerAssetIndex {
return ledgercore.AssetResource{}, assetLookupError{}
}
return ledgercore.AssetResource{}, nil
}
func (l *prefetcherTestLedger) GetCreatorForRound(_ basics.Round, cidx basics.CreatableIndex, _ basics.CreatableType) (basics.Address, bool, error) {
if cidx == errorTriggerCreatableIndex {
return basics.Address{}, false, getCreatorError{}
}
if addr, has := l.creators[cidx]; has {
return addr, true, nil
}
Expand Down Expand Up @@ -151,23 +183,30 @@ func compareLoadedResourcesEntries(t *testing.T, expected []prefetcher.LoadedRes
require.Equal(t, expectedForTest, actualForTest)
}

func TestEvaluatorPrefetcher(t *testing.T) {
partitiontest.PartitionTest(t)

rnd := basics.Round(5)
var feeSinkAddr = basics.Address{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}
func getPrefetcherTestLedger(rnd basics.Round) *prefetcherTestLedger {

var ledger = &prefetcherTestLedger{
round: rnd,
balances: make(map[basics.Address]ledgercore.AccountData),
creators: make(map[basics.CreatableIndex]basics.Address),
round: rnd,
balances: make(map[basics.Address]ledgercore.AccountData),
creators: make(map[basics.CreatableIndex]basics.Address),
errorTriggerAddress: make(map[basics.Address]bool),
}
ledger.balances[makeAddress(1)] = ledgercore.AccountData{
AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 100000000}},
}
ledger.creators[1001] = makeAddress(2)
ledger.creators[2001] = makeAddress(15)

return ledger
}

func TestEvaluatorPrefetcher(t *testing.T) {
partitiontest.PartitionTest(t)

rnd := basics.Round(5)
var feeSinkAddr = basics.Address{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}

ledger := getPrefetcherTestLedger(rnd)
type testCase struct {
name string
signedTxn transactions.SignedTxn
Expand Down Expand Up @@ -485,7 +524,7 @@ func TestEvaluatorPrefetcher(t *testing.T) {

loadedTxnGroup, ok := <-preloadedTxnGroupsCh
require.True(t, ok)
require.NoError(t, loadedTxnGroup.Err)
require.Nil(t, loadedTxnGroup.Err)
compareLoadedAccountDataEntries(t, testCase.accounts, loadedTxnGroup.Accounts)
compareLoadedResourcesEntries(t, testCase.resources, loadedTxnGroup.Resources)

Expand All @@ -495,6 +534,144 @@ func TestEvaluatorPrefetcher(t *testing.T) {
}
}

// Test for error from LookupAsset
func TestAssetLookupError(t *testing.T) {
partitiontest.PartitionTest(t)

rnd := basics.Round(5)
var feeSinkAddr = basics.Address{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}
ledger := getPrefetcherTestLedger(rnd)
assetTransferTxn :=
transactions.SignedTxn{
Txn: transactions.Transaction{
Type: protocol.AssetTransferTx,
Header: transactions.Header{
Sender: makeAddress(1),
},
AssetTransferTxnFields: transactions.AssetTransferTxnFields{
XferAsset: 1001,
AssetSender: makeAddress(2),
AssetReceiver: makeAddress(2),
AssetCloseTo: makeAddress(2),
},
},
}

errorReceived := false
groups := make([][]transactions.SignedTxnWithAD, 5)
for i := 0; i < 5; i++ {
groups[i] = make([]transactions.SignedTxnWithAD, 2)
for j := 0; j < 2; j++ {
groups[i][j].SignedTxn = assetTransferTxn
if i == 2 {
// force error in asset lookup in the second txn group only
groups[i][j].SignedTxn.Txn.AssetTransferTxnFields.XferAsset = errorTriggerAssetIndex
}
}
}
preloadedTxnGroupsCh := prefetcher.PrefetchAccounts(context.Background(), ledger, rnd+100, groups, feeSinkAddr, config.Consensus[proto])
for loadedTxnGroup := range preloadedTxnGroupsCh {
if loadedTxnGroup.Err != nil {
errorReceived = true
require.Equal(t, int64(2), loadedTxnGroup.Err.GroupIdx)
require.True(t, errors.Is(loadedTxnGroup.Err, assetLookupError{}))
require.Equal(t, makeAddress(2), *loadedTxnGroup.Err.Address)
require.Equal(t, errorTriggerAssetIndex, int(loadedTxnGroup.Err.CreatableIndex))
require.Equal(t, basics.AssetCreatable, loadedTxnGroup.Err.CreatableType)
}
}
require.True(t, errorReceived)
}

// Test for error from GetCreatorForRound
func TestGetCreatorForRoundError(t *testing.T) {
partitiontest.PartitionTest(t)

rnd := basics.Round(5)
var feeSinkAddr = basics.Address{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}
ledger := getPrefetcherTestLedger(rnd)

createAssetTxn :=
transactions.SignedTxn{
Txn: transactions.Transaction{
Type: protocol.AssetConfigTx,
Header: transactions.Header{
Sender: makeAddress(1),
},
AssetConfigTxnFields: transactions.AssetConfigTxnFields{
ConfigAsset: errorTriggerCreatableIndex,
},
},
}

errorReceived := false

groups := make([][]transactions.SignedTxnWithAD, 5)
for i := 0; i < 5; i++ {
groups[i] = make([]transactions.SignedTxnWithAD, 10)
for j := 0; j < 10; j++ {
groups[i][j].SignedTxn = createAssetTxn
}
}
preloadedTxnGroupsCh := prefetcher.PrefetchAccounts(context.Background(), ledger, rnd+100, groups, feeSinkAddr, config.Consensus[proto])

for loadedTxnGroup := range preloadedTxnGroupsCh {
if loadedTxnGroup.Err != nil {
errorReceived = true
require.True(t, errors.Is(loadedTxnGroup.Err, getCreatorError{}))
require.Nil(t, loadedTxnGroup.Err.Address)
require.Equal(t, errorTriggerCreatableIndex, int(loadedTxnGroup.Err.CreatableIndex))
require.Equal(t, basics.AssetCreatable, loadedTxnGroup.Err.CreatableType)
}
}
require.True(t, errorReceived)
}

// Test for error from LookupWithoutRewards
func TestLookupWithoutRewards(t *testing.T) {
partitiontest.PartitionTest(t)

rnd := basics.Round(5)
var feeSinkAddr = basics.Address{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}
ledger := getPrefetcherTestLedger(rnd)

createAssetTxn :=
transactions.SignedTxn{
Txn: transactions.Transaction{
Type: protocol.AssetConfigTx,
Header: transactions.Header{
Sender: makeAddress(1),
},
AssetConfigTxnFields: transactions.AssetConfigTxnFields{
ConfigAsset: 1001,
},
},
}

errorReceived := false

groups := make([][]transactions.SignedTxnWithAD, 5)
for i := 0; i < 5; i++ {
groups[i] = make([]transactions.SignedTxnWithAD, 10)
for j := 0; j < 10; j++ {
groups[i][j].SignedTxn = createAssetTxn
}
}
ledger.errorTriggerAddress[createAssetTxn.Txn.Sender] = true
preloadedTxnGroupsCh := prefetcher.PrefetchAccounts(context.Background(), ledger, rnd+100, groups, feeSinkAddr, config.Consensus[proto])

for loadedTxnGroup := range preloadedTxnGroupsCh {
if loadedTxnGroup.Err != nil {
errorReceived = true
require.True(t, errors.Is(loadedTxnGroup.Err, lookupError{}))
require.Equal(t, makeAddress(1), *loadedTxnGroup.Err.Address)
require.Equal(t, 0, int(loadedTxnGroup.Err.CreatableIndex))
require.Equal(t, basics.AssetCreatable, loadedTxnGroup.Err.CreatableType)
}
}
require.True(t, errorReceived)
}

func TestEvaluatorPrefetcherQueueExpansion(t *testing.T) {
partitiontest.PartitionTest(t)

Expand Down