Skip to content

Commit e6fef13

Browse files
Preslav LeConvex, Inc.
Preslav Le
authored and
Convex, Inc.
committed
Add instance_name to selected sync worker stats. (#28754)
With moving sync worker to Usher it is useful to be able to track some metrics per instance. GitOrigin-RevId: 7c1380c342f8fe07db06a3507e84bd7da64b20e2
1 parent 79f96cb commit e6fef13

File tree

2 files changed

+35
-16
lines changed

2 files changed

+35
-16
lines changed

crates/sync/src/metrics.rs

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use metrics::{
44
log_counter,
55
log_counter_with_labels,
66
log_distribution,
7+
log_distribution_with_labels,
78
register_convex_counter,
89
register_convex_histogram,
910
StaticMetricLabel,
@@ -32,9 +33,9 @@ pub fn connect_timer() -> StatusTimer {
3233
register_convex_histogram!(
3334
SYNC_HANDLE_MESSAGE_SECONDS,
3435
"Time to handle a websocket message",
35-
&["status", "endpoint"]
36+
&["status", "endpoint", "instance_name"]
3637
);
37-
pub fn handle_message_timer(message: &ClientMessage) -> StatusTimer {
38+
pub fn handle_message_timer(message: &ClientMessage, instance_name: String) -> StatusTimer {
3839
let mut timer = StatusTimer::new(&SYNC_HANDLE_MESSAGE_SECONDS);
3940
let request_name = match message {
4041
ClientMessage::Authenticate { .. } => "Authenticate",
@@ -45,25 +46,30 @@ pub fn handle_message_timer(message: &ClientMessage) -> StatusTimer {
4546
ClientMessage::Event { .. } => "Event",
4647
};
4748
timer.add_label(StaticMetricLabel::new("endpoint", request_name.to_owned()));
49+
timer.add_label(StaticMetricLabel::new("instance_name", instance_name));
4850
timer
4951
}
5052

5153
register_convex_histogram!(
5254
SYNC_UPDATE_QUERIES_SECONDS,
5355
"Time to update queries",
54-
&STATUS_LABEL
56+
&[STATUS_LABEL[0], "instance_name"]
5557
);
56-
pub fn update_queries_timer() -> StatusTimer {
57-
StatusTimer::new(&SYNC_UPDATE_QUERIES_SECONDS)
58+
pub fn update_queries_timer(instance_name: String) -> StatusTimer {
59+
let mut timer = StatusTimer::new(&SYNC_UPDATE_QUERIES_SECONDS);
60+
timer.add_label(StaticMetricLabel::new("instance_name", instance_name));
61+
timer
5862
}
5963

6064
register_convex_histogram!(
6165
SYNC_MUTATION_QUEUE_SECONDS,
6266
"Time between a mutation entering and exiting the single threaded sync worker queue",
63-
&STATUS_LABEL
67+
&[STATUS_LABEL[0], "instance_name"]
6468
);
65-
pub fn mutation_queue_timer() -> StatusTimer {
66-
StatusTimer::new(&SYNC_MUTATION_QUEUE_SECONDS)
69+
pub fn mutation_queue_timer(instance_name: String) -> StatusTimer {
70+
let mut timer = StatusTimer::new(&SYNC_MUTATION_QUEUE_SECONDS);
71+
timer.add_label(StaticMetricLabel::new("instance_name", instance_name));
72+
timer
6773
}
6874

6975
register_convex_counter!(SYNC_QUERY_FAILED_TOTAL, "Number of query failures");
@@ -108,17 +114,27 @@ pub fn log_connect(last_close_reason: String, connection_count: u32) {
108114
register_convex_histogram!(
109115
SYNC_LINEARIZABILITY_DELAY_SECONDS,
110116
"How far behind the current backend is behind what the client has observed",
117+
&["instance_name"]
111118
);
112-
pub fn log_linearizability_violation(delay_secs: f64) {
113-
log_distribution(&SYNC_LINEARIZABILITY_DELAY_SECONDS, delay_secs);
119+
pub fn log_linearizability_violation(delay_secs: f64, instance_name: String) {
120+
log_distribution_with_labels(
121+
&SYNC_LINEARIZABILITY_DELAY_SECONDS,
122+
delay_secs,
123+
vec![StaticMetricLabel::new("instance_name", instance_name)],
124+
);
114125
}
115126

116127
register_convex_histogram!(
117128
SYNC_PROCESS_CLIENT_MESSAGE_SECONDS,
118129
"Delay between receiving a client message over the web socket and processing it",
130+
&["instance_name"]
119131
);
120-
pub fn log_process_client_message_delay(delay: Duration) {
121-
log_distribution(&SYNC_PROCESS_CLIENT_MESSAGE_SECONDS, delay.as_secs_f64());
132+
pub fn log_process_client_message_delay(delay: Duration, instance_name: String) {
133+
log_distribution_with_labels(
134+
&SYNC_PROCESS_CLIENT_MESSAGE_SECONDS,
135+
delay.as_secs_f64(),
136+
vec![StaticMetricLabel::new("instance_name", instance_name)],
137+
);
122138
}
123139

124140
register_convex_histogram!(

crates/sync/src/worker.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,9 @@ impl<RT: Runtime> SyncWorker<RT> {
298298
};
299299
self.handle_message(message).await?;
300300
let delay = self.rt.monotonic_now() - received_time;
301-
metrics::log_process_client_message_delay(delay);
301+
metrics::log_process_client_message_delay(
302+
delay, self.host.instance_name.clone(),
303+
);
302304
None
303305
},
304306
// TODO(presley): If I swap this with futures below, tests break.
@@ -409,7 +411,7 @@ impl<RT: Runtime> SyncWorker<RT> {
409411
}
410412

411413
async fn handle_message(&mut self, message: ClientMessage) -> anyhow::Result<()> {
412-
let timer = metrics::handle_message_timer(&message);
414+
let timer = metrics::handle_message_timer(&message, self.host.instance_name.clone());
413415
match message {
414416
ClientMessage::Connect {
415417
session_id,
@@ -435,6 +437,7 @@ impl<RT: Runtime> SyncWorker<RT> {
435437
// but lets keep it as server one for now.
436438
metrics::log_linearizability_violation(
437439
max_observed_timestamp.secs_since_f64(latest_timestamp),
440+
self.host.instance_name.clone(),
438441
);
439442
anyhow::bail!(
440443
"Client has observed a timestamp {max_observed_timestamp:?} ahead of \
@@ -481,7 +484,7 @@ impl<RT: Runtime> SyncWorker<RT> {
481484
});
482485
let rt = self.rt.clone();
483486
let client_version = self.config.client_version.clone();
484-
let timer = mutation_queue_timer();
487+
let timer = mutation_queue_timer(self.host.instance_name.clone());
485488
let api = self.api.clone();
486489
let host = self.host.clone();
487490
let caller = FunctionCaller::SyncWorker(client_version);
@@ -669,7 +672,7 @@ impl<RT: Runtime> SyncWorker<RT> {
669672
new_ts: Timestamp,
670673
subscriptions_client: Arc<dyn SubscriptionClient>,
671674
) -> anyhow::Result<impl Future<Output = anyhow::Result<TransitionState>>> {
672-
let timer = metrics::update_queries_timer();
675+
let timer = metrics::update_queries_timer(self.host.instance_name.clone());
673676
let current_version = self.state.current_version();
674677

675678
let (modifications, new_query_version, pending_identity, new_identity_version) =

0 commit comments

Comments
 (0)