Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions lib/vector-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,10 @@ vector-config.workspace = true
vector-common-macros.workspace = true

[dev-dependencies]
criterion.workspace = true
futures = { version = "0.3.31", default-features = false, features = ["async-await"] }
tokio = { workspace = true, features = ["rt", "time"] }

[[bench]]
name = "fast_clock"
harness = false
47 changes: 47 additions & 0 deletions lib/vector-common/benches/fast_clock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
//! Microbenchmark comparing read-cost of `fast_clock::recent_millis()`
//! against `Instant::now()` and `Utc::now()` patterns used elsewhere in
//! Vector for histogram-binning timestamps.
//!
//! Run: `cargo bench --bench fast_clock -p vector-common`

use std::{
hint::black_box,
time::{Duration, Instant},
};

use chrono::Utc;
use criterion::{Criterion, criterion_group, criterion_main};
use vector_common::fast_clock;

/// Registers the clock-read microbenchmarks.
///
/// `fast_clock` is initialized (and given a short head start) up front so
/// the measured closures only pay the steady-state read cost of a single
/// relaxed atomic load.
fn bench_clocks(c: &mut Criterion) {
    fast_clock::init();
    // Let the updater thread publish its first values before measuring,
    // so no benchmark iteration observes the pre-tick zero.
    std::thread::sleep(Duration::from_millis(50));

    let mut group = c.benchmark_group("clocks");

    group.bench_function("fast_clock::recent_millis", |b| {
        b.iter(|| black_box(fast_clock::recent_millis()));
    });
    group.bench_function("fast_clock::recent_unix_millis", |b| {
        b.iter(|| black_box(fast_clock::recent_unix_millis()));
    });
    group.bench_function("Instant::now", |b| {
        b.iter(|| black_box(Instant::now()));
    });
    group.bench_function("Instant_elapsed_as_millis", |b| {
        let epoch = Instant::now();
        b.iter(|| {
            let ms = u64::try_from(epoch.elapsed().as_millis()).unwrap_or(u64::MAX);
            black_box(ms)
        });
    });
    group.bench_function("Utc::now_timestamp_millis", |b| {
        b.iter(|| black_box(Utc::now().timestamp_millis()));
    });

    group.finish();
}

criterion_group!(benches, bench_clocks);
criterion_main!(benches);
120 changes: 120 additions & 0 deletions lib/vector-common/src/fast_clock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
//! A coarse, cached monotonic clock for hot-path metric instrumentation.
//!
//! Reading [`recent_millis`] is a single relaxed atomic load, which is
//! significantly cheaper than `std::time::Instant::now()` on platforms where
//! the latter goes through a vDSO-backed `clock_gettime`. The tradeoff is
//! resolution: the value is updated by a background thread on a fixed cadence
//! (see [`TICK`]), so it lags real time by up to that cadence.
//!
//! Intended for histogram binning and lag-time computations where
//! millisecond resolution is sufficient. Do not use where ordering between
//! events on the same path matters at sub-tick granularity.

use std::{
sync::{
OnceLock,
atomic::{AtomicI64, AtomicU64, Ordering},
},
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
};

/// Cadence at which the background updater refreshes the cached timestamp.
const TICK: Duration = Duration::from_millis(25);

/// Milliseconds elapsed since `ensure_init` captured its `Instant` epoch.
/// Written only by the "fast-clock" updater thread; read via `recent_millis`.
static RECENT_MS: AtomicU64 = AtomicU64::new(0);
/// Milliseconds since the Unix epoch, refreshed from `SystemTime::now()` on
/// each tick. Pre-populated by `ensure_init`; read via `recent_unix_millis`.
static RECENT_UNIX_MS: AtomicI64 = AtomicI64::new(0);
/// Guards one-time startup of the updater thread (see `ensure_init`).
static INIT: OnceLock<()> = OnceLock::new();

/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Saturates instead of panicking on pathological clocks:
/// * a system clock set *before* the epoch yields `0` — the previous
///   `ok()`/`unwrap_or(i64::MAX)` chain reported such clocks as the far
///   future, the opposite extreme, which would wildly inflate any lag
///   computed against this value;
/// * a duration too large for `i64` milliseconds yields `i64::MAX`.
fn unix_millis_now() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        // At or after the epoch: saturate only on (absurd) overflow.
        Ok(d) => i64::try_from(d.as_millis()).unwrap_or(i64::MAX),
        // Before the epoch: clamp to the epoch itself.
        Err(_) => 0,
    }
}

/// Idempotently start the background updater thread.
///
/// The first caller wins the `OnceLock` race; every subsequent call is a
/// cheap fast-path check. The spawned thread runs for the life of the
/// process, refreshing both cached timestamps every `TICK`.
fn ensure_init() {
    INIT.get_or_init(|| {
        let epoch = Instant::now();

        // Seed the wall-clock cache so the very first reader does not see 0
        // before the updater's first tick. (The monotonic cache is fine at
        // its initial 0: essentially no time has elapsed since `epoch` yet.)
        RECENT_UNIX_MS.store(unix_millis_now(), Ordering::Relaxed);

        let updater = move || {
            loop {
                let since_epoch = epoch.elapsed().as_millis();
                RECENT_MS.store(
                    u64::try_from(since_epoch).unwrap_or(u64::MAX),
                    Ordering::Relaxed,
                );
                RECENT_UNIX_MS.store(unix_millis_now(), Ordering::Relaxed);
                std::thread::sleep(TICK);
            }
        };

        std::thread::Builder::new()
            .name("fast-clock".into())
            .spawn(updater)
            .expect("failed to spawn fast-clock updater thread");
    });
}

/// Eagerly start the background updater thread.
///
/// Optional convenience: every reader auto-initializes on first use, so
/// calling this is never required. Doing so from `main` simply moves the
/// one-time `OnceLock` initialization off the first hot-path read.
pub fn init() {
    ensure_init();
}

/// Returns the cached number of milliseconds elapsed since the clock was
/// first initialized.
///
/// The per-call cost is one relaxed atomic load, preceded by the
/// `OnceLock` fast-path check that auto-initializes on first use.
///
/// Values advance in steps of at most `TICK` (currently 25ms). A caller
/// that races the very first updater tick may observe `0`, so any
/// elapsed-duration computation built on this value should tolerate that
/// initial-zero case.
#[must_use]
pub fn recent_millis() -> u64 {
    ensure_init();
    RECENT_MS.load(Ordering::Relaxed)
}

/// Returns the cached number of milliseconds since the Unix epoch.
///
/// The cached value is refreshed from `SystemTime::now()` once per tick,
/// so it lags real wall-clock time by up to `TICK` (currently 25ms).
/// That makes it suitable for histogram-style metrics such as source lag
/// time, where millisecond precision and up-to-25ms staleness are both
/// acceptable.
///
/// The per-call cost is one relaxed atomic load.
#[must_use]
pub fn recent_unix_millis() -> i64 {
    ensure_init();
    RECENT_UNIX_MS.load(Ordering::Relaxed)
}

#[cfg(test)]
mod tests {
    use super::*;

    // The cached monotonic reading must advance roughly with real time.
    #[test]
    fn ticks_forward() {
        init();
        std::thread::sleep(Duration::from_millis(100));
        let a = recent_millis();
        std::thread::sleep(Duration::from_millis(100));
        let b = recent_millis();
        assert!(b > a, "expected b ({b}) > a ({a})");
        // Half the 100ms sleep leaves generous slack for scheduling jitter
        // on loaded CI machines.
        let delta = b - a;
        assert!(delta >= 50, "expected at least 50ms elapsed, got {}", delta);
    }

    // The cached wall-clock reading must track SystemTime within a tick
    // plus scheduling slack.
    #[test]
    fn unix_millis_is_close_to_systemtime() {
        init();
        // Allow a tick to populate the cached value.
        std::thread::sleep(Duration::from_millis(50));
        let cached = recent_unix_millis();
        let truth = unix_millis_now();
        let drift = (truth - cached).abs();
        // Worst case is roughly TICK + scheduling jitter; allow 200ms.
        assert!(drift < 200, "drift too large: cached={cached} truth={truth}");
    }
}
1 change: 1 addition & 0 deletions lib/vector-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub mod shutdown;
pub mod sensitive_string;

pub mod atomic;
pub mod fast_clock;
pub mod stats;
pub mod trigger;

Expand Down
6 changes: 3 additions & 3 deletions lib/vector-core/src/source_sender/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::{
time::{Duration, Instant},
};

use chrono::Utc;
use futures::{Stream, StreamExt as _};
use metrics::Histogram;
use tracing::Span;
Expand All @@ -15,6 +14,7 @@ use vector_buffers::{
};
use vector_common::{
byte_size_of::ByteSizeOf,
fast_clock,
internal_event::{
self, ComponentEventsDropped, ComponentEventsTimedOut, Count, CountByteSize, EventsSent,
InternalEventHandle as _, RegisterInternalEvent as _, Registered, UNINTENTIONAL,
Expand Down Expand Up @@ -162,7 +162,7 @@ impl Output {
events: EventArray,
unsent_event_count: &mut UnsentEventCount,
) -> Result<(), SendError> {
let reference = Utc::now().timestamp_millis();
let reference = fast_clock::recent_unix_millis();
self.send_inner(events, unsent_event_count, reference).await
}

Expand Down Expand Up @@ -261,7 +261,7 @@ impl Output {
{
// Capture a single reference timestamp for the entire batch so that lag time
// measurements are not inflated by channel-send latency for later chunks.
let reference = Utc::now().timestamp_millis();
let reference = fast_clock::recent_unix_millis();

// It's possible that the caller stops polling this future while it is blocked waiting
// on `self.send()`. When that happens, we use `UnsentEventCount` to correctly emit
Expand Down
Loading