Skip to content

chore: Add doc and rename function for flushing strategy #740

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions bottlecap/src/config/flush_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,27 @@ pub struct PeriodicStrategy {

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum FlushStrategy {
// Flush every 1s and at the end of the invocation
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some doc is moved from flush_control.rs

Default,
// User specifies the interval in milliseconds, will not block on the runtimeDone event
Periodically(PeriodicStrategy),
// Always flush at the end of the invocation
End,
// Flush both (1) at the end of the invocation and (2) periodically with the specified interval
EndPeriodically(PeriodicStrategy),
// Flush in a non-blocking, asynchronous manner, so the next invocation can start without waiting
// for the flush to complete
Continuously(PeriodicStrategy),
}

// A restricted subset of `FlushStrategy`. The Default strategy is not allowed here: it is required to be
// translated into one of the concrete strategies below before a flush decision is made.
// The enum name repeats the module name (`flush_strategy`), which is presumably why the lint is allowed.
#[allow(clippy::module_name_repetitions)]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ConcreteFlushStrategy {
// Counterpart of `FlushStrategy::Periodically`: flush at the interval carried by the `PeriodicStrategy`,
// without blocking on the runtimeDone event.
Periodically(PeriodicStrategy),
// Counterpart of `FlushStrategy::End`: always flush at the end of the invocation.
End,
// Counterpart of `FlushStrategy::EndPeriodically`: flush both at the end of the invocation and
// periodically with the specified interval.
EndPeriodically(PeriodicStrategy),
// Counterpart of `FlushStrategy::Continuously`: flush in a non-blocking, asynchronous manner, so the
// next invocation can start without waiting for the flush to complete.
Continuously(PeriodicStrategy),
}

Expand Down
1 change: 1 addition & 0 deletions bottlecap/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ pub struct Config {
pub api_key: String,
pub log_level: LogLevel,

// Timeout for the request to flush data to Datadog endpoint
pub flush_timeout: u64,

// Proxy
Expand Down
37 changes: 16 additions & 21 deletions bottlecap/src/lifecycle/flush_control.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::config::flush_strategy::FlushStrategy;
use crate::config::flush_strategy::{ConcreteFlushStrategy, FlushStrategy};
use std::time;
use tokio::time::{Interval, MissedTickBehavior::Skip};

Expand All @@ -15,6 +15,7 @@ pub struct FlushControl {
flush_timeout: u64,
}

// The flush behavior for the current moment
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum FlushDecision {
Continuous,
Expand All @@ -23,12 +24,6 @@ pub enum FlushDecision {
Dont,
}

// 1. Default Strategy
// - Flush every 1s and at the end of the invocation
// 2. Periodic Strategy
// - User specifies the interval in milliseconds, will not block on the runtimeDone event
// 3. End strategy
// - Always flush at the end of the invocation
impl FlushControl {
#[must_use]
pub fn new(flush_strategy: FlushStrategy, flush_timeout: u64) -> FlushControl {
Expand Down Expand Up @@ -58,29 +53,29 @@ impl FlushControl {
i
}
FlushStrategy::End => {
// Set the race flush interval to the maximum value of Lambda timeout, so flush will
// only happen at the end of the invocation, and race flush will never happen.
tokio::time::interval(tokio::time::Duration::from_millis(FIFTEEN_MINUTES))
}
}
}

// Evaluate the flush decision for the current moment, based on the flush strategy, current time,
// and the past invocation times.
#[must_use]
pub fn evaluate_flush_decision(&mut self) -> FlushDecision {
let now = time::SystemTime::now()
.duration_since(time::UNIX_EPOCH)
.expect("unable to poll clock, unrecoverable")
.as_secs();
self.invocation_times.add(now);
let evaluated_flush_strategy = if self.flush_strategy == FlushStrategy::Default {
&self.invocation_times.should_adapt(now, self.flush_timeout)
} else {
// User specified one
&self.flush_strategy
};
match evaluated_flush_strategy {
FlushStrategy::Default => {
unreachable!("should_adapt must translate default strategy to concrete strategy")
}
FlushStrategy::Periodically(strategy) => {
let concrete_flush_strategy = self.invocation_times.evaluate_concrete_strategy(
now,
self.flush_timeout,
self.flush_strategy,
);
match concrete_flush_strategy {
ConcreteFlushStrategy::Periodically(strategy) => {
if self.interval_passed(now, strategy.interval) {
self.last_flush = now;
// TODO calculate periodic rate. if it's more frequent than the flush_timeout
Expand All @@ -90,7 +85,7 @@ impl FlushControl {
FlushDecision::Dont
}
}
FlushStrategy::Continuously(strategy) => {
ConcreteFlushStrategy::Continuously(strategy) => {
if self.interval_passed(now, strategy.interval) {
self.last_flush = now;
// TODO calculate periodic rate. if it's more frequent than the flush_timeout
Expand All @@ -100,8 +95,8 @@ impl FlushControl {
FlushDecision::Dont
}
}
FlushStrategy::End => FlushDecision::End,
FlushStrategy::EndPeriodically(strategy) => {
ConcreteFlushStrategy::End => FlushDecision::End,
ConcreteFlushStrategy::EndPeriodically(strategy) => {
if self.interval_passed(now, strategy.interval) {
self.last_flush = now;
FlushDecision::End
Expand Down
105 changes: 69 additions & 36 deletions bottlecap/src/lifecycle/invocation_times.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::config::flush_strategy::{FlushStrategy, PeriodicStrategy};
use crate::config::flush_strategy::{ConcreteFlushStrategy, FlushStrategy, PeriodicStrategy};

const TWENTY_SECONDS: u64 = 20 * 1000;
const LOOKBACK_COUNT: usize = 20;
Expand All @@ -23,39 +23,63 @@ impl InvocationTimes {
self.head = (self.head + 1) % LOOKBACK_COUNT;
}

pub(crate) fn should_adapt(&self, now: u64, flush_timeout: u64) -> FlushStrategy {
// If the buffer isn't full, then we haven't seen enough invocations, so we should flush.
for idx in self.head..LOOKBACK_COUNT {
if self.times[idx] == 0 {
return FlushStrategy::End;
// Translate FlushStrategy to a ConcreteFlushStrategy
// For FlushStrategy::Default, evaluate based on past invocation times. Otherwise, return the
// strategy as is.
pub(crate) fn evaluate_concrete_strategy(
&self,
now: u64,
flush_timeout: u64,
flush_strategy: FlushStrategy,
) -> ConcreteFlushStrategy {
match flush_strategy {
FlushStrategy::Periodically(p) => ConcreteFlushStrategy::Periodically(p),
FlushStrategy::End => ConcreteFlushStrategy::End,
FlushStrategy::Continuously(p) => ConcreteFlushStrategy::Continuously(p),
FlushStrategy::EndPeriodically(p) => ConcreteFlushStrategy::EndPeriodically(p),
FlushStrategy::Default => {
// If the buffer isn't full, then we haven't seen enough invocations, so we should flush
// at the end of the invocation.
for idx in self.head..LOOKBACK_COUNT {
if self.times[idx] == 0 {
return ConcreteFlushStrategy::End;
}
}

// Now we've seen at least 20 invocations. Possible cases:
// 1. If the average time between invocations is longer than 2 minutes, stick to End strategy.
// 2. If average interval is shorter than 2 minutes:
// 2.1 If it's very short, use the continuous strategy to minimize delaying the next invocation.
// 2.2 If it's not too short, use the periodic strategy to minimize the risk that
// flushing is delayed due to the Lambda environment being frozen between invocations.
// We get the average time between each invocation by taking the difference between newest (`now`) and the
// oldest invocation in the buffer, then dividing by `LOOKBACK_COUNT - 1`.
let oldest = self.times[self.head];

let elapsed = now - oldest;
let should_adapt =
(elapsed as f64 / (LOOKBACK_COUNT - 1) as f64) < ONE_TWENTY_SECONDS;
if should_adapt {
// Both units here are in seconds
// TODO: What does this mean?
if elapsed < flush_timeout {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did that answer your question (in the other thread)?

return ConcreteFlushStrategy::Continuously(PeriodicStrategy {
interval: TWENTY_SECONDS,
});
}
return ConcreteFlushStrategy::Periodically(PeriodicStrategy {
interval: TWENTY_SECONDS,
});
}
ConcreteFlushStrategy::End
}
}

// Now we've seen at least 20 invocations. Switch to periodic if we're invoked at least once every 2 minutes.
// We get the average time between each invocation by taking the difference between newest (`now`) and the
// oldest invocation in the buffer, then dividing by `LOOKBACK_COUNT - 1`.
let oldest = self.times[self.head];

let elapsed = now - oldest;
let should_adapt = (elapsed as f64 / (LOOKBACK_COUNT - 1) as f64) < ONE_TWENTY_SECONDS;
if should_adapt {
// Both units here are in seconds
if elapsed < flush_timeout {
return FlushStrategy::Continuously(PeriodicStrategy {
interval: TWENTY_SECONDS,
});
}
return FlushStrategy::Periodically(PeriodicStrategy {
interval: TWENTY_SECONDS,
});
}
FlushStrategy::End
}
}

#[cfg(test)]
mod tests {
use crate::config::flush_strategy::{FlushStrategy, PeriodicStrategy};
use crate::config::flush_strategy::{ConcreteFlushStrategy, FlushStrategy, PeriodicStrategy};
use crate::lifecycle::invocation_times::{self, TWENTY_SECONDS};

#[test]
Expand All @@ -75,7 +99,10 @@ mod tests {
invocation_times.add(timestamp);
assert_eq!(invocation_times.times[0], timestamp);
assert_eq!(invocation_times.head, 1);
assert_eq!(invocation_times.should_adapt(1, 60), FlushStrategy::End);
assert_eq!(
invocation_times.evaluate_concrete_strategy(1, 60, FlushStrategy::Default),
ConcreteFlushStrategy::End
);
}

#[test]
Expand All @@ -88,8 +115,8 @@ mod tests {
assert_eq!(invocation_times.times[0], 20);
assert_eq!(invocation_times.head, 1);
assert_eq!(
invocation_times.should_adapt(21, 60),
FlushStrategy::Continuously(PeriodicStrategy {
invocation_times.evaluate_concrete_strategy(21, 60, FlushStrategy::Default),
ConcreteFlushStrategy::Continuously(PeriodicStrategy {
interval: TWENTY_SECONDS
})
);
Expand All @@ -105,8 +132,8 @@ mod tests {
assert_eq!(invocation_times.times[0], 20);
assert_eq!(invocation_times.head, 1);
assert_eq!(
invocation_times.should_adapt(21, 1),
FlushStrategy::Periodically(PeriodicStrategy {
invocation_times.evaluate_concrete_strategy(21, 1, FlushStrategy::Default),
ConcreteFlushStrategy::Periodically(PeriodicStrategy {
interval: TWENTY_SECONDS
})
);
Expand All @@ -122,7 +149,10 @@ mod tests {
// should wrap around
assert_eq!(invocation_times.times[0], 5019);
assert_eq!(invocation_times.head, 1);
assert_eq!(invocation_times.should_adapt(10000, 60), FlushStrategy::End);
assert_eq!(
invocation_times.evaluate_concrete_strategy(10000, 60, FlushStrategy::Default),
ConcreteFlushStrategy::End
);
}

#[test]
Expand All @@ -140,8 +170,8 @@ mod tests {
1901
);
assert_eq!(
invocation_times.should_adapt(2501, 60),
FlushStrategy::Periodically(PeriodicStrategy {
invocation_times.evaluate_concrete_strategy(2501, 60, FlushStrategy::Default),
ConcreteFlushStrategy::Periodically(PeriodicStrategy {
interval: TWENTY_SECONDS
})
);
Expand All @@ -161,6 +191,9 @@ mod tests {
invocation_times.times[invocation_times::LOOKBACK_COUNT - 1],
2471
);
assert_eq!(invocation_times.should_adapt(3251, 60), FlushStrategy::End);
assert_eq!(
invocation_times.evaluate_concrete_strategy(3251, 60, FlushStrategy::Default),
ConcreteFlushStrategy::End
);
}
}
Loading