Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(gatsby): PQR: merge data dependencies from workers to the main process #32305

Merged
merged 7 commits into from
Jul 19, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 139 additions & 1 deletion packages/gatsby/src/utils/worker/__tests__/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import sourceNodesAndRemoveStaleNodes from "../../source-nodes"
import {
savePartialStateToDisk,
store,
emitter,
loadPartialStateFromDisk,
} from "../../../redux"
import { loadConfigAndPlugins } from "../../../bootstrap/load-config-and-plugins"
Expand Down Expand Up @@ -167,7 +168,6 @@ describeWhenLMDB(`worker (queries)`, () => {
savePartialStateToDisk([`components`, `staticQueryComponents`])

await Promise.all(worker.all.buildSchema())
await worker.single.runQueries(queryIdsSmall)
})

afterAll(() => {
Expand All @@ -180,9 +180,14 @@ describeWhenLMDB(`worker (queries)`, () => {
}
})

// This was the original implementation of state syncing between a worker and the main process.
// We switched to "replaying actions" as a mechanism for state syncing.
// But we can get back to state saving / merging if "replaying actions" proves to be too expensive
// TODO: delete or re-activate depending on results yielded by "replaying actions" approach.
vladar marked this conversation as resolved.
Show resolved Hide resolved
it(`should save worker "queries" state to disk`, async () => {
if (!worker) fail(`worker not defined`)

await worker.single.runQueries(queryIdsSmall)
await Promise.all(worker.all.saveQueries())
// Pass "1" as workerId as the test only have one worker
const result = loadPartialStateFromDisk([`queries`], `1`)
Expand Down Expand Up @@ -233,6 +238,8 @@ describeWhenLMDB(`worker (queries)`, () => {

it(`should execute static queries`, async () => {
if (!worker) fail(`worker not defined`)

await worker.single.runQueries(queryIdsSmall)
const stateFromWorker = await worker.single.getState()

const staticQueryResult = await fs.readJson(
Expand All @@ -250,6 +257,8 @@ describeWhenLMDB(`worker (queries)`, () => {

it(`should execute page queries`, async () => {
if (!worker) fail(`worker not defined`)

await worker.single.runQueries(queryIdsSmall)
const stateFromWorker = await worker.single.getState()

const pageQueryResult = await fs.readJson(
Expand All @@ -265,6 +274,8 @@ describeWhenLMDB(`worker (queries)`, () => {

it(`should execute page queries with context variables`, async () => {
if (!worker) fail(`worker not defined`)

await worker.single.runQueries(queryIdsSmall)
const stateFromWorker = await worker.single.getState()

const pageQueryResult = await fs.readJson(
Expand Down Expand Up @@ -331,4 +342,131 @@ describeWhenLMDB(`worker (queries)`, () => {

spy.mockRestore()
})

it(`should return actions occurred in worker to replay in the main process`, async () => {
const result = await worker.single.runQueries(queryIdsSmall)

const expectedActionShapes = {
QUERY_START: [`componentPath`, `isPage`, `path`],
PAGE_QUERY_RUN: [`componentPath`, `isPage`, `path`, `resultHash`],
CREATE_COMPONENT_DEPENDENCY: [`nodeId`, `path`],
ADD_PENDING_PAGE_DATA_WRITE: [`path`],
}
expect(result).toBeArrayOfSize(11)

for (const action of result) {
expect(action.type).toBeOneOf(Object.keys(expectedActionShapes))
expect(action.payload).toContainKeys(expectedActionShapes[action.type])
}
// Double-check that important actions are actually present
expect(result).toContainValue(
expect.objectContaining({ type: `QUERY_START` })
)
expect(result).toContainValue(
expect.objectContaining({ type: `PAGE_QUERY_RUN` })
)
})

it(`should replay selected worker actions in runQueriesInWorkersQueue`, async () => {
const expectedActions = [
{
payload: {
componentPath: `/static-query-component.js`,
isPage: false,
path: `sq--q1`,
},
type: `QUERY_START`,
},
{
payload: {
nodeId: `ceb8e742-a2ce-5110-a560-94c93d1c71a5`,
path: `sq--q1`,
},
plugin: ``,
type: `CREATE_COMPONENT_DEPENDENCY`,
},
{
payload: {
componentPath: `/static-query-component.js`,
isPage: false,
path: `sq--q1`,
queryHash: `q1-hash`,
resultHash: `Dr5hgCDB+R0S9oRBWeZYj3lB7VI=`,
},
type: `PAGE_QUERY_RUN`,
},
{
payload: {
componentPath: `/foo.js`,
isPage: true,
path: `/foo`,
},
type: `QUERY_START`,
},
{
payload: {
componentPath: `/bar.js`,
isPage: true,
path: `/bar`,
},
type: `QUERY_START`,
},
{
payload: {
nodeId: `ceb8e742-a2ce-5110-a560-94c93d1c71a5`,
path: `/foo`,
},
plugin: ``,
type: `CREATE_COMPONENT_DEPENDENCY`,
},
{
payload: {
nodeId: `ceb8e742-a2ce-5110-a560-94c93d1c71a5`,
path: `/bar`,
},
plugin: ``,
type: `CREATE_COMPONENT_DEPENDENCY`,
},
{
payload: {
path: `/foo`,
},
type: `ADD_PENDING_PAGE_DATA_WRITE`,
},
{
payload: {
componentPath: `/foo.js`,
isPage: true,
path: `/foo`,
resultHash: `8dW7PoqwZNk/0U8LO6kTj1qBCwU=`,
},
type: `PAGE_QUERY_RUN`,
},
{
payload: {
path: `/bar`,
},
type: `ADD_PENDING_PAGE_DATA_WRITE`,
},
{
payload: {
componentPath: `/bar.js`,
isPage: true,
path: `/bar`,
resultHash: `iKmhf9XgbsfK7qJw0tw95pmGwJM=`,
},
type: `PAGE_QUERY_RUN`,
},
]

const actualActions: Array<any> = []
function listenActions(action): void {
actualActions.push(action)
}
emitter.on(`*`, listenActions)
await runQueriesInWorkersQueue(worker, queryIdsSmall)
emitter.off(`*`, listenActions)

expect(actualActions).toContainAllValues(expectedActions)
})
})
40 changes: 39 additions & 1 deletion packages/gatsby/src/utils/worker/child/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ import { GraphQLRunner } from "../../../query/graphql-runner"
import { getDataStore } from "../../../datastore"
import { setState } from "./state"
import { buildSchema } from "./schema"
import {
IAddPendingPageDataWriteAction,
ICreatePageDependencyAction,
IPageQueryRunAction,
IQueryStartAction,
} from "../../../redux/types"

export function setComponents(): void {
setState([`components`, `staticQueryComponents`])
Expand All @@ -29,7 +35,39 @@ function getGraphqlRunner(): GraphQLRunner {
return gqlRunner
}

export async function runQueries(queryIds: IGroupedQueryIds): Promise<void> {
type ActionsToReplay = Array<
| IQueryStartAction
| IPageQueryRunAction
| IAddPendingPageDataWriteAction
| ICreatePageDependencyAction
>

export async function runQueries(
queryIds: IGroupedQueryIds
): Promise<ActionsToReplay> {
const actionsToReplay: ActionsToReplay = []

const unsubscribe = store.subscribe(() => {
const action = store.getState().lastAction
if (
action.type === `QUERY_START` ||
action.type === `PAGE_QUERY_RUN` ||
action.type === `ADD_PENDING_PAGE_DATA_WRITE` ||
action.type === `CREATE_COMPONENT_DEPENDENCY`
) {
actionsToReplay.push(action)
}
})

try {
await doRunQueries(queryIds)
return actionsToReplay
} finally {
unsubscribe()
}
pieh marked this conversation as resolved.
Show resolved Hide resolved
}

async function doRunQueries(queryIds: IGroupedQueryIds): Promise<void> {
const workerStore = store.getState()

// If buildSchema() didn't run yet, execute it
Expand Down
18 changes: 18 additions & 0 deletions packages/gatsby/src/utils/worker/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import { initJobsMessagingInMainProcess } from "../jobs/worker-messaging"
import { initReporterMessagingInMainProcess } from "./reporter"

import { GatsbyWorkerPool } from "./types"
import { store } from "../../redux"
import { ActionsUnion } from "../../redux/types"

export type { GatsbyWorkerPool }

Expand Down Expand Up @@ -48,6 +50,7 @@ export async function runQueriesInWorkersQueue(
promises.push(
pool.single
.runQueries({ pageQueryIds: [], staticQueryIds: segment })
.then(replayWorkerActions)
.then(() => {
activity.tick(segment.length)
})
Expand All @@ -58,6 +61,7 @@ export async function runQueriesInWorkersQueue(
promises.push(
pool.single
.runQueries({ pageQueryIds: segment, staticQueryIds: [] })
.then(replayWorkerActions)
.then(() => {
activity.tick(segment.length)
})
Expand All @@ -68,3 +72,17 @@ export async function runQueriesInWorkersQueue(

activity.end()
}

async function replayWorkerActions(
actions: Array<ActionsUnion>
): Promise<void> {
let i = 1
for (const action of actions) {
store.dispatch(action)

// Give event loop some breath
if (i++ % 100 === 0) {
await new Promise(resolve => process.nextTick(resolve))
}
}
}