@@ -6898,144 +6898,147 @@
6898
6898
return new PausableBufferedObservable ( this , subject ) ;
6899
6899
} ;
6900
6900
6901
- var ControlledObservable = ( function ( __super__ ) {
6901
+ var ControlledObservable = ( function ( __super__ ) {
6902
6902
6903
- inherits ( ControlledObservable , __super__ ) ;
6903
+ inherits ( ControlledObservable , __super__ ) ;
6904
6904
6905
- function subscribe ( observer ) {
6906
- return this . source . subscribe ( observer ) ;
6907
- }
6908
-
6909
- function ControlledObservable ( source , enableQueue , scheduler ) {
6910
- __super__ . call ( this , subscribe , source ) ;
6911
- this . subject = new ControlledSubject ( enableQueue , scheduler ) ;
6912
- this . source = source . multicast ( this . subject ) . refCount ( ) ;
6913
- }
6914
-
6915
- ControlledObservable . prototype . request = function ( numberOfItems ) {
6916
- return this . subject . request ( numberOfItems == null ? - 1 : numberOfItems ) ;
6917
- } ;
6918
-
6919
- return ControlledObservable ;
6905
+ function subscribe ( observer ) {
6906
+ return this . source . subscribe ( observer ) ;
6907
+ }
6920
6908
6921
- } ( Observable ) ) ;
6909
+ function ControlledObservable ( source , enableQueue , scheduler ) {
6910
+ __super__ . call ( this , subscribe , source ) ;
6911
+ this . subject = new ControlledSubject ( enableQueue , scheduler ) ;
6912
+ this . source = source . multicast ( this . subject ) . refCount ( ) ;
6913
+ }
6922
6914
6923
- var ControlledSubject = ( function ( __super__ ) {
6915
+ ControlledObservable . prototype . request = function ( numberOfItems ) {
6916
+ return this . subject . request ( numberOfItems == null ? - 1 : numberOfItems ) ;
6917
+ } ;
6924
6918
6925
- function subscribe ( observer ) {
6926
- return this . subject . subscribe ( observer ) ;
6927
- }
6919
+ return ControlledObservable ;
6928
6920
6929
- inherits ( ControlledSubject , __super__ ) ;
6921
+ } ( Observable ) ) ;
6930
6922
6931
- function ControlledSubject ( enableQueue , scheduler ) {
6932
- enableQueue == null && ( enableQueue = true ) ;
6923
+ var ControlledSubject = ( function ( __super__ ) {
6933
6924
6934
- __super__ . call ( this , subscribe ) ;
6935
- this . subject = new Subject ( ) ;
6936
- this . enableQueue = enableQueue ;
6937
- this . queue = enableQueue ? [ ] : null ;
6938
- this . requestedCount = 0 ;
6939
- this . requestedDisposable = disposableEmpty ;
6940
- this . error = null ;
6941
- this . hasFailed = false ;
6942
- this . hasCompleted = false ;
6943
- this . scheduler = scheduler || currentThreadScheduler ;
6944
- }
6925
+ function subscribe ( observer ) {
6926
+ return this . subject . subscribe ( observer ) ;
6927
+ }
6945
6928
6946
- addProperties ( ControlledSubject . prototype , Observer , {
6947
- onCompleted : function ( ) {
6948
- this . hasCompleted = true ;
6949
- if ( ! this . enableQueue || this . queue . length === 0 ) {
6950
- this . subject . onCompleted ( ) ;
6951
- } else {
6952
- this . queue . push ( Notification . createOnCompleted ( ) ) ;
6953
- }
6954
- } ,
6955
- onError : function ( error ) {
6956
- this . hasFailed = true ;
6957
- this . error = error ;
6958
- if ( ! this . enableQueue || this . queue . length === 0 ) {
6959
- this . subject . onError ( error ) ;
6960
- } else {
6961
- this . queue . push ( Notification . createOnError ( error ) ) ;
6962
- }
6963
- } ,
6964
- onNext : function ( value ) {
6965
- var hasRequested = false ;
6929
+ inherits ( ControlledSubject , __super__ ) ;
6930
+
6931
+ function ControlledSubject ( enableQueue , scheduler ) {
6932
+ enableQueue == null && ( enableQueue = true ) ;
6933
+
6934
+ __super__ . call ( this , subscribe ) ;
6935
+ this . subject = new Subject ( ) ;
6936
+ this . enableQueue = enableQueue ;
6937
+ this . queue = enableQueue ? [ ] : null ;
6938
+ this . requestedCount = 0 ;
6939
+ this . requestedDisposable = null ;
6940
+ this . error = null ;
6941
+ this . hasFailed = false ;
6942
+ this . hasCompleted = false ;
6943
+ this . scheduler = scheduler || currentThreadScheduler ;
6944
+ }
6966
6945
6967
- if ( this . requestedCount === 0 ) {
6968
- this . enableQueue && this . queue . push ( Notification . createOnNext ( value ) ) ;
6969
- } else {
6970
- ( this . requestedCount !== - 1 && this . requestedCount -- === 0 ) && this . disposeCurrentRequest ( ) ;
6971
- hasRequested = true ;
6972
- }
6973
- hasRequested && this . subject . onNext ( value ) ;
6974
- } ,
6975
- _processRequest : function ( numberOfItems ) {
6976
- if ( this . enableQueue ) {
6977
- while ( ( this . queue . length >= numberOfItems && numberOfItems > 0 ) ||
6978
- ( this . queue . length > 0 && this . queue [ 0 ] . kind !== 'N' ) ) {
6979
- var first = this . queue . shift ( ) ;
6980
- first . accept ( this . subject ) ;
6981
- if ( first . kind === 'N' ) {
6982
- numberOfItems -- ;
6983
- } else {
6984
- this . disposeCurrentRequest ( ) ;
6985
- this . queue = [ ] ;
6986
- }
6946
+ addProperties ( ControlledSubject . prototype , Observer , {
6947
+ onCompleted : function ( ) {
6948
+ this . hasCompleted = true ;
6949
+ if ( ! this . enableQueue || this . queue . length === 0 ) {
6950
+ this . subject . onCompleted ( ) ;
6951
+ this . disposeCurrentRequest ( )
6952
+ } else {
6953
+ this . queue . push ( Notification . createOnCompleted ( ) ) ;
6954
+ }
6955
+ } ,
6956
+ onError : function ( error ) {
6957
+ this . hasFailed = true ;
6958
+ this . error = error ;
6959
+ if ( ! this . enableQueue || this . queue . length === 0 ) {
6960
+ this . subject . onError ( error ) ;
6961
+ this . disposeCurrentRequest ( )
6962
+ } else {
6963
+ this . queue . push ( Notification . createOnError ( error ) ) ;
6964
+ }
6965
+ } ,
6966
+ onNext : function ( value ) {
6967
+ if ( this . requestedCount <= 0 ) {
6968
+ this . enableQueue && this . queue . push ( Notification . createOnNext ( value ) ) ;
6969
+ } else {
6970
+ ( this . requestedCount -- === 0 ) && this . disposeCurrentRequest ( ) ;
6971
+ this . subject . onNext ( value ) ;
6972
+ }
6973
+ } ,
6974
+ _processRequest : function ( numberOfItems ) {
6975
+ if ( this . enableQueue ) {
6976
+ while ( this . queue . length > 0 && ( numberOfItems > 0 || this . queue [ 0 ] . kind !== 'N' ) ) {
6977
+ var first = this . queue . shift ( ) ;
6978
+ first . accept ( this . subject ) ;
6979
+ if ( first . kind === 'N' ) {
6980
+ numberOfItems -- ;
6981
+ } else {
6982
+ this . disposeCurrentRequest ( ) ;
6983
+ this . queue = [ ] ;
6987
6984
}
6988
-
6989
- return { numberOfItems : numberOfItems , returnValue : this . queue . length !== 0 } ;
6990
6985
}
6986
+ }
6991
6987
6992
- return { numberOfItems : numberOfItems , returnValue : false } ;
6993
- } ,
6994
- request : function ( number ) {
6995
- this . disposeCurrentRequest ( ) ;
6996
- var self = this ;
6988
+ return numberOfItems ;
6989
+ } ,
6990
+ request : function ( number ) {
6991
+ this . disposeCurrentRequest ( ) ;
6992
+ var self = this ;
6997
6993
6998
- this . requestedDisposable = this . scheduler . scheduleWithState ( number ,
6999
- function ( s , i ) {
7000
- var r = self . _processRequest ( i ) , remaining = r . numberOfItems ;
7001
- if ( ! r . returnValue ) {
7002
- self . requestedCount = remaining ;
7003
- self . requestedDisposable = disposableCreate ( function ( ) {
7004
- self . requestedCount = 0 ;
7005
- } ) ;
7006
- }
7007
- } ) ;
6994
+ this . requestedDisposable = this . scheduler . scheduleWithState ( number ,
6995
+ function ( s , i ) {
6996
+ var remaining = self . _processRequest ( i ) ;
6997
+ var stopped = self . hasCompleted || self . hasFailed
6998
+ if ( ! stopped && remaining > 0 ) {
6999
+ self . requestedCount = remaining ;
7008
7000
7009
- return this . requestedDisposable ;
7010
- } ,
7011
- disposeCurrentRequest : function ( ) {
7001
+ return disposableCreate ( function ( ) {
7002
+ self . requestedCount = 0 ;
7003
+ } ) ;
7004
+ // Scheduled item is still in progress. Return a new
7005
+ // disposable to allow the request to be interrupted
7006
+ // via dispose.
7007
+ }
7008
+ } ) ;
7009
+
7010
+ return this . requestedDisposable ;
7011
+ } ,
7012
+ disposeCurrentRequest : function ( ) {
7013
+ if ( this . requestedDisposable ) {
7012
7014
this . requestedDisposable . dispose ( ) ;
7013
- this . requestedDisposable = disposableEmpty ;
7015
+ this . requestedDisposable = null ;
7014
7016
}
7015
- } ) ;
7017
+ }
7018
+ } ) ;
7016
7019
7017
- return ControlledSubject ;
7018
- } ( Observable ) ) ;
7020
+ return ControlledSubject ;
7021
+ } ( Observable ) ) ;
7019
7022
7020
- /**
7021
- * Attaches a controller to the observable sequence with the ability to queue.
7022
- * @example
7023
- * var source = Rx.Observable.interval(100).controlled();
7024
- * source.request(3); // Reads 3 values
7025
- * @param {bool } enableQueue truthy value to determine if values should be queued pending the next request
7026
- * @param {Scheduler } scheduler determines how the requests will be scheduled
7027
- * @returns {Observable } The observable sequence which only propagates values on request.
7028
- */
7029
- observableProto . controlled = function ( enableQueue , scheduler ) {
7023
+ /**
7024
+ * Attaches a controller to the observable sequence with the ability to queue.
7025
+ * @example
7026
+ * var source = Rx.Observable.interval(100).controlled();
7027
+ * source.request(3); // Reads 3 values
7028
+ * @param {bool } enableQueue truthy value to determine if values should be queued pending the next request
7029
+ * @param {Scheduler } scheduler determines how the requests will be scheduled
7030
+ * @returns {Observable } The observable sequence which only propagates values on request.
7031
+ */
7032
+ observableProto . controlled = function ( enableQueue , scheduler ) {
7030
7033
7031
- if ( enableQueue && isScheduler ( enableQueue ) ) {
7032
- scheduler = enableQueue ;
7033
- enableQueue = true ;
7034
- }
7034
+ if ( enableQueue && isScheduler ( enableQueue ) ) {
7035
+ scheduler = enableQueue ;
7036
+ enableQueue = true ;
7037
+ }
7035
7038
7036
- if ( enableQueue == null ) { enableQueue = true ; }
7037
- return new ControlledObservable ( this , enableQueue , scheduler ) ;
7038
- } ;
7039
+ if ( enableQueue == null ) { enableQueue = true ; }
7040
+ return new ControlledObservable ( this , enableQueue , scheduler ) ;
7041
+ } ;
7039
7042
7040
7043
var StopAndWaitObservable = ( function ( __super__ ) {
7041
7044
0 commit comments