Skip to content

Commit 3e9d529

Browse files
jayphelpsbenlesh
authored andcommitted
fix(timeout): Cancels scheduled timeout, if no longer needed
* fix(timeout): Cancels scheduled timeout, if no longer needed fixes #2134 * fix(timeoutWith): Cancels scheduled timeout, if no longer needed * build(npm-scripts): update debug_mocha npm script for node 6 * fix(VirtualAction): Block rescheduled VirtualActions from executing their scheduled work. VirtualActions are immutable so they can be inspected by the TestScheduler. In order to mirror rescheduled stateful Actions, rescheduled VirtualActions shouldn't execute if they've been rescheduled before execution. * fix(timeout): Update timeout and timeoutWith to recycle their scheduled timeout actions. The timeout and timeoutWith operators should dispose their scheduled timeout actions on unsubscription. Also, given the new scheduling architecture, they can recycle their scheduled actions so just one action is allocated per subscription. * test(timeout): Add types to timeout and timeoutWith specs * Fix merge conflicts * Fix timeoutWith to work with new Subscriber leak fix. * fix(timeout-spec): fix merge conflicts * fix(Subscription): fold ChildSubscription logic into Subscriber to prevent operators from leaking ChildSubscriptions. (#2360) The addition of ChildSubscription to fix #2244 accidentally introduced a different memory leak. Most operators that add and remove inner Subscriptions store the inner Subscriber instance, not the value returned by Subscription#add. When they try to remove the inner Subscription manually, nothing is removed, because the ChildSubscription wrapper instance is the one added to the subscriptions list. Fixes #2355 * chore(publish): 5.1.1 * Ignore coverage It's 5.5mb that people installing this don't need :) * feat(AjaxObservable) : support 'PATCH' request type Add support of the 'PATCH' request type based on the already existing 'PUT' request. * fix(subscribeToResult): accept array-like as result Accept array-like as a result to subscribe, so that Observable.from and operators using subscribeToResult have identical behaviour. * chore(ajax.patch): Adds test for ajax.patch * fix(forkJoin): add type signature for single observable with selector Add type signature for using forkJoin with single observable as first parameter and selector function as second parameter, so that typings would not prevent usage which is permitted and properly handled by operator. Closes #2347 * feat(webSocket): Add binaryType to config object Add binaryType to config object, so that it is possible to set that parameter on underlying socket before any data emits happen. Closes #2353 * fix(merge): return Observable when called with single lowerCaseO Return Observable when merge is called with single lower case observable, so that merge would always return Observable instance. * fix(bindNodeCallback): emit undefined when callback has no success arguments Emit undefined insteady of empty array by resulting Observable, when callback function is called without success parameters. Closes #2254 * chore(danger): update dangerfile to validate commit message * chore(*): correctly scope disabled `max-line-length` tslint rule The max line length is set to 150 in 'tslint.json'. In specific regions, it is desirable to allow longer lines, so these regions should be wrapped in comments like the following: ```js // Max line length enforced here. /* tslint:disable:max-line-length */ // Max line length NOT enforced here. /* tslint:enable:max-line-length */ <-- CORRECT // Max line length enforced here. ``` In many cases, the re-enabling comment incorrectly included `disable` instead of `enable` (as shown below), which essentially keeps the `max-line-length` **disabled** for the rest of the file: ```js // Max line length enforced here. /* tslint:disable:max-line-length */ // Max line length NOT enforced here. /* tslint:disable:max-line-length */ <-- INCORRECT // Max line length NOT enforced here. ``` This commit fixes these comments, so the `max-line-length` rule is properly enforced in regions that don't need longer lines. * fix(bindCallback): emit undefined when callback is without arguments In resulting Observable emit undefined when callback is called without parameters, instead of emitting empty array. * fix(mergeAll): introduce variant support <T, R> for mergeMap - closes #2372 * feat(windowTime): maxWindowSize parameter in windowTime operator Adds new parameter in windowTime operator to control how much values given window can emit. Closes #1301 * docs(ObservableInput): add ObservableInput and SubscribableOrPromise descriptions Add ObservableInput and SubscribableOrPromise interface descriptions, as well as link these interfaces in type descriptions of operators, so that users always know what kind of parameters they can pass to used methods. * fix(timeoutWith): update timeoutWith to work with new Subscriber leak fix changes
1 parent 3ced24f commit 3e9d529

File tree

7 files changed

+128
-82
lines changed

7 files changed

+128
-82
lines changed

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@
9191
"prepublish": "shx rm -rf ./typings && typings install && npm run build_all",
9292
"publish_docs": "./publish_docs.sh",
9393
"test_mocha": "mocha --opts spec/support/default.opts spec-js",
94-
"debug_mocha": "node-debug _mocha --opts spec/support/debug.opts spec-js",
94+
"debug_mocha": "node --inspect --debug-brk ./node_modules/.bin/_mocha --opts spec/support/debug.opts spec-js",
9595
"test_browser": "npm-run-all build_spec_browser && opn spec/support/mocha-browser-runner.html",
9696
"test": "npm-run-all clean_spec build_spec test_mocha clean_spec",
9797
"tests2png": "npm run build_spec && mkdirp tmp/docs/img && mkdirp spec-js/support && shx cp spec/support/*.opts spec-js/support/ && mocha --opts spec/support/tests2png.opts spec-js",

spec/operators/timeout-spec.ts

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
1-
import * as Rx from '../../dist/cjs/Rx';
21
import { expect } from 'chai';
2+
import * as Rx from '../../dist/cjs/Rx';
33
import marbleTestingSignature = require('../helpers/marble-testing'); // tslint:disable-line:no-require-imports
44

55
declare const { asDiagram };
6+
declare const rxTestScheduler: Rx.TestScheduler;
67
declare const hot: typeof marbleTestingSignature.hot;
78
declare const cold: typeof marbleTestingSignature.cold;
89
declare const expectObservable: typeof marbleTestingSignature.expectObservable;
910
declare const expectSubscriptions: typeof marbleTestingSignature.expectSubscriptions;
1011

11-
declare const rxTestScheduler: Rx.TestScheduler;
1212
const Observable = Rx.Observable;
1313

1414
/** @test {timeout} */
@@ -121,4 +121,28 @@ describe('Observable.prototype.timeout', () => {
121121
expectObservable(result).toBe(expected, values, defaultTimeoutError);
122122
expectSubscriptions(e1.subscriptions).toBe(e1subs);
123123
});
124+
125+
it('should unsubscribe from the scheduled timeout action when timeout is unsubscribed early', () => {
126+
const e1 = hot('--a--b--c---d--e--|');
127+
const e1subs = '^ ! ';
128+
const expected = '--a--b--c-- ';
129+
const unsub = ' ! ';
130+
131+
const result = e1
132+
.lift({
133+
call: (timeoutSubscriber, source) => {
134+
const { action } = <any> timeoutSubscriber; // get a ref to the action here
135+
timeoutSubscriber.add(() => { // because it'll be null by the
136+
if (!action.closed) { // time we get into this function.
137+
throw new Error('TimeoutSubscriber scheduled action wasn\'t canceled');
138+
}
139+
});
140+
return source.subscribe(timeoutSubscriber);
141+
}
142+
})
143+
.timeout(50, rxTestScheduler);
144+
145+
expectObservable(result, unsub).toBe(expected);
146+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
147+
});
124148
});

spec/operators/timeoutWith-spec.ts

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@ import * as Rx from '../../dist/cjs/Rx';
22
import marbleTestingSignature = require('../helpers/marble-testing'); // tslint:disable-line:no-require-imports
33

44
declare const { asDiagram };
5+
declare const rxTestScheduler: Rx.TestScheduler;
56
declare const hot: typeof marbleTestingSignature.hot;
67
declare const cold: typeof marbleTestingSignature.cold;
78
declare const expectObservable: typeof marbleTestingSignature.expectObservable;
89
declare const expectSubscriptions: typeof marbleTestingSignature.expectSubscriptions;
910

10-
declare const rxTestScheduler: Rx.TestScheduler;
1111
const Observable = Rx.Observable;
1212

1313
/** @test {timeoutWith} */
@@ -266,4 +266,31 @@ describe('Observable.prototype.timeoutWith', () => {
266266
expectSubscriptions(e1.subscriptions).toBe(e1subs);
267267
expectSubscriptions(e2.subscriptions).toBe(e2subs);
268268
});
269+
270+
it('should unsubscribe from the scheduled timeout action when timeout is unsubscribed early', () => {
271+
const e1 = hot('---a---b-----c----|');
272+
const e1subs = '^ ! ';
273+
const e2 = cold( '-x---y| ');
274+
const e2subs = ' ^ ! ';
275+
const expected = '---a---b----x-- ';
276+
const unsub = ' ! ';
277+
278+
const result = e1
279+
.lift({
280+
call: (timeoutSubscriber, source) => {
281+
const { action } = <any> timeoutSubscriber; // get a ref to the action here
282+
timeoutSubscriber.add(() => { // because it'll be null by the
283+
if (!action.closed) { // time we get into this function.
284+
throw new Error('TimeoutSubscriber scheduled action wasn\'t canceled');
285+
}
286+
});
287+
return source.subscribe(timeoutSubscriber);
288+
}
289+
})
290+
.timeoutWith(40, e2, rxTestScheduler);
291+
292+
expectObservable(result, unsub).toBe(expected);
293+
expectSubscriptions(e1.subscriptions).toBe(e1subs);
294+
expectSubscriptions(e2.subscriptions).toBe(e2subs);
295+
});
269296
});

spec/schedulers/VirtualTimeScheduler-spec.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,4 +80,15 @@ describe('VirtualTimeScheduler', () => {
8080
v.flush();
8181
expect(count).to.equal(3);
8282
});
83-
});
83+
84+
it('should not execute virtual actions that have been rescheduled before flush', () => {
85+
const v = new VirtualTimeScheduler();
86+
let messages = [];
87+
let action: VirtualAction<string> = <VirtualAction<string>> v.schedule(function(state: string) {
88+
messages.push(state);
89+
}, 10, 'first message');
90+
action = <VirtualAction<string>> action.schedule('second message' , 10);
91+
v.flush();
92+
expect(messages).to.deep.equal(['second message']);
93+
});
94+
});

src/operator/timeout.ts

Lines changed: 23 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { Action } from '../scheduler/Action';
12
import { async } from '../scheduler/async';
23
import { isDate } from '../util/isDate';
34
import { Operator } from '../Operator';
@@ -42,15 +43,8 @@ class TimeoutOperator<T> implements Operator<T, T> {
4243
* @extends {Ignored}
4344
*/
4445
class TimeoutSubscriber<T> extends Subscriber<T> {
45-
private index: number = 0;
46-
private _previousIndex: number = 0;
47-
get previousIndex(): number {
48-
return this._previousIndex;
49-
}
50-
private _hasCompleted: boolean = false;
51-
get hasCompleted(): boolean {
52-
return this._hasCompleted;
53-
}
46+
47+
private action: Action<TimeoutSubscriber<T>> = null;
5448

5549
constructor(destination: Subscriber<T>,
5650
private absoluteTimeout: boolean,
@@ -61,40 +55,36 @@ class TimeoutSubscriber<T> extends Subscriber<T> {
6155
this.scheduleTimeout();
6256
}
6357

64-
private static dispatchTimeout(state: any): void {
65-
const source = state.subscriber;
66-
const currentIndex = state.index;
67-
if (!source.hasCompleted && source.previousIndex === currentIndex) {
68-
source.notifyTimeout();
69-
}
58+
private static dispatchTimeout<T>(subscriber: TimeoutSubscriber<T>): void {
59+
subscriber.error(subscriber.errorInstance);
7060
}
7161

7262
private scheduleTimeout(): void {
73-
let currentIndex = this.index;
74-
this.scheduler.schedule(TimeoutSubscriber.dispatchTimeout, this.waitFor, { subscriber: this, index: currentIndex });
75-
this.index++;
76-
this._previousIndex = currentIndex;
63+
const { action } = this;
64+
if (action) {
65+
// Recycle the action if we've already scheduled one. All the production
66+
// Scheduler Actions mutate their state/delay time and return themeselves.
67+
// VirtualActions are immutable, so they create and return a clone. In this
68+
// case, we need to set the action reference to the most recent VirtualAction,
69+
// to ensure that's the one we clone from next time.
70+
this.action = (<Action<TimeoutSubscriber<T>>> action.schedule(this, this.waitFor));
71+
} else {
72+
this.add(this.action = (<Action<TimeoutSubscriber<T>>> this.scheduler.schedule(
73+
TimeoutSubscriber.dispatchTimeout, this.waitFor, this
74+
)));
75+
}
7776
}
7877

7978
protected _next(value: T): void {
80-
this.destination.next(value);
81-
8279
if (!this.absoluteTimeout) {
8380
this.scheduleTimeout();
8481
}
82+
super._next(value);
8583
}
8684

87-
protected _error(err: any): void {
88-
this.destination.error(err);
89-
this._hasCompleted = true;
90-
}
91-
92-
protected _complete(): void {
93-
this.destination.complete();
94-
this._hasCompleted = true;
95-
}
96-
97-
notifyTimeout(): void {
98-
this.error(this.errorInstance);
85+
protected _unsubscribe() {
86+
this.action = null;
87+
this.scheduler = null;
88+
this.errorInstance = null;
9989
}
10090
}

src/operator/timeoutWith.ts

Lines changed: 29 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1+
import { Action } from '../scheduler/Action';
12
import { Operator } from '../Operator';
23
import { Subscriber } from '../Subscriber';
34
import { IScheduler } from '../Scheduler';
45
import { async } from '../scheduler/async';
5-
import { Subscription, TeardownLogic } from '../Subscription';
6+
import { TeardownLogic } from '../Subscription';
67
import { Observable, ObservableInput } from '../Observable';
78
import { isDate } from '../util/isDate';
89
import { OuterSubscriber } from '../OuterSubscriber';
@@ -49,65 +50,50 @@ class TimeoutWithOperator<T> implements Operator<T, T> {
4950
* @extends {Ignored}
5051
*/
5152
class TimeoutWithSubscriber<T, R> extends OuterSubscriber<T, R> {
52-
private timeoutSubscription: Subscription = undefined;
53-
private index: number = 0;
54-
private _previousIndex: number = 0;
55-
get previousIndex(): number {
56-
return this._previousIndex;
57-
}
58-
private _hasCompleted: boolean = false;
59-
get hasCompleted(): boolean {
60-
return this._hasCompleted;
61-
}
6253

63-
constructor(public destination: Subscriber<T>,
54+
private action: Action<TimeoutWithSubscriber<T, R>> = null;
55+
56+
constructor(destination: Subscriber<T>,
6457
private absoluteTimeout: boolean,
6558
private waitFor: number,
6659
private withObservable: ObservableInput<any>,
6760
private scheduler: IScheduler) {
68-
super();
69-
destination.add(this);
61+
super(destination);
7062
this.scheduleTimeout();
7163
}
7264

73-
private static dispatchTimeout(state: any): void {
74-
const source = state.subscriber;
75-
const currentIndex = state.index;
76-
if (!source.hasCompleted && source.previousIndex === currentIndex) {
77-
source.handleTimeout();
78-
}
65+
private static dispatchTimeout<T, R>(subscriber: TimeoutWithSubscriber<T, R>): void {
66+
const { withObservable } = subscriber;
67+
(<any> subscriber)._unsubscribeAndRecycle();
68+
subscriber.add(subscribeToResult(subscriber, withObservable));
7969
}
8070

8171
private scheduleTimeout(): void {
82-
let currentIndex = this.index;
83-
const timeoutState = { subscriber: this, index: currentIndex };
84-
this.scheduler.schedule(TimeoutWithSubscriber.dispatchTimeout, this.waitFor, timeoutState);
85-
this.index++;
86-
this._previousIndex = currentIndex;
72+
const { action } = this;
73+
if (action) {
74+
// Recycle the action if we've already scheduled one. All the production
75+
// Scheduler Actions mutate their state/delay time and return themeselves.
76+
// VirtualActions are immutable, so they create and return a clone. In this
77+
// case, we need to set the action reference to the most recent VirtualAction,
78+
// to ensure that's the one we clone from next time.
79+
this.action = (<Action<TimeoutWithSubscriber<T, R>>> action.schedule(this, this.waitFor));
80+
} else {
81+
this.add(this.action = (<Action<TimeoutWithSubscriber<T, R>>> this.scheduler.schedule(
82+
TimeoutWithSubscriber.dispatchTimeout, this.waitFor, this
83+
)));
84+
}
8785
}
8886

89-
protected _next(value: T) {
90-
this.destination.next(value);
87+
protected _next(value: T): void {
9188
if (!this.absoluteTimeout) {
9289
this.scheduleTimeout();
9390
}
91+
super._next(value);
9492
}
9593

96-
protected _error(err: any) {
97-
this.destination.error(err);
98-
this._hasCompleted = true;
99-
}
100-
101-
protected _complete() {
102-
this.destination.complete();
103-
this._hasCompleted = true;
104-
}
105-
106-
handleTimeout(): void {
107-
if (!this.closed) {
108-
const withObservable = this.withObservable;
109-
this.unsubscribe();
110-
this.destination.add(this.timeoutSubscription = subscribeToResult(this, withObservable));
111-
}
94+
protected _unsubscribe() {
95+
this.action = null;
96+
this.scheduler = null;
97+
this.withObservable = null;
11298
}
11399
}

src/scheduler/VirtualTimeScheduler.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ export class VirtualTimeScheduler extends AsyncScheduler {
4646
*/
4747
export class VirtualAction<T> extends AsyncAction<T> {
4848

49+
protected active: boolean = true;
50+
4951
constructor(protected scheduler: VirtualTimeScheduler,
5052
protected work: (this: VirtualAction<T>, state?: T) => void,
5153
protected index: number = scheduler.index += 1) {
@@ -57,7 +59,7 @@ export class VirtualAction<T> extends AsyncAction<T> {
5759
if (!this.id) {
5860
return super.schedule(state, delay);
5961
}
60-
62+
this.active = false;
6163
// If an action is rescheduled, we save allocations by mutating its state,
6264
// pushing it to the end of the scheduler queue, and recycling the action.
6365
// But since the VirtualTimeScheduler is used for testing, VirtualActions
@@ -79,6 +81,12 @@ export class VirtualAction<T> extends AsyncAction<T> {
7981
return undefined;
8082
}
8183

84+
protected _execute(state: T, delay: number): any {
85+
if (this.active === true) {
86+
return super._execute(state, delay);
87+
}
88+
}
89+
8290
public static sortActions<T>(a: VirtualAction<T>, b: VirtualAction<T>) {
8391
if (a.delay === b.delay) {
8492
if (a.index === b.index) {

0 commit comments

Comments
 (0)