Skip to content

Commit

Permalink
http2: use and support non-empty DATA frame with END_STREAM flag
Browse files Browse the repository at this point in the history
Adds support for reading from a stream where the final frame is a
non-empty DATA frame with the END_STREAM flag set, instead of hanging
waiting for another frame. When writing to a stream, uses a
END_STREAM flag on final DATA frame instead of adding an empty
DATA frame.

BREAKING: http2 client now expects servers to properly support
END_STREAM flag

Fixes: #31309
Fixes: #33891
Refs: https://nghttp2.org/documentation/types.html#c.nghttp2_on_data_chunk_recv_callback

PR-URL: #33875
Reviewed-By: Anna Henningsen <[email protected]>
Reviewed-By: James M Snell <[email protected]>
  • Loading branch information
clshortfuse committed Oct 7, 2020
1 parent 6802870 commit a9ea96c
Show file tree
Hide file tree
Showing 6 changed files with 193 additions and 47 deletions.
106 changes: 84 additions & 22 deletions lib/internal/http2/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -1156,6 +1156,7 @@ class Http2Session extends EventEmitter {
streams: new Map(),
pendingStreams: new Set(),
pendingAck: 0,
shutdownWritableCalled: false,
writeQueueSize: 0,
originSet: undefined
};
Expand Down Expand Up @@ -1722,6 +1723,26 @@ function afterShutdown(status) {
stream[kMaybeDestroy]();
}

function shutdownWritable(callback) {
const handle = this[kHandle];
if (!handle) return callback();
const state = this[kState];
if (state.shutdownWritableCalled) {
// Backport v12.x: Session required for debugging stream object
// debugStreamObj(this, 'shutdownWritable() already called');
return callback();
}
state.shutdownWritableCalled = true;

const req = new ShutdownWrap();
req.oncomplete = afterShutdown;
req.callback = callback;
req.handle = handle;
const err = handle.shutdown(req);
if (err === 1) // synchronous finish
return afterShutdown.call(req, 0);
}

function finishSendTrailers(stream, headersList) {
// The stream might be destroyed and in that case
// there is nothing to do.
Expand Down Expand Up @@ -1981,10 +2002,50 @@ class Http2Stream extends Duplex {

let req;

let waitingForWriteCallback = true;
let waitingForEndCheck = true;
let writeCallbackErr;
let endCheckCallbackErr;
const done = () => {
if (waitingForEndCheck || waitingForWriteCallback) return;
const err = writeCallbackErr || endCheckCallbackErr;
// writeGeneric does not destroy on error and
// we cannot enable autoDestroy,
// so make sure to destroy on error.
if (err) {
this.destroy(err);
}
cb(err);
};
const writeCallback = (err) => {
waitingForWriteCallback = false;
writeCallbackErr = err;
done();
};
const endCheckCallback = (err) => {
waitingForEndCheck = false;
endCheckCallbackErr = err;
done();
};
// Shutdown write stream right after last chunk is sent
// so final DATA frame can include END_STREAM flag
process.nextTick(() => {
if (writeCallbackErr ||
!this._writableState.ending ||
// Backport v12.x: _writableState.buffered does not exist
// this._writableState.buffered.length ||
this._writableState.bufferedRequest ||
(this[kState].flags & STREAM_FLAGS_HAS_TRAILERS))
return endCheckCallback();
// Backport v12.x: Session required for debugging stream object
// debugStreamObj(this, 'shutting down writable on last write');
shutdownWritable.call(this, endCheckCallback);
});

if (writev)
req = writevGeneric(this, data, cb);
req = writevGeneric(this, data, writeCallback);
else
req = writeGeneric(this, data, encoding, cb);
req = writeGeneric(this, data, encoding, writeCallback);

trackWriteState(this, req.bytes);
}
Expand All @@ -1998,21 +2059,13 @@ class Http2Stream extends Duplex {
}

_final(cb) {
const handle = this[kHandle];
if (this.pending) {
this.once('ready', () => this._final(cb));
} else if (handle !== undefined) {
debugStreamObj(this, '_final shutting down');
const req = new ShutdownWrap();
req.oncomplete = afterShutdown;
req.callback = cb;
req.handle = handle;
const err = handle.shutdown(req);
if (err === 1) // synchronous finish
return afterShutdown.call(req, 0);
} else {
cb();
return;
}
// Backport v12.x: Session required for debugging stream object
// debugStreamObj(this, 'shutting down writable on _final');
shutdownWritable.call(this, cb);
}

_read(nread) {
Expand Down Expand Up @@ -2117,11 +2170,20 @@ class Http2Stream extends Duplex {
debugStream(this[kID] || 'pending', session[kType], 'destroying stream');

const state = this[kState];
const sessionCode = session[kState].goawayCode ||
session[kState].destroyCode;
const code = err != null ?
sessionCode || NGHTTP2_INTERNAL_ERROR :
state.rstCode || sessionCode;
const sessionState = session[kState];
const sessionCode = sessionState.goawayCode || sessionState.destroyCode;

// If a stream has already closed successfully, there is no error
// to report from this stream, even if the session has errored.
// This can happen if the stream was already in process of destroying
// after a successful close, but the session had a error between
// this stream's close and destroy operations.
// Previously, this always overrode a successful close operation code
// NGHTTP2_NO_ERROR (0) with sessionCode because the use of the || operator.
const code = (err != null ?
(sessionCode || NGHTTP2_INTERNAL_ERROR) :
(this.closed ? this.rstCode : sessionCode)
);
const hasHandle = handle !== undefined;

if (!this.closed)
Expand All @@ -2130,13 +2192,13 @@ class Http2Stream extends Duplex {

if (hasHandle) {
handle.destroy();
session[kState].streams.delete(id);
sessionState.streams.delete(id);
} else {
session[kState].pendingStreams.delete(this);
sessionState.pendingStreams.delete(this);
}

// Adjust the write queue size for accounting
session[kState].writeQueueSize -= state.writeQueueSize;
sessionState.writeQueueSize -= state.writeQueueSize;
state.writeQueueSize = 0;

// RST code 8 not emitted as an error as its used by clients to signify
Expand Down
13 changes: 7 additions & 6 deletions src/node_http2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -811,7 +811,7 @@ ssize_t Http2Session::OnCallbackPadding(size_t frameLen,
// quite expensive. This is a potential performance optimization target later.
ssize_t Http2Session::ConsumeHTTP2Data() {
CHECK_NOT_NULL(stream_buf_.base);
CHECK_LT(stream_buf_offset_, stream_buf_.len);
CHECK_LE(stream_buf_offset_, stream_buf_.len);
size_t read_len = stream_buf_.len - stream_buf_offset_;

// multiple side effects.
Expand All @@ -832,11 +832,11 @@ ssize_t Http2Session::ConsumeHTTP2Data() {
CHECK_GT(ret, 0);
CHECK_LE(static_cast<size_t>(ret), read_len);

if (static_cast<size_t>(ret) < read_len) {
// Mark the remainder of the data as available for later consumption.
stream_buf_offset_ += ret;
return ret;
}
// Mark the remainder of the data as available for later consumption.
// Even if all bytes were received, a paused stream may delay the
// nghttp2_on_frame_recv_callback which may have an END_STREAM flag.
stream_buf_offset_ += ret;
return ret;
}

// We are done processing the current input chunk.
Expand Down Expand Up @@ -1174,6 +1174,7 @@ int Http2Session::OnDataChunkReceived(nghttp2_session* handle,
if (session->flags_ & SESSION_STATE_WRITE_IN_PROGRESS) {
CHECK_NE(session->flags_ & SESSION_STATE_READING_STOPPED, 0);
session->flags_ |= SESSION_STATE_NGHTTP2_RECV_PAUSED;
Debug(session, "receive paused");
return NGHTTP2_ERR_PAUSE;
}

Expand Down
56 changes: 39 additions & 17 deletions test/parallel/test-http2-misbehaving-multiplex.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Flags: --expose-internals

const common = require('../common');
const assert = require('assert');

if (!common.hasCrypto)
common.skip('missing crypto');
Expand All @@ -13,16 +14,36 @@ const h2test = require('../common/http2');
let client;

const server = h2.createServer();
let gotFirstStreamId1;
server.on('stream', common.mustCall((stream) => {
stream.respond();
stream.end('ok');

// The error will be emitted asynchronously
stream.on('error', common.expectsError({
constructor: NghttpError,
code: 'ERR_HTTP2_ERROR',
message: 'Stream was already closed or invalid'
}));
// Http2Server should be fast enough to respond to and close
// the first streams with ID 1 and ID 3 without errors.

// Test for errors in 'close' event to ensure no errors on some streams.
stream.on('error', () => {});
stream.on('close', (err) => {
if (stream.id === 1) {
if (gotFirstStreamId1) {
// We expect our outgoing frames to fail on Stream ID 1 the second time
// because a stream with ID 1 was already closed before.
common.expectsError({
constructor: NghttpError,
code: 'ERR_HTTP2_ERROR',
message: 'Stream was already closed or invalid'
});
return;
}
gotFirstStreamId1 = true;
}
assert.strictEqual(err, undefined);
});

// Stream ID 5 should never reach the server
assert.notStrictEqual(stream.id, 5);

}, 2));

server.on('session', common.mustCall((session) => {
Expand All @@ -35,26 +56,27 @@ server.on('session', common.mustCall((session) => {

const settings = new h2test.SettingsFrame();
const settingsAck = new h2test.SettingsFrame(true);
const head1 = new h2test.HeadersFrame(1, h2test.kFakeRequestHeaders, 0, true);
const head2 = new h2test.HeadersFrame(3, h2test.kFakeRequestHeaders, 0, true);
const head3 = new h2test.HeadersFrame(1, h2test.kFakeRequestHeaders, 0, true);
const head4 = new h2test.HeadersFrame(5, h2test.kFakeRequestHeaders, 0, true);
// HeadersFrame(id, payload, padding, END_STREAM)
const id1 = new h2test.HeadersFrame(1, h2test.kFakeRequestHeaders, 0, true);
const id3 = new h2test.HeadersFrame(3, h2test.kFakeRequestHeaders, 0, true);
const id5 = new h2test.HeadersFrame(5, h2test.kFakeRequestHeaders, 0, true);

server.listen(0, () => {
client = net.connect(server.address().port, () => {
client.write(h2test.kClientMagic, () => {
client.write(settings.data, () => {
client.write(settingsAck.data);
// This will make it ok.
client.write(head1.data, () => {
// This will make it ok.
client.write(head2.data, () => {
// Stream ID 1 frame will make it OK.
client.write(id1.data, () => {
// Stream ID 3 frame will make it OK.
client.write(id3.data, () => {
// A second Stream ID 1 frame should fail.
// This will cause an error to occur because the client is
// attempting to reuse an already closed stream. This must
// cause the server session to be torn down.
client.write(head3.data, () => {
// This won't ever make it to the server
client.write(head4.data);
client.write(id1.data, () => {
// This Stream ID 5 frame will never make it to the server
client.write(id5.data);
});
});
});
Expand Down
61 changes: 61 additions & 0 deletions test/parallel/test-http2-pack-end-stream-flag.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
'use strict';

const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');
const assert = require('assert');
const http2 = require('http2');

const { PerformanceObserver } = require('perf_hooks');

const server = http2.createServer();

server.on('stream', (stream, headers) => {
stream.respond({
'content-type': 'text/html',
':status': 200
});
switch (headers[':path']) {
case '/singleEnd':
stream.end('OK');
break;
case '/sequentialEnd':
stream.write('OK');
stream.end();
break;
case '/delayedEnd':
stream.write('OK', () => stream.end());
break;
}
});

function testRequest(path, targetFrameCount, callback) {
const obs = new PerformanceObserver((list, observer) => {
const entry = list.getEntries()[0];
if (entry.name !== 'Http2Session') return;
if (entry.type !== 'client') return;
assert.strictEqual(entry.framesReceived, targetFrameCount);
observer.disconnect();
callback();
});
obs.observe({ entryTypes: ['http2'] });
const client = http2.connect(`http://localhost:${server.address().port}`, () => {
const req = client.request({ ':path': path });
req.resume();
req.end();
req.on('end', () => client.close());
});
}

// SETTINGS => SETTINGS => HEADERS => DATA
const MIN_FRAME_COUNT = 4;

server.listen(0, () => {
testRequest('/singleEnd', MIN_FRAME_COUNT, () => {
testRequest('/sequentialEnd', MIN_FRAME_COUNT, () => {
testRequest('/delayedEnd', MIN_FRAME_COUNT + 1, () => {
server.close();
});
});
});
});
2 changes: 1 addition & 1 deletion test/parallel/test-http2-padding-aligned.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const makeDuplexPair = require('../common/duplexpair');
// The lengths of the expected writes... note that this is highly
// sensitive to how the internals are implemented.
const serverLengths = [24, 9, 9, 32];
const clientLengths = [9, 9, 48, 9, 1, 21, 1, 16];
const clientLengths = [9, 9, 48, 9, 1, 21, 1];

// Adjust for the 24-byte preamble and two 9-byte settings frames, and
// the result must be equally divisible by 8
Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-http2-perf_hooks.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const obs = new PerformanceObserver(common.mustCall((items) => {
break;
case 'client':
assert.strictEqual(entry.streamCount, 1);
assert.strictEqual(entry.framesReceived, 8);
assert.strictEqual(entry.framesReceived, 7);
break;
default:
assert.fail('invalid Http2Session type');
Expand Down

0 comments on commit a9ea96c

Please sign in to comment.