From db698314f3170a97a64c4dbfa1b96c879d9c6523 Mon Sep 17 00:00:00 2001 From: Julien Robert Date: Wed, 4 Dec 2024 11:16:18 +0100 Subject: [PATCH] feat(server/v2/stf): delayed marshalling of typed event (#22684) (cherry picked from commit 7fa2356c07aad32198a6a170db682564e46405ef) # Conflicts: # server/v2/stf/core_event_service.go --- server/v2/cometbft/abci.go | 28 +++++-- server/v2/cometbft/config.go | 26 ++++--- server/v2/cometbft/query.go | 6 +- server/v2/cometbft/server.go | 8 +- server/v2/cometbft/utils.go | 77 +++++++++---------- server/v2/stf/core_event_service.go | 112 ++++++++++++++++++++++++++++ tools/confix/data/v2-app.toml | 8 +- tools/confix/migrations.go | 2 +- 8 files changed, 203 insertions(+), 64 deletions(-) create mode 100644 server/v2/stf/core_event_service.go diff --git a/server/v2/cometbft/abci.go b/server/v2/cometbft/abci.go index e6a07fd86f07..c8fc58649e41 100644 --- a/server/v2/cometbft/abci.go +++ b/server/v2/cometbft/abci.go @@ -62,9 +62,9 @@ type consensus[T transaction.Tx] struct { streamingManager streaming.Manager mempool mempool.Mempool[T] - cfg Config - chainID string - indexedEvents map[string]struct{} + cfg Config + chainID string + indexedABCIEvents map[string]struct{} initialHeight uint64 // this is only available after this node has committed a block (in FinalizeBlock), @@ -105,9 +105,16 @@ func (c *consensus[T]) CheckTx(ctx context.Context, req *abciproto.CheckTxReques return nil, err } - events, err := intoABCIEvents(resp.Events, c.indexedEvents) - if err != nil { - return nil, err + events := make([]abci.Event, 0) + if !c.cfg.AppTomlConfig.DisableABCIEvents { + events, err = intoABCIEvents( + resp.Events, + c.indexedABCIEvents, + c.cfg.AppTomlConfig.DisableIndexABCIEvents, + ) + if err != nil { + return nil, err + } } cometResp := &abciproto.CheckTxResponse{ @@ -116,6 +123,7 @@ func (c *consensus[T]) CheckTx(ctx context.Context, req *abciproto.CheckTxReques GasUsed: uint64ToInt64(resp.GasUsed), Events: events, } + if resp.Error != nil { space, code, log := errorsmod.ABCIInfo(resp.Error, c.cfg.AppTomlConfig.Trace) cometResp.Code = code @@ -557,7 +565,13 @@ func (c *consensus[T]) FinalizeBlock( return nil, err } - return finalizeBlockResponse(resp, cp, appHash, c.indexedEvents, c.cfg.AppTomlConfig.Trace) + return finalizeBlockResponse( + resp, + cp, + appHash, + c.indexedABCIEvents, + c.cfg.AppTomlConfig, + ) } func (c *consensus[T]) internalFinalizeBlock( diff --git a/server/v2/cometbft/config.go b/server/v2/cometbft/config.go index d8e591a9695c..a97ec834c541 100644 --- a/server/v2/cometbft/config.go +++ b/server/v2/cometbft/config.go @@ -16,7 +16,6 @@ type Config struct { func DefaultAppTomlConfig() *AppTomlConfig { return &AppTomlConfig{ MinRetainBlocks: 0, - IndexEvents: make([]string, 0), HaltHeight: 0, HaltTime: 0, Address: "tcp://127.0.0.1:26658", @@ -28,22 +27,27 @@ func DefaultAppTomlConfig() *AppTomlConfig { Target: make(map[string]indexer.Config), ChannelBufferSize: 1024, }, + IndexABCIEvents: make([]string, 0), + DisableIndexABCIEvents: false, + DisableABCIEvents: false, } } type AppTomlConfig struct { - MinRetainBlocks uint64 `mapstructure:"min-retain-blocks" toml:"min-retain-blocks" comment:"min-retain-blocks defines the minimum block height offset from the current block being committed, such that all blocks past this offset are pruned from CometBFT. A value of 0 indicates that no blocks should be pruned."` - IndexEvents []string `mapstructure:"index-events" toml:"index-events" comment:"index-events defines the set of events in the form {eventType}.{attributeKey}, which informs CometBFT what to index. If empty, all events will be indexed."` - HaltHeight uint64 `mapstructure:"halt-height" toml:"halt-height" comment:"halt-height contains a non-zero block height at which a node will gracefully halt and shutdown that can be used to assist upgrades and testing."` - HaltTime uint64 `mapstructure:"halt-time" toml:"halt-time" comment:"halt-time contains a non-zero minimum block time (in Unix seconds) at which a node will gracefully halt and shutdown that can be used to assist upgrades and testing."` - Address string `mapstructure:"address" toml:"address" comment:"address defines the CometBFT RPC server address to bind to."` - Transport string `mapstructure:"transport" toml:"transport" comment:"transport defines the CometBFT RPC server transport protocol: socket, grpc"` - Trace bool `mapstructure:"trace" toml:"trace" comment:"trace enables the CometBFT RPC server to output trace information about its internal operations."` - Standalone bool `mapstructure:"standalone" toml:"standalone" comment:"standalone starts the application without the CometBFT node. The node should be started separately."` + MinRetainBlocks uint64 `mapstructure:"min-retain-blocks" toml:"min-retain-blocks" comment:"min-retain-blocks defines the minimum block height offset from the current block being committed, such that all blocks past this offset are pruned from CometBFT. A value of 0 indicates that no blocks should be pruned."` + HaltHeight uint64 `mapstructure:"halt-height" toml:"halt-height" comment:"halt-height contains a non-zero block height at which a node will gracefully halt and shutdown that can be used to assist upgrades and testing."` + HaltTime uint64 `mapstructure:"halt-time" toml:"halt-time" comment:"halt-time contains a non-zero minimum block time (in Unix seconds) at which a node will gracefully halt and shutdown that can be used to assist upgrades and testing."` + Address string `mapstructure:"address" toml:"address" comment:"address defines the CometBFT RPC server address to bind to."` + Transport string `mapstructure:"transport" toml:"transport" comment:"transport defines the CometBFT RPC server transport protocol: socket, grpc"` + Trace bool `mapstructure:"trace" toml:"trace" comment:"trace enables the CometBFT RPC server to output trace information about its internal operations."` + Standalone bool `mapstructure:"standalone" toml:"standalone" comment:"standalone starts the application without the CometBFT node. The node should be started separately."` // Sub configs - Mempool mempool.Config `mapstructure:"mempool" toml:"mempool" comment:"mempool defines the configuration for the SDK built-in app-side mempool implementations."` - Indexer indexer.IndexingConfig `mapstructure:"indexer" toml:"indexer" comment:"indexer defines the configuration for the SDK built-in indexer implementation."` + Mempool mempool.Config `mapstructure:"mempool" toml:"mempool" comment:"mempool defines the configuration for the SDK built-in app-side mempool implementations."` + Indexer indexer.IndexingConfig `mapstructure:"indexer" toml:"indexer" comment:"indexer defines the configuration for the SDK built-in indexer implementation."` + IndexABCIEvents []string `mapstructure:"index-abci-events" toml:"index-abci-events" comment:"index-abci-events defines the set of events in the form {eventType}.{attributeKey}, which informs CometBFT what to index. If empty, all events will be indexed."` + DisableIndexABCIEvents bool `mapstructure:"disable-index-abci-events" toml:"disable-index-abci-events" comment:"disable-index-abci-events disables the ABCI event indexing done by CometBFT. Useful when relying on the SDK indexer for event indexing, but still want events to be included in FinalizeBlockResponse."` + DisableABCIEvents bool `mapstructure:"disable-abci-events" toml:"disable-abci-events" comment:"disable-abci-events disables all ABCI events. Useful when relying on the SDK indexer for event indexing."` } // CfgOption is a function that allows to overwrite the default server configuration. diff --git a/server/v2/cometbft/query.go b/server/v2/cometbft/query.go index f7fbe811d1b1..bf6bcfe02eb6 100644 --- a/server/v2/cometbft/query.go +++ b/server/v2/cometbft/query.go @@ -61,7 +61,11 @@ func (c *consensus[T]) handleQueryApp(ctx context.Context, path []string, req *a return nil, errorsmod.Wrap(err, "failed to simulate tx") } - bz, err := intoABCISimulationResponse(txResult, c.indexedEvents) + bz, err := intoABCISimulationResponse( + txResult, + c.indexedABCIEvents, + c.cfg.AppTomlConfig.DisableIndexABCIEvents, + ) if err != nil { return nil, errorsmod.Wrap(err, "failed to marshal txResult") } diff --git a/server/v2/cometbft/server.go b/server/v2/cometbft/server.go index ed0c4fba8702..51c24285e440 100644 --- a/server/v2/cometbft/server.go +++ b/server/v2/cometbft/server.go @@ -122,9 +122,9 @@ func New[T transaction.Tx]( } } - indexEvents := make(map[string]struct{}, len(srv.config.AppTomlConfig.IndexEvents)) - for _, e := range srv.config.AppTomlConfig.IndexEvents { - indexEvents[e] = struct{}{} + indexedABCIEvents := make(map[string]struct{}, len(srv.config.AppTomlConfig.IndexABCIEvents)) + for _, e := range srv.config.AppTomlConfig.IndexABCIEvents { + indexedABCIEvents[e] = struct{}{} } sc := store.GetStateCommitment().(snapshots.CommitSnapshotter) @@ -183,7 +183,7 @@ func New[T transaction.Tx]( checkTxHandler: srv.serverOptions.CheckTxHandler, extendVote: srv.serverOptions.ExtendVoteHandler, chainID: chainID, - indexedEvents: indexEvents, + indexedABCIEvents: indexedABCIEvents, initialHeight: 0, queryHandlersMap: queryHandlers, getProtoRegistry: sync.OnceValues(gogoproto.MergedRegistry), diff --git a/server/v2/cometbft/utils.go b/server/v2/cometbft/utils.go index 09b32c3d00b1..3929debe65a0 100644 --- a/server/v2/cometbft/utils.go +++ b/server/v2/cometbft/utils.go @@ -70,16 +70,23 @@ func finalizeBlockResponse( cp *cmtproto.ConsensusParams, appHash []byte, indexSet map[string]struct{}, - debug bool, + cfg *AppTomlConfig, ) (*abci.FinalizeBlockResponse, error) { - allEvents := append(in.BeginBlockEvents, in.EndBlockEvents...) - - events, err := intoABCIEvents(allEvents, indexSet) - if err != nil { - return nil, err + events := make([]abci.Event, 0) + + if !cfg.DisableABCIEvents { + var err error + events, err = intoABCIEvents( + append(in.BeginBlockEvents, in.EndBlockEvents...), + indexSet, + cfg.DisableIndexABCIEvents, + ) + if err != nil { + return nil, err + } } - txResults, err := intoABCITxResults(in.TxResults, indexSet, debug) + txResults, err := intoABCITxResults(in.TxResults, indexSet, cfg) if err != nil { return nil, err } @@ -91,6 +98,7 @@ func finalizeBlockResponse( AppHash: appHash, ConsensusParamUpdates: cp, } + return resp, nil } @@ -108,12 +116,21 @@ func intoABCIValidatorUpdates(updates []appmodulev2.ValidatorUpdate) []abci.Vali return valsetUpdates } -func intoABCITxResults(results []server.TxResult, indexSet map[string]struct{}, debug bool) ([]*abci.ExecTxResult, error) { +func intoABCITxResults( + results []server.TxResult, + indexSet map[string]struct{}, + cfg *AppTomlConfig, +) ([]*abci.ExecTxResult, error) { res := make([]*abci.ExecTxResult, len(results)) for i := range results { - events, err := intoABCIEvents(results[i].Events, indexSet) - if err != nil { - return nil, err + var err error + events := make([]abci.Event, 0) + + if !cfg.DisableABCIEvents { + events, err = intoABCIEvents(results[i].Events, indexSet, cfg.DisableIndexABCIEvents) + if err != nil { + return nil, err + } } res[i] = responseExecTxResultWithEvents( @@ -121,59 +138,43 @@ func intoABCITxResults(results []server.TxResult, indexSet map[string]struct{}, results[i].GasWanted, results[i].GasUsed, events, - debug, + cfg.Trace, ) } return res, nil } -func intoABCIEvents(events []event.Event, indexSet map[string]struct{}) ([]abci.Event, error) { +func intoABCIEvents(events []event.Event, indexSet map[string]struct{}, indexNone bool) ([]abci.Event, error) { indexAll := len(indexSet) == 0 abciEvents := make([]abci.Event, len(events)) for i, e := range events { - attributes, err := e.Attributes() + attrs, err := e.Attributes() if err != nil { return nil, err } + abciEvents[i] = abci.Event{ Type: e.Type, - Attributes: make([]abci.EventAttribute, len(attributes)), + Attributes: make([]abci.EventAttribute, len(attrs)), } - for j, attr := range attributes { + for j, attr := range attrs { _, index := indexSet[fmt.Sprintf("%s.%s", e.Type, attr.Key)] abciEvents[i].Attributes[j] = abci.EventAttribute{ Key: attr.Key, Value: attr.Value, - Index: index || indexAll, + Index: !indexNone && (index || indexAll), } } } return abciEvents, nil } -func intoABCISimulationResponse(txRes server.TxResult, indexSet map[string]struct{}) ([]byte, error) { - indexAll := len(indexSet) == 0 - abciEvents := make([]abci.Event, len(txRes.Events)) - for i, e := range txRes.Events { - attributes, err := e.Attributes() - if err != nil { - return nil, err - } - abciEvents[i] = abci.Event{ - Type: e.Type, - Attributes: make([]abci.EventAttribute, len(attributes)), - } - - for j, attr := range attributes { - _, index := indexSet[fmt.Sprintf("%s.%s", e.Type, attr.Key)] - abciEvents[i].Attributes[j] = abci.EventAttribute{ - Key: attr.Key, - Value: attr.Value, - Index: index || indexAll, - } - } +func intoABCISimulationResponse(txRes server.TxResult, indexSet map[string]struct{}, indexNone bool) ([]byte, error) { + abciEvents, err := intoABCIEvents(txRes.Events, indexSet, indexNone) + if err != nil { + return nil, err } msgResponses := make([]*gogoany.Any, len(txRes.Resp)) diff --git a/server/v2/stf/core_event_service.go b/server/v2/stf/core_event_service.go new file mode 100644 index 000000000000..182848db45eb --- /dev/null +++ b/server/v2/stf/core_event_service.go @@ -0,0 +1,112 @@ +package stf + +import ( + "bytes" + "context" + "encoding/json" + "maps" + "slices" + + "github.com/cosmos/gogoproto/jsonpb" + gogoproto "github.com/cosmos/gogoproto/proto" + + "cosmossdk.io/core/event" + "cosmossdk.io/core/transaction" +) + +func NewEventService() event.Service { + return eventService{} +} + +type eventService struct{} + +// EventManager implements event.Service. +func (eventService) EventManager(ctx context.Context) event.Manager { + exCtx, err := getExecutionCtxFromContext(ctx) + if err != nil { + panic(err) + } + + return &eventManager{exCtx} +} + +var _ event.Manager = (*eventManager)(nil) + +type eventManager struct { + executionContext *executionContext +} + +// Emit emits an typed event that is defined in the protobuf file. +// In the future these events will be added to consensus. +func (em *eventManager) Emit(tev transaction.Msg) error { + ev := event.Event{ + Type: gogoproto.MessageName(tev), + Attributes: func() ([]event.Attribute, error) { + outerEvent, err := TypedEventToEvent(tev) + if err != nil { + return nil, err + } + + return outerEvent.Attributes() + }, + Data: func() (json.RawMessage, error) { + buf := new(bytes.Buffer) + jm := &jsonpb.Marshaler{OrigName: true, EmitDefaults: true, AnyResolver: nil} + if err := jm.Marshal(buf, tev); err != nil { + return nil, err + } + + return buf.Bytes(), nil + }, + } + + em.executionContext.events = append(em.executionContext.events, ev) + return nil +} + +// EmitKV emits a key value pair event. +func (em *eventManager) EmitKV(eventType string, attrs ...event.Attribute) error { + ev := event.Event{ + Type: eventType, + Attributes: func() ([]event.Attribute, error) { + return attrs, nil + }, + Data: func() (json.RawMessage, error) { + return json.Marshal(attrs) + }, + } + + em.executionContext.events = append(em.executionContext.events, ev) + return nil +} + +// TypedEventToEvent takes typed event and converts to Event object +func TypedEventToEvent(tev transaction.Msg) (event.Event, error) { + evtType := gogoproto.MessageName(tev) + buf := new(bytes.Buffer) + jm := &jsonpb.Marshaler{OrigName: true, EmitDefaults: true, AnyResolver: nil} + if err := jm.Marshal(buf, tev); err != nil { + return event.Event{}, err + } + + var attrMap map[string]json.RawMessage + if err := json.Unmarshal(buf.Bytes(), &attrMap); err != nil { + return event.Event{}, err + } + + // sort the keys to ensure the order is always the same + keys := slices.Sorted(maps.Keys(attrMap)) + attrs := make([]event.Attribute, 0, len(attrMap)) + for _, k := range keys { + v := attrMap[k] + attrs = append(attrs, event.Attribute{ + Key: k, + Value: string(v), + }) + } + + return event.Event{ + Type: evtType, + Attributes: func() ([]event.Attribute, error) { return attrs, nil }, + }, nil +} diff --git a/tools/confix/data/v2-app.toml b/tools/confix/data/v2-app.toml index 2f9ed9e734d3..7d136a84c2c6 100644 --- a/tools/confix/data/v2-app.toml +++ b/tools/confix/data/v2-app.toml @@ -1,8 +1,6 @@ [comet] # min-retain-blocks defines the minimum block height offset from the current block being committed, such that all blocks past this offset are pruned from CometBFT. A value of 0 indicates that no blocks should be pruned. min-retain-blocks = 0 -# index-events defines the set of events in the form {eventType}.{attributeKey}, which informs CometBFT what to index. If empty, all events will be indexed. -index-events = [] # halt-height contains a non-zero block height at which a node will gracefully halt and shutdown that can be used to assist upgrades and testing. halt-height = 0 # halt-time contains a non-zero minimum block time (in Unix seconds) at which a node will gracefully halt and shutdown that can be used to assist upgrades and testing. @@ -15,6 +13,12 @@ transport = 'socket' trace = false # standalone starts the application without the CometBFT node. The node should be started separately. standalone = false +# index-abci-events defines the set of events in the form {eventType}.{attributeKey}, which informs CometBFT what to index. If empty, all events will be indexed. +index-abci-events = [] +# disable-index-abci-events disables the ABCI event indexing done by CometBFT. Useful when relying on the SDK indexer for event indexing, but still want events to be included in FinalizeBlockResponse. +disable-index-abci-events = false +# disable-abci-events disables all ABCI events. Useful when relying on the SDK indexer for event indexing. +disable-abci-events = false # mempool defines the configuration for the SDK built-in app-side mempool implementations. [comet.mempool] diff --git a/tools/confix/migrations.go b/tools/confix/migrations.go index 3589124c733b..cc34ffadf38d 100644 --- a/tools/confix/migrations.go +++ b/tools/confix/migrations.go @@ -41,7 +41,7 @@ type v2KeyChangesMap map[string][]string var v2KeyChanges = v2KeyChangesMap{ "minimum-gas-prices": []string{"server.minimum-gas-prices"}, "min-retain-blocks": []string{"comet.min-retain-blocks"}, - "index-events": []string{"comet.index-events"}, + "index-events": []string{"comet.index-abci-events"}, "halt-height": []string{"comet.halt-height"}, "halt-time": []string{"comet.halt-time"}, "app-db-backend": []string{"store.app-db-backend"},