Skip to content

Commit

Permalink
Pause world state while forking
Browse files Browse the repository at this point in the history
  • Loading branch information
spalladino committed Aug 6, 2024
1 parent 1ea38c7 commit 53afc67
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,10 @@ export class L2BlockDownloader {
*/
private async collectBlocks(targetBlockNumber?: number) {
let totalBlocks = 0;
log.debug(`Starting block collection for target ${targetBlockNumber} from ${this.from}`);
while (true) {
// If we have a target and have reached it, return
if (targetBlockNumber !== undefined && this.from > targetBlockNumber) {
log.debug(`Reached target block number ${targetBlockNumber}`);
log.verbose(`Reached target block number ${targetBlockNumber}`);
return totalBlocks;
}

Expand All @@ -87,15 +86,16 @@ export class L2BlockDownloader {

// Hit the archiver for blocks
const blocks = await this.l2BlockSource.getBlocks(this.from, limit, proven);
log.debug(
`Received ${blocks.length} blocks from archiver after querying from ${this.from} limit ${limit} (proven ${proven})`,
);

// If there are no more blocks, return
if (!blocks.length) {
return totalBlocks;
}

log.verbose(
`Received ${blocks.length} blocks from archiver after querying from ${this.from} limit ${limit} (proven ${proven})`,
);

// Push new blocks into the queue and loop
await this.semaphore.acquire();
this.blockQueue.put(blocks);
Expand Down
1 change: 0 additions & 1 deletion yarn-project/end-to-end/src/e2e_prover_node.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ describe('e2e_prover_node', () => {
// Prove the first two blocks simultaneously
logger.info(`Starting proof for first block #${firstBlock}`);
await proverNode.startProof(firstBlock, firstBlock);
await sleep(3000); // Well, _almost_ simultaneously!
logger.info(`Starting proof for second block #${secondBlock}`);
await proverNode.startProof(secondBlock, secondBlock);

Expand Down
4 changes: 1 addition & 3 deletions yarn-project/prover-node/src/prover-node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,7 @@ export class ProverNode {
}

// Fast forward world state to right before the target block and get a fork
// TODO(palla/prover-node): We need to pause world state while we procure the fork
await this.worldState.syncImmediate(fromBlock - 1);
const db = await this.worldState.getFork(true);
const db = await this.worldState.syncImmediateAndFork(fromBlock - 1, true);

// Create a processor using the forked world state
const publicProcessorFactory = new PublicProcessorFactory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import {
type L2Block,
L2BlockDownloader,
type L2BlockSource,
MerkleTreeId,
} from '@aztec/circuit-types';
import { type L2BlockHandledStats } from '@aztec/circuit-types/stats';
import { L1_TO_L2_MSG_SUBTREE_HEIGHT } from '@aztec/circuits.js/constants';
import { Fr } from '@aztec/foundation/fields';
import { SerialQueue } from '@aztec/foundation/fifo';
import { createDebugLogger } from '@aztec/foundation/log';
import { promiseWithResolvers } from '@aztec/foundation/promise';
import { elapsed } from '@aztec/foundation/timer';
import { type AztecKVStore, type AztecSingleton } from '@aztec/kv-store';
import { openTmpStore } from '@aztec/kv-store/utils';
Expand All @@ -33,12 +35,16 @@ import {
export class ServerWorldStateSynchronizer implements WorldStateSynchronizer {
private latestBlockNumberAtStart = 0;

// TODO(palla/prover-node): JobQueue, stopping, runningPromise, pausedPromise, pausedResolve
// should all be hidden under a single abstraction. Also, check if we actually need the jobqueue.
private l2BlockDownloader: L2BlockDownloader;
private syncPromise: Promise<void> = Promise.resolve();
private syncResolve?: () => void = undefined;
private jobQueue = new SerialQueue();
private stopping = false;
private runningPromise: Promise<void> = Promise.resolve();
private pausedPromise?: Promise<void> = undefined;
private pausedResolve?: () => void = undefined;
private currentState: WorldStateRunningState = WorldStateRunningState.IDLE;
private blockNumber: AztecSingleton<number>;

Expand Down Expand Up @@ -70,6 +76,7 @@ export class ServerWorldStateSynchronizer implements WorldStateSynchronizer {
}

public async getFork(includeUncommitted: boolean): Promise<MerkleTreeOperationsFacade> {
this.log.verbose(`Forking world state at ${this.blockNumber.get()}`);
return new MerkleTreeOperationsFacade(await this.merkleTreeDb.fork(), includeUncommitted);
}

Expand Down Expand Up @@ -108,6 +115,9 @@ export class ServerWorldStateSynchronizer implements WorldStateSynchronizer {
const blockProcess = async () => {
while (!this.stopping) {
await this.jobQueue.put(() => this.collectAndProcessBlocks());
if (this.pausedPromise) {
await this.pausedPromise;
}
}
};
this.jobQueue.start();
Expand Down Expand Up @@ -135,6 +145,23 @@ export class ServerWorldStateSynchronizer implements WorldStateSynchronizer {
return this.blockNumber.get() ?? 0;
}

private async pause() {
this.log.debug('Pausing world state synchronizer');
({ promise: this.pausedPromise, resolve: this.pausedResolve } = promiseWithResolvers());
await this.jobQueue.syncPoint();
this.log.debug('Paused world state synchronizer');
}

private resume() {
if (this.pausedResolve) {
this.log.debug('Resuming world state synchronizer');
this.pausedResolve();
this.pausedResolve = undefined;
this.pausedPromise = undefined;
this.log.debug('Resumed world state synchronizer');
}
}

public status(): Promise<WorldStateStatus> {
const status = {
syncedToL2Block: this.currentL2BlockNum,
Expand All @@ -156,13 +183,10 @@ export class ServerWorldStateSynchronizer implements WorldStateSynchronizer {
if (targetBlockNumber !== undefined && targetBlockNumber <= this.currentL2BlockNum) {
return this.currentL2BlockNum;
}
const blockToSyncTo = targetBlockNumber === undefined ? 'latest' : `${targetBlockNumber}`;
this.log.debug(`World State at block ${this.currentL2BlockNum}, told to sync to block ${blockToSyncTo}...`);
// ensure any outstanding block updates are completed first.
this.log.debug(`World State at ${this.currentL2BlockNum} told to sync to ${targetBlockNumber ?? 'latest'}`);
// ensure any outstanding block updates are completed first
await this.jobQueue.syncPoint();

// TODO(palla/prover-node): We need to somehow pause the jobQueue and block downloader while we run this, otherwise
// we may end up with more blocks past the target than we wanted, which is a problem for prover-node (see ProverNode#createProvingJob).
while (true) {
// Check the block number again
if (targetBlockNumber !== undefined && targetBlockNumber <= this.currentL2BlockNum) {
Expand All @@ -186,6 +210,17 @@ export class ServerWorldStateSynchronizer implements WorldStateSynchronizer {
}
}

public async syncImmediateAndFork(
targetBlockNumber: number,
forkIncludeUncommitted: boolean,
): Promise<MerkleTreeOperationsFacade> {
await this.pause();
await this.syncImmediate(targetBlockNumber);
const fork = await this.getFork(forkIncludeUncommitted);
this.resume();
return fork;
}

/**
* Checks for the availability of new blocks and processes them.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ export interface WorldStateSynchronizer {
*/
syncImmediate(minBlockNumber?: number): Promise<number>;

/**
* Pauses the synchronizer, syncs to the target block number, forks world state, and resumes.
* @param targetBlockNumber - The block number to sync to.
* @param forkIncludeUncommitted - Whether to include uncommitted data in the fork.
* @returns The db forked at the requested target block number.
*/
syncImmediateAndFork(targetBlockNumber: number, forkIncludeUncommitted: boolean): Promise<MerkleTreeOperations>;

/**
* Returns an instance of MerkleTreeOperations that will include uncommitted data.
* @returns An instance of MerkleTreeOperations that will include uncommitted data.
Expand Down

0 comments on commit 53afc67

Please sign in to comment.