Skip to content

Commit ad9299d

Browse files
changes!
1 parent 404e847 commit ad9299d

19 files changed

+727
-381
lines changed

src/cmap/connect.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,11 @@ import {
3535
/** @public */
3636
export type Stream = Socket | TLSSocket;
3737

38+
/**
 * Marks an error as a backpressure signal by adding the `SystemOverloadedError`
 * and `RetryableError` labels to it.
 *
 * This helper is invoked directly on values caught in `catch` blocks, which are not
 * guaranteed to be `MongoError` instances. Anything that is not a `MongoError` cannot
 * carry error labels, so it is left untouched instead of triggering a `TypeError`
 * that would mask the original failure.
 *
 * @param error - the caught value to label; ignored unless it is a `MongoError`
 */
function applyBackpressureLabels(error: unknown): void {
  if (!(error instanceof MongoError)) {
    // Non-driver errors have no `addErrorLabel`; let the caller rethrow them unchanged.
    return;
  }

  error.addErrorLabel(MongoErrorLabel.SystemOverloadedError);
  error.addErrorLabel(MongoErrorLabel.RetryableError);
}
42+
3843
export async function connect(options: ConnectionOptions): Promise<Connection> {
3944
let connection: Connection | null = null;
4045
try {
@@ -103,6 +108,8 @@ export async function performInitialHandshake(
103108
const authContext = new AuthContext(conn, credentials, options);
104109
conn.authContext = authContext;
105110

111+
// If we encounter an error preparing the handshake document, do NOT apply backpressure labels. Errors
112+
// encountered building the handshake document are all client-side, and do not indicate an overloaded server.
106113
const handshakeDoc = await prepareHandshakeDocument(authContext);
107114

108115
// @ts-expect-error: TODO(NODE-5141): The options need to be filtered properly, Connection options differ from Command options
@@ -163,12 +170,15 @@ export async function performInitialHandshake(
163170
try {
164171
await provider.auth(authContext);
165172
} catch (error) {
173+
// NOTE: If we encounter an error authenticating a connection, do NOT apply backpressure labels.
174+
166175
if (error instanceof MongoError) {
167176
error.addErrorLabel(MongoErrorLabel.HandshakeError);
168177
if (needsRetryableWriteLabel(error, response.maxWireVersion, conn.description.type)) {
169178
error.addErrorLabel(MongoErrorLabel.RetryableWriteError);
170179
}
171180
}
181+
172182
throw error;
173183
}
174184
}
@@ -189,6 +199,9 @@ export async function performInitialHandshake(
189199
if (error instanceof MongoError) {
190200
error.addErrorLabel(MongoErrorLabel.HandshakeError);
191201
}
202+
// If we encounter an error executing the initial handshake, apply backpressure labels.
203+
applyBackpressureLabels(error);
204+
192205
throw error;
193206
}
194207
}
@@ -424,6 +437,8 @@ export async function makeSocket(options: MakeConnectionOptions): Promise<Stream
424437
socket = await connectedSocket;
425438
return socket;
426439
} catch (error) {
440+
// Errors encountered while establishing a socket are treated as backpressure signals: apply the backpressure labels to them.
441+
applyBackpressureLabels(error);
427442
socket.destroy();
428443
throw error;
429444
} finally {

src/error.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,9 @@ export const MongoErrorLabel = Object.freeze({
9999
ResetPool: 'ResetPool',
100100
PoolRequestedRetry: 'PoolRequestedRetry',
101101
InterruptInUseConnections: 'InterruptInUseConnections',
102-
NoWritesPerformed: 'NoWritesPerformed'
102+
NoWritesPerformed: 'NoWritesPerformed',
103+
RetryableError: 'RetryableError',
104+
SystemOverloadedError: 'SystemOverloadedError'
103105
} as const);
104106

105107
/** @public */

src/sdam/server.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -400,7 +400,8 @@ export class Server extends TypedEventEmitter<ServerEvents> {
400400
error instanceof MongoNetworkError && !(error instanceof MongoNetworkTimeoutError);
401401
const isNetworkTimeoutBeforeHandshakeError =
402402
error instanceof MongoNetworkError && error.beforeHandshake;
403-
const isAuthHandshakeError = error.hasErrorLabel(MongoErrorLabel.HandshakeError);
403+
const isAuthOrEstablishmentHandshakeError = error.hasErrorLabel(MongoErrorLabel.HandshakeError);
404+
const isSystemOverloadError = error.hasErrorLabel(MongoErrorLabel.SystemOverloadedError);
404405

405406
// TODO: considering parse errors as SDAM unrecoverable errors seems
406407
// questionable. What if the parse error only comes from an application connection,
@@ -430,8 +431,12 @@ export class Server extends TypedEventEmitter<ServerEvents> {
430431
} else if (
431432
isNetworkNonTimeoutError ||
432433
isNetworkTimeoutBeforeHandshakeError ||
433-
isAuthHandshakeError
434+
isAuthOrEstablishmentHandshakeError
434435
) {
436+
// Do NOT clear the pool if we encounter a system overloaded error.
437+
if (isSystemOverloadError) {
438+
return;
439+
}
435440
// from the SDAM spec: The driver MUST synchronize clearing the pool with updating the topology.
436441
// In load balanced mode: there is no monitoring, so there is no topology to update. We simply clear the pool.
437442
// For other topologies: the `ResetPool` label instructs the topology to clear the server's pool in `updateServer()`.

test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.prose.test.ts

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,18 @@
11
import { expect } from 'chai';
22
import { once } from 'events';
33

4-
import { type MongoClient } from '../../../src';
4+
import {
5+
type ConnectionCheckOutFailedEvent,
6+
type ConnectionPoolClearedEvent,
7+
type MongoClient
8+
} from '../../../src';
59
import {
610
CONNECTION_POOL_CLEARED,
711
CONNECTION_POOL_READY,
812
SERVER_HEARTBEAT_FAILED,
913
SERVER_HEARTBEAT_SUCCEEDED
1014
} from '../../../src/constants';
15+
import { sleep } from '../../tools/utils';
1116

1217
describe('Server Discovery and Monitoring Prose Tests', function () {
1318
context('Monitors sleep at least minHeartbeatFrequencyMS between checks', function () {
@@ -187,4 +192,74 @@ describe('Server Discovery and Monitoring Prose Tests', function () {
187192
}
188193
});
189194
});
195+
196+
context('Connection Pool Backpressure', function () {
197+
let client: MongoClient;
198+
const checkoutFailedEvents: Array<ConnectionCheckOutFailedEvent> = [];
199+
const poolClearedEvents: Array<ConnectionPoolClearedEvent> = [];
200+
201+
beforeEach(async function () {
202+
client = this.configuration.newClient({}, { maxConnecting: 100 });
203+
204+
client.on('connectionCheckOutFailed', e => checkoutFailedEvents.push(e));
205+
client.on('connectionPoolCleared', e => poolClearedEvents.push(e));
206+
207+
await client.connect();
208+
209+
const admin = client.db('admin').admin();
210+
await admin.command({
211+
setParameter: 1,
212+
ingressConnectionEstablishmentRateLimiterEnabled: true
213+
});
214+
await admin.command({
215+
setParameter: 1,
216+
ingressConnectionEstablishmentRatePerSec: 20
217+
});
218+
await admin.command({
219+
setParameter: 1,
220+
ingressConnectionEstablishmentBurstCapacitySecs: 1
221+
});
222+
await admin.command({
223+
setParameter: 1,
224+
ingressConnectionEstablishmentMaxQueueDepth: 1
225+
});
226+
227+
await client.db('test').collection('test').insertOne({});
228+
});
229+
230+
afterEach(async function () {
231+
// Give the server time to recover from the connection storm before cleaning up.
232+
await sleep(1000);
233+
234+
const admin = client.db('admin').admin();
235+
await admin.command({
236+
setParameter: 1,
237+
ingressConnectionEstablishmentRateLimiterEnabled: false
238+
});
239+
240+
await client.close();
241+
});
242+
243+
it(
244+
'does not clear the pool when connections are closed due to connection storms',
245+
{
246+
requires: {
247+
mongodb: '>=7.0' // rate limiting added in 7.0
248+
}
249+
},
250+
async function () {
251+
await Promise.allSettled(
252+
Array.from({ length: 100 }).map(() =>
253+
client
254+
.db('test')
255+
.collection('test')
256+
.findOne({ $where: 'function() { sleep(2000); return true; }' })
257+
)
258+
);
259+
260+
expect(poolClearedEvents).to.be.empty;
261+
expect(checkoutFailedEvents.length).to.be.greaterThan(10);
262+
}
263+
);
264+
});
190265
});

test/spec/connection-monitoring-and-pooling/cmap-format/pool-create-min-size-error.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
"isMaster",
1818
"hello"
1919
],
20-
"closeConnection": true,
20+
"errorCode": 91,
2121
"appName": "poolCreateMinSizeErrorTest"
2222
}
2323
},

test/spec/connection-monitoring-and-pooling/cmap-format/pool-create-min-size-error.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ failPoint:
1111
mode: { times: 50 }
1212
data:
1313
failCommands: ["isMaster","hello"]
14-
closeConnection: true
14+
errorCode: 91
1515
appName: "poolCreateMinSizeErrorTest"
1616
poolOptions:
1717
minPoolSize: 1

test/spec/load-balancers/sdam-error-handling.json

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"description": "state change errors are correctly handled",
3-
"schemaVersion": "1.3",
3+
"schemaVersion": "1.4",
44
"runOnRequirements": [
55
{
66
"topologies": [
@@ -263,7 +263,7 @@
263263
"description": "errors during the initial connection hello are ignored",
264264
"runOnRequirements": [
265265
{
266-
"minServerVersion": "4.9"
266+
"minServerVersion": "4.4.7"
267267
}
268268
],
269269
"operations": [
@@ -282,7 +282,7 @@
282282
"isMaster",
283283
"hello"
284284
],
285-
"closeConnection": true,
285+
"errorCode": 11600,
286286
"appName": "lbSDAMErrorTestClient"
287287
}
288288
}
@@ -297,7 +297,7 @@
297297
}
298298
},
299299
"expectError": {
300-
"isClientError": true
300+
"isError": true
301301
}
302302
}
303303
],

test/spec/load-balancers/sdam-error-handling.yml

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
description: state change errors are correctly handled
22

3-
schemaVersion: '1.3'
3+
schemaVersion: '1.4'
44

55
runOnRequirements:
66
- topologies: [ load-balanced ]
@@ -141,9 +141,8 @@ tests:
141141
# to the same mongos on which the failpoint is set.
142142
- description: errors during the initial connection hello are ignored
143143
runOnRequirements:
144-
# Server version 4.9+ is needed to set a fail point on the initial
145-
# connection handshake with the appName filter due to SERVER-49336.
146-
- minServerVersion: '4.9'
144+
# Require SERVER-49336 for failCommand + appName on the initial handshake.
145+
- minServerVersion: '4.4.7'
147146
operations:
148147
- name: failPoint
149148
object: testRunner
@@ -154,14 +153,14 @@ tests:
154153
mode: { times: 1 }
155154
data:
156155
failCommands: [isMaster, hello]
157-
closeConnection: true
156+
errorCode: 11600
158157
appName: *singleClientAppName
159158
- name: insertOne
160159
object: *singleColl
161160
arguments:
162161
document: { x: 1 }
163162
expectError:
164-
isClientError: true
163+
isError: true
165164
expectEvents:
166165
- client: *singleClient
167166
eventType: cmap

0 commit comments

Comments
 (0)