Skip to content

Commit

Permalink
Merge branch 'zkevm' into kyrin/add-read-batches-with-concurrency-to-…
Browse files Browse the repository at this point in the history
…datastream-server
  • Loading branch information
KyrinCode authored Feb 26, 2025
2 parents a9e1ede + b9306f9 commit ec39f56
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 15 deletions.
32 changes: 17 additions & 15 deletions zk/stages/stage_witness.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,36 +105,38 @@ func SpawnStageWitness(

reader := hermez_db.NewHermezDbReader(tx)

var highestVerifiedBatchNo uint64
highestVerifiedBatch, err := reader.GetLatestVerification()
if err != nil {
return fmt.Errorf("GetLatestVerification: %w", err)
}
if highestVerifiedBatch == nil {
highestVerifiedBatchNo = 0
} else {
highestVerifiedBatchNo = highestVerifiedBatch.BatchNo
}

highestVerifiedBatchWithOffset := highestVerifiedBatch.BatchNo - cfg.zkCfg.WitnessCacheBatchBehindOffset // default 5

latestCachedWitnessBatchNo, err := reader.GetLatestCachedWitnessBatchNo()
latestBlock, err := stages.GetStageProgress(tx, stages.Execution)
if err != nil {
return fmt.Errorf("GetLatestCachedWitnessBatchNo: %w", err)
return fmt.Errorf("GetStageProgress: %w", err)
}

fromBatch := latestCachedWitnessBatchNo + 1
if fromBatch < highestVerifiedBatchWithOffset {
fromBatch = highestVerifiedBatchWithOffset
latestExecutedBatchNo, err := reader.GetBatchNoByL2Block(latestBlock)
if err != nil {
return fmt.Errorf("GetBatchNoByL2Block: %w", err)
}

toBatch := highestVerifiedBatch.BatchNo + cfg.zkCfg.WitnessCacheBatchAheadOffset
highestBatch, err := stages.GetStageProgress(tx, stages.HighestSeenBatchNumber)
latestCachedWitnessBatchNo, err := reader.GetLatestCachedWitnessBatchNo()
if err != nil {
return fmt.Errorf("GetStageProgress: %w", err)
}
if toBatch > highestBatch {
toBatch = highestBatch - 1 // we cannot cache the highest batch because it might not be full yet
return fmt.Errorf("GetLatestCachedWitnessBatchNo: %w", err)
}

startBatch, endBatch, truncateTo := witness.GetBatchesToCache(highestVerifiedBatchNo, latestExecutedBatchNo, latestCachedWitnessBatchNo, cfg.zkCfg.WitnessCacheBatchAheadOffset, cfg.zkCfg.WitnessCacheBatchBehindOffset)

hermezDb := hermez_db.NewHermezDb(tx)
g := witness.NewGenerator(cfg.dirs, cfg.historyV3, cfg.agg, cfg.blockReader, cfg.chainConfig, cfg.zkCfg, cfg.engine, cfg.forcedContracts, cfg.unwindLimit)

for batchNo := fromBatch; batchNo <= toBatch; batchNo++ {
for batchNo := startBatch; batchNo <= endBatch; batchNo++ {
badBatch, err := reader.GetInvalidBatch(batchNo)
if err != nil {
return fmt.Errorf("GetInvalidBatch: %w", err)
Expand Down Expand Up @@ -175,7 +177,7 @@ func SpawnStageWitness(
}

log.Info(fmt.Sprintf("[%s] Deleting old witness caches", logPrefix))
if err = hermezDb.TruncateWitnessCacheBelow(highestVerifiedBatchWithOffset); err != nil {
if err = hermezDb.TruncateWitnessCacheBelow(truncateTo); err != nil {
return fmt.Errorf("DeleteWitnessCache: %w", err)
}

Expand Down
30 changes: 30 additions & 0 deletions zk/witness/witness_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,3 +218,33 @@ func MergeWitnesses(ctx context.Context, witnesses []*trie.Witness) (*trie.Witne

return witness, nil
}

// GetBatchesToCache calculates and returns the start batch, and the end batch to cache.
func GetBatchesToCache(highestVerifiedBatch, highestExecutedBatch, latestCachedBatch, offsetAhead, offsetBehind uint64) (startBatch, endBatch, truncateTo uint64) {
var highestVerifiedBatchOffsetBehind uint64
if highestVerifiedBatch >= offsetBehind {
highestVerifiedBatchOffsetBehind = highestVerifiedBatch - offsetBehind
} else {
highestVerifiedBatchOffsetBehind = 0
}

startBatch = latestCachedBatch + 1
if startBatch < highestVerifiedBatchOffsetBehind {
startBatch = highestVerifiedBatchOffsetBehind
}

var highestVerifiedBatchOffsetAhead uint64
highestVerifiedBatchOffsetAhead = highestVerifiedBatch + offsetAhead

if highestVerifiedBatchOffsetAhead > highestExecutedBatch {
highestVerifiedBatchOffsetAhead = highestExecutedBatch
}

endBatch = highestVerifiedBatchOffsetAhead

if startBatch > endBatch {
startBatch = endBatch
}

return startBatch, endBatch, highestVerifiedBatchOffsetBehind
}
78 changes: 78 additions & 0 deletions zk/witness/witness_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,3 +201,81 @@ func TestMergeWitnessesWithHashNodes(t *testing.T) {
}
assert.Equal(t, 0, len(diff), "witnesses should be equal")
}

func TestGetBatchesToCache(t *testing.T) {
scenarios := map[string]struct {
highestVerifiedBatch uint64
highestExecutedBatch uint64
latestCachedBatch uint64

offsetAhead uint64
offsetBehind uint64

expectedStartBatch uint64
expectedEndBatch uint64
}{
"5 ahead, 5 behind": {
highestVerifiedBatch: 10,
highestExecutedBatch: 100,
latestCachedBatch: 0,

offsetAhead: 5,
offsetBehind: 5,

expectedStartBatch: 5,
expectedEndBatch: 15,
},
"5 ahead, 5 behind, latest cached 10": {
highestVerifiedBatch: 10,
highestExecutedBatch: 100,
latestCachedBatch: 10,

offsetAhead: 5,
offsetBehind: 5,

expectedStartBatch: 11,
expectedEndBatch: 15,
},
"5 ahead, 5 behind, latest cached 10, highest verified 5": {
highestVerifiedBatch: 5,
highestExecutedBatch: 100,
latestCachedBatch: 10,

offsetAhead: 5,
offsetBehind: 5,

expectedStartBatch: 10,
expectedEndBatch: 10,
},
"highest ver 0, latest cached 0, highest executed 10": {
highestVerifiedBatch: 0,
highestExecutedBatch: 10,
latestCachedBatch: 0,

offsetAhead: 500,
offsetBehind: 500,

expectedStartBatch: 1,
expectedEndBatch: 10,
},
"highest ver 0, latest cached 0, highest executed 0": {
highestVerifiedBatch: 0,
highestExecutedBatch: 0,
latestCachedBatch: 0,

offsetAhead: 500,
offsetBehind: 500,

expectedStartBatch: 0,
expectedEndBatch: 0,
},
}

for name, scenario := range scenarios {
t.Run(name, func(t *testing.T) {
startBatch, endBatch, _ := GetBatchesToCache(scenario.highestVerifiedBatch, scenario.highestExecutedBatch, scenario.latestCachedBatch, scenario.offsetAhead, scenario.offsetBehind)
assert.Equal(t, scenario.expectedStartBatch, startBatch, "start batch should be equal")
assert.Equal(t, scenario.expectedEndBatch, endBatch, "end batch should be equal")
})
}
}

0 comments on commit ec39f56

Please sign in to comment.