diff --git a/spec/observables/dom/webSocket-spec.js b/spec/observables/dom/webSocket-spec.js new file mode 100644 index 0000000000..9f5ebca8b3 --- /dev/null +++ b/spec/observables/dom/webSocket-spec.js @@ -0,0 +1,139 @@ +/* globals describe, it, expect, sinon, rxTestScheduler */ +var Rx = require('../../../dist/cjs/Rx.DOM'); +var Observable = Rx.Observable; + +function noop() { + // nope. +} + +describe('Observable.webSocket', function () { + beforeEach(function () { + setupMockWebSocket(); + }); + + afterEach(function () { + teardownMockWebSocket(); + }); + + it ('should send a message', function () { + var messageReceived = false; + var subject = Observable.webSocket('ws://mysocket'); + + subject.next('ping'); + + subject.subscribe(function (x) { + expect(x).toBe('pong'); + messageReceived = true; + }); + + var socket = MockWebSocket.lastSocket(); + + socket.open(); + expect(socket.lastMessageSent()).toBe('ping'); + + socket.triggerMessage('pong'); + expect(messageReceived).toBe(true); + }); +}); + +var sockets = []; + +function MockWebSocket(url, protocol) { + sockets.push(this); + this.url = url; + this.protocol = protocol; + this.sent = []; + this.handlers = {}; + this.readyState = 1; +} + +MockWebSocket.lastSocket = function () { + return sockets.length > 0 ? sockets[sockets.length - 1] : undefined; +}; + +MockWebSocket.prototype = { + send: function (data) { + this.sent.push(data); + }, + + lastMessageSent: function () { + var sent = this.sent; + return sent.length > 0 ? sent[sent.length - 1] : undefined; + }, + + triggerClose: function (e) { + this.readyState = 3; + this.trigger('close', e); + }, + + triggerError: function (err) { + this.readyState = 3; + this.trigger('error', err); + }, + + triggerMessage: function (data) { + var messageEvent = { + data: JSON.stringify(data), + origin: 'mockorigin', + ports: undefined, + source: __root__, + }; + + this.trigger('message', messageEvent); + }, + + open: function () { + this.readyState = 1; + this.trigger('open', {}); + }, + + close: function (code, reason) { + if (this.readyState < 2) { + this.readyState = 2; + this.closeCode = code; + this.closeReason = reason; + this.triggerClose(); + } + }, + + addEventListener: function (name, handler) { + var lookup = this.handlers[name] = this.handlers[name] || []; + lookup.push(handler); + }, + + removeEventListener: function (name, handler) { + var lookup = this.handlers[name]; + if (lookup) { + for (var i = lookup.length - 1; i--;) { + if (lookup[i] === handler) { + lookup.splice(i, 1); + } + } + } + }, + + trigger: function (name, e) { + if (this['on' + name]) { + this['on' + name](e); + } + + var lookup = this.handlers[name]; + if (lookup) { + for (var i = 0; i < lookup.length; i++) { + lookup[i](e); + } + } + } +} + +var __ws; +function setupMockWebSocket() { + sockets = []; + __ws = __root__.WebSocket; + __root__.WebSocket = MockWebSocket; +} + +function teardownMockWebSocket() { + __root__.WebSocket = __ws; + sockets = null; +} \ No newline at end of file diff --git a/src/Observable.ts b/src/Observable.ts index a7bec2a5c6..c611f955c9 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -32,6 +32,7 @@ import {RangeObservable} from './observable/range'; import {InfiniteObservable} from './observable/never'; import {ErrorObservable} from './observable/throw'; import {AjaxCreationMethod} from './observable/dom/ajax'; +import {WebSocketSubject} from './observable/dom/webSocket'; /** * A representation of any set of values over any amount of time. This the most basic building block @@ -187,6 +188,7 @@ export class Observable implements CoreOperators { static range: typeof RangeObservable.create; static throw: typeof ErrorObservable.create; static timer: typeof TimerObservable.create; + static webSocket: typeof WebSocketSubject.create; static zip: typeof zipStatic; // core operators diff --git a/src/Rx.DOM.ts b/src/Rx.DOM.ts index a06c969b53..4bca9733d1 100644 --- a/src/Rx.DOM.ts +++ b/src/Rx.DOM.ts @@ -28,6 +28,7 @@ import './add/observable/timer'; import './add/operator/zip-static'; import './add/observable/dom/ajax'; +import './add/observable/dom/webSocket'; //operators import './add/operator/buffer'; diff --git a/src/Subject.ts b/src/Subject.ts index de0ce47bc0..ebc817ee34 100644 --- a/src/Subject.ts +++ b/src/Subject.ts @@ -8,9 +8,9 @@ import {rxSubscriber} from './symbol/rxSubscriber'; export class Subject extends Observable implements Observer, Subscription { - static create(source: Observable, destination: Observer): Subject { + static create: Function = (source: Observable, destination: Observer): Subject => { return new Subject(source, destination); - } + }; constructor(source?: Observable, destination?: Observer) { super(); @@ -127,13 +127,17 @@ export class Subject extends Observable implements Observer, Subscripti if (this.destination) { this.destination.next(value); } else { - let index = -1; - const observers = this.observers.slice(0); - const len = observers.length; + this._finalNext(value); + } + } - while (++index < len) { - observers[index].next(value); - } + protected _finalNext(value: T): void { + let index = -1; + const observers = this.observers.slice(0); + const len = observers.length; + + while (++index < len) { + observers[index].next(value); } } @@ -141,46 +145,54 @@ export class Subject extends Observable implements Observer, Subscripti if (this.destination) { this.destination.error(err); } else { - let index = -1; - const observers = this.observers; - const len = observers.length; + this._finalError(err); + } + } - // optimization to block our SubjectSubscriptions from - // splicing themselves out of the observers list one by one. - this.observers = null; - this.isUnsubscribed = true; + protected _finalError(err: any): void { + let index = -1; + const observers = this.observers; + const len = observers.length; - while (++index < len) { - observers[index].error(err); - } - - this.isUnsubscribed = false; + // optimization to block our SubjectSubscriptions from + // splicing themselves out of the observers list one by one. + this.observers = null; + this.isUnsubscribed = true; - this.unsubscribe(); + while (++index < len) { + observers[index].error(err); } + + this.isUnsubscribed = false; + + this.unsubscribe(); } protected _complete(): void { if (this.destination) { this.destination.complete(); } else { - let index = -1; - const observers = this.observers; - const len = observers.length; + this._finalComplete(); + } + } - // optimization to block our SubjectSubscriptions from - // splicing themselves out of the observers list one by one. - this.observers = null; - this.isUnsubscribed = true; + protected _finalComplete(): void { + let index = -1; + const observers = this.observers; + const len = observers.length; - while (++index < len) { - observers[index].complete(); - } - - this.isUnsubscribed = false; + // optimization to block our SubjectSubscriptions from + // splicing themselves out of the observers list one by one. + this.observers = null; + this.isUnsubscribed = true; - this.unsubscribe(); + while (++index < len) { + observers[index].complete(); } + + this.isUnsubscribed = false; + + this.unsubscribe(); } [rxSubscriber]() { diff --git a/src/add/observable/dom/webSocket.ts b/src/add/observable/dom/webSocket.ts new file mode 100644 index 0000000000..993f0c0a4e --- /dev/null +++ b/src/add/observable/dom/webSocket.ts @@ -0,0 +1,5 @@ +import {Observable} from '../../../Observable'; +import {WebSocketSubject} from '../../../observable/dom/webSocket'; +Observable.webSocket = WebSocketSubject.create; + +export var _void: void; diff --git a/src/observable/dom/webSocket.ts b/src/observable/dom/webSocket.ts new file mode 100644 index 0000000000..26a85211dc --- /dev/null +++ b/src/observable/dom/webSocket.ts @@ -0,0 +1,195 @@ +import {Subject} from '../../Subject'; +import {Subscriber} from '../../Subscriber'; +import {Observable} from '../../Observable'; +import {Subscription} from '../../Subscription'; +import {root} from '../../util/root'; +import {ReplaySubject} from '../../subject/ReplaySubject'; +import {Observer} from '../../Observer'; +import {tryCatch} from '../../util/tryCatch'; +import {errorObject} from '../../util/errorObject'; +import {Operator} from '../../Operator'; +import {assign} from '../../util/assign'; + +export interface WebSocketSubjectConfig { + url: string; + protocol?: string | Array; + resultSelector?: (e: MessageEvent) => T; + openObserver?: Observer; + closeObserver?: Observer; + closingObserver?: Observer; + WebSocketCtor?: { new(url: string, protocol?: string|Array): WebSocket }; +} + +export class WebSocketSubject extends Subject { + url: string; + protocol: string|Array; + socket: WebSocket; + openObserver: Observer; + closeObserver: Observer; + closingObserver: Observer; + WebSocketCtor: { new(url: string, protocol?: string|Array)}; + + resultSelector(e: MessageEvent) { + return JSON.parse(e.data); + } + + static create(urlConfigOrSource: string | WebSocketSubjectConfig): WebSocketSubject { + return new WebSocketSubject(urlConfigOrSource); + } + + constructor(urlConfigOrSource: string | WebSocketSubjectConfig | Observable, destination?: Observer) { + if (urlConfigOrSource instanceof Observable) { + super(urlConfigOrSource, destination); + } else { + super(); + this.WebSocketCtor = root.WebSocket; + + if (typeof urlConfigOrSource === 'string') { + this.url = urlConfigOrSource; + } else { + // WARNING: config object could override important members here. + assign(this, urlConfigOrSource); + } + + if (!this.WebSocketCtor) { + throw new Error('no WebSocket constructor can be found'); + } + + this.destination = new ReplaySubject(); + } + } + + lift(operator) { + const sock = new WebSocketSubject(this, this.destination); + sock.operator = operator; + return sock; + } + + multiplex(subMsg: any, unsubMsg: any, messageFilter: (value: T) => boolean) { + return this.lift(new MultiplexOperator(this, subMsg, unsubMsg, messageFilter)); + } + + _unsubscribe() { + this.source = null; + this.isStopped = false; + this.observers = null; + this.isUnsubscribed = false; + } + + _subscribe(subscriber: Subscriber) { + const subscription = super._subscribe(subscriber); + // HACK: For some reason transpilation wasn't honoring this in arrow functions below + // Doesn't seem right, need to reinvestigate. + const self = this; + + if (self.source || !subscription || (subscription).isUnsubscribed) { + return subscription; + } + + if (self.url && !self.socket) { + const socket = new WebSocket(self.url); + self.socket = socket; + + socket.onopen = (e) => { + const openObserver = self.openObserver; + if (openObserver) { + openObserver.next(e); + } + + const queue = self.destination; + + self.destination = Subscriber.create( + (x) => socket.readyState === 1 && socket.send(x), + (e) => socket.close(e), + ( ) => { + const closingObserver = self.closingObserver; + if (closingObserver) { + closingObserver.next(undefined); + } + socket.close(); + } + ); + + if (queue && queue instanceof ReplaySubject) { + subscription.add((>queue).subscribe(self.destination)); + } + }; + + socket.onerror = (e) => self.error(e); + + socket.onclose = (e: CloseEvent) => { + const closeObserver = self.closeObserver; + if (closeObserver) { + closeObserver.next(e); + } + if (e.wasClean) { + self._finalComplete(); + } else { + self._finalError(e); + } + }; + + socket.onmessage = (e: MessageEvent) => { + const result = tryCatch(self.resultSelector)(e); + if (result === errorObject.e) { + self._finalError(errorObject.e); + } else { + self._finalNext(result); + } + }; + return subscription; + } + + return new Subscription(() => { + subscription.unsubscribe(); + if (this.observers.length === 0) { + const { socket } = this; + if (socket && socket.readyState < 2) { + socket.close(); + } + this.socket = undefined; + this.source = undefined; + this.destination = new ReplaySubject(); + } + }); + } +} + +export class MultiplexOperator implements Operator { + constructor(private socketSubject: WebSocketSubject, + private subscribeMessage: any, + private unsubscribeMessage, + private messageFilter: (data: any) => R) { + // noop + } + + call(subscriber: Subscriber) { + return new MultiplexSubscriber(subscriber, this.socketSubject, this.subscribeMessage, this.unsubscribeMessage, this.messageFilter); + } +} + +export class MultiplexSubscriber extends Subscriber { + constructor(destination: Observer, + private socketSubject: WebSocketSubject, + private subscribeMessage: any, + private unsubscribeMessage: any, + private messageFilter: (data: any) => T) { + super(destination); + + socketSubject.next(subscribeMessage); + } + + next(value: any) { + const pass = tryCatch(this.messageFilter)(value); + if (pass === errorObject) { + this.destination.error(errorObject.e); + } else if (pass) { + this.destination.next(value); + } + } + + unsubscribe() { + this.socketSubject.next(this.unsubscribeMessage); + super.unsubscribe(); + } +} \ No newline at end of file