-
Notifications
You must be signed in to change notification settings - Fork 30.2k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
lib: move _stream_wrap into internals
This makes a subsequent possible deprecation easier. PR-URL: #16158 Reviewed-By: Colin Ihrig <[email protected]> Reviewed-By: Franziska Hinkelmann <[email protected]> Reviewed-By: Tobias Nießen <[email protected]>
- Loading branch information
Showing
3 changed files
with
225 additions
and
221 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,223 +1,3 @@ | ||
'use strict'; | ||
|
||
const assert = require('assert'); | ||
const util = require('util'); | ||
const { Socket } = require('net'); | ||
const { JSStream } = process.binding('js_stream'); | ||
const uv = process.binding('uv'); | ||
const debug = util.debuglog('stream_wrap'); | ||
const errors = require('internal/errors'); | ||
|
||
function StreamWrap(stream) { | ||
const handle = new JSStream(); | ||
|
||
this.stream = stream; | ||
|
||
this._list = null; | ||
|
||
const self = this; | ||
handle.close = function(cb) { | ||
debug('close'); | ||
self.doClose(cb); | ||
}; | ||
handle.isAlive = function() { | ||
return self.isAlive(); | ||
}; | ||
handle.isClosing = function() { | ||
return self.isClosing(); | ||
}; | ||
handle.onreadstart = function() { | ||
return self.readStart(); | ||
}; | ||
handle.onreadstop = function() { | ||
return self.readStop(); | ||
}; | ||
handle.onshutdown = function(req) { | ||
return self.doShutdown(req); | ||
}; | ||
handle.onwrite = function(req, bufs) { | ||
return self.doWrite(req, bufs); | ||
}; | ||
|
||
this.stream.pause(); | ||
this.stream.on('error', function onerror(err) { | ||
self.emit('error', err); | ||
}); | ||
this.stream.on('data', function ondata(chunk) { | ||
if (typeof chunk === 'string' || this._readableState.objectMode === true) { | ||
// Make sure that no further `data` events will happen | ||
this.pause(); | ||
this.removeListener('data', ondata); | ||
|
||
self.emit('error', new errors.Error('ERR_STREAM_WRAP')); | ||
return; | ||
} | ||
|
||
debug('data', chunk.length); | ||
if (self._handle) | ||
self._handle.readBuffer(chunk); | ||
}); | ||
this.stream.once('end', function onend() { | ||
debug('end'); | ||
if (self._handle) | ||
self._handle.emitEOF(); | ||
}); | ||
|
||
Socket.call(this, { | ||
handle: handle | ||
}); | ||
} | ||
util.inherits(StreamWrap, Socket); | ||
module.exports = StreamWrap; | ||
|
||
// require('_stream_wrap').StreamWrap | ||
StreamWrap.StreamWrap = StreamWrap; | ||
|
||
StreamWrap.prototype.isAlive = function isAlive() { | ||
return true; | ||
}; | ||
|
||
StreamWrap.prototype.isClosing = function isClosing() { | ||
return !this.readable || !this.writable; | ||
}; | ||
|
||
StreamWrap.prototype.readStart = function readStart() { | ||
this.stream.resume(); | ||
return 0; | ||
}; | ||
|
||
StreamWrap.prototype.readStop = function readStop() { | ||
this.stream.pause(); | ||
return 0; | ||
}; | ||
|
||
StreamWrap.prototype.doShutdown = function doShutdown(req) { | ||
const self = this; | ||
const handle = this._handle; | ||
const item = this._enqueue('shutdown', req); | ||
|
||
this.stream.end(function() { | ||
// Ensure that write was dispatched | ||
setImmediate(function() { | ||
if (!self._dequeue(item)) | ||
return; | ||
|
||
handle.finishShutdown(req, 0); | ||
}); | ||
}); | ||
return 0; | ||
}; | ||
|
||
StreamWrap.prototype.doWrite = function doWrite(req, bufs) { | ||
const self = this; | ||
const handle = self._handle; | ||
|
||
var pending = bufs.length; | ||
|
||
// Queue the request to be able to cancel it | ||
const item = self._enqueue('write', req); | ||
|
||
self.stream.cork(); | ||
for (var n = 0; n < bufs.length; n++) | ||
self.stream.write(bufs[n], done); | ||
self.stream.uncork(); | ||
|
||
function done(err) { | ||
if (!err && --pending !== 0) | ||
return; | ||
|
||
// Ensure that this is called once in case of error | ||
pending = 0; | ||
|
||
let errCode = 0; | ||
if (err) { | ||
const code = uv[`UV_${err.code}`]; | ||
errCode = (err.code && code) ? code : uv.UV_EPIPE; | ||
} | ||
|
||
// Ensure that write was dispatched | ||
setImmediate(function() { | ||
// Do not invoke callback twice | ||
if (!self._dequeue(item)) | ||
return; | ||
|
||
handle.doAfterWrite(req); | ||
handle.finishWrite(req, errCode); | ||
}); | ||
} | ||
|
||
return 0; | ||
}; | ||
|
||
function QueueItem(type, req) { | ||
this.type = type; | ||
this.req = req; | ||
this.prev = this; | ||
this.next = this; | ||
} | ||
|
||
StreamWrap.prototype._enqueue = function _enqueue(type, req) { | ||
const item = new QueueItem(type, req); | ||
if (this._list === null) { | ||
this._list = item; | ||
return item; | ||
} | ||
|
||
item.next = this._list.next; | ||
item.prev = this._list; | ||
item.next.prev = item; | ||
item.prev.next = item; | ||
|
||
return item; | ||
}; | ||
|
||
StreamWrap.prototype._dequeue = function _dequeue(item) { | ||
assert(item instanceof QueueItem); | ||
|
||
var next = item.next; | ||
var prev = item.prev; | ||
|
||
if (next === null && prev === null) | ||
return false; | ||
|
||
item.next = null; | ||
item.prev = null; | ||
|
||
if (next === item) { | ||
prev = null; | ||
next = null; | ||
} else { | ||
prev.next = next; | ||
next.prev = prev; | ||
} | ||
|
||
if (this._list === item) | ||
this._list = next; | ||
|
||
return true; | ||
}; | ||
|
||
StreamWrap.prototype.doClose = function doClose(cb) { | ||
const self = this; | ||
const handle = self._handle; | ||
|
||
setImmediate(function() { | ||
while (self._list !== null) { | ||
const item = self._list; | ||
const req = item.req; | ||
self._dequeue(item); | ||
|
||
const errCode = uv.UV_ECANCELED; | ||
if (item.type === 'write') { | ||
handle.doAfterWrite(req); | ||
handle.finishWrite(req, errCode); | ||
} else if (item.type === 'shutdown') { | ||
handle.finishShutdown(req, errCode); | ||
} | ||
} | ||
|
||
// Should be already set by net.js | ||
assert(self._handle === null); | ||
cb(); | ||
}); | ||
}; | ||
module.exports = require('internal/wrap_js_stream'); |
Oops, something went wrong.