diff --git a/perf/micro/immediate-scheduler/operators/elementat.js b/perf/micro/immediate-scheduler/operators/elementat.js new file mode 100644 index 0000000000..3d03817038 --- /dev/null +++ b/perf/micro/immediate-scheduler/operators/elementat.js @@ -0,0 +1,20 @@ +var RxOld = require("rx"); +var RxNew = require("../../../../index"); + +module.exports = function (suite) { + + var oldElementAtWithImmediateScheduler = RxOld.Observable.range(0, 25, RxOld.Scheduler.immediate).elementAt(5); + var newElementAtWithImmediateScheduler = RxNew.Observable.range(0, 25).elementAt(5); + + return suite + .add('old elementAt with immediate scheduler', function () { + oldElementAtWithImmediateScheduler.subscribe(_next, _error, _complete); + }) + .add('new elementAt with immediate scheduler', function () { + newElementAtWithImmediateScheduler.subscribe(_next, _error, _complete); + }); + + function _next(x) { } + function _error(e){ } + function _complete(){ } +}; \ No newline at end of file diff --git a/spec/operators/elementat-spec.js b/spec/operators/elementat-spec.js new file mode 100644 index 0000000000..0062649336 --- /dev/null +++ b/spec/operators/elementat-spec.js @@ -0,0 +1,47 @@ +/* globals describe, it, expect, hot, cold, expectObservable */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; + +describe('Observable.prototype.elementAt', function() { + it("should return first element by zero-based index", function() { + var source = hot('--a--b--c--|'); + var expected = '--(a|)'; + + expectObservable(source.elementAt(0)).toBe(expected); + }); + + it("should return non-first element by zero-based index", function() { + var source = hot('--a--b--c--d--e--f--|'); + var expected = '-----------(d|)'; + + expectObservable(source.elementAt(3)).toBe(expected); + }); + + it("should return last element by zero-based index", function() { + var source = hot('--a--b--c--|'); + var expected = '--------(c|)'; + + expectObservable(source.elementAt(2)).toBe(expected); + }); + + it("should throw if index is smaller than zero", function() { + expect(function() { Observable.range(0,10).elementAt(-1); }) + .toThrow(new Rx.ArgumentOutOfRangeError); + }); + + it("should raise error if index is out of range but does not have default value", function() { + var source = hot('--a--|'); + var expected = '-----#'; + + expectObservable(source.elementAt(3)) + .toBe(expected, null, new Rx.ArgumentOutOfRangeError); + }); + + it("should return default value if index is out of range", function() { + var source = hot('--a--|'); + var expected = '-----(x|)'; + var defaultValue = '42'; + + expectObservable(source.elementAt(3, defaultValue)).toBe(expected, { x: defaultValue }); + }); +}); \ No newline at end of file diff --git a/src/Observable.ts b/src/Observable.ts index 1b3bbc139e..efa0cce0cc 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -184,6 +184,7 @@ export default class Observable { startWith: (x: T) => Observable; debounce: (dueTime: number, scheduler?: Scheduler) => Observable; + elementAt: (index: number, defaultValue?: any) => Observable; last: (predicate?: (value: T, index:number) => boolean, thisArg?: any, defaultValue?: any) => Observable; filter: (predicate: (x: T) => boolean, ix?: number, thisArg?: any) => Observable; @@ -233,4 +234,4 @@ export default class Observable { finally: (ensure: () => void, thisArg?: any) => Observable; timeout: (due: number|Date, errorToSend?: any, scheduler?: Scheduler) => Observable; timeoutWith: (due: number|Date, withObservable: Observable, scheduler?: Scheduler) => Observable; -} \ No newline at end of file +} diff --git a/src/Rx.ts b/src/Rx.ts index 9dcf869dff..1aeaf0aa75 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -10,6 +10,7 @@ import Subscriber from './Subscriber'; import Subscription from './Subscription'; import Notification from './Notification'; import EmptyError from './util/EmptyError'; +import ArgumentOutOfRangeError from './util/ArgumentOutOfRangeError'; import ReplaySubject from './subjects/ReplaySubject'; import BehaviorSubject from './subjects/BehaviorSubject'; @@ -102,6 +103,7 @@ import take from './operators/take'; import skip from './operators/skip'; import skipUntil from './operators/skipUntil'; import takeUntil from './operators/takeUntil'; +import elementAt from './operators/elementAt'; import filter from './operators/filter'; import distinctUntilChanged from './operators/distinctUntilChanged'; import distinctUntilKeyChanged from './operators/distinctUntilKeyChanged'; @@ -110,6 +112,7 @@ observableProto.take = take; observableProto.skip = skip; observableProto.takeUntil = takeUntil; observableProto.skipUntil = skipUntil; +observableProto.elementAt = elementAt; observableProto.filter = filter; observableProto.distinctUntilChanged = distinctUntilChanged; observableProto.distinctUntilKeyChanged = distinctUntilKeyChanged; @@ -237,5 +240,6 @@ export { Notification, VirtualTimeScheduler, TestScheduler, - EmptyError + EmptyError, + ArgumentOutOfRangeError }; diff --git a/src/operators/elementAt.ts b/src/operators/elementAt.ts new file mode 100644 index 0000000000..108cfa92e6 --- /dev/null +++ b/src/operators/elementAt.ts @@ -0,0 +1,47 @@ +import Operator from '../Operator'; +import Observer from '../Observer'; +import Subscriber from '../Subscriber'; +import ArgumentOutOfRangeError from '../util/ArgumentOutOfRangeError'; + +export default function elementAt(index: number, defaultValue?: any) { + return this.lift(new ElementAtOperator(index, defaultValue)); +} + +class ElementAtOperator implements Operator { + + constructor(private index: number, private defaultValue?: any) { + if (index < 0) { + throw new ArgumentOutOfRangeError; + } + } + + call(subscriber: Subscriber): Subscriber { + return new ElementAtSubscriber(subscriber, this.index, this.defaultValue); + } +} + +class ElementAtSubscriber extends Subscriber { + + constructor(destination: Subscriber, private index: number, private defaultValue?: any) { + super(destination); + } + + _next(x) { + if (this.index-- === 0) { + this.destination.next(x); + this.destination.complete(); + } + } + + _complete() { + const destination = this.destination; + if (this.index >= 0) { + if(typeof this.defaultValue !== 'undefined') { + destination.next(this.defaultValue); + } else { + destination.error(new ArgumentOutOfRangeError); + } + } + destination.complete(); + } +} diff --git a/src/util/ArgumentOutOfRangeError.ts b/src/util/ArgumentOutOfRangeError.ts new file mode 100644 index 0000000000..9691d8f585 --- /dev/null +++ b/src/util/ArgumentOutOfRangeError.ts @@ -0,0 +1,4 @@ +export default class ArgumentOutOfRangeError implements Error { + name = 'ArgumentOutOfRangeError'; + message = 'argument out of range'; +} \ No newline at end of file