Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: browser in ws #1145

Merged
merged 3 commits into from
Aug 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/ws/client.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict'

var mqtt = require('../../types')
var mqtt = require('../../')

var clientId = 'mqttjs_' + Math.random().toString(16).substr(2, 8)

Expand Down
225 changes: 191 additions & 34 deletions lib/connect/ws.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
'use strict'

var WebSocket = require('ws')
var debug = require('debug')('mqttjs:ws')
var urlModule = require('url')
var WSS_OPTIONS = [
const WS = require('ws')
const debug = require('debug')('mqttjs:ws')
const duplexify = require('duplexify')
const Buffer = require('safe-buffer').Buffer
const urlModule = require('url')
const Transform = require('readable-stream').Transform

let WSS_OPTIONS = [
'rejectUnauthorized',
'ca',
'cert',
Expand All @@ -12,85 +16,238 @@ var WSS_OPTIONS = [
'passphrase'
]
// eslint-disable-next-line camelcase
var IS_BROWSER = (typeof process !== 'undefined' && process.title === 'browser') || typeof __webpack_require__ === 'function'
const IS_BROWSER = (typeof process !== 'undefined' && process.title === 'browser') || typeof __webpack_require__ === 'function'
function buildUrl (opts, client) {
var url = opts.protocol + '://' + opts.hostname + ':' + opts.port + opts.path
let url = opts.protocol + '://' + opts.hostname + ':' + opts.port + opts.path
if (typeof (opts.transformWsUrl) === 'function') {
url = opts.transformWsUrl(url, opts, client)
}
return url
}

function setDefaultOpts (opts) {
let options = opts
if (!opts.hostname) {
opts.hostname = 'localhost'
options.hostname = 'localhost'
}
if (!opts.port) {
if (opts.protocol === 'wss') {
opts.port = 443
options.port = 443
} else {
opts.port = 80
options.port = 80
}
}
if (!opts.path) {
opts.path = '/'
options.path = '/'
}

if (!opts.wsOptions) {
opts.wsOptions = {}
options.wsOptions = {}
}
if (!IS_BROWSER && opts.protocol === 'wss') {
// Add cert/key/ca etc options
WSS_OPTIONS.forEach(function (prop) {
if (opts.hasOwnProperty(prop) && !opts.wsOptions.hasOwnProperty(prop)) {
opts.wsOptions[prop] = opts[prop]
options.wsOptions[prop] = opts[prop]
}
})
}

return options
}

function createWebSocket (client, opts) {
function setDefaultBrowserOpts (opts) {
let options = setDefaultOpts(opts)

if (!options.hostname) {
options.hostname = options.host
}

if (!options.hostname) {
// Throwing an error in a Web Worker if no `hostname` is given, because we
// can not determine the `hostname` automatically. If connecting to
// localhost, please supply the `hostname` as an argument.
if (typeof (document) === 'undefined') {
throw new Error('Could not determine host. Specify host manually.')
}
const parsed = urlModule.parse(document.URL)
options.hostname = parsed.hostname

if (!options.port) {
options.port = parsed.port
}
}

// objectMode should be defined for logic
if (options.objectMode === undefined) {
options.objectMode = !(options.binary === true || options.binary === undefined)
}

return options
}

function createWebSocket (client, url, opts) {
debug('createWebSocket')
debug('protocol: ' + opts.protocolId + ' ' + opts.protocolVersion)
var websocketSubProtocol =
const websocketSubProtocol =
(opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3)
? 'mqttv3.1'
: 'mqtt'

setDefaultOpts(opts)
var url = buildUrl(opts, client)
debug('creating new Websocket for url: ' + url + ' and protocol: ' + websocketSubProtocol)
var ws = new WebSocket(url, [websocketSubProtocol], opts.wsOptions)
var duplex = WebSocket.createWebSocketStream(ws, opts.wsOptions)
duplex.url = url
return duplex
let socket = new WS(url, [websocketSubProtocol], opts.wsOptions)
return socket
}

function createBrowserWebSocket (client, opts) {
const websocketSubProtocol =
(opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3)
? 'mqttv3.1'
: 'mqtt'

let url = buildUrl(opts, client)
/* global WebSocket */
let socket = new WebSocket(url, [websocketSubProtocol])
socket.binaryType = 'arraybuffer'
return socket
}

function streamBuilder (client, opts) {
return createWebSocket(client, opts)
debug('streamBuilder')
let options = setDefaultOpts(opts)
const url = buildUrl(options, client)
let socket = createWebSocket(client, url, options)
let webSocketStream = WS.createWebSocketStream(socket, options.wsOptions)
webSocketStream.url = url
return webSocketStream
}

function browserStreamBuilder (client, opts) {
debug('browserStreamBuilder')
if (!opts.hostname) {
opts.hostname = opts.host
let stream
let options = setDefaultBrowserOpts(opts)
// sets the maximum socket buffer size before throttling
const bufferSize = options.browserBufferSize || 1024 * 512

const bufferTimeout = opts.browserBufferTimeout || 1000

const coerceToBuffer = !opts.objectMode

let socket = createBrowserWebSocket(client, opts)

let proxy = buildProxy(opts, socketWriteBrowser, socketEndBrowser)

if (!opts.objectMode) {
proxy._writev = writev
}
proxy.on('close', () => { socket.close() })

if (!opts.hostname) {
// Throwing an error in a Web Worker if no `hostname` is given, because we
// can not determine the `hostname` automatically. If connecting to
// localhost, please supply the `hostname` as an argument.
if (typeof (document) === 'undefined') {
throw new Error('Could not determine host. Specify host manually.')
const eventListenerSupport = (typeof socket.addEventListener === 'undefined')

// was already open when passed in
if (socket.readyState === socket.OPEN) {
stream = proxy
} else {
stream = stream = duplexify(undefined, undefined, opts)
if (!opts.objectMode) {
stream._writev = writev
}
var parsed = urlModule.parse(document.URL)
opts.hostname = parsed.hostname

if (!opts.port) {
opts.port = parsed.port
if (eventListenerSupport) {
socket.addEventListener('open', onopen)
} else {
socket.onopen = onopen
}
}
return createWebSocket(client, opts)

stream.socket = socket

if (eventListenerSupport) {
socket.addEventListener('close', onclose)
socket.addEventListener('error', onerror)
socket.addEventListener('message', onmessage)
} else {
socket.onclose = onclose
socket.onerror = onerror
socket.onmessage = onmessage
}

// methods for browserStreamBuilder

function buildProxy (options, socketWrite, socketEnd) {
let proxy = new Transform({
objectModeMode: options.objectMode
})

proxy._write = socketWrite
proxy._flush = socketEnd

return proxy
}

function onopen () {
stream.setReadable(proxy)
stream.setWritable(proxy)
stream.emit('connect')
}

function onclose () {
stream.end()
stream.destroy()
}

function onerror (err) {
stream.destroy(err)
}

function onmessage (event) {
let data = event.data
if (data instanceof ArrayBuffer) data = Buffer.from(data)
else data = Buffer.from(data, 'utf8')
proxy.push(data)
}

// this is to be enabled only if objectMode is false
function writev (chunks, cb) {
const buffers = new Array(chunks.length)
for (let i = 0; i < chunks.length; i++) {
if (typeof chunks[i].chunk === 'string') {
buffers[i] = Buffer.from(chunks[i], 'utf8')
} else {
buffers[i] = chunks[i].chunk
}
}

this._write(Buffer.concat(buffers), 'binary', cb)
}

function socketWriteBrowser (chunk, enc, next) {
if (socket.bufferedAmount > bufferSize) {
// throttle data until buffered amount is reduced.
setTimeout(socketWriteBrowser, bufferTimeout, chunk, enc, next)
}

if (coerceToBuffer && typeof chunk === 'string') {
chunk = Buffer.from(chunk, 'utf8')
}

try {
socket.send(chunk)
} catch (err) {
return next(err)
}

next()
}

function socketEndBrowser (done) {
socket.close()
done()
}

// end methods for browserStreamBuilder

return stream
}

if (IS_BROWSER) {
Expand Down
6 changes: 3 additions & 3 deletions test/browser/server.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
'use strict'

var handleClient
var websocket = require('websocket-stream')
var WebSocketServer = require('ws').Server
var WS = require('ws')
var WebSocketServer = WS.Server
var Connection = require('mqtt-connection')
var http = require('http')

Expand Down Expand Up @@ -109,7 +109,7 @@ function start (startPort, done) {
return ws.close()
}

stream = websocket(ws)
stream = WS.createWebSocketStream(ws)
connection = new Connection(stream)
handleClient.call(server, connection)
})
Expand Down
2 changes: 1 addition & 1 deletion test/websocket_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ describe('Websocket Client', function () {
})
})

it('should be able transform the url (for e.g. to sign it)', function (done) {
it('should be able to transform the url (for e.g. to sign it)', function (done) {
var baseUrl = 'ws://localhost:9999/mqtt'
var sig = '?AUTH=token'
var expected = baseUrl + sig
Expand Down