Skip to content

Commit f3f983b

Browse files
authored
test: add success e2e tests for stream processing commands MONGOSH-2127 (#2459)
1 parent a2dfa8b commit f3f983b

File tree

3 files changed

+371
-2
lines changed

3 files changed

+371
-2
lines changed

.evergreen.yml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,10 @@ functions:
220220
MONGOSH_RUN_ONLY_IN_PACKAGE: ${mongosh_run_only_in_package}
221221
AWS_AUTH_IAM_ACCESS_KEY_ID: ${devtools_ci_aws_key}
222222
AWS_AUTH_IAM_SECRET_ACCESS_KEY: ${devtools_ci_aws_secret}
223+
STREAMS_E2E_SPI_CONNECTION_STRING: ${streams_e2e_spi_connection_string}
224+
STREAMS_E2E_DB_USER: ${streams_e2e_db_user}
225+
STREAMS_E2E_DB_PASSWORD: ${streams_e2e_db_password}
226+
STREAMS_E2E_CLUSTER_CONNECTION_STRING: ${streams_e2e_cluster_connection_string}
223227
TASK_NAME: ${task_name}
224228
- command: s3.put
225229
params:
@@ -3778,6 +3782,10 @@ functions:
37783782
NODE_JS_VERSION: ${node_js_version}
37793783
AWS_AUTH_IAM_ACCESS_KEY_ID: ${devtools_ci_aws_key}
37803784
AWS_AUTH_IAM_SECRET_ACCESS_KEY: ${devtools_ci_aws_secret}
3785+
STREAMS_E2E_SPI_CONNECTION_STRING: ${streams_e2e_spi_connection_string}
3786+
STREAMS_E2E_DB_USER: ${streams_e2e_db_user}
3787+
STREAMS_E2E_DB_PASSWORD: ${streams_e2e_db_password}
3788+
STREAMS_E2E_CLUSTER_CONNECTION_STRING: ${streams_e2e_cluster_connection_string}
37813789
DISABLE_OPENSSL_SHARED_CONFIG_FOR_BUNDLED_OPENSSL: ${disable_openssl_shared_config_for_bundled_openssl}
37823790
TASK_NAME: ${task_name}
37833791

Lines changed: 358 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,358 @@
1+
import { bson } from '@mongosh/service-provider-core';
2+
import type { Db, Collection, Document } from '@mongosh/service-provider-core';
3+
import { MongoClient } from 'mongodb';
4+
import { expect } from 'chai';
5+
import type { TestShell } from './test-shell';
6+
import { sleep } from './util-helpers';
7+
import { eventually } from '../../../testing/eventually';
8+
9+
const {
10+
STREAMS_E2E_SPI_CONNECTION_STRING = '',
11+
STREAMS_E2E_DB_USER = '',
12+
STREAMS_E2E_DB_PASSWORD = '',
13+
STREAMS_E2E_CLUSTER_CONNECTION_STRING = '',
14+
} = process.env;
15+
16+
describe('e2e Streams', function () {
17+
this.timeout(60_000);
18+
let shell: TestShell;
19+
20+
before(function () {
21+
if (!STREAMS_E2E_SPI_CONNECTION_STRING) {
22+
console.error(
23+
'Stream Instance connection string not found - skipping Streams E2E tests...'
24+
);
25+
return this.skip();
26+
}
27+
28+
if (!STREAMS_E2E_CLUSTER_CONNECTION_STRING) {
29+
console.error(
30+
'Cluster connection string not found - skipping Streams E2E tests...'
31+
);
32+
return this.skip();
33+
}
34+
35+
if (!STREAMS_E2E_DB_USER) {
36+
console.error(
37+
'Atlas database user for Stream Processing not found - skipping Streams E2E tests...'
38+
);
39+
return this.skip();
40+
}
41+
42+
if (!STREAMS_E2E_DB_PASSWORD) {
43+
console.error(
44+
'Password for Atlas database user not found - skipping Streams E2E tests...'
45+
);
46+
return this.skip();
47+
}
48+
});
49+
50+
describe('basic stream processor operations', function () {
51+
let processorName = '';
52+
let db: Db;
53+
let collection: Collection<Document>;
54+
let client: MongoClient;
55+
56+
beforeEach(async function () {
57+
shell = this.startTestShell({
58+
args: [
59+
STREAMS_E2E_SPI_CONNECTION_STRING,
60+
'--tls',
61+
'--authenticationDatabase admin',
62+
'--username',
63+
STREAMS_E2E_DB_USER,
64+
'--password',
65+
STREAMS_E2E_DB_PASSWORD,
66+
],
67+
removeSigintListeners: true,
68+
});
69+
await shell.waitForPromptOrExit({ timeout: 45_000 });
70+
71+
processorName = `spi${new bson.ObjectId().toHexString()}`;
72+
client = await MongoClient.connect(
73+
STREAMS_E2E_CLUSTER_CONNECTION_STRING,
74+
{}
75+
);
76+
db = client.db(processorName);
77+
const collectionName = 'processedData';
78+
collection = db.collection(collectionName);
79+
80+
// this stream processor reads from the sample stream and inserts documents into an Atlas database
81+
const sourceStage = {
82+
$source: {
83+
connectionName: 'sample_stream_solar',
84+
},
85+
};
86+
87+
const mergeStage = {
88+
$merge: {
89+
into: {
90+
connectionName: 'testClusterConnection',
91+
db: processorName,
92+
coll: collectionName,
93+
},
94+
},
95+
};
96+
97+
const aggPipeline = [sourceStage, mergeStage];
98+
99+
const createResult = await shell.executeLine(
100+
`sp.createStreamProcessor("${processorName}", ${JSON.stringify(
101+
aggPipeline
102+
)})`,
103+
{ timeout: 45_000 }
104+
);
105+
expect(createResult).to.include(
106+
`Atlas Stream Processor: ${processorName}`
107+
);
108+
});
109+
110+
afterEach(async function () {
111+
try {
112+
await db.dropDatabase();
113+
await client.close();
114+
115+
const result = await shell.executeLine(`sp.${processorName}.drop()`, {
116+
timeout: 45_000,
117+
});
118+
expect(result).to.include(`{ ok: 1 }`);
119+
} catch (err: any) {
120+
console.error(
121+
`Could not clean up stream processor ${processorName}:`,
122+
err
123+
);
124+
}
125+
});
126+
127+
it('can list stream processors', async function () {
128+
const listResult = await shell.executeLine(`sp.listStreamProcessors()`, {
129+
timeout: 45_000,
130+
});
131+
// make sure the processor created in the beforeEach is present
132+
expect(listResult).to.include(`name: '${processorName}'`);
133+
});
134+
135+
it('can start and stop a stream processor', async function () {
136+
// this should be a unique collection for this test run, so no data to start
137+
const initialDocsCount = await collection.countDocuments();
138+
expect(initialDocsCount).to.eq(0);
139+
140+
const startResult = await shell.executeLine(
141+
`sp.${processorName}.start()`,
142+
{ timeout: 45_000 }
143+
);
144+
expect(startResult).to.include('{ ok: 1 }');
145+
146+
let updatedDocCount = 0;
147+
await eventually(async () => {
148+
updatedDocCount = await collection.countDocuments();
149+
expect(updatedDocCount).to.be.greaterThan(0);
150+
});
151+
152+
const stopResult = await shell.executeLine(`sp.${processorName}.stop()`, {
153+
timeout: 45_000,
154+
});
155+
expect(stopResult).to.include('{ ok: 1 }');
156+
157+
const statsResult = await shell.executeLine(
158+
`sp.${processorName}.stats()`,
159+
{ timeout: 45_000 }
160+
);
161+
expect(statsResult).to.include(`state: 'STOPPED'`);
162+
});
163+
164+
it(`can modify an existing stream processor's pipeline`, async function () {
165+
// this field is not present on any docs emitted by the stream processor
166+
// created in the beforeEach
167+
const newField = 'newField';
168+
169+
const startResult = await shell.executeLine(
170+
`sp.${processorName}.start()`,
171+
{ timeout: 45_000 }
172+
);
173+
expect(startResult).to.include('{ ok: 1 }');
174+
175+
// sleep for a bit to let the processor do stuff
176+
await sleep(500);
177+
178+
const stopResult = await shell.executeLine(`sp.${processorName}.stop()`, {
179+
timeout: 45_000,
180+
});
181+
expect(stopResult).to.include('{ ok: 1 }');
182+
183+
const initialDocsWithNewField = await collection.countDocuments({
184+
[newField]: { $exists: true },
185+
});
186+
expect(initialDocsWithNewField).to.eq(0);
187+
188+
// define a new pipeline that will append our newField to the docs the stream
189+
// processor inserts into the database
190+
const sourceStage = {
191+
$source: {
192+
connectionName: 'sample_stream_solar',
193+
},
194+
};
195+
196+
const addFieldStage = {
197+
$addFields: {
198+
newField,
199+
},
200+
};
201+
202+
const mergeStage = {
203+
$merge: {
204+
into: {
205+
connectionName: 'testClusterConnection',
206+
db: processorName,
207+
coll: collection.collectionName,
208+
},
209+
},
210+
};
211+
212+
const updatedAggPipeline = [sourceStage, addFieldStage, mergeStage];
213+
214+
const modifyResult = await shell.executeLine(
215+
`sp.${processorName}.modify(${JSON.stringify(updatedAggPipeline)})`,
216+
{ timeout: 45_000 }
217+
);
218+
expect(modifyResult).to.include('{ ok: 1 }');
219+
220+
const secondStartResult = await shell.executeLine(
221+
`sp.${processorName}.start()`,
222+
{ timeout: 45_000 }
223+
);
224+
expect(secondStartResult).to.include('{ ok: 1 }');
225+
226+
await eventually(async () => {
227+
const updatedDocsWithNewField = await collection.countDocuments({
228+
[newField]: { $exists: true },
229+
});
230+
expect(updatedDocsWithNewField).to.be.greaterThan(0);
231+
});
232+
});
233+
234+
it('can view stats for a stream processor', async function () {
235+
const statsResult = await shell.executeLine(
236+
`sp.${processorName}.stats()`,
237+
{ timeout: 45_000 }
238+
);
239+
expect(statsResult).to.include(`name: '${processorName}'`);
240+
expect(statsResult).to.include(`state: 'CREATED'`);
241+
expect(statsResult).to.include('stats: {');
242+
expect(statsResult).to.include(`pipeline: [`);
243+
expect(statsResult).to.include(
244+
`{ '$source': { connectionName: 'sample_stream_solar' } },`
245+
);
246+
});
247+
});
248+
249+
describe('sampling from a running stream processor', function () {
250+
beforeEach(async function () {
251+
shell = this.startTestShell({
252+
args: [
253+
STREAMS_E2E_SPI_CONNECTION_STRING,
254+
'--tls',
255+
'--authenticationDatabase admin',
256+
'--username',
257+
STREAMS_E2E_DB_USER,
258+
'--password',
259+
STREAMS_E2E_DB_PASSWORD,
260+
],
261+
removeSigintListeners: true,
262+
});
263+
await shell.waitForPromptOrExit({ timeout: 45_000 });
264+
});
265+
266+
it('should output streamed documents to the shell', async function () {
267+
if (process.platform === 'win32') {
268+
return this.skip(); // No SIGINT on Windows.
269+
}
270+
271+
// this processor is pre-defined on the cloud-dev test project
272+
// it reads from sample solar stream, appends a field with the processor name to each doc, and
273+
// inserts the docs into an Atlas collection
274+
const immortalProcessorName = 'immortalProcessor';
275+
276+
await eventually(
277+
() => {
278+
shell.writeInputLine(`sp.${immortalProcessorName}.sample()`);
279+
// data from the sample solar stream isn't deterministic, so just assert that
280+
// the processorName field appears in the shell output after sampling
281+
shell.assertContainsOutput(
282+
`processorName: '${immortalProcessorName}'`
283+
);
284+
},
285+
{ timeout: 45_000 }
286+
);
287+
288+
shell.kill('SIGINT');
289+
});
290+
});
291+
292+
describe('creating an interactive stream processor with .process()', function () {
293+
let interactiveId = '';
294+
const collectionName = 'processedData';
295+
296+
beforeEach(async function () {
297+
shell = this.startTestShell({
298+
args: [
299+
STREAMS_E2E_SPI_CONNECTION_STRING,
300+
'--tls',
301+
'--authenticationDatabase admin',
302+
'--username',
303+
STREAMS_E2E_DB_USER,
304+
'--password',
305+
STREAMS_E2E_DB_PASSWORD,
306+
],
307+
removeSigintListeners: true,
308+
});
309+
await shell.waitForPromptOrExit({ timeout: 45_000 });
310+
311+
interactiveId = new bson.ObjectId().toHexString();
312+
});
313+
314+
it('should output streamed documents to the shell', async function () {
315+
if (process.platform === 'win32') {
316+
return this.skip(); // No SIGINT on Windows.
317+
}
318+
319+
// the pipeline for our interactive processor reads from sample solar stream, adds a
320+
// unique test id to each document, and inserts it into an Atlas collection
321+
const sourceStage = {
322+
$source: {
323+
connectionName: 'sample_stream_solar',
324+
},
325+
};
326+
327+
const addFieldStage = {
328+
$addFields: {
329+
interactiveId,
330+
},
331+
};
332+
333+
const mergeStage = {
334+
$merge: {
335+
into: {
336+
connectionName: 'testClusterConnection',
337+
db: interactiveId,
338+
coll: collectionName,
339+
},
340+
},
341+
};
342+
343+
const aggPipeline = [sourceStage, addFieldStage, mergeStage];
344+
345+
await eventually(
346+
() => {
347+
shell.writeInputLine(`sp.process(${JSON.stringify(aggPipeline)})`);
348+
// data from the sample solar stream isn't deterministic, so just assert that
349+
// the interactiveId field appears in the shell output after sampling
350+
shell.assertContainsOutput(`interactiveId: '${interactiveId}'`);
351+
},
352+
{ timeout: 45_000 }
353+
);
354+
355+
shell.kill('SIGINT');
356+
});
357+
});
358+
});

packages/e2e-tests/test/test-shell.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -292,10 +292,13 @@ export class TestShell {
292292
this.writeInput(`${chars}\n`);
293293
}
294294

295-
async executeLine(line: string): Promise<string> {
295+
async executeLine(
296+
line: string,
297+
opts: { timeout?: number; promptPattern?: RegExp } = {}
298+
): Promise<string> {
296299
const previousOutputLength = this._output.length;
297300
this.writeInputLine(line);
298-
await this.waitForPrompt(previousOutputLength);
301+
await this.waitForPrompt(previousOutputLength, opts);
299302
return this._output.slice(previousOutputLength);
300303
}
301304

0 commit comments

Comments
 (0)