diff --git a/benchmark/http/bench-parser.js b/benchmark/http/bench-parser.js index 087616f44e71d1..8208df11223b90 100644 --- a/benchmark/http/bench-parser.js +++ b/benchmark/http/bench-parser.js @@ -25,7 +25,7 @@ function main({ len, n }) { bench.start(); for (var i = 0; i < n; i++) { parser.execute(header, 0, header.length); - parser.reinitialize(REQUEST); + parser.reinitialize(REQUEST, i > 0); } bench.end(n); } diff --git a/benchmark/misc/freelist.js b/benchmark/misc/freelist.js index 8c3281cc407363..7fa9af4f3ddb7f 100644 --- a/benchmark/misc/freelist.js +++ b/benchmark/misc/freelist.js @@ -9,7 +9,7 @@ const bench = common.createBenchmark(main, { }); function main({ n }) { - const FreeList = require('internal/freelist'); + const { FreeList } = require('internal/freelist'); const poolSize = 1000; const list = new FreeList('test', poolSize, Object); var j; diff --git a/lib/_http_agent.js b/lib/_http_agent.js index 1a2920cf098298..97c5ab604ff821 100644 --- a/lib/_http_agent.js +++ b/lib/_http_agent.js @@ -167,7 +167,7 @@ Agent.prototype.addRequest = function addRequest(req, options, port/* legacy */, var socket = this.freeSockets[name].shift(); // Guard against an uninitialized or user supplied Socket. if (socket._handle && typeof socket._handle.asyncReset === 'function') { - // Assign the handle a new asyncId and run any init() hooks. + // Assign the handle a new asyncId and run any destroy()/init() hooks. socket._handle.asyncReset(); socket[async_id_symbol] = socket._handle.getAsyncId(); } diff --git a/lib/_http_client.js b/lib/_http_client.js index c83800a93b1da9..d91b43516fa4ee 100644 --- a/lib/_http_client.js +++ b/lib/_http_client.js @@ -47,6 +47,7 @@ const { ERR_UNESCAPED_CHARACTERS } = require('internal/errors').codes; const { validateTimerDuration } = require('internal/timers'); +const is_reused_symbol = require('internal/freelist').symbols.is_reused_symbol; const INVALID_PATH_REGEX = /[^\u0021-\u00ff]/; @@ -631,7 +632,7 @@ function tickOnSocket(req, socket) { var parser = parsers.alloc(); req.socket = socket; req.connection = socket; - parser.reinitialize(HTTPParser.RESPONSE); + parser.reinitialize(HTTPParser.RESPONSE, parser[is_reused_symbol]); parser.socket = socket; parser.outgoing = req; req.parser = parser; diff --git a/lib/_http_common.js b/lib/_http_common.js index 1de0ee6025d571..b37814f7832242 100644 --- a/lib/_http_common.js +++ b/lib/_http_common.js @@ -23,7 +23,7 @@ const { methods, HTTPParser } = internalBinding('http_parser'); -const FreeList = require('internal/freelist'); +const { FreeList } = require('internal/freelist'); const { ondrain } = require('internal/http'); const incoming = require('_http_incoming'); const { diff --git a/lib/_http_server.js b/lib/_http_server.js index cc1a428cd66c3b..3b2d7f50419127 100644 --- a/lib/_http_server.js +++ b/lib/_http_server.js @@ -42,6 +42,7 @@ const { defaultTriggerAsyncIdScope, getOrSetAsyncId } = require('internal/async_hooks'); +const is_reused_symbol = require('internal/freelist').symbols.is_reused_symbol; const { IncomingMessage } = require('_http_incoming'); const { ERR_HTTP_HEADERS_SENT, @@ -338,7 +339,7 @@ function connectionListenerInternal(server, socket) { socket.on('timeout', socketOnTimeout); var parser = parsers.alloc(); - parser.reinitialize(HTTPParser.REQUEST); + parser.reinitialize(HTTPParser.REQUEST, parser[is_reused_symbol]); parser.socket = socket; socket.parser = parser; diff --git a/lib/internal/freelist.js b/lib/internal/freelist.js index 7e9cef9528ab75..04d684e8334ff5 100644 --- a/lib/internal/freelist.js +++ b/lib/internal/freelist.js @@ -1,5 +1,7 @@ 'use strict'; +const is_reused_symbol = Symbol('isReused'); + class FreeList { constructor(name, max, ctor) { this.name = name; @@ -9,9 +11,15 @@ class FreeList { } alloc() { - return this.list.length ? - this.list.pop() : - this.ctor.apply(this, arguments); + let item; + if (this.list.length > 0) { + item = this.list.pop(); + item[is_reused_symbol] = true; + } else { + item = this.ctor.apply(this, arguments); + item[is_reused_symbol] = false; + } + return item; } free(obj) { @@ -23,4 +31,9 @@ class FreeList { } } -module.exports = FreeList; +module.exports = { + FreeList, + symbols: { + is_reused_symbol + } +}; diff --git a/src/async_wrap.cc b/src/async_wrap.cc index 2b163a5fa283d1..596fcc8356d500 100644 --- a/src/async_wrap.cc +++ b/src/async_wrap.cc @@ -563,6 +563,7 @@ AsyncWrap::AsyncWrap(Environment* env, CHECK_NE(provider, PROVIDER_NONE); CHECK_GE(object->InternalFieldCount(), 1); + async_id_ = -1; // Use AsyncReset() call to execute the init() callbacks. AsyncReset(execution_async_id, silent); } @@ -606,6 +607,14 @@ void AsyncWrap::EmitDestroy(Environment* env, double async_id) { // and reused over their lifetime. This way a new uid can be assigned when // the resource is pulled out of the pool and put back into use. void AsyncWrap::AsyncReset(double execution_async_id, bool silent) { + if (async_id_ != -1) { + // This instance was in use before, we have already emitted an init with + // its previous async_id and need to emit a matching destroy for that + // before generating a new async_id. + EmitDestroy(env(), async_id_); + } + + // Now we can assign a new async_id_ to this instance. async_id_ = execution_async_id == -1 ? env()->new_async_id() : execution_async_id; trigger_async_id_ = env()->get_default_trigger_async_id(); diff --git a/src/node_http_parser.cc b/src/node_http_parser.cc index 5d093b27c39d3c..9850b4f698205b 100644 --- a/src/node_http_parser.cc +++ b/src/node_http_parser.cc @@ -465,6 +465,8 @@ class Parser : public AsyncWrap, public StreamListener { Environment* env = Environment::GetCurrent(args); CHECK(args[0]->IsInt32()); + CHECK(args[1]->IsBoolean()); + bool isReused = args[1]->IsTrue(); http_parser_type type = static_cast(args[0].As()->Value()); @@ -473,8 +475,12 @@ class Parser : public AsyncWrap, public StreamListener { ASSIGN_OR_RETURN_UNWRAP(&parser, args.Holder()); // Should always be called from the same context. CHECK_EQ(env, parser->env()); - // The parser is being reused. Reset the async id and call init() callbacks. - parser->AsyncReset(); + // This parser has either just been created or it is being reused. + // We must only call AsyncReset for the latter case, because AsyncReset has + // already been called via the constructor for the former case. + if (isReused) { + parser->AsyncReset(); + } parser->Init(type); } diff --git a/test/async-hooks/test-graph.http.js b/test/async-hooks/test-graph.http.js index b18bc7453c0fa2..414ebabeeeaea4 100644 --- a/test/async-hooks/test-graph.http.js +++ b/test/async-hooks/test-graph.http.js @@ -38,20 +38,14 @@ process.on('exit', function() { { type: 'HTTPPARSER', id: 'httpparser:1', triggerAsyncId: 'tcpserver:1' }, - { type: 'HTTPPARSER', - id: 'httpparser:2', - triggerAsyncId: 'tcpserver:1' }, { type: 'TCPWRAP', id: 'tcp:2', triggerAsyncId: 'tcpserver:1' }, { type: 'Timeout', id: 'timeout:1', triggerAsyncId: 'tcp:2' }, { type: 'HTTPPARSER', - id: 'httpparser:3', - triggerAsyncId: 'tcp:2' }, - { type: 'HTTPPARSER', - id: 'httpparser:4', + id: 'httpparser:2', triggerAsyncId: 'tcp:2' }, { type: 'Timeout', id: 'timeout:2', - triggerAsyncId: 'httpparser:4' }, + triggerAsyncId: 'httpparser:2' }, { type: 'SHUTDOWNWRAP', id: 'shutdown:1', triggerAsyncId: 'tcp:2' } ] diff --git a/test/parallel/test-async-hooks-http-agent-destroy.js b/test/parallel/test-async-hooks-http-agent-destroy.js new file mode 100644 index 00000000000000..637f2c511410e7 --- /dev/null +++ b/test/parallel/test-async-hooks-http-agent-destroy.js @@ -0,0 +1,84 @@ +'use strict'; +// Flags: --expose-internals +const common = require('../common'); +const assert = require('assert'); +const { async_id_symbol } = require('internal/async_hooks').symbols; +const async_hooks = require('async_hooks'); +const http = require('http'); + +// Regression test for https://github.com/nodejs/node/issues/19859 +// Checks that an http.Agent emits a destroy for the old asyncId before calling +// asyncReset()s when reusing a socket handle. The setup is nearly identical to +// parallel/test-async-hooks-http-agent (which focuses on the assertion that +// a fresh asyncId is assigned to the net.Socket instance). + +const destroyedIds = new Set(); +async_hooks.createHook({ + destroy: common.mustCallAtLeast((asyncId) => { + destroyedIds.add(asyncId); + }, 1) +}).enable(); + +// Make sure a single socket is transparently reused for 2 requests. +const agent = new http.Agent({ + keepAlive: true, + keepAliveMsecs: Infinity, + maxSockets: 1 +}); + +const server = http.createServer(common.mustCall((req, res) => { + req.once('data', common.mustCallAtLeast(() => { + res.writeHead(200, { 'Content-Type': 'text/plain' }); + res.write('foo'); + })); + req.on('end', common.mustCall(() => { + res.end('bar'); + })); +}, 2)).listen(0, common.mustCall(() => { + const port = server.address().port; + const payload = 'hello world'; + + // First request. This is useless except for adding a socket to the + // agent’s pool for reuse. + const r1 = http.request({ + agent, port, method: 'POST' + }, common.mustCall((res) => { + // Remember which socket we used. + const socket = res.socket; + const asyncIdAtFirstRequest = socket[async_id_symbol]; + assert.ok(asyncIdAtFirstRequest > 0, `${asyncIdAtFirstRequest} > 0`); + // Check that request and response share their socket. + assert.strictEqual(r1.socket, socket); + + res.on('data', common.mustCallAtLeast(() => {})); + res.on('end', common.mustCall(() => { + // setImmediate() to give the agent time to register the freed socket. + setImmediate(common.mustCall(() => { + // The socket is free for reuse now. + assert.strictEqual(socket[async_id_symbol], -1); + + // second request: + const r2 = http.request({ + agent, port, method: 'POST' + }, common.mustCall((res) => { + assert.ok(destroyedIds.has(asyncIdAtFirstRequest)); + + // Empty payload, to hit the “right” code path. + r2.end(''); + + res.on('data', common.mustCallAtLeast(() => {})); + res.on('end', common.mustCall(() => { + // Clean up to let the event loop stop. + server.close(); + agent.destroy(); + })); + })); + + // Schedule a payload to be written immediately, but do not end the + // request just yet. + r2.write(payload); + })); + })); + })); + r1.end(payload); +})); diff --git a/test/parallel/test-async-hooks-http-parser-destroy.js b/test/parallel/test-async-hooks-http-parser-destroy.js new file mode 100644 index 00000000000000..aeb805702d89e9 --- /dev/null +++ b/test/parallel/test-async-hooks-http-parser-destroy.js @@ -0,0 +1,61 @@ +'use strict'; +const common = require('../common'); +const Countdown = require('../common/countdown'); +const assert = require('assert'); +const async_hooks = require('async_hooks'); +const http = require('http'); + +// Regression test for https://github.com/nodejs/node/issues/19859. +// Checks that matching destroys are emitted when creating new/reusing old http +// parser instances. + +const N = 50; +const KEEP_ALIVE = 100; + +const createdIds = []; +const destroyedIds = []; +async_hooks.createHook({ + init: common.mustCallAtLeast((asyncId, type) => { + if (type === 'HTTPPARSER') { + createdIds.push(asyncId); + } + }, N), + destroy: (asyncId) => { + destroyedIds.push(asyncId); + } +}).enable(); + +const server = http.createServer(function(req, res) { + res.end('Hello'); +}); + +const keepAliveAgent = new http.Agent({ + keepAlive: true, + keepAliveMsecs: KEEP_ALIVE, +}); + +const countdown = new Countdown(N, () => { + server.close(() => { + // give the server sockets time to close (which will also free their + // associated parser objects) after the server has been closed. + setTimeout(() => { + createdIds.forEach((createdAsyncId) => { + assert.ok(destroyedIds.indexOf(createdAsyncId) >= 0); + }); + }, KEEP_ALIVE * 2); + }); +}); + +server.listen(0, function() { + for (let i = 0; i < N; ++i) { + (function makeRequest() { + http.get({ + port: server.address().port, + agent: keepAliveAgent + }, function(res) { + countdown.dec(); + res.resume(); + }); + })(); + } +}); diff --git a/test/parallel/test-freelist.js b/test/parallel/test-freelist.js index d1f7d888c03868..03946dfda257c2 100644 --- a/test/parallel/test-freelist.js +++ b/test/parallel/test-freelist.js @@ -4,28 +4,27 @@ require('../common'); const assert = require('assert'); -const FreeList = require('internal/freelist'); +const { FreeList } = require('internal/freelist'); assert.strictEqual(typeof FreeList, 'function'); -const flist1 = new FreeList('flist1', 3, String); +const flist1 = new FreeList('flist1', 3, Object); // Allocating when empty, should not change the list size -const result = flist1.alloc('test'); -assert.strictEqual(typeof result, 'string'); -assert.strictEqual(result, 'test'); +const result = flist1.alloc(); +assert.strictEqual(typeof result, 'object'); assert.strictEqual(flist1.list.length, 0); // Exhaust the free list -assert(flist1.free('test1')); -assert(flist1.free('test2')); -assert(flist1.free('test3')); +assert(flist1.free({ id: 'test1' })); +assert(flist1.free({ id: 'test2' })); +assert(flist1.free({ id: 'test3' })); // Now it should not return 'true', as max length is exceeded -assert.strictEqual(flist1.free('test4'), false); -assert.strictEqual(flist1.free('test5'), false); +assert.strictEqual(flist1.free({ id: 'test4' }), false); +assert.strictEqual(flist1.free({ id: 'test5' }), false); // At this point 'alloc' should just return the stored values -assert.strictEqual(flist1.alloc(), 'test3'); -assert.strictEqual(flist1.alloc(), 'test2'); -assert.strictEqual(flist1.alloc(), 'test1'); +assert.strictEqual(flist1.alloc().id, 'test3'); +assert.strictEqual(flist1.alloc().id, 'test2'); +assert.strictEqual(flist1.alloc().id, 'test1'); diff --git a/test/parallel/test-http-parser.js b/test/parallel/test-http-parser.js index 0bdaa22b8ade5a..36f41f79e59dcf 100644 --- a/test/parallel/test-http-parser.js +++ b/test/parallel/test-http-parser.js @@ -98,7 +98,7 @@ function expectBody(expected) { throw new Error('hello world'); }; - parser.reinitialize(HTTPParser.REQUEST); + parser.reinitialize(HTTPParser.REQUEST, false); assert.throws( () => { parser.execute(request, 0, request.length); }, @@ -558,7 +558,7 @@ function expectBody(expected) { parser[kOnBody] = expectBody('ping'); parser.execute(req1, 0, req1.length); - parser.reinitialize(REQUEST); + parser.reinitialize(REQUEST, false); parser[kOnBody] = expectBody('pong'); parser[kOnHeadersComplete] = onHeadersComplete2; parser.execute(req2, 0, req2.length); diff --git a/test/parallel/test-internal-modules-expose.js b/test/parallel/test-internal-modules-expose.js index a3fd6f63ffe399..ab48e36881268c 100644 --- a/test/parallel/test-internal-modules-expose.js +++ b/test/parallel/test-internal-modules-expose.js @@ -7,5 +7,5 @@ const config = process.binding('config'); console.log(config, process.argv); -assert.strictEqual(typeof require('internal/freelist'), 'function'); +assert.strictEqual(typeof require('internal/freelist').FreeList, 'function'); assert.strictEqual(config.exposeInternals, true); diff --git a/test/sequential/test-http-regr-gh-2928.js b/test/sequential/test-http-regr-gh-2928.js index 0950b84bbe093d..3794eddaa09369 100644 --- a/test/sequential/test-http-regr-gh-2928.js +++ b/test/sequential/test-http-regr-gh-2928.js @@ -7,6 +7,7 @@ const common = require('../common'); const assert = require('assert'); const httpCommon = require('_http_common'); const { internalBinding } = require('internal/test/binding'); +const is_reused_symbol = require('internal/freelist').symbols.is_reused_symbol; const { HTTPParser } = internalBinding('http_parser'); const net = require('net'); @@ -25,7 +26,7 @@ function execAndClose() { process.stdout.write('.'); const parser = parsers.pop(); - parser.reinitialize(HTTPParser.RESPONSE); + parser.reinitialize(HTTPParser.RESPONSE, parser[is_reused_symbol]); const socket = net.connect(common.PORT); socket.on('error', (e) => {