Skip to content

Commit

Permalink
Merge branch 'node-fetch-prevent-host-abort' into node-fetch
Browse files Browse the repository at this point in the history
  • Loading branch information
nleush committed Nov 17, 2021
2 parents 381bc0e + 63acdf0 commit b0da2b6
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 22 deletions.
33 changes: 27 additions & 6 deletions lib/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -956,8 +956,31 @@

// After new context received - launch link post plugins.

var iterationPluginContexts = {};

var hasData = false;
for(var i = 0; i < result.length && !hasData; i++) {
var r = result[i];
if (r.data && !r.error && r.method.name === 'getLink' || r.method.name === 'getLinks' || r.method.name === 'prepareLink') {
var links;
if (r.method.name === 'prepareLink') {
links = r.data && r.data.addLink;
} else {
links = r.data;
}
if (links) {
hasData = true;;
}
}
}

if (hasData) {
runPostPluginsIterationCall('startIteration', iterationPluginContexts);
}

for(var i = 0; i < result.length; i++) {


var r = result[i];

if (r.data && !r.error && r.method.name === 'getLink' || r.method.name === 'getLinks' || r.method.name === 'prepareLink') {
Expand All @@ -980,20 +1003,18 @@

links = _.compact(links);

var iterationPluginContexts = {};

runPostPluginsIterationCall('startIteration', iterationPluginContexts);

for(var j = 0; j < links.length; j++) {
var link = links[j];
allResults.links.push(link);
runPostPlugins(link, r, usedMethods, context, pluginsContexts, iterationPluginContexts, options, asyncMethodCb);
}

runPostPluginsIterationCall('finishIteration', iterationPluginContexts);
}
}

if (hasData) {
runPostPluginsIterationCall('finishIteration', iterationPluginContexts);
}

return hasNewData;
}

Expand Down
131 changes: 116 additions & 15 deletions lib/fetch.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { URL } from 'url';
import { h1NoCache, noCache, createUrl, AbortController, AbortError, FetchError } from '@adobe/helix-fetch';
import log from '../logging.js';

Expand Down Expand Up @@ -27,30 +28,40 @@ const fetchH1 = h1NoCache({
}).fetch;

function doFetch(fetch_func, h1_fetch_func, options) {
const abortController = new AbortController();

const fetch_options = Object.assign({}, options);

// Implement `qs` (get params).
var uri = options.qs ? createUrl(options.uri, options.qs) : options.uri;
// Remove hash part of url.
uri = uri.replace(/#.*/gi, '');

const abortController = new HostAbortController(uri);

// Allow request abort before finish.
fetch_options.signal = abortController.signal;
// Implement `timeout`.
const timeoutTimerId = setTimeout(() => {
abortController.abort();
}, options.timeout || CONFIG.RESPONSE_TIMEOUT);
// Implement `qs` (get params).
var uri = options.qs ? createUrl(options.uri, options.qs) : options.uri;
// Remove hash part of url.
uri = uri.replace(/#.*/gi, '');

const a_fetch_func = options.disable_http2 ? h1_fetch_func: fetch_func;
return new Promise((resolve, reject) => {
a_fetch_func(uri, fetch_options)
.then(response => {
var stream = response.body;
stream.on('end', () => {
clearTimeout(timeoutTimerId);
});
abortController.onResponse(stream);
stream.status = response.status;
stream.headers = response.headers.plain();
stream.abortController = abortController;
stream.h2 = response.httpVersion === '2.0';
resolve(stream);
})
.catch(error => {
clearTimeout(timeoutTimerId);
if (!options.disable_http2 && error.code && /^ERR_HTTP2/.test(error.code)) {
log(' -- doFetch http2 error', error.code, uri);
resolve(doFetch(fetch_func, h1_fetch_func, Object.assign({}, options, {disable_http2: true})));
Expand All @@ -71,9 +82,6 @@ function doFetch(fetch_func, h1_fetch_func, options) {
reject(error);
}
}
})
.finally(() => {
clearTimeout(timeoutTimerId);
});
});
}
Expand All @@ -93,14 +101,30 @@ export function fetchStreamAuthorized(options) {
export function fetchData(options) {
var json = options.json;
delete options.json;
var response;
var res;
const fetch_options = Object.assign({}, options);
const uri = options.qs ? createUrl(options.uri, options.qs) : options.uri;

const abortController = new HostAbortController(uri);

// Allow request abort before finish.
fetch_options.signal = abortController.signal;
// Implement `timeout`.
const timeoutTimerId = setTimeout(() => {
abortController.abort();
}, options.timeout || CONFIG.RESPONSE_TIMEOUT);

const a_fetch_func = options.disable_http2 ? fetchH1: fetch;
return new Promise((resolve, reject) => {
a_fetch_func(uri, fetch_options)
.then(res => {
response = res;
.then(response => {
var stream = response.body;
// TODO: looks like HEAD request has no END event.
stream.on('end', () => {
clearTimeout(timeoutTimerId);
});
abortController.onResponse(stream);
res = response;
if (json !== false) {
// If `json` not forbidden, read `content-type`.
json = json || (response.headers.get('content-type').indexOf('application/json') > -1);
Expand All @@ -113,11 +137,88 @@ export function fetchData(options) {
})
.then(data => {
resolve({
status: response.status,
headers: response.headers.plain(),
status: res.status,
headers: res.headers.plain(),
data: data
});
})
.catch(reject);
.catch((error) => {
clearTimeout(timeoutTimerId);
reject(error);
});
});
};
};

const hostsCache = {};

function addController(ctrl) {
hostsCache[ctrl.host] = hostsCache[ctrl.host] || [];
hostsCache[ctrl.host].push(ctrl);
}

function tryAbortHost(host) {
var controllers = hostsCache[host];
if (controllers) {
const hasWaitingRequests = controllers.some(ctrl => ctrl.waiting);
// If all aborted or finished.
if (!hasWaitingRequests) {
while (controllers.length) {
let ctrl = controllers.pop();
ctrl.forceAbort();
}
}
}
}

function removeController(ctrl) {
var controllers = hostsCache[ctrl.host];
if (controllers) {
const idx = controllers.indexOf(ctrl);
controllers.splice(idx, 1);
}
}

class HostAbortController {

constructor(url) {
this.aborted = false;
this.responded = false;
this.url = url;
const parsedUrl = new URL(url);
this.host = parsedUrl.protocol + parsedUrl.hostname;
this.abortController = new AbortController();
addController(this);
}

onResponse(stream) {
this.stream = stream;
if (this.aborted) {
stream.pause();
}
stream.on('end', () => {
this.finished = true;
removeController(this);
tryAbortHost(this.host);
});
}

abort() {
this.stream && this.stream.pause();
this.aborted = true;
tryAbortHost(this.host);
}

forceAbort() {
if (this.aborted && !this.finished) {
this.abortController.abort();
}
}

get waiting() {
return !(this.aborted || this.finished);
}

get signal() {
return this.abortController.signal;
}
}
2 changes: 1 addition & 1 deletion modules/tests-ui/tester.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import CONFIG from '../../config.loader.js';
global.CONFIG = config;
global.CONFIG = CONFIG;

if (!CONFIG.tests) {
console.error('Tests not started: CONFIG.tests not configured.');
Expand Down

0 comments on commit b0da2b6

Please sign in to comment.