Skip to content

Commit 6294e36

Browse files
committed
Make sure INIT error is handled correctly with 3.0 server
Neo4j 3.1 closes connection on INIT error. This made it possible to handle such errors in driver as fatal because we do not expect to receive anything else from the server. However Neo4j 3.0 does not close the connection after INIT failure and thus connection might receive more incoming messages. This commit makes sure all incoming messages are ignored after a fatal error.
1 parent 805292f commit 6294e36

File tree

2 files changed

+47
-20
lines changed

2 files changed

+47
-20
lines changed

src/v1/internal/connector.js

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import {alloc} from './buf';
2424
import {Node, Path, PathSegment, Relationship, UnboundRelationship} from '../graph-types';
2525
import {newError} from './../error';
2626
import ChannelConfig from './ch-config';
27+
import StreamObserver from './stream-observer';
2728

2829
let Channel;
2930
if( NodeChannel.available ) {
@@ -57,7 +58,7 @@ UNBOUND_RELATIONSHIP = 0x72,
5758
PATH = 0x50,
5859
//sent before version negotiation
5960
MAGIC_PREAMBLE = 0x6060B017,
60-
DEBUG = true;
61+
DEBUG = false;
6162

6263
let URLREGEX = new RegExp([
6364
"([^/]+//)?", // scheme
@@ -272,7 +273,7 @@ class Connection {
272273
* failing, and the connection getting ejected from the session pool.
273274
*
274275
* @param err an error object, forwarded to all current and future subscribers
275-
* @private
276+
* @protected
276277
*/
277278
_handleFatalError( err ) {
278279
this._isBroken = true;
@@ -288,21 +289,13 @@ class Connection {
288289
}
289290
}
290291

291-
/**
292-
* Mark this connection as failed because processing of INIT message failed and server will close the connection.
293-
* Initialization failure is a fatal error for the connection.
294-
* @param {Neo4jError} error the initialization error.
295-
* @param {StreamObserver} initObserver the initialization observer that noticed the failure.
296-
* @protected
297-
*/
298-
_initializationFailed(error, initObserver) {
299-
if (this._currentObserver === initObserver) {
300-
this._currentObserver = null; // init observer detected the failure and should not be notified again
292+
_handleMessage( msg ) {
293+
if (this._isBroken) {
294+
// ignore all incoming messages when this connection is broken. all previously pending observers failed
295+
// with the fatal error. all future observers will fail with same fatal error.
296+
return;
301297
}
302-
this._handleFatalError(error);
303-
}
304298

305-
_handleMessage( msg ) {
306299
const payload = msg.fields[0];
307300

308301
switch( msg.signature ) {
@@ -315,7 +308,7 @@ class Connection {
315308
try {
316309
this._currentObserver.onCompleted( payload );
317310
} finally {
318-
this._currentObserver = this._pendingObservers.shift();
311+
this._updateCurrentObserver();
319312
}
320313
break;
321314
case FAILURE:
@@ -324,7 +317,7 @@ class Connection {
324317
this._currentFailure = newError(payload.message, payload.code);
325318
this._currentObserver.onError( this._currentFailure );
326319
} finally {
327-
this._currentObserver = this._pendingObservers.shift();
320+
this._updateCurrentObserver();
328321
// Things are now broken. Pending observers will get FAILURE messages routed until
329322
// We are done handling this failure.
330323
if( !this._isHandlingFailure ) {
@@ -354,7 +347,7 @@ class Connection {
354347
else if(this._currentObserver.onError)
355348
this._currentObserver.onError(payload);
356349
} finally {
357-
this._currentObserver = this._pendingObservers.shift();
350+
this._updateCurrentObserver();
358351
}
359352
break;
360353
default:
@@ -452,6 +445,14 @@ class Connection {
452445
}
453446
}
454447

448+
/**
449+
* Pop next pending observer form the list of observers and make it current observer.
450+
* @protected
451+
*/
452+
_updateCurrentObserver() {
453+
this._currentObserver = this._pendingObservers.shift();
454+
}
455+
455456
/**
456457
* Synchronize - flush all queued outgoing messages and route their responses
457458
* to their respective handlers.
@@ -509,14 +510,15 @@ function connect(url, config = {}, connectionErrorCode = null) {
509510
* closed by the server if processing of INIT message fails so this observer will handle initialization failure
510511
* as a fatal error.
511512
*/
512-
class InitObserver {
513+
class InitObserver extends StreamObserver {
513514

514515
/**
515516
* @constructor
516517
* @param {Connection} connection the connection used to send INIT message.
517518
* @param {StreamObserver} originalObserver the observer to wrap and delegate calls to.
518519
*/
519520
constructor(connection, originalObserver) {
521+
super();
520522
this._connection = connection;
521523
this._originalObserver = originalObserver || NO_OP_OBSERVER;
522524
}
@@ -526,10 +528,11 @@ class InitObserver {
526528
}
527529

528530
onError(error) {
531+
this._connection._updateCurrentObserver(); // make sure this same observer will not be called again
529532
try {
530533
this._originalObserver.onError(error);
531534
} finally {
532-
this._connection._initializationFailed(error, this);
535+
this._connection._handleFatalError(error);
533536
}
534537
}
535538

test/internal/connector.test.js

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,30 @@ describe('connector', () => {
141141
});
142142
});
143143

144+
it('should fail all new observers after initialization error', done => {
145+
const connection = connect('bolt://localhost:7474'); // wrong port
146+
147+
connection.initialize('mydriver/0.0.0', basicAuthToken(), {
148+
onError: initialError => {
149+
expect(initialError).toBeDefined();
150+
151+
connection.run('RETURN 1', {}, {
152+
onError: error1 => {
153+
expect(error1).toEqual(initialError);
154+
155+
connection.initialize('mydriver/0.0.0', basicAuthToken(), {
156+
onError: error2 => {
157+
expect(error2).toEqual(initialError);
158+
159+
done();
160+
}
161+
});
162+
}
163+
});
164+
},
165+
});
166+
});
167+
144168
function packedHandshakeMessage() {
145169
const result = alloc(4);
146170
result.putInt32(0, 1);

0 commit comments

Comments
 (0)