Skip to content

Commit c7aa8fe

Browse files
authored
Support loadBalanced in handshake request and serviceId in reply for load balanced clusters (#743)
When connection mode is load balanced * Add loadBalanced:true in handshake request * Use serviceId in handshake reply JAVA-4218
1 parent 8ded1c5 commit c7aa8fe

File tree

4 files changed

+73
-10
lines changed

4 files changed

+73
-10
lines changed

driver-core/src/main/com/mongodb/connection/ConnectionDescription.java

+12-6
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.ArrayList;
2626
import java.util.Collections;
2727
import java.util.List;
28+
import java.util.Objects;
2829

2930
import static com.mongodb.assertions.Assertions.notNull;
3031
import static com.mongodb.connection.ServerDescription.getDefaultMaxDocumentSize;
@@ -274,6 +275,9 @@ public boolean equals(final Object o) {
274275

275276
ConnectionDescription that = (ConnectionDescription) o;
276277

278+
if (maxWireVersion != that.maxWireVersion) {
279+
return false;
280+
}
277281
if (maxBatchCount != that.maxBatchCount) {
278282
return false;
279283
}
@@ -283,31 +287,32 @@ public boolean equals(final Object o) {
283287
if (maxMessageSize != that.maxMessageSize) {
284288
return false;
285289
}
286-
if (!connectionId.equals(that.connectionId)) {
290+
if (!Objects.equals(serviceId, that.serviceId)) {
287291
return false;
288292
}
289-
if (serverType != that.serverType) {
293+
if (!connectionId.equals(that.connectionId)) {
290294
return false;
291295
}
292-
if (maxWireVersion != that.maxWireVersion) {
296+
if (serverType != that.serverType) {
293297
return false;
294298
}
295299
if (!compressors.equals(that.compressors)) {
296300
return false;
297301
}
298-
299-
return true;
302+
return Objects.equals(saslSupportedMechanisms, that.saslSupportedMechanisms);
300303
}
301304

302305
@Override
303306
public int hashCode() {
304307
int result = connectionId.hashCode();
305-
result = 31 * result + maxBatchCount;
308+
result = 31 * result + maxWireVersion;
306309
result = 31 * result + serverType.hashCode();
307310
result = 31 * result + maxBatchCount;
308311
result = 31 * result + maxDocumentSize;
309312
result = 31 * result + maxMessageSize;
310313
result = 31 * result + compressors.hashCode();
314+
result = 31 * result + (serviceId != null ? serviceId.hashCode() : 0);
315+
result = 31 * result + (saslSupportedMechanisms != null ? saslSupportedMechanisms.hashCode() : 0);
311316
return result;
312317
}
313318

@@ -321,6 +326,7 @@ public String toString() {
321326
+ ", maxDocumentSize=" + maxDocumentSize
322327
+ ", maxMessageSize=" + maxMessageSize
323328
+ ", compressors=" + compressors
329+
+ ", serviceId=" + serviceId
324330
+ '}';
325331
}
326332
}

driver-core/src/main/com/mongodb/internal/connection/DescriptionHelper.java

+12-4
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import java.util.Map;
4343
import java.util.Set;
4444

45+
import static com.mongodb.assertions.Assertions.assertNotNull;
4546
import static com.mongodb.connection.ConnectionDescription.getDefaultMaxMessageSize;
4647
import static com.mongodb.connection.ConnectionDescription.getDefaultMaxWriteBatchSize;
4748
import static com.mongodb.connection.ServerConnectionState.CONNECTED;
@@ -77,11 +78,13 @@ static ConnectionDescription createConnectionDescription(final ClusterConnection
7778
connectionDescription = connectionDescription.withConnectionId(newConnectionId);
7879
}
7980
if (clusterConnectionMode == ClusterConnectionMode.LOAD_BALANCED) {
80-
if (manufactureServiceId) {
81+
ObjectId serviceId = getServiceId(isMasterResult);
82+
if (serviceId != null) {
83+
connectionDescription = connectionDescription.withServiceId(serviceId);
84+
} else if (manufactureServiceId) {
8185
TopologyVersion topologyVersion = getTopologyVersion(isMasterResult);
82-
if (topologyVersion != null) {
83-
connectionDescription = connectionDescription.withServiceId(topologyVersion.getProcessId());
84-
}
86+
assertNotNull(topologyVersion);
87+
connectionDescription = connectionDescription.withServiceId(topologyVersion.getProcessId());
8588
} else {
8689
throw new MongoClientException("Driver attempted to initialize in load balancing mode, but the server does not support "
8790
+ "this mode");
@@ -144,6 +147,11 @@ private static TopologyVersion getTopologyVersion(final BsonDocument isMasterRes
144147
? new TopologyVersion(isMasterResult.getDocument("topologyVersion")) : null;
145148
}
146149

150+
private static ObjectId getServiceId(final BsonDocument isMasterResult) {
151+
return isMasterResult.containsKey("serviceId") && isMasterResult.get("serviceId").isObjectId()
152+
? isMasterResult.getObjectId("serviceId").getValue() : null;
153+
}
154+
147155
private static int getMaxMessageSizeBytes(final BsonDocument isMasterResult) {
148156
return isMasterResult.getInt32("maxMessageSizeBytes", new BsonInt32(getDefaultMaxMessageSize())).getValue();
149157
}

driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnectionInitializer.java

+3
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,9 @@ private BsonDocument createIsMasterCommand(final Authenticator authenticator, fi
161161
if (clientMetadataDocument != null) {
162162
isMasterCommandDocument.append("client", clientMetadataDocument);
163163
}
164+
if (clusterConnectionMode == ClusterConnectionMode.LOAD_BALANCED) {
165+
isMasterCommandDocument.append("loadBalanced", BsonBoolean.TRUE);
166+
}
164167
if (!requestedCompressors.isEmpty()) {
165168
BsonArray compressors = new BsonArray(this.requestedCompressors.size());
166169
for (MongoCompressor cur : this.requestedCompressors) {

driver-core/src/test/unit/com/mongodb/internal/connection/DescriptionHelperSpecification.groovy

+46
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.mongodb.internal.connection
1818

19+
import com.mongodb.MongoClientException
1920
import com.mongodb.ServerAddress
2021
import com.mongodb.Tag
2122
import com.mongodb.TagSet
@@ -81,6 +82,51 @@ class DescriptionHelperSpecification extends Specification {
8182
.withConnectionId(connectionId.withServerValue(1004))
8283
}
8384

85+
86+
def 'connection description should reflect ismaster result from load balancer'() {
87+
given:
88+
def connectionId = new ConnectionId(new ServerId(new ClusterId(), serverAddress))
89+
ObjectId serviceId = new ObjectId()
90+
91+
expect:
92+
createConnectionDescription(ClusterConnectionMode.LOAD_BALANCED, connectionId,
93+
parse("""{
94+
ismaster : true,
95+
msg : "isdbgrid",
96+
maxBsonObjectSize : 16777216,
97+
maxMessageSizeBytes : 48000000,
98+
maxWriteBatchSize : 1000,
99+
localTime : ISODate("2015-03-04T23:55:18.505Z"),
100+
maxWireVersion : 13,
101+
minWireVersion : 0,
102+
connectionId : 1004,
103+
serviceId: {\$oid : "${serviceId.toHexString()}"},
104+
ok : 1
105+
}""")) ==
106+
new ConnectionDescription(connectionId, 13, ServerType.SHARD_ROUTER, 1000, 16777216, 48000000, [])
107+
.withConnectionId(connectionId.withServerValue(1004))
108+
.withServiceId(serviceId)
109+
110+
when:
111+
createConnectionDescription(ClusterConnectionMode.LOAD_BALANCED, connectionId,
112+
parse('''{
113+
ismaster : true,
114+
msg : "isdbgrid",
115+
maxBsonObjectSize : 16777216,
116+
maxMessageSizeBytes : 48000000,
117+
maxWriteBatchSize : 1000,
118+
localTime : ISODate("2015-03-04T23:55:18.505Z"),
119+
maxWireVersion : 13,
120+
minWireVersion : 0,
121+
connectionId : 1004,
122+
ok : 1
123+
}'''))
124+
125+
then:
126+
def e = thrown(MongoClientException)
127+
e.getMessage() == 'Driver attempted to initialize in load balancing mode, but the server does not support this mode'
128+
}
129+
84130
def 'connection description should reflect ismaster result with compressors'() {
85131
def connectionId = new ConnectionId(new ServerId(new ClusterId(), serverAddress))
86132
expect:

0 commit comments

Comments
 (0)