// Copyright (c) Microsoft, All rights reserved. See License.txt in the project root for license information. ;(function (factory) { var objectTypes = { 'function': true, 'object': true }; function checkGlobal(value) { return (value && value.Object === Object) ? value : null; } var freeExports = (objectTypes[typeof exports] && exports && !exports.nodeType) ? exports : null; var freeModule = (objectTypes[typeof module] && module && !module.nodeType) ? module : null; var freeGlobal = checkGlobal(freeExports && freeModule && typeof global === 'object' && global); var freeSelf = checkGlobal(objectTypes[typeof self] && self); var freeWindow = checkGlobal(objectTypes[typeof window] && window); var moduleExports = (freeModule && freeModule.exports === freeExports) ? freeExports : null; var thisGlobal = checkGlobal(objectTypes[typeof this] && this); var root = freeGlobal || ((freeWindow !== (thisGlobal && thisGlobal.window)) && freeWindow) || freeSelf || thisGlobal || Function('return this')(); // Because of build optimizers if (typeof define === 'function' && define.amd) { define(['./rx'], function (Rx, exports) { return factory(root, exports, Rx); }); } else if (typeof module === 'object' && module && module.exports === freeExports) { module.exports = factory(root, module.exports, require('./rx')); } else { root.Rx = factory(root, {}, root.Rx); } }.call(this, function (root, exp, Rx, undefined) { // Aliases var Observable = Rx.Observable, observableProto = Observable.prototype, ObservableBase = Rx.ObservableBase, AbstractObserver = Rx.internals.AbstractObserver, FlatMapObservable = Rx.FlatMapObservable, observableConcat = Observable.concat, observableDefer = Observable.defer, observableEmpty = Observable.empty, disposableEmpty = Rx.Disposable.empty, CompositeDisposable = Rx.CompositeDisposable, SerialDisposable = Rx.SerialDisposable, SingleAssignmentDisposable = Rx.SingleAssignmentDisposable, Enumerable = Rx.internals.Enumerable, enumerableOf = Enumerable.of, currentThreadScheduler = Rx.Scheduler.currentThread, AsyncSubject = Rx.AsyncSubject, Observer = Rx.Observer, inherits = Rx.internals.inherits, addProperties = Rx.internals.addProperties, helpers = Rx.helpers, noop = helpers.noop, isPromise = helpers.isPromise, isFunction = helpers.isFunction, isIterable = Rx.helpers.isIterable, isArrayLike = Rx.helpers.isArrayLike, isScheduler = Rx.Scheduler.isScheduler, observableFromPromise = Observable.fromPromise; var errorObj = {e: {}}; function tryCatcherGen(tryCatchTarget) { return function tryCatcher() { try { return tryCatchTarget.apply(this, arguments); } catch (e) { errorObj.e = e; return errorObj; } }; } var tryCatch = Rx.internals.tryCatch = function tryCatch(fn) { if (!isFunction(fn)) { throw new TypeError('fn must be a function'); } return tryCatcherGen(fn); }; function thrower(e) { throw e; } // Shim in iterator support var $iterator$ = (typeof Symbol === 'function' && Symbol.iterator) || '_es6shim_iterator_'; // Bug for mozilla version if (root.Set && typeof new root.Set()['@@iterator'] === 'function') { $iterator$ = '@@iterator'; } var doneEnumerator = Rx.doneEnumerator = { done: true, value: undefined }; var isIterable = Rx.helpers.isIterable = function (o) { return o && o[$iterator$] !== undefined; }; var isArrayLike = Rx.helpers.isArrayLike = function (o) { return o && o.length !== undefined; }; Rx.helpers.iterator = $iterator$; var WhileEnumerable = (function(__super__) { inherits(WhileEnumerable, __super__); function WhileEnumerable(c, s) { this.c = c; this.s = s; } WhileEnumerable.prototype[$iterator$] = function () { var self = this; return { next: function () { return self.c() ? { done: false, value: self.s } : { done: true, value: void 0 }; } }; }; return WhileEnumerable; }(Enumerable)); function enumerableWhile(condition, source) { return new WhileEnumerable(condition, source); } /** * Returns an observable sequence that is the result of invoking the selector on the source sequence, without sharing subscriptions. * This operator allows for a fluent style of writing queries that use the same sequence multiple times. * * @param {Function} selector Selector function which can use the source sequence as many times as needed, without sharing subscriptions to the source sequence. * @returns {Observable} An observable sequence that contains the elements of a sequence produced by multicasting the source sequence within a selector function. */ observableProto.letBind = observableProto['let'] = function (func) { return func(this); }; /** * Determines whether an observable collection contains values. * * @example * 1 - res = Rx.Observable.if(condition, obs1); * 2 - res = Rx.Observable.if(condition, obs1, obs2); * 3 - res = Rx.Observable.if(condition, obs1, scheduler); * @param {Function} condition The condition which determines if the thenSource or elseSource will be run. * @param {Observable} thenSource The observable sequence or Promise that will be run if the condition function returns true. * @param {Observable} [elseSource] The observable sequence or Promise that will be run if the condition function returns false. If this is not provided, it defaults to Rx.Observabe.Empty with the specified scheduler. * @returns {Observable} An observable sequence which is either the thenSource or elseSource. */ Observable['if'] = function (condition, thenSource, elseSourceOrScheduler) { return observableDefer(function () { elseSourceOrScheduler || (elseSourceOrScheduler = observableEmpty()); isPromise(thenSource) && (thenSource = observableFromPromise(thenSource)); isPromise(elseSourceOrScheduler) && (elseSourceOrScheduler = observableFromPromise(elseSourceOrScheduler)); // Assume a scheduler for empty only typeof elseSourceOrScheduler.now === 'function' && (elseSourceOrScheduler = observableEmpty(elseSourceOrScheduler)); return condition() ? thenSource : elseSourceOrScheduler; }); }; /** * Concatenates the observable sequences obtained by running the specified result selector for each element in source. * There is an alias for this method called 'forIn' for browsers <IE9 * @param {Array} sources An array of values to turn into an observable sequence. * @param {Function} resultSelector A function to apply to each item in the sources array to turn it into an observable sequence. * @returns {Observable} An observable sequence from the concatenated observable sequences. */ Observable['for'] = Observable.forIn = function (sources, resultSelector, thisArg) { return enumerableOf(sources, resultSelector, thisArg).concat(); }; /** * Repeats source as long as condition holds emulating a while loop. * There is an alias for this method called 'whileDo' for browsers <IE9 * * @param {Function} condition The condition which determines if the source will be repeated. * @param {Observable} source The observable sequence that will be run if the condition function returns true. * @returns {Observable} An observable sequence which is repeated as long as the condition holds. */ var observableWhileDo = Observable['while'] = Observable.whileDo = function (condition, source) { isPromise(source) && (source = observableFromPromise(source)); return enumerableWhile(condition, source).concat(); }; /** * Repeats source as long as condition holds emulating a do while loop. * * @param {Function} condition The condition which determines if the source will be repeated. * @param {Observable} source The observable sequence that will be run if the condition function returns true. * @returns {Observable} An observable sequence which is repeated as long as the condition holds. */ observableProto.doWhile = function (condition) { return observableConcat([this, observableWhileDo(condition, this)]); }; /** * Uses selector to determine which source in sources to use. * @param {Function} selector The function which extracts the value for to test in a case statement. * @param {Array} sources A object which has keys which correspond to the case statement labels. * @param {Observable} [elseSource] The observable sequence or Promise that will be run if the sources are not matched. If this is not provided, it defaults to Rx.Observabe.empty with the specified scheduler. * * @returns {Observable} An observable sequence which is determined by a case statement. */ Observable['case'] = function (selector, sources, defaultSourceOrScheduler) { return observableDefer(function () { isPromise(defaultSourceOrScheduler) && (defaultSourceOrScheduler = observableFromPromise(defaultSourceOrScheduler)); defaultSourceOrScheduler || (defaultSourceOrScheduler = observableEmpty()); isScheduler(defaultSourceOrScheduler) && (defaultSourceOrScheduler = observableEmpty(defaultSourceOrScheduler)); var result = sources[selector()]; isPromise(result) && (result = observableFromPromise(result)); return result || defaultSourceOrScheduler; }); }; var ExpandObservable = (function(__super__) { inherits(ExpandObservable, __super__); function ExpandObservable(source, fn, scheduler) { this.source = source; this._fn = fn; this._scheduler = scheduler; __super__.call(this); } function scheduleRecursive(args, recurse) { var state = args[0], self = args[1]; var work; if (state.q.length > 0) { work = state.q.shift(); } else { state.isAcquired = false; return; } var m1 = new SingleAssignmentDisposable(); state.d.add(m1); m1.setDisposable(work.subscribe(new ExpandObserver(state, self, m1))); recurse([state, self]); } ExpandObservable.prototype._ensureActive = function (state) { var isOwner = false; if (state.q.length > 0) { isOwner = !state.isAcquired; state.isAcquired = true; } isOwner && state.m.setDisposable(this._scheduler.scheduleRecursive([state, this], scheduleRecursive)); }; ExpandObservable.prototype.subscribeCore = function (o) { var m = new SerialDisposable(), d = new CompositeDisposable(m), state = { q: [], m: m, d: d, activeCount: 0, isAcquired: false, o: o }; state.q.push(this.source); state.activeCount++; this._ensureActive(state); return d; }; return ExpandObservable; }(ObservableBase)); var ExpandObserver = (function(__super__) { inherits(ExpandObserver, __super__); function ExpandObserver(state, parent, m1) { this._s = state; this._p = parent; this._m1 = m1; __super__.call(this); } ExpandObserver.prototype.next = function (x) { this._s.o.onNext(x); var result = tryCatch(this._p._fn)(x); if (result === errorObj) { return this._s.o.onError(result.e); } this._s.q.push(result); this._s.activeCount++; this._p._ensureActive(this._s); }; ExpandObserver.prototype.error = function (e) { this._s.o.onError(e); }; ExpandObserver.prototype.completed = function () { this._s.d.remove(this._m1); this._s.activeCount--; this._s.activeCount === 0 && this._s.o.onCompleted(); }; return ExpandObserver; }(AbstractObserver)); /** * Expands an observable sequence by recursively invoking selector. * * @param {Function} selector Selector function to invoke for each produced element, resulting in another sequence to which the selector will be invoked recursively again. * @param {Scheduler} [scheduler] Scheduler on which to perform the expansion. If not provided, this defaults to the current thread scheduler. * @returns {Observable} An observable sequence containing all the elements produced by the recursive expansion. */ observableProto.expand = function (selector, scheduler) { isScheduler(scheduler) || (scheduler = currentThreadScheduler); return new ExpandObservable(this, selector, scheduler); }; function argumentsToArray() { var len = arguments.length, args = new Array(len); for(var i = 0; i < len; i++) { args[i] = arguments[i]; } return args; } var ForkJoinObservable = (function (__super__) { inherits(ForkJoinObservable, __super__); function ForkJoinObservable(sources, cb) { this._sources = sources; this._cb = cb; __super__.call(this); } ForkJoinObservable.prototype.subscribeCore = function (o) { if (this._sources.length === 0) { o.onCompleted(); return disposableEmpty; } var count = this._sources.length; var state = { finished: false, hasResults: new Array(count), hasCompleted: new Array(count), results: new Array(count) }; var subscriptions = new CompositeDisposable(); for (var i = 0, len = this._sources.length; i < len; i++) { var source = this._sources[i]; isPromise(source) && (source = observableFromPromise(source)); subscriptions.add(source.subscribe(new ForkJoinObserver(o, state, i, this._cb, subscriptions))); } return subscriptions; }; return ForkJoinObservable; }(ObservableBase)); var ForkJoinObserver = (function(__super__) { inherits(ForkJoinObserver, __super__); function ForkJoinObserver(o, s, i, cb, subs) { this._o = o; this._s = s; this._i = i; this._cb = cb; this._subs = subs; __super__.call(this); } ForkJoinObserver.prototype.next = function (x) { if (!this._s.finished) { this._s.hasResults[this._i] = true; this._s.results[this._i] = x; } }; ForkJoinObserver.prototype.error = function (e) { this._s.finished = true; this._o.onError(e); this._subs.dispose(); }; ForkJoinObserver.prototype.completed = function () { if (!this._s.finished) { if (!this._s.hasResults[this._i]) { return this._o.onCompleted(); } this._s.hasCompleted[this._i] = true; for (var i = 0; i < this._s.results.length; i++) { if (!this._s.hasCompleted[i]) { return; } } this._s.finished = true; var res = tryCatch(this._cb).apply(null, this._s.results); if (res === errorObj) { return this._o.onError(res.e); } this._o.onNext(res); this._o.onCompleted(); } }; return ForkJoinObserver; }(AbstractObserver)); /** * Runs all observable sequences in parallel and collect their last elements. * * @example * 1 - res = Rx.Observable.forkJoin([obs1, obs2]); * 1 - res = Rx.Observable.forkJoin(obs1, obs2, ...); * @returns {Observable} An observable sequence with an array collecting the last elements of all the input sequences. */ Observable.forkJoin = function () { var len = arguments.length, args = new Array(len); for(var i = 0; i < len; i++) { args[i] = arguments[i]; } var resultSelector = isFunction(args[len - 1]) ? args.pop() : argumentsToArray; Array.isArray(args[0]) && (args = args[0]); return new ForkJoinObservable(args, resultSelector); }; /** * Runs two observable sequences in parallel and combines their last elemenets. * @param {Observable} second Second observable sequence. * @param {Function} resultSelector Result selector function to invoke with the last elements of both sequences. * @returns {Observable} An observable sequence with the result of calling the selector function with the last elements of both input sequences. */ observableProto.forkJoin = function () { var len = arguments.length, args = new Array(len); for(var i = 0; i < len; i++) { args[i] = arguments[i]; } if (Array.isArray(args[0])) { args[0].unshift(this); } else { args.unshift(this); } return Observable.forkJoin.apply(null, args); }; /** * Comonadic bind operator. * @param {Function} selector A transform function to apply to each element. * @param {Object} scheduler Scheduler used to execute the operation. If not specified, defaults to the ImmediateScheduler. * @returns {Observable} An observable sequence which results from the comonadic bind operation. */ observableProto.manySelect = observableProto.extend = function (selector, scheduler) { isScheduler(scheduler) || (scheduler = Rx.Scheduler.immediate); var source = this; return observableDefer(function () { var chain; return source .map(function (x) { var curr = new ChainObservable(x); chain && chain.onNext(x); chain = curr; return curr; }) .tap( noop, function (e) { chain && chain.onError(e); }, function () { chain && chain.onCompleted(); } ) .observeOn(scheduler) .map(selector); }, source); }; var ChainObservable = (function (__super__) { inherits(ChainObservable, __super__); function ChainObservable(head) { __super__.call(this); this.head = head; this.tail = new AsyncSubject(); } addProperties(ChainObservable.prototype, Observer, { _subscribe: function (o) { var g = new CompositeDisposable(); g.add(currentThreadScheduler.schedule(this, function (_, self) { o.onNext(self.head); g.add(self.tail.mergeAll().subscribe(o)); })); return g; }, onCompleted: function () { this.onNext(Observable.empty()); }, onError: function (e) { this.onNext(Observable['throw'](e)); }, onNext: function (v) { this.tail.onNext(v); this.tail.onCompleted(); } }); return ChainObservable; }(Observable)); var SwitchFirstObservable = (function (__super__) { inherits(SwitchFirstObservable, __super__); function SwitchFirstObservable(source) { this.source = source; __super__.call(this); } SwitchFirstObservable.prototype.subscribeCore = function (o) { var m = new SingleAssignmentDisposable(), g = new CompositeDisposable(), state = { hasCurrent: false, isStopped: false, o: o, g: g }; g.add(m); m.setDisposable(this.source.subscribe(new SwitchFirstObserver(state))); return g; }; return SwitchFirstObservable; }(ObservableBase)); var SwitchFirstObserver = (function(__super__) { inherits(SwitchFirstObserver, __super__); function SwitchFirstObserver(state) { this._s = state; __super__.call(this); } SwitchFirstObserver.prototype.next = function (x) { if (!this._s.hasCurrent) { this._s.hasCurrent = true; isPromise(x) && (x = observableFromPromise(x)); var inner = new SingleAssignmentDisposable(); this._s.g.add(inner); inner.setDisposable(x.subscribe(new InnerObserver(this._s, inner))); } }; SwitchFirstObserver.prototype.error = function (e) { this._s.o.onError(e); }; SwitchFirstObserver.prototype.completed = function () { this._s.isStopped = true; !this._s.hasCurrent && this._s.g.length === 1 && this._s.o.onCompleted(); }; inherits(InnerObserver, __super__); function InnerObserver(state, inner) { this._s = state; this._i = inner; __super__.call(this); } InnerObserver.prototype.next = function (x) { this._s.o.onNext(x); }; InnerObserver.prototype.error = function (e) { this._s.o.onError(e); }; InnerObserver.prototype.completed = function () { this._s.g.remove(this._i); this._s.hasCurrent = false; this._s.isStopped && this._s.g.length === 1 && this._s.o.onCompleted(); }; return SwitchFirstObserver; }(AbstractObserver)); /** * Performs a exclusive waiting for the first to finish before subscribing to another observable. * Observables that come in between subscriptions will be dropped on the floor. * @returns {Observable} A exclusive observable with only the results that happen when subscribed. */ observableProto.switchFirst = function () { return new SwitchFirstObservable(this); }; observableProto.flatMapFirst = observableProto.exhaustMap = function(selector, resultSelector, thisArg) { return new FlatMapObservable(this, selector, resultSelector, thisArg).switchFirst(); }; observableProto.flatMapWithMaxConcurrent = observableProto.flatMapMaxConcurrent = function(limit, selector, resultSelector, thisArg) { return new FlatMapObservable(this, selector, resultSelector, thisArg).merge(limit); }; return Rx; }));