diff --git a/examples/ws/client.js b/examples/ws/client.js index 53d8bc1d7..61524d345 100644 --- a/examples/ws/client.js +++ b/examples/ws/client.js @@ -1,6 +1,6 @@ 'use strict' -var mqtt = require('../../types') +var mqtt = require('../../') var clientId = 'mqttjs_' + Math.random().toString(16).substr(2, 8) diff --git a/lib/connect/ws.js b/lib/connect/ws.js index dfd8c5140..77844e30b 100644 --- a/lib/connect/ws.js +++ b/lib/connect/ws.js @@ -1,9 +1,13 @@ 'use strict' -var WebSocket = require('ws') -var debug = require('debug')('mqttjs:ws') -var urlModule = require('url') -var WSS_OPTIONS = [ +const WS = require('ws') +const debug = require('debug')('mqttjs:ws') +const duplexify = require('duplexify') +const Buffer = require('safe-buffer').Buffer +const urlModule = require('url') +const Transform = require('readable-stream').Transform + +let WSS_OPTIONS = [ 'rejectUnauthorized', 'ca', 'cert', @@ -12,9 +16,9 @@ var WSS_OPTIONS = [ 'passphrase' ] // eslint-disable-next-line camelcase -var IS_BROWSER = (typeof process !== 'undefined' && process.title === 'browser') || typeof __webpack_require__ === 'function' +const IS_BROWSER = (typeof process !== 'undefined' && process.title === 'browser') || typeof __webpack_require__ === 'function' function buildUrl (opts, client) { - var url = opts.protocol + '://' + opts.hostname + ':' + opts.port + opts.path + let url = opts.protocol + '://' + opts.hostname + ':' + opts.port + opts.path if (typeof (opts.transformWsUrl) === 'function') { url = opts.transformWsUrl(url, opts, client) } @@ -22,75 +26,228 @@ function buildUrl (opts, client) { } function setDefaultOpts (opts) { + let options = opts if (!opts.hostname) { - opts.hostname = 'localhost' + options.hostname = 'localhost' } if (!opts.port) { if (opts.protocol === 'wss') { - opts.port = 443 + options.port = 443 } else { - opts.port = 80 + options.port = 80 } } if (!opts.path) { - opts.path = '/' + options.path = '/' } if (!opts.wsOptions) { - opts.wsOptions = {} + options.wsOptions = {} } if (!IS_BROWSER && opts.protocol === 'wss') { // Add cert/key/ca etc options WSS_OPTIONS.forEach(function (prop) { if (opts.hasOwnProperty(prop) && !opts.wsOptions.hasOwnProperty(prop)) { - opts.wsOptions[prop] = opts[prop] + options.wsOptions[prop] = opts[prop] } }) } + + return options } -function createWebSocket (client, opts) { +function setDefaultBrowserOpts (opts) { + let options = setDefaultOpts(opts) + + if (!options.hostname) { + options.hostname = options.host + } + + if (!options.hostname) { + // Throwing an error in a Web Worker if no `hostname` is given, because we + // can not determine the `hostname` automatically. If connecting to + // localhost, please supply the `hostname` as an argument. + if (typeof (document) === 'undefined') { + throw new Error('Could not determine host. Specify host manually.') + } + const parsed = urlModule.parse(document.URL) + options.hostname = parsed.hostname + + if (!options.port) { + options.port = parsed.port + } + } + + // objectMode should be defined for logic + if (options.objectMode === undefined) { + options.objectMode = !(options.binary === true || options.binary === undefined) + } + + return options +} + +function createWebSocket (client, url, opts) { debug('createWebSocket') debug('protocol: ' + opts.protocolId + ' ' + opts.protocolVersion) - var websocketSubProtocol = + const websocketSubProtocol = (opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3) ? 'mqttv3.1' : 'mqtt' - setDefaultOpts(opts) - var url = buildUrl(opts, client) debug('creating new Websocket for url: ' + url + ' and protocol: ' + websocketSubProtocol) - var ws = new WebSocket(url, [websocketSubProtocol], opts.wsOptions) - var duplex = WebSocket.createWebSocketStream(ws, opts.wsOptions) - duplex.url = url - return duplex + let socket = new WS(url, [websocketSubProtocol], opts.wsOptions) + return socket +} + +function createBrowserWebSocket (client, opts) { + const websocketSubProtocol = + (opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3) + ? 'mqttv3.1' + : 'mqtt' + + let url = buildUrl(opts, client) + /* global WebSocket */ + let socket = new WebSocket(url, [websocketSubProtocol]) + socket.binaryType = 'arraybuffer' + return socket } function streamBuilder (client, opts) { - return createWebSocket(client, opts) + debug('streamBuilder') + let options = setDefaultOpts(opts) + const url = buildUrl(options, client) + let socket = createWebSocket(client, url, options) + let webSocketStream = WS.createWebSocketStream(socket, options.wsOptions) + webSocketStream.url = url + return webSocketStream } function browserStreamBuilder (client, opts) { debug('browserStreamBuilder') - if (!opts.hostname) { - opts.hostname = opts.host + let stream + let options = setDefaultBrowserOpts(opts) + // sets the maximum socket buffer size before throttling + const bufferSize = options.browserBufferSize || 1024 * 512 + + const bufferTimeout = opts.browserBufferTimeout || 1000 + + const coerceToBuffer = !opts.objectMode + + let socket = createBrowserWebSocket(client, opts) + + let proxy = buildProxy(opts, socketWriteBrowser, socketEndBrowser) + + if (!opts.objectMode) { + proxy._writev = writev } + proxy.on('close', () => { socket.close() }) - if (!opts.hostname) { - // Throwing an error in a Web Worker if no `hostname` is given, because we - // can not determine the `hostname` automatically. If connecting to - // localhost, please supply the `hostname` as an argument. - if (typeof (document) === 'undefined') { - throw new Error('Could not determine host. Specify host manually.') + const eventListenerSupport = (typeof socket.addEventListener === 'undefined') + + // was already open when passed in + if (socket.readyState === socket.OPEN) { + stream = proxy + } else { + stream = stream = duplexify(undefined, undefined, opts) + if (!opts.objectMode) { + stream._writev = writev } - var parsed = urlModule.parse(document.URL) - opts.hostname = parsed.hostname - if (!opts.port) { - opts.port = parsed.port + if (eventListenerSupport) { + socket.addEventListener('open', onopen) + } else { + socket.onopen = onopen } } - return createWebSocket(client, opts) + + stream.socket = socket + + if (eventListenerSupport) { + socket.addEventListener('close', onclose) + socket.addEventListener('error', onerror) + socket.addEventListener('message', onmessage) + } else { + socket.onclose = onclose + socket.onerror = onerror + socket.onmessage = onmessage + } + + // methods for browserStreamBuilder + + function buildProxy (options, socketWrite, socketEnd) { + let proxy = new Transform({ + objectModeMode: options.objectMode + }) + + proxy._write = socketWrite + proxy._flush = socketEnd + + return proxy + } + + function onopen () { + stream.setReadable(proxy) + stream.setWritable(proxy) + stream.emit('connect') + } + + function onclose () { + stream.end() + stream.destroy() + } + + function onerror (err) { + stream.destroy(err) + } + + function onmessage (event) { + let data = event.data + if (data instanceof ArrayBuffer) data = Buffer.from(data) + else data = Buffer.from(data, 'utf8') + proxy.push(data) + } + + // this is to be enabled only if objectMode is false + function writev (chunks, cb) { + const buffers = new Array(chunks.length) + for (let i = 0; i < chunks.length; i++) { + if (typeof chunks[i].chunk === 'string') { + buffers[i] = Buffer.from(chunks[i], 'utf8') + } else { + buffers[i] = chunks[i].chunk + } + } + + this._write(Buffer.concat(buffers), 'binary', cb) + } + + function socketWriteBrowser (chunk, enc, next) { + if (socket.bufferedAmount > bufferSize) { + // throttle data until buffered amount is reduced. + setTimeout(socketWriteBrowser, bufferTimeout, chunk, enc, next) + } + + if (coerceToBuffer && typeof chunk === 'string') { + chunk = Buffer.from(chunk, 'utf8') + } + + try { + socket.send(chunk) + } catch (err) { + return next(err) + } + + next() + } + + function socketEndBrowser (done) { + socket.close() + done() + } + + // end methods for browserStreamBuilder + + return stream } if (IS_BROWSER) { diff --git a/test/browser/server.js b/test/browser/server.js index 0b5e96516..75a9a8994 100644 --- a/test/browser/server.js +++ b/test/browser/server.js @@ -1,8 +1,8 @@ 'use strict' var handleClient -var websocket = require('websocket-stream') -var WebSocketServer = require('ws').Server +var WS = require('ws') +var WebSocketServer = WS.Server var Connection = require('mqtt-connection') var http = require('http') @@ -109,7 +109,7 @@ function start (startPort, done) { return ws.close() } - stream = websocket(ws) + stream = WS.createWebSocketStream(ws) connection = new Connection(stream) handleClient.call(server, connection) }) diff --git a/test/websocket_client.js b/test/websocket_client.js index fc0107b78..55c8d088c 100644 --- a/test/websocket_client.js +++ b/test/websocket_client.js @@ -103,7 +103,7 @@ describe('Websocket Client', function () { }) }) - it('should be able transform the url (for e.g. to sign it)', function (done) { + it('should be able to transform the url (for e.g. to sign it)', function (done) { var baseUrl = 'ws://localhost:9999/mqtt' var sig = '?AUTH=token' var expected = baseUrl + sig