Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Backport tx error messages PRs to v0.37 branch #6866

Open
wants to merge 59 commits into
base: v0.37
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
b7867bc
Added implementation of storing in the badger db transaction result e…
UlyanaAndrukhiv Sep 10, 2024
411005e
Added fetching and storing transaction result error messages, refacto…
UlyanaAndrukhiv Sep 12, 2024
e65b635
Updated API LookupErrorMessage calls, updated tx result error message…
UlyanaAndrukhiv Sep 17, 2024
30177db
Linted
UlyanaAndrukhiv Sep 17, 2024
13a687c
Added verification check for process transaction result error messages
UlyanaAndrukhiv Sep 17, 2024
f5d4965
Added tests for storing transaction result error messages
UlyanaAndrukhiv Sep 17, 2024
c1fe4fb
Updated storing tx result error messages by index, updated tests
UlyanaAndrukhiv Sep 18, 2024
7cbf1d6
Added constant message storing to db if no execution nodes return a v…
UlyanaAndrukhiv Sep 18, 2024
1c9be8f
Updated storing failed tx error messages for LookupErrorMessageByTran…
UlyanaAndrukhiv Sep 18, 2024
d599645
Added missed check for LookupErrorMessageByTransactionID
UlyanaAndrukhiv Sep 18, 2024
f88d59b
Updated lookup error messages api calls
UlyanaAndrukhiv Sep 19, 2024
c60b8c0
Updated lookup error messages api checks, added part of backend tx tests
UlyanaAndrukhiv Sep 19, 2024
508ee23
Added more tests for LookupErrorMessages, added godoc for tests
UlyanaAndrukhiv Sep 19, 2024
7fa0aed
Removed outdated comment
UlyanaAndrukhiv Sep 19, 2024
9c2cce3
Updated access tests
UlyanaAndrukhiv Sep 19, 2024
323a8f8
Simplified creation of mock connection factory in backend tests
UlyanaAndrukhiv Sep 20, 2024
38c6f8f
Added functional test for properly fetching processed and storing tra…
UlyanaAndrukhiv Sep 23, 2024
5b3944b
Refactored backend api according to suggestion
UlyanaAndrukhiv Sep 23, 2024
50be0a8
Added integration test for storing tx error messages
UlyanaAndrukhiv Sep 24, 2024
202e7cc
Updated last commit
UlyanaAndrukhiv Sep 24, 2024
43e7e3c
Updated TestTransactionResultErrorMessagesAreFetched
UlyanaAndrukhiv Sep 24, 2024
c88cb46
Linted
UlyanaAndrukhiv Sep 24, 2024
33fd510
Removed unnecessary changes
UlyanaAndrukhiv Sep 25, 2024
59a5978
Changed creation place of TransactionResultErrorMessages storage, upd…
UlyanaAndrukhiv Oct 1, 2024
d16fd5f
Added missed check for engine to avoid extra work
UlyanaAndrukhiv Oct 1, 2024
f0de904
Updated according to suggested comments
UlyanaAndrukhiv Oct 4, 2024
e156cc1
Updated api according to comments
UlyanaAndrukhiv Oct 4, 2024
e4dd0bc
Updated godoc for storage
UlyanaAndrukhiv Oct 7, 2024
2f29bed
Updated store tx error message as suggested
UlyanaAndrukhiv Oct 7, 2024
ca21b6f
Added another trigger point to get error messages, updated error hand…
UlyanaAndrukhiv Oct 7, 2024
40c6f7d
Udated tests
UlyanaAndrukhiv Oct 7, 2024
595a8f1
Added db Exists testing to transaction result messages testing
UlyanaAndrukhiv Oct 7, 2024
9075281
Updated process of handling transaction error messages by creting new…
UlyanaAndrukhiv Oct 14, 2024
f907458
Added retry mechanism for tx error messages handling in case of failure
UlyanaAndrukhiv Oct 14, 2024
25b5104
Removed unused parameter in function
UlyanaAndrukhiv Oct 15, 2024
954271b
Updated unite tests
UlyanaAndrukhiv Oct 15, 2024
64ba3b3
Updated naming and flag description
UlyanaAndrukhiv Oct 16, 2024
919a4d0
Updated fetching tx error messages according to comments, added expon…
UlyanaAndrukhiv Oct 17, 2024
a7383e0
Updated godoc
UlyanaAndrukhiv Oct 17, 2024
9e6a45d
Updated godoc
UlyanaAndrukhiv Oct 17, 2024
cc4189a
Apply suggestions from code review
UlyanaAndrukhiv Oct 18, 2024
b595a1b
Updated logging and errors according to comments
UlyanaAndrukhiv Oct 18, 2024
c817416
Refactored syncing tx error messages by separated core and created se…
UlyanaAndrukhiv Oct 22, 2024
586ce4f
Added aditional checks for ingestion engine in case if tx error messa…
UlyanaAndrukhiv Oct 23, 2024
c896b53
Added missing mock methods because of the v0.37 branch rpc backend impl
AndriiDiachuk Jan 13, 2025
4044464
Added command to access node, implemented Validator to command
UlyanaAndrukhiv Sep 26, 2024
87451ce
Updated validation for command
UlyanaAndrukhiv Oct 3, 2024
2870124
Added tests for command
UlyanaAndrukhiv Oct 3, 2024
96c52d3
Updated readme for admin commands
UlyanaAndrukhiv Oct 3, 2024
eb7babc
Updated test
UlyanaAndrukhiv Oct 3, 2024
70909e0
Added godoc for tests
UlyanaAndrukhiv Oct 3, 2024
84ecc65
Moved mock closer to unittest mock module, updated tests
UlyanaAndrukhiv Oct 3, 2024
a0647f8
Added godoc
UlyanaAndrukhiv Oct 8, 2024
9b93ae9
Updated godoc
UlyanaAndrukhiv Oct 8, 2024
e081bfe
Merge pull request #6525 from The-K-R-O-K/UlyanaAndrukhiv/6413-backfi…
peterargue Nov 7, 2024
ee4efbf
Commented umerged code
AndriiDiachuk Jan 14, 2025
69a74eb
Merge pull request #6499 from The-K-R-O-K/UlyanaAndrukhiv/6497-refact…
peterargue Nov 1, 2024
3f998c2
Linted
AndriiDiachuk Jan 14, 2025
3ce30d5
Mocked missing methods for test:
AndriiDiachuk Jan 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions access/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1434,15 +1434,19 @@ func (h *Handler) SendAndSubscribeTransactionStatuses(
messageIndex := counters.NewMonotonousCounter(0)
return subscription.HandleSubscription(sub, func(txResults []*TransactionResult) error {
for i := range txResults {
value := messageIndex.Increment()
index := messageIndex.Value()
if ok := messageIndex.Set(index + 1); !ok {
return status.Errorf(codes.Internal, "message index already incremented to %d", messageIndex.Value())
}

err = stream.Send(&access.SendAndSubscribeTransactionStatusesResponse{
TransactionResults: TransactionResultToMessage(txResults[i]),
MessageIndex: value,
MessageIndex: index,
})
if err != nil {
return rpc.ConvertError(err, "could not send response", codes.Internal)
}

}

return nil
Expand Down
5 changes: 5 additions & 0 deletions admin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,8 @@ curl localhost:9002/admin/run_command -H 'Content-Type: application/json' -d '{"
curl localhost:9002/admin/run_command -H 'Content-Type: application/json' -d '{"commandName": "protocol-snapshot"}'
curl localhost:9002/admin/run_command -H 'Content-Type: application/json' -d '{"commandName": "protocol-snapshot", "data": { "blocks-to-skip": 10 }}'
```

### To backfill transaction error messages
```
curl localhost:9002/admin/run_command -H 'Content-Type: application/json' -d '{"commandName": "backfill-tx-error-messages", "data": { "start-height": 340, "end-height": 343, "execution-node-ids":["ec7b934df29248d574ae1cc33ae77f22f0fcf96a79e009224c46374d1837824e", "8cbdc8d24a28899a33140cb68d4146cd6f2f6c18c57f54c299f26351d126919e"] }}'
```
239 changes: 239 additions & 0 deletions admin/commands/storage/backfill_tx_error_messages.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
package storage

import (
"context"
"fmt"

"github.com/onflow/flow-go/admin"
"github.com/onflow/flow-go/admin/commands"
"github.com/onflow/flow-go/engine/access/index"
"github.com/onflow/flow-go/engine/access/rpc/backend"
commonrpc "github.com/onflow/flow-go/engine/common/rpc"
"github.com/onflow/flow-go/engine/common/rpc/convert"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/flow/filter"
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/storage"

execproto "github.com/onflow/flow/protobuf/go/flow/execution"
)

var _ commands.AdminCommand = (*BackfillTxErrorMessagesCommand)(nil)

// backfillTxErrorMessagesRequest represents the input parameters for
// backfilling transaction error messages.
type backfillTxErrorMessagesRequest struct {
startHeight uint64 // Start height from which to begin backfilling.
endHeight uint64 // End height up to which backfilling is performed.
executionNodeIds flow.IdentityList // List of execution node IDs to be used for backfilling.
}

// BackfillTxErrorMessagesCommand executes a command to backfill
// transaction error messages by fetching them from execution nodes.
type BackfillTxErrorMessagesCommand struct {
state protocol.State
txResultsIndex *index.TransactionResultsIndex
txErrorMessages storage.TransactionResultErrorMessages
backend *backend.Backend
}

// NewBackfillTxErrorMessagesCommand creates a new instance of BackfillTxErrorMessagesCommand
func NewBackfillTxErrorMessagesCommand(
state protocol.State,
txResultsIndex *index.TransactionResultsIndex,
txErrorMessages storage.TransactionResultErrorMessages,
backend *backend.Backend,
) commands.AdminCommand {
return &BackfillTxErrorMessagesCommand{
state: state,
txResultsIndex: txResultsIndex,
txErrorMessages: txErrorMessages,
backend: backend,
}
}

// Validator validates the input for the backfill command. The input is validated
// for field types, boundaries, and coherence of start and end heights.
//
// Expected errors during normal operation:
// - admin.InvalidAdminReqError - if start-height is greater than end-height or
// if the input format is invalid, if an invalid execution node ID is provided.
func (b *BackfillTxErrorMessagesCommand) Validator(request *admin.CommandRequest) error {
input, ok := request.Data.(map[string]interface{})
if !ok {
return admin.NewInvalidAdminReqFormatError("expected map[string]any")
}

data := &backfillTxErrorMessagesRequest{}

rootHeight := b.state.Params().SealedRoot().Height
data.startHeight = rootHeight // Default value

if startHeightIn, ok := input["start-height"]; ok {
if startHeight, err := parseN(startHeightIn); err != nil {
return admin.NewInvalidAdminReqErrorf("invalid 'start-height' field: %w", err)
} else if startHeight > rootHeight {
data.startHeight = startHeight
}
}

sealed, err := b.state.Sealed().Head()
if err != nil {
return fmt.Errorf("failed to lookup sealed header: %w", err)
}
data.endHeight = sealed.Height // Default value

if endHeightIn, ok := input["end-height"]; ok {
if endHeight, err := parseN(endHeightIn); err != nil {
return admin.NewInvalidAdminReqErrorf("invalid 'end-height' field: %w", err)
} else if endHeight < sealed.Height {
data.endHeight = endHeight
}
}

if data.endHeight < data.startHeight {
return admin.NewInvalidAdminReqErrorf("start-height %d should not be smaller than end-height %d", data.startHeight, data.endHeight)
}

identities, err := b.state.Final().Identities(filter.HasRole[flow.Identity](flow.RoleExecution))
if err != nil {
return fmt.Errorf("failed to retreive execution IDs: %w", err)
}

if executionNodeIdsIn, ok := input["execution-node-ids"]; ok {
executionNodeIds, err := b.parseExecutionNodeIds(executionNodeIdsIn, identities)
if err != nil {
return err
}
data.executionNodeIds = executionNodeIds
} else {
// in case no execution node ids provided, the command will use any valid execution node
data.executionNodeIds = identities
}

request.ValidatorData = data

return nil
}

// Handler performs the backfilling operation by fetching missing transaction
// error messages for blocks within the specified height range. Uses execution nodes
// from data.executionNodeIds if available, otherwise defaults to valid execution nodes.
//
// No errors are expected during normal operation.
func (b *BackfillTxErrorMessagesCommand) Handler(ctx context.Context, request *admin.CommandRequest) (interface{}, error) {
if b.txErrorMessages == nil {
return nil, fmt.Errorf("failed to backfill, could not get transaction error messages storage")
}

data := request.ValidatorData.(*backfillTxErrorMessagesRequest)

for height := data.startHeight; height <= data.endHeight; height++ {
header, err := b.state.AtHeight(height).Head()
if err != nil {
return nil, fmt.Errorf("failed to get block header: %w", err)
}

blockID := header.ID()

exists, err := b.txErrorMessages.Exists(blockID)
if err != nil {
return nil, fmt.Errorf("could not check existance of transaction result error messages: %w", err)
}

if exists {
continue
}

results, err := b.txResultsIndex.ByBlockID(blockID, height)
if err != nil {
return nil, fmt.Errorf("failed to get result by block ID: %w", err)
}

fetchTxErrorMessages := false
for _, txResult := range results {
if txResult.Failed {
fetchTxErrorMessages = true
}
}

if !fetchTxErrorMessages {
continue
}

req := &execproto.GetTransactionErrorMessagesByBlockIDRequest{
BlockId: convert.IdentifierToMessage(blockID),
}

resp, execNode, err := b.backend.GetTransactionErrorMessagesFromAnyEN(ctx, data.executionNodeIds.ToSkeleton(), req)
if err != nil {
return nil, fmt.Errorf("failed to retrieve transaction error messages for block id %#v: %w", blockID, err)
}

err = b.storeTransactionResultErrorMessages(blockID, resp, execNode)
if err != nil {
return nil, fmt.Errorf("could not store error messages: %w", err)
}
}

return nil, nil
}

// parseExecutionNodeIds converts a list of node IDs from input to flow.IdentityList.
// Returns an error if the IDs are invalid or empty.
//
// Expected errors during normal operation:
// - admin.InvalidAdminReqParameterError - if execution-node-ids is empty or has an invalid format.
func (b *BackfillTxErrorMessagesCommand) parseExecutionNodeIds(executionNodeIdsIn interface{}, allIdentities flow.IdentityList) (flow.IdentityList, error) {
var ids flow.IdentityList

switch executionNodeIds := executionNodeIdsIn.(type) {
case []string:
if len(executionNodeIds) == 0 {
return nil, admin.NewInvalidAdminReqParameterError("execution-node-ids", "must be a non empty list of string", executionNodeIdsIn)
}
requestedENIdentifiers, err := commonrpc.IdentifierList(executionNodeIds)
if err != nil {
return nil, admin.NewInvalidAdminReqParameterError("execution-node-ids", err.Error(), executionNodeIdsIn)
}

for _, en := range requestedENIdentifiers {
id, exists := allIdentities.ByNodeID(en)
if !exists {
return nil, admin.NewInvalidAdminReqParameterError("execution-node-ids", "could not found execution nodes by provided ids", executionNodeIdsIn)
}
ids = append(ids, id)
}
default:
return nil, admin.NewInvalidAdminReqParameterError("execution-node-ids", "must be a list of string", executionNodeIdsIn)
}

return ids, nil
}

// storeTransactionResultErrorMessages saves retrieved error messages for a given block ID.
//
// No errors are expected during normal operation.
func (b *BackfillTxErrorMessagesCommand) storeTransactionResultErrorMessages(
blockID flow.Identifier,
errorMessagesResponses []*execproto.GetTransactionErrorMessagesResponse_Result,
execNode *flow.IdentitySkeleton,
) error {
errorMessages := make([]flow.TransactionResultErrorMessage, 0, len(errorMessagesResponses))
for _, value := range errorMessagesResponses {
errorMessage := flow.TransactionResultErrorMessage{
ErrorMessage: value.ErrorMessage,
TransactionID: convert.MessageToIdentifier(value.TransactionId),
Index: value.Index,
ExecutorID: execNode.NodeID,
}
errorMessages = append(errorMessages, errorMessage)
}

err := b.txErrorMessages.Store(blockID, errorMessages)
if err != nil {
return fmt.Errorf("failed to store transaction error messages: %w", err)
}

return nil
}
Loading
Loading