Skip to content

Commit 554cff7

Browse files
committed
Track chain worker request queue wait time
1 parent 82479f5 commit 554cff7

File tree

2 files changed

+44
-4
lines changed

2 files changed

+44
-4
lines changed

linera-core/src/chain_worker/actor.rs

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use linera_base::{
1616
data_types::{ApplicationDescription, Blob, BlockHeight, Epoch, TimeDelta, Timestamp},
1717
hashed::Hashed,
1818
identifiers::{ApplicationId, BlobId, ChainId},
19+
time::Instant,
1920
};
2021
use linera_chain::{
2122
data_types::{BlockProposal, MessageBundle, ProposedBlock},
@@ -39,6 +40,22 @@ use crate::{
3940
worker::{NetworkActions, WorkerError},
4041
};
4142

43+
#[cfg(with_metrics)]
44+
mod metrics {
45+
use std::sync::LazyLock;
46+
47+
use linera_base::prometheus_util::{exponential_bucket_latencies, register_histogram};
48+
use prometheus::Histogram;
49+
50+
pub static CHAIN_WORKER_REQUEST_QUEUE_WAIT_TIME: LazyLock<Histogram> = LazyLock::new(|| {
51+
register_histogram(
52+
"chain_worker_request_queue_wait_time",
53+
"Time (ms) a chain worker request waits in queue before being processed",
54+
exponential_bucket_latencies(10_000.0),
55+
)
56+
});
57+
}
58+
4259
/// A request for the [`ChainWorkerActor`].
4360
#[derive(Debug)]
4461
pub(crate) enum ChainWorkerRequest<Context>
@@ -261,6 +278,7 @@ where
261278
incoming_requests: mpsc::UnboundedReceiver<(
262279
ChainWorkerRequest<StorageClient::Context>,
263280
tracing::Span,
281+
Instant,
264282
)>,
265283
is_tracked: bool,
266284
) {
@@ -302,11 +320,19 @@ where
302320
mut incoming_requests: mpsc::UnboundedReceiver<(
303321
ChainWorkerRequest<StorageClient::Context>,
304322
tracing::Span,
323+
Instant,
305324
)>,
306325
) -> Result<(), WorkerError> {
307326
trace!("Starting `ChainWorkerActor`");
308327

309-
while let Some((request, span)) = incoming_requests.recv().await {
328+
while let Some((request, span, _queued_at)) = incoming_requests.recv().await {
329+
// Record how long the request waited in the queue (in milliseconds).
330+
#[cfg(with_metrics)]
331+
{
332+
let queue_wait_time_ms = _queued_at.elapsed().as_secs_f64() * 1000.0;
333+
metrics::CHAIN_WORKER_REQUEST_QUEUE_WAIT_TIME.observe(queue_wait_time_ms);
334+
}
335+
310336
let (_service_runtime_thread, service_runtime_task, service_runtime_endpoint) =
311337
if self.config.long_lived_services {
312338
let actor = ServiceRuntimeActor::spawn(self.chain_id).await;
@@ -337,9 +363,17 @@ where
337363
futures::select! {
338364
() = self.sleep_until_timeout().fuse() => break,
339365
maybe_request = incoming_requests.recv().fuse() => {
340-
let Some((request, span)) = maybe_request else {
366+
let Some((request, span, _queued_at)) = maybe_request else {
341367
break; // Request sender was dropped.
342368
};
369+
370+
// Record how long the request waited in the queue (in milliseconds).
371+
#[cfg(with_metrics)]
372+
{
373+
let queue_wait_time_ms = _queued_at.elapsed().as_secs_f64() * 1000.0;
374+
metrics::CHAIN_WORKER_REQUEST_QUEUE_WAIT_TIME.observe(queue_wait_time_ms);
375+
}
376+
343377
Box::pin(worker.handle_request(request)).instrument(span).await;
344378
}
345379
}

linera-core/src/worker.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use linera_base::{
1515
doc_scalar,
1616
hashed::Hashed,
1717
identifiers::{AccountOwner, ApplicationId, BlobId, ChainId, EventId, StreamId},
18+
time::Instant,
1819
};
1920
#[cfg(with_testing)]
2021
use linera_chain::ChainExecutionContext;
@@ -389,6 +390,7 @@ where
389390
type ChainActorEndpoint<StorageClient> = mpsc::UnboundedSender<(
390391
ChainWorkerRequest<<StorageClient as Storage>::Context>,
391392
tracing::Span,
393+
Instant,
392394
)>;
393395

394396
pub(crate) type DeliveryNotifiers = HashMap<ChainId, DeliveryNotifier>;
@@ -886,7 +888,11 @@ where
886888
request: ChainWorkerRequest<StorageClient::Context>,
887889
) -> Result<
888890
Option<
889-
mpsc::UnboundedReceiver<(ChainWorkerRequest<StorageClient::Context>, tracing::Span)>,
891+
mpsc::UnboundedReceiver<(
892+
ChainWorkerRequest<StorageClient::Context>,
893+
tracing::Span,
894+
Instant,
895+
)>,
890896
>,
891897
WorkerError,
892898
> {
@@ -899,7 +905,7 @@ where
899905
(sender, Some(receiver))
900906
};
901907

902-
if let Err(e) = sender.send((request, tracing::Span::current())) {
908+
if let Err(e) = sender.send((request, tracing::Span::current(), Instant::now())) {
903909
// The actor was dropped. Give up without (re-)inserting the endpoint in the cache.
904910
return Err(WorkerError::ChainActorSendError {
905911
chain_id,

0 commit comments

Comments
 (0)