From e0dc7f16c562869d8b48d63f9ee049a413b2f1a2 Mon Sep 17 00:00:00 2001 From: Miguel Grinberg Date: Wed, 27 Jun 2018 16:20:34 -0700 Subject: [PATCH] Tornado 5 support --- README.rst | 3 +- engineio/__init__.py | 3 +- engineio/async_aiohttp.py | 2 +- engineio/async_sanic.py | 2 +- engineio/async_tornado.py | 154 + engineio/asyncio_server.py | 12 +- examples/README.rst | 6 + examples/tornado/README.rst | 39 + examples/tornado/latency.py | 41 + examples/tornado/requirements.txt | 3 + examples/tornado/simple.py | 53 + examples/tornado/static/engine.io.js | 4245 +++++++++++++++++++++++ examples/tornado/static/style.css | 4 + examples/tornado/templates/latency.html | 64 + examples/tornado/templates/simple.html | 28 + tests/test_async_aiohttp.py | 2 +- tests/test_asyncio_server.py | 2 +- 17 files changed, 4652 insertions(+), 11 deletions(-) create mode 100644 engineio/async_tornado.py create mode 100644 examples/tornado/README.rst create mode 100755 examples/tornado/latency.py create mode 100644 examples/tornado/requirements.txt create mode 100755 examples/tornado/simple.py create mode 100644 examples/tornado/static/engine.io.js create mode 100644 examples/tornado/static/style.css create mode 100644 examples/tornado/templates/latency.html create mode 100644 examples/tornado/templates/simple.html diff --git a/README.rst b/README.rst index 71ad5a7c..59c27360 100644 --- a/README.rst +++ b/README.rst @@ -11,7 +11,7 @@ Features - Fully compatible with the Javascript `engine.io-client`_ library, versions 1.5.0 and up. - Compatible with Python 2.7 and Python 3.3+. -- Supports large number of clients even on modest hardware when used with an asynchronous server based on `asyncio`_(`sanic`_ or `aiohttp`_), `eventlet`_ or `gevent`_. For development and testing, any WSGI compliant multi-threaded server can be used. +- Supports large number of clients even on modest hardware when used with an asynchronous server based on `asyncio`_(`sanic`_, `aiohttp`_ or `tornado`_), `eventlet`_ or `gevent`_. For development and testing, any WSGI compliant multi-threaded server can be used. - Includes a WSGI middleware that integrates Engine.IO traffic with standard WSGI applications. - Uses an event-based architecture implemented with decorators that hides the details of the protocol. - Implements HTTP long-polling and WebSocket transports. @@ -111,6 +111,7 @@ Resources .. _asyncio: https://docs.python.org/3/library/asyncio.html .. _sanic: http://sanic.readthedocs.io/ .. _aiohttp: http://aiohttp.readthedocs.io/ +.. _tornado: http://www.tornadoweb.org/ .. _eventlet: http://eventlet.net/ .. _gevent: http://gevent.org/ .. _aiohttp: http://aiohttp.readthedocs.io/ diff --git a/engineio/__init__.py b/engineio/__init__.py index 8c832cc1..52b86721 100644 --- a/engineio/__init__.py +++ b/engineio/__init__.py @@ -4,6 +4,7 @@ from .server import Server if sys.version_info >= (3, 5): # pragma: no cover from .asyncio_server import AsyncServer + from .async_tornado import get_tornado_handler else: # pragma: no cover AsyncServer = None @@ -11,4 +12,4 @@ __all__ = ['__version__', 'Middleware', 'Server'] if AsyncServer is not None: # pragma: no cover - __all__.append('AsyncServer') + __all__ += ['AsyncServer', 'get_tornado_handler'] diff --git a/engineio/async_aiohttp.py b/engineio/async_aiohttp.py index d0e2a564..ad0252df 100644 --- a/engineio/async_aiohttp.py +++ b/engineio/async_aiohttp.py @@ -72,7 +72,7 @@ def translate_request(request): return environ -def make_response(status, headers, payload): +def make_response(status, headers, payload, environ): """This function generates an appropriate response object for this async mode. """ diff --git a/engineio/async_sanic.py b/engineio/async_sanic.py index 8ff2eb8c..d150aac0 100644 --- a/engineio/async_sanic.py +++ b/engineio/async_sanic.py @@ -89,7 +89,7 @@ async def read(self, length=None): return environ -def make_response(status, headers, payload): +def make_response(status, headers, payload, environ): """This function generates an appropriate response object for this async mode. """ diff --git a/engineio/async_tornado.py b/engineio/async_tornado.py new file mode 100644 index 00000000..ea9c14da --- /dev/null +++ b/engineio/async_tornado.py @@ -0,0 +1,154 @@ +import asyncio +import sys +from urllib.parse import urlsplit + +try: + import tornado.web + import tornado.websocket +except ImportError: + pass +import six + + +def get_tornado_handler(engineio_server): + class Handler(tornado.websocket.WebSocketHandler): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.receive_queue = asyncio.Queue() + + async def get(self): + if self.request.headers.get('Upgrade', '').lower() == 'websocket': + super().get() + await engineio_server.handle_request(self) + + async def post(self): + await engineio_server.handle_request(self) + + async def options(self): + await engineio_server.handle_request(self) + + async def on_message(self, message): + await self.receive_queue.put(message) + + async def get_next_message(self): + return await self.receive_queue.get() + + def on_close(self): + self.receive_queue.put_nowait(None) + + return Handler + + +def translate_request(handler): + """This function takes the arguments passed to the request handler and + uses them to generate a WSGI compatible environ dictionary. + """ + class AwaitablePayload(object): + def __init__(self, payload): + self.payload = payload or b'' + + async def read(self, length=None): + if length is None: + r = self.payload + self.payload = b'' + else: + r = self.payload[:length] + self.payload = self.payload[length:] + return r + + payload = handler.request.body + + uri_parts = urlsplit(handler.request.path) + environ = { + 'wsgi.input': AwaitablePayload(payload), + 'wsgi.errors': sys.stderr, + 'wsgi.version': (1, 0), + 'wsgi.async': True, + 'wsgi.multithread': False, + 'wsgi.multiprocess': False, + 'wsgi.run_once': False, + 'SERVER_SOFTWARE': 'aiohttp', + 'REQUEST_METHOD': handler.request.method, + 'QUERY_STRING': handler.request.query or '', + 'RAW_URI': handler.request.path, + 'SERVER_PROTOCOL': 'HTTP/%s' % handler.request.version, + 'REMOTE_ADDR': '127.0.0.1', + 'REMOTE_PORT': '0', + 'SERVER_NAME': 'aiohttp', + 'SERVER_PORT': '0', + 'tornado.handler': handler + } + + for hdr_name, hdr_value in handler.request.headers.items(): + hdr_name = hdr_name.upper() + if hdr_name == 'CONTENT-TYPE': + environ['CONTENT_TYPE'] = hdr_value + continue + elif hdr_name == 'CONTENT-LENGTH': + environ['CONTENT_LENGTH'] = hdr_value + continue + + key = 'HTTP_%s' % hdr_name.replace('-', '_') + if key in environ: + hdr_value = '%s,%s' % (environ[key], hdr_value) + + environ[key] = hdr_value + + environ['wsgi.url_scheme'] = environ.get('HTTP_X_FORWARDED_PROTO', 'http') + + path_info = uri_parts.path + + environ['PATH_INFO'] = path_info + environ['SCRIPT_NAME'] = '' + + return environ + + +def make_response(status, headers, payload, environ): + """This function generates an appropriate response object for this async + mode. + """ + tornado_handler = environ['tornado.handler'] + tornado_handler.set_status(int(status.split()[0])) + for header, value in headers: + tornado_handler.set_header(header, value) + tornado_handler.write(payload) + tornado_handler.finish() + + +class WebSocket(object): # pragma: no cover + """ + This wrapper class provides a tornado WebSocket interface that is + somewhat compatible with eventlet's implementation. + """ + def __init__(self, handler): + self.handler = handler + self.tornado_handler = None + + async def __call__(self, environ): + self.tornado_handler = environ['tornado.handler'] + self.environ = environ + await self.handler(self) + + async def close(self): + self.tornado_handler.close() + + async def send(self, message): + self.tornado_handler.write_message( + message, binary=isinstance(message, bytes)) + + async def wait(self): + msg = await self.tornado_handler.get_next_message() + if not isinstance(msg, six.binary_type) and \ + not isinstance(msg, six.text_type): + raise IOError() + return msg + + +_async = { + 'asyncio': True, + 'translate_request': translate_request, + 'make_response': make_response, + 'websocket': sys.modules[__name__], + 'websocket_class': 'WebSocket' +} diff --git a/engineio/asyncio_server.py b/engineio/asyncio_server.py index 1dc919d0..e1afe8a4 100644 --- a/engineio/asyncio_server.py +++ b/engineio/asyncio_server.py @@ -18,9 +18,10 @@ class AsyncServer(server.Server): :param async_mode: The asynchronous model to use. See the Deployment section in the documentation for a description of the - available options. Valid async modes are "aiohttp". If - this argument is not given, an async mode is chosen - based on the installed packages. + available options. Valid async modes are "aiohttp", + "sanic" and "tornado". If this argument is not given, + an async mode is chosen based on the installed + packages. :param ping_timeout: The time in seconds that the client waits for the server to respond before disconnecting. :param ping_interval: The interval in seconds at which the client pings @@ -55,7 +56,7 @@ def is_asyncio_based(self): return True def async_modes(self): - return ['aiohttp', 'sanic'] + return ['aiohttp', 'sanic', 'tornado'] def attach(self, app, engineio_path='engine.io'): """Attach the Engine.IO server to an application.""" @@ -192,7 +193,8 @@ async def handle_request(self, *args, **kwargs): cors_headers = self._cors_headers(environ) return self._async['make_response'](r['status'], r['headers'] + cors_headers, - r['response']) + r['response'], + environ) def start_background_task(self, target, *args, **kwargs): """Start a background task using the appropriate async model. diff --git a/examples/README.rst b/examples/README.rst index 710d3a86..54bffea2 100644 --- a/examples/README.rst +++ b/examples/README.rst @@ -18,3 +18,9 @@ sanic ----- Examples that are compatible with the sanic framework for asyncio. + + +tornado +------- + +Examples that are compatible with the Tornado framework. diff --git a/examples/tornado/README.rst b/examples/tornado/README.rst new file mode 100644 index 00000000..0ac52d62 --- /dev/null +++ b/examples/tornado/README.rst @@ -0,0 +1,39 @@ +Engine.IO Examples +================== + +This directory contains example Engine.IO applications that are compatible +with the Tornado framework. These applications require Tornado 5 and Python +3.5 or later. + +simple.py +--------- + +A basic application in which the client sends messages to the server and the +server responds. + +latency.py +---------- + +A port of the latency application included in the official Engine.IO +Javascript server. In this application the client sends *ping* messages to +the server, which are responded by the server with a *pong*. The client +measures the time it takes for each of these exchanges and plots these in real +time to the page. + +This is an ideal application to measure the performance of the different +asynchronous modes supported by the Engine.IO server. + +Running the Examples +-------------------- + +To run these examples, create a virtual environment, install the requirements +and then run:: + + $ python simple.py + +or:: + + $ python latency.py + +You can then access the application from your web browser at +``http://localhost:8888``. \ No newline at end of file diff --git a/examples/tornado/latency.py b/examples/tornado/latency.py new file mode 100755 index 00000000..d147ea4f --- /dev/null +++ b/examples/tornado/latency.py @@ -0,0 +1,41 @@ +import os + +import tornado.ioloop +from tornado.options import define, options, parse_command_line +import tornado.web + +import engineio + +define("port", default=8888, help="run on the given port", type=int) +define("debug", default=False, help="run in debug mode") + +eio = engineio.AsyncServer(async_mode='tornado') + + +class MainHandler(tornado.web.RequestHandler): + def get(self): + self.render("latency.html") + + +@eio.on('message') +async def message(sid, data): + await eio.send(sid, 'pong', binary=False) + + +def main(): + parse_command_line() + app = tornado.web.Application( + [ + (r"/", MainHandler), + (r"/engine.io/", engineio.get_tornado_handler(eio)), + ], + template_path=os.path.join(os.path.dirname(__file__), "templates"), + static_path=os.path.join(os.path.dirname(__file__), "static"), + debug=options.debug, + ) + app.listen(options.port) + tornado.ioloop.IOLoop.current().start() + + +if __name__ == "__main__": + main() diff --git a/examples/tornado/requirements.txt b/examples/tornado/requirements.txt new file mode 100644 index 00000000..ba598740 --- /dev/null +++ b/examples/tornado/requirements.txt @@ -0,0 +1,3 @@ +tornado==5.0.2 +python-engineio +six==1.10.0 diff --git a/examples/tornado/simple.py b/examples/tornado/simple.py new file mode 100755 index 00000000..15f3da74 --- /dev/null +++ b/examples/tornado/simple.py @@ -0,0 +1,53 @@ +import os + +import tornado.ioloop +from tornado.options import define, options, parse_command_line +import tornado.web + +import engineio +from engineio.async_tornado import get_engineio_handler + +define("port", default=8888, help="run on the given port", type=int) +define("debug", default=False, help="run in debug mode") + +eio = engineio.AsyncServer(async_mode='tornado') + + +class MainHandler(tornado.web.RequestHandler): + def get(self): + self.render("simple.html") + + +@eio.on('connect') +def connect(sid, environ): + print("connect ", sid) + + +@eio.on('message') +async def message(sid, data): + print('message from', sid, data) + await eio.send(sid, 'Thank you for your message!', binary=False) + + +@eio.on('disconnect') +def disconnect(sid): + print('disconnect ', sid) + + +def main(): + parse_command_line() + app = tornado.web.Application( + [ + (r"/", MainHandler), + (r"/engine.io/", engineio.get_tornado_handler(eio)), + ], + template_path=os.path.join(os.path.dirname(__file__), "templates"), + static_path=os.path.join(os.path.dirname(__file__), "static"), + debug=options.debug, + ) + app.listen(options.port) + tornado.ioloop.IOLoop.current().start() + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/examples/tornado/static/engine.io.js b/examples/tornado/static/engine.io.js new file mode 100644 index 00000000..ec00ad08 --- /dev/null +++ b/examples/tornado/static/engine.io.js @@ -0,0 +1,4245 @@ +!function(e){if("object"==typeof exports&&"undefined"!=typeof module)module.exports=e();else if("function"==typeof define&&define.amd)define([],e);else{var f;"undefined"!=typeof window?f=window:"undefined"!=typeof global?f=global:"undefined"!=typeof self&&(f=self),f.eio=e()}}(function(){var define,module,exports;return (function e(t,n,r){function s(o,u){if(!n[o]){if(!t[o]){var a=typeof require=="function"&&require;if(!u&&a)return a(o,!0);if(i)return i(o,!0);var f=new Error("Cannot find module '"+o+"'");throw f.code="MODULE_NOT_FOUND",f}var l=n[o]={exports:{}};t[o][0].call(l.exports,function(e){var n=t[o][1][e];return s(n?n:e)},l,l.exports,e,t,n,r)}return n[o].exports}var i=typeof require=="function"&&require;for(var o=0;o 0) { + this.extraHeaders = opts.extraHeaders; + } + } + + this.open(); +} + +Socket.priorWebsocketSuccess = false; + +/** + * Mix in `Emitter`. + */ + +Emitter(Socket.prototype); + +/** + * Protocol version. + * + * @api public + */ + +Socket.protocol = parser.protocol; // this is an int + +/** + * Expose deps for legacy compatibility + * and standalone browser access. + */ + +Socket.Socket = Socket; +Socket.Transport = _dereq_('./transport'); +Socket.transports = _dereq_('./transports'); +Socket.parser = _dereq_('engine.io-parser'); + +/** + * Creates transport of the given type. + * + * @param {String} transport name + * @return {Transport} + * @api private + */ + +Socket.prototype.createTransport = function (name) { + debug('creating transport "%s"', name); + var query = clone(this.query); + + // append engine.io protocol identifier + query.EIO = parser.protocol; + + // transport name + query.transport = name; + + // session id if we already have one + if (this.id) query.sid = this.id; + + var transport = new transports[name]({ + agent: this.agent, + hostname: this.hostname, + port: this.port, + secure: this.secure, + path: this.path, + query: query, + forceJSONP: this.forceJSONP, + jsonp: this.jsonp, + forceBase64: this.forceBase64, + enablesXDR: this.enablesXDR, + timestampRequests: this.timestampRequests, + timestampParam: this.timestampParam, + policyPort: this.policyPort, + socket: this, + pfx: this.pfx, + key: this.key, + passphrase: this.passphrase, + cert: this.cert, + ca: this.ca, + ciphers: this.ciphers, + rejectUnauthorized: this.rejectUnauthorized, + perMessageDeflate: this.perMessageDeflate, + extraHeaders: this.extraHeaders + }); + + return transport; +}; + +function clone (obj) { + var o = {}; + for (var i in obj) { + if (obj.hasOwnProperty(i)) { + o[i] = obj[i]; + } + } + return o; +} + +/** + * Initializes transport to use and starts probe. + * + * @api private + */ +Socket.prototype.open = function () { + var transport; + if (this.rememberUpgrade && Socket.priorWebsocketSuccess && this.transports.indexOf('websocket') != -1) { + transport = 'websocket'; + } else if (0 === this.transports.length) { + // Emit error on next tick so it can be listened to + var self = this; + setTimeout(function() { + self.emit('error', 'No transports available'); + }, 0); + return; + } else { + transport = this.transports[0]; + } + this.readyState = 'opening'; + + // Retry with the next transport if the transport is disabled (jsonp: false) + try { + transport = this.createTransport(transport); + } catch (e) { + this.transports.shift(); + this.open(); + return; + } + + transport.open(); + this.setTransport(transport); +}; + +/** + * Sets the current transport. Disables the existing one (if any). + * + * @api private + */ + +Socket.prototype.setTransport = function(transport){ + debug('setting transport %s', transport.name); + var self = this; + + if (this.transport) { + debug('clearing existing transport %s', this.transport.name); + this.transport.removeAllListeners(); + } + + // set up transport + this.transport = transport; + + // set up transport listeners + transport + .on('drain', function(){ + self.onDrain(); + }) + .on('packet', function(packet){ + self.onPacket(packet); + }) + .on('error', function(e){ + self.onError(e); + }) + .on('close', function(){ + self.onClose('transport close'); + }); +}; + +/** + * Probes a transport. + * + * @param {String} transport name + * @api private + */ + +Socket.prototype.probe = function (name) { + debug('probing transport "%s"', name); + var transport = this.createTransport(name, { probe: 1 }) + , failed = false + , self = this; + + Socket.priorWebsocketSuccess = false; + + function onTransportOpen(){ + if (self.onlyBinaryUpgrades) { + var upgradeLosesBinary = !this.supportsBinary && self.transport.supportsBinary; + failed = failed || upgradeLosesBinary; + } + if (failed) return; + + debug('probe transport "%s" opened', name); + transport.send([{ type: 'ping', data: 'probe' }]); + transport.once('packet', function (msg) { + if (failed) return; + if ('pong' == msg.type && 'probe' == msg.data) { + debug('probe transport "%s" pong', name); + self.upgrading = true; + self.emit('upgrading', transport); + if (!transport) return; + Socket.priorWebsocketSuccess = 'websocket' == transport.name; + + debug('pausing current transport "%s"', self.transport.name); + self.transport.pause(function () { + if (failed) return; + if ('closed' == self.readyState) return; + debug('changing transport and sending upgrade packet'); + + cleanup(); + + self.setTransport(transport); + transport.send([{ type: 'upgrade' }]); + self.emit('upgrade', transport); + transport = null; + self.upgrading = false; + self.flush(); + }); + } else { + debug('probe transport "%s" failed', name); + var err = new Error('probe error'); + err.transport = transport.name; + self.emit('upgradeError', err); + } + }); + } + + function freezeTransport() { + if (failed) return; + + // Any callback called by transport should be ignored since now + failed = true; + + cleanup(); + + transport.close(); + transport = null; + } + + //Handle any error that happens while probing + function onerror(err) { + var error = new Error('probe error: ' + err); + error.transport = transport.name; + + freezeTransport(); + + debug('probe transport "%s" failed because of error: %s', name, err); + + self.emit('upgradeError', error); + } + + function onTransportClose(){ + onerror("transport closed"); + } + + //When the socket is closed while we're probing + function onclose(){ + onerror("socket closed"); + } + + //When the socket is upgraded while we're probing + function onupgrade(to){ + if (transport && to.name != transport.name) { + debug('"%s" works - aborting "%s"', to.name, transport.name); + freezeTransport(); + } + } + + //Remove all listeners on the transport and on self + function cleanup(){ + transport.removeListener('open', onTransportOpen); + transport.removeListener('error', onerror); + transport.removeListener('close', onTransportClose); + self.removeListener('close', onclose); + self.removeListener('upgrading', onupgrade); + } + + transport.once('open', onTransportOpen); + transport.once('error', onerror); + transport.once('close', onTransportClose); + + this.once('close', onclose); + this.once('upgrading', onupgrade); + + transport.open(); + +}; + +/** + * Called when connection is deemed open. + * + * @api public + */ + +Socket.prototype.onOpen = function () { + debug('socket open'); + this.readyState = 'open'; + Socket.priorWebsocketSuccess = 'websocket' == this.transport.name; + this.emit('open'); + this.flush(); + + // we check for `readyState` in case an `open` + // listener already closed the socket + if ('open' == this.readyState && this.upgrade && this.transport.pause) { + debug('starting upgrade probes'); + for (var i = 0, l = this.upgrades.length; i < l; i++) { + this.probe(this.upgrades[i]); + } + } +}; + +/** + * Handles a packet. + * + * @api private + */ + +Socket.prototype.onPacket = function (packet) { + if ('opening' == this.readyState || 'open' == this.readyState) { + debug('socket receive: type "%s", data "%s"', packet.type, packet.data); + + this.emit('packet', packet); + + // Socket is live - any packet counts + this.emit('heartbeat'); + + switch (packet.type) { + case 'open': + this.onHandshake(parsejson(packet.data)); + break; + + case 'pong': + this.setPing(); + this.emit('pong'); + break; + + case 'error': + var err = new Error('server error'); + err.code = packet.data; + this.onError(err); + break; + + case 'message': + this.emit('data', packet.data); + this.emit('message', packet.data); + break; + } + } else { + debug('packet received with socket readyState "%s"', this.readyState); + } +}; + +/** + * Called upon handshake completion. + * + * @param {Object} handshake obj + * @api private + */ + +Socket.prototype.onHandshake = function (data) { + this.emit('handshake', data); + this.id = data.sid; + this.transport.query.sid = data.sid; + this.upgrades = this.filterUpgrades(data.upgrades); + this.pingInterval = data.pingInterval; + this.pingTimeout = data.pingTimeout; + this.onOpen(); + // In case open handler closes socket + if ('closed' == this.readyState) return; + this.setPing(); + + // Prolong liveness of socket on heartbeat + this.removeListener('heartbeat', this.onHeartbeat); + this.on('heartbeat', this.onHeartbeat); +}; + +/** + * Resets ping timeout. + * + * @api private + */ + +Socket.prototype.onHeartbeat = function (timeout) { + clearTimeout(this.pingTimeoutTimer); + var self = this; + self.pingTimeoutTimer = setTimeout(function () { + if ('closed' == self.readyState) return; + self.onClose('ping timeout'); + }, timeout || (self.pingInterval + self.pingTimeout)); +}; + +/** + * Pings server every `this.pingInterval` and expects response + * within `this.pingTimeout` or closes connection. + * + * @api private + */ + +Socket.prototype.setPing = function () { + var self = this; + clearTimeout(self.pingIntervalTimer); + self.pingIntervalTimer = setTimeout(function () { + debug('writing ping packet - expecting pong within %sms', self.pingTimeout); + self.ping(); + self.onHeartbeat(self.pingTimeout); + }, self.pingInterval); +}; + +/** +* Sends a ping packet. +* +* @api private +*/ + +Socket.prototype.ping = function () { + var self = this; + this.sendPacket('ping', function(){ + self.emit('ping'); + }); +}; + +/** + * Called on `drain` event + * + * @api private + */ + +Socket.prototype.onDrain = function() { + this.writeBuffer.splice(0, this.prevBufferLen); + + // setting prevBufferLen = 0 is very important + // for example, when upgrading, upgrade packet is sent over, + // and a nonzero prevBufferLen could cause problems on `drain` + this.prevBufferLen = 0; + + if (0 === this.writeBuffer.length) { + this.emit('drain'); + } else { + this.flush(); + } +}; + +/** + * Flush write buffers. + * + * @api private + */ + +Socket.prototype.flush = function () { + if ('closed' != this.readyState && this.transport.writable && + !this.upgrading && this.writeBuffer.length) { + debug('flushing %d packets in socket', this.writeBuffer.length); + this.transport.send(this.writeBuffer); + // keep track of current length of writeBuffer + // splice writeBuffer and callbackBuffer on `drain` + this.prevBufferLen = this.writeBuffer.length; + this.emit('flush'); + } +}; + +/** + * Sends a message. + * + * @param {String} message. + * @param {Function} callback function. + * @param {Object} options. + * @return {Socket} for chaining. + * @api public + */ + +Socket.prototype.write = +Socket.prototype.send = function (msg, options, fn) { + this.sendPacket('message', msg, options, fn); + return this; +}; + +/** + * Sends a packet. + * + * @param {String} packet type. + * @param {String} data. + * @param {Object} options. + * @param {Function} callback function. + * @api private + */ + +Socket.prototype.sendPacket = function (type, data, options, fn) { + if('function' == typeof data) { + fn = data; + data = undefined; + } + + if ('function' == typeof options) { + fn = options; + options = null; + } + + if ('closing' == this.readyState || 'closed' == this.readyState) { + return; + } + + options = options || {}; + options.compress = false !== options.compress; + + var packet = { + type: type, + data: data, + options: options + }; + this.emit('packetCreate', packet); + this.writeBuffer.push(packet); + if (fn) this.once('flush', fn); + this.flush(); +}; + +/** + * Closes the connection. + * + * @api private + */ + +Socket.prototype.close = function () { + if ('opening' == this.readyState || 'open' == this.readyState) { + this.readyState = 'closing'; + + var self = this; + + if (this.writeBuffer.length) { + this.once('drain', function() { + if (this.upgrading) { + waitForUpgrade(); + } else { + close(); + } + }); + } else if (this.upgrading) { + waitForUpgrade(); + } else { + close(); + } + } + + function close() { + self.onClose('forced close'); + debug('socket closing - telling transport to close'); + self.transport.close(); + } + + function cleanupAndClose() { + self.removeListener('upgrade', cleanupAndClose); + self.removeListener('upgradeError', cleanupAndClose); + close(); + } + + function waitForUpgrade() { + // wait for upgrade to finish since we can't send packets while pausing a transport + self.once('upgrade', cleanupAndClose); + self.once('upgradeError', cleanupAndClose); + } + + return this; +}; + +/** + * Called upon transport error + * + * @api private + */ + +Socket.prototype.onError = function (err) { + debug('socket error %j', err); + Socket.priorWebsocketSuccess = false; + this.emit('error', err); + this.onClose('transport error', err); +}; + +/** + * Called upon transport close. + * + * @api private + */ + +Socket.prototype.onClose = function (reason, desc) { + if ('opening' == this.readyState || 'open' == this.readyState || 'closing' == this.readyState) { + debug('socket close with reason: "%s"', reason); + var self = this; + + // clear timers + clearTimeout(this.pingIntervalTimer); + clearTimeout(this.pingTimeoutTimer); + + // stop event from firing again for transport + this.transport.removeAllListeners('close'); + + // ensure transport won't stay open + this.transport.close(); + + // ignore further transport communication + this.transport.removeAllListeners(); + + // set ready state + this.readyState = 'closed'; + + // clear session id + this.id = null; + + // emit close event + this.emit('close', reason, desc); + + // clean buffers after, so users can still + // grab the buffers on `close` event + self.writeBuffer = []; + self.prevBufferLen = 0; + } +}; + +/** + * Filters upgrades, returning only those matching client transports. + * + * @param {Array} server upgrades + * @api private + * + */ + +Socket.prototype.filterUpgrades = function (upgrades) { + var filteredUpgrades = []; + for (var i = 0, j = upgrades.length; i