Skip to content

Commit

Permalink
http: Refactor for streams2
Browse files Browse the repository at this point in the history
Because of some of the peculiarities of http, this has a bit of special
magic to handle cases where the IncomingMessage would wait forever in a
paused state.

In the server, if you do not begin consuming the request body by the
time the response emits 'finish', then it will be flushed out.

In the client, if you do not add a 'response' handler onto the request,
then the response stream will be flushed out.
  • Loading branch information
isaacs committed Dec 15, 2012
1 parent 81e3562 commit 1d36931
Showing 1 changed file with 64 additions and 70 deletions.
134 changes: 64 additions & 70 deletions lib/http.js
Original file line number Diff line number Diff line change
Expand Up @@ -114,19 +114,30 @@ function parserOnHeadersComplete(info) {
return skipBody;
}

// XXX This is a mess.
// TODO: http.Parser should be a Writable emits request/response events.
function parserOnBody(b, start, len) {
var parser = this;
var slice = b.slice(start, start + len);
if (parser.incoming._paused || parser.incoming._pendings.length) {
parser.incoming._pendings.push(slice);
} else {
parser.incoming._emitData(slice);
var stream = parser.incoming;
var rs = stream._readableState;
var socket = stream.socket;

// pretend this was the result of a stream._read call.
if (len > 0) {
var slice = b.slice(start, start + len);
rs.onread(null, slice);
}

if (rs.length >= rs.highWaterMark)
socket.pause();
}

function parserOnMessageComplete() {
var parser = this;
parser.incoming.complete = true;
var stream = parser.incoming;
var socket = stream.socket;

stream.complete = true;

// Emit any trailing headers.
var headers = parser._headers;
Expand All @@ -140,19 +151,13 @@ function parserOnMessageComplete() {
parser._url = '';
}

if (!parser.incoming.upgrade) {
if (!stream.upgrade)
// For upgraded connections, also emit this after parser.execute
if (parser.incoming._paused || parser.incoming._pendings.length) {
parser.incoming._pendings.push(END_OF_FILE);
} else {
parser.incoming.readable = false;
parser.incoming._emitEnd();
}
}
stream._readableState.onread(null, null);

if (parser.socket.readable) {
// force to read the next incoming message
parser.socket.resume();
socket.resume();
}
}

Expand Down Expand Up @@ -263,9 +268,13 @@ function utcDate() {

/* Abstract base class for ServerRequest and ClientResponse. */
function IncomingMessage(socket) {
Stream.call(this);
Stream.Readable.call(this);

// XXX This implementation is kind of all over the place
// When the parser emits body chunks, they go in this list.
// _read() pulls them out, and when it finds EOF, it ends.
this._pendings = [];

// TODO Remove one of these eventually.
this.socket = socket;
this.connection = socket;

Expand All @@ -276,77 +285,49 @@ function IncomingMessage(socket) {

this.readable = true;

this._paused = false;
this._pendings = [];

this._endEmitted = false;
this._pendingIndex = 0;

// request (server) only
this.url = '';

this.method = null;

// response (client) only
this.statusCode = null;
this.client = this.socket;

// flag for backwards compatibility grossness.
this._consuming = false;
}
util.inherits(IncomingMessage, Stream);
util.inherits(IncomingMessage, Stream.Readable);


exports.IncomingMessage = IncomingMessage;


IncomingMessage.prototype.destroy = function(error) {
this.socket.destroy(error);
IncomingMessage.prototype.read = function(n) {
this._consuming = true;
return Stream.Readable.prototype.read.call(this, n);
};


IncomingMessage.prototype.setEncoding = function(encoding) {
var StringDecoder = require('string_decoder').StringDecoder; // lazy load
this._decoder = new StringDecoder(encoding);
};


IncomingMessage.prototype.pause = function() {
this._paused = true;
this.socket.pause();
IncomingMessage.prototype._read = function(n, callback) {
// We actually do almost nothing here, because the parserOnBody
// function fills up our internal buffer directly. However, we
// do need to unpause the underlying socket so that it flows.
if (!this.socket.readable)
return callback(null, null);
else
this.socket.resume();
};


IncomingMessage.prototype.resume = function() {
this._paused = false;
if (this.socket) {
this.socket.resume();
}

this._emitPending();
IncomingMessage.prototype.destroy = function(error) {
this.socket.destroy(error);
};


IncomingMessage.prototype._emitPending = function(callback) {
if (this._pendings.length) {
var self = this;
process.nextTick(function() {
while (!self._paused && self._pendings.length) {
var chunk = self._pendings.shift();
if (chunk !== END_OF_FILE) {
assert(Buffer.isBuffer(chunk));
self._emitData(chunk);
} else {
assert(self._pendings.length === 0);
self.readable = false;
self._emitEnd();
}
}

if (callback) {
callback();
}
});
} else if (callback) {
callback();
}
};


IncomingMessage.prototype._emitData = function(d) {
Expand Down Expand Up @@ -1016,7 +997,7 @@ ServerResponse.prototype.writeHead = function(statusCode) {

// don't keep alive connections where the client expects 100 Continue
// but we sent a final status; they may put extra bytes on the wire.
if (this._expect_continue && ! this._sent100) {
if (this._expect_continue && !this._sent100) {
this.shouldKeepAlive = false;
}

Expand Down Expand Up @@ -1321,11 +1302,10 @@ function socketCloseListener() {
// Socket closed before we emitted 'end' below.
req.res.emit('aborted');
var res = req.res;
req.res._emitPending(function() {
res._emitEnd();
res.on('end', function() {
res.emit('close');
res = null;
});
res._readableState.onread(null, null);
} else if (!req.res && !req._hadError) {
// This socket error fired before we started to
// receive a response. The error needs to
Expand Down Expand Up @@ -1428,11 +1408,13 @@ function socketOnData(d, start, end) {
}


// client
function parserOnIncomingClient(res, shouldKeepAlive) {
var parser = this;
var socket = this.socket;
var req = socket._httpMessage;


// propogate "domain" setting...
if (req.domain && !res.domain) {
debug('setting "res.domain"');
Expand Down Expand Up @@ -1480,15 +1462,21 @@ function parserOnIncomingClient(res, shouldKeepAlive) {

DTRACE_HTTP_CLIENT_RESPONSE(socket, req);
COUNTER_HTTP_CLIENT_RESPONSE();
req.emit('response', res);
req.res = res;
res.req = req;

var handled = req.emit('response', res);
res.on('end', responseOnEnd);

// If the user did not listen for the 'response' event, then they
// can't possibly read the data, so we .resume() it into the void
// so that the socket doesn't hang there in a paused state.
if (!handled)
res.resume();

return isHeadResponse;
}

// client
function responseOnEnd() {
var res = this;
var req = res.req;
Expand Down Expand Up @@ -1784,7 +1772,7 @@ function connectionListener(socket) {
incoming.push(req);

var res = new ServerResponse(req);
debug('server response shouldKeepAlive: ' + shouldKeepAlive);

res.shouldKeepAlive = shouldKeepAlive;
DTRACE_HTTP_SERVER_REQUEST(req, socket);
COUNTER_HTTP_SERVER_REQUEST();
Expand All @@ -1806,6 +1794,12 @@ function connectionListener(socket) {

incoming.shift();

// if the user never called req.read(), and didn't pipe() or
// .resume() or .on('data'), then we call req.resume() so that the
// bytes will be pulled off the wire.
if (!req._consuming)
req.resume();

res.detachSocket(socket);

if (res._last) {
Expand Down

0 comments on commit 1d36931

Please sign in to comment.