Skip to content

Commit

Permalink
JAVA-908: Support write concern for findAndModify command
Browse files Browse the repository at this point in the history
    Throw exception from findAndModify-related operations if the response contains a write concern error
    Support write concern for findAndModify-related helper methods in DBCollection
  • Loading branch information
jyemin committed Oct 29, 2015
1 parent 5370977 commit 942e1bd
Show file tree
Hide file tree
Showing 39 changed files with 738 additions and 167 deletions.
2 changes: 1 addition & 1 deletion config/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@

<module name="MethodLength"/>
<module name="ParameterNumber">
<property name="max" value="10"/>
<property name="max" value="11"/>
</module>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
package com.mongodb.async.client;

import com.mongodb.MongoBulkWriteException;
import com.mongodb.MongoInternalException;
import com.mongodb.MongoNamespace;
import com.mongodb.MongoWriteConcernException;
import com.mongodb.MongoWriteException;
import com.mongodb.ReadConcern;
import com.mongodb.ReadPreference;
import com.mongodb.WriteConcern;
import com.mongodb.WriteConcernResult;
import com.mongodb.WriteError;
import com.mongodb.async.SingleResultCallback;
import com.mongodb.bulk.BulkWriteResult;
Expand Down Expand Up @@ -590,8 +592,11 @@ public void onResult(final BulkWriteResult result, final Throwable t) {
if (t instanceof MongoBulkWriteException) {
MongoBulkWriteException e = (MongoBulkWriteException) t;
if (e.getWriteErrors().isEmpty()) {
callback.onResult(null, new MongoWriteConcernException(e.getWriteConcernError(),
e.getServerAddress()));
callback.onResult(null,
new MongoWriteConcernException(e.getWriteConcernError(),
translateBulkWriteResult(request,
e.getWriteResult()),
e.getServerAddress()));
} else {
callback.onResult(null, new MongoWriteException(new WriteError(e.getWriteErrors().get(0)),
e.getServerAddress()));
Expand All @@ -603,6 +608,23 @@ public void onResult(final BulkWriteResult result, final Throwable t) {
});
}

private WriteConcernResult translateBulkWriteResult(final WriteRequest request, final BulkWriteResult writeResult) {
switch (request.getType()) {
case INSERT:
return WriteConcernResult.acknowledged(writeResult.getInsertedCount(), false, null);
case DELETE:
return WriteConcernResult.acknowledged(writeResult.getDeletedCount(), false, null);
case UPDATE:
case REPLACE:
return WriteConcernResult.acknowledged(writeResult.getMatchedCount() + writeResult.getUpserts().size(),
writeResult.getMatchedCount() > 0,
writeResult.getUpserts().isEmpty()
? null : writeResult.getUpserts().get(0).getId());
default:
throw new MongoInternalException("Unhandled write request type: " + request.getType());
}
}

private UpdateResult toUpdateResult(final com.mongodb.bulk.BulkWriteResult result) {
if (result.wasAcknowledged()) {
Long modifiedCount = result.isModifiedCountAvailable() ? (long) result.getModifiedCount() : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,30 @@ class MongoCollectionSpecification extends Specification {
WriteConcern.UNACKNOWLEDGED | new TestOperationExecutor([unacknowledged()]) | DeleteResult.unacknowledged()
}

def 'deleteOne should translate BulkWriteException correctly'() {
given:
def bulkWriteException = new MongoBulkWriteException(acknowledged(0, 0, 1, null, []), [],
new WriteConcernError(100, '', new BsonDocument()),
new ServerAddress());

def executor = new TestOperationExecutor([bulkWriteException])
def collection = new MongoCollectionImpl(namespace, Document, codecRegistry, readPreference, WriteConcern.ACKNOWLEDGED,
readConcern, executor)
def futureResultCallback = new FutureResultCallback<DeleteResult>()

when:
collection.deleteOne(new Document('_id', 1), futureResultCallback)
futureResultCallback.get()

then:
def ex = thrown(MongoWriteConcernException)
ex.writeConcernError == bulkWriteException.writeConcernError
ex.writeResult.wasAcknowledged()
ex.writeResult.count == 1
!ex.writeResult.updateOfExisting
ex.writeResult.upsertedId == null
}

def 'deleteMany should use MixedBulkWriteOperation correctly'() {
given:
def collection = new MongoCollectionImpl(namespace, Document, codecRegistry, readPreference, writeConcern, readConcern, executor)
Expand Down Expand Up @@ -583,6 +607,36 @@ class MongoCollectionSpecification extends Specification {
WriteConcern.UNACKNOWLEDGED | new TestOperationExecutor([unacknowledged()]) | UpdateResult.unacknowledged()
}

def 'replaceOne should translate BulkWriteException correctly'() {
given:
def bulkWriteException = new MongoBulkWriteException(bulkWriteResult, [],
new WriteConcernError(100, '', new BsonDocument()),
new ServerAddress());

def executor = new TestOperationExecutor([bulkWriteException])
def collection = new MongoCollectionImpl(namespace, Document, codecRegistry, readPreference, WriteConcern.ACKNOWLEDGED,
readConcern, executor)
def futureResultCallback = new FutureResultCallback<UpdateResult>()

when:
collection.replaceOne(new Document('_id', 1), new Document('_id', 1), futureResultCallback)
futureResultCallback.get()

then:
def ex = thrown(MongoWriteConcernException)
ex.writeConcernError == bulkWriteException.writeConcernError
ex.writeResult.wasAcknowledged() == writeResult.wasAcknowledged()
ex.writeResult.count == writeResult.count
ex.writeResult.updateOfExisting == writeResult.updateOfExisting
ex.writeResult.upsertedId == writeResult.upsertedId

where:
bulkWriteResult | writeResult
acknowledged(0, 1, 0, 1, []) | WriteConcernResult.acknowledged(1, true, null)
acknowledged(0, 0, 0, 0, [new BulkWriteUpsert(0, new BsonInt32(1))]) | WriteConcernResult.acknowledged(1, false, new BsonInt32(1))
}


def 'updateOne should use MixedBulkWriteOperationOperation correctly'() {
given:
def collection = new MongoCollectionImpl(namespace, Document, codecRegistry, readPreference, writeConcern, readConcern, executor)
Expand Down
37 changes: 33 additions & 4 deletions driver-core/src/main/com/mongodb/MongoWriteConcernException.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import com.mongodb.bulk.WriteConcernError;

import static com.mongodb.assertions.Assertions.notNull;

/**
* An exception indicating a failure to apply the write concern to the requested write operation
*
Expand All @@ -29,23 +31,50 @@ public class MongoWriteConcernException extends MongoServerException {
private static final long serialVersionUID = 4577579466973523211L;

private final WriteConcernError writeConcernError;
private final WriteConcernResult writeConcernResult;

/**
* Construct an instance.
*
* @param writeConcernError the write concern error
* @param serverAddress the server address
* @param writeConcernError the non-null write concern error
* @param serverAddress the non-null server address
*/
public MongoWriteConcernException(final WriteConcernError writeConcernError, final ServerAddress serverAddress) {
this(writeConcernError, null, serverAddress);
}

/**
* Construct an instance.
*
* @param writeConcernError the non-null write concern error
* @param writeConcernResult the write result
* @param serverAddress the non-null server address
* @since 3.2
*/
public MongoWriteConcernException(final WriteConcernError writeConcernError, final WriteConcernResult writeConcernResult,
final ServerAddress serverAddress) {
super(writeConcernError.getCode(), writeConcernError.getMessage(), serverAddress);
this.writeConcernError = writeConcernError;
this.writeConcernResult = writeConcernResult;
this.writeConcernError = notNull("writeConcernError", writeConcernError);
}

/**
* Gets the write concern error.
*
* @return the write concern error
* @return the write concern error, which may not be null
*/
public WriteConcernError getWriteConcernError() {
return writeConcernError;
}

/**
* Gets the write result.
*
* @return the write result
*
* @since 3.2
*/
public WriteConcernResult getWriteResult() {
return writeConcernResult;
}
}
2 changes: 2 additions & 0 deletions driver-core/src/main/com/mongodb/WriteConcernException.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ public static int extractErrorCode(final BsonDocument response) {
public static String extractErrorMessage(final BsonDocument response) {
if (response.isString("err")) {
return response.getString("err").getValue();
} else if (response.isString("errmsg")) {
return response.getString("errmsg").getValue();
} else {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
package com.mongodb.operation;

import com.mongodb.ExplainVerbosity;
import com.mongodb.Function;
import com.mongodb.MongoNamespace;
import com.mongodb.ReadConcern;
import com.mongodb.ServerAddress;
import com.mongodb.async.AsyncBatchCursor;
import com.mongodb.async.SingleResultCallback;
import com.mongodb.binding.AsyncConnectionSource;
Expand All @@ -30,6 +30,7 @@
import com.mongodb.connection.Connection;
import com.mongodb.connection.ConnectionDescription;
import com.mongodb.connection.QueryResult;
import com.mongodb.operation.CommandOperationHelper.CommandTransformer;
import org.bson.BsonArray;
import org.bson.BsonBoolean;
import org.bson.BsonDocument;
Expand Down Expand Up @@ -321,21 +322,21 @@ private QueryResult<T> createQueryResult(final BsonDocument result, final Connec
}
}

private Function<BsonDocument, BatchCursor<T>> transformer(final ConnectionSource source, final Connection connection) {
return new Function<BsonDocument, BatchCursor<T>>() {
private CommandTransformer<BsonDocument, BatchCursor<T>> transformer(final ConnectionSource source, final Connection connection) {
return new CommandTransformer<BsonDocument, BatchCursor<T>>() {
@Override
public BatchCursor<T> apply(final BsonDocument result) {
public BatchCursor<T> apply(final BsonDocument result, final ServerAddress serverAddress) {
QueryResult<T> queryResult = createQueryResult(result, connection.getDescription());
return new QueryBatchCursor<T>(queryResult, 0, batchSize != null ? batchSize : 0, decoder, source);
}
};
}

private Function<BsonDocument, AsyncBatchCursor<T>> asyncTransformer(final AsyncConnectionSource source,
private CommandTransformer<BsonDocument, AsyncBatchCursor<T>> asyncTransformer(final AsyncConnectionSource source,
final AsyncConnection connection) {
return new Function<BsonDocument, AsyncBatchCursor<T>>() {
return new CommandTransformer<BsonDocument, AsyncBatchCursor<T>>() {
@Override
public AsyncBatchCursor<T> apply(final BsonDocument result) {
public AsyncBatchCursor<T> apply(final BsonDocument result, final ServerAddress serverAddress) {
QueryResult<T> queryResult = createQueryResult(result, connection.getDescription());
return new AsyncQueryBatchCursor<T>(queryResult, 0, batchSize != null ? batchSize : 0, decoder, source, connection);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@
import static com.mongodb.assertions.Assertions.isTrueArgument;
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
import static com.mongodb.operation.CommandOperationHelper.VoidTransformer;
import static com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol;
import static com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocolAsync;
import static com.mongodb.operation.OperationHelper.VoidTransformer;
import static com.mongodb.operation.OperationHelper.releasingCallback;
import static com.mongodb.operation.OperationHelper.serverIsAtLeastVersionThreeDotTwo;
import static com.mongodb.operation.OperationHelper.withConnection;
Expand Down
Loading

0 comments on commit 942e1bd

Please sign in to comment.