Skip to content

Commit 60d57ef

Browse files
committed
Forward cross chain messages for different recipients actually in parallel
1 parent b5d9bcd commit 60d57ef

File tree

1 file changed

+16
-14
lines changed

1 file changed

+16
-14
lines changed

linera-rpc/src/cross_chain_message_queue.rs

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,9 @@ pub(crate) async fn forward_cross_chain_queries<F, G>(
5858
handle_request: F,
5959
) where
6060
F: Fn(ShardId, CrossChainRequest) -> G + Send + Clone + 'static,
61-
G: Future<Output = anyhow::Result<()>>,
61+
G: Future<Output = anyhow::Result<()>> + Send + 'static,
6262
{
63-
let mut steps = futures::stream::FuturesUnordered::new();
63+
let mut steps = tokio::task::JoinSet::new();
6464
let mut job_states: HashMap<QueueId, JobState> = HashMap::new();
6565

6666
let run_task = |task: Task| async move {
@@ -116,7 +116,7 @@ pub(crate) async fn forward_cross_chain_queries<F, G>(
116116
metrics::CROSS_CHAIN_MESSAGE_TASKS.set(job_states.len() as i64);
117117

118118
tokio::select! {
119-
Some((queue, action)) = steps.next() => {
119+
Some(Ok((queue, action))) = steps.join_next() => {
120120
let Entry::Occupied(mut state) = job_states.entry(queue) else {
121121
panic!("running job without state");
122122
};
@@ -130,7 +130,7 @@ pub(crate) async fn forward_cross_chain_queries<F, G>(
130130
state.get_mut().retries += 1
131131
}
132132

133-
steps.push(run_action.clone()(action, queue, state.get().clone()));
133+
steps.spawn(run_action.clone()(action, queue, state.get().clone()));
134134
}
135135

136136
request = receiver.next() => {
@@ -151,16 +151,18 @@ pub(crate) async fn forward_cross_chain_queries<F, G>(
151151
};
152152

153153
match job_states.entry(queue) {
154-
Entry::Vacant(entry) => steps.push(run_action.clone()(
155-
Action::Proceed { id: 0 },
156-
queue,
157-
entry.insert(JobState {
158-
id: 0,
159-
retries: 0,
160-
nickname: nickname.clone(),
161-
task,
162-
}).clone(),
163-
)),
154+
Entry::Vacant(entry) => {
155+
steps.spawn(run_action.clone()(
156+
Action::Proceed { id: 0 },
157+
queue,
158+
entry.insert(JobState {
159+
id: 0,
160+
retries: 0,
161+
nickname: nickname.clone(),
162+
task,
163+
}).clone(),
164+
));
165+
}
164166

165167
Entry::Occupied(mut entry) => {
166168
entry.insert(JobState {

0 commit comments

Comments
 (0)