Skip to content

Commit

Permalink
Migrate to new version of bytes and capnp
Browse files Browse the repository at this point in the history
  • Loading branch information
Sergey Noskov committed Jan 16, 2020
1 parent 18fb159 commit ba1b0c9
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 52 deletions.
14 changes: 6 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "bioyino-metric"
version = "0.2.0"
version = "0.3.0"
authors = ["Sergey Noskov aka Albibek <[email protected]>"]
description = "Library for handling metrics in bioyino and related projects"
edition = "2018"
Expand All @@ -10,15 +10,13 @@ keywords = ["statsd", "metrics", "graphite"]
categories = ["parser-implementations", "network-programming"]

[dependencies]
serde="^1.0"
serde_derive="^1.0"
failure="^0.1"
failure_derive="^0.1"
capnp = "^0.10"
serde= { version = "^1.0", features = ["derive"] }
thiserror="^1.0"
capnp = "^0.11"
combine="^3.6"
bytes = { version = "^0.4", features = [ "serde" ] }
bytes = { version = "^0.5", features = [ "serde" ] }
num-traits="^0.2"
lazysort="^0.2"

[build-dependencies]
capnpc = "^0.10"
capnpc = "^0.11"
10 changes: 9 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,10 @@
# bioyino-metrics
Capnp schema and functions for metrics used in bioyino and related projects
This library contains useful types and methods for working with metrics in bioyino statsd server and some other
metric processing software. Features:

* a type for representing typed and timestamped metrics, generic over floating point format
* streaming parser of statsd format
* metric aggregation routines
* working with Graphite-compatible metric naming including basic tags support
* schema and functions for sending/receiving metrics in binary Cap'n'Proto format

2 changes: 1 addition & 1 deletion src/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::hash::{Hash, Hasher};
use std::str::FromStr;

use num_traits::{AsPrimitive, Float};
use serde_derive::{Deserialize, Serialize};
use serde::{Deserialize, Serialize};

use crate::metric::{FromF64, Metric, MetricType};

Expand Down
26 changes: 13 additions & 13 deletions src/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,28 @@ use std::fmt::Debug;
use bytes::Bytes;
use capnp;
use capnp::message::{Allocator, Builder, HeapAllocator};
use failure::Error;
use failure_derive::Fail;
use serde_derive::{Deserialize, Serialize};
use num_traits::{AsPrimitive, Float};
use serde::{Deserialize, Serialize};
use thiserror::Error;

use crate::name::{find_tag_pos, MetricName, TagFormat};
use crate::protocol_capnp::{gauge, metric as cmetric, metric_type};

use num_traits::{AsPrimitive, Float};

#[derive(Fail, Debug)]
#[derive(Error, Debug)]
pub enum MetricError {
#[fail(display = "float conversion")]
#[error("float conversion")]
FloatToRatio,

#[fail(display = "bad sampling range")]
#[error("bad sampling range")]
Sampling,

#[fail(display = "aggregating metrics of different types")]
#[error("aggregating metrics of different types")]
Aggregating,

#[fail(display = "decoding error: {}", _0)]
#[error("decoding error: {}", _0)]
Capnp(capnp::Error),

#[fail(display = "schema error: {}", _0)]
#[error("schema error: {}", _0)]
CapnpSchema(capnp::NotInSchema),
}

Expand Down Expand Up @@ -110,7 +108,7 @@ where
}

/// Join self with a new incoming metric depending on type
pub fn accumulate(&mut self, new: Metric<F>) -> Result<(), Error> {
pub fn accumulate(&mut self, new: Metric<F>) -> Result<(), MetricError> {
use self::MetricType::*;
self.update_counter += new.update_counter;
match (&mut self.mtype, new.mtype) {
Expand Down Expand Up @@ -153,7 +151,9 @@ where
}

pub fn from_capnp(reader: cmetric::Reader) -> Result<(MetricName, Metric<F>), MetricError> {
let name: Bytes = reader.get_name().map_err(MetricError::Capnp)?.into();
//let name: Bytes = reader.get_name().map_err(MetricError::Capnp)?.into();
let name: &[u8] = reader.get_name().map_err(MetricError::Capnp)?.as_bytes();
let name = Bytes::copy_from_slice(name);
let tag_pos = find_tag_pos(&name[..], TagFormat::Graphite);
let name = MetricName::from_raw_parts(name, tag_pos);
let value: F = F::from_f64(reader.get_value());
Expand Down
52 changes: 26 additions & 26 deletions src/name.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::hash::{Hash, Hasher};

use bytes::{BufMut, Bytes, BytesMut};
use num_traits::{AsPrimitive, Float};
use serde_derive::{Deserialize, Serialize};
use serde::{Deserialize, Serialize};

use crate::aggregate::Aggregate;
use crate::metric::FromF64;
Expand Down Expand Up @@ -193,14 +193,14 @@ impl MetricName {
None => {
buf.put_slice(&self.name);
if suflen > 0 {
buf.put(b'.');
buf.put_u8(b'.');
buf.put_slice(suffix);
}
}
Some(pos) => {
buf.put_slice(&self.name[..pos]);
if suflen > 0 {
buf.put(b'.');
buf.put_u8(b'.');
buf.put_slice(suffix);
}
if with_tags {
Expand Down Expand Up @@ -230,10 +230,10 @@ impl MetricName {
}
// easy case: no tags
if self.name[namelen - 1] != b';' {
buf.put(b';');
buf.put_u8(b';');
}
buf.put_slice(tag_name);
buf.put(b'=');
buf.put_u8(b'=');
buf.put_slice(tag);
}
Some(pos) => {
Expand Down Expand Up @@ -264,20 +264,20 @@ impl MetricName {

// prepend new tag with semicolon if required
if self.name[namelen - 1] != b';' {
buf.put(b';');
buf.put_u8(b';');
}

// put new tag
buf.put_slice(tag_name);
buf.put(b'=');
buf.put_u8(b'=');
buf.put_slice(tag);
} else if offset == pos + 1 {
// new tag is put before all tags

// put the new tag with semicolon
buf.put(b';');
buf.put_u8(b';');
buf.put_slice(tag_name);
buf.put(b'=');
buf.put_u8(b'=');
buf.put_slice(tag);

// put other tags with leading semicolon
Expand All @@ -288,11 +288,11 @@ impl MetricName {

// put the new tag
buf.put_slice(tag_name);
buf.put(b'=');
buf.put_u8(b'=');
buf.put_slice(tag);

if self.name[offset] != b';' {
buf.put(b';');
buf.put_u8(b';');
}

buf.extend_from_slice(&self.name[offset..]);
Expand Down Expand Up @@ -331,8 +331,8 @@ impl MetricName {
let prefix = prefix_replacements.get(&agg).ok_or(())?;
if !prefix.is_empty() {
buf.reserve(prefix.len() + 1);
buf.put(prefix);
buf.put(b'.');
buf.put_slice(prefix.as_bytes());
buf.put_u8(b'.');
}

// we should not use let agg_postfix before the match, because with the tag case we don't
Expand Down Expand Up @@ -399,7 +399,7 @@ mod tests {
}

fn assert_buf(buf: &mut BytesMut, match_: &[u8], error: &'static str) {
let res = &buf.take()[..];
let res = &buf.split()[..];
assert_eq!(
res,
match_,
Expand Down Expand Up @@ -529,14 +529,14 @@ mod tests {
without_tags
.put_with_aggregate(&mut buf, AggregationDestination::Smart, Aggregate::Value, &po_reps, &pr_reps, &t_reps)
.unwrap();
assert_eq!(&buf.take()[..], &b"gorets.bobez"[..]);
assert_eq!(&buf.split()[..], &b"gorets.bobez"[..]);

// update count is aggregated only with prefix
without_tags
.put_with_aggregate(&mut buf, AggregationDestination::Smart, Aggregate::UpdateCount, &po_reps, &pr_reps, &t_reps)
.unwrap();

assert_eq!(&buf.take()[..], &b"updates.gorets.bobez"[..]);
assert_eq!(&buf.split()[..], &b"updates.gorets.bobez"[..]);

with_tags
.put_with_aggregate(&mut buf, AggregationDestination::Smart, Aggregate::UpdateCount, &po_reps, &pr_reps, &t_reps)
Expand All @@ -552,7 +552,7 @@ mod tests {
without_tags
.put_with_aggregate(&mut buf, AggregationDestination::Smart, Aggregate::Count, &po_reps, &pr_reps, &t_reps)
.unwrap();
assert_eq!(&buf.take()[..], &b"counts.gorets.bobez.count"[..]);
assert_eq!(&buf.split()[..], &b"counts.gorets.bobez.count"[..]);

without_tags
.put_with_aggregate(
Expand All @@ -564,12 +564,12 @@ mod tests {
&t_reps,
)
.unwrap();
assert_eq!(&buf.take()[..], &b"gorets.bobez.percentile80"[..], "existing postfix replacement was not put");
assert_eq!(&buf.split()[..], &b"gorets.bobez.percentile80"[..], "existing postfix replacement was not put");

without_tags
.put_with_aggregate(&mut buf, AggregationDestination::Name, Aggregate::Count, &po_reps, &pr_reps, &t_reps)
.unwrap();
assert_eq!(&buf.take()[..], &b"counts.gorets.bobez.count"[..]);
assert_eq!(&buf.split()[..], &b"counts.gorets.bobez.count"[..]);

without_tags
.put_with_aggregate(
Expand All @@ -581,12 +581,12 @@ mod tests {
&t_reps,
)
.unwrap();
assert_eq!(&buf.take()[..], &b"gorets.bobez.percentile80"[..], "existing postfix replacement was not put");
assert_eq!(&buf.split()[..], &b"gorets.bobez.percentile80"[..], "existing postfix replacement was not put");

without_tags
.put_with_aggregate(&mut buf, AggregationDestination::Tag, Aggregate::Count, &po_reps, &pr_reps, &t_reps)
.unwrap();
assert_eq!(&buf.take()[..], &b"counts.gorets.bobez;agg=cnt"[..]);
assert_eq!(&buf.split()[..], &b"counts.gorets.bobez;agg=cnt"[..]);

let err = without_tags.put_with_aggregate(
&mut buf,
Expand All @@ -601,7 +601,7 @@ mod tests {
without_tags
.put_with_aggregate(&mut buf, AggregationDestination::Both, Aggregate::Count, &po_reps, &pr_reps, &t_reps)
.unwrap();
assert_eq!(&buf.take()[..], &b"counts.gorets.bobez.count;agg=cnt"[..]);
assert_eq!(&buf.split()[..], &b"counts.gorets.bobez.count;agg=cnt"[..]);

let err = without_tags.put_with_aggregate(
&mut buf,
Expand All @@ -621,12 +621,12 @@ mod tests {
with_tags
.put_with_aggregate(&mut buf, AggregationDestination::Smart, Aggregate::Value, &po_reps, &pr_reps, &t_reps)
.unwrap();
assert_eq!(&buf.take()[..], &b"gorets.bobez;tag=value"[..]);
assert_eq!(&buf.split()[..], &b"gorets.bobez;tag=value"[..]);

with_tags
.put_with_aggregate(&mut buf, AggregationDestination::Smart, Aggregate::Count, &po_reps, &pr_reps, &t_reps)
.unwrap();
assert_eq!(&buf.take()[..], &b"counts.gorets.bobez;agg=cnt;tag=value"[..]);
assert_eq!(&buf.split()[..], &b"counts.gorets.bobez;agg=cnt;tag=value"[..]);

let err = with_tags.put_with_aggregate(
&mut buf,
Expand Down Expand Up @@ -685,12 +685,12 @@ mod tests {
with_semicolon
.put_with_aggregate(&mut buf, AggregationDestination::Smart, Aggregate::Value, &po_reps, &pr_reps, &t_reps)
.unwrap();
assert_eq!(&buf.take()[..], &b"gorets.bobez;"[..]);
assert_eq!(&buf.split()[..], &b"gorets.bobez;"[..]);

with_semicolon
.put_with_aggregate(&mut buf, AggregationDestination::Smart, Aggregate::Count, &po_reps, &pr_reps, &t_reps)
.unwrap();
assert_eq!(&buf.take()[..], &b"counts.gorets.bobez;agg=cnt"[..]);
assert_eq!(&buf.split()[..], &b"counts.gorets.bobez;agg=cnt"[..]);
}

#[test]
Expand Down
6 changes: 3 additions & 3 deletions src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use combine::stream::{decode, PointerOffset, RangeStream, StreamErrorFor};
use combine::{choice, position};
use combine::{optional, skip_many1, Parser};

use bytes::BytesMut;
use bytes::{Buf, BytesMut};

use crate::metric::{FromF64, Metric, MetricType};
use crate::name::{sort_tags, MetricName, TagFormat};
Expand Down Expand Up @@ -440,8 +440,8 @@ mod tests {
use bytes::BufMut;
let mut data = BytesMut::from(&b"borets1"[..]);
data.reserve(1000);
data.put(193u8);
data.put(129u8);
data.put_u8(193u8);
data.put_u8(129u8);
data.put(&b":+1000|g\ngorets:-1000|g|@0.5"[..]);
let mut parser = make_parser(&mut data);
let r = parser.next();
Expand Down

0 comments on commit ba1b0c9

Please sign in to comment.