Skip to content

Commit

Permalink
Allow configuring the monitoring protocol to use; use the polling pro…
Browse files Browse the repository at this point in the history
…tocol in a FaaS environment by default (mongodb#1313)

JAVA-4936

---------

Co-authored-by: Viacheslav Babanin <[email protected]>
Co-authored-by: Jeff Yemin <[email protected]>
  • Loading branch information
3 people authored Feb 29, 2024
1 parent d85982d commit 7295322
Show file tree
Hide file tree
Showing 32 changed files with 1,611 additions and 80 deletions.
34 changes: 32 additions & 2 deletions driver-core/src/main/com/mongodb/ConnectionString.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@

import com.mongodb.connection.ClusterSettings;
import com.mongodb.connection.ConnectionPoolSettings;
import com.mongodb.connection.ServerMonitoringMode;
import com.mongodb.connection.ServerSettings;
import com.mongodb.connection.SocketSettings;
import com.mongodb.event.ConnectionCheckOutStartedEvent;
import com.mongodb.event.ConnectionCheckedInEvent;
import com.mongodb.event.ConnectionCheckedOutEvent;
import com.mongodb.event.ConnectionCreatedEvent;
import com.mongodb.event.ConnectionReadyEvent;
import com.mongodb.internal.connection.ServerMonitoringModeUtil;
import com.mongodb.internal.diagnostics.logging.Logger;
import com.mongodb.internal.diagnostics.logging.Loggers;
import com.mongodb.internal.dns.DefaultDnsResolver;
Expand Down Expand Up @@ -111,6 +114,13 @@
* <ul>
* <li>{@code heartbeatFrequencyMS=ms}: The frequency that the driver will attempt to determine the current state of each server in the
* cluster.</li>
* <li>{@code serverMonitoringMode=enum}: The server monitoring mode, which defines the monitoring protocol to use. Enumerated values:
* <ul>
* <li>{@code stream};</li>
* <li>{@code poll};</li>
* <li>{@code auto} - the default.</li>
* </ul>
* </li>
* </ul>
* <p>Replica set configuration:</p>
* <ul>
Expand Down Expand Up @@ -307,6 +317,7 @@ public class ConnectionString {
private Integer serverSelectionTimeout;
private Integer localThreshold;
private Integer heartbeatFrequency;
private ServerMonitoringMode serverMonitoringMode;
private String applicationName;
private List<MongoCompressor> compressorList;
private UuidRepresentation uuidRepresentation;
Expand Down Expand Up @@ -529,6 +540,7 @@ public ConnectionString(final String connectionString, @Nullable final DnsClient
GENERAL_OPTIONS_KEYS.add("serverselectiontimeoutms");
GENERAL_OPTIONS_KEYS.add("localthresholdms");
GENERAL_OPTIONS_KEYS.add("heartbeatfrequencyms");
GENERAL_OPTIONS_KEYS.add("servermonitoringmode");
GENERAL_OPTIONS_KEYS.add("retrywrites");
GENERAL_OPTIONS_KEYS.add("retryreads");

Expand Down Expand Up @@ -665,6 +677,9 @@ private void translateOptions(final Map<String, List<String>> optionsMap) {
case "heartbeatfrequencyms":
heartbeatFrequency = parseInteger(value, "heartbeatfrequencyms");
break;
case "servermonitoringmode":
serverMonitoringMode = ServerMonitoringModeUtil.fromString(value);
break;
case "appname":
applicationName = value;
break;
Expand Down Expand Up @@ -1623,6 +1638,20 @@ public Integer getHeartbeatFrequency() {
return heartbeatFrequency;
}

/**
* The server monitoring mode, which defines the monitoring protocol to use.
* <p>
* Default is {@link ServerMonitoringMode#AUTO}.</p>
*
* @return The {@link ServerMonitoringMode}, or {@code null} if unset and the default is to be used.
* @see ServerSettings#getServerMonitoringMode()
* @since 5.1
*/
@Nullable
public ServerMonitoringMode getServerMonitoringMode() {
return serverMonitoringMode;
}

/**
* Gets the logical name of the application. The application name may be used by the client to identify the application to the server,
* for use in server logs, slow query logs, and profile collection.
Expand Down Expand Up @@ -1704,6 +1733,7 @@ public boolean equals(final Object o) {
&& Objects.equals(serverSelectionTimeout, that.serverSelectionTimeout)
&& Objects.equals(localThreshold, that.localThreshold)
&& Objects.equals(heartbeatFrequency, that.heartbeatFrequency)
&& Objects.equals(serverMonitoringMode, that.serverMonitoringMode)
&& Objects.equals(applicationName, that.applicationName)
&& Objects.equals(compressorList, that.compressorList)
&& Objects.equals(uuidRepresentation, that.uuidRepresentation)
Expand All @@ -1717,7 +1747,7 @@ public int hashCode() {
writeConcern, retryWrites, retryReads, readConcern, minConnectionPoolSize, maxConnectionPoolSize, maxWaitTime,
maxConnectionIdleTime, maxConnectionLifeTime, maxConnecting, connectTimeout, socketTimeout, sslEnabled,
sslInvalidHostnameAllowed, requiredReplicaSetName, serverSelectionTimeout, localThreshold, heartbeatFrequency,
applicationName, compressorList, uuidRepresentation, srvServiceName, srvMaxHosts, proxyHost, proxyPort,
proxyUsername, proxyPassword);
serverMonitoringMode, applicationName, compressorList, uuidRepresentation, srvServiceName, srvMaxHosts, proxyHost,
proxyPort, proxyUsername, proxyPassword);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.mongodb.connection;

import com.mongodb.event.ClusterListener;
import com.mongodb.event.ServerHeartbeatFailedEvent;
import com.mongodb.event.ServerHeartbeatStartedEvent;
import com.mongodb.event.ServerHeartbeatSucceededEvent;
import com.mongodb.event.ServerListener;

/**
* The server monitoring mode, which defines the monitoring protocol to use.
*
* @see <a href="https://www.mongodb.com/docs/drivers/java/sync/current/fundamentals/monitoring/#server-discovery-and-monitoring-events">
* server discovery and monitoring (SDAM)</a>
* @since 5.1
*/
public enum ServerMonitoringMode {
/**
* Use the streaming protocol when the server supports it or fall back to the polling protocol otherwise.
* When the streaming protocol comes into play,
* {@link ServerHeartbeatStartedEvent#isAwaited()}, {@link ServerHeartbeatSucceededEvent#isAwaited()},
* {@link ServerHeartbeatFailedEvent#isAwaited()} return {@code true} for new events.
* <p>
* The streaming protocol uses long polling for server monitoring, and is intended to reduce the delay between a server change
* that warrants a new event for {@link ServerListener}/{@link ClusterListener},
* and that event being emitted, as well as the related housekeeping work being done.</p>
*/
STREAM(),
/**
* Use the polling protocol.
*/
POLL(),
/**
* Behave the same as {@link #POLL} if running in a FaaS environment, otherwise behave as {@link #STREAM}.
* This is the default.
*/
AUTO()
}
72 changes: 49 additions & 23 deletions driver-core/src/main/com/mongodb/connection/ServerSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

import static com.mongodb.assertions.Assertions.notNull;
Expand All @@ -38,6 +39,7 @@
public class ServerSettings {
private final long heartbeatFrequencyMS;
private final long minHeartbeatFrequencyMS;
private final ServerMonitoringMode serverMonitoringMode;
private final List<ServerListener> serverListeners;
private final List<ServerMonitorListener> serverMonitorListeners;

Expand Down Expand Up @@ -68,6 +70,7 @@ public static Builder builder(final ServerSettings serverSettings) {
public static final class Builder {
private long heartbeatFrequencyMS = 10000;
private long minHeartbeatFrequencyMS = 500;
private ServerMonitoringMode serverMonitoringMode = ServerMonitoringMode.AUTO;
private List<ServerListener> serverListeners = new ArrayList<>();
private List<ServerMonitorListener> serverMonitorListeners = new ArrayList<>();

Expand All @@ -87,6 +90,7 @@ public Builder applySettings(final ServerSettings serverSettings) {
notNull("serverSettings", serverSettings);
heartbeatFrequencyMS = serverSettings.heartbeatFrequencyMS;
minHeartbeatFrequencyMS = serverSettings.minHeartbeatFrequencyMS;
serverMonitoringMode = serverSettings.serverMonitoringMode;
serverListeners = new ArrayList<>(serverSettings.serverListeners);
serverMonitorListeners = new ArrayList<>(serverSettings.serverMonitorListeners);
return this;
Expand Down Expand Up @@ -117,6 +121,20 @@ public Builder minHeartbeatFrequency(final long minHeartbeatFrequency, final Tim
return this;
}

/**
* Sets the server monitoring mode, which defines the monitoring protocol to use.
* The default value is {@link ServerMonitoringMode#AUTO}.
*
* @param serverMonitoringMode The {@link ServerMonitoringMode}.
* @return {@code this}.
* @see #getServerMonitoringMode()
* @since 5.1
*/
public Builder serverMonitoringMode(final ServerMonitoringMode serverMonitoringMode) {
this.serverMonitoringMode = notNull("serverMonitoringMode", serverMonitoringMode);
return this;
}

/**
* Add a server listener.
*
Expand Down Expand Up @@ -181,6 +199,10 @@ public Builder applyConnectionString(final ConnectionString connectionString) {
if (heartbeatFrequency != null) {
heartbeatFrequencyMS = heartbeatFrequency;
}
ServerMonitoringMode serverMonitoringMode = connectionString.getServerMonitoringMode();
if (serverMonitoringMode != null) {
this.serverMonitoringMode = serverMonitoringMode;
}
return this;
}

Expand Down Expand Up @@ -215,6 +237,19 @@ public long getMinHeartbeatFrequency(final TimeUnit timeUnit) {
return timeUnit.convert(minHeartbeatFrequencyMS, TimeUnit.MILLISECONDS);
}

/**
* Gets the server monitoring mode, which defines the monitoring protocol to use.
* The default value is {@link ServerMonitoringMode#AUTO}.
*
* @return The {@link ServerMonitoringMode}.
* @see Builder#serverMonitoringMode(ServerMonitoringMode)
* @see ConnectionString#getServerMonitoringMode()
* @since 5.1
*/
public ServerMonitoringMode getServerMonitoringMode() {
return serverMonitoringMode;
}

/**
* Gets the server listeners. The default value is an empty list.
*
Expand Down Expand Up @@ -243,40 +278,30 @@ public boolean equals(final Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}

ServerSettings that = (ServerSettings) o;

if (heartbeatFrequencyMS != that.heartbeatFrequencyMS) {
return false;
}
if (minHeartbeatFrequencyMS != that.minHeartbeatFrequencyMS) {
return false;
}

if (!serverListeners.equals(that.serverListeners)) {
return false;
}
if (!serverMonitorListeners.equals(that.serverMonitorListeners)) {
return false;
}

return true;
final ServerSettings that = (ServerSettings) o;
return heartbeatFrequencyMS == that.heartbeatFrequencyMS
&& minHeartbeatFrequencyMS == that.minHeartbeatFrequencyMS
&& serverMonitoringMode == that.serverMonitoringMode
&& Objects.equals(serverListeners, that.serverListeners)
&& Objects.equals(serverMonitorListeners, that.serverMonitorListeners);
}

@Override
public int hashCode() {
int result = (int) (heartbeatFrequencyMS ^ (heartbeatFrequencyMS >>> 32));
result = 31 * result + (int) (minHeartbeatFrequencyMS ^ (minHeartbeatFrequencyMS >>> 32));
result = 31 * result + serverListeners.hashCode();
result = 31 * result + serverMonitorListeners.hashCode();
return result;
return Objects.hash(
heartbeatFrequencyMS,
minHeartbeatFrequencyMS,
serverMonitoringMode,
serverListeners,
serverMonitorListeners);
}

@Override
public String toString() {
return "ServerSettings{"
+ "heartbeatFrequencyMS=" + heartbeatFrequencyMS
+ ", minHeartbeatFrequencyMS=" + minHeartbeatFrequencyMS
+ ", serverMonitoringMode=" + serverMonitoringMode
+ ", serverListeners='" + serverListeners + '\''
+ ", serverMonitorListeners='" + serverMonitorListeners + '\''
+ '}';
Expand All @@ -285,6 +310,7 @@ public String toString() {
ServerSettings(final Builder builder) {
heartbeatFrequencyMS = builder.heartbeatFrequencyMS;
minHeartbeatFrequencyMS = builder.minHeartbeatFrequencyMS;
serverMonitoringMode = builder.serverMonitoringMode;
serverListeners = unmodifiableList(builder.serverListeners);
serverMonitorListeners = unmodifiableList(builder.serverMonitorListeners);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.mongodb.event;

import com.mongodb.connection.ConnectionId;
import com.mongodb.connection.ServerMonitoringMode;

import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -77,6 +78,7 @@ public long getElapsedTime(final TimeUnit timeUnit) {
* to the server and the time that the server waited before sending a response.
*
* @return whether the response was awaited
* @see ServerMonitoringMode#STREAM
* @since 4.1
* @mongodb.server.release 4.4
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.mongodb.event;

import com.mongodb.connection.ConnectionId;
import com.mongodb.connection.ServerMonitoringMode;

import static com.mongodb.assertions.Assertions.notNull;

Expand All @@ -27,14 +28,30 @@
*/
public final class ServerHeartbeatStartedEvent {
private final ConnectionId connectionId;
private final boolean awaited;

/**
* Construct an instance.
*
* @param connectionId the non-null connnectionId
* @param awaited {@code true} if and only if the heartbeat is for an awaitable `hello` / legacy hello.
* @since 5.1
*/
public ServerHeartbeatStartedEvent(final ConnectionId connectionId) {
public ServerHeartbeatStartedEvent(final ConnectionId connectionId, final boolean awaited) {
this.connectionId = notNull("connectionId", connectionId);
this.awaited = awaited;
}

/**
* Construct an instance.
*
* @param connectionId the non-null connnectionId
* @deprecated Prefer {@link #ServerHeartbeatStartedEvent(ConnectionId, boolean)}.
* If this constructor is used then {@link #isAwaited()} is {@code false}.
*/
@Deprecated
public ServerHeartbeatStartedEvent(final ConnectionId connectionId) {
this(connectionId, false);
}

/**
Expand All @@ -46,12 +63,24 @@ public ConnectionId getConnectionId() {
return connectionId;
}

/**
* Gets whether the heartbeat is for an awaitable `hello` / legacy hello.
*
* @return {@code true} if and only if the heartbeat is for an awaitable `hello` / legacy hello.
* @see ServerMonitoringMode#STREAM
* @since 5.1
*/
public boolean isAwaited() {
return awaited;
}

@Override
public String toString() {
return "ServerHeartbeatStartedEvent{"
+ "connectionId=" + connectionId
+ ", server=" + connectionId.getServerId().getAddress()
+ ", clusterId=" + connectionId.getServerId().getClusterId()
+ ", awaited=" + awaited
+ "} " + super.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.mongodb.event;

import com.mongodb.connection.ConnectionId;
import com.mongodb.connection.ServerMonitoringMode;
import org.bson.BsonDocument;

import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -87,6 +88,7 @@ public long getElapsedTime(final TimeUnit timeUnit) {
* to the server and the time that the server waited before sending a response.
*
* @return whether the response was awaited
* @see ServerMonitoringMode#STREAM
* @since 4.1
* @mongodb.server.release 4.4
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public Cluster createCluster(final ClusterSettings originalClusterSettings, fina
connectionPoolSettings, internalConnectionPoolSettings,
streamFactory, heartbeatStreamFactory, credential, loggerSettings, commandListener, applicationName,
mongoDriverInformation != null ? mongoDriverInformation : MongoDriverInformation.builder().build(), compressorList,
serverApi);
serverApi, FaasEnvironment.getFaasEnvironment() != FaasEnvironment.UNKNOWN);

if (clusterSettings.getMode() == ClusterConnectionMode.SINGLE) {
return new SingleServerCluster(clusterId, clusterSettings, serverFactory);
Expand Down
Loading

0 comments on commit 7295322

Please sign in to comment.