Skip to content

Commit faca37c

Browse files
author
Zhen Li
authored
Merge pull request #222 from lutovich/1.2-queued-records-streaming
Fix streaming of queued records in stream observer
2 parents 6bb8032 + 9fc43c4 commit faca37c

File tree

2 files changed

+163
-4
lines changed

2 files changed

+163
-4
lines changed

src/v1/internal/stream-observer.js

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ class StreamObserver {
4242
this._error = null;
4343
this._hasFailed = false;
4444
this._errorTransformer = errorTransformer;
45+
this._observer = null;
46+
this._conn = null;
4547
}
4648

4749
/**
@@ -69,7 +71,7 @@ class StreamObserver {
6971
this._fieldLookup = {};
7072
if( meta.fields && meta.fields.length > 0 ) {
7173
this._fieldKeys = meta.fields;
72-
for (var i = 0; i < meta.fields.length; i++) {
74+
for (let i = 0; i < meta.fields.length; i++) {
7375
this._fieldLookup[meta.fields[i]] = i;
7476
}
7577
}
@@ -94,7 +96,7 @@ class StreamObserver {
9496
* @param {Object} error - An error object
9597
*/
9698
onError(error) {
97-
let transformedError = this._errorTransformer(error, this._conn);
99+
const transformedError = this._errorTransformer(error, this._conn);
98100
if(this._hasFailed) {
99101
return;
100102
}
@@ -123,8 +125,8 @@ class StreamObserver {
123125
return;
124126
}
125127
if( this._queuedRecords.length > 0 ) {
126-
for (var i = 0; i < _queuedRecords.length; i++) {
127-
observer.onNext( _queuedRecords[i] );
128+
for (let i = 0; i < this._queuedRecords.length; i++) {
129+
observer.onNext( this._queuedRecords[i] );
128130
}
129131
}
130132
if( this._tail ) {

test/internal/stream-observer.test.js

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/**
2+
* Copyright (c) 2002-2017 "Neo Technology,","
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
import StreamObserver from '../../src/v1/internal/stream-observer';
21+
import FakeConnection from './fake-connection';
22+
23+
const NO_OP = () => {
24+
};
25+
26+
describe('StreamObserver', () => {
27+
28+
it('remembers resolved connection', () => {
29+
const streamObserver = newStreamObserver();
30+
const connection = new FakeConnection();
31+
32+
streamObserver.resolveConnection(connection);
33+
34+
expect(streamObserver._conn).toBe(connection);
35+
});
36+
37+
it('remembers subscriber', () => {
38+
const streamObserver = newStreamObserver();
39+
const subscriber = newObserver();
40+
41+
streamObserver.subscribe(subscriber);
42+
43+
expect(streamObserver._observer).toBe(subscriber);
44+
});
45+
46+
it('passes received records to the subscriber', () => {
47+
const streamObserver = newStreamObserver();
48+
const receivedRecords = [];
49+
const observer = newObserver(record => {
50+
receivedRecords.push(record);
51+
});
52+
53+
streamObserver.subscribe(observer);
54+
streamObserver.onCompleted({fields: ['A', 'B', 'C']});
55+
56+
streamObserver.onNext([1, 2, 3]);
57+
streamObserver.onNext([11, 22, 33]);
58+
streamObserver.onNext([111, 222, 333]);
59+
60+
expect(receivedRecords.length).toEqual(3);
61+
expect(receivedRecords[0].toObject()).toEqual({'A': 1, 'B': 2, 'C': 3});
62+
expect(receivedRecords[1].toObject()).toEqual({'A': 11, 'B': 22, 'C': 33});
63+
expect(receivedRecords[2].toObject()).toEqual({'A': 111, 'B': 222, 'C': 333});
64+
});
65+
66+
it('queues received record when no subscriber', () => {
67+
const streamObserver = newStreamObserver();
68+
69+
streamObserver.onCompleted({fields: ['A', 'B', 'C']});
70+
71+
streamObserver.onNext([1111, 2222, 3333]);
72+
streamObserver.onNext([111, 222, 333]);
73+
streamObserver.onNext([11, 22, 33]);
74+
streamObserver.onNext([1, 2, 3]);
75+
76+
const queuedRecords = streamObserver._queuedRecords;
77+
78+
expect(queuedRecords.length).toEqual(4);
79+
expect(queuedRecords[0].toObject()).toEqual({'A': 1111, 'B': 2222, 'C': 3333});
80+
expect(queuedRecords[1].toObject()).toEqual({'A': 111, 'B': 222, 'C': 333});
81+
expect(queuedRecords[2].toObject()).toEqual({'A': 11, 'B': 22, 'C': 33});
82+
expect(queuedRecords[3].toObject()).toEqual({'A': 1, 'B': 2, 'C': 3});
83+
});
84+
85+
it('passes received error the subscriber', () => {
86+
const streamObserver = newStreamObserver();
87+
const error = new Error('Invalid Cypher statement');
88+
89+
let receivedError = null;
90+
const observer = newObserver(NO_OP, error => {
91+
receivedError = error;
92+
});
93+
94+
streamObserver.subscribe(observer);
95+
streamObserver.onError(error);
96+
97+
expect(receivedError).toBe(error);
98+
});
99+
100+
it('passes existing error to a new subscriber', () => {
101+
const streamObserver = newStreamObserver();
102+
const error = new Error('Invalid Cypher statement');
103+
104+
streamObserver.onError(error);
105+
106+
streamObserver.subscribe(newObserver(NO_OP, receivedError => {
107+
expect(receivedError).toBe(error);
108+
}));
109+
});
110+
111+
it('passes queued records to a new subscriber', () => {
112+
const streamObserver = newStreamObserver();
113+
114+
streamObserver.onCompleted({fields: ['A', 'B', 'C']});
115+
116+
streamObserver.onNext([1, 2, 3]);
117+
streamObserver.onNext([11, 22, 33]);
118+
streamObserver.onNext([111, 222, 333]);
119+
120+
const receivedRecords = [];
121+
streamObserver.subscribe(newObserver(record => {
122+
receivedRecords.push(record);
123+
}));
124+
125+
expect(receivedRecords.length).toEqual(3);
126+
expect(receivedRecords[0].toObject()).toEqual({'A': 1, 'B': 2, 'C': 3});
127+
expect(receivedRecords[1].toObject()).toEqual({'A': 11, 'B': 22, 'C': 33});
128+
expect(receivedRecords[2].toObject()).toEqual({'A': 111, 'B': 222, 'C': 333});
129+
});
130+
131+
it('passes existing metadata to a new subscriber', () => {
132+
const streamObserver = newStreamObserver();
133+
134+
streamObserver.onCompleted({fields: ['Foo', 'Bar', 'Baz', 'Qux']});
135+
streamObserver.onCompleted({metaDataField1: 'value1', metaDataField2: 'value2'});
136+
137+
let receivedMetaData = null;
138+
streamObserver.subscribe(newObserver(NO_OP, NO_OP, metaData => {
139+
receivedMetaData = metaData;
140+
}));
141+
142+
expect(receivedMetaData).toEqual({metaDataField1: 'value1', metaDataField2: 'value2'});
143+
});
144+
145+
});
146+
147+
function newStreamObserver() {
148+
return new StreamObserver();
149+
}
150+
151+
function newObserver(onNext = NO_OP, onError = NO_OP, onCompleted = NO_OP) {
152+
return {
153+
onNext: onNext,
154+
onError: onError,
155+
onCompleted: onCompleted
156+
};
157+
}

0 commit comments

Comments
 (0)