Skip to content

Commit

Permalink
Refactor(sequencer) improve code readability (#1247)
Browse files Browse the repository at this point in the history
* improve code readability

* fix(sequencer): remove double import

* chore(sequencer): typo
  • Loading branch information
kstoykov authored Sep 30, 2024
1 parent 8d69064 commit bf281f0
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 114 deletions.
3 changes: 1 addition & 2 deletions zk/stages/stage_interhashes.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package stages

import (
"errors"
"fmt"

"github.com/gateway-fm/cdk-erigon-lib/common"
Expand Down Expand Up @@ -522,7 +521,7 @@ func unwindZkSMT(ctx context.Context, logPrefix string, from, to uint64, db kv.R
for i := from; i >= to+1; i-- {
select {
case <-ctx.Done():
return trie.EmptyRoot, errors.New(fmt.Sprintf("[%s] Context done", logPrefix))
return trie.EmptyRoot, fmt.Errorf("[%s] Context done", logPrefix)
default:
}

Expand Down
42 changes: 20 additions & 22 deletions zk/stages/stage_sequence_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,42 +41,33 @@ func SpawnSequencingStage(
return err
}

highestBatchInDS, err := cfg.datastreamServer.GetHighestBatchNumber()
highestBatchInDs, err := cfg.datastreamServer.GetHighestBatchNumber()
if err != nil {
return err
}

if !cfg.zk.SequencerResequence || lastBatch >= highestBatchInDS {
if cfg.zk.SequencerResequence {
log.Info(fmt.Sprintf("[%s] Resequencing completed. Please restart sequencer without resequence flag.", s.LogPrefix()))
time.Sleep(10 * time.Second)
return nil
if lastBatch < highestBatchInDs {
if !cfg.zk.SequencerResequence {
panic(fmt.Sprintf("[%s] The node need re-sequencing but this option is disabled.", s.LogPrefix()))
}

err = sequencingStageStep(s, u, ctx, cfg, historyCfg, quiet, nil)
if err != nil {
return err
}
} else {
log.Info(fmt.Sprintf("[%s] Last batch %d is lower than highest batch in datastream %d, resequencing...", s.LogPrefix(), lastBatch, highestBatchInDS))
log.Info(fmt.Sprintf("[%s] Last batch %d is lower than highest batch in datastream %d, resequencing...", s.LogPrefix(), lastBatch, highestBatchInDs))

batches, err := cfg.datastreamServer.ReadBatches(lastBatch+1, highestBatchInDS)
batches, err := cfg.datastreamServer.ReadBatches(lastBatch+1, highestBatchInDs)
if err != nil {
return err
}

err = cfg.datastreamServer.UnwindToBatchStart(lastBatch + 1)
if err != nil {
if err = cfg.datastreamServer.UnwindToBatchStart(lastBatch + 1); err != nil {
return err
}

log.Info(fmt.Sprintf("[%s] Resequence from batch %d to %d in data stream", s.LogPrefix(), lastBatch+1, highestBatchInDS))

log.Info(fmt.Sprintf("[%s] Resequence from batch %d to %d in data stream", s.LogPrefix(), lastBatch+1, highestBatchInDs))
for _, batch := range batches {
batchJob := NewResequenceBatchJob(batch)
subBatchCount := 0
for batchJob.HasMoreBlockToProcess() {
if err = sequencingStageStep(s, u, ctx, cfg, historyCfg, quiet, batchJob); err != nil {
if err = sequencingBatchStep(s, u, ctx, cfg, historyCfg, batchJob); err != nil {
return err
}

Expand All @@ -88,18 +79,25 @@ func SpawnSequencingStage(
return fmt.Errorf("strict mode enabled, but resequenced batch %d has %d sub-batches", batchJob.batchToProcess[0].BatchNumber, subBatchCount)
}
}

return nil
}

if cfg.zk.SequencerResequence {
log.Info(fmt.Sprintf("[%s] Resequencing completed. Please restart sequencer without resequence flag.", s.LogPrefix()))
time.Sleep(10 * time.Minute)
return nil
}

return nil
return sequencingBatchStep(s, u, ctx, cfg, historyCfg, nil)
}

func sequencingStageStep(
func sequencingBatchStep(
s *stagedsync.StageState,
u stagedsync.Unwinder,
ctx context.Context,
cfg SequenceBlockCfg,
historyCfg stagedsync.HistoryCfg,
quiet bool,
resequenceBatchJob *ResequenceBatchJob,
) (err error) {
logPrefix := s.LogPrefix()
Expand Down Expand Up @@ -170,7 +168,7 @@ func sequencingStageStep(
// if we identify any. During normal operation this function will simply check and move on without performing
// any action.
if !batchState.isAnyRecovery() {
isUnwinding, err := alignExecutionToDatastream(batchContext, batchState, executionAt, u)
isUnwinding, err := alignExecutionToDatastream(batchContext, executionAt, u)
if err != nil {
// do not set shouldCheckForExecutionAndDataStreamAlighmentOnNodeStart=false because of the error
return err
Expand Down
8 changes: 6 additions & 2 deletions zk/stages/stage_sequence_execute_data_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (sbc *SequencerBatchStreamWriter) writeBlockDetailsToDatastream(verifiedBun
return checkedVerifierBundles, nil
}

func alignExecutionToDatastream(batchContext *BatchContext, batchState *BatchState, lastExecutedBlock uint64, u stagedsync.Unwinder) (bool, error) {
func alignExecutionToDatastream(batchContext *BatchContext, lastExecutedBlock uint64, u stagedsync.Unwinder) (bool, error) {
lastStartedDatastreamBatch, err := batchContext.cfg.datastreamServer.GetHighestBatchNumber()
if err != nil {
return false, err
Expand All @@ -131,7 +131,7 @@ func alignExecutionToDatastream(batchContext *BatchContext, batchState *BatchSta
}
}

if lastExecutedBlock != lastDatastreamBlock {
if lastExecutedBlock > lastDatastreamBlock {
block, err := rawdb.ReadBlockByNumber(batchContext.sdb.tx, lastDatastreamBlock)
if err != nil {
return false, err
Expand All @@ -142,6 +142,10 @@ func alignExecutionToDatastream(batchContext *BatchContext, batchState *BatchSta
return true, nil
}

if lastExecutedBlock < lastDatastreamBlock {
panic(fmt.Errorf("[%s] Datastream is ahead of sequencer. Re-sequencing should have handled this case before even comming to this point", batchContext.s.LogPrefix()))
}

return false, nil
}

Expand Down
77 changes: 77 additions & 0 deletions zk/stages/stage_sequence_execute_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ import (
"github.com/ledgerwatch/erigon/core"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/eth/stagedsync"
dsTypes "github.com/ledgerwatch/erigon/zk/datastream/types"
"github.com/ledgerwatch/erigon/zk/l1_data"
zktx "github.com/ledgerwatch/erigon/zk/tx"
"github.com/ledgerwatch/erigon/zk/txpool"
"github.com/ledgerwatch/log/v3"
)

const maximumOverflowTransactionAttempts = 5
Expand Down Expand Up @@ -306,3 +308,78 @@ func (bbe *BuiltBlockElements) onFinishAddingTransaction(transaction types.Trans
bbe.executionResults = append(bbe.executionResults, execResult)
bbe.effectiveGases = append(bbe.effectiveGases, effectiveGas)
}

// resequenceTxMetadata locates an entry inside a batch that is being
// re-sequenced: blockNum indexes into ResequenceBatchJob.batchToProcess and
// txIndex indexes into that block's L2Txs slice.
type resequenceTxMetadata struct {
	blockNum int
	txIndex  int
}

// ResequenceBatchJob tracks progress while re-feeding a batch of datastream
// blocks back through the sequencer. StartBlockIndex/StartTxIndex form a
// cursor pointing at the next block/transaction to process within
// batchToProcess; txIndexMap records where each yielded transaction hash
// (and each block hash) lives so UpdateLastProcessedTx can advance the
// cursor after execution.
type ResequenceBatchJob struct {
	batchToProcess  []*dsTypes.FullL2Block
	StartBlockIndex int
	StartTxIndex    int
	txIndexMap      map[common.Hash]resequenceTxMetadata
}

// NewResequenceBatchJob builds a job that replays the supplied datastream
// blocks starting from the very first block and transaction.
func NewResequenceBatchJob(batch []*dsTypes.FullL2Block) *ResequenceBatchJob {
	job := &ResequenceBatchJob{
		batchToProcess: batch,
		txIndexMap:     make(map[common.Hash]resequenceTxMetadata),
	}
	// the cursor starts at the beginning of the batch
	job.StartBlockIndex = 0
	job.StartTxIndex = 0
	return job
}

// HasMoreBlockToProcess reports whether at least one block of the batch has
// not yet been fully processed.
func (r *ResequenceBatchJob) HasMoreBlockToProcess() bool {
	return len(r.batchToProcess) > r.StartBlockIndex
}

// AtNewBlockBoundary reports whether the cursor sits at the start of a
// block, i.e. no transactions of the current block have been processed yet.
func (r *ResequenceBatchJob) AtNewBlockBoundary() bool {
	return r.StartTxIndex == 0
}

// CurrentBlock returns the block the cursor points at, or nil once every
// block in the batch has been processed.
func (r *ResequenceBatchJob) CurrentBlock() *dsTypes.FullL2Block {
	if !r.HasMoreBlockToProcess() {
		return nil
	}
	return r.batchToProcess[r.StartBlockIndex]
}

// YieldNextBlockTransactions decodes and returns the current block's
// transactions, starting at the cursor's transaction index. Each decoded
// transaction hash — and the block hash itself — is recorded in txIndexMap
// so UpdateLastProcessedTx can later advance the cursor. Returns an empty
// slice when no blocks remain; returns an error if any transaction fails to
// decode.
func (r *ResequenceBatchJob) YieldNextBlockTransactions(decoder zktx.TxDecoder) ([]types.Transaction, error) {
	blockTransactions := make([]types.Transaction, 0)
	if r.HasMoreBlockToProcess() {
		block := r.CurrentBlock()
		// map the block hash to tx index 0 so an empty block can still be
		// marked processed via its hash
		r.txIndexMap[block.L2Blockhash] = resequenceTxMetadata{r.StartBlockIndex, 0}

		for i := r.StartTxIndex; i < len(block.L2Txs); i++ {
			transaction := block.L2Txs[i]
			tx, _, err := decoder(transaction.Encoded, transaction.EffectiveGasPricePercentage, block.ForkId)
			if err != nil {
				// wrap with %w (not %v) so callers can inspect the decoder
				// error with errors.Is/errors.As
				return nil, fmt.Errorf("decode tx error: %w", err)
			}
			r.txIndexMap[tx.Hash()] = resequenceTxMetadata{r.StartBlockIndex, i}
			blockTransactions = append(blockTransactions, tx)
		}
	}

	return blockTransactions, nil
}

// UpdateLastProcessedTx advances the cursor past the entry recorded for h
// (a transaction hash or block hash registered by
// YieldNextBlockTransactions). Hashes with no recorded position are logged
// and ignored.
func (r *ResequenceBatchJob) UpdateLastProcessedTx(h common.Hash) {
	idx, ok := r.txIndexMap[h]
	if !ok {
		log.Warn("tx hash not found in tx index map", "hash", h)
		return
	}

	block := r.batchToProcess[idx.blockNum]
	if idx.txIndex >= len(block.L2Txs)-1 {
		// every transaction in this block is done — jump to the next block
		r.StartBlockIndex = idx.blockNum + 1
		r.StartTxIndex = 0
		return
	}

	// otherwise continue with the next transaction of the same block
	r.StartBlockIndex = idx.blockNum
	r.StartTxIndex = idx.txIndex + 1
}
86 changes: 2 additions & 84 deletions zk/stages/stage_sequence_execute_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,8 @@ import (
"github.com/ledgerwatch/erigon/turbo/shards"
"github.com/ledgerwatch/erigon/turbo/stages/headerdownload"
"github.com/ledgerwatch/erigon/zk/datastream/server"
dsTypes "github.com/ledgerwatch/erigon/zk/datastream/types"
"github.com/ledgerwatch/erigon/zk/hermez_db"
verifier "github.com/ledgerwatch/erigon/zk/legacy_executor_verifier"
"github.com/ledgerwatch/erigon/zk/tx"
zktx "github.com/ledgerwatch/erigon/zk/tx"
"github.com/ledgerwatch/erigon/zk/txpool"
zktypes "github.com/ledgerwatch/erigon/zk/types"
Expand Down Expand Up @@ -241,12 +239,7 @@ func prepareHeader(tx kv.RwTx, previousBlockNumber, deltaTimestamp, forcedTimest
}, parentBlock, nil
}

func prepareL1AndInfoTreeRelatedStuff(
sdb *stageDb,
batchState *BatchState,
proposedTimestamp uint64,
reuseL1InfoIndex bool,
) (
func prepareL1AndInfoTreeRelatedStuff(sdb *stageDb, batchState *BatchState, proposedTimestamp uint64, reuseL1InfoIndex bool) (
infoTreeIndexProgress uint64,
l1TreeUpdate *zktypes.L1InfoTreeUpdate,
l1TreeUpdateIndex uint64,
Expand Down Expand Up @@ -483,7 +476,7 @@ func newBlockDataChecker() *BlockDataChecker {
// adds bytes amounting to the block data and checks if the limit is reached
// if the limit is reached, the data is not added, so this can be reused again for next check
func (bdc *BlockDataChecker) AddBlockStartData() bool {
blockStartBytesAmount := tx.START_BLOCK_BATCH_L2_DATA_SIZE // tx.GenerateStartBlockBatchL2Data(deltaTimestamp, l1InfoTreeIndex) returns 65 long byte array
blockStartBytesAmount := zktx.START_BLOCK_BATCH_L2_DATA_SIZE // tx.GenerateStartBlockBatchL2Data(deltaTimestamp, l1InfoTreeIndex) returns 65 long byte array
// add in the changeL2Block transaction
if bdc.counter+blockStartBytesAmount > bdc.limit {
return true
Expand All @@ -504,78 +497,3 @@ func (bdc *BlockDataChecker) AddTransactionData(txL2Data []byte) bool {

return false
}

// txMatadata (sic — the name carries a typo of "metadata") locates an entry
// inside a batch being re-sequenced: blockNum indexes into
// ResequenceBatchJob.batchToProcess and txIndex indexes into that block's
// L2Txs slice.
type txMatadata struct {
	blockNum int
	txIndex  int
}

// ResequenceBatchJob tracks progress while re-feeding a batch of datastream
// blocks back through the sequencer. StartBlockIndex/StartTxIndex form a
// cursor pointing at the next block/transaction to process within
// batchToProcess; txIndexMap records where each yielded transaction hash
// (and each block hash) lives so UpdateLastProcessedTx can advance the
// cursor after execution.
type ResequenceBatchJob struct {
	batchToProcess  []*dsTypes.FullL2Block
	StartBlockIndex int
	StartTxIndex    int
	txIndexMap      map[common.Hash]txMatadata
}

// NewResequenceBatchJob builds a job that replays the supplied datastream
// blocks starting from the very first block and transaction.
func NewResequenceBatchJob(batch []*dsTypes.FullL2Block) *ResequenceBatchJob {
	return &ResequenceBatchJob{
		batchToProcess:  batch,
		StartBlockIndex: 0,
		StartTxIndex:    0,
		txIndexMap:      make(map[common.Hash]txMatadata),
	}
}

// HasMoreBlockToProcess reports whether at least one block of the batch has
// not yet been fully processed.
func (r *ResequenceBatchJob) HasMoreBlockToProcess() bool {
	return r.StartBlockIndex < len(r.batchToProcess)
}

// AtNewBlockBoundary reports whether the cursor sits at the start of a
// block, i.e. no transactions of the current block have been processed yet.
func (r *ResequenceBatchJob) AtNewBlockBoundary() bool {
	return r.StartTxIndex == 0
}

// CurrentBlock returns the block the cursor points at, or nil once every
// block in the batch has been processed.
func (r *ResequenceBatchJob) CurrentBlock() *dsTypes.FullL2Block {
	if r.HasMoreBlockToProcess() {
		return r.batchToProcess[r.StartBlockIndex]
	}
	return nil
}

// YieldNextBlockTransactions decodes and returns the current block's
// transactions, starting at the cursor's transaction index. Each decoded
// transaction hash — and the block hash itself — is recorded in txIndexMap
// so UpdateLastProcessedTx can later advance the cursor. Returns an empty
// slice when no blocks remain; returns an error if any transaction fails to
// decode.
func (r *ResequenceBatchJob) YieldNextBlockTransactions(decoder zktx.TxDecoder) ([]types.Transaction, error) {
	blockTransactions := make([]types.Transaction, 0)
	if r.HasMoreBlockToProcess() {
		block := r.CurrentBlock()
		// map the block hash to tx index 0 so an empty block can still be
		// marked processed via its hash
		r.txIndexMap[block.L2Blockhash] = txMatadata{r.StartBlockIndex, 0}

		for i := r.StartTxIndex; i < len(block.L2Txs); i++ {
			transaction := block.L2Txs[i]
			tx, _, err := decoder(transaction.Encoded, transaction.EffectiveGasPricePercentage, block.ForkId)
			if err != nil {
				return nil, fmt.Errorf("decode tx error: %v", err)
			}
			r.txIndexMap[tx.Hash()] = txMatadata{r.StartBlockIndex, i}
			blockTransactions = append(blockTransactions, tx)
		}
	}

	return blockTransactions, nil
}

// UpdateLastProcessedTx advances the cursor past the entry recorded for h
// (a transaction hash or block hash registered by
// YieldNextBlockTransactions). Hashes with no recorded position are logged
// and ignored.
func (r *ResequenceBatchJob) UpdateLastProcessedTx(h common.Hash) {
	if idx, ok := r.txIndexMap[h]; ok {
		block := r.batchToProcess[idx.blockNum]

		if idx.txIndex >= len(block.L2Txs)-1 {
			// we've processed all the transactions in this block
			// move to the next block
			r.StartBlockIndex = idx.blockNum + 1
			r.StartTxIndex = 0
		} else {
			// move to the next transaction in the block
			r.StartBlockIndex = idx.blockNum
			r.StartTxIndex = idx.txIndex + 1
		}
	} else {
		log.Warn("tx hash not found in tx index map", "hash", h)
	}
}
8 changes: 4 additions & 4 deletions zk/stages/stage_sequence_execute_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func TestResequenceBatchJob_YieldNextBlockTransactions(t *testing.T) {
},
StartBlockIndex: 0,
StartTxIndex: 0,
txIndexMap: make(map[common.Hash]txMatadata),
txIndexMap: make(map[common.Hash]resequenceTxMetadata),
},
expectedTxCount: 2,
expectedError: false,
Expand All @@ -332,7 +332,7 @@ func TestResequenceBatchJob_YieldNextBlockTransactions(t *testing.T) {
batchToProcess: []*dsTypes.FullL2Block{{}},
StartBlockIndex: 1,
StartTxIndex: 0,
txIndexMap: make(map[common.Hash]txMatadata),
txIndexMap: make(map[common.Hash]resequenceTxMetadata),
},
expectedTxCount: 0,
expectedError: false,
Expand Down Expand Up @@ -366,7 +366,7 @@ func TestResequenceBatchJob_YieldAndUpdate(t *testing.T) {
batchToProcess: batch,
StartBlockIndex: 0,
StartTxIndex: 1, // Start at block 0, index 1
txIndexMap: make(map[common.Hash]txMatadata),
txIndexMap: make(map[common.Hash]resequenceTxMetadata),
}

processTransactions := func(txs []types.Transaction) {
Expand Down Expand Up @@ -443,7 +443,7 @@ func TestResequenceBatchJob_YieldAndUpdate(t *testing.T) {
}

// Verify txIndexMap
expectedTxIndexMap := map[common.Hash]txMatadata{
expectedTxIndexMap := map[common.Hash]resequenceTxMetadata{
common.HexToHash("0"): {0, 0},
common.HexToHash("1"): {1, 0},
common.HexToHash("2"): {2, 0},
Expand Down

0 comments on commit bf281f0

Please sign in to comment.