Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metrics and logs #84

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "bioyino"
version = "0.8.0"
version = "0.8.2"
authors = ["Sergey Noskov aka Albibek <[email protected]>"]
description = "StatsD-compatible, high-performance, fault-tolerant metric aggregator"
edition = "2018"
Expand Down
11 changes: 6 additions & 5 deletions src/aggregate.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::{collections::HashMap, sync::atomic::Ordering};
use std::sync::Arc;

use async_channel::Sender as AsyncSender;
Expand All @@ -12,7 +12,7 @@ use bioyino_metric::{
FromF64,
};

use crate::cache::RotatedCacheShard;
use crate::{cache::RotatedCacheShard, stats::STATS};
use crate::config::{all_aggregates, Aggregation, ConfigError, Naming, RoundTimestamp};
use crate::{s, Float};

Expand Down Expand Up @@ -164,9 +164,10 @@ pub fn aggregate_task(data: AggregationData) {
_ => Some((name.clone(), typename, *aggregate, value)),
}
})
.map(|data| result.push(data))
.last();
}
.map(|data| result.push(data))
.last();
}
STATS.aggregated_metrics.fetch_add(result.len(), Ordering::Relaxed);

futures::executor::block_on(response.send(result)).unwrap_or(());
}
Expand Down
1 change: 1 addition & 0 deletions src/carbon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ impl CarbonBackend {
let mut writer = CarbonCodec::new(ts.clone(), options.agg.clone()).framed(conn);

for m in metrics.iter().flatten() {
s!(egress_carbon_metrics);
writer.send(m).await?
}

Expand Down
8 changes: 6 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ pub type Cache = HashMap<MetricName, Metric<Float>>;
pub static CONSENSUS_STATE: Lazy<Mutex<ConsensusState>> = Lazy::new(|| Mutex::new(ConsensusState::Disabled));
pub static IS_LEADER: AtomicBool = AtomicBool::new(false);

fn main() {
fn main() {
let (system, command) = System::load().expect("loading config");

let config = system.clone();
Expand Down Expand Up @@ -170,7 +170,11 @@ fn main() {
let stats_prefix = stats_prefix.trim_end_matches('.').to_string();
// Spawn future gatering bioyino own stats
info!(own_stat_log, "starting own stats counter");
let own_stats = OwnStats::new(s_interval, stats_prefix, slow_chan.clone(), fast_prio_chans.clone(), own_stat_log);
let own_stats = OwnStats::new(
s_interval, stats_prefix, slow_chan.clone(),
fast_prio_chans.clone(), own_stat_log, carbon.clone(),
aggregation.clone(), naming.clone(),
);
runtime.spawn(own_stats.run());

let compat_log = rlog.new(o!("thread" => "compat"));
Expand Down
17 changes: 13 additions & 4 deletions src/slow_task.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
use std::sync::Arc;
use std::sync::atomic::Ordering;

use crossbeam_channel::Sender;
use futures::channel::oneshot;
use log::warn as logw;
use slog::{error, info, Logger};
use slog::{error, info, Logger, trace};

use bioyino_metric::{name::MetricName, Metric, MetricTypeName};

use crate::aggregate::{aggregate_task, AggregationData};
use crate::cache::{RotatedCache, SharedCache};
use crate::config::System;

use crate::stats::STATS;
use crate::{s, Cache, Float};

#[derive(Debug)]
Expand Down Expand Up @@ -68,7 +69,8 @@ impl SlowTaskRunner {
let ename = name.clone();
let em = MetricTypeName::from_metric(&metric);
self.cache.accumulate(name, metric).unwrap_or_else(|_| {
logw!(
trace!(
self.log,
"could not accumulate in long cache at {:?} new type '{}'",
String::from_utf8_lossy(&ename.name[..]),
em.to_string(),
Expand All @@ -84,13 +86,20 @@ impl SlowTaskRunner {
list.drain(..).map(|(name, metric)| self.update_metric(name, metric)).last();
}
SlowTask::Join(cache) => {
cache.iter().map(|(name, metric)| self.update_metric(name.clone(), metric.clone())).last();
cache.iter().map(
|(name, metric)| {
s!(slow_cache_joined_metrics);
self.update_metric(name.clone(), metric.clone());
}
).last();
}
SlowTask::AddSnapshot(mut list) => {
list.drain(..).map(|(name, metric)| self.update_metric(name, metric)).last();
}
SlowTask::Rotate(channel) => {
let rotated = self.cache.rotate(channel.is_some());
let sum: usize = rotated.iter().map(|m| m.len()).sum();
STATS.slow_cache_rotated_metrics.fetch_add(sum, Ordering::Relaxed);
if let Some(c) = channel {
let log = self.log.clone();
c.send(rotated).unwrap_or_else(|_| {
Expand Down
Loading