diff --git a/packages/gatsby/src/utils/worker/__tests__/queries.ts b/packages/gatsby/src/utils/worker/__tests__/queries.ts index bcae5e4febecb..4b802adb23111 100644 --- a/packages/gatsby/src/utils/worker/__tests__/queries.ts +++ b/packages/gatsby/src/utils/worker/__tests__/queries.ts @@ -7,6 +7,7 @@ import sourceNodesAndRemoveStaleNodes from "../../source-nodes" import { savePartialStateToDisk, store, + emitter, loadPartialStateFromDisk, } from "../../../redux" import { loadConfigAndPlugins } from "../../../bootstrap/load-config-and-plugins" @@ -167,7 +168,6 @@ describeWhenLMDB(`worker (queries)`, () => { savePartialStateToDisk([`components`, `staticQueryComponents`]) await Promise.all(worker.all.buildSchema()) - await worker.single.runQueries(queryIdsSmall) }) afterAll(() => { @@ -180,9 +180,15 @@ describeWhenLMDB(`worker (queries)`, () => { } }) + // This was the original implementation of state syncing between a worker and the main process. + // We switched to "replaying actions" as a mechanism for state syncing. + // But we can get back to state saving / merging if "replaying actions" proves to be too expensive + // TODO: delete or re-activate depending on results yielded by "replaying actions" approach. + // The logic for `loadPartialStateFromDisk` itself is tested in `share-state` tests it(`should save worker "queries" state to disk`, async () => { if (!worker) fail(`worker not defined`) + await worker.single.runQueries(queryIdsSmall) await Promise.all(worker.all.saveQueries()) // Pass "1" as workerId as the test only have one worker const result = loadPartialStateFromDisk([`queries`], `1`) @@ -233,6 +239,8 @@ describeWhenLMDB(`worker (queries)`, () => { it(`should execute static queries`, async () => { if (!worker) fail(`worker not defined`) + + await worker.single.runQueries(queryIdsSmall) const stateFromWorker = await worker.single.getState() const staticQueryResult = await fs.readJson( @@ -250,6 +258,8 @@ describeWhenLMDB(`worker (queries)`, () => { it(`should execute page queries`, async () => { if (!worker) fail(`worker not defined`) + + await worker.single.runQueries(queryIdsSmall) const stateFromWorker = await worker.single.getState() const pageQueryResult = await fs.readJson( @@ -265,6 +275,8 @@ describeWhenLMDB(`worker (queries)`, () => { it(`should execute page queries with context variables`, async () => { if (!worker) fail(`worker not defined`) + + await worker.single.runQueries(queryIdsSmall) const stateFromWorker = await worker.single.getState() const pageQueryResult = await fs.readJson( @@ -331,4 +343,131 @@ describeWhenLMDB(`worker (queries)`, () => { spy.mockRestore() }) + + it(`should return actions occurred in worker to replay in the main process`, async () => { + const result = await worker.single.runQueries(queryIdsSmall) + + const expectedActionShapes = { + QUERY_START: [`componentPath`, `isPage`, `path`], + PAGE_QUERY_RUN: [`componentPath`, `isPage`, `path`, `resultHash`], + CREATE_COMPONENT_DEPENDENCY: [`nodeId`, `path`], + ADD_PENDING_PAGE_DATA_WRITE: [`path`], + } + expect(result).toBeArrayOfSize(11) + + for (const action of result) { + expect(action.type).toBeOneOf(Object.keys(expectedActionShapes)) + expect(action.payload).toContainKeys(expectedActionShapes[action.type]) + } + // Double-check that important actions are actually present + expect(result).toContainValue( + expect.objectContaining({ type: `QUERY_START` }) + ) + expect(result).toContainValue( + expect.objectContaining({ type: `PAGE_QUERY_RUN` }) + ) + }) + + it(`should replay selected worker actions in runQueriesInWorkersQueue`, async () => { + const expectedActions = [ + { + payload: { + componentPath: `/static-query-component.js`, + isPage: false, + path: `sq--q1`, + }, + type: `QUERY_START`, + }, + { + payload: { + nodeId: `ceb8e742-a2ce-5110-a560-94c93d1c71a5`, + path: `sq--q1`, + }, + plugin: ``, + type: `CREATE_COMPONENT_DEPENDENCY`, + }, + { + payload: { + componentPath: `/static-query-component.js`, + isPage: false, + path: `sq--q1`, + queryHash: `q1-hash`, + resultHash: `Dr5hgCDB+R0S9oRBWeZYj3lB7VI=`, + }, + type: `PAGE_QUERY_RUN`, + }, + { + payload: { + componentPath: `/foo.js`, + isPage: true, + path: `/foo`, + }, + type: `QUERY_START`, + }, + { + payload: { + componentPath: `/bar.js`, + isPage: true, + path: `/bar`, + }, + type: `QUERY_START`, + }, + { + payload: { + nodeId: `ceb8e742-a2ce-5110-a560-94c93d1c71a5`, + path: `/foo`, + }, + plugin: ``, + type: `CREATE_COMPONENT_DEPENDENCY`, + }, + { + payload: { + nodeId: `ceb8e742-a2ce-5110-a560-94c93d1c71a5`, + path: `/bar`, + }, + plugin: ``, + type: `CREATE_COMPONENT_DEPENDENCY`, + }, + { + payload: { + path: `/foo`, + }, + type: `ADD_PENDING_PAGE_DATA_WRITE`, + }, + { + payload: { + componentPath: `/foo.js`, + isPage: true, + path: `/foo`, + resultHash: `8dW7PoqwZNk/0U8LO6kTj1qBCwU=`, + }, + type: `PAGE_QUERY_RUN`, + }, + { + payload: { + path: `/bar`, + }, + type: `ADD_PENDING_PAGE_DATA_WRITE`, + }, + { + payload: { + componentPath: `/bar.js`, + isPage: true, + path: `/bar`, + resultHash: `iKmhf9XgbsfK7qJw0tw95pmGwJM=`, + }, + type: `PAGE_QUERY_RUN`, + }, + ] + + const actualActions: Array = [] + function listenActions(action): void { + actualActions.push(action) + } + emitter.on(`*`, listenActions) + await runQueriesInWorkersQueue(worker, queryIdsSmall) + emitter.off(`*`, listenActions) + + expect(actualActions).toContainAllValues(expectedActions) + }) }) diff --git a/packages/gatsby/src/utils/worker/child/queries.ts b/packages/gatsby/src/utils/worker/child/queries.ts index 7e3094bf8e6dc..534dbb39179bb 100644 --- a/packages/gatsby/src/utils/worker/child/queries.ts +++ b/packages/gatsby/src/utils/worker/child/queries.ts @@ -8,6 +8,12 @@ import { GraphQLRunner } from "../../../query/graphql-runner" import { getDataStore } from "../../../datastore" import { setState } from "./state" import { buildSchema } from "./schema" +import { + IAddPendingPageDataWriteAction, + ICreatePageDependencyAction, + IPageQueryRunAction, + IQueryStartAction, +} from "../../../redux/types" export function setComponents(): void { setState([`components`, `staticQueryComponents`]) @@ -29,7 +35,39 @@ function getGraphqlRunner(): GraphQLRunner { return gqlRunner } -export async function runQueries(queryIds: IGroupedQueryIds): Promise { +type ActionsToReplay = Array< + | IQueryStartAction + | IPageQueryRunAction + | IAddPendingPageDataWriteAction + | ICreatePageDependencyAction +> + +export async function runQueries( + queryIds: IGroupedQueryIds +): Promise { + const actionsToReplay: ActionsToReplay = [] + + const unsubscribe = store.subscribe(() => { + const action = store.getState().lastAction + if ( + action.type === `QUERY_START` || + action.type === `PAGE_QUERY_RUN` || + action.type === `ADD_PENDING_PAGE_DATA_WRITE` || + action.type === `CREATE_COMPONENT_DEPENDENCY` + ) { + actionsToReplay.push(action) + } + }) + + try { + await doRunQueries(queryIds) + return actionsToReplay + } finally { + unsubscribe() + } +} + +async function doRunQueries(queryIds: IGroupedQueryIds): Promise { const workerStore = store.getState() // If buildSchema() didn't run yet, execute it diff --git a/packages/gatsby/src/utils/worker/pool.ts b/packages/gatsby/src/utils/worker/pool.ts index 949549a042641..cda547a7a545d 100644 --- a/packages/gatsby/src/utils/worker/pool.ts +++ b/packages/gatsby/src/utils/worker/pool.ts @@ -8,6 +8,8 @@ import { initJobsMessagingInMainProcess } from "../jobs/worker-messaging" import { initReporterMessagingInMainProcess } from "./reporter" import { GatsbyWorkerPool } from "./types" +import { store } from "../../redux" +import { ActionsUnion } from "../../redux/types" export type { GatsbyWorkerPool } @@ -48,6 +50,7 @@ export async function runQueriesInWorkersQueue( promises.push( pool.single .runQueries({ pageQueryIds: [], staticQueryIds: segment }) + .then(replayWorkerActions) .then(() => { activity.tick(segment.length) }) @@ -58,6 +61,7 @@ export async function runQueriesInWorkersQueue( promises.push( pool.single .runQueries({ pageQueryIds: segment, staticQueryIds: [] }) + .then(replayWorkerActions) .then(() => { activity.tick(segment.length) }) @@ -68,3 +72,17 @@ export async function runQueriesInWorkersQueue( activity.end() } + +async function replayWorkerActions( + actions: Array +): Promise { + let i = 1 + for (const action of actions) { + store.dispatch(action) + + // Give event loop some breath + if (i++ % 100 === 0) { + await new Promise(resolve => process.nextTick(resolve)) + } + } +}