refactor: msg pool to make more structured part 3 #7033
Walkthrough
This PR restructures the MessagePool internals by converting standalone helper functions into methods backed by MessagePool-owned caches, renaming `pending_store` to `pending`, introducing explicit message validation/insertion pipelines, implementing `apply_head_change` for chain reorg handling, and adding `run_republish_cycle` for centralized republish management.
Changes
MessagePool Refactoring and Reorg/Republish Flow
48da6f8 to d353923
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/message_pool/msgpool/selection.rs (1)
653-662: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Keep `run_head_change` side-effect free.
`run_head_change` is documented as a simulation, but this change now passes `&self.pending` into it and the fallback path still calls `pending_store.remove(...)`. Any miss in `rmsgs` can therefore mutate the live mpool during selection, which is especially risky when address resolution is needed.
Suggested fix
 run_head_change(
     self.api.as_ref(),
     &self.caches.bls_sig,
-    &self.pending,
     &self.caches.key,
     cur_ts.clone(),
     ts.clone(),
     &mut result,
 )?;
@@
 pub(in crate::message_pool) fn run_head_change<T>(
     api: &T,
     bls_sig_cache: &SizeTrackingLruCache<CidWrapper, Signature>,
-    pending_store: &PendingStore,
     key_cache: &IdToAddressCache,
     from: Tipset,
     to: Tipset,
     rmsgs: &mut HashMap<Address, HashMap<u64, SignedMessage>>,
 ) -> Result<(), Error>
@@
 fn remove_applied_from_pool<T: Provider>(
     api: &T,
     key_cache: &IdToAddressCache,
-    pending_store: &PendingStore,
     ts: &Tipset,
     from: &Address,
     sequence: u64,
     rmsgs: &mut HashMap<Address, HashMap<u64, SignedMessage>>,
 ) -> Result<(), Error> {
@@
     if rmsgs
         .get_mut(from)
         .and_then(|temp| temp.remove(&sequence))
         .is_none()
         && let Ok(resolved) = resolve_to_key(api, key_cache, from, ts)
             .inspect_err(|e| tracing::debug!(%from, "remove: failed to resolve address: {e:#}"))
     {
-        let _ = pending_store.remove(&resolved, sequence, true);
+        let _ = rmsgs
+            .get_mut(&resolved)
+            .and_then(|temp| temp.remove(&sequence));
     }
     Ok(())
 }
Also applies to: 884-905
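The idea behind the suggested fix can be shown in isolation. The following is a minimal sketch, not the Forest code: addresses and messages are plain strings, and `resolve` is a hypothetical stand-in for `resolve_to_key`. The point it illustrates is that both the direct lookup and the resolved-address fallback touch only the simulation map, never a live store.

```rust
use std::collections::HashMap;

// Hypothetical stand-ins: addresses and messages are plain strings here.
type Address = String;
type Rmsgs = HashMap<Address, HashMap<u64, String>>;

/// Removes an applied message from the simulation map only.
/// `resolve` is a hypothetical stand-in for `resolve_to_key`.
/// Returns true if an entry was removed.
fn remove_applied_simulated(
    rmsgs: &mut Rmsgs,
    from: &Address,
    sequence: u64,
    resolve: impl Fn(&Address) -> Option<Address>,
) -> bool {
    // First try the address exactly as it appears on the message.
    if rmsgs
        .get_mut(from)
        .and_then(|m| m.remove(&sequence))
        .is_some()
    {
        return true;
    }
    // Fall back to the resolved key address, still only inside `rmsgs`.
    resolve(from)
        .and_then(|resolved| rmsgs.get_mut(&resolved).and_then(|m| m.remove(&sequence)))
        .is_some()
}
```

Because the function has no handle to the pending store at all, a miss in `rmsgs` cannot mutate live state; the compiler enforces the "simulation" contract rather than a doc comment.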
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/message_pool/msgpool/selection.rs` around lines 653 - 662, run_head_change is meant to be a pure simulation but currently receives a reference to the live pending set (self.pending) and the fallback path still calls pending_store.remove(...) which can mutate the live mpool; change the call sites of run_head_change (the call passing self.pending and the analogous call around lines 884-905) to pass an isolated copy or read-only snapshot of the pending data (e.g., clone the PendingStore or clone the rmsgs collection) so the simulation only operates on the clone, and update the fallback logic to remove from the real pending store only after the simulation decides removals are required; ensure functions and variables referenced include run_head_change, self.pending, pending_store.remove, and rmsgs so you locate and replace direct uses with the cloned/snapshot variants.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/message_pool/msgpool/msg_pool.rs`:
- Around line 247-251: The add(...) method currently calls add_to_pool(msg,
false, TrustPolicy::Trusted) for gossip inserts; change it to use
TrustPolicy::Untrusted so externally received gossip messages are subject to
admission limits and untrusted caps/gap rules. Locate the add function and
replace the TrustPolicy argument passed to add_to_pool with
TrustPolicy::Untrusted, ensuring the call signature (add_to_pool(msg, false,
...)) remains unchanged.
In `@src/message_pool/msgpool/reorg.rs`:
- Around line 41-43: The code currently swallows failures from calls like
self.api.load_tipset(ts.parents()), block message fetches, and reinsertion by
logging and continuing, which leaves apply_head_change partially applied while
still returning Ok(()); change those branches to propagate errors (return
Err(...)) instead of just tracing::error + continue. Locate the calls to
load_tipset, fetch_block_messages (or similar block-msg fetch functions), and
any reinsert/reapply paths inside the reorg processing routine (e.g., where
apply_head_change is invoked) and replace the log+continue behavior with early
returns that convert the underlying failure into a suitable error value returned
by the surrounding function so the caller can detect partial failure. Ensure the
error returned includes context about which operation failed (loading parent
tipset, fetching block messages, or reinserting) and references the tipset or
block identifier for easier debugging.
In `@src/message_pool/msgpool/republish.rs`:
- Around line 177-184: The bubble-down loop in republish.rs uses j for
comparisons but erroneously swaps using i and i + 1, preventing proper bubbling;
update the swap call in the loop (the line with chains.key_vec.swap) to swap j
and j + 1 (i.e., chains.key_vec.swap(j, j + 1)) so the element being compared
actually moves down the list in the while loop that checks
chains[j].compare(&chains[j + 1]).
📒 Files selected for processing (13)
- src/message_pool/config.rs
- src/message_pool/errors.rs
- src/message_pool/msgpool/mod.rs
- src/message_pool/msgpool/msg_pool.rs
- src/message_pool/msgpool/msg_set.rs
- src/message_pool/msgpool/reorg.rs
- src/message_pool/msgpool/republish.rs
- src/message_pool/msgpool/selection.rs
- src/message_pool/msgpool/utils.rs
- src/message_pool/nonce_tracker.rs
- src/rpc/methods/eth.rs
- src/rpc/methods/gas.rs
- src/utils/cache/lru.rs
💤 Files with no reviewable changes (3)
- src/message_pool/config.rs
- src/message_pool/errors.rs
- src/utils/cache/lru.rs
 /// Insert a message received via gossip. Runs full validation. Does
 /// not publish back to the network.
 pub fn add(&self, msg: SignedMessage) -> Result<(), Error> {
-    self.check_message(&msg)?;
-    let ts = self.current_tipset();
-    self.add_tipset(msg, &ts, false, TrustPolicy::Trusted)?;
+    self.add_to_pool(msg, false, TrustPolicy::Trusted)?;
     Ok(())
Treat gossip inserts as untrusted to enforce admission limits.
At Line 250, gossip messages are inserted with TrustPolicy::Trusted, which bypasses the stricter untrusted caps/gap rules intended for externally sourced traffic.
Suggested fix
 pub fn add(&self, msg: SignedMessage) -> Result<(), Error> {
-    self.add_to_pool(msg, false, TrustPolicy::Trusted)?;
+    self.add_to_pool(msg, false, TrustPolicy::Untrusted)?;
     Ok(())
 }
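To make the difference between the two policies concrete, here is a rough sketch of how a trust policy might gate admission. Everything in it is an assumption for illustration: the `admit` function, the cap values, and the gap values are hypothetical and do not reflect the actual limits in the message pool.

```rust
/// Where a message came from, for admission purposes.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum TrustPolicy {
    Trusted,
    Untrusted,
}

impl TrustPolicy {
    /// Hypothetical per-sender cap on pending messages (values are made up).
    fn max_pending_per_sender(self) -> usize {
        match self {
            TrustPolicy::Trusted => 1000,
            TrustPolicy::Untrusted => 10,
        }
    }

    /// Hypothetical maximum allowed nonce gap (values are made up).
    fn max_nonce_gap(self) -> u64 {
        match self {
            TrustPolicy::Trusted => 64,
            TrustPolicy::Untrusted => 4,
        }
    }
}

/// Hypothetical admission check: rejects a message if the sender already has
/// too many pending messages or if its sequence is too far ahead.
fn admit(
    policy: TrustPolicy,
    pending_count: usize,
    next_expected: u64,
    sequence: u64,
) -> Result<(), String> {
    if pending_count >= policy.max_pending_per_sender() {
        return Err("too many pending messages for sender".to_string());
    }
    if sequence > next_expected.saturating_add(policy.max_nonce_gap()) {
        return Err("nonce gap too large".to_string());
    }
    Ok(())
}
```

Under these made-up limits, a sender with 10 pending messages is rejected as `Untrusted` but accepted as `Trusted`, which is the kind of bypass the comment above is concerned about for gossip traffic.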
let Ok(pts) = self.api.load_tipset(ts.parents()) else {
    tracing::error!("error loading reverted tipset parent");
    continue;
Propagate reorg failures instead of returning success after a partial update.
These branches log and continue even though apply_head_change is mutating the live pool. If any parent load, block-message fetch, or reinsert fails here, the mpool can end up half-updated while the caller still gets Ok(()), which can leave applied messages pending or drop reverted ones.
Also applies to: 49-51, 71-73, 98-105
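One way to read this suggestion is sketched below. It is an illustration under stated assumptions, not the reorg code: `ReorgError`, `load_parent`, and `collect_parents` are hypothetical names, and tipsets are plain strings. The failure is returned with context instead of being logged and skipped, and all loads happen before anything would be mutated.

```rust
use std::fmt;

/// Hypothetical error type carrying context about the failed step.
#[derive(Debug, PartialEq)]
enum ReorgError {
    LoadParent { tipset: String },
}

impl fmt::Display for ReorgError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ReorgError::LoadParent { tipset } => {
                write!(f, "failed to load parent of reverted tipset {tipset}")
            }
        }
    }
}

impl std::error::Error for ReorgError {}

/// Hypothetical loader: returns the parent of `tipset`, or `None` if it is
/// unavailable. Stands in for a call like `load_tipset(ts.parents())`.
fn load_parent(tipset: &str) -> Option<String> {
    tipset.strip_prefix("child-of-").map(str::to_string)
}

/// Loads every parent up front and stops at the first failure, so the caller
/// sees an `Err` instead of an `Ok(())` that hides a partial update.
fn collect_parents(reverted: &[&str]) -> Result<Vec<String>, ReorgError> {
    let mut parents = Vec::with_capacity(reverted.len());
    for ts in reverted {
        let parent = load_parent(ts).ok_or_else(|| ReorgError::LoadParent {
            tipset: ts.to_string(),
        })?;
        parents.push(parent);
    }
    Ok(parents)
}
```

Gathering the fallible inputs first and mutating the pool only afterwards is a design choice of this sketch; propagating the error alone tells the caller something went wrong but does not undo mutations already applied.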
let mut j = i;
while j < chains.len() - 1 {
    #[allow(clippy::indexing_slicing)]
    if chains[j].compare(&chains[j + 1]) == Ordering::Less {
        break;
    }
    chains.key_vec.swap(i, i + 1);
    j += 1;
Use `j` in the bubble-down swap.
The loop compares `chains[j]` with `chains[j + 1]`, but it keeps swapping `i`/`i + 1`. After the first iteration, that just toggles the original pair back and forth, so the trimmed chain never bubbles past one slot.
Suggested fix
 let mut j = i;
 while j < chains.len() - 1 {
     #[allow(clippy::indexing_slicing)]
     if chains[j].compare(&chains[j + 1]) == Ordering::Less {
         break;
     }
-    chains.key_vec.swap(i, i + 1);
+    chains.key_vec.swap(j, j + 1);
     j += 1;
 }
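The effect of the wrong index is easy to reproduce on a plain slice. In this sketch, ascending `i32` order stands in for the chain ordering (an assumption; the real code uses `compare` on message chains), and `bubble_down_buggy` exists only to mirror the `swap(i, i + 1)` behavior described above.

```rust
use std::cmp::Ordering;

/// Bubbles the element at `i` down until it compares `Less` than its successor.
/// Ascending `i32` order stands in for the chain ordering (an assumption).
fn bubble_down(v: &mut [i32], i: usize) {
    let mut j = i;
    while j + 1 < v.len() {
        if v[j].cmp(&v[j + 1]) == Ordering::Less {
            break;
        }
        // Swap the pair that was just compared, so the element keeps moving.
        v.swap(j, j + 1);
        j += 1;
    }
}

/// Same loop, but swapping `i`/`i + 1` as in the code under review.
fn bubble_down_buggy(v: &mut [i32], i: usize) {
    let mut j = i;
    while j + 1 < v.len() {
        if v[j].cmp(&v[j + 1]) == Ordering::Less {
            break;
        }
        // Always swaps the original pair, so it only toggles back and forth.
        v.swap(i, i + 1);
        j += 1;
    }
}
```

Starting from `[1, 9, 3, 5, 7]` with `i = 1`, the corrected loop moves `9` to the end and leaves the slice sorted, while the buggy loop swaps positions 1 and 2 twice and ends up where it started.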
Summary of changes
Changes introduced in this pull request:
This PR is part 3 of the msg pool restructuring. It contains:
- `republish_cycle` is now part of the MessagePool, instead of being a free function with unlimited params
- `head_change` -> `apply_head_change`
- `republish_pending_messages` -> `run_republish_cycle`
- `add_helper` -> `add_to_pool_unchecked`
- `add_tipset` -> `add_to_pool`
- `RepublishState`: groups the republished CID set + the early-wake trigger channel
- Explicit validation pipeline (`validate_static` / `validate_signature` / `validate_with_state`)
- `Error::MessageValueTooHigh` variant (the same check is already performed by `valid_for_block_inclusion`).
Reference issue to close (if applicable)
Closes