Skip to content

Commit

Permalink
feat: add persisted database of proving jobs (#9942)
Browse files Browse the repository at this point in the history
This PR adds a new proving job store for the ProvingBroker. In this
implementation the jobs are stored in an lmdb database and on disk
(assuming the broker will have access to a persistent disk)

Fix #9532
  • Loading branch information
alexghr authored Nov 19, 2024
1 parent b660739 commit 6a16a4b
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 79 deletions.
100 changes: 64 additions & 36 deletions yarn-project/prover-client/src/proving_broker/proving_broker.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {
ProvingRequestType,
type V2ProofOutput,
type V2ProvingJob,
type V2ProvingJobId,
makePublicInputsAndRecursiveProof,
Expand All @@ -13,35 +14,54 @@ import {
makeRootParityInputs,
} from '@aztec/circuits.js/testing';
import { randomBytes } from '@aztec/foundation/crypto';
import { AztecLmdbStore } from '@aztec/kv-store/lmdb';

import { jest } from '@jest/globals';

import { ProvingBroker } from './proving_broker.js';
import { InMemoryDatabase } from './proving_broker_database.js';
import { type ProvingJobDatabase } from './proving_job_database.js';
import { InMemoryDatabase } from './proving_job_database/memory.js';
import { PersistedProvingJobDatabase } from './proving_job_database/persisted.js';

beforeAll(() => {
jest.useFakeTimers();
});

describe('ProvingBroker', () => {
let database: InMemoryDatabase;
describe.each([
() => ({ database: new InMemoryDatabase(), cleanup: undefined }),
() => {
const store = AztecLmdbStore.open(undefined, true);
const database = new PersistedProvingJobDatabase(store);
const cleanup = () => store.close();
return { database, cleanup };
},
])('ProvingBroker', createDb => {
let broker: ProvingBroker;
let jobTimeoutSec: number;
let maxRetries: number;
let database: ProvingJobDatabase;
let cleanup: undefined | (() => Promise<void> | void);

const now = () => Math.floor(Date.now() / 1000);

beforeEach(() => {
jobTimeoutSec = 10;
maxRetries = 2;
database = new InMemoryDatabase();
({ database, cleanup } = createDb());

broker = new ProvingBroker(database, {
jobTimeoutSec: jobTimeoutSec,
timeoutIntervalSec: jobTimeoutSec / 4,
maxRetries,
});
});

afterEach(async () => {
if (cleanup) {
await cleanup();
}
});

describe('Producer API', () => {
beforeEach(async () => {
await broker.start();
Expand Down Expand Up @@ -909,10 +929,6 @@ describe('ProvingBroker', () => {
inputs: makePrivateBaseRollupInputs(),
});

expect(database.getProvingJob(id1)).not.toBeUndefined();
expect(database.getProvingJobResult(id1)).not.toBeUndefined();
expect(database.getProvingJob(id2)).not.toBeUndefined();

await broker.start();

await expect(broker.getProvingJobStatus(id1)).resolves.toEqual({
Expand All @@ -922,27 +938,31 @@ describe('ProvingBroker', () => {

await expect(broker.getProvingJobStatus(id2)).resolves.toEqual({ status: 'in-queue' });

jest.spyOn(database, 'deleteProvingJobAndResult');

await broker.removeAndCancelProvingJob(id1);
await broker.removeAndCancelProvingJob(id2);

expect(database.deleteProvingJobAndResult).toHaveBeenCalledWith(id1);
expect(database.deleteProvingJobAndResult).toHaveBeenCalledWith(id2);

await expect(broker.getProvingJobStatus(id1)).resolves.toEqual({ status: 'not-found' });
await expect(broker.getProvingJobStatus(id2)).resolves.toEqual({ status: 'not-found' });

expect(database.getProvingJob(id1)).toBeUndefined();
expect(database.getProvingJobResult(id1)).toBeUndefined();
expect(database.getProvingJob(id2)).toBeUndefined();
});

it('saves job when enqueued', async () => {
await broker.start();
const id = makeProvingJobId();
await broker.enqueueProvingJob({
id,
const job: V2ProvingJob = {
id: makeProvingJobId(),
type: ProvingRequestType.BASE_PARITY,
blockNumber: 1,
inputs: makeBaseParityInputs(),
});
expect(database.getProvingJob(id)).not.toBeUndefined();
};

jest.spyOn(database, 'addProvingJob');
await broker.enqueueProvingJob(job);

expect(database.addProvingJob).toHaveBeenCalledWith(job);
});

it('does not retain job if database fails to save', async () => {
Expand All @@ -963,23 +983,29 @@ describe('ProvingBroker', () => {

it('saves job result', async () => {
await broker.start();
const id = makeProvingJobId();
await broker.enqueueProvingJob({
id,

const job: V2ProvingJob = {
id: makeProvingJobId(),
type: ProvingRequestType.BASE_PARITY,
blockNumber: 1,
inputs: makeBaseParityInputs(),
});
await broker.reportProvingJobSuccess(id, {
};
jest.spyOn(database, 'setProvingJobResult');

await broker.enqueueProvingJob(job);

const result: V2ProofOutput = {
type: ProvingRequestType.BASE_PARITY,
value: makePublicInputsAndRecursiveProof(
makeParityPublicInputs(RECURSIVE_PROOF_LENGTH),
makeRecursiveProof(RECURSIVE_PROOF_LENGTH),
VerificationKeyData.makeFake(),
),
});
await assertJobStatus(id, 'resolved');
expect(database.getProvingJobResult(id)).toEqual({ value: expect.any(Object) });
};
await broker.reportProvingJobSuccess(job.id, result);

await assertJobStatus(job.id, 'resolved');
expect(database.setProvingJobResult).toHaveBeenCalledWith(job.id, result);
});

it('does not retain job result if database fails to save', async () => {
Expand All @@ -1003,22 +1029,25 @@ describe('ProvingBroker', () => {
}),
).rejects.toThrow(new Error('db error'));
await assertJobStatus(id, 'in-queue');
expect(database.getProvingJobResult(id)).toBeUndefined();
});

it('saves job error', async () => {
await broker.start();

const id = makeProvingJobId();
jest.spyOn(database, 'setProvingJobError');

await broker.enqueueProvingJob({
id,
type: ProvingRequestType.BASE_PARITY,
blockNumber: 1,
inputs: makeBaseParityInputs(),
});

const error = new Error('test error');
await broker.reportProvingJobError(id, error);
await assertJobStatus(id, 'rejected');
expect(database.getProvingJobResult(id)).toEqual({ error: String(error) });
expect(database.setProvingJobError).toHaveBeenCalledWith(id, error);
});

it('does not retain job error if database fails to save', async () => {
Expand All @@ -1033,15 +1062,14 @@ describe('ProvingBroker', () => {
});
await expect(broker.reportProvingJobError(id, new Error())).rejects.toThrow(new Error('db error'));
await assertJobStatus(id, 'in-queue');
expect(database.getProvingJobResult(id)).toBeUndefined();
});

it('does not save job result if job is unknown', async () => {
await broker.start();
const id = makeProvingJobId();

expect(database.getProvingJob(id)).toBeUndefined();
expect(database.getProvingJobResult(id)).toBeUndefined();
jest.spyOn(database, 'setProvingJobResult');
jest.spyOn(database, 'addProvingJob');

await broker.reportProvingJobSuccess(id, {
type: ProvingRequestType.BASE_PARITY,
Expand All @@ -1052,21 +1080,21 @@ describe('ProvingBroker', () => {
),
});

expect(database.getProvingJob(id)).toBeUndefined();
expect(database.getProvingJobResult(id)).toBeUndefined();
expect(database.setProvingJobResult).not.toHaveBeenCalled();
expect(database.addProvingJob).not.toHaveBeenCalled();
});

it('does not save job error if job is unknown', async () => {
await broker.start();
const id = makeProvingJobId();

expect(database.getProvingJob(id)).toBeUndefined();
expect(database.getProvingJobResult(id)).toBeUndefined();
jest.spyOn(database, 'setProvingJobError');
jest.spyOn(database, 'addProvingJob');

await broker.reportProvingJobError(id, new Error('test error'));

expect(database.getProvingJob(id)).toBeUndefined();
expect(database.getProvingJobResult(id)).toBeUndefined();
expect(database.setProvingJobError).not.toHaveBeenCalled();
expect(database.addProvingJob).not.toHaveBeenCalled();
});
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import { PriorityMemoryQueue } from '@aztec/foundation/queue';

import assert from 'assert';

import { type ProvingBrokerDatabase } from './proving_broker_database.js';
import type { ProvingJobConsumer, ProvingJobFilter, ProvingJobProducer } from './proving_broker_interface.js';
import { type ProvingJobDatabase } from './proving_job_database.js';

type InProgressMetadata = {
id: V2ProvingJobId;
Expand Down Expand Up @@ -71,7 +71,7 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer {
private maxRetries: number;

public constructor(
private database: ProvingBrokerDatabase,
private database: ProvingJobDatabase,
{ jobTimeoutSec = 30, timeoutIntervalSec = 10, maxRetries = 3 }: ProofRequestBrokerConfig = {},
private logger = createDebugLogger('aztec:prover-client:proof-request-broker'),
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import {
type V2ProvingJobResult,
} from '@aztec/circuit-types';

export interface ProvingBrokerDatabase {
/**
* A database for storing proof requests and their results
*/
export interface ProvingJobDatabase {
/**
* Saves a proof request so it can be retrieved later
* @param request - The proof request to save
Expand Down Expand Up @@ -39,43 +42,3 @@ export interface ProvingBrokerDatabase {
*/
setProvingJobError(id: V2ProvingJobId, err: Error): Promise<void>;
}

export class InMemoryDatabase implements ProvingBrokerDatabase {
private jobs = new Map<V2ProvingJobId, V2ProvingJob>();
private results = new Map<V2ProvingJobId, V2ProvingJobResult>();

getProvingJob(id: V2ProvingJobId): V2ProvingJob | undefined {
return this.jobs.get(id);
}

getProvingJobResult(id: V2ProvingJobId): V2ProvingJobResult | undefined {
return this.results.get(id);
}

addProvingJob(request: V2ProvingJob): Promise<void> {
this.jobs.set(request.id, request);
return Promise.resolve();
}

setProvingJobResult(id: V2ProvingJobId, value: V2ProofOutput): Promise<void> {
this.results.set(id, { value });
return Promise.resolve();
}

setProvingJobError(id: V2ProvingJobId, error: Error): Promise<void> {
this.results.set(id, { error: String(error) });
return Promise.resolve();
}

deleteProvingJobAndResult(id: V2ProvingJobId): Promise<void> {
this.jobs.delete(id);
this.results.delete(id);
return Promise.resolve();
}

*allProvingJobs(): Iterable<[V2ProvingJob, V2ProvingJobResult | undefined]> {
for (const item of this.jobs.values()) {
yield [item, this.results.get(item.id)] as const;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import type { V2ProofOutput, V2ProvingJob, V2ProvingJobId, V2ProvingJobResult } from '@aztec/circuit-types';

import { type ProvingJobDatabase } from '../proving_job_database.js';

export class InMemoryDatabase implements ProvingJobDatabase {
private jobs = new Map<V2ProvingJobId, V2ProvingJob>();
private results = new Map<V2ProvingJobId, V2ProvingJobResult>();

getProvingJob(id: V2ProvingJobId): V2ProvingJob | undefined {
return this.jobs.get(id);
}

getProvingJobResult(id: V2ProvingJobId): V2ProvingJobResult | undefined {
return this.results.get(id);
}

addProvingJob(request: V2ProvingJob): Promise<void> {
this.jobs.set(request.id, request);
return Promise.resolve();
}

setProvingJobResult(id: V2ProvingJobId, value: V2ProofOutput): Promise<void> {
this.results.set(id, { value });
return Promise.resolve();
}

setProvingJobError(id: V2ProvingJobId, error: Error): Promise<void> {
this.results.set(id, { error: String(error) });
return Promise.resolve();
}

deleteProvingJobAndResult(id: V2ProvingJobId): Promise<void> {
this.jobs.delete(id);
this.results.delete(id);
return Promise.resolve();
}

*allProvingJobs(): Iterable<[V2ProvingJob, V2ProvingJobResult | undefined]> {
for (const item of this.jobs.values()) {
yield [item, this.results.get(item.id)] as const;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import { type V2ProofOutput, V2ProvingJob, type V2ProvingJobId, V2ProvingJobResult } from '@aztec/circuit-types';
import { type AztecKVStore, type AztecMap } from '@aztec/kv-store';

import { type ProvingJobDatabase } from '../proving_job_database.js';

export class PersistedProvingJobDatabase implements ProvingJobDatabase {
private jobs: AztecMap<V2ProvingJobId, string>;
private jobResults: AztecMap<V2ProvingJobId, string>;

constructor(private store: AztecKVStore) {
this.jobs = store.openMap('proving_jobs');
this.jobResults = store.openMap('proving_job_results');
}

async addProvingJob(job: V2ProvingJob): Promise<void> {
await this.jobs.set(job.id, JSON.stringify(job));
}

*allProvingJobs(): Iterable<[V2ProvingJob, V2ProvingJobResult | undefined]> {
for (const jobStr of this.jobs.values()) {
const job = V2ProvingJob.parse(JSON.parse(jobStr));
const resultStr = this.jobResults.get(job.id);
const result = resultStr ? V2ProvingJobResult.parse(JSON.parse(resultStr)) : undefined;
yield [job, result];
}
}

deleteProvingJobAndResult(id: V2ProvingJobId): Promise<void> {
return this.store.transaction(() => {
void this.jobs.delete(id);
void this.jobResults.delete(id);
});
}

async setProvingJobError(id: V2ProvingJobId, err: Error): Promise<void> {
const res: V2ProvingJobResult = { error: err.message };
await this.jobResults.set(id, JSON.stringify(res));
}

async setProvingJobResult(id: V2ProvingJobId, value: V2ProofOutput): Promise<void> {
const res: V2ProvingJobResult = { value };
await this.jobResults.set(id, JSON.stringify(res));
}
}

0 comments on commit 6a16a4b

Please sign in to comment.