Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 196d8c4

Browse files
committedMar 11, 2025
Make PoolSv2::start unblocking
Currently the state loop at the end of `PoolSv2::start` is blocking, it is wrapped with `tokio::task` so we can return from the `start` function. This should give us more flexibelty in how we work with `PoolSv2`. For example we can access the `PoolSv2` state directly through the struct.
1 parent ea139a7 commit 196d8c4

File tree

1 file changed

+43
-40
lines changed

1 file changed

+43
-40
lines changed
 

‎roles/pool/src/lib/mod.rs

+43-40
Original file line numberDiff line numberDiff line change
@@ -57,51 +57,54 @@ impl PoolSv2 {
5757

5858
// Start the error handling loop
5959
// See `./status.rs` and `utils/error_handling` for information on how this operates
60-
loop {
61-
let task_status = select! {
62-
task_status = status_rx.recv() => task_status,
63-
interrupt_signal = tokio::signal::ctrl_c() => {
64-
match interrupt_signal {
65-
Ok(()) => {
66-
info!("Interrupt received");
67-
},
68-
Err(err) => {
69-
error!("Unable to listen for interrupt signal: {}", err);
70-
// we also shut down in case of error
71-
},
60+
tokio::spawn(async move {
61+
loop {
62+
let task_status = select! {
63+
task_status = status_rx.recv() => task_status,
64+
interrupt_signal = tokio::signal::ctrl_c() => {
65+
match interrupt_signal {
66+
Ok(()) => {
67+
info!("Interrupt received");
68+
},
69+
Err(err) => {
70+
error!("Unable to listen for interrupt signal: {}", err);
71+
// we also shut down in case of error
72+
},
73+
}
74+
break;
7275
}
73-
break Ok(());
74-
}
75-
};
76-
let task_status: status::Status = task_status.unwrap();
76+
};
77+
let task_status: status::Status = task_status.unwrap();
7778

78-
match task_status.state {
79-
// Should only be sent by the downstream listener
80-
status::State::DownstreamShutdown(err) => {
81-
error!(
82-
"SHUTDOWN from Downstream: {}\nTry to restart the downstream listener",
83-
err
84-
);
85-
break Ok(());
86-
}
87-
status::State::TemplateProviderShutdown(err) => {
88-
error!("SHUTDOWN from Upstream: {}\nTry to reconnecting or connecting to a new upstream", err);
89-
break Ok(());
90-
}
91-
status::State::Healthy(msg) => {
92-
info!("HEALTHY message: {}", msg);
93-
}
94-
status::State::DownstreamInstanceDropped(downstream_id) => {
95-
warn!("Dropping downstream instance {} from pool", downstream_id);
96-
if pool
97-
.safe_lock(|p| p.remove_downstream(downstream_id))
98-
.is_err()
99-
{
100-
break Ok(());
79+
match task_status.state {
80+
// Should only be sent by the downstream listener
81+
status::State::DownstreamShutdown(err) => {
82+
error!(
83+
"SHUTDOWN from Downstream: {}\nTry to restart the downstream listener",
84+
err
85+
);
86+
break;
87+
}
88+
status::State::TemplateProviderShutdown(err) => {
89+
error!("SHUTDOWN from Upstream: {}\nTry to reconnecting or connecting to a new upstream", err);
90+
break;
91+
}
92+
status::State::Healthy(msg) => {
93+
info!("HEALTHY message: {}", msg);
94+
}
95+
status::State::DownstreamInstanceDropped(downstream_id) => {
96+
warn!("Dropping downstream instance {} from pool", downstream_id);
97+
if pool
98+
.safe_lock(|p| p.remove_downstream(downstream_id))
99+
.is_err()
100+
{
101+
break;
102+
}
101103
}
102104
}
103105
}
104-
}
106+
});
107+
Ok(())
105108
}
106109
}
107110

0 commit comments

Comments
 (0)
Please sign in to comment.