Skip to content

Commit

Permalink
Merge pull request #5658 from onflow/leo/v0.33-limit-get-block-height…
Browse files Browse the repository at this point in the history
…-checkpoint-protocol-snapshot

Backport v0.33 limit get block height and checkpoint protocol snapshot
  • Loading branch information
zhangchiqing authored Apr 12, 2024
2 parents 874cd23 + 06ef2e4 commit 67f04e2
Show file tree
Hide file tree
Showing 17 changed files with 594 additions and 119 deletions.
6 changes: 6 additions & 0 deletions admin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,9 @@ curl localhost:9002/admin/run_command -H 'Content-Type: application/json' -d '{"
curl localhost:9002/admin/run_command -H 'Content-Type: application/json' -d '{"commandName": "ingest-tx-rate-limit", "data": { "command": "get_config" }}'
curl localhost:9002/admin/run_command -H 'Content-Type: application/json' -d '{"commandName": "ingest-tx-rate-limit", "data": { "command": "set_config", "limit": 1, "burst": 1 }}'
```

### To create a protocol snapshot for latest checkpoint (execution node only)
```
curl localhost:9002/admin/run_command -H 'Content-Type: application/json' -d '{"commandName": "protocol-snapshot"}'
curl localhost:9002/admin/run_command -H 'Content-Type: application/json' -d '{"commandName": "protocol-snapshot", "data": { "blocks-to-skip": 10 }}'
```
112 changes: 112 additions & 0 deletions admin/commands/storage/read_protocol_snapshot.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package storage

import (
"context"
"fmt"

"github.com/rs/zerolog"

"github.com/onflow/flow-go/admin"
"github.com/onflow/flow-go/admin/commands"
"github.com/onflow/flow-go/cmd/util/common"
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/state/protocol/inmem"
"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/utils/logging"
)

var _ commands.AdminCommand = (*ProtocolSnapshotCommand)(nil)

type protocolSnapshotData struct {
blocksToSkip uint
}

// ProtocolSnapshotCommand is a command that generates a protocol snapshot for a checkpoint (usually latest checkpoint)
// This command is only available for execution node
type ProtocolSnapshotCommand struct {
logger zerolog.Logger
state protocol.State
headers storage.Headers
seals storage.Seals
checkpointDir string // the directory where the checkpoint is stored
}

func NewProtocolSnapshotCommand(
logger zerolog.Logger,
state protocol.State,
headers storage.Headers,
seals storage.Seals,
checkpointDir string,
) *ProtocolSnapshotCommand {
return &ProtocolSnapshotCommand{
logger: logger,
state: state,
headers: headers,
seals: seals,
checkpointDir: checkpointDir,
}
}

func (s *ProtocolSnapshotCommand) Handler(_ context.Context, req *admin.CommandRequest) (interface{}, error) {
validated, ok := req.ValidatorData.(*protocolSnapshotData)
if !ok {
return nil, fmt.Errorf("fail to parse validator data")
}

blocksToSkip := validated.blocksToSkip

s.logger.Info().Uint("blocksToSkip", blocksToSkip).Msgf("admintool: generating protocol snapshot")

snapshot, sealedHeight, commit, err := common.GenerateProtocolSnapshotForCheckpoint(
s.logger, s.state, s.headers, s.seals, s.checkpointDir, blocksToSkip)
if err != nil {
return nil, fmt.Errorf("could not generate protocol snapshot for checkpoint, checkpointDir %v: %w",
s.checkpointDir, err)
}

header, err := snapshot.Head()
if err != nil {
return nil, fmt.Errorf("could not get header from snapshot: %w", err)
}

serializable, err := inmem.FromSnapshot(snapshot)
if err != nil {
return nil, fmt.Errorf("could not convert snapshot to serializable: %w", err)
}

s.logger.Info().
Uint64("finalized_height", header.Height). // finalized height
Hex("finalized_block_id", logging.Entity(header)).
Uint64("sealed_height", sealedHeight).
Hex("sealed_commit", commit[:]). // not the commit for the finalized height, but for the sealed height
Uint("blocks_to_skip", blocksToSkip).
Msgf("admintool: protocol snapshot generated successfully")

return commands.ConvertToMap(serializable.Encodable())
}

func (s *ProtocolSnapshotCommand) Validator(req *admin.CommandRequest) error {
// blocksToSkip is the number of blocks to skip when iterating the sealed heights to find the state commitment
// in the checkpoint file.
// default is 0
validated := &protocolSnapshotData{
blocksToSkip: uint(0),
}

input, ok := req.Data.(map[string]interface{})
if ok {
data, ok := input["blocks-to-skip"]

if ok {
n, ok := data.(float64)
if !ok {
return fmt.Errorf("could not parse blocks-to-skip: %v", data)
}
validated.blocksToSkip = uint(n)
}
}

req.ValidatorData = validated

return nil
}
9 changes: 9 additions & 0 deletions cmd/execution_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,15 @@ func (builder *ExecutionNodeBuilder) LoadComponentsAndModules() {
AdminCommand("get-transactions", func(conf *NodeConfig) commands.AdminCommand {
return storageCommands.NewGetTransactionsCommand(conf.State, conf.Storage.Payloads, conf.Storage.Collections)
}).
AdminCommand("protocol-snapshot", func(conf *NodeConfig) commands.AdminCommand {
return storageCommands.NewProtocolSnapshotCommand(
conf.Logger,
conf.State,
conf.Storage.Headers,
conf.Storage.Seals,
exeNode.exeConf.triedir,
)
}).
Module("mutable follower state", exeNode.LoadMutableFollowerState).
Module("system specs", exeNode.LoadSystemSpecs).
Module("execution metrics", exeNode.LoadExecutionMetrics).
Expand Down
37 changes: 37 additions & 0 deletions cmd/util/cmd/read-protocol-state/cmd/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,16 @@ import (
"github.com/spf13/cobra"

"github.com/onflow/flow-go/cmd/util/cmd/common"
commonFuncs "github.com/onflow/flow-go/cmd/util/common"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/state/protocol/inmem"
)

var flagCheckpointDir string
var flagCheckpointScanStep uint
var flagCheckpointScanEndHeight int64

var SnapshotCmd = &cobra.Command{
Use: "snapshot",
Short: "Read snapshot from protocol state",
Expand All @@ -26,6 +32,15 @@ func init() {

SnapshotCmd.Flags().BoolVar(&flagSealed, "sealed", false,
"get sealed block")

SnapshotCmd.Flags().StringVar(&flagCheckpointDir, "checkpoint-dir", "",
"(execution node only) get snapshot from the latest checkpoint file in the given checkpoint directory")

SnapshotCmd.Flags().UintVar(&flagCheckpointScanStep, "checkpoint-scan-step", 0,
"(execution node only) scan step for finding sealed height by checkpoint (use with --checkpoint-dir flag)")

SnapshotCmd.Flags().Int64Var(&flagCheckpointScanEndHeight, "checkpoint-scan-end-height", -1,
"(execution node only) scan end height for finding sealed height by checkpoint (use with --checkpoint-dir flag)")
}

func runSnapshot(*cobra.Command, []string) {
Expand All @@ -49,6 +64,28 @@ func runSnapshot(*cobra.Command, []string) {
} else if flagSealed {
log.Info().Msgf("get last sealed snapshot")
snapshot = state.Sealed()
} else if flagCheckpointDir != "" {
log.Info().Msgf("get snapshot for latest checkpoint in directory %v (step: %v, endHeight: %v)",
flagCheckpointDir, flagCheckpointScanStep, flagCheckpointScanEndHeight)
var protocolSnapshot protocol.Snapshot
var sealedHeight uint64
var sealedCommit flow.StateCommitment
if flagCheckpointScanEndHeight < 0 {
// using default end height which is the last sealed height
protocolSnapshot, sealedHeight, sealedCommit, err = commonFuncs.GenerateProtocolSnapshotForCheckpoint(
log.Logger, state, storages.Headers, storages.Seals, flagCheckpointDir, flagCheckpointScanStep)
} else {
// using customized end height
protocolSnapshot, sealedHeight, sealedCommit, err = commonFuncs.GenerateProtocolSnapshotForCheckpointWithHeights(
log.Logger, state, storages.Headers, storages.Seals, flagCheckpointDir, flagCheckpointScanStep, uint64(flagCheckpointScanEndHeight))
}

if err != nil {
log.Fatal().Err(err).Msgf("could not generate protocol snapshot for checkpoint in dir: %v", flagCheckpointDir)
}

snapshot = protocolSnapshot
log.Info().Msgf("snapshot found, sealed height %v, commit %x", sealedHeight, sealedCommit)
}

head, err := snapshot.Head()
Expand Down
Loading

0 comments on commit 67f04e2

Please sign in to comment.