From ec858a9419259fc2c7b6811081141c24c10bf372 Mon Sep 17 00:00:00 2001 From: Vladimir Razuvaev Date: Fri, 9 Jul 2021 19:18:13 +0700 Subject: [PATCH 1/7] feat(gatsby): PQR: merge data dependencies from workers to the main process --- packages/gatsby-worker/src/index.ts | 10 +++ packages/gatsby/src/redux/reducers/queries.ts | 68 +++++++++++++++++++ packages/gatsby/src/redux/types.ts | 6 ++ packages/gatsby/src/utils/worker/pool.ts | 21 ++++++ 4 files changed, 105 insertions(+) diff --git a/packages/gatsby-worker/src/index.ts b/packages/gatsby-worker/src/index.ts index 8a4332e32bffb..e672676feaddc 100644 --- a/packages/gatsby-worker/src/index.ts +++ b/packages/gatsby-worker/src/index.ts @@ -86,6 +86,10 @@ interface IWorkerInfo { currentTask?: TaskInfo } +export interface IPublicWorkerInfo { + workerId: number +} + /** * Worker pool is a class that allow you to queue function execution across multiple * child processes, in order to parallelize work. It accepts absolute path to worker module @@ -293,6 +297,12 @@ export class WorkerPool< this.startAll() } + getWorkerInfo(): Array { + return this.workers.map(worker => { + return { workerId: worker.workerId } + }) + } + private checkForWork( workerInfo: IWorkerInfo ): void { diff --git a/packages/gatsby/src/redux/reducers/queries.ts b/packages/gatsby/src/redux/reducers/queries.ts index dd4a67e1a311d..0b5a6c4638fd8 100644 --- a/packages/gatsby/src/redux/reducers/queries.ts +++ b/packages/gatsby/src/redux/reducers/queries.ts @@ -220,6 +220,14 @@ export function queriesReducer( state.dirtyQueriesListToEmitViaWebsocket = [] return state } + case `MERGE_WORKER_QUERY_STATE`: { + assertSaneWorkerState(action.payload) + + for (const workerState of action.payload) { + state = mergeWorkerDataDependencies(state, workerState) + } + return state + } default: return state } @@ -334,3 +342,63 @@ function trackDirtyQuery( return state } + +interface IWorkerStateChunk { + workerId: number + queryStateChunk: IGatsbyState["queries"] +} + 
+function mergeWorkerDataDependencies( + state: IGatsbyState["queries"], + workerStateChunk: IWorkerStateChunk +): IGatsbyState["queries"] { + const queryState = workerStateChunk.queryStateChunk + + // First clear data dependencies for all queries tracked by worker + for (const queryId of queryState.trackedQueries.keys()) { + state = clearNodeDependencies(state, queryId) + state = clearConnectionDependencies(state, queryId) + } + + // Now re-add all data deps from worker + for (const [nodeId, queries] of queryState.byNode) { + for (const queryId of queries) { + state = addNodeDependency(state, queryId, nodeId) + } + } + for (const [connectionName, queries] of queryState.byConnection) { + for (const queryId of queries) { + state = addConnectionDependency(state, queryId, connectionName) + } + } + return state +} + +function assertSaneWorkerState( + workerStateChunks: Array +): void { + for (const { workerId, queryStateChunk } of workerStateChunks) { + if (queryStateChunk.deletedQueries.size !== 0) { + throw new Error( + `Assertion failed: workerState.deletedQueries.size === 0 (worker #${workerId})` + ) + } + if (queryStateChunk.trackedComponents.size !== 0) { + throw new Error( + `Assertion failed: queryStateChunk.trackedComponents.size === 0 (worker #${workerId})` + ) + } + for (const query of queryStateChunk.trackedQueries.values()) { + if (query.dirty) { + throw new Error( + `Assertion failed: all worker queries are not dirty (worker #${workerId})` + ) + } + if (query.running) { + throw new Error( + `Assertion failed: all worker queries are not running (worker #${workerId})` + ) + } + } + } +} diff --git a/packages/gatsby/src/redux/types.ts b/packages/gatsby/src/redux/types.ts index 37a4a13008542..a19c5be714afe 100644 --- a/packages/gatsby/src/redux/types.ts +++ b/packages/gatsby/src/redux/types.ts @@ -403,6 +403,7 @@ export type ActionsUnion = | IMarkHtmlDirty | ISSRUsedUnsafeBuiltin | ISetSiteConfig + | IMergeWorkerQueryState export interface IApiFinishedAction 
{ type: `API_FINISHED` @@ -905,3 +906,8 @@ export interface INodeManifest { id: string } } + +export interface IMergeWorkerQueryState { + type: `MERGE_WORKER_QUERY_STATE` + payload: Array<{ workerId: number; queryStateChunk: IGatsbyState["queries"] }> +} diff --git a/packages/gatsby/src/utils/worker/pool.ts b/packages/gatsby/src/utils/worker/pool.ts index 949549a042641..29b8a7fa5ad47 100644 --- a/packages/gatsby/src/utils/worker/pool.ts +++ b/packages/gatsby/src/utils/worker/pool.ts @@ -8,6 +8,8 @@ import { initJobsMessagingInMainProcess } from "../jobs/worker-messaging" import { initReporterMessagingInMainProcess } from "./reporter" import { GatsbyWorkerPool } from "./types" +import { loadPartialStateFromDisk, store } from "../../redux" +import { IGatsbyState, IMergeWorkerQueryState } from "../../redux/types" export type { GatsbyWorkerPool } @@ -65,6 +67,25 @@ export async function runQueriesInWorkersQueue( } await Promise.all(promises) + await mergeWorkerState(pool) activity.end() } + +async function mergeWorkerState(pool: GatsbyWorkerPool): Promise { + await pool.all.saveQueries() + const workerQueryState: IMergeWorkerQueryState["payload"] = [] + + for (const { workerId } of pool.getWorkerInfo()) { + const state = loadPartialStateFromDisk([`queries`], String(workerId)) + workerQueryState.push({ + workerId, + queryStateChunk: state as IGatsbyState["queries"], + }) + await new Promise(resolve => process.nextTick(resolve)) + } + store.dispatch({ + type: `MERGE_WORKER_QUERY_STATE`, + payload: workerQueryState, + }) +} From 0644d77015fe71021faf557b2209987ea77ce87a Mon Sep 17 00:00:00 2001 From: Vladimir Razuvaev Date: Fri, 9 Jul 2021 20:34:47 +0700 Subject: [PATCH 2/7] Move worker state merging to a standalone step --- packages/gatsby/src/commands/build.ts | 6 +++++- packages/gatsby/src/utils/worker/__tests__/queries.ts | 2 +- packages/gatsby/src/utils/worker/pool.ts | 7 +++++-- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git 
a/packages/gatsby/src/commands/build.ts b/packages/gatsby/src/commands/build.ts index c15e1b933f3ee..9b7d8c4c899fe 100644 --- a/packages/gatsby/src/commands/build.ts +++ b/packages/gatsby/src/commands/build.ts @@ -43,7 +43,10 @@ import { markWebpackStatusAsDone, } from "../utils/webpack-status" import { showExperimentNotices } from "../utils/show-experiment-notice" -import { runQueriesInWorkersQueue } from "../utils/worker/pool" +import { + mergeWorkerState, + runQueriesInWorkersQueue, +} from "../utils/worker/pool" module.exports = async function build(program: IBuildArgs): Promise { if (isTruthy(process.env.VERBOSE)) { @@ -96,6 +99,7 @@ module.exports = async function build(program: IBuildArgs): Promise { let waitForWorkerPoolRestart = Promise.resolve() if (process.env.GATSBY_EXPERIMENTAL_PARALLEL_QUERY_RUNNING) { await runQueriesInWorkersQueue(workerPool, queryIds) + await mergeWorkerState(workerPool) waitForWorkerPoolRestart = workerPool.restart() } else { await runStaticQueries({ diff --git a/packages/gatsby/src/utils/worker/__tests__/queries.ts b/packages/gatsby/src/utils/worker/__tests__/queries.ts index bcae5e4febecb..ef1b56ae0a4b2 100644 --- a/packages/gatsby/src/utils/worker/__tests__/queries.ts +++ b/packages/gatsby/src/utils/worker/__tests__/queries.ts @@ -105,7 +105,7 @@ const queryIdsBig: IGroupedQueryIds = { staticQueryIds: [dummyStaticQuery.id], } -describeWhenLMDB(`worker (queries)`, () => { +describe(`worker (queries)`, () => { beforeAll(async () => { store.dispatch({ type: `DELETE_CACHE` }) const fileDir = path.join(process.cwd(), `.cache/worker`) diff --git a/packages/gatsby/src/utils/worker/pool.ts b/packages/gatsby/src/utils/worker/pool.ts index 29b8a7fa5ad47..4c93eb2222a40 100644 --- a/packages/gatsby/src/utils/worker/pool.ts +++ b/packages/gatsby/src/utils/worker/pool.ts @@ -67,12 +67,14 @@ export async function runQueriesInWorkersQueue( } await Promise.all(promises) - await mergeWorkerState(pool) activity.end() } -async function 
mergeWorkerState(pool: GatsbyWorkerPool): Promise { +export async function mergeWorkerState(pool: GatsbyWorkerPool): Promise { + const activity = reporter.activityTimer(`Merge worker state`) + activity.start() + await pool.all.saveQueries() const workerQueryState: IMergeWorkerQueryState["payload"] = [] @@ -88,4 +90,5 @@ async function mergeWorkerState(pool: GatsbyWorkerPool): Promise { type: `MERGE_WORKER_QUERY_STATE`, payload: workerQueryState, }) + activity.end() } From e6c1e210e40c40de0814b4d37bb15eed11c108e1 Mon Sep 17 00:00:00 2001 From: Vladimir Razuvaev Date: Fri, 9 Jul 2021 23:41:30 +0700 Subject: [PATCH 3/7] revert accidental change --- packages/gatsby/src/utils/worker/__tests__/queries.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/gatsby/src/utils/worker/__tests__/queries.ts b/packages/gatsby/src/utils/worker/__tests__/queries.ts index ef1b56ae0a4b2..bcae5e4febecb 100644 --- a/packages/gatsby/src/utils/worker/__tests__/queries.ts +++ b/packages/gatsby/src/utils/worker/__tests__/queries.ts @@ -105,7 +105,7 @@ const queryIdsBig: IGroupedQueryIds = { staticQueryIds: [dummyStaticQuery.id], } -describe(`worker (queries)`, () => { +describeWhenLMDB(`worker (queries)`, () => { beforeAll(async () => { store.dispatch({ type: `DELETE_CACHE` }) const fileDir = path.join(process.cwd(), `.cache/worker`) From cb80dab4b40fc79fcbc6d08c213c45efb55fc4d5 Mon Sep 17 00:00:00 2001 From: Vladimir Razuvaev Date: Thu, 15 Jul 2021 20:21:06 +0700 Subject: [PATCH 4/7] different approach: replay actions vs merging state --- packages/gatsby/src/commands/build.ts | 6 +- packages/gatsby/src/redux/reducers/queries.ts | 68 ------ .../src/utils/worker/__tests__/queries.ts | 213 +++++++++++++++++- .../gatsby/src/utils/worker/child/queries.ts | 40 +++- packages/gatsby/src/utils/worker/pool.ts | 36 ++- 5 files changed, 267 insertions(+), 96 deletions(-) diff --git a/packages/gatsby/src/commands/build.ts b/packages/gatsby/src/commands/build.ts index 
9b7d8c4c899fe..c15e1b933f3ee 100644 --- a/packages/gatsby/src/commands/build.ts +++ b/packages/gatsby/src/commands/build.ts @@ -43,10 +43,7 @@ import { markWebpackStatusAsDone, } from "../utils/webpack-status" import { showExperimentNotices } from "../utils/show-experiment-notice" -import { - mergeWorkerState, - runQueriesInWorkersQueue, -} from "../utils/worker/pool" +import { runQueriesInWorkersQueue } from "../utils/worker/pool" module.exports = async function build(program: IBuildArgs): Promise { if (isTruthy(process.env.VERBOSE)) { @@ -99,7 +96,6 @@ module.exports = async function build(program: IBuildArgs): Promise { let waitForWorkerPoolRestart = Promise.resolve() if (process.env.GATSBY_EXPERIMENTAL_PARALLEL_QUERY_RUNNING) { await runQueriesInWorkersQueue(workerPool, queryIds) - await mergeWorkerState(workerPool) waitForWorkerPoolRestart = workerPool.restart() } else { await runStaticQueries({ diff --git a/packages/gatsby/src/redux/reducers/queries.ts b/packages/gatsby/src/redux/reducers/queries.ts index 0b5a6c4638fd8..dd4a67e1a311d 100644 --- a/packages/gatsby/src/redux/reducers/queries.ts +++ b/packages/gatsby/src/redux/reducers/queries.ts @@ -220,14 +220,6 @@ export function queriesReducer( state.dirtyQueriesListToEmitViaWebsocket = [] return state } - case `MERGE_WORKER_QUERY_STATE`: { - assertSaneWorkerState(action.payload) - - for (const workerState of action.payload) { - state = mergeWorkerDataDependencies(state, workerState) - } - return state - } default: return state } @@ -342,63 +334,3 @@ function trackDirtyQuery( return state } - -interface IWorkerStateChunk { - workerId: number - queryStateChunk: IGatsbyState["queries"] -} - -function mergeWorkerDataDependencies( - state: IGatsbyState["queries"], - workerStateChunk: IWorkerStateChunk -): IGatsbyState["queries"] { - const queryState = workerStateChunk.queryStateChunk - - // First clear data dependencies for all queries tracked by worker - for (const queryId of queryState.trackedQueries.keys()) { 
- state = clearNodeDependencies(state, queryId) - state = clearConnectionDependencies(state, queryId) - } - - // Now re-add all data deps from worker - for (const [nodeId, queries] of queryState.byNode) { - for (const queryId of queries) { - state = addNodeDependency(state, queryId, nodeId) - } - } - for (const [connectionName, queries] of queryState.byConnection) { - for (const queryId of queries) { - state = addConnectionDependency(state, queryId, connectionName) - } - } - return state -} - -function assertSaneWorkerState( - workerStateChunks: Array -): void { - for (const { workerId, queryStateChunk } of workerStateChunks) { - if (queryStateChunk.deletedQueries.size !== 0) { - throw new Error( - `Assertion failed: workerState.deletedQueries.size === 0 (worker #${workerId})` - ) - } - if (queryStateChunk.trackedComponents.size !== 0) { - throw new Error( - `Assertion failed: queryStateChunk.trackedComponents.size === 0 (worker #${workerId})` - ) - } - for (const query of queryStateChunk.trackedQueries.values()) { - if (query.dirty) { - throw new Error( - `Assertion failed: all worker queries are not dirty (worker #${workerId})` - ) - } - if (query.running) { - throw new Error( - `Assertion failed: all worker queries are not running (worker #${workerId})` - ) - } - } - } -} diff --git a/packages/gatsby/src/utils/worker/__tests__/queries.ts b/packages/gatsby/src/utils/worker/__tests__/queries.ts index bcae5e4febecb..41ce0e6448a7f 100644 --- a/packages/gatsby/src/utils/worker/__tests__/queries.ts +++ b/packages/gatsby/src/utils/worker/__tests__/queries.ts @@ -7,6 +7,7 @@ import sourceNodesAndRemoveStaleNodes from "../../source-nodes" import { savePartialStateToDisk, store, + emitter, loadPartialStateFromDisk, } from "../../../redux" import { loadConfigAndPlugins } from "../../../bootstrap/load-config-and-plugins" @@ -167,7 +168,6 @@ describeWhenLMDB(`worker (queries)`, () => { savePartialStateToDisk([`components`, `staticQueryComponents`]) await 
Promise.all(worker.all.buildSchema()) - await worker.single.runQueries(queryIdsSmall) }) afterAll(() => { @@ -180,9 +180,14 @@ describeWhenLMDB(`worker (queries)`, () => { } }) + // This was the original implementation of state syncing between a worker and the main process. + // We switched to "replaying actions" as a mechanism for state syncing. + // But we can get back to state saving / merging if "replaying actions" proves to be too expensive + // TODO: delete or re-activate depending on results yielded by "replaying actions" approach. it(`should save worker "queries" state to disk`, async () => { if (!worker) fail(`worker not defined`) + await worker.single.runQueries(queryIdsSmall) await Promise.all(worker.all.saveQueries()) // Pass "1" as workerId as the test only have one worker const result = loadPartialStateFromDisk([`queries`], `1`) @@ -233,6 +238,8 @@ describeWhenLMDB(`worker (queries)`, () => { it(`should execute static queries`, async () => { if (!worker) fail(`worker not defined`) + + await worker.single.runQueries(queryIdsSmall) const stateFromWorker = await worker.single.getState() const staticQueryResult = await fs.readJson( @@ -250,6 +257,8 @@ describeWhenLMDB(`worker (queries)`, () => { it(`should execute page queries`, async () => { if (!worker) fail(`worker not defined`) + + await worker.single.runQueries(queryIdsSmall) const stateFromWorker = await worker.single.getState() const pageQueryResult = await fs.readJson( @@ -265,6 +274,8 @@ describeWhenLMDB(`worker (queries)`, () => { it(`should execute page queries with context variables`, async () => { if (!worker) fail(`worker not defined`) + + await worker.single.runQueries(queryIdsSmall) const stateFromWorker = await worker.single.getState() const pageQueryResult = await fs.readJson( @@ -331,4 +342,204 @@ describeWhenLMDB(`worker (queries)`, () => { spy.mockRestore() }) + + it(`should return actions occurred in worker to replay in the main process`, async () => { + const result = await 
worker.single.runQueries(queryIdsSmall) + + expect(result).toMatchInlineSnapshot(` + Array [ + Object { + "payload": Object { + "componentPath": "/static-query-component.js", + "isPage": false, + "path": "sq--q1", + }, + "type": "QUERY_START", + }, + Object { + "payload": Object { + "nodeId": "ceb8e742-a2ce-5110-a560-94c93d1c71a5", + "path": "sq--q1", + }, + "plugin": "", + "type": "CREATE_COMPONENT_DEPENDENCY", + }, + Object { + "payload": Object { + "componentPath": "/static-query-component.js", + "isPage": false, + "path": "sq--q1", + "queryHash": "q1-hash", + "resultHash": "Dr5hgCDB+R0S9oRBWeZYj3lB7VI=", + }, + "type": "PAGE_QUERY_RUN", + }, + Object { + "payload": Object { + "componentPath": "/foo.js", + "isPage": true, + "path": "/foo", + }, + "type": "QUERY_START", + }, + Object { + "payload": Object { + "componentPath": "/bar.js", + "isPage": true, + "path": "/bar", + }, + "type": "QUERY_START", + }, + Object { + "payload": Object { + "nodeId": "ceb8e742-a2ce-5110-a560-94c93d1c71a5", + "path": "/foo", + }, + "plugin": "", + "type": "CREATE_COMPONENT_DEPENDENCY", + }, + Object { + "payload": Object { + "nodeId": "ceb8e742-a2ce-5110-a560-94c93d1c71a5", + "path": "/bar", + }, + "plugin": "", + "type": "CREATE_COMPONENT_DEPENDENCY", + }, + Object { + "payload": Object { + "path": "/foo", + }, + "type": "ADD_PENDING_PAGE_DATA_WRITE", + }, + Object { + "payload": Object { + "componentPath": "/foo.js", + "isPage": true, + "path": "/foo", + "resultHash": "8dW7PoqwZNk/0U8LO6kTj1qBCwU=", + }, + "type": "PAGE_QUERY_RUN", + }, + Object { + "payload": Object { + "path": "/bar", + }, + "type": "ADD_PENDING_PAGE_DATA_WRITE", + }, + Object { + "payload": Object { + "componentPath": "/bar.js", + "isPage": true, + "path": "/bar", + "resultHash": "iKmhf9XgbsfK7qJw0tw95pmGwJM=", + }, + "type": "PAGE_QUERY_RUN", + }, + ] + `) + }) + + it(`should replay selected worker actions in runQueriesInWorkersQueue`, async () => { + const expectedActions = [ + { + payload: { + 
componentPath: `/static-query-component.js`, + isPage: false, + path: `sq--q1`, + }, + type: `QUERY_START`, + }, + { + payload: { + nodeId: `ceb8e742-a2ce-5110-a560-94c93d1c71a5`, + path: `sq--q1`, + }, + plugin: ``, + type: `CREATE_COMPONENT_DEPENDENCY`, + }, + { + payload: { + componentPath: `/static-query-component.js`, + isPage: false, + path: `sq--q1`, + queryHash: `q1-hash`, + resultHash: `Dr5hgCDB+R0S9oRBWeZYj3lB7VI=`, + }, + type: `PAGE_QUERY_RUN`, + }, + { + payload: { + componentPath: `/foo.js`, + isPage: true, + path: `/foo`, + }, + type: `QUERY_START`, + }, + { + payload: { + componentPath: `/bar.js`, + isPage: true, + path: `/bar`, + }, + type: `QUERY_START`, + }, + { + payload: { + nodeId: `ceb8e742-a2ce-5110-a560-94c93d1c71a5`, + path: `/foo`, + }, + plugin: ``, + type: `CREATE_COMPONENT_DEPENDENCY`, + }, + { + payload: { + nodeId: `ceb8e742-a2ce-5110-a560-94c93d1c71a5`, + path: `/bar`, + }, + plugin: ``, + type: `CREATE_COMPONENT_DEPENDENCY`, + }, + { + payload: { + path: `/foo`, + }, + type: `ADD_PENDING_PAGE_DATA_WRITE`, + }, + { + payload: { + componentPath: `/foo.js`, + isPage: true, + path: `/foo`, + resultHash: `8dW7PoqwZNk/0U8LO6kTj1qBCwU=`, + }, + type: `PAGE_QUERY_RUN`, + }, + { + payload: { + path: `/bar`, + }, + type: `ADD_PENDING_PAGE_DATA_WRITE`, + }, + { + payload: { + componentPath: `/bar.js`, + isPage: true, + path: `/bar`, + resultHash: `iKmhf9XgbsfK7qJw0tw95pmGwJM=`, + }, + type: `PAGE_QUERY_RUN`, + }, + ] + + const actualActions: Array = [] + function listenActions(action): void { + actualActions.push(action) + } + emitter.on(`*`, listenActions) + await runQueriesInWorkersQueue(worker, queryIdsSmall) + emitter.off(`*`, listenActions) + + expect(actualActions).toContainAllValues(expectedActions) + }) }) diff --git a/packages/gatsby/src/utils/worker/child/queries.ts b/packages/gatsby/src/utils/worker/child/queries.ts index 7e3094bf8e6dc..534dbb39179bb 100644 --- a/packages/gatsby/src/utils/worker/child/queries.ts +++ 
b/packages/gatsby/src/utils/worker/child/queries.ts @@ -8,6 +8,12 @@ import { GraphQLRunner } from "../../../query/graphql-runner" import { getDataStore } from "../../../datastore" import { setState } from "./state" import { buildSchema } from "./schema" +import { + IAddPendingPageDataWriteAction, + ICreatePageDependencyAction, + IPageQueryRunAction, + IQueryStartAction, +} from "../../../redux/types" export function setComponents(): void { setState([`components`, `staticQueryComponents`]) @@ -29,7 +35,39 @@ function getGraphqlRunner(): GraphQLRunner { return gqlRunner } -export async function runQueries(queryIds: IGroupedQueryIds): Promise { +type ActionsToReplay = Array< + | IQueryStartAction + | IPageQueryRunAction + | IAddPendingPageDataWriteAction + | ICreatePageDependencyAction +> + +export async function runQueries( + queryIds: IGroupedQueryIds +): Promise { + const actionsToReplay: ActionsToReplay = [] + + const unsubscribe = store.subscribe(() => { + const action = store.getState().lastAction + if ( + action.type === `QUERY_START` || + action.type === `PAGE_QUERY_RUN` || + action.type === `ADD_PENDING_PAGE_DATA_WRITE` || + action.type === `CREATE_COMPONENT_DEPENDENCY` + ) { + actionsToReplay.push(action) + } + }) + + try { + await doRunQueries(queryIds) + return actionsToReplay + } finally { + unsubscribe() + } +} + +async function doRunQueries(queryIds: IGroupedQueryIds): Promise { const workerStore = store.getState() // If buildSchema() didn't run yet, execute it diff --git a/packages/gatsby/src/utils/worker/pool.ts b/packages/gatsby/src/utils/worker/pool.ts index 4c93eb2222a40..cda547a7a545d 100644 --- a/packages/gatsby/src/utils/worker/pool.ts +++ b/packages/gatsby/src/utils/worker/pool.ts @@ -8,8 +8,8 @@ import { initJobsMessagingInMainProcess } from "../jobs/worker-messaging" import { initReporterMessagingInMainProcess } from "./reporter" import { GatsbyWorkerPool } from "./types" -import { loadPartialStateFromDisk, store } from "../../redux" -import 
{ IGatsbyState, IMergeWorkerQueryState } from "../../redux/types" +import { store } from "../../redux" +import { ActionsUnion } from "../../redux/types" export type { GatsbyWorkerPool } @@ -50,6 +50,7 @@ export async function runQueriesInWorkersQueue( promises.push( pool.single .runQueries({ pageQueryIds: [], staticQueryIds: segment }) + .then(replayWorkerActions) .then(() => { activity.tick(segment.length) }) @@ -60,6 +61,7 @@ export async function runQueriesInWorkersQueue( promises.push( pool.single .runQueries({ pageQueryIds: segment, staticQueryIds: [] }) + .then(replayWorkerActions) .then(() => { activity.tick(segment.length) }) @@ -71,24 +73,16 @@ export async function runQueriesInWorkersQueue( activity.end() } -export async function mergeWorkerState(pool: GatsbyWorkerPool): Promise { - const activity = reporter.activityTimer(`Merge worker state`) - activity.start() - - await pool.all.saveQueries() - const workerQueryState: IMergeWorkerQueryState["payload"] = [] - - for (const { workerId } of pool.getWorkerInfo()) { - const state = loadPartialStateFromDisk([`queries`], String(workerId)) - workerQueryState.push({ - workerId, - queryStateChunk: state as IGatsbyState["queries"], - }) - await new Promise(resolve => process.nextTick(resolve)) +async function replayWorkerActions( + actions: Array +): Promise { + let i = 1 + for (const action of actions) { + store.dispatch(action) + + // Give event loop some breath + if (i++ % 100 === 0) { + await new Promise(resolve => process.nextTick(resolve)) + } } - store.dispatch({ - type: `MERGE_WORKER_QUERY_STATE`, - payload: workerQueryState, - }) - activity.end() } From 45a095ca2fbf78f770305f1bc5111dd953e86a4c Mon Sep 17 00:00:00 2001 From: Vladimir Razuvaev Date: Thu, 15 Jul 2021 20:27:55 +0700 Subject: [PATCH 5/7] revert unneeded changes --- packages/gatsby-worker/src/index.ts | 10 ---------- packages/gatsby/src/redux/types.ts | 6 ------ 2 files changed, 16 deletions(-) diff --git a/packages/gatsby-worker/src/index.ts 
b/packages/gatsby-worker/src/index.ts index e672676feaddc..8a4332e32bffb 100644 --- a/packages/gatsby-worker/src/index.ts +++ b/packages/gatsby-worker/src/index.ts @@ -86,10 +86,6 @@ interface IWorkerInfo { currentTask?: TaskInfo } -export interface IPublicWorkerInfo { - workerId: number -} - /** * Worker pool is a class that allow you to queue function execution across multiple * child processes, in order to parallelize work. It accepts absolute path to worker module @@ -297,12 +293,6 @@ export class WorkerPool< this.startAll() } - getWorkerInfo(): Array { - return this.workers.map(worker => { - return { workerId: worker.workerId } - }) - } - private checkForWork( workerInfo: IWorkerInfo ): void { diff --git a/packages/gatsby/src/redux/types.ts b/packages/gatsby/src/redux/types.ts index a19c5be714afe..37a4a13008542 100644 --- a/packages/gatsby/src/redux/types.ts +++ b/packages/gatsby/src/redux/types.ts @@ -403,7 +403,6 @@ export type ActionsUnion = | IMarkHtmlDirty | ISSRUsedUnsafeBuiltin | ISetSiteConfig - | IMergeWorkerQueryState export interface IApiFinishedAction { type: `API_FINISHED` @@ -906,8 +905,3 @@ export interface INodeManifest { id: string } } - -export interface IMergeWorkerQueryState { - type: `MERGE_WORKER_QUERY_STATE` - payload: Array<{ workerId: number; queryStateChunk: IGatsbyState["queries"] }> -} From eee2f55b7e7a48124204d2fddbd0a40a5f707b8f Mon Sep 17 00:00:00 2001 From: Vladimir Razuvaev Date: Fri, 16 Jul 2021 23:59:47 +0700 Subject: [PATCH 6/7] do not use inline snapshot --- .../src/utils/worker/__tests__/queries.ts | 111 +++--------------- 1 file changed, 19 insertions(+), 92 deletions(-) diff --git a/packages/gatsby/src/utils/worker/__tests__/queries.ts b/packages/gatsby/src/utils/worker/__tests__/queries.ts index 41ce0e6448a7f..8909f5b0ab20c 100644 --- a/packages/gatsby/src/utils/worker/__tests__/queries.ts +++ b/packages/gatsby/src/utils/worker/__tests__/queries.ts @@ -346,98 +346,25 @@ describeWhenLMDB(`worker (queries)`, () => { 
it(`should return actions occurred in worker to replay in the main process`, async () => { const result = await worker.single.runQueries(queryIdsSmall) - expect(result).toMatchInlineSnapshot(` - Array [ - Object { - "payload": Object { - "componentPath": "/static-query-component.js", - "isPage": false, - "path": "sq--q1", - }, - "type": "QUERY_START", - }, - Object { - "payload": Object { - "nodeId": "ceb8e742-a2ce-5110-a560-94c93d1c71a5", - "path": "sq--q1", - }, - "plugin": "", - "type": "CREATE_COMPONENT_DEPENDENCY", - }, - Object { - "payload": Object { - "componentPath": "/static-query-component.js", - "isPage": false, - "path": "sq--q1", - "queryHash": "q1-hash", - "resultHash": "Dr5hgCDB+R0S9oRBWeZYj3lB7VI=", - }, - "type": "PAGE_QUERY_RUN", - }, - Object { - "payload": Object { - "componentPath": "/foo.js", - "isPage": true, - "path": "/foo", - }, - "type": "QUERY_START", - }, - Object { - "payload": Object { - "componentPath": "/bar.js", - "isPage": true, - "path": "/bar", - }, - "type": "QUERY_START", - }, - Object { - "payload": Object { - "nodeId": "ceb8e742-a2ce-5110-a560-94c93d1c71a5", - "path": "/foo", - }, - "plugin": "", - "type": "CREATE_COMPONENT_DEPENDENCY", - }, - Object { - "payload": Object { - "nodeId": "ceb8e742-a2ce-5110-a560-94c93d1c71a5", - "path": "/bar", - }, - "plugin": "", - "type": "CREATE_COMPONENT_DEPENDENCY", - }, - Object { - "payload": Object { - "path": "/foo", - }, - "type": "ADD_PENDING_PAGE_DATA_WRITE", - }, - Object { - "payload": Object { - "componentPath": "/foo.js", - "isPage": true, - "path": "/foo", - "resultHash": "8dW7PoqwZNk/0U8LO6kTj1qBCwU=", - }, - "type": "PAGE_QUERY_RUN", - }, - Object { - "payload": Object { - "path": "/bar", - }, - "type": "ADD_PENDING_PAGE_DATA_WRITE", - }, - Object { - "payload": Object { - "componentPath": "/bar.js", - "isPage": true, - "path": "/bar", - "resultHash": "iKmhf9XgbsfK7qJw0tw95pmGwJM=", - }, - "type": "PAGE_QUERY_RUN", - }, - ] - `) + const expectedActionShapes = { + 
QUERY_START: [`componentPath`, `isPage`, `path`], + PAGE_QUERY_RUN: [`componentPath`, `isPage`, `path`, `resultHash`], + CREATE_COMPONENT_DEPENDENCY: [`nodeId`, `path`], + ADD_PENDING_PAGE_DATA_WRITE: [`path`], + } + expect(result).toBeArrayOfSize(11) + + for (const action of result) { + expect(action.type).toBeOneOf(Object.keys(expectedActionShapes)) + expect(action.payload).toContainKeys(expectedActionShapes[action.type]) + } + // Double-check that important actions are actually present + expect(result).toContainValue( + expect.objectContaining({ type: `QUERY_START` }) + ) + expect(result).toContainValue( + expect.objectContaining({ type: `PAGE_QUERY_RUN` }) + ) }) it(`should replay selected worker actions in runQueriesInWorkersQueue`, async () => { From cd3e0fdc08a1116e5ff0c511a692393ca182ba5e Mon Sep 17 00:00:00 2001 From: Vladimir Razuvaev Date: Mon, 19 Jul 2021 17:55:56 +0700 Subject: [PATCH 7/7] Update packages/gatsby/src/utils/worker/__tests__/queries.ts Co-authored-by: Lennart --- packages/gatsby/src/utils/worker/__tests__/queries.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/gatsby/src/utils/worker/__tests__/queries.ts b/packages/gatsby/src/utils/worker/__tests__/queries.ts index 8909f5b0ab20c..4b802adb23111 100644 --- a/packages/gatsby/src/utils/worker/__tests__/queries.ts +++ b/packages/gatsby/src/utils/worker/__tests__/queries.ts @@ -184,6 +184,7 @@ describeWhenLMDB(`worker (queries)`, () => { // We switched to "replaying actions" as a mechanism for state syncing. // But we can get back to state saving / merging if "replaying actions" proves to be too expensive // TODO: delete or re-activate depending on results yielded by "replaying actions" approach. + // The logic for `loadPartialStateFromDisk` itself is tested in `share-state` tests it(`should save worker "queries" state to disk`, async () => { if (!worker) fail(`worker not defined`)