Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Optimistic Execution (backport #16581) #18205

Merged
merged 2 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
### Features

* (server) [#18162](https://github.com/cosmos/cosmos-sdk/pull/18162) Start gRPC & API server in standalone mode.
* (baseapp) [#16581](https://github.com/cosmos/cosmos-sdk/pull/16581) Implement Optimistic Execution as an experimental feature (not enabled by default).

### Improvements

Expand Down Expand Up @@ -326,7 +327,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
* (crypto/keyring) [#13734](https://github.com/cosmos/cosmos-sdk/pull/13834) The keyring's `Sign` method now takes a new `signMode` argument. It is only used if the signing key is a Ledger hardware device. You can set it to 0 in all other cases.
* (snapshots) [14048](https://github.com/cosmos/cosmos-sdk/pull/14048) Move the Snapshot package to the store package. This is done in an effort group all storage related logic under one package.
* (signing) [#13701](https://github.com/cosmos/cosmos-sdk/pull/) Add `context.Context` as an argument `x/auth/signing.VerifySignature`.
* (store) [#11825](https://github.com/cosmos/cosmos-sdk/pull/11825) Make extension snapshotter interface safer to use, renamed the util function `WriteExtensionItem` to `WriteExtensionPayload`.
* (store) [#11825](https://github.com/cosmos/cosmos-sdk/pull/11825) Make extension snapshotter interface safer to use, renamed the util function `WriteExtensionItem` to `WriteExtensionPayload`.

### Client Breaking Changes

Expand Down Expand Up @@ -401,4 +402,4 @@ Ref: https://keepachangelog.com/en/1.0.0/

## Previous Versions

[CHANGELOG of previous versions](https://github.com/cosmos/cosmos-sdk/blob/main/CHANGELOG.md#v0470---2023-03-14).
[CHANGELOG of previous versions](https://github.com/cosmos/cosmos-sdk/blob/main/CHANGELOG.md#v0470---2023-03-14).
95 changes: 83 additions & 12 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,8 @@ func (app *BaseApp) ProcessProposal(req *abci.RequestProcessProposal) (resp *abc
// processed the first block, as we want to avoid overwriting the finalizeState
// after state changes during InitChain.
if req.Height > app.initialHeight {
// abort any running OE
app.optimisticExec.Abort()
app.setState(execModeFinalize, header)
}
Comment on lines 494 to 500
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change potentially affects state.

Call sequence:

(*github.com/cosmos/cosmos-sdk/baseapp.BaseApp).ProcessProposal (baseapp/abci.go:468)


Expand Down Expand Up @@ -534,6 +536,19 @@ func (app *BaseApp) ProcessProposal(req *abci.RequestProcessProposal) (resp *abc
return &abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT}, nil
}

// Only execute optimistic execution if the proposal is accepted, OE is
// enabled and the block height is greater than the initial height. During
// the first block we'll be carrying state from InitChain, so it would be
// impossible for us to easily revert.
// After the first block has been processed, the next blocks will get executed
// optimistically, so that when the ABCI client calls `FinalizeBlock` the app
// can have a response ready.
if resp.Status == abci.ResponseProcessProposal_ACCEPT &&
app.optimisticExec.Enabled() &&
req.Height > app.initialHeight {
app.optimisticExec.Execute(req)
}

return resp, nil
}

Expand Down Expand Up @@ -671,17 +686,11 @@ func (app *BaseApp) VerifyVoteExtension(req *abci.RequestVerifyVoteExtension) (r
return resp, err
}

// FinalizeBlock will execute the block proposal provided by RequestFinalizeBlock.
// Specifically, it will execute an application's BeginBlock (if defined), followed
// by the transactions in the proposal, finally followed by the application's
// EndBlock (if defined).
//
// For each raw transaction, i.e. a byte slice, BaseApp will only execute it if
// it adheres to the sdk.Tx interface. Otherwise, the raw transaction will be
// skipped. This is to support compatibility with proposers injecting vote
// extensions into the proposal, which should not themselves be executed in cases
// where they adhere to the sdk.Tx interface.
func (app *BaseApp) FinalizeBlock(req *abci.RequestFinalizeBlock) (*abci.ResponseFinalizeBlock, error) {
// internalFinalizeBlock executes the block, called by the Optimistic
// Execution flow or by the FinalizeBlock ABCI method. The context received is
// only used to handle early cancellation, for anything related to state app.finalizeBlockState.ctx
// must be used.
func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.RequestFinalizeBlock) (*abci.ResponseFinalizeBlock, error) {
var events []abci.Event

if err := app.checkHalt(req.Height, req.Time); err != nil {
Expand Down Expand Up @@ -754,6 +763,15 @@ func (app *BaseApp) FinalizeBlock(req *abci.RequestFinalizeBlock) (*abci.Respons
return nil, err
}

// First check for an abort signal after beginBlock, as it's the first place
// we spend any significant amount of time.
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
// continue
}

events = append(events, beginBlock.Events...)

// Iterate over all raw transactions in the proposal and attempt to execute
Expand All @@ -780,6 +798,14 @@ func (app *BaseApp) FinalizeBlock(req *abci.RequestFinalizeBlock) (*abci.Respons
)
}

// check after every tx if we should abort
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
// continue
}

txResults = append(txResults, response)
}

Expand All @@ -792,6 +818,14 @@ func (app *BaseApp) FinalizeBlock(req *abci.RequestFinalizeBlock) (*abci.Respons
return nil, err
}

// check after endBlock if we should abort, to avoid propagating the result
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
// continue
}

events = append(events, endBlock.Events...)
cp := app.GetConsensusParams(app.finalizeBlockState.ctx)

Expand All @@ -800,10 +834,47 @@ func (app *BaseApp) FinalizeBlock(req *abci.RequestFinalizeBlock) (*abci.Respons
TxResults: txResults,
ValidatorUpdates: endBlock.ValidatorUpdates,
ConsensusParamUpdates: &cp,
AppHash: app.workingHash(),
}, nil
}

// FinalizeBlock will execute the block proposal provided by RequestFinalizeBlock.
// Specifically, it will execute an application's BeginBlock (if defined), followed
// by the transactions in the proposal, finally followed by the application's
// EndBlock (if defined).
//
// For each raw transaction, i.e. a byte slice, BaseApp will only execute it if
// it adheres to the sdk.Tx interface. Otherwise, the raw transaction will be
// skipped. This is to support compatibility with proposers injecting vote
// extensions into the proposal, which should not themselves be executed in cases
// where they adhere to the sdk.Tx interface.
func (app *BaseApp) FinalizeBlock(req *abci.RequestFinalizeBlock) (*abci.ResponseFinalizeBlock, error) {
if app.optimisticExec.Initialized() {
// check if the hash we got is the same as the one we are executing
aborted := app.optimisticExec.AbortIfNeeded(req.Hash)
// Wait for the OE to finish, regardless of whether it was aborted or not
res, err := app.optimisticExec.WaitResult()

// only return if we are not aborting
if !aborted {
if res != nil {
res.AppHash = app.workingHash()
}
return res, err
}

// if it was aborted, we need to reset the state
app.finalizeBlockState = nil
app.optimisticExec.Reset()
}

// if no OE is running, just run the block (this is either a block replay or a OE that got aborted)
res, err := app.internalFinalizeBlock(context.Background(), req)
if res != nil {
res.AppHash = app.workingHash()
}
return res, err
}

// checkHalt checkes if height or time exceeds halt-height or halt-time respectively.
func (app *BaseApp) checkHalt(height int64, time time.Time) error {
var halt bool
Expand Down
41 changes: 41 additions & 0 deletions baseapp/abci_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2312,3 +2312,44 @@ func TestABCI_PrepareProposal_Panic(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 3, len(resPrepareProposal.Txs))
}

func TestOptimisticExecution(t *testing.T) {
suite := NewBaseAppSuite(t, baseapp.SetOptimisticExecution())

_, err := suite.baseApp.InitChain(&abci.RequestInitChain{
ConsensusParams: &cmtproto.ConsensusParams{},
})
require.NoError(t, err)

// run 50 blocks
for i := 0; i < 50; i++ {
tx := newTxCounter(t, suite.txConfig, 0, 1)
txBytes, err := suite.txConfig.TxEncoder()(tx)
require.NoError(t, err)

reqProcProp := abci.RequestProcessProposal{
Txs: [][]byte{txBytes},
Height: suite.baseApp.LastBlockHeight() + 1,
Hash: []byte("some-hash" + strconv.FormatInt(suite.baseApp.LastBlockHeight()+1, 10)),
}

respProcProp, err := suite.baseApp.ProcessProposal(&reqProcProp)
require.Equal(t, abci.ResponseProcessProposal_ACCEPT, respProcProp.Status)
require.NoError(t, err)

reqFinalizeBlock := abci.RequestFinalizeBlock{
Height: reqProcProp.Height,
Txs: reqProcProp.Txs,
Hash: reqProcProp.Hash,
}

respFinalizeBlock, err := suite.baseApp.FinalizeBlock(&reqFinalizeBlock)
require.NoError(t, err)
require.Len(t, respFinalizeBlock.TxResults, 1)

_, err = suite.baseApp.Commit()
require.NoError(t, err)
}

require.Equal(t, int64(50), suite.baseApp.LastBlockHeight())
}
6 changes: 6 additions & 0 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"cosmossdk.io/store/snapshots"
storetypes "cosmossdk.io/store/types"

"github.com/cosmos/cosmos-sdk/baseapp/oe"
"github.com/cosmos/cosmos-sdk/codec"
codectypes "github.com/cosmos/cosmos-sdk/codec/types"
servertypes "github.com/cosmos/cosmos-sdk/server/types"
Expand Down Expand Up @@ -178,6 +179,11 @@ type BaseApp struct {
chainID string

cdc codec.Codec

// optimisticExec contains the context required for Optimistic Execution,
// including the goroutine handling.This is experimental and must be enabled
// by developers.
optimisticExec *oe.OptimisticExecution
}

// NewBaseApp returns a reference to an initialized BaseApp. It accepts a
Expand Down
157 changes: 157 additions & 0 deletions baseapp/oe/optimistic_execution.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package oe

import (
"bytes"
"context"
"encoding/hex"
"math/rand"
"sync"
"time"

abci "github.com/cometbft/cometbft/abci/types"

"cosmossdk.io/log"
)

// FinalizeBlockFunc is the function that is called by the OE to finalize the
// block. It is the same as the one in the ABCI app.
type FinalizeBlockFunc func(context.Context, *abci.RequestFinalizeBlock) (*abci.ResponseFinalizeBlock, error)

// OptimisticExecution is a struct that contains the OE context. It is used to
// run the FinalizeBlock function in a goroutine, and to abort it if needed.
type OptimisticExecution struct {
finalizeBlockFunc FinalizeBlockFunc // ABCI FinalizeBlock function with a context
logger log.Logger

mtx sync.Mutex
stopCh chan struct{}
request *abci.RequestFinalizeBlock
response *abci.ResponseFinalizeBlock
err error
cancelFunc func() // cancel function for the context
initialized bool // A boolean value indicating whether the struct has been initialized

// debugging/testing options
abortRate int // number from 0 to 100 that determines the percentage of OE that should be aborted
}

// NewOptimisticExecution initializes the Optimistic Execution context but does not start it.
func NewOptimisticExecution(logger log.Logger, fn FinalizeBlockFunc, opts ...func(*OptimisticExecution)) *OptimisticExecution {
logger = logger.With(log.ModuleKey, "oe")
oe := &OptimisticExecution{logger: logger, finalizeBlockFunc: fn}
for _, opt := range opts {
opt(oe)
}
return oe
}

// WithAbortRate sets the abort rate for the OE. The abort rate is a number from
// 0 to 100 that determines the percentage of OE that should be aborted.
// This is for testing purposes only and must not be used in production.
func WithAbortRate(rate int) func(*OptimisticExecution) {
return func(oe *OptimisticExecution) {
oe.abortRate = rate
}
}

// Reset resets the OE context. Must be called whenever we want to invalidate
// the current OE.
func (oe *OptimisticExecution) Reset() {
oe.mtx.Lock()
defer oe.mtx.Unlock()
oe.request = nil
oe.response = nil
oe.err = nil
oe.initialized = false
}

func (oe *OptimisticExecution) Enabled() bool {
return oe != nil
}

// Initialized returns true if the OE was initialized, meaning that it contains
// a request and it was run or it is running.
func (oe *OptimisticExecution) Initialized() bool {
if oe == nil {
return false
}
oe.mtx.Lock()
defer oe.mtx.Unlock()

return oe.initialized
}

// Execute initializes the OE and starts it in a goroutine.
func (oe *OptimisticExecution) Execute(req *abci.RequestProcessProposal) {
oe.mtx.Lock()
defer oe.mtx.Unlock()

oe.stopCh = make(chan struct{})
oe.request = &abci.RequestFinalizeBlock{
Txs: req.Txs,
DecidedLastCommit: req.ProposedLastCommit,
Misbehavior: req.Misbehavior,
Hash: req.Hash,
Height: req.Height,
Time: req.Time,
NextValidatorsHash: req.NextValidatorsHash,
ProposerAddress: req.ProposerAddress,
}

oe.logger.Debug("OE started", "height", req.Height, "hash", hex.EncodeToString(req.Hash), "time", req.Time.String())
ctx, cancel := context.WithCancel(context.Background())
oe.cancelFunc = cancel
oe.initialized = true

go func() {
start := time.Now()
Dismissed Show dismissed Hide dismissed
resp, err := oe.finalizeBlockFunc(ctx, oe.request)
oe.mtx.Lock()
executionTime := time.Since(start)
oe.logger.Debug("OE finished", "duration", executionTime.String(), "height", req.Height, "hash", hex.EncodeToString(req.Hash))
oe.response, oe.err = resp, err
close(oe.stopCh)
oe.mtx.Unlock()
}()
Comment on lines +106 to +115

Check notice

Code scanning / CodeQL

Spawning a Go routine Note

Spawning a Go routine may be a possible source of non-determinism
}

// AbortIfNeeded aborts the OE if the request hash is not the same as the one in
// the running OE. Returns true if the OE was aborted.
func (oe *OptimisticExecution) AbortIfNeeded(reqHash []byte) bool {
if oe == nil {
return false
}

oe.mtx.Lock()
defer oe.mtx.Unlock()

if !bytes.Equal(oe.request.Hash, reqHash) {
oe.logger.Error("OE aborted due to hash mismatch", "oe_hash", hex.EncodeToString(oe.request.Hash), "req_hash", hex.EncodeToString(reqHash), "oe_height", oe.request.Height, "req_height", oe.request.Height)
oe.cancelFunc()
return true
} else if oe.abortRate > 0 && rand.Intn(100) < oe.abortRate {
// this is for test purposes only, we can emulate a certain percentage of
// OE needed to be aborted.
oe.cancelFunc()
oe.logger.Error("OE aborted due to test abort rate")
return true
}

return false
}

// Abort aborts the OE unconditionally and waits for it to finish.
func (oe *OptimisticExecution) Abort() {
if oe == nil || oe.cancelFunc == nil {
return
}

oe.cancelFunc()
<-oe.stopCh
}

// WaitResult waits for the OE to finish and returns the result.
func (oe *OptimisticExecution) WaitResult() (*abci.ResponseFinalizeBlock, error) {
<-oe.stopCh
return oe.response, oe.err
}
Loading