Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 16 additions & 14 deletions linera-rpc/src/cross_chain_message_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ pub(crate) async fn forward_cross_chain_queries<F, G>(
handle_request: F,
) where
F: Fn(ShardId, CrossChainRequest) -> G + Send + Clone + 'static,
G: Future<Output = anyhow::Result<()>>,
G: Future<Output = anyhow::Result<()>> + Send + 'static,
{
let mut steps = futures::stream::FuturesUnordered::new();
let mut steps = tokio::task::JoinSet::new();
let mut job_states: HashMap<QueueId, JobState> = HashMap::new();

let run_task = |task: Task| async move {
Expand Down Expand Up @@ -116,7 +116,7 @@ pub(crate) async fn forward_cross_chain_queries<F, G>(
metrics::CROSS_CHAIN_MESSAGE_TASKS.set(job_states.len() as i64);

tokio::select! {
Some((queue, action)) = steps.next() => {
Some(Ok((queue, action))) = steps.join_next() => {
let Entry::Occupied(mut state) = job_states.entry(queue) else {
panic!("running job without state");
};
Expand All @@ -130,7 +130,7 @@ pub(crate) async fn forward_cross_chain_queries<F, G>(
state.get_mut().retries += 1
}

steps.push(run_action.clone()(action, queue, state.get().clone()));
steps.spawn(run_action.clone()(action, queue, state.get().clone()));
}

request = receiver.next() => {
Expand All @@ -151,16 +151,18 @@ pub(crate) async fn forward_cross_chain_queries<F, G>(
};

match job_states.entry(queue) {
Entry::Vacant(entry) => steps.push(run_action.clone()(
Action::Proceed { id: 0 },
queue,
entry.insert(JobState {
id: 0,
retries: 0,
nickname: nickname.clone(),
task,
}).clone(),
)),
Entry::Vacant(entry) => {
steps.spawn(run_action.clone()(
Action::Proceed { id: 0 },
queue,
entry.insert(JobState {
id: 0,
retries: 0,
nickname: nickname.clone(),
task,
}).clone(),
));
}

Entry::Occupied(mut entry) => {
entry.insert(JobState {
Expand Down
Loading