Skip to content

Commit 7381f90

Browse files
committed
Support $out to a different database
JAVA-3669
1 parent f470da0 commit 7381f90

File tree

8 files changed

+138
-10
lines changed

8 files changed

+138
-10
lines changed

driver-core/src/main/com/mongodb/client/model/Aggregates.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,21 @@ public static Bson out(final String collectionName) {
423423
return new BsonDocument("$out", new BsonString(collectionName));
424424
}
425425

426+
/**
427+
* Creates a $out pipeline stage that supports outputting to a different database.
428+
*
429+
* @param databaseName the database name
430+
* @param collectionName the collection name
431+
* @return the $out pipeline stage
432+
* @mongodb.driver.manual reference/operator/aggregation/out/ $out
433+
* @mongodb.server.release 4.4
434+
* @since 4.1
435+
*/
436+
public static Bson out(final String databaseName, final String collectionName) {
437+
return new BsonDocument("$out", new BsonDocument("db", new BsonString(databaseName))
438+
.append("coll", new BsonString(collectionName)));
439+
}
440+
426441
/**
427442
* Creates a $out pipeline stage that writes out to the specified destination
428443
*

driver-core/src/main/com/mongodb/internal/async/client/AsyncAggregateIterableImpl.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -185,10 +185,18 @@ private MongoNamespace getOutNamespace() {
185185
return null;
186186
}
187187
if (lastPipelineStage.containsKey("$out")) {
188-
if (!lastPipelineStage.get("$out").isString()) {
189-
throw new IllegalStateException("Cannot return a cursor when the value for $out stage is not a string");
188+
if (lastPipelineStage.get("$out").isString()) {
189+
return new MongoNamespace(namespace.getDatabaseName(), lastPipelineStage.getString("$out").getValue());
190+
} else if (lastPipelineStage.get("$out").isDocument()) {
191+
BsonDocument outDocument = lastPipelineStage.getDocument("$out");
192+
if (!outDocument.containsKey("db") || !outDocument.containsKey("coll")) {
193+
throw new IllegalStateException("Cannot return a cursor when the value for $out stage is not a namespace document");
194+
}
195+
return new MongoNamespace(outDocument.getString("db").getValue(), outDocument.getString("coll").getValue());
196+
} else {
197+
throw new IllegalStateException("Cannot return a cursor when the value for $out stage "
198+
+ "is not a string or namespace document");
190199
}
191-
return new MongoNamespace(namespace.getDatabaseName(), lastPipelineStage.getString("$out").getValue());
192200
} else if (lastPipelineStage.containsKey("$merge")) {
193201
BsonDocument mergeDocument = lastPipelineStage.getDocument("$merge");
194202
if (mergeDocument.isDocument("into")) {

driver-core/src/test/functional/com/mongodb/client/model/AggregatesFunctionalSpecification.groovy

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,20 @@ class AggregatesFunctionalSpecification extends OperationFunctionalSpecification
202202
getCollectionHelper(new MongoNamespace(getDatabaseName(), outCollectionName)).find() == [a, b, c]
203203
}
204204

205+
@IgnoreIf({ !serverVersionAtLeast(4, 3) })
206+
def '$out to specified database'() {
207+
given:
208+
def outDatabaseName = getDatabaseName() + '_out'
209+
def outCollectionName = getCollectionName() + '.out'
210+
getCollectionHelper(new MongoNamespace(outDatabaseName, outCollectionName)).create()
211+
212+
when:
213+
aggregate([out(outDatabaseName, outCollectionName)])
214+
215+
then:
216+
getCollectionHelper(new MongoNamespace(outDatabaseName, outCollectionName)).find() == [a, b, c]
217+
}
218+
205219
@IgnoreIf({ !serverVersionAtLeast(4, 2) })
206220
def '$merge'() {
207221
given:

driver-core/src/test/unit/com/mongodb/client/model/AggregatesSpecification.groovy

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,7 @@ class AggregatesSpecification extends Specification {
347347
toBson(out('authors')) == parse('{ $out : "authors" }')
348348
toBson(out(Document.parse('{ s3: "s3://bucket/path/to/file…?format=json&maxFileSize=100MiB"}'))) ==
349349
parse('{ $out : { s3: "s3://bucket/path/to/file…?format=json&maxFileSize=100MiB"} }')
350+
toBson(out('authorsDB', 'books')) == parse('{ $out : { db: "authorsDB", coll: "books" } }')
350351
}
351352

352353
def 'should render merge'() {

driver-core/src/test/unit/com/mongodb/internal/async/client/AsyncAggregateIterableSpecification.groovy

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -354,13 +354,24 @@ class AsyncAggregateIterableSpecification extends Specification {
354354

355355
def 'should build the expected AggregateToCollectionOperation for $out as a document'() {
356356
given:
357+
def cannedResults = [new Document('_id', 1), new Document('_id', 1), new Document('_id', 1)]
357358
def cursor = Stub(AsyncBatchCursor) {
358-
next(_) >> {
359-
it[0].onResult(null, null)
360-
}
359+
def count = 0
360+
def results;
361+
def getResult = {
362+
count++
363+
results = count == 1 ? cannedResults : null
364+
results
365+
}
366+
next(_) >> {
367+
it[0].onResult(getResult(), null)
368+
}
369+
isClosed() >> { count >= 1 }
361370
}
362371
def executor = new TestOperationExecutor([cursor, cursor, cursor, cursor, cursor]);
363372
def pipeline = [new Document('$match', 1), new Document('$out', new Document('s3', true))]
373+
def outWithDBpipeline = [new Document('$match', 1),
374+
new Document('$out', new Document('db', 'testDB').append('coll', 'testCollection'))]
364375

365376
when: 'aggregation includes $out'
366377
def aggregateIterable = new AsyncAggregateIterableImpl(null, namespace, Document, Document, codecRegistry,
@@ -422,6 +433,28 @@ class AsyncAggregateIterableSpecification extends Specification {
422433

423434
then:
424435
thrown(IllegalStateException)
436+
437+
when: 'aggregation includes $out with namespace'
438+
aggregateIterable = new AsyncAggregateIterableImpl(null, namespace, Document, Document, codecRegistry, readPreference,
439+
readConcern, writeConcern, executor, outWithDBpipeline, AggregationLevel.COLLECTION, false)
440+
futureResultCallback = new FutureResultCallback()
441+
aggregateIterable.toCollection(futureResultCallback)
442+
futureResultCallback.get()
443+
444+
operation = executor.getWriteOperation() as AggregateToCollectionOperation
445+
446+
then:
447+
expect operation, isTheSameAs(new AggregateToCollectionOperation(namespace,
448+
[new BsonDocument('$match', new BsonInt32(1)),
449+
BsonDocument.parse('{$out: {db: "testDB", coll: "testCollection"}}')], readConcern, writeConcern))
450+
451+
when: 'Trying to iterate it should succeed'
452+
def target = []
453+
def results = new FutureResultCallback()
454+
aggregateIterable.into(target, results)
455+
456+
then:
457+
results.get() == cannedResults
425458
}
426459

427460
def 'should handle exceptions correctly'() {

driver-scala/src/main/scala/org/mongodb/scala/model/Aggregates.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,16 @@ object Aggregates {
377377
*/
378378
def out(collectionName: String): Bson = JAggregates.out(collectionName)
379379

380+
/**
381+
* Creates a `\$out` pipeline stage that supports outputting to a different database.
382+
*
383+
* @param databaseName the database name
384+
* @param collectionName the collection name
385+
* @return the `\$out` pipeline stage
386+
* @see [[http://docs.mongodb.org/manual/reference/operator/aggregation/out/ \$out]]
387+
*/
388+
def out(databaseName: String, collectionName: String): Bson = JAggregates.out(databaseName, collectionName)
389+
380390
/**
381391
* Creates a `\$merge` pipeline stage that merges into the specified collection using the specified options.
382392
*

driver-sync/src/main/com/mongodb/client/internal/AggregateIterableImpl.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -184,10 +184,18 @@ private MongoNamespace getOutNamespace() {
184184
return null;
185185
}
186186
if (lastPipelineStage.containsKey("$out")) {
187-
if (!lastPipelineStage.get("$out").isString()) {
188-
throw new IllegalStateException("Cannot return a cursor when the value for $out stage is not a string");
187+
if (lastPipelineStage.get("$out").isString()) {
188+
return new MongoNamespace(namespace.getDatabaseName(), lastPipelineStage.getString("$out").getValue());
189+
} else if (lastPipelineStage.get("$out").isDocument()) {
190+
BsonDocument outDocument = lastPipelineStage.getDocument("$out");
191+
if (!outDocument.containsKey("db") || !outDocument.containsKey("coll")) {
192+
throw new IllegalStateException("Cannot return a cursor when the value for $out stage is not a namespace document");
193+
}
194+
return new MongoNamespace(outDocument.getString("db").getValue(), outDocument.getString("coll").getValue());
195+
} else {
196+
throw new IllegalStateException("Cannot return a cursor when the value for $out stage "
197+
+ "is not a string or namespace document");
189198
}
190-
return new MongoNamespace(namespace.getDatabaseName(), lastPipelineStage.getString("$out").getValue());
191199
} else if (lastPipelineStage.containsKey("$merge")) {
192200
BsonDocument mergeDocument = lastPipelineStage.getDocument("$merge");
193201
if (mergeDocument.isDocument("into")) {

driver-sync/src/test/unit/com/mongodb/client/internal/AggregateIterableSpecification.groovy

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -333,8 +333,28 @@ class AggregateIterableSpecification extends Specification {
333333

334334
def 'should build the expected AggregateToCollectionOperation for $out as a document'() {
335335
given:
336-
def executor = new TestOperationExecutor([null, null, null, null, null])
336+
def cannedResults = [new Document('_id', 1), new Document('_id', 2), new Document('_id', 3)]
337+
def cursor = {
338+
Stub(BatchCursor) {
339+
def count = 0
340+
def results;
341+
def getResult = {
342+
count++
343+
results = count == 1 ? cannedResults : null
344+
results
345+
}
346+
next() >> {
347+
getResult()
348+
}
349+
hasNext() >> {
350+
count == 0
351+
}
352+
}
353+
}
354+
def executor = new TestOperationExecutor([cursor(), cursor(), cursor(), cursor(), cursor(), cursor()])
337355
def pipeline = [new Document('$match', 1), new Document('$out', new Document('s3', true))]
356+
def outWithDBpipeline = [new Document('$match', 1),
357+
new Document('$out', new Document('db', 'testDB').append('coll', 'testCollection'))]
338358

339359
when: 'aggregation includes $out'
340360
def aggregateIterable = new AggregateIterableImpl(null, namespace, Document, Document, codecRegistry, readPreference,
@@ -391,6 +411,25 @@ class AggregateIterableSpecification extends Specification {
391411

392412
then:
393413
thrown(IllegalStateException)
414+
415+
when: 'aggregation includes $out with namespace'
416+
aggregateIterable = new AggregateIterableImpl(null, namespace, Document, Document, codecRegistry, readPreference,
417+
readConcern, writeConcern, executor, outWithDBpipeline, AggregationLevel.COLLECTION, false)
418+
aggregateIterable.toCollection()
419+
420+
operation = executor.getWriteOperation() as AggregateToCollectionOperation
421+
422+
then:
423+
expect operation, isTheSameAs(new AggregateToCollectionOperation(namespace,
424+
[new BsonDocument('$match', new BsonInt32(1)),
425+
BsonDocument.parse('{$out: {db: "testDB", coll: "testCollection"}}')], readConcern, writeConcern))
426+
427+
when: 'Trying to iterate it should succeed'
428+
def results = []
429+
aggregateIterable.into(results)
430+
431+
then:
432+
results == cannedResults
394433
}
395434

396435

0 commit comments

Comments
 (0)