Skip to content

Commit a03b173

Browse files
committed
Set connection status to ERROR when closed due to protocol error
Amends DuplexConnection#close to accept an optional error indicating that the connection is being closed due to that error. Updates implementations to handle this error and report it to consumers. Updates RSocketMachine to pass protocol-level connection errors to close Adds/updates tests to check for handling this parameter Signed-off-by: Stephen Cohen <[email protected]>
1 parent a85a4db commit a03b173

File tree

13 files changed

+206
-63
lines changed

13 files changed

+206
-63
lines changed

packages/rsocket-core/src/RSocketMachine.js

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -598,8 +598,7 @@ class RSocketMachineImpl<D, M> implements RSocketMachine<D, M> {
598598
};
599599

600600
_handleConnectionError(error: Error): void {
601-
this._handleError(error);
602-
this._connection.close();
601+
this._connection.close(error);
603602
const errorHandler = this._errorHandler;
604603
if (errorHandler) {
605604
errorHandler(error);

packages/rsocket-core/src/RSocketResumableTransport.js

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -156,8 +156,8 @@ export default class RSocketResumableTransport implements DuplexConnection {
156156
this._statusSubscribers = new Set();
157157
}
158158

159-
close(): void {
160-
this._close();
159+
close(error?: Error): void {
160+
this._close(error);
161161
}
162162

163163
connect(): void {
@@ -275,13 +275,18 @@ export default class RSocketResumableTransport implements DuplexConnection {
275275
if (this._isTerminated()) {
276276
return;
277277
}
278-
if (error) {
279-
this._setConnectionStatus({error, kind: 'ERROR'});
280-
} else {
281-
this._setConnectionStatus(CONNECTION_STATUS.CLOSED);
282-
}
278+
279+
const status = error ? {error, kind: 'ERROR'} : CONNECTION_STATUS.CLOSED;
280+
this._setConnectionStatus(status);
281+
283282
const receivers = this._receivers;
284-
receivers.forEach(r => r.onComplete());
283+
receivers.forEach(subscriber => {
284+
if (error) {
285+
subscriber.onError(error);
286+
} else {
287+
subscriber.onComplete();
288+
}
289+
});
285290
receivers.clear();
286291

287292
const senders = this._senders;

packages/rsocket-core/src/ReassemblyDuplexConnection.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ export class ReassemblyDuplexConnection implements DuplexConnection {
3737
.lift(actual => new ReassemblySubscriber(actual));
3838
}
3939

40-
close(): void {
41-
this._source.close();
40+
close(error?: Error): void {
41+
this._source.close(error);
4242
}
4343

4444
connect(): void {

packages/rsocket-core/src/__mocks__/MockDuplexConnection.js

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,12 @@ export function genMockConnection() {
2828
let closed = false;
2929

3030
const connection = {
31-
close: jest.fn(() => {
32-
connection.mock.close();
31+
close: jest.fn(error => {
32+
if (error) {
33+
connection.mock.closeWithError(error);
34+
} else {
35+
connection.mock.close();
36+
}
3337
}),
3438
connect: jest.fn(),
3539
connectionStatus: jest.fn(() => status),

packages/rsocket-core/src/__tests__/RSocketClient-test.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ describe('RSocketClient', () => {
310310
expect(errors.values().next().value).toEqual(
311311
`No keep-alive acks for ${keepAliveTimeout} millis`,
312312
);
313-
expect(status.kind).toEqual('CLOSED');
313+
expect(status.kind).toEqual('ERROR');
314314

315315
jest.advanceTimersByTime(keepAliveTimeout);
316316
});

packages/rsocket-core/src/__tests__/RSocketResumableTransport-test.js

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -687,4 +687,59 @@ describe('RSocketResumableTransport', () => {
687687
expect(currentTransport.sendOne.mock.calls.length).toBe(0);
688688
});
689689
});
690+
691+
describe('post-connect() APIs', () => {
692+
beforeEach(() => {
693+
resumableTransport.connect();
694+
currentTransport.mock.connect();
695+
});
696+
697+
describe('close()', () => {
698+
describe('given an error', () => {
699+
it('closes the transport', () => {
700+
resumableTransport.close(new Error());
701+
expect(currentTransport.close.mock.calls.length).toBe(1);
702+
});
703+
704+
it('sets the status to ERROR with the given error', () => {
705+
const error = new Error();
706+
resumableTransport.close(error);
707+
expect(resumableStatus.kind).toBe('ERROR');
708+
expect(resumableStatus.error).toBe(error);
709+
});
710+
711+
it('calls receive.onError with the given error', () => {
712+
const onError = jest.fn();
713+
const onSubscribe = subscription =>
714+
subscription.request(Number.MAX_SAFE_INTEGER);
715+
resumableTransport.receive().subscribe({onError, onSubscribe});
716+
const error = new Error();
717+
resumableTransport.close(error);
718+
expect(onError.mock.calls.length).toBe(1);
719+
expect(onError.mock.calls[0][0]).toBe(error);
720+
});
721+
});
722+
723+
describe('not given an error', () => {
724+
it('closes the transport', () => {
725+
resumableTransport.close();
726+
expect(currentTransport.close.mock.calls.length).toBe(1);
727+
});
728+
729+
it('sets the status to CLOSED', () => {
730+
resumableTransport.close();
731+
expect(resumableStatus.kind).toBe('CLOSED');
732+
});
733+
734+
it('calls receive.onComplete', () => {
735+
const onComplete = jest.fn();
736+
const onSubscribe = subscription =>
737+
subscription.request(Number.MAX_SAFE_INTEGER);
738+
resumableTransport.receive().subscribe({onComplete, onSubscribe});
739+
resumableTransport.close();
740+
expect(onComplete.mock.calls.length).toBe(1);
741+
});
742+
});
743+
});
744+
});
690745
});

packages/rsocket-tcp-client/src/RSocketTcpClient.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,8 @@ export class RSocketTcpConnection implements DuplexConnection {
6060
}
6161
}
6262

63-
close(): void {
64-
this._close();
63+
close(error?: Error): void {
64+
this._close(error);
6565
}
6666

6767
connect(): void {

packages/rsocket-tcp-client/src/__tests__/RSocketTcpClient-test.js

Lines changed: 52 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -98,29 +98,62 @@ describe('RSocketTcpClient', () => {
9898
});
9999

100100
describe('close()', () => {
101-
it('closes the socket', () => {
102-
client.close();
103-
expect(socket.end.mock.calls.length).toBe(1);
104-
});
101+
describe('given an error', () => {
102+
it('closes the socket', () => {
103+
client.close(new Error());
104+
expect(socket.end.mock.calls.length).toBe(1);
105+
});
105106

106-
it('sets the status to CLOSED', () => {
107-
let status;
108-
client.connectionStatus().subscribe({
109-
onNext: _status => (status = _status),
110-
onSubscribe: subscription =>
111-
subscription.request(Number.MAX_SAFE_INTEGER),
107+
it('sets the status to ERROR with the given error', () => {
108+
let status;
109+
client.connectionStatus().subscribe({
110+
onNext: _status => (status = _status),
111+
onSubscribe: subscription =>
112+
subscription.request(Number.MAX_SAFE_INTEGER),
113+
});
114+
const error = new Error();
115+
client.close(error);
116+
expect(status.kind).toBe('ERROR');
117+
expect(status.error).toBe(error);
118+
});
119+
120+
it('calls receive.onError with the given error', () => {
121+
const onError = jest.fn();
122+
const onSubscribe = subscription =>
123+
subscription.request(Number.MAX_SAFE_INTEGER);
124+
client.receive().subscribe({onError, onSubscribe});
125+
const error = new Error();
126+
client.close(error);
127+
expect(onError.mock.calls.length).toBe(1);
128+
expect(onError.mock.calls[0][0]).toBe(error);
112129
});
113-
client.close();
114-
expect(status.kind).toBe('CLOSED');
115130
});
116131

117-
it('calls receive.onComplete', () => {
118-
const onComplete = jest.fn();
119-
const onSubscribe = subscription =>
120-
subscription.request(Number.MAX_SAFE_INTEGER);
121-
client.receive().subscribe({onComplete, onSubscribe});
122-
client.close();
123-
expect(onComplete.mock.calls.length).toBe(1);
132+
describe('not given an error', () => {
133+
it('closes the socket', () => {
134+
client.close();
135+
expect(socket.end.mock.calls.length).toBe(1);
136+
});
137+
138+
it('sets the status to CLOSED', () => {
139+
let status;
140+
client.connectionStatus().subscribe({
141+
onNext: _status => (status = _status),
142+
onSubscribe: subscription =>
143+
subscription.request(Number.MAX_SAFE_INTEGER),
144+
});
145+
client.close();
146+
expect(status.kind).toBe('CLOSED');
147+
});
148+
149+
it('calls receive.onComplete', () => {
150+
const onComplete = jest.fn();
151+
const onSubscribe = subscription =>
152+
subscription.request(Number.MAX_SAFE_INTEGER);
153+
client.receive().subscribe({onComplete, onSubscribe});
154+
client.close();
155+
expect(onComplete.mock.calls.length).toBe(1);
156+
});
124157
});
125158
});
126159

packages/rsocket-types/src/ReactiveSocketTypes.js

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,10 +118,11 @@ export interface DuplexConnection {
118118
receive(): Flowable<Frame>,
119119

120120
/**
121-
* Close the underlying connection, emitting `onComplete` on the receive()
122-
* Publisher.
121+
* Close the underlying connection, optionally providing an error as reason.
122+
* If an error is passed, emits `onError` on the receive() Publisher.
123+
* If no error is passed, emits `onComplete` on the receive() Publisher.
123124
*/
124-
close(): void,
125+
close(error?: Error): void,
125126

126127
/**
127128
* Open the underlying connection. Throws if the connection is already in

packages/rsocket-websocket-client/src/RSocketWebSocketClient.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,8 @@ export default class RSocketWebSocketClient implements DuplexConnection {
6262
this._statusSubscribers = new Set();
6363
}
6464

65-
close(): void {
66-
this._close();
65+
close(error?: Error): void {
66+
this._close(error);
6767
}
6868

6969
connect(): void {

0 commit comments

Comments
 (0)