Skip to content

Commit 219b934

Browse files
RUST-1509 SDAM Logging (mongodb#918)
1 parent 85956f7 commit 219b934

File tree

86 files changed

+5108
-385
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below for content that may be hidden.

86 files changed

+5108
-385
lines changed

src/cmap/conn.rs

+5-4
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,13 @@ pub struct ConnectionInfo {
6565
#[derivative(Debug)]
6666
pub(crate) struct Connection {
6767
/// Driver-generated ID for the connection.
68-
pub(super) id: u32,
68+
pub(crate) id: u32,
69+
6970
/// Server-generated ID for the connection.
7071
pub(crate) server_id: Option<i64>,
7172

7273
pub(crate) address: ServerAddress,
74+
7375
pub(crate) generation: ConnectionGeneration,
7476

7577
/// The cached StreamDescription from the connection's handshake.
@@ -164,9 +166,8 @@ impl Connection {
164166

165167
/// Create a connection intended for monitoring purposes.
166168
/// TODO: RUST-1454 Rename this to just `new`, drop the pooling-specific data.
167-
pub(crate) fn new_monitoring(address: ServerAddress, stream: AsyncStream) -> Self {
168-
// Monitoring connections don't have IDs, so just use 0 as a placeholder here.
169-
Self::new(address, stream, 0, ConnectionGeneration::Monitoring)
169+
/// `id` is forwarded unchanged to `Self::new`, together with
/// `ConnectionGeneration::Monitoring` as the connection's generation.
pub(crate) fn new_monitoring(address: ServerAddress, stream: AsyncStream, id: u32) -> Self {
170+
Self::new(address, stream, id, ConnectionGeneration::Monitoring)
170171
}
171172

172173
pub(crate) fn info(&self) -> ConnectionInfo {

src/cmap/establish.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -146,9 +146,10 @@ impl ConnectionEstablisher {
146146
pub(crate) async fn establish_monitoring_connection(
147147
&self,
148148
address: ServerAddress,
149+
id: u32,
149150
) -> Result<(Connection, HelloReply)> {
150151
let stream = self.make_stream(address.clone()).await?;
151-
let mut connection = Connection::new_monitoring(address, stream);
152+
let mut connection = Connection::new_monitoring(address, stream, id);
152153

153154
let hello_reply = self.handshaker.handshake(&mut connection, None).await?;
154155

src/event/sdam.rs

+23
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,14 @@ pub struct ServerHeartbeatStartedEvent {
121121

122122
/// Determines if this heartbeat event is from an awaitable `hello`.
123123
pub awaited: bool,
124+
125+
/// The driver-generated ID for the connection used for the heartbeat.
126+
pub driver_connection_id: u32,
127+
128+
/// The server-generated ID for the connection used for the heartbeat. This value is only
129+
/// present on server versions 4.2+. If this event corresponds to the first heartbeat on a
130+
/// new monitoring connection, this value will not be present.
131+
pub server_connection_id: Option<i64>,
124132
}
125133

126134
/// Published when a server monitor's `hello` or legacy hello command succeeds.
@@ -139,6 +147,13 @@ pub struct ServerHeartbeatSucceededEvent {
139147

140148
/// Determines if this heartbeat event is from an awaitable `hello`.
141149
pub awaited: bool,
150+
151+
/// The driver-generated ID for the connection used for the heartbeat.
152+
pub driver_connection_id: u32,
153+
154+
/// The server-generated ID for the connection used for the heartbeat. This value is only
155+
/// present for server versions 4.2+.
156+
pub server_connection_id: Option<i64>,
142157
}
143158

144159
/// Published when a server monitor's `hello` or legacy hello command fails.
@@ -158,6 +173,14 @@ pub struct ServerHeartbeatFailedEvent {
158173

159174
/// Determines if this heartbeat event is from an awaitable `hello`.
160175
pub awaited: bool,
176+
177+
/// The driver-generated ID for the connection used for the heartbeat.
178+
pub driver_connection_id: u32,
179+
180+
/// The server-generated ID for the connection used for the heartbeat. This value is only
181+
/// present on server versions 4.2+. If this event corresponds to the first heartbeat on a
182+
/// new monitoring connection, this value will not be present.
183+
pub server_connection_id: Option<i64>,
161184
}
162185

163186
#[derive(Clone, Debug)]

src/sdam/monitor.rs

+34-7
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
use std::{
2-
sync::Arc,
2+
sync::{
3+
atomic::{AtomicU32, Ordering},
4+
Arc,
5+
},
36
time::{Duration, Instant},
47
};
58

69
use bson::doc;
10+
use lazy_static::lazy_static;
711
use tokio::sync::watch;
812

913
use super::{
@@ -26,6 +30,13 @@ use crate::{
2630
runtime::{self, stream::DEFAULT_CONNECT_TIMEOUT, WorkerHandle, WorkerHandleListener},
2731
};
2832

33+
/// Returns the next ID to assign to a monitoring connection.
///
/// IDs are drawn from a single process-wide counter that starts at 0 and advances by one on
/// every call, so each call yields a distinct value until the `u32` counter wraps around.
fn next_monitoring_connection_id() -> u32 {
    // `AtomicU32::new` is a `const fn`, so a plain `static` is sufficient here; no lazy
    // initialization is needed.
    static MONITORING_CONNECTION_ID: AtomicU32 = AtomicU32::new(0);

    MONITORING_CONNECTION_ID.fetch_add(1, Ordering::SeqCst)
}
39+
2940
pub(crate) const DEFAULT_HEARTBEAT_FREQUENCY: Duration = Duration::from_secs(10);
3041
pub(crate) const MIN_HEARTBEAT_FREQUENCY: Duration = Duration::from_millis(500);
3142

@@ -162,10 +173,18 @@ impl Monitor {
162173
}
163174

164175
async fn perform_hello(&mut self) -> HelloResult {
176+
// Reuse the ID of the current monitoring connection when there is one. A fresh ID is drawn
// from the shared counter only when no connection exists yet: `unwrap_or_else` keeps that
// call lazy, so heartbeats on an established connection do not consume an ID.
let driver_connection_id = self
    .connection
    .as_ref()
    .map(|c| c.id)
    .unwrap_or_else(next_monitoring_connection_id);
181+
165182
self.emit_event(|| {
166183
SdamEvent::ServerHeartbeatStarted(ServerHeartbeatStartedEvent {
167184
server_address: self.address.clone(),
168185
awaited: self.topology_version.is_some(),
186+
driver_connection_id,
187+
server_connection_id: self.connection.as_ref().and_then(|c| c.server_id),
169188
})
170189
});
171190

@@ -215,7 +234,7 @@ impl Monitor {
215234
let start = Instant::now();
216235
let res = self
217236
.connection_establisher
218-
.establish_monitoring_connection(self.address.clone())
237+
.establish_monitoring_connection(self.address.clone(), driver_connection_id)
219238
.await;
220239
match res {
221240
Ok((conn, hello_reply)) => {
@@ -264,6 +283,8 @@ impl Monitor {
264283
reply,
265284
server_address: self.address.clone(),
266285
awaited: self.topology_version.is_some(),
286+
driver_connection_id,
287+
server_connection_id: self.connection.as_ref().and_then(|c| c.server_id),
267288
})
268289
});
269290

@@ -272,18 +293,21 @@ impl Monitor {
272293
self.topology_version = r.command_response.topology_version;
273294
}
274295
HelloResult::Err(ref e) | HelloResult::Cancelled { reason: ref e } => {
275-
// Per the spec, cancelled requests and errors both require the monitoring
276-
// connection to be closed.
277-
self.connection = None;
278-
self.rtt_monitor_handle.reset_average_rtt();
279296
self.emit_event(|| {
280297
SdamEvent::ServerHeartbeatFailed(ServerHeartbeatFailedEvent {
281298
duration,
282299
failure: e.clone(),
283300
server_address: self.address.clone(),
284301
awaited: self.topology_version.is_some(),
302+
driver_connection_id,
303+
server_connection_id: self.connection.as_ref().and_then(|c| c.server_id),
285304
})
286305
});
306+
307+
// Per the spec, cancelled requests and errors both require the monitoring
308+
// connection to be closed.
309+
self.connection = None;
310+
self.rtt_monitor_handle.reset_average_rtt();
287311
self.topology_version.take();
288312
}
289313
}
@@ -402,7 +426,10 @@ impl RttMonitor {
402426
None => {
403427
let connection = self
404428
.connection_establisher
405-
.establish_monitoring_connection(self.address.clone())
429+
.establish_monitoring_connection(
430+
self.address.clone(),
431+
next_monitoring_connection_id(),
432+
)
406433
.await?
407434
.0;
408435
self.connection = Some(connection);

src/sdam/topology.rs

+65-28
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ use crate::{
4343
TopologyType,
4444
};
4545

46+
#[cfg(feature = "tracing-unstable")]
47+
use crate::trace::topology::TopologyTracingEventEmitter;
48+
4649
use super::{
4750
monitor::{MonitorManager, MonitorRequestReceiver},
4851
srv_polling::SrvPollingMonitor,
@@ -67,22 +70,36 @@ pub(crate) struct Topology {
6770
impl Topology {
6871
pub(crate) fn new(options: ClientOptions) -> Result<Topology> {
6972
let description = TopologyDescription::default();
73+
let id = ObjectId::new();
7074

71-
let event_emitter = options.sdam_event_handler.as_ref().map(|handler| {
72-
let (tx, mut rx) = mpsc::unbounded_channel::<AcknowledgedMessage<SdamEvent>>();
73-
74-
// Spin up a task to handle events so that a user's event handling code can't block the
75-
// TopologyWorker.
76-
let handler = handler.clone();
77-
runtime::execute(async move {
78-
while let Some(event) = rx.recv().await {
79-
let (event, ack) = event.into_parts();
80-
handle_sdam_event(handler.as_ref(), event);
81-
ack.acknowledge(());
82-
}
83-
});
84-
SdamEventEmitter { sender: tx }
85-
});
75+
let event_emitter =
76+
if options.sdam_event_handler.is_some() || cfg!(feature = "tracing-unstable") {
77+
let user_handler = options.sdam_event_handler.clone();
78+
79+
#[cfg(feature = "tracing-unstable")]
80+
let tracing_emitter =
81+
TopologyTracingEventEmitter::new(options.tracing_max_document_length_bytes, id);
82+
let (tx, mut rx) = mpsc::unbounded_channel::<AcknowledgedMessage<SdamEvent>>();
83+
runtime::execute(async move {
84+
while let Some(event) = rx.recv().await {
85+
let (event, ack) = event.into_parts();
86+
87+
if let Some(ref user_handler) = user_handler {
88+
#[cfg(feature = "tracing-unstable")]
89+
handle_sdam_event(user_handler.as_ref(), event.clone());
90+
#[cfg(not(feature = "tracing-unstable"))]
91+
handle_sdam_event(user_handler.as_ref(), event);
92+
}
93+
#[cfg(feature = "tracing-unstable")]
94+
handle_sdam_event(&tracing_emitter, event);
95+
96+
ack.acknowledge(());
97+
}
98+
});
99+
Some(SdamEventEmitter { sender: tx })
100+
} else {
101+
None
102+
};
86103

87104
let (updater, update_receiver) = TopologyUpdater::channel();
88105
let (worker_handle, handle_listener) = WorkerHandleListener::channel();
@@ -95,8 +112,6 @@ impl Topology {
95112
let connection_establisher =
96113
ConnectionEstablisher::new(EstablisherOptions::from_client_options(&options))?;
97114

98-
let id = ObjectId::new();
99-
100115
let worker = TopologyWorker {
101116
id,
102117
topology_description: description,
@@ -375,18 +390,41 @@ impl TopologyWorker {
375390
// indicate to the topology watchers that the topology is no longer alive
376391
drop(self.publisher);
377392

378-
// close all the monitors.
379-
let mut close_futures = self
380-
.servers
381-
.into_values()
382-
.map(|server| {
383-
drop(server.inner);
384-
server.monitor_manager.close_monitor()
385-
})
386-
.collect::<FuturesUnordered<_>>();
393+
// Close all the monitors.
394+
let mut close_futures = FuturesUnordered::new();
395+
for (address, server) in self.servers.into_iter() {
396+
if let Some(ref emitter) = self.event_emitter {
397+
emitter
398+
.emit(SdamEvent::ServerClosed(ServerClosedEvent {
399+
address,
400+
topology_id: self.id,
401+
}))
402+
.await;
403+
}
404+
drop(server.inner);
405+
close_futures.push(server.monitor_manager.close_monitor());
406+
}
387407
while close_futures.next().await.is_some() {}
388408

389409
if let Some(emitter) = self.event_emitter {
410+
if !self.topology_description.servers.is_empty()
411+
&& self.options.load_balanced != Some(true)
412+
{
413+
let previous_description = self.topology_description;
414+
let mut new_description = previous_description.clone();
415+
new_description.servers.clear();
416+
417+
emitter
418+
.emit(SdamEvent::TopologyDescriptionChanged(Box::new(
419+
TopologyDescriptionChangedEvent {
420+
topology_id: self.id,
421+
previous_description: previous_description.into(),
422+
new_description: new_description.into(),
423+
},
424+
)))
425+
.await;
426+
}
427+
390428
emitter
391429
.emit(SdamEvent::TopologyClosed(TopologyClosedEvent {
392430
topology_id: self.id,
@@ -436,11 +474,10 @@ impl TopologyWorker {
436474
let diff = old_description.diff(&self.topology_description);
437475
let changed = diff.is_some();
438476
if let Some(diff) = diff {
439-
// For ordering of events in tests, sort the addresses.
440-
441477
#[cfg(not(test))]
442478
let changed_servers = diff.changed_servers;
443479

480+
// For ordering of events in tests, sort the addresses.
444481
#[cfg(test)]
445482
let changed_servers = {
446483
let mut servers = diff.changed_servers.into_iter().collect::<Vec<_>>();

src/test/spec.rs

+6
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,12 @@ pub(crate) fn deserialize_spec_tests<T: DeserializeOwned>(
7171
continue;
7272
};
7373

74+
if let Ok(unskipped_filename) = std::env::var("TEST_FILE") {
75+
if filename != unskipped_filename {
76+
continue;
77+
}
78+
}
79+
7480
if let Some(skipped_files) = skipped_files {
7581
if skipped_files.contains(&filename) {
7682
log_uncaptured(format!("Skipping deserializing {:?}", &path));

0 commit comments

Comments
 (0)