Skip to content

Commit

Permalink
feat(server/v2/stf): delayed marshalling of typed event (#22684)
Browse files Browse the repository at this point in the history
(cherry picked from commit 7fa2356)

# Conflicts:
#	server/v2/stf/core_event_service.go
  • Loading branch information
julienrbrt authored and mergify[bot] committed Dec 4, 2024
1 parent a595e3c commit db69831
Show file tree
Hide file tree
Showing 8 changed files with 203 additions and 64 deletions.
28 changes: 21 additions & 7 deletions server/v2/cometbft/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ type consensus[T transaction.Tx] struct {
streamingManager streaming.Manager
mempool mempool.Mempool[T]

cfg Config
chainID string
indexedEvents map[string]struct{}
cfg Config
chainID string
indexedABCIEvents map[string]struct{}

initialHeight uint64
// this is only available after this node has committed a block (in FinalizeBlock),
Expand Down Expand Up @@ -105,9 +105,16 @@ func (c *consensus[T]) CheckTx(ctx context.Context, req *abciproto.CheckTxReques
return nil, err
}

events, err := intoABCIEvents(resp.Events, c.indexedEvents)
if err != nil {
return nil, err
events := make([]abci.Event, 0)
if !c.cfg.AppTomlConfig.DisableABCIEvents {
events, err = intoABCIEvents(
resp.Events,
c.indexedABCIEvents,
c.cfg.AppTomlConfig.DisableIndexABCIEvents,
)
if err != nil {
return nil, err
}
}

cometResp := &abciproto.CheckTxResponse{
Expand All @@ -116,6 +123,7 @@ func (c *consensus[T]) CheckTx(ctx context.Context, req *abciproto.CheckTxReques
GasUsed: uint64ToInt64(resp.GasUsed),
Events: events,
}

if resp.Error != nil {
space, code, log := errorsmod.ABCIInfo(resp.Error, c.cfg.AppTomlConfig.Trace)
cometResp.Code = code
Expand Down Expand Up @@ -557,7 +565,13 @@ func (c *consensus[T]) FinalizeBlock(
return nil, err
}

return finalizeBlockResponse(resp, cp, appHash, c.indexedEvents, c.cfg.AppTomlConfig.Trace)
return finalizeBlockResponse(
resp,
cp,
appHash,
c.indexedABCIEvents,
c.cfg.AppTomlConfig,
)
}

func (c *consensus[T]) internalFinalizeBlock(
Expand Down
26 changes: 15 additions & 11 deletions server/v2/cometbft/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ type Config struct {
func DefaultAppTomlConfig() *AppTomlConfig {
return &AppTomlConfig{
MinRetainBlocks: 0,
IndexEvents: make([]string, 0),
HaltHeight: 0,
HaltTime: 0,
Address: "tcp://127.0.0.1:26658",
Expand All @@ -28,22 +27,27 @@ func DefaultAppTomlConfig() *AppTomlConfig {
Target: make(map[string]indexer.Config),
ChannelBufferSize: 1024,
},
IndexABCIEvents: make([]string, 0),
DisableIndexABCIEvents: false,
DisableABCIEvents: false,
}
}

type AppTomlConfig struct {
MinRetainBlocks uint64 `mapstructure:"min-retain-blocks" toml:"min-retain-blocks" comment:"min-retain-blocks defines the minimum block height offset from the current block being committed, such that all blocks past this offset are pruned from CometBFT. A value of 0 indicates that no blocks should be pruned."`
IndexEvents []string `mapstructure:"index-events" toml:"index-events" comment:"index-events defines the set of events in the form {eventType}.{attributeKey}, which informs CometBFT what to index. If empty, all events will be indexed."`
HaltHeight uint64 `mapstructure:"halt-height" toml:"halt-height" comment:"halt-height contains a non-zero block height at which a node will gracefully halt and shutdown that can be used to assist upgrades and testing."`
HaltTime uint64 `mapstructure:"halt-time" toml:"halt-time" comment:"halt-time contains a non-zero minimum block time (in Unix seconds) at which a node will gracefully halt and shutdown that can be used to assist upgrades and testing."`
Address string `mapstructure:"address" toml:"address" comment:"address defines the CometBFT RPC server address to bind to."`
Transport string `mapstructure:"transport" toml:"transport" comment:"transport defines the CometBFT RPC server transport protocol: socket, grpc"`
Trace bool `mapstructure:"trace" toml:"trace" comment:"trace enables the CometBFT RPC server to output trace information about its internal operations."`
Standalone bool `mapstructure:"standalone" toml:"standalone" comment:"standalone starts the application without the CometBFT node. The node should be started separately."`
MinRetainBlocks uint64 `mapstructure:"min-retain-blocks" toml:"min-retain-blocks" comment:"min-retain-blocks defines the minimum block height offset from the current block being committed, such that all blocks past this offset are pruned from CometBFT. A value of 0 indicates that no blocks should be pruned."`
HaltHeight uint64 `mapstructure:"halt-height" toml:"halt-height" comment:"halt-height contains a non-zero block height at which a node will gracefully halt and shutdown that can be used to assist upgrades and testing."`
HaltTime uint64 `mapstructure:"halt-time" toml:"halt-time" comment:"halt-time contains a non-zero minimum block time (in Unix seconds) at which a node will gracefully halt and shutdown that can be used to assist upgrades and testing."`
Address string `mapstructure:"address" toml:"address" comment:"address defines the CometBFT RPC server address to bind to."`
Transport string `mapstructure:"transport" toml:"transport" comment:"transport defines the CometBFT RPC server transport protocol: socket, grpc"`
Trace bool `mapstructure:"trace" toml:"trace" comment:"trace enables the CometBFT RPC server to output trace information about its internal operations."`
Standalone bool `mapstructure:"standalone" toml:"standalone" comment:"standalone starts the application without the CometBFT node. The node should be started separately."`

// Sub configs
Mempool mempool.Config `mapstructure:"mempool" toml:"mempool" comment:"mempool defines the configuration for the SDK built-in app-side mempool implementations."`
Indexer indexer.IndexingConfig `mapstructure:"indexer" toml:"indexer" comment:"indexer defines the configuration for the SDK built-in indexer implementation."`
Mempool mempool.Config `mapstructure:"mempool" toml:"mempool" comment:"mempool defines the configuration for the SDK built-in app-side mempool implementations."`
Indexer indexer.IndexingConfig `mapstructure:"indexer" toml:"indexer" comment:"indexer defines the configuration for the SDK built-in indexer implementation."`
IndexABCIEvents []string `mapstructure:"index-abci-events" toml:"index-abci-events" comment:"index-abci-events defines the set of events in the form {eventType}.{attributeKey}, which informs CometBFT what to index. If empty, all events will be indexed."`
DisableIndexABCIEvents bool `mapstructure:"disable-index-abci-events" toml:"disable-index-abci-events" comment:"disable-index-abci-events disables the ABCI event indexing done by CometBFT. Useful when relying on the SDK indexer for event indexing, but still want events to be included in FinalizeBlockResponse."`
DisableABCIEvents bool `mapstructure:"disable-abci-events" toml:"disable-abci-events" comment:"disable-abci-events disables all ABCI events. Useful when relying on the SDK indexer for event indexing."`
}

// CfgOption is a function that allows to overwrite the default server configuration.
Expand Down
6 changes: 5 additions & 1 deletion server/v2/cometbft/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,11 @@ func (c *consensus[T]) handleQueryApp(ctx context.Context, path []string, req *a
return nil, errorsmod.Wrap(err, "failed to simulate tx")
}

bz, err := intoABCISimulationResponse(txResult, c.indexedEvents)
bz, err := intoABCISimulationResponse(
txResult,
c.indexedABCIEvents,
c.cfg.AppTomlConfig.DisableIndexABCIEvents,
)
if err != nil {
return nil, errorsmod.Wrap(err, "failed to marshal txResult")
}
Expand Down
8 changes: 4 additions & 4 deletions server/v2/cometbft/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,9 @@ func New[T transaction.Tx](
}
}

indexEvents := make(map[string]struct{}, len(srv.config.AppTomlConfig.IndexEvents))
for _, e := range srv.config.AppTomlConfig.IndexEvents {
indexEvents[e] = struct{}{}
indexedABCIEvents := make(map[string]struct{}, len(srv.config.AppTomlConfig.IndexABCIEvents))
for _, e := range srv.config.AppTomlConfig.IndexABCIEvents {
indexedABCIEvents[e] = struct{}{}
}

sc := store.GetStateCommitment().(snapshots.CommitSnapshotter)
Expand Down Expand Up @@ -183,7 +183,7 @@ func New[T transaction.Tx](
checkTxHandler: srv.serverOptions.CheckTxHandler,
extendVote: srv.serverOptions.ExtendVoteHandler,
chainID: chainID,
indexedEvents: indexEvents,
indexedABCIEvents: indexedABCIEvents,
initialHeight: 0,
queryHandlersMap: queryHandlers,
getProtoRegistry: sync.OnceValues(gogoproto.MergedRegistry),
Expand Down
77 changes: 39 additions & 38 deletions server/v2/cometbft/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,23 @@ func finalizeBlockResponse(
cp *cmtproto.ConsensusParams,
appHash []byte,
indexSet map[string]struct{},
debug bool,
cfg *AppTomlConfig,
) (*abci.FinalizeBlockResponse, error) {
allEvents := append(in.BeginBlockEvents, in.EndBlockEvents...)

events, err := intoABCIEvents(allEvents, indexSet)
if err != nil {
return nil, err
events := make([]abci.Event, 0)

if !cfg.DisableABCIEvents {
var err error
events, err = intoABCIEvents(
append(in.BeginBlockEvents, in.EndBlockEvents...),
indexSet,
cfg.DisableIndexABCIEvents,
)
if err != nil {
return nil, err
}
}

txResults, err := intoABCITxResults(in.TxResults, indexSet, debug)
txResults, err := intoABCITxResults(in.TxResults, indexSet, cfg)
if err != nil {
return nil, err
}
Expand All @@ -91,6 +98,7 @@ func finalizeBlockResponse(
AppHash: appHash,
ConsensusParamUpdates: cp,
}

return resp, nil
}

Expand All @@ -108,72 +116,65 @@ func intoABCIValidatorUpdates(updates []appmodulev2.ValidatorUpdate) []abci.Vali
return valsetUpdates
}

func intoABCITxResults(results []server.TxResult, indexSet map[string]struct{}, debug bool) ([]*abci.ExecTxResult, error) {
func intoABCITxResults(
results []server.TxResult,
indexSet map[string]struct{},
cfg *AppTomlConfig,
) ([]*abci.ExecTxResult, error) {
res := make([]*abci.ExecTxResult, len(results))
for i := range results {
events, err := intoABCIEvents(results[i].Events, indexSet)
if err != nil {
return nil, err
var err error
events := make([]abci.Event, 0)

if !cfg.DisableABCIEvents {
events, err = intoABCIEvents(results[i].Events, indexSet, cfg.DisableIndexABCIEvents)
if err != nil {
return nil, err
}
}

res[i] = responseExecTxResultWithEvents(
results[i].Error,
results[i].GasWanted,
results[i].GasUsed,
events,
debug,
cfg.Trace,
)
}

return res, nil
}

func intoABCIEvents(events []event.Event, indexSet map[string]struct{}) ([]abci.Event, error) {
func intoABCIEvents(events []event.Event, indexSet map[string]struct{}, indexNone bool) ([]abci.Event, error) {
indexAll := len(indexSet) == 0
abciEvents := make([]abci.Event, len(events))
for i, e := range events {
attributes, err := e.Attributes()
attrs, err := e.Attributes()
if err != nil {
return nil, err
}

abciEvents[i] = abci.Event{
Type: e.Type,
Attributes: make([]abci.EventAttribute, len(attributes)),
Attributes: make([]abci.EventAttribute, len(attrs)),
}

for j, attr := range attributes {
for j, attr := range attrs {
_, index := indexSet[fmt.Sprintf("%s.%s", e.Type, attr.Key)]
abciEvents[i].Attributes[j] = abci.EventAttribute{
Key: attr.Key,
Value: attr.Value,
Index: index || indexAll,
Index: !indexNone && (index || indexAll),
}
}
}
return abciEvents, nil
}

func intoABCISimulationResponse(txRes server.TxResult, indexSet map[string]struct{}) ([]byte, error) {
indexAll := len(indexSet) == 0
abciEvents := make([]abci.Event, len(txRes.Events))
for i, e := range txRes.Events {
attributes, err := e.Attributes()
if err != nil {
return nil, err
}
abciEvents[i] = abci.Event{
Type: e.Type,
Attributes: make([]abci.EventAttribute, len(attributes)),
}

for j, attr := range attributes {
_, index := indexSet[fmt.Sprintf("%s.%s", e.Type, attr.Key)]
abciEvents[i].Attributes[j] = abci.EventAttribute{
Key: attr.Key,
Value: attr.Value,
Index: index || indexAll,
}
}
func intoABCISimulationResponse(txRes server.TxResult, indexSet map[string]struct{}, indexNone bool) ([]byte, error) {
abciEvents, err := intoABCIEvents(txRes.Events, indexSet, indexNone)
if err != nil {
return nil, err
}

msgResponses := make([]*gogoany.Any, len(txRes.Resp))
Expand Down
112 changes: 112 additions & 0 deletions server/v2/stf/core_event_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package stf

import (
"bytes"
"context"
"encoding/json"
"maps"
"slices"

"github.com/cosmos/gogoproto/jsonpb"
gogoproto "github.com/cosmos/gogoproto/proto"

"cosmossdk.io/core/event"
"cosmossdk.io/core/transaction"
)

func NewEventService() event.Service {
return eventService{}
}

type eventService struct{}

// EventManager implements event.Service.
func (eventService) EventManager(ctx context.Context) event.Manager {
exCtx, err := getExecutionCtxFromContext(ctx)

Check failure on line 25 in server/v2/stf/core_event_service.go

View workflow job for this annotation

GitHub Actions / dependency-review

undefined: getExecutionCtxFromContext

Check failure on line 25 in server/v2/stf/core_event_service.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: getExecutionCtxFromContext

Check failure on line 25 in server/v2/stf/core_event_service.go

View workflow job for this annotation

GitHub Actions / tests (01)

undefined: getExecutionCtxFromContext
if err != nil {
panic(err)
}

return &eventManager{exCtx}
}

var _ event.Manager = (*eventManager)(nil)

type eventManager struct {
executionContext *executionContext

Check failure on line 36 in server/v2/stf/core_event_service.go

View workflow job for this annotation

GitHub Actions / dependency-review

undefined: executionContext

Check failure on line 36 in server/v2/stf/core_event_service.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: executionContext (typecheck)

Check failure on line 36 in server/v2/stf/core_event_service.go

View workflow job for this annotation

GitHub Actions / tests (01)

undefined: executionContext
}

// Emit emits an typed event that is defined in the protobuf file.
// In the future these events will be added to consensus.
func (em *eventManager) Emit(tev transaction.Msg) error {
ev := event.Event{
Type: gogoproto.MessageName(tev),
Attributes: func() ([]event.Attribute, error) {
outerEvent, err := TypedEventToEvent(tev)
if err != nil {
return nil, err
}

return outerEvent.Attributes()
},
Data: func() (json.RawMessage, error) {
buf := new(bytes.Buffer)
jm := &jsonpb.Marshaler{OrigName: true, EmitDefaults: true, AnyResolver: nil}
if err := jm.Marshal(buf, tev); err != nil {
return nil, err
}

return buf.Bytes(), nil
},
}

em.executionContext.events = append(em.executionContext.events, ev)
return nil
}

// EmitKV emits a key value pair event.
func (em *eventManager) EmitKV(eventType string, attrs ...event.Attribute) error {
ev := event.Event{
Type: eventType,
Attributes: func() ([]event.Attribute, error) {
return attrs, nil
},
Data: func() (json.RawMessage, error) {
return json.Marshal(attrs)
},
}

em.executionContext.events = append(em.executionContext.events, ev)
return nil
}

// TypedEventToEvent takes typed event and converts to Event object
func TypedEventToEvent(tev transaction.Msg) (event.Event, error) {
evtType := gogoproto.MessageName(tev)
buf := new(bytes.Buffer)
jm := &jsonpb.Marshaler{OrigName: true, EmitDefaults: true, AnyResolver: nil}
if err := jm.Marshal(buf, tev); err != nil {
return event.Event{}, err
}

var attrMap map[string]json.RawMessage
if err := json.Unmarshal(buf.Bytes(), &attrMap); err != nil {
return event.Event{}, err
}

// sort the keys to ensure the order is always the same
keys := slices.Sorted(maps.Keys(attrMap))
attrs := make([]event.Attribute, 0, len(attrMap))
for _, k := range keys {
v := attrMap[k]
attrs = append(attrs, event.Attribute{
Key: k,
Value: string(v),
})
}

return event.Event{
Type: evtType,
Attributes: func() ([]event.Attribute, error) { return attrs, nil },
}, nil
}
Loading

0 comments on commit db69831

Please sign in to comment.