-
Notifications
You must be signed in to change notification settings - Fork 16
metrics: Expose litep2p metrics in an agnostic manner #294
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
lexnv
wants to merge
32
commits into
master
Choose a base branch
from
lexnv/metrics
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
32 commits
Select commit
Hold shift + click to select a range
2245703
cargo: Add metrics feature flag with prometheus
lexnv c6154da
litep2p/config: Expose prometheus metrics in config
lexnv f3948c1
cargo: Make litep2p agnostic of metrics
lexnv 31144bf
metrics: Add abstraction layers over metrics
lexnv 77332aa
Expose metrics to litep2p transport layers
lexnv 3ec010c
tcp: Extend collected metrics at intervals for code simplicity
lexnv 5936083
tcp: Selectively poll interval if the metrics are enabled
lexnv c68ef20
metric: Extend Gauge methods with inc / dec
lexnv 907312f
metrics: Add scope RAII metric counter
lexnv 4362919
metrics: Make register methods less restrictive
lexnv 5d91741
tcp: Extend with connection metrics
lexnv 05f5741
websocket: Add specific metrics
lexnv f6d478d
req-resp: Propagate metrics
lexnv b5e80e2
notifications: Expose metrics
lexnv fc70eea
ping: Add metrics
lexnv b91b8a6
identify: Propagate metrics
lexnv 48cb7ca
kad: Add metrics
lexnv 27042cb
transport/manager: Add metrics
lexnv 2de8a68
Simplify metrics collection
lexnv 142e08f
tests: Adjust testing
lexnv 7079ade
protocol: To metric name string
lexnv e543853
transport: Simplify collection
lexnv 17786b6
tcp: Decrement metrics on connection closed
lexnv e43ee71
websocket: Decrement metrics on connection closed
lexnv fd084c8
websocket: Use ref mut self
lexnv 09bd786
websocket: Fix clone move
lexnv cbc273d
metrics: Add MeteredFuturesStream
lexnv deb8b1a
metrics: Use MeteredFuturesStream for tcp and websocket
lexnv cf3faf1
Merge remote-tracking branch 'origin/master' into lexnv/metrics
lexnv 125ee19
Fix docs
lexnv 8ccf710
Merge branch 'master' into lexnv/metrics
lexnv dfd4b0d
Merge remote-tracking branch 'origin/master' into lexnv/metrics
lexnv File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,150 @@ | ||
// Copyright 2020 Parity Technologies (UK) Ltd. | ||
// Copyright 2024 litep2p developers | ||
// | ||
// Permission is hereby granted, free of charge, to any person obtaining a | ||
// copy of this software and associated documentation files (the "Software"), | ||
// to deal in the Software without restriction, including without limitation | ||
// the rights to use, copy, modify, merge, publish, distribute, sublicense, | ||
// and/or sell copies of the Software, and to permit persons to whom the | ||
// Software is furnished to do so, subject to the following conditions: | ||
// | ||
// The above copyright notice and this permission notice shall be included in | ||
// all copies or substantial portions of the Software. | ||
// | ||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS | ||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING | ||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER | ||
// DEALINGS IN THE SOFTWARE. | ||
|
||
//! A generic module for handling the metrics exposed by litep2p. | ||
//! | ||
//! Contains the traits and types that are used to define and interact with metrics. | ||
|
||
use crate::{utils::futures_stream::FuturesStream, Error}; | ||
use futures::{Stream, StreamExt}; | ||
use std::{ | ||
future::Future, | ||
pin::Pin, | ||
sync::Arc, | ||
task::{Context, Poll}, | ||
}; | ||
|
||
pub type MetricCounter = Arc<dyn MetricCounterT>; | ||
|
||
pub type MetricGauge = Arc<dyn MetricGaugeT>; | ||
|
||
pub type MetricsRegistry = Arc<dyn MetricsRegistryT>; | ||
|
||
/// Represents a metric that can only go up. | ||
pub trait MetricCounterT: Send + Sync { | ||
/// Increment the counter by `value`. | ||
fn inc(&self, value: u64); | ||
} | ||
|
||
/// Represents a metric that can arbitrarily go up and down. | ||
pub trait MetricGaugeT: Send + Sync { | ||
/// Set the gauge to `value`. | ||
fn set(&self, value: u64); | ||
|
||
/// Increment the gauge. | ||
fn inc(&self); | ||
|
||
/// Decrement the gauge. | ||
fn dec(&self); | ||
|
||
/// Add `value` to the gauge. | ||
fn add(&self, value: u64); | ||
|
||
/// Subtract `value` from the gauge. | ||
fn sub(&self, value: u64); | ||
} | ||
|
||
/// A registry for metrics. | ||
pub trait MetricsRegistryT: Send + Sync { | ||
/// Register a new counter. | ||
fn register_counter(&self, name: String, help: String) -> Result<MetricCounter, Error>; | ||
|
||
/// Register a new gauge. | ||
fn register_gauge(&self, name: String, help: String) -> Result<MetricGauge, Error>; | ||
} | ||
|
||
/// A scope for metrics that modifies a provided gauge in an RAII fashion. | ||
/// | ||
/// The gauge is incremented when constructed and decremented when the object is dropped. | ||
#[derive(Clone)] | ||
pub struct ScopeGaugeMetric { | ||
inner: MetricGauge, | ||
} | ||
|
||
impl ScopeGaugeMetric { | ||
/// Create a new [`ScopeGaugeMetric`]. | ||
pub fn new(inner: MetricGauge) -> Self { | ||
inner.inc(); | ||
ScopeGaugeMetric { inner } | ||
} | ||
} | ||
|
||
impl Drop for ScopeGaugeMetric { | ||
fn drop(&mut self) { | ||
self.inner.dec(); | ||
} | ||
} | ||
|
||
/// Wrapper around `FuturesStream` that provides information to the given metric. | ||
#[derive(Default)] | ||
pub struct MeteredFuturesStream<F> { | ||
stream: FuturesStream<F>, | ||
metric: Option<MetricGauge>, | ||
} | ||
|
||
impl<F> MeteredFuturesStream<F> { | ||
pub fn new(metric: Option<MetricGauge>) -> Self { | ||
MeteredFuturesStream { | ||
stream: FuturesStream::new(), | ||
metric, | ||
} | ||
} | ||
|
||
pub fn push(&mut self, future: F) { | ||
if let Some(ref metric) = self.metric { | ||
metric.inc(); | ||
} | ||
|
||
self.stream.push(future); | ||
} | ||
|
||
/// Number of futures in the stream. | ||
pub fn len(&self) -> usize { | ||
self.stream.len() | ||
} | ||
|
||
/// Returns `true` if the stream is empty. | ||
pub fn is_empty(&self) -> bool { | ||
self.stream.len() == 0 | ||
} | ||
} | ||
|
||
impl<F: Future> Stream for MeteredFuturesStream<F> { | ||
type Item = <F as Future>::Output; | ||
|
||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | ||
let result = self.stream.poll_next_unpin(cx); | ||
if result.is_ready() { | ||
if let Some(ref metric) = self.metric { | ||
metric.dec(); | ||
} | ||
} | ||
result | ||
} | ||
} | ||
|
||
impl<F> Drop for MeteredFuturesStream<F> { | ||
fn drop(&mut self) { | ||
if let Some(ref metric) = self.metric { | ||
metric.sub(self.len() as u64); | ||
} | ||
} | ||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this used anywhere?