Skip to content

Commit

Permalink
Merge pull request #4607 from snyk/dotkas/SUP-1093/replace-with-pmap
Browse files Browse the repository at this point in the history
[SUP-1093] `p-map` some network requests
  • Loading branch information
dotkas authored May 23, 2023
2 parents e3576d8 + a64b508 commit c1d47be
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 55 deletions.
1 change: 1 addition & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
"open": "^7.0.3",
"ora": "5.4.0",
"os-name": "^5.1.0",
"p-map": "^4.0.0",
"proxy-from-env": "^1.0.0",
"rimraf": "^2.6.3",
"semver": "^6.0.0",
Expand Down
2 changes: 1 addition & 1 deletion src/lib/plugins/nodejs-plugin/yarn-workspaces-parser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ export async function processYarnWorkspaces(
? pathUtil.dirname(yarnWorkspacesFilesMap[packageJsonFileName].root)
: pathUtil.dirname(packageJsonFileName);
const rootYarnLockfileName = pathUtil.join(rootDir, 'yarn.lock');
const yarnLock = await getFileContents(root, rootYarnLockfileName);
const yarnLock = getFileContents(root, rootYarnLockfileName);

if (
rootWorkspaceManifestContent.hasOwnProperty('resolutions') &&
Expand Down
91 changes: 37 additions & 54 deletions src/lib/snyk-test/run-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { parsePackageString as moduleToObject } from 'snyk-module';
import * as depGraphLib from '@snyk/dep-graph';
import * as theme from '../../lib/theme';
import { jsonStringifyLargeObject } from '../../lib/json';
import * as pMap from 'p-map';

import {
AffectedPackages,
Expand All @@ -22,20 +23,21 @@ import {
} from './legacy';
import {
AuthFailedError,
BadGatewayError,
DockerImageNotFoundError,
errorMessageWithRetry,
FailedToGetVulnerabilitiesError,
FailedToGetVulnsFromUnavailableResource,
FailedToRunTestError,
InternalServerError,
NoSupportedManifestsFoundError,
NotFoundError,
errorMessageWithRetry,
BadGatewayError,
ServiceUnavailableError,
} from '../errors';
import * as snyk from '../';
import { isCI } from '../is-ci';
import * as common from './common';
import { RETRY_ATTEMPTS, RETRY_DELAY } from './common';
import config from '../config';
import * as analytics from '../analytics';
import { maybePrintDepGraph, maybePrintDepTree } from '../print-deps';
Expand All @@ -58,9 +60,9 @@ import { extractPackageManager } from '../plugins/extract-package-manager';
import { getExtraProjectCount } from '../plugins/get-extra-project-count';
import { findAndLoadPolicy } from '../policy';
import {
DepTreeFromResolveDeps,
Payload,
PayloadBody,
DepTreeFromResolveDeps,
TestDependenciesRequest,
} from './types';
import { getAuthHeader } from '../api-token';
Expand All @@ -71,7 +73,6 @@ import { makeRequest } from '../request';
import { spinner } from '../spinner';
import { hasUnknownVersions } from '../dep-graph';
import { sleep } from '../common';
import { RETRY_ATTEMPTS, RETRY_DELAY } from './common';

const debug = debugModule('snyk:run-test');

Expand Down Expand Up @@ -237,61 +238,43 @@ async function sendAndParseResults(
originalPayload: Payload;
response: any;
};
const requests: (() => Promise<TestResponse>)[] = [];
for (const originalPayload of payloads) {
const request = async (): Promise<TestResponse> => {
let step = 0;
let error;

while (step < RETRY_ATTEMPTS) {
debug(`sendTestPayload retry step ${step} out of ${RETRY_ATTEMPTS}`);
try {
/** sendTestPayload() deletes the request.body from the payload once completed. */
const payload = Object.assign({}, originalPayload);
const response = await sendTestPayload(payload);

return { payload, originalPayload, response };
} catch (err) {
error = err;
step++;

if (
err instanceof InternalServerError ||
err instanceof BadGatewayError ||
err instanceof ServiceUnavailableError
) {
await sleep(RETRY_DELAY);
} else {
break;
}

const sendRequest = async (
originalPayload: Payload,
): Promise<TestResponse> => {
let step = 0;
let error;

while (step < RETRY_ATTEMPTS) {
debug(`sendTestPayload retry step ${step} out of ${RETRY_ATTEMPTS}`);
try {
/** sendTestPayload() deletes the request.body from the payload once completed. */
const payload = Object.assign({}, originalPayload);
const response = await sendTestPayload(payload);

return { payload, originalPayload, response };
} catch (err) {
error = err;
step++;

if (
err instanceof InternalServerError ||
err instanceof BadGatewayError ||
err instanceof ServiceUnavailableError
) {
await sleep(RETRY_DELAY);
} else {
break;
}
}

throw error;
};

requests.push(request);
}

// Start block adapted from https://gist.github.com/jcouyang/632709f30e12a7879a73e9e132c0d56b?permalink_comment_id=3591045#gistcomment-3591045
let index = 0;
const responses: TestResponse[] = [];

const execRequest = async () => {
while (index < requests.length) {
// index is shared across "threads", so capture the current value for this
// iteration on this "thread" and increment it for the rest of the world.
const curIndex = index++;
responses[curIndex] = await requests[curIndex]();
}

throw error;
};

const threads: Promise<void>[] = [];
for (let thread = 0; thread < MAX_CONCURRENCY; thread++) {
threads.push(execRequest());
}
await Promise.all(threads);
// End block adapted from https://gist.github.com/jcouyang/632709f30e12a7879a73e9e132c0d56b?permalink_comment_id=3591045#gistcomment-3591045
const responses = await pMap(payloads, sendRequest, {
concurrency: MAX_CONCURRENCY,
});

for (const { payload, originalPayload, response } of responses) {
const {
Expand Down

0 comments on commit c1d47be

Please sign in to comment.