Skip to content

Commit

Permalink
feat: implement CommitFinalState for rewards-v2 models (#187)
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmcgary committed Jan 14, 2025
2 parents ba303bb + 7d17b67 commit eae7a5d
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 34 deletions.
83 changes: 49 additions & 34 deletions pkg/eigenState/defaultOperatorSplits/defaultOperatorSplits.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type DefaultOperatorSplitModel struct {

// Accumulates state changes for SlotIds, grouped by block number
stateAccumulator map[uint64]map[types.SlotID]*DefaultOperatorSplit
committedState map[uint64][]*DefaultOperatorSplit
}

func NewDefaultOperatorSplitModel(
Expand All @@ -52,13 +53,14 @@ func NewDefaultOperatorSplitModel(
logger: logger,
globalConfig: globalConfig,
stateAccumulator: make(map[uint64]map[types.SlotID]*DefaultOperatorSplit),
committedState: make(map[uint64][]*DefaultOperatorSplit),
}

esm.RegisterState(model, 10)
return model, nil
}

func (oas *DefaultOperatorSplitModel) GetModelName() string {
func (dos *DefaultOperatorSplitModel) GetModelName() string {
return "DefaultOperatorSplitModel"
}

Expand All @@ -80,7 +82,7 @@ func parseDefaultOperatorSplitOutputData(outputDataStr string) (*defaultOperator
return outputData, err
}

func (oas *DefaultOperatorSplitModel) handleDefaultOperatorSplitBipsSetEvent(log *storage.TransactionLog) (*DefaultOperatorSplit, error) {
func (dos *DefaultOperatorSplitModel) handleDefaultOperatorSplitBipsSetEvent(log *storage.TransactionLog) (*DefaultOperatorSplit, error) {
outputData, err := parseDefaultOperatorSplitOutputData(log.OutputData)
if err != nil {
return nil, err
Expand All @@ -97,25 +99,25 @@ func (oas *DefaultOperatorSplitModel) handleDefaultOperatorSplitBipsSetEvent(log
return split, nil
}

func (oas *DefaultOperatorSplitModel) GetStateTransitions() (types.StateTransitions[*DefaultOperatorSplit], []uint64) {
func (dos *DefaultOperatorSplitModel) GetStateTransitions() (types.StateTransitions[*DefaultOperatorSplit], []uint64) {
stateChanges := make(types.StateTransitions[*DefaultOperatorSplit])

stateChanges[0] = func(log *storage.TransactionLog) (*DefaultOperatorSplit, error) {
defaultOperatorSplit, err := oas.handleDefaultOperatorSplitBipsSetEvent(log)
defaultOperatorSplit, err := dos.handleDefaultOperatorSplitBipsSetEvent(log)
if err != nil {
return nil, err
}

slotId := base.NewSlotID(defaultOperatorSplit.TransactionHash, defaultOperatorSplit.LogIndex)

_, ok := oas.stateAccumulator[log.BlockNumber][slotId]
_, ok := dos.stateAccumulator[log.BlockNumber][slotId]
if ok {
err := fmt.Errorf("Duplicate default operator split submitted for slot %s at block %d", slotId, log.BlockNumber)
oas.logger.Sugar().Errorw("Duplicate default operator split submitted", zap.Error(err))
dos.logger.Sugar().Errorw("Duplicate default operator split submitted", zap.Error(err))
return nil, err
}

oas.stateAccumulator[log.BlockNumber][slotId] = defaultOperatorSplit
dos.stateAccumulator[log.BlockNumber][slotId] = defaultOperatorSplit

return defaultOperatorSplit, nil
}
Expand All @@ -133,36 +135,38 @@ func (oas *DefaultOperatorSplitModel) GetStateTransitions() (types.StateTransiti
return stateChanges, blockNumbers
}

func (oas *DefaultOperatorSplitModel) getContractAddressesForEnvironment() map[string][]string {
contracts := oas.globalConfig.GetContractsMapForChain()
func (dos *DefaultOperatorSplitModel) getContractAddressesForEnvironment() map[string][]string {
contracts := dos.globalConfig.GetContractsMapForChain()
return map[string][]string{
contracts.RewardsCoordinator: {
"DefaultOperatorSplitBipsSet",
},
}
}

func (oas *DefaultOperatorSplitModel) IsInterestingLog(log *storage.TransactionLog) bool {
addresses := oas.getContractAddressesForEnvironment()
return oas.BaseEigenState.IsInterestingLog(addresses, log)
func (dos *DefaultOperatorSplitModel) IsInterestingLog(log *storage.TransactionLog) bool {
addresses := dos.getContractAddressesForEnvironment()
return dos.BaseEigenState.IsInterestingLog(addresses, log)
}

func (oas *DefaultOperatorSplitModel) SetupStateForBlock(blockNumber uint64) error {
oas.stateAccumulator[blockNumber] = make(map[types.SlotID]*DefaultOperatorSplit)
func (dos *DefaultOperatorSplitModel) SetupStateForBlock(blockNumber uint64) error {
dos.stateAccumulator[blockNumber] = make(map[types.SlotID]*DefaultOperatorSplit)
dos.committedState[blockNumber] = make([]*DefaultOperatorSplit, 0)
return nil
}

func (oas *DefaultOperatorSplitModel) CleanupProcessedStateForBlock(blockNumber uint64) error {
delete(oas.stateAccumulator, blockNumber)
func (dos *DefaultOperatorSplitModel) CleanupProcessedStateForBlock(blockNumber uint64) error {
delete(dos.stateAccumulator, blockNumber)
delete(dos.committedState, blockNumber)
return nil
}

func (oas *DefaultOperatorSplitModel) HandleStateChange(log *storage.TransactionLog) (interface{}, error) {
stateChanges, sortedBlockNumbers := oas.GetStateTransitions()
func (dos *DefaultOperatorSplitModel) HandleStateChange(log *storage.TransactionLog) (interface{}, error) {
stateChanges, sortedBlockNumbers := dos.GetStateTransitions()

for _, blockNumber := range sortedBlockNumbers {
if log.BlockNumber >= blockNumber {
oas.logger.Sugar().Debugw("Handling state change", zap.Uint64("blockNumber", log.BlockNumber))
dos.logger.Sugar().Debugw("Handling state change", zap.Uint64("blockNumber", log.BlockNumber))

change, err := stateChanges[blockNumber](log)
if err != nil {
Expand All @@ -178,11 +182,11 @@ func (oas *DefaultOperatorSplitModel) HandleStateChange(log *storage.Transaction
}

// prepareState prepares the state for commit by adding the new state to the existing state.
func (oas *DefaultOperatorSplitModel) prepareState(blockNumber uint64) ([]*DefaultOperatorSplit, error) {
accumulatedState, ok := oas.stateAccumulator[blockNumber]
func (dos *DefaultOperatorSplitModel) prepareState(blockNumber uint64) ([]*DefaultOperatorSplit, error) {
accumulatedState, ok := dos.stateAccumulator[blockNumber]
if !ok {
err := fmt.Errorf("No accumulated state found for block %d", blockNumber)
oas.logger.Sugar().Errorw(err.Error(), zap.Error(err), zap.Uint64("blockNumber", blockNumber))
dos.logger.Sugar().Errorw(err.Error(), zap.Error(err), zap.Uint64("blockNumber", blockNumber))
return nil, err
}

Expand All @@ -194,40 +198,41 @@ func (oas *DefaultOperatorSplitModel) prepareState(blockNumber uint64) ([]*Defau
}

// CommitFinalState commits the final state for the given block number.
func (oas *DefaultOperatorSplitModel) CommitFinalState(blockNumber uint64) error {
recordsToInsert, err := oas.prepareState(blockNumber)
func (dos *DefaultOperatorSplitModel) CommitFinalState(blockNumber uint64) error {
recordsToInsert, err := dos.prepareState(blockNumber)
if err != nil {
return err
}

if len(recordsToInsert) > 0 {
for _, record := range recordsToInsert {
res := oas.DB.Model(&DefaultOperatorSplit{}).Clauses(clause.Returning{}).Create(&record)
res := dos.DB.Model(&DefaultOperatorSplit{}).Clauses(clause.Returning{}).Create(&record)
if res.Error != nil {
oas.logger.Sugar().Errorw("Failed to insert records", zap.Error(res.Error))
dos.logger.Sugar().Errorw("Failed to insert records", zap.Error(res.Error))
return res.Error
}
}
}
dos.committedState[blockNumber] = recordsToInsert
return nil
}

// GenerateStateRoot generates the state root for the given block number using the results of the state changes.
func (oas *DefaultOperatorSplitModel) GenerateStateRoot(blockNumber uint64) ([]byte, error) {
inserts, err := oas.prepareState(blockNumber)
func (dos *DefaultOperatorSplitModel) GenerateStateRoot(blockNumber uint64) ([]byte, error) {
inserts, err := dos.prepareState(blockNumber)
if err != nil {
return nil, err
}

inputs := oas.sortValuesForMerkleTree(inserts)
inputs := dos.sortValuesForMerkleTree(inserts)

if len(inputs) == 0 {
return nil, nil
}

fullTree, err := oas.MerkleizeEigenState(blockNumber, inputs)
fullTree, err := dos.MerkleizeEigenState(blockNumber, inputs)
if err != nil {
oas.logger.Sugar().Errorw("Failed to create merkle tree",
dos.logger.Sugar().Errorw("Failed to create merkle tree",
zap.Error(err),
zap.Uint64("blockNumber", blockNumber),
zap.Any("inputs", inputs),
Expand All @@ -237,7 +242,17 @@ func (oas *DefaultOperatorSplitModel) GenerateStateRoot(blockNumber uint64) ([]b
return fullTree.Root(), nil
}

func (oas *DefaultOperatorSplitModel) sortValuesForMerkleTree(splits []*DefaultOperatorSplit) []*base.MerkleTreeInput {
func (dos *DefaultOperatorSplitModel) GetCommittedState(blockNumber uint64) ([]interface{}, error) {
records, ok := dos.committedState[blockNumber]
if !ok {
err := fmt.Errorf("No committed state found for block %d", blockNumber)
dos.logger.Sugar().Errorw(err.Error(), zap.Error(err), zap.Uint64("blockNumber", blockNumber))
return nil, err
}
return base.CastCommittedStateToInterface(records), nil
}

func (dos *DefaultOperatorSplitModel) sortValuesForMerkleTree(splits []*DefaultOperatorSplit) []*base.MerkleTreeInput {
inputs := make([]*base.MerkleTreeInput, 0)
for _, split := range splits {
slotID := base.NewSlotID(split.TransactionHash, split.LogIndex)
Expand All @@ -255,6 +270,6 @@ func (oas *DefaultOperatorSplitModel) sortValuesForMerkleTree(splits []*DefaultO
return inputs
}

func (oas *DefaultOperatorSplitModel) DeleteState(startBlockNumber uint64, endBlockNumber uint64) error {
return oas.BaseEigenState.DeleteState("default_operator_splits", startBlockNumber, endBlockNumber, oas.DB)
func (dos *DefaultOperatorSplitModel) DeleteState(startBlockNumber uint64, endBlockNumber uint64) error {
return dos.BaseEigenState.DeleteState("default_operator_splits", startBlockNumber, endBlockNumber, dos.DB)
}
15 changes: 15 additions & 0 deletions pkg/eigenState/operatorAVSSplits/operatorAVSSplits.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type OperatorAVSSplitModel struct {

// Accumulates state changes for SlotIds, grouped by block number
stateAccumulator map[uint64]map[types.SlotID]*OperatorAVSSplit
committedState map[uint64][]*OperatorAVSSplit
}

func NewOperatorAVSSplitModel(
Expand All @@ -56,6 +57,7 @@ func NewOperatorAVSSplitModel(
logger: logger,
globalConfig: globalConfig,
stateAccumulator: make(map[uint64]map[types.SlotID]*OperatorAVSSplit),
committedState: make(map[uint64][]*OperatorAVSSplit),
}

esm.RegisterState(model, 8)
Expand Down Expand Up @@ -164,11 +166,13 @@ func (oas *OperatorAVSSplitModel) IsInterestingLog(log *storage.TransactionLog)

func (oas *OperatorAVSSplitModel) SetupStateForBlock(blockNumber uint64) error {
oas.stateAccumulator[blockNumber] = make(map[types.SlotID]*OperatorAVSSplit)
oas.committedState[blockNumber] = make([]*OperatorAVSSplit, 0)
return nil
}

func (oas *OperatorAVSSplitModel) CleanupProcessedStateForBlock(blockNumber uint64) error {
delete(oas.stateAccumulator, blockNumber)
delete(oas.committedState, blockNumber)
return nil
}

Expand Down Expand Up @@ -224,6 +228,7 @@ func (oas *OperatorAVSSplitModel) CommitFinalState(blockNumber uint64) error {
}
}
}
oas.committedState[blockNumber] = recordsToInsert
return nil
}

Expand Down Expand Up @@ -255,6 +260,16 @@ func (oas *OperatorAVSSplitModel) GenerateStateRoot(blockNumber uint64) ([]byte,
return fullTree.Root(), nil
}

func (oas *OperatorAVSSplitModel) GetCommittedState(blockNumber uint64) ([]interface{}, error) {
records, ok := oas.committedState[blockNumber]
if !ok {
err := fmt.Errorf("No committed state found for block %d", blockNumber)
oas.logger.Sugar().Errorw(err.Error(), zap.Error(err), zap.Uint64("blockNumber", blockNumber))
return nil, err
}
return base.CastCommittedStateToInterface(records), nil
}

func (oas *OperatorAVSSplitModel) formatMerkleLeafValue(
blockNumber uint64,
operator string,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type OperatorDirectedRewardSubmissionsModel struct {

// Accumulates state changes for SlotIds, grouped by block number
stateAccumulator map[uint64]map[types.SlotID]*OperatorDirectedRewardSubmission
committedState map[uint64][]*OperatorDirectedRewardSubmission
}

func NewOperatorDirectedRewardSubmissionsModel(
Expand All @@ -66,6 +67,7 @@ func NewOperatorDirectedRewardSubmissionsModel(
logger: logger,
globalConfig: globalConfig,
stateAccumulator: make(map[uint64]map[types.SlotID]*OperatorDirectedRewardSubmission),
committedState: make(map[uint64][]*OperatorDirectedRewardSubmission),
}

esm.RegisterState(model, 7)
Expand Down Expand Up @@ -249,11 +251,13 @@ func (odrs *OperatorDirectedRewardSubmissionsModel) IsInterestingLog(log *storag

func (odrs *OperatorDirectedRewardSubmissionsModel) SetupStateForBlock(blockNumber uint64) error {
odrs.stateAccumulator[blockNumber] = make(map[types.SlotID]*OperatorDirectedRewardSubmission)
odrs.committedState[blockNumber] = make([]*OperatorDirectedRewardSubmission, 0)
return nil
}

func (odrs *OperatorDirectedRewardSubmissionsModel) CleanupProcessedStateForBlock(blockNumber uint64) error {
delete(odrs.stateAccumulator, blockNumber)
delete(odrs.committedState, blockNumber)
return nil
}

Expand Down Expand Up @@ -309,6 +313,7 @@ func (odrs *OperatorDirectedRewardSubmissionsModel) CommitFinalState(blockNumber
}
}
}
odrs.committedState[blockNumber] = recordsToInsert
return nil
}

Expand Down Expand Up @@ -340,6 +345,16 @@ func (odrs *OperatorDirectedRewardSubmissionsModel) GenerateStateRoot(blockNumbe
return fullTree.Root(), nil
}

func (odrs *OperatorDirectedRewardSubmissionsModel) GetCommittedState(blockNumber uint64) ([]interface{}, error) {
records, ok := odrs.committedState[blockNumber]
if !ok {
err := fmt.Errorf("No committed state found for block %d", blockNumber)
odrs.logger.Sugar().Errorw(err.Error(), zap.Error(err), zap.Uint64("blockNumber", blockNumber))
return nil, err
}
return base.CastCommittedStateToInterface(records), nil
}

func (odrs *OperatorDirectedRewardSubmissionsModel) formatMerkleLeafValue(
blockNumber uint64,
rewardHash string,
Expand Down
15 changes: 15 additions & 0 deletions pkg/eigenState/operatorPISplits/operatorPISplits.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type OperatorPISplitModel struct {

// Accumulates state changes for SlotIds, grouped by block number
stateAccumulator map[uint64]map[types.SlotID]*OperatorPISplit
committedState map[uint64][]*OperatorPISplit
}

func NewOperatorPISplitModel(
Expand All @@ -55,6 +56,7 @@ func NewOperatorPISplitModel(
logger: logger,
globalConfig: globalConfig,
stateAccumulator: make(map[uint64]map[types.SlotID]*OperatorPISplit),
committedState: make(map[uint64][]*OperatorPISplit),
}

esm.RegisterState(model, 9)
Expand Down Expand Up @@ -162,11 +164,13 @@ func (ops *OperatorPISplitModel) IsInterestingLog(log *storage.TransactionLog) b

func (ops *OperatorPISplitModel) SetupStateForBlock(blockNumber uint64) error {
ops.stateAccumulator[blockNumber] = make(map[types.SlotID]*OperatorPISplit)
ops.committedState[blockNumber] = make([]*OperatorPISplit, 0)
return nil
}

func (ops *OperatorPISplitModel) CleanupProcessedStateForBlock(blockNumber uint64) error {
delete(ops.stateAccumulator, blockNumber)
delete(ops.committedState, blockNumber)
return nil
}

Expand Down Expand Up @@ -222,6 +226,7 @@ func (ops *OperatorPISplitModel) CommitFinalState(blockNumber uint64) error {
}
}
}
ops.committedState[blockNumber] = recordsToInsert
return nil
}

Expand Down Expand Up @@ -253,6 +258,16 @@ func (ops *OperatorPISplitModel) GenerateStateRoot(blockNumber uint64) ([]byte,
return fullTree.Root(), nil
}

func (ops *OperatorPISplitModel) GetCommittedState(blockNumber uint64) ([]interface{}, error) {
records, ok := ops.committedState[blockNumber]
if !ok {
err := fmt.Errorf("No committed state found for block %d", blockNumber)
ops.logger.Sugar().Errorw(err.Error(), zap.Error(err), zap.Uint64("blockNumber", blockNumber))
return nil, err
}
return base.CastCommittedStateToInterface(records), nil
}

func (ops *OperatorPISplitModel) formatMerkleLeafValue(
blockNumber uint64,
operator string,
Expand Down

0 comments on commit eae7a5d

Please sign in to comment.