Skip to content

Commit

Permalink
chore: incorporate origin key into context key hash to shrink size of…
Browse files Browse the repository at this point in the history
… ContextKey
  • Loading branch information
tobz committed Jan 17, 2025
1 parent 799f140 commit f9b0aed
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 29 deletions.
13 changes: 12 additions & 1 deletion lib/saluki-context/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,18 @@ where
}
}

impl<'a> Tagged for &'a [&'static str] {
impl<'a> Tagged for &'a [&'a str] {
fn visit_tags<F>(&self, mut visitor: F)
where
F: FnMut(&str),
{
for tag in self.iter() {
visitor(tag);
}
}
}

impl<'a, const N: usize> Tagged for [&'a str; N] {
fn visit_tags<F>(&self, mut visitor: F)
where
F: FnMut(&str),
Expand Down
31 changes: 16 additions & 15 deletions lib/saluki-context/src/hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ use crate::{context::Tagged, origin::OriginKey};

pub type FastHashSet<T> = HashSet<T, ahash::RandomState>;

pub fn new_fast_hashset<T>() -> FastHashSet<T> {
HashSet::with_hasher(ahash::RandomState::default())
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct ContextKey {
hash: u64,
}

#[inline]
Expand Down Expand Up @@ -57,10 +58,15 @@ where
{
seen.clear();

// Hash the name first.
let mut hasher = ahash::AHasher::default();
name.hash(&mut hasher);

// Hash the tags individually and XOR their hashes together, which allows us to be order-oblivious:
// Hash any tags that are present.
//
// We hash these into a separate value, and so so by hashing each tag individually and then XORing the resulting tag
// hash into our "combined" tag hash, which lets us be order-oblivious: we can has the same set of tags, in any
// order, and get the same output hash.
let mut combined_tags_hash = 0;

tags.visit_tags(|tag| {
Expand All @@ -76,19 +82,14 @@ where

hasher.write_u64(combined_tags_hash);

let metric_key = hasher.finish();

ContextKey { metric_key, origin_key }
}
// If we have an origin key, hash that too.
if let Some(origin_key) = origin_key {
origin_key.hash(&mut hasher);
}

#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct ContextKey {
metric_key: u64,
origin_key: Option<OriginKey>,
ContextKey { hash: hasher.finish() }
}

impl ContextKey {
pub fn origin_key(&self) -> Option<OriginKey> {
self.origin_key
}
pub fn new_fast_hashset<T>() -> FastHashSet<T> {
HashSet::with_hasher(ahash::RandomState::default())
}
139 changes: 129 additions & 10 deletions lib/saluki-context/src/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
context::{Context, ContextInner, Tagged},
expiry::{Expiration, ExpirationBuilder, ExpiryCapableLifecycle},
hash::{hash_context_with_seen, new_fast_hashset, ContextKey, FastHashSet},
origin::{OriginEnricher, OriginInfo},
origin::{OriginEnricher, OriginInfo, OriginKey},
tags::TagSet,
};

Expand Down Expand Up @@ -201,6 +201,22 @@ impl ContextResolverBuilder {
.build()
}

/// Builds a [`ContextResolver`] with a no-op configuration, suitable for tests, using the given origin enricher.
///
/// See [`for_tests`] for more information on how the resolver is configured.
pub fn for_tests_with_enricher<E>(origin_enricher: E) -> ContextResolver
where
E: OriginEnricher + Send + Sync + 'static,
{
ContextResolverBuilder::from_name("noop")
.expect("resolver name not empty")
.with_cached_contexts_limit(usize::MAX)
.with_interner_capacity_bytes(NonZeroUsize::new(1).expect("not zero"))
.with_heap_allocations(true)
.with_origin_enricher(origin_enricher)
.build()
}

/// Builds a [`ContextResolver`] from the current configuration.
pub fn build(self) -> ContextResolver {
let interner_capacity_bytes = self
Expand Down Expand Up @@ -323,9 +339,9 @@ impl ContextResolver {
})
}

fn create_context_key_from_resolvable<T>(
fn create_context_and_origin_keys<T>(
&mut self, name: &str, tags: T, origin_info: Option<OriginInfo<'_>>,
) -> ContextKey
) -> (ContextKey, Option<OriginKey>)
where
T: Tagged,
{
Expand All @@ -336,10 +352,14 @@ impl ContextResolver {
.as_ref()
.and_then(|enricher| origin_info.and_then(|info| enricher.resolve_origin_key(info)));

hash_context_with_seen(name, tags, origin_key, &mut self.hash_seen_buffer)
let context_key = hash_context_with_seen(name, tags, origin_key, &mut self.hash_seen_buffer);

(context_key, origin_key)
}

fn create_context_from_resolvable<T>(&self, key: ContextKey, name: &str, tags: T) -> Option<Context>
fn create_context<T>(
&self, context_key: ContextKey, origin_key: Option<OriginKey>, name: &str, tags: T,
) -> Option<Context>
where
T: Tagged,
{
Expand All @@ -366,17 +386,17 @@ impl ContextResolver {

// Collect any enriched tags based on the origin key of the context, if any.
if let Some(origin_enricher) = self.origin_enricher.as_ref() {
if let Some(origin_key) = key.origin_key() {
if let Some(origin_key) = origin_key {
origin_enricher.collect_origin_tags(origin_key, &mut context_tags);
}
}

self.stats.resolved_new_context_total().increment(1);

Some(Context::from_inner(ContextInner {
key: context_key,
name: context_name,
tags: context_tags,
key,
active_count: self.stats.active_contexts().clone(),
}))
}
Expand All @@ -393,14 +413,14 @@ impl ContextResolver {
where
T: Tagged,
{
let context_key = self.create_context_key_from_resolvable(name, &tags, origin_info);
let (context_key, origin_key) = self.create_context_and_origin_keys(name, &tags, origin_info);
match self.context_cache.get(&context_key) {
Some(context) => {
self.stats.resolved_existing_context_total().increment(1);
self.expiration.mark_entry_accessed(context_key);
Some(context)
}
None => match self.create_context_from_resolvable(context_key, name, tags) {
None => match self.create_context(context_key, origin_key, name, tags) {
Some(context) => {
debug!(?context_key, ?context, "Resolved new context.");

Expand Down Expand Up @@ -476,14 +496,60 @@ async fn drive_expiration(

#[cfg(test)]
mod tests {
use std::collections::HashMap;

use metrics::{SharedString, Unit};
use metrics_util::{
debugging::{DebugValue, DebuggingRecorder},
CompositeKey,
};

use super::*;
use crate::tags::Tag;
use crate::tags::{SharedTagSet, Tag};

#[derive(Default)]
struct OriginMappings {
mappings: HashMap<OriginKey, SharedTagSet>,
}

impl OriginMappings {
fn add_mapping<T>(&mut self, info: OriginInfo<'_>, tags: T)
where
T: Into<TagSet>,
{
let tags = tags.into().into_shared();
let key = OriginKey::from_opaque(info);
self.mappings.insert(key, tags);
}

fn into_enricher(self) -> TestEnricher {
TestEnricher {
mappings: Arc::new(self.mappings),
}
}
}

struct TestEnricher {
mappings: Arc<HashMap<OriginKey, SharedTagSet>>,
}

impl OriginEnricher for TestEnricher {
fn resolve_origin_key(&self, origin_info: OriginInfo<'_>) -> Option<OriginKey> {
// We only return a mapping if it was previously configured.
let key = OriginKey::from_opaque(origin_info);
if self.mappings.contains_key(&key) {
Some(key)
} else {
None
}
}

fn collect_origin_tags(&self, origin_key: OriginKey, tags: &mut TagSet) {
if let Some(mapped_tags) = self.mappings.get(&origin_key) {
tags.merge_missing_shared(mapped_tags);
}
}
}

fn get_gauge_value(metrics: &[(CompositeKey, Option<Unit>, Option<SharedString>, DebugValue)], key: &str) -> f64 {
metrics
Expand Down Expand Up @@ -664,4 +730,57 @@ mod tests {
assert_ne!(context1, context2_duplicated);
assert_ne!(context2, context1_duplicated);
}

#[test]
fn origin_enrichment() {
// Establish a single metric name/tags combination, and then two distinct origins:
let name = "metric1";
let tags = ["tag1"];

let mut origin1 = OriginInfo::default();
origin1.set_container_id("container_id1");
let origin1_tags = ["kube_container_name:container1"];

let mut origin2 = OriginInfo::default();
origin2.set_container_id("container_id2");
let origin2_tags = ["kube_container_name:container2"];

assert_ne!(origin1, origin2);
assert_ne!(origin1_tags, origin2_tags);

let mut origin_mappings = OriginMappings::default();
origin_mappings.add_mapping(origin1.clone(), origin1_tags);
origin_mappings.add_mapping(origin2.clone(), origin2_tags);

// Build our context resolver, using the origin enricher that we've configured with the mappings we've defined:
let mut resolver = ContextResolverBuilder::for_tests_with_enricher(origin_mappings.into_enricher());

let context1 = resolver
.resolve(name, tags, Some(origin1))
.expect("should not fail to resolve");
let context2 = resolver
.resolve(name, tags, Some(origin2))
.expect("should not fail to resolve");

// The contexts should not be equal to each other, and should have distinct underlying pointers to the shared
// context state:
assert_ne!(context1, context2);
assert!(!context1.ptr_eq(&context2));

// We should, however, see that the configured enriched tags for each origin is present in the tags for the
// contexts:
let expected_tags_context1: TagSet = tags
.iter()
.chain(origin1_tags.iter())
.map(|tag| Tag::from(*tag))
.collect();
assert_eq!(context1.tags(), &expected_tags_context1);

let expected_tags_context2: TagSet = tags
.iter()
.chain(origin2_tags.iter())
.map(|tag| Tag::from(*tag))
.collect();
assert_eq!(context2.tags(), &expected_tags_context2);
}
}
18 changes: 15 additions & 3 deletions lib/saluki-context/src/tags/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,9 +331,12 @@ impl<'a> IntoIterator for &'a TagSet {
}
}

impl FromIterator<Tag> for TagSet {
fn from_iter<I: IntoIterator<Item = Tag>>(iter: I) -> Self {
Self(iter.into_iter().collect())
impl<T> FromIterator<T> for TagSet
where
T: Into<Tag>,
{
fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
Self(iter.into_iter().map(Into::into).collect())
}
}

Expand All @@ -349,6 +352,15 @@ impl From<Tag> for TagSet {
}
}

impl<T, const N: usize> From<[T; N]> for TagSet
where
T: Into<Tag>,
{
fn from(tags: [T; N]) -> Self {
Self(tags.into_iter().map(Into::into).collect())
}
}

/// A shared, read-only set of tags.
#[derive(Clone, Debug)]
pub struct SharedTagSet(Arc<TagSet>);
Expand Down

0 comments on commit f9b0aed

Please sign in to comment.