Skip to content

Commit f11f151

Browse files
committed
Reduce race conditions in SDAM error handling
JAVA-3696
1 parent 0ab5590 commit f11f151

File tree

89 files changed

+9381
-270
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

89 files changed

+9381
-270
lines changed

driver-core/src/main/com/mongodb/internal/connection/ClusterableServer.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,26 @@ interface ClusterableServer extends Server {
2626
*/
2727
void invalidate();
2828

29+
/**
30+
* Invalidate the description of this server due to the passed in reason.
31+
* @param connectionGeneration the connection pool's generation of the connection from which the error arose
32+
*/
33+
void invalidate(int connectionGeneration);
34+
2935
/**
3036
* Invalidate the description of this server due to the passed in reason.
3137
* @param reason the reason for invalidation.
3238
*/
3339
void invalidate(Throwable reason);
3440

41+
/**
42+
* Invalidate the description of this server due to the passed in reason.
43+
* @param reason the reason for invalidation.
44+
* @param connectionGeneration the connection pool's generation of the connection from which the error arose
45+
* @param maxWireVersion the maxWireVersion from the connection from which the error arose
46+
*/
47+
void invalidate(Throwable reason, int connectionGeneration, int maxWireVersion);
48+
3549
/**
3650
* <p>Closes the server. Instances that have been closed will no longer be available for use.</p>
3751
*

driver-core/src/main/com/mongodb/internal/connection/ConnectionPool.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,6 @@ interface ConnectionPool extends Closeable {
3535
void invalidate();
3636

3737
void close();
38+
39+
int getGeneration();
3840
}

driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java

Lines changed: 18 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,8 @@
1616

1717
package com.mongodb.internal.connection;
1818

19-
import com.mongodb.MongoException;
2019
import com.mongodb.MongoInterruptedException;
21-
import com.mongodb.MongoSocketException;
22-
import com.mongodb.MongoSocketReadTimeoutException;
2320
import com.mongodb.MongoTimeoutException;
24-
import com.mongodb.event.ConnectionCreatedEvent;
25-
import com.mongodb.event.ConnectionPoolCreatedEvent;
26-
import com.mongodb.event.ConnectionReadyEvent;
27-
import com.mongodb.internal.async.SingleResultCallback;
2821
import com.mongodb.connection.ConnectionDescription;
2922
import com.mongodb.connection.ConnectionId;
3023
import com.mongodb.connection.ConnectionPoolSettings;
@@ -37,12 +30,16 @@
3730
import com.mongodb.event.ConnectionCheckedInEvent;
3831
import com.mongodb.event.ConnectionCheckedOutEvent;
3932
import com.mongodb.event.ConnectionClosedEvent;
33+
import com.mongodb.event.ConnectionCreatedEvent;
4034
import com.mongodb.event.ConnectionPoolClearedEvent;
4135
import com.mongodb.event.ConnectionPoolClosedEvent;
36+
import com.mongodb.event.ConnectionPoolCreatedEvent;
4237
import com.mongodb.event.ConnectionPoolListener;
38+
import com.mongodb.event.ConnectionReadyEvent;
39+
import com.mongodb.internal.async.SingleResultCallback;
4340
import com.mongodb.internal.connection.ConcurrentPool.Prune;
44-
import com.mongodb.internal.thread.DaemonThreadFactory;
4541
import com.mongodb.internal.session.SessionContext;
42+
import com.mongodb.internal.thread.DaemonThreadFactory;
4643
import org.bson.ByteBuf;
4744
import org.bson.codecs.Decoder;
4845

@@ -263,6 +260,11 @@ public void close() {
263260
}
264261
}
265262

263+
@Override
264+
public int getGeneration() {
265+
return generation.get();
266+
}
267+
266268
/**
267269
* Synchronously prune idle connections and ensure the minimum pool size.
268270
*/
@@ -400,23 +402,6 @@ private com.mongodb.event.ConnectionRemovedEvent.Reason getReasonForRemoved(fina
400402
return removedReason;
401403
}
402404

403-
/**
404-
* If there was a socket exception that wasn't some form of interrupted read, increment the generation count so that any connections
405-
* created prior will be discarded.
406-
*
407-
* @param connection the connection that generated the exception
408-
* @param t the exception
409-
*/
410-
private void incrementGenerationOnSocketException(final InternalConnection connection, final Throwable t) {
411-
if (t instanceof MongoSocketException && !(t instanceof MongoSocketReadTimeoutException)) {
412-
if (LOGGER.isWarnEnabled()) {
413-
LOGGER.warn(format("Got socket exception on connection [%s] to %s. All connections to %s will be closed.",
414-
getId(connection), serverId.getAddress(), serverId.getAddress()));
415-
}
416-
invalidate();
417-
}
418-
}
419-
420405
private ConnectionId getId(final InternalConnection internalConnection) {
421406
return internalConnection.getDescription().getConnectionId();
422407
}
@@ -429,6 +414,11 @@ private class PooledConnection implements InternalConnection {
429414
this.wrapped = notNull("wrapped", wrapped);
430415
}
431416

417+
@Override
418+
public int getGeneration() {
419+
return wrapped.getGeneration();
420+
}
421+
432422
@Override
433423
public void open() {
434424
isTrue("open", !isClosed.get());
@@ -485,23 +475,13 @@ public ByteBuf getBuffer(final int capacity) {
485475
@Override
486476
public void sendMessage(final List<ByteBuf> byteBuffers, final int lastRequestId) {
487477
isTrue("open", !isClosed.get());
488-
try {
489-
wrapped.sendMessage(byteBuffers, lastRequestId);
490-
} catch (MongoException e) {
491-
incrementGenerationOnSocketException(this, e);
492-
throw e;
493-
}
478+
wrapped.sendMessage(byteBuffers, lastRequestId);
494479
}
495480

496481
@Override
497482
public <T> T sendAndReceive(final CommandMessage message, final Decoder<T> decoder, final SessionContext sessionContext) {
498483
isTrue("open", !isClosed.get());
499-
try {
500-
return wrapped.sendAndReceive(message, decoder, sessionContext);
501-
} catch (MongoException e) {
502-
incrementGenerationOnSocketException(this, e);
503-
throw e;
504-
}
484+
return wrapped.sendAndReceive(message, decoder, sessionContext);
505485
}
506486

507487
@Override
@@ -511,9 +491,6 @@ public <T> void sendAndReceiveAsync(final CommandMessage message, final Decoder<
511491
wrapped.sendAndReceiveAsync(message, decoder, sessionContext, new SingleResultCallback<T>() {
512492
@Override
513493
public void onResult(final T result, final Throwable t) {
514-
if (t != null) {
515-
incrementGenerationOnSocketException(PooledConnection.this, t);
516-
}
517494
callback.onResult(result, t);
518495
}
519496
});
@@ -522,12 +499,7 @@ public void onResult(final T result, final Throwable t) {
522499
@Override
523500
public ResponseBuffers receiveMessage(final int responseTo) {
524501
isTrue("open", !isClosed.get());
525-
try {
526-
return wrapped.receiveMessage(responseTo);
527-
} catch (MongoException e) {
528-
incrementGenerationOnSocketException(this, e);
529-
throw e;
530-
}
502+
return wrapped.receiveMessage(responseTo);
531503
}
532504

533505
@Override
@@ -536,9 +508,6 @@ public void sendMessageAsync(final List<ByteBuf> byteBuffers, final int lastRequ
536508
wrapped.sendMessageAsync(byteBuffers, lastRequestId, new SingleResultCallback<Void>() {
537509
@Override
538510
public void onResult(final Void result, final Throwable t) {
539-
if (t != null) {
540-
incrementGenerationOnSocketException(PooledConnection.this, t);
541-
}
542511
callback.onResult(null, t);
543512
}
544513
});
@@ -550,9 +519,6 @@ public void receiveMessageAsync(final int responseTo, final SingleResultCallback
550519
wrapped.receiveMessageAsync(responseTo, new SingleResultCallback<ResponseBuffers>() {
551520
@Override
552521
public void onResult(final ResponseBuffers result, final Throwable t) {
553-
if (t != null) {
554-
incrementGenerationOnSocketException(PooledConnection.this, t);
555-
}
556522
callback.onResult(result, t);
557523
}
558524
});

driver-core/src/main/com/mongodb/internal/connection/DefaultServer.java

Lines changed: 78 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,19 @@
2323
import com.mongodb.MongoSecurityException;
2424
import com.mongodb.MongoSocketException;
2525
import com.mongodb.MongoSocketReadTimeoutException;
26-
import com.mongodb.internal.async.SingleResultCallback;
2726
import com.mongodb.connection.ClusterConnectionMode;
2827
import com.mongodb.connection.ServerDescription;
2928
import com.mongodb.connection.ServerId;
3029
import com.mongodb.connection.ServerType;
30+
import com.mongodb.connection.TopologyVersion;
3131
import com.mongodb.diagnostics.logging.Logger;
3232
import com.mongodb.diagnostics.logging.Loggers;
3333
import com.mongodb.event.CommandListener;
3434
import com.mongodb.event.ServerClosedEvent;
3535
import com.mongodb.event.ServerDescriptionChangedEvent;
3636
import com.mongodb.event.ServerListener;
3737
import com.mongodb.event.ServerOpeningEvent;
38+
import com.mongodb.internal.async.SingleResultCallback;
3839
import com.mongodb.internal.session.SessionContext;
3940

4041
import java.util.List;
@@ -92,7 +93,7 @@ public Connection getConnection() {
9293
connectionPool.invalidate();
9394
throw e;
9495
} catch (MongoSocketException e) {
95-
invalidate();
96+
invalidate(connectionPool.getGeneration());
9697
throw e;
9798
}
9899
}
@@ -106,7 +107,7 @@ public void onResult(final InternalConnection result, final Throwable t) {
106107
if (t instanceof MongoSecurityException) {
107108
connectionPool.invalidate();
108109
} else if (t instanceof MongoSocketException) {
109-
invalidate();
110+
invalidate(connectionPool.getGeneration());
110111
}
111112
if (t != null) {
112113
callback.onResult(null, t);
@@ -126,23 +127,43 @@ public ServerDescription getDescription() {
126127

127128
@Override
128129
public void invalidate() {
130+
invalidate(connectionPool.getGeneration());
131+
}
132+
133+
@Override
134+
public void invalidate(final int connectionGeneration) {
129135
if (!isClosed()) {
136+
if (connectionGeneration < connectionPool.getGeneration()) {
137+
return;
138+
}
130139
serverStateListener.stateChanged(new ChangeEvent<ServerDescription>(description, ServerDescription.builder()
131140
.state(CONNECTING)
132141
.address(serverId.getAddress())
133142
.build()));
134143
connectionPool.invalidate();
144+
// TODO: in streaming protocol, we want to close the current connection and start over
135145
connect();
136146
}
137147
}
138148

139149
@Override
140-
public void invalidate(final Throwable t) {
150+
public void invalidate(final Throwable reason) {
151+
invalidate(reason, connectionPool.getGeneration(), description.getMaxWireVersion());
152+
}
153+
154+
@Override
155+
public void invalidate(final Throwable t, final int connectionGeneration, final int maxWireVersion) {
141156
if (!isClosed()) {
157+
if (connectionGeneration < connectionPool.getGeneration()) {
158+
return;
159+
}
142160
if ((t instanceof MongoSocketException && !(t instanceof MongoSocketReadTimeoutException))) {
143161
invalidate();
144162
} else if (t instanceof MongoNotPrimaryException || t instanceof MongoNodeIsRecoveringException) {
145-
if (description.getMaxWireVersion() < FOUR_DOT_TWO_WIRE_VERSION) {
163+
if (isStale(((MongoCommandException) t))) {
164+
return;
165+
}
166+
if (maxWireVersion < FOUR_DOT_TWO_WIRE_VERSION) {
146167
invalidate();
147168
} else if (SHUTDOWN_CODES.contains(((MongoCommandException) t).getErrorCode())) {
148169
invalidate();
@@ -156,9 +177,33 @@ public void invalidate(final Throwable t) {
156177
}
157178
}
158179

159-
public void invalidate(final Throwable t, final SessionContext sessionContext) {
180+
private boolean isStale(final MongoCommandException t) {
181+
if (!t.getResponse().containsKey("topologyVersion")) {
182+
return false;
183+
}
184+
return isStale(description.getTopologyVersion(), new TopologyVersion(t.getResponse().getDocument("topologyVersion")));
185+
}
186+
187+
private boolean isStale(final TopologyVersion currentTopologyVersion, final TopologyVersion candidateTopologyVersion) {
188+
if (candidateTopologyVersion == null || currentTopologyVersion == null) {
189+
return false;
190+
}
191+
192+
if (!candidateTopologyVersion.getProcessId().equals(currentTopologyVersion.getProcessId())) {
193+
return false;
194+
}
195+
196+
if (candidateTopologyVersion.getCounter() <= currentTopologyVersion.getCounter()) {
197+
return true;
198+
}
199+
200+
return false;
201+
}
202+
203+
public void invalidate(final Throwable t, final int connectionGeneration, final int maxWireVersion,
204+
final SessionContext sessionContext) {
160205
notNull("sessionContext", sessionContext);
161-
invalidate(t);
206+
invalidate(t, connectionGeneration, maxWireVersion);
162207
if (t instanceof MongoSocketException && sessionContext.hasSession()) {
163208
sessionContext.markSessionDirty();
164209
}
@@ -195,7 +240,7 @@ public <T> T execute(final LegacyProtocol<T> protocol, final InternalConnection
195240
protocol.setCommandListener(commandListener);
196241
return protocol.execute(connection);
197242
} catch (MongoException e) {
198-
invalidate(e);
243+
invalidate(e, connection.getGeneration(), connection.getDescription().getMaxWireVersion());
199244
throw e;
200245
}
201246
}
@@ -208,7 +253,7 @@ public <T> void executeAsync(final LegacyProtocol<T> protocol, final InternalCon
208253
@Override
209254
public void onResult(final T result, final Throwable t) {
210255
if (t != null) {
211-
invalidate(t);
256+
invalidate(t, connection.getGeneration(), connection.getDescription().getMaxWireVersion());
212257
}
213258
callback.onResult(result, t);
214259
}
@@ -226,7 +271,7 @@ public <T> T execute(final CommandProtocol<T> protocol, final InternalConnection
226271
invalidate();
227272
return (T) e.getResponse();
228273
} catch (MongoException e) {
229-
invalidate(e, sessionContext);
274+
invalidate(e, connection.getGeneration(), connection.getDescription().getMaxWireVersion(), sessionContext);
230275
throw e;
231276
}
232277
}
@@ -244,7 +289,7 @@ public void onResult(final T result, final Throwable t) {
244289
invalidate();
245290
callback.onResult((T) ((MongoWriteConcernWithResponseException) t).getResponse(), null);
246291
} else {
247-
invalidate(t, sessionContext);
292+
invalidate(t, connection.getGeneration(), connection.getDescription().getMaxWireVersion(), sessionContext);
248293
callback.onResult(null, t);
249294
}
250295
} else {
@@ -259,8 +304,28 @@ private final class DefaultServerStateListener implements ChangeListener<ServerD
259304
@Override
260305
public void stateChanged(final ChangeEvent<ServerDescription> event) {
261306
ServerDescription oldDescription = description;
262-
description = event.getNewValue();
263-
serverListener.serverDescriptionChanged(new ServerDescriptionChangedEvent(serverId, description, oldDescription));
307+
if (shouldReplace(oldDescription, event.getNewValue())) {
308+
description = event.getNewValue();
309+
serverListener.serverDescriptionChanged(new ServerDescriptionChangedEvent(serverId, description, oldDescription));
310+
}
311+
}
312+
313+
private boolean shouldReplace(final ServerDescription oldDescription, final ServerDescription newDescription) {
314+
TopologyVersion oldTopologyVersion = oldDescription.getTopologyVersion();
315+
TopologyVersion newTopologyVersion = newDescription.getTopologyVersion();
316+
if (newTopologyVersion == null || oldTopologyVersion == null) {
317+
return true;
318+
}
319+
320+
if (!newTopologyVersion.getProcessId().equals(oldTopologyVersion.getProcessId())) {
321+
return true;
322+
}
323+
324+
if (newTopologyVersion.getCounter() >= oldTopologyVersion.getCounter()) {
325+
return true;
326+
}
327+
328+
return false;
264329
}
265330
}
266331
}

driver-core/src/main/com/mongodb/internal/connection/DefaultServerConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@
1919
import com.mongodb.MongoNamespace;
2020
import com.mongodb.ReadPreference;
2121
import com.mongodb.WriteConcernResult;
22-
import com.mongodb.internal.async.SingleResultCallback;
2322
import com.mongodb.connection.ClusterConnectionMode;
2423
import com.mongodb.connection.ConnectionDescription;
2524
import com.mongodb.diagnostics.logging.Logger;
2625
import com.mongodb.diagnostics.logging.Loggers;
26+
import com.mongodb.internal.async.SingleResultCallback;
2727
import com.mongodb.internal.bulk.DeleteRequest;
2828
import com.mongodb.internal.bulk.InsertRequest;
2929
import com.mongodb.internal.bulk.UpdateRequest;

driver-core/src/main/com/mongodb/internal/connection/DescriptionHelper.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,8 @@ private static Integer getSetVersion(final BsonDocument isMasterResult) {
120120
}
121121

122122
private static TopologyVersion getTopologyVersion(final BsonDocument isMasterResult) {
123-
return isMasterResult.containsKey("topologyVersion") ? new TopologyVersion(isMasterResult.getDocument("topologyVersion")) : null;
123+
return isMasterResult.containsKey("topologyVersion") && isMasterResult.get("topologyVersion").isDocument()
124+
? new TopologyVersion(isMasterResult.getDocument("topologyVersion")) : null;
124125
}
125126

126127
private static int getMaxMessageSizeBytes(final BsonDocument isMasterResult) {

0 commit comments

Comments
 (0)