Skip to content

Commit

Permalink
fix: duplicate proving broker config (#12230)
Browse files Browse the repository at this point in the history
Fixes #10269
  • Loading branch information
spypsy authored Feb 25, 2025
1 parent f2b9684 commit 8f28030
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 31 deletions.
1 change: 1 addition & 0 deletions yarn-project/foundation/src/config/env_var.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ export type EnvVar =
| 'PROVER_BROKER_JOB_MAX_RETRIES'
| 'PROVER_BROKER_BATCH_INTERVAL_MS'
| 'PROVER_BROKER_BATCH_SIZE'
| 'PROVER_BROKER_MAX_EPOCHS_TO_KEEP_RESULTS_FOR'
| 'PROVER_COORDINATION_NODE_URL'
| 'PROVER_DISABLED'
| 'PROVER_FAILED_PROOF_STORE'
Expand Down
16 changes: 15 additions & 1 deletion yarn-project/prover-client/src/proving_broker/config.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
import { type ConfigMappingsType, booleanConfigHelper, numberConfigHelper } from '@aztec/foundation/config';
import {
type ConfigMappingsType,
booleanConfigHelper,
getDefaultConfig,
numberConfigHelper,
} from '@aztec/foundation/config';
import { type DataStoreConfig, dataConfigMappings } from '@aztec/kv-store/config';
import { ProvingRequestType } from '@aztec/stdlib/proofs';

Expand All @@ -19,6 +24,8 @@ export const ProverBrokerConfig = z.object({
proverBrokerBatchSize: z.number(),
/** How often the job batches get flushed */
proverBrokerBatchIntervalMs: z.number(),
/** The maximum number of epochs to keep results for */
proverBrokerMaxEpochsToKeepResultsFor: z.number(),
});

export type ProverBrokerConfig = z.infer<typeof ProverBrokerConfig> &
Expand Down Expand Up @@ -50,9 +57,16 @@ export const proverBrokerConfigMappings: ConfigMappingsType<ProverBrokerConfig>
description: 'How often to flush batches to disk',
...numberConfigHelper(50),
},
proverBrokerMaxEpochsToKeepResultsFor: {
env: 'PROVER_BROKER_MAX_EPOCHS_TO_KEEP_RESULTS_FOR',
description: 'The maximum number of epochs to keep results for',
...numberConfigHelper(1),
},
...dataConfigMappings,
};

export const defaultProverBrokerConfig: ProverBrokerConfig = getDefaultConfig(proverBrokerConfigMappings);

export const ProverAgentConfig = z.object({
/** The number of prover agents to start */
proverAgentCount: z.number(),
Expand Down
10 changes: 1 addition & 9 deletions yarn-project/prover-client/src/proving_broker/factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,7 @@ export async function createAndStartProvingBroker(
): Promise<ProvingBroker> {
const database = config.dataDirectory ? await KVBrokerDatabase.new(config, client) : new InMemoryBrokerDatabase();

const broker = new ProvingBroker(
database,
{
jobTimeoutMs: config.proverBrokerJobTimeoutMs,
maxRetries: config.proverBrokerJobMaxRetries,
timeoutIntervalMs: config.proverBrokerPollIntervalMs,
},
client,
);
const broker = new ProvingBroker(database, config, client);

await broker.start();
return broker;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { mkdtemp } from 'fs/promises';
import { tmpdir } from 'os';
import { join } from 'path';

import { type ProverBrokerConfig } from './config.js';
import { type ProverBrokerConfig, defaultProverBrokerConfig } from './config.js';
import { makeInputsUri, makeOutputsUri, makeRandomProvingJobId } from './fixtures.js';
import { ProvingBroker } from './proving_broker.js';
import { type ProvingBrokerDatabase } from './proving_broker_database.js';
Expand All @@ -24,6 +24,7 @@ describe.each([
async () => {
const directory = await mkdtemp(join(tmpdir(), 'proving-broker-test'));
const config: ProverBrokerConfig = {
...defaultProverBrokerConfig,
dataStoreMapSizeKB: 1024 * 1024 * 1024, // 1GB
dataDirectory: directory,
proverBrokerJobMaxRetries: 1,
Expand Down Expand Up @@ -55,9 +56,10 @@ describe.each([
({ database, cleanup } = await createDb());

broker = new ProvingBroker(database, {
jobTimeoutMs,
timeoutIntervalMs: brokerIntervalMs,
maxRetries,
proverBrokerJobTimeoutMs: jobTimeoutMs,
proverBrokerPollIntervalMs: brokerIntervalMs,
proverBrokerJobMaxRetries: maxRetries,
proverBrokerMaxEpochsToKeepResultsFor: 1,
});
});

Expand Down
35 changes: 18 additions & 17 deletions yarn-project/prover-client/src/proving_broker/proving_broker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {

import assert from 'assert';

import { type ProverBrokerConfig, defaultProverBrokerConfig } from './config.js';
import { type ProvingBrokerDatabase } from './proving_broker_database.js';
import { type MonitorCallback, ProvingBrokerInstrumentation } from './proving_broker_instrumentation.js';

Expand All @@ -33,14 +34,6 @@ type InProgressMetadata = {
lastUpdatedAt: number;
};

type ProofRequestBrokerConfig = {
timeoutIntervalMs?: number;
jobTimeoutMs?: number;
maxRetries?: number;
maxEpochsToKeepResultsFor?: number;
maxParallelCleanUps?: number;
};

type EnqueuedProvingJob = Pick<ProvingJob, 'id' | 'epochNumber'>;

/**
Expand Down Expand Up @@ -115,20 +108,28 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer, Tr
public constructor(
private database: ProvingBrokerDatabase,
{
jobTimeoutMs = 30_000,
timeoutIntervalMs = 10_000,
maxRetries = 3,
maxEpochsToKeepResultsFor = 1,
}: ProofRequestBrokerConfig = {},
proverBrokerJobTimeoutMs,
proverBrokerPollIntervalMs,
proverBrokerJobMaxRetries,
proverBrokerMaxEpochsToKeepResultsFor,
}: Required<
Pick<
ProverBrokerConfig,
| 'proverBrokerJobTimeoutMs'
| 'proverBrokerPollIntervalMs'
| 'proverBrokerJobMaxRetries'
| 'proverBrokerMaxEpochsToKeepResultsFor'
>
> = defaultProverBrokerConfig,
client: TelemetryClient = getTelemetryClient(),
private logger = createLogger('prover-client:proving-broker'),
) {
this.tracer = client.getTracer('ProvingBroker');
this.instrumentation = new ProvingBrokerInstrumentation(client);
this.cleanupPromise = new RunningPromise(this.cleanupPass.bind(this), this.logger, timeoutIntervalMs);
this.jobTimeoutMs = jobTimeoutMs;
this.maxRetries = maxRetries;
this.maxEpochsToKeepResultsFor = maxEpochsToKeepResultsFor;
this.cleanupPromise = new RunningPromise(this.cleanupPass.bind(this), this.logger, proverBrokerPollIntervalMs);
this.jobTimeoutMs = proverBrokerJobTimeoutMs!;
this.maxRetries = proverBrokerJobMaxRetries!;
this.maxEpochsToKeepResultsFor = proverBrokerMaxEpochsToKeepResultsFor!;
}

private measureQueueDepth: MonitorCallback = (type: ProvingRequestType) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ describe('ProvingBrokerPersistedDatabase', () => {
proverBrokerPollIntervalMs: 1000,
proverBrokerBatchSize: 1,
proverBrokerBatchIntervalMs: 10,
proverBrokerMaxEpochsToKeepResultsFor: 1,
};
db = await KVBrokerDatabase.new(config);
});
Expand Down Expand Up @@ -286,6 +287,7 @@ describe('ProvingBrokerPersistedDatabase', () => {
proverBrokerPollIntervalMs: 1000,
proverBrokerBatchSize: batchSize,
proverBrokerBatchIntervalMs: 10,
proverBrokerMaxEpochsToKeepResultsFor: 1,
};
db = await KVBrokerDatabase.new(config);
commitSpy = jest.spyOn(db, 'commitWrites');
Expand Down

0 comments on commit 8f28030

Please sign in to comment.