config: engine: Implement adaptive flushing #11498
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 052256aed4
📝 Walkthrough
Adds adaptive flush: new configuration options and macros, new fields in flb_config, two public API functions, implementation and integration of adaptive flush logic in the engine (timer reset, backpressure measurement, hysteresis), initialization wiring, and unit tests plus conf comments.
Sequence Diagram
sequenceDiagram
participant E as Engine
participant C as Config
participant O as Outputs
participant T as Timer
Note over E: After a flush event
E->>O: Query backlog/size per output
O-->>E: Return max backpressure %
E->>E: flb_engine_adaptive_flush_target_level(pressure)
E->>E: Apply hysteresis (up/down steps) -> determine target level
E->>C: Update adaptive_level, hits, direction
E->>E: flb_engine_adaptive_flush_interval(level)
E->>C: Update flush_adaptive_current_interval
alt Interval changed
E->>T: flb_engine_flush_timer_reset(new_interval)
T-->>E: Timer restarted
E->>E: Log interval change
end
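The hysteresis step in the diagram above could look roughly like the following. This is a minimal sketch, not the PR's code: the struct `adaptive_state`, the function `adaptive_step`, and the exact rule (move one level only after `up_steps` or `down_steps` consecutive cycles ask for the same direction) are assumptions made for illustration; only the idea of tracking level, hits, and direction comes from the walkthrough.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical state, loosely mirroring "adaptive_level, hits, direction". */
struct adaptive_state {
    int level;      /* current level: 0 (slowest) .. 3 (fastest) */
    int hits;       /* consecutive cycles requesting the same direction */
    int direction;  /* +1 toward faster, -1 toward slower, 0 steady */
};

/*
 * Move one level toward `target` only after enough consecutive cycles
 * requested that direction. Returns 1 if the level changed, 0 otherwise.
 */
static int adaptive_step(struct adaptive_state *st, int target,
                         int up_steps, int down_steps)
{
    int dir;
    int needed;

    assert(st != NULL);

    /* -1, 0 or +1 depending on where the target sits */
    dir = (target > st->level) - (target < st->level);
    if (dir == 0) {
        st->hits = 0;
        st->direction = 0;
        return 0;
    }

    /* A change of direction restarts the count */
    if (dir != st->direction) {
        st->direction = dir;
        st->hits = 0;
    }
    st->hits++;

    needed = (dir > 0) ? up_steps : down_steps;
    if (st->hits < needed) {
        return 0;
    }

    st->level += dir;
    st->hits = 0;
    return 1;
}
```

With `up_steps = 2` and `down_steps = 3`, a state at level 1 needs two consecutive high-pressure cycles to reach level 2, and three consecutive low-pressure cycles to drop back, which is the kind of asymmetry that damps oscillation.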
Estimated code review effort
🎯 3 (Moderate) | ⏱️ ~20 minutes
🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 4
🧹 Nitpick comments (1)
src/flb_engine.c (1)
354-365: Reuse the helper APIs inside the update path to avoid logic drift.
`flb_engine_adaptive_flush_update()` reimplements threshold-to-level and interval computation logic already exposed by `flb_engine_adaptive_flush_target_level()` and `flb_engine_adaptive_flush_interval()`.
♻️ Proposed simplification
    -    if (pressure >= config->flush_adaptive_high_pressure) {
    -        target_level = 3;
    -    }
    -    else if (pressure >= config->flush_adaptive_medium_pressure) {
    -        target_level = 2;
    -    }
    -    else if (pressure <= config->flush_adaptive_low_pressure) {
    -        target_level = 0;
    -    }
    -    else {
    -        target_level = 1;
    -    }
    +    target_level = flb_engine_adaptive_flush_target_level(config, pressure);
    @@
    -    interval = config->flush * multipliers[config->flush_adaptive_level];
    -
    -    interval = adaptive_flush_clamp(interval,
    -                                    config->flush_adaptive_min_interval,
    -                                    config->flush_adaptive_max_interval);
    +    interval = flb_engine_adaptive_flush_interval(config,
    +                                                  config->flush_adaptive_level);
Also applies to: 394-398
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/flb_engine.c` around lines 354 - 365, In flb_engine_adaptive_flush_update(), replace the duplicated threshold-to-level and interval computation with calls to the existing helpers: call flb_engine_adaptive_flush_target_level(config, pressure) to get target_level and then, once config->flush_adaptive_level has been updated, flb_engine_adaptive_flush_interval(config, config->flush_adaptive_level) to compute the interval; update the code paths that currently reimplement those conditionals (also the similar block around lines 394 - 398) to use these helpers so logic is centralized and cannot drift.
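For readers without the source at hand, the two helpers this comment refers to can be approximated as below. This is a sketch under assumptions: the struct `adaptive_cfg` and the names `target_level`, `level_interval`, and `clamp_double` are hypothetical stand-ins, the thresholds and multiplier table (2.0, 1.0, 0.75, 0.5 for levels 0 to 3) are taken from the removed lines of the diff and the review text, and the real functions in `src/flb_engine.c` may differ in signature and detail.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical subset of the adaptive fields kept on flb_config. */
struct adaptive_cfg {
    double flush;            /* base interval, in seconds */
    double min_interval;     /* lower bound for the effective interval */
    double max_interval;     /* upper bound for the effective interval */
    double low_pressure;     /* thresholds, in percent */
    double medium_pressure;
    double high_pressure;
};

static double clamp_double(double v, double lo, double hi)
{
    if (v < lo) {
        return lo;
    }
    if (v > hi) {
        return hi;
    }
    return v;
}

/* Map a backpressure percentage to a level, as in the removed diff lines. */
static int target_level(const struct adaptive_cfg *c, double pressure)
{
    assert(c != NULL);

    if (pressure >= c->high_pressure) {
        return 3;
    }
    else if (pressure >= c->medium_pressure) {
        return 2;
    }
    else if (pressure <= c->low_pressure) {
        return 0;
    }
    return 1;
}

/* effective = clamp(flush * level_multiplier, min_interval, max_interval) */
static double level_interval(const struct adaptive_cfg *c, int level)
{
    static const double multipliers[] = {2.0, 1.0, 0.75, 0.5};

    assert(c != NULL);

    /* Out-of-range levels are clamped rather than rejected (an assumption) */
    if (level < 0) {
        level = 0;
    }
    if (level > 3) {
        level = 3;
    }
    return clamp_double(c->flush * multipliers[level],
                        c->min_interval, c->max_interval);
}
```

Keeping both computations behind one pair of functions means the update path and the unit tests exercise the same arithmetic, which is the point of the nitpick.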
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/flb_engine.c`:
- Around line 1184-1202: Clamp all three adaptive pressure thresholds to the
[0,100] range before performing ordering adjustments: ensure
config->flush_adaptive_low_pressure, config->flush_adaptive_medium_pressure and
config->flush_adaptive_high_pressure are each bounded to 0.0 and 100.0 first,
then run the existing ordering checks that adjust medium/high relative to low;
this prevents medium or high from remaining out-of-range even after the
cross-field corrections.
- Around line 1204-1210: The startup sets
config->flush_adaptive_current_interval to the raw config->flush which can
violate adaptive min/max; before calling flb_engine_flush_timer_reset you should
initialize the first adaptive interval using the adaptive calculation routine
(e.g. call flb_engine_flush_adaptive_calculate(config) or otherwise compute &
clamp using config->flush_adaptive_min/max) so that
config->flush_adaptive_current_interval reflects adaptive rules, then pass that
value to flb_engine_flush_timer_reset instead of config->flush.
- Around line 285-303: Currently the code destroys the existing flush timer with
mk_event_timeout_destroy(event) before calling mk_event_timeout_create for the
new interval, which can leave flushing stopped if creation fails; change the
flow to attempt creating the new timer first (use a temporary event/timer or
save current config->flush_fd and event state), call mk_event_timeout_create
with flb_time_from_double(&t_flush, interval) into a temporary fd, and only on
success set event->priority = FLB_ENGINE_PRIORITY_FLUSH, update config->flush_fd
to the new fd and destroy the old timer (mk_event_timeout_destroy) and
free/replace the event; if creation fails, keep the existing
config->flush_fd/event intact and call flb_utils_error(FLB_ERR_CFG_FLUSH_CREATE)
and return -1 so periodic flushing is not lost.
In `@tests/internal/engine_adaptive_flush.c`:
- Around line 11-17: The test dereferences config after calling
flb_config_init() but only uses TEST_CHECK(config != NULL), which doesn't stop
execution on failure; update the test to guard against a NULL config by either
replacing TEST_CHECK with a failing assertion macro (e.g., TEST_REQUIRE or
similar) or adding an explicit null-check after flb_config_init(): if (config ==
NULL) { TEST_FAIL("flb_config_init() returned NULL"); return; } then set
config->flush_adaptive_low_pressure, config->flush_adaptive_medium_pressure, and
config->flush_adaptive_high_pressure only when config is non-NULL; apply the
same change to the other identical block that uses flb_config_init() and the
same flush_adaptive_* fields.
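The create-before-destroy ordering requested for lines 285-303 above can be illustrated in isolation. The sketch below deliberately avoids the real timer API: `timer_create_stub`, `timer_destroy_stub`, `fail_next_create`, and `struct flush_state` are hypothetical stand-ins, so it only shows the ordering, not how the engine's event loop is actually driven.

```c
#include <assert.h>
#include <stddef.h>

/* Stand-ins for the real timer calls: hypothetical, for illustration only. */
static int next_fd = 3;
static int fail_next_create = 0;

static int timer_create_stub(double interval)
{
    (void) interval;
    if (fail_next_create) {
        return -1;
    }
    return next_fd++;
}

static void timer_destroy_stub(int fd)
{
    (void) fd;
}

struct flush_state {
    int flush_fd;     /* descriptor of the active flush timer, -1 if none */
    double interval;  /* interval the active timer was armed with */
};

/*
 * Create the replacement timer first; swap and destroy the old one only
 * on success. On failure the existing timer keeps running.
 */
static int flush_timer_reset(struct flush_state *st, double interval)
{
    int new_fd;

    assert(st != NULL);

    new_fd = timer_create_stub(interval);
    if (new_fd == -1) {
        return -1;  /* old timer and state left untouched */
    }

    if (st->flush_fd != -1) {
        timer_destroy_stub(st->flush_fd);
    }
    st->flush_fd = new_fd;
    st->interval = interval;
    return 0;
}
```

The design choice is simply that a failed reset degrades to "keep the previous interval" instead of "stop flushing".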
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
- conf/fluent-bit.conf
- include/fluent-bit/flb_config.h
- include/fluent-bit/flb_engine.h
- src/flb_config.c
- src/flb_engine.c
- tests/internal/CMakeLists.txt
- tests/internal/engine_adaptive_flush.c
052256a to 01eecd7
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
01eecd7 to bc55331
Actionable comments posted: 1
♻️ Duplicate comments (1)
src/flb_engine.c (1)
446-464: ⚠️ Potential issue | 🟠 Major
Pressure threshold normalization is still incomplete for edge inputs.
Line 446 clamps only the low lower-bound and Line 450 clamps only the high upper-bound. Invalid values like `low > 100` or `high < 0` can still leave inconsistent ordering after the current adjustments.
🔧 Proposed fix
    -    if (config->flush_adaptive_low_pressure < 0.0) {
    -        config->flush_adaptive_low_pressure = 0.0;
    -    }
    -
    -    if (config->flush_adaptive_high_pressure > 100.0) {
    -        config->flush_adaptive_high_pressure = 100.0;
    -    }
    +    config->flush_adaptive_low_pressure =
    +        adaptive_flush_clamp(config->flush_adaptive_low_pressure, 0.0, 100.0);
    +    config->flush_adaptive_medium_pressure =
    +        adaptive_flush_clamp(config->flush_adaptive_medium_pressure, 0.0, 100.0);
    +    config->flush_adaptive_high_pressure =
    +        adaptive_flush_clamp(config->flush_adaptive_high_pressure, 0.0, 100.0);

         if (config->flush_adaptive_low_pressure > config->flush_adaptive_medium_pressure) {
             config->flush_adaptive_medium_pressure = config->flush_adaptive_low_pressure;
         }

         if (config->flush_adaptive_medium_pressure > config->flush_adaptive_high_pressure) {
             config->flush_adaptive_medium_pressure = config->flush_adaptive_high_pressure;
         }
    +
    +    if (config->flush_adaptive_low_pressure >
    +        config->flush_adaptive_medium_pressure) {
    +        config->flush_adaptive_low_pressure =
    +            config->flush_adaptive_medium_pressure;
    +    }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/flb_engine.c` around lines 446 - 464, The current normalization only clamps flush_adaptive_low_pressure lower-bound and flush_adaptive_high_pressure upper-bound leaving cases like low > 100 or high < 0 unhandled; update the normalization to first clamp both config->flush_adaptive_low_pressure and config->flush_adaptive_high_pressure into [0.0, 100.0], then ensure ordering by clamping config->flush_adaptive_medium_pressure between the (possibly adjusted) low and high values (e.g., medium = max(medium, low); medium = min(medium, high)), and finally ensure low <= medium <= high by adjusting medium as needed using the normalized low/high values so all edge inputs produce a consistent ordering.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/flb_engine.c`:
- Around line 426-470: The adaptive init currently always computes and sets
config->flush_adaptive_current_interval and the timer arming site
unconditionally, which lets adaptive clamps override a fixed Flush setting;
update flb_engine_adaptive_flush_init to only compute/assign
flush_adaptive_current_interval when adaptive mode is enabled (check the
adaptive flag/level), and modify the code that arms the timer (the place that
currently uses config->flush_adaptive_current_interval) to use
config->flush_interval when adaptive is disabled; reference
flb_engine_adaptive_flush_init, flb_engine_adaptive_flush_interval, and the
timer-arming call that uses flush_adaptive_current_interval and gate the
adaptive logic behind the adaptive enable/level check so fixed Flush values are
preserved.
📒 Files selected for processing (7)
- conf/fluent-bit.conf
- include/fluent-bit/flb_config.h
- include/fluent-bit/flb_engine.h
- src/flb_config.c
- src/flb_engine.c
- tests/internal/CMakeLists.txt
- tests/internal/engine_adaptive_flush.c
🚧 Files skipped from review as they are similar to previous changes (3)
- tests/internal/engine_adaptive_flush.c
- include/fluent-bit/flb_engine.h
- conf/fluent-bit.conf
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
bc55331 to c42c4ff
♻️ Duplicate comments (1)
src/flb_engine.c (1)
451-469: ⚠️ Potential issue | 🟠 Major
`flush_adaptive_high_pressure` is not clamped to `>= 0`; invalid threshold ordering can survive validation.
`flush_adaptive_low_pressure` is floored at 0 (line 451) and `flush_adaptive_high_pressure` is capped at 100 (line 455), but a negative `high_pressure` value passes through unguarded. The ordering correction at lines 465–469 then copies that negative value into `flush_adaptive_medium_pressure`, leaving medium below the already-clamped low, which is a state the rest of the adaptive logic never recovers from.
Minimal reproduction: `low=10, medium=20, high=-5` → after init: `low=10, medium=-5, high=-5`.
The previous review's proposed fix (clamp all three independently to `[0, 100]` before the ordering checks) was only partially applied.
🛡️ Proposed fix
    -    if (config->flush_adaptive_low_pressure < 0.0) {
    -        config->flush_adaptive_low_pressure = 0.0;
    -    }
    -
    -    if (config->flush_adaptive_high_pressure > 100.0) {
    -        config->flush_adaptive_high_pressure = 100.0;
    -    }
    +    config->flush_adaptive_low_pressure =
    +        adaptive_flush_clamp(config->flush_adaptive_low_pressure, 0.0, 100.0);
    +    config->flush_adaptive_medium_pressure =
    +        adaptive_flush_clamp(config->flush_adaptive_medium_pressure, 0.0, 100.0);
    +    config->flush_adaptive_high_pressure =
    +        adaptive_flush_clamp(config->flush_adaptive_high_pressure, 0.0, 100.0);

         if (config->flush_adaptive_low_pressure > config->flush_adaptive_medium_pressure) {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/flb_engine.c` around lines 451 - 469, The three adaptive pressure fields (config->flush_adaptive_low_pressure, config->flush_adaptive_medium_pressure, config->flush_adaptive_high_pressure) must each be independently clamped to the valid range [0.0, 100.0] before enforcing ordering; update the init logic so you floor any value < 0.0 to 0.0 and cap any value > 100.0 to 100.0 for all three fields, then apply the existing ordering corrections that copy low->medium or high->medium as needed (i.e., perform the per-field clamps first, then run the comparisons that adjust flush_adaptive_medium_pressure).
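To make the requested ordering concrete, here is a small self-contained sketch of "clamp every field first, then fix the ordering". It is an illustration only: `struct thresholds`, `clamp_pct`, and `normalize_thresholds` are hypothetical names, and the final step that lowers `low` to `medium` follows the fix proposed in the earlier review round rather than anything confirmed to be in the merged code.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical holder for the three pressure thresholds, in percent. */
struct thresholds {
    double low;
    double medium;
    double high;
};

static double clamp_pct(double v)
{
    if (v < 0.0) {
        return 0.0;
    }
    if (v > 100.0) {
        return 100.0;
    }
    return v;
}

/* After this call, 0 <= low <= medium <= high <= 100 holds. */
static void normalize_thresholds(struct thresholds *t)
{
    assert(t != NULL);

    /* Step 1: bound each field independently */
    t->low = clamp_pct(t->low);
    t->medium = clamp_pct(t->medium);
    t->high = clamp_pct(t->high);

    /* Step 2: pull medium into [low, high] */
    if (t->low > t->medium) {
        t->medium = t->low;
    }
    if (t->medium > t->high) {
        t->medium = t->high;
    }

    /* Step 3: if low was above high, medium dropped below low; fix low */
    if (t->low > t->medium) {
        t->low = t->medium;
    }
}
```

Under these assumptions the reproduction input `low=10, medium=20, high=-5` collapses to all zeros, which is a degenerate but consistently ordered configuration.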
🧹 Nitpick comments (2)
src/flb_engine.c (1)
249-276: No outputs with limits → pressure always 0; consider logging a hint.
When no output instance has `total_limit_size > 0`, this function always returns 0.0, keeping adaptive at level 0 (2× slow). Users who enable `flush.adaptive` without configuring chunk limits will silently get the slowest interval with no diagnostic. A single one-time `flb_debug` log at init time noting that adaptive mode has no limited outputs to measure pressure from would help.
This is not a logic error (the behavior is correct given the design), but it is an operational observability gap.
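A rough picture of the measurement being discussed, with the suggested `saw_limited` flag added. This is a sketch under assumptions: `struct output_stub`, its `current_size` field, and `max_backpressure_percent` are hypothetical, and the real function walks `config->outputs` rather than a plain array. Only `total_limit_size`, the "maximum over outputs" rule, and the flag idea come from the review text.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for an output instance. */
struct output_stub {
    size_t total_limit_size;  /* 0 means "no limit configured" */
    size_t current_size;      /* bytes currently buffered (assumed field) */
};

/*
 * Return the highest usage percentage among outputs that have a limit.
 * *saw_limited is set to 1 if at least one such output exists, so the
 * caller can log a one-time hint when it stays 0.
 */
static double max_backpressure_percent(const struct output_stub *outs,
                                       size_t count, int *saw_limited)
{
    double max_pressure = 0.0;
    double pressure;
    size_t i;

    assert(saw_limited != NULL);
    *saw_limited = 0;

    for (i = 0; i < count; i++) {
        if (outs[i].total_limit_size == 0) {
            continue;  /* unlimited outputs cannot report pressure */
        }
        *saw_limited = 1;

        pressure = ((double) outs[i].current_size * 100.0) /
                   (double) outs[i].total_limit_size;
        if (pressure > max_pressure) {
            max_pressure = pressure;
        }
    }

    return max_pressure;
}
```

A caller could check the flag once at startup and emit the debug hint there, rather than on every flush cycle.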
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/flb_engine.c` around lines 249 - 276, flb_engine_get_chunk_backpressure_percent can silently return 0 when no output has total_limit_size > 0; add a one-time debug hint by detecting when no limited outputs were found (e.g., track a boolean saw_limited while iterating config->outputs) and, if saw_limited is false, emit a flb_debug log stating that flush.adaptive has no limited outputs to measure backpressure from; place this check/log in flb_engine_get_chunk_backpressure_percent (use the existing ins->total_limit_size and max_pressure variables) so operators see a diagnostic when adaptive mode will always use the slowest interval.
tests/internal/engine_adaptive_flush.c (1)
33-64: LGTM. Interval arithmetic and clamp checks are correct; floating-point equality is safe here.
All multiplier values (2.0, 1.0, 0.75, 0.5) and their products with the test `flush` values are exactly representable in IEEE 754 binary64, so direct `==` comparison is reliable. Level clamping and min/max bound clamping cases are well covered.
Consider adding a test case that exercises `flb_engine_adaptive_flush_init` with an inverted threshold ordering (e.g., `high_pressure = -5`) to catch the validation gap flagged in `src/flb_engine.c` lines 451–469.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/internal/engine_adaptive_flush.c` around lines 33 - 64, Add a new unit test that exercises flb_engine_adaptive_flush_init with inverted pressure thresholds (e.g., call flb_engine_adaptive_flush_init(config, high_pressure = -5, low_pressure = 5) after flb_config_init()), and assert the function either fails (non-zero/negative return) or normalizes the values by ensuring the resulting thresholds are in correct order (high >= low); reference flb_engine_adaptive_flush_init and the config object to locate where to add the check so the test detects the validation gap flagged in src/flb_engine.c.
📒 Files selected for processing (5)
- conf/fluent-bit.conf
- include/fluent-bit/flb_engine.h
- src/flb_engine.c
- tests/internal/CMakeLists.txt
- tests/internal/engine_adaptive_flush.c
🚧 Files skipped from review as they are similar to previous changes (2)
- conf/fluent-bit.conf
- include/fluent-bit/flb_engine.h
Adaptive flush parameters behavior
The behavior and its related parameters are as follows:
- flush:
- flush.adaptive:
  - off => legacy fixed interval behavior.
  - on => dynamic interval updates on each flush cycle.
- flush.adaptive.min_interval / flush.adaptive.max_interval:
- flush.adaptive.low_pressure / flush.adaptive.medium_pressure / flush.adaptive.high_pressure:
- flush.adaptive.up_steps: faster level (smaller interval).
- flush.adaptive.down_steps: (larger interval).
  up_steps to reduce oscillation.
- 2.0x base interval
- 1.0x base interval
- 0.75x base interval
- 0.5x base interval
effective = clamp(flush * level_multiplier, min_interval, max_interval)
Enter [N/A] in the box, if an item is not applicable to your change.
Testing
Before we can approve your change; please submit the following in a comment:
If this is a change to packaging of containers or native binaries then please confirm it works for all targets.
ok-package-test label to test for all targets (requires maintainer to do).
Documentation
Backporting
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.