Skip to content

Commit 56a3127

Browse files
committed
Revert "aborting tasks with JoinHandle instead of futures abortable"
This reverts commit 9ab1f8b.
1 parent 05944dc commit 56a3127

File tree

9 files changed

+109
-78
lines changed

9 files changed

+109
-78
lines changed

Cargo.lock

+6-6
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/shadowsocks-service/src/local/loadbalancing/ping_balancer.rs

+12-8
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
33
use std::{
44
fmt::{self, Debug, Display},
5+
future::Future,
56
io,
67
net::{Ipv4Addr, SocketAddr},
78
sync::{
@@ -12,7 +13,7 @@ use std::{
1213
};
1314

1415
use byte_string::ByteStr;
15-
use futures::future;
16+
use futures::future::{self, AbortHandle};
1617
use log::{debug, info, trace};
1718
use shadowsocks::{
1819
config::Mode,
@@ -25,7 +26,6 @@ use shadowsocks::{
2526
};
2627
use tokio::{
2728
io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
28-
task::JoinHandle,
2929
time,
3030
};
3131

@@ -72,7 +72,7 @@ impl PingBalancerBuilder {
7272
self.servers.push(Arc::new(server));
7373
}
7474

75-
pub async fn build(self) -> PingBalancer {
75+
pub async fn build(self) -> (PingBalancer, impl Future<Output = ()>) {
7676
assert!(!self.servers.is_empty(), "build PingBalancer without any servers");
7777

7878
let balancer_context = PingBalancerContext {
@@ -87,17 +87,21 @@ impl PingBalancerBuilder {
8787

8888
let shared_context = Arc::new(balancer_context);
8989

90-
let abortable = {
90+
let (checker, abortable) = {
9191
let shared_context = shared_context.clone();
92-
tokio::spawn(async move { shared_context.checker_task().await })
92+
future::abortable(async move { shared_context.checker_task().await })
93+
};
94+
let checker = async move {
95+
let _ = checker.await;
9396
};
9497

95-
PingBalancer {
98+
let balancer = PingBalancer {
9699
inner: Arc::new(PingBalancerInner {
97100
context: shared_context,
98101
abortable,
99102
}),
100-
}
103+
};
104+
(balancer, checker)
101105
}
102106
}
103107

@@ -256,7 +260,7 @@ impl PingBalancerContext {
256260

257261
struct PingBalancerInner {
258262
context: Arc<PingBalancerContext>,
259-
abortable: JoinHandle<()>,
263+
abortable: AbortHandle,
260264
}
261265

262266
impl Drop for PingBalancerInner {

crates/shadowsocks-service/src/local/mod.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,10 @@ pub async fn run(mut config: Config) -> io::Result<()> {
189189
for server in config.server {
190190
balancer_builder.add_server(ServerIdent::new(server));
191191
}
192-
balancer_builder.build().await
192+
let (balancer, checker) = balancer_builder.build().await;
193+
tokio::spawn(checker);
194+
195+
balancer
193196
};
194197

195198
#[cfg(feature = "local-flow-stat")]

crates/shadowsocks-service/src/local/net/udp/association.rs

+29-21
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use std::{
99

1010
use async_trait::async_trait;
1111
use bytes::Bytes;
12+
use futures::future::{self, AbortHandle};
1213
use log::{debug, error, trace, warn};
1314
use lru_time_cache::LruCache;
1415
use shadowsocks::{
@@ -23,7 +24,6 @@ use spin::Mutex as SpinMutex;
2324
use tokio::{
2425
net::UdpSocket,
2526
sync::{mpsc, Mutex},
26-
task::JoinHandle,
2727
time,
2828
};
2929

@@ -53,8 +53,8 @@ where
5353
respond_writer: W,
5454
context: Arc<ServiceContext>,
5555
assoc_map: SharedAssociationMap<W>,
56-
cleanup_abortable: JoinHandle<()>,
57-
keepalive_abortable: JoinHandle<()>,
56+
cleanup_abortable: AbortHandle,
57+
keepalive_abortable: AbortHandle,
5858
keepalive_tx: mpsc::Sender<SocketAddr>,
5959
balancer: PingBalancer,
6060
}
@@ -89,25 +89,29 @@ where
8989

9090
let cleanup_abortable = {
9191
let assoc_map = assoc_map.clone();
92-
tokio::spawn(async move {
92+
let (cleanup_task, cleanup_abortable) = future::abortable(async move {
9393
loop {
9494
time::sleep(time_to_live).await;
9595

9696
// cleanup expired associations. iter() will remove expired elements
9797
let _ = assoc_map.lock().await.iter();
9898
}
99-
})
99+
});
100+
tokio::spawn(cleanup_task);
101+
cleanup_abortable
100102
};
101103

102-
let (keepalive_tx, mut keepalive_rx) = mpsc::channel(64);
104+
let (keepalive_tx, mut keepalive_rx) = mpsc::channel(256);
103105

104106
let keepalive_abortable = {
105107
let assoc_map = assoc_map.clone();
106-
tokio::spawn(async move {
108+
let (keepalive_task, keepalive_abortable) = future::abortable(async move {
107109
while let Some(peer_addr) = keepalive_rx.recv().await {
108110
assoc_map.lock().await.get(&peer_addr);
109111
}
110-
})
112+
});
113+
tokio::spawn(keepalive_task);
114+
keepalive_abortable
111115
};
112116

113117
UdpAssociationManager {
@@ -195,7 +199,7 @@ enum UdpAssociationBypassState {
195199
Empty,
196200
Connected {
197201
socket: Arc<UdpSocket>,
198-
abortable: JoinHandle<io::Result<()>>,
202+
abortable: AbortHandle,
199203
},
200204
Aborted,
201205
}
@@ -213,7 +217,7 @@ impl UdpAssociationBypassState {
213217
UdpAssociationBypassState::Empty
214218
}
215219

216-
fn set_connected(&mut self, socket: Arc<UdpSocket>, abortable: JoinHandle<io::Result<()>>) {
220+
fn set_connected(&mut self, socket: Arc<UdpSocket>, abortable: AbortHandle) {
217221
*self = UdpAssociationBypassState::Connected { socket, abortable };
218222
}
219223

@@ -226,7 +230,7 @@ enum UdpAssociationProxyState {
226230
Empty,
227231
Connected {
228232
socket: Arc<MonProxySocket>,
229-
abortable: JoinHandle<io::Result<()>>,
233+
abortable: AbortHandle,
230234
},
231235
Aborted,
232236
}
@@ -247,7 +251,7 @@ impl UdpAssociationProxyState {
247251
*self = UdpAssociationProxyState::Empty;
248252
}
249253

250-
fn set_connected(&mut self, socket: Arc<MonProxySocket>, abortable: JoinHandle<io::Result<()>>) {
254+
fn set_connected(&mut self, socket: Arc<MonProxySocket>, abortable: AbortHandle) {
251255
self.abort_inner();
252256
*self = UdpAssociationProxyState::Connected { socket, abortable };
253257
}
@@ -390,12 +394,13 @@ where
390394
ShadowUdpSocket::connect_any_with_opts(&target_addr, self.context.connect_opts_ref()).await?;
391395
let socket: Arc<UdpSocket> = Arc::new(socket.into());
392396

393-
// CLIENT <- REMOTE
394-
let r2l_abortable = {
397+
let (r2l_fut, r2l_abortable) = {
395398
let assoc = self.clone();
396-
tokio::spawn(assoc.copy_bypassed_r2l(socket.clone()))
399+
future::abortable(assoc.copy_bypassed_r2l(socket.clone()))
397400
};
398401

402+
// CLIENT <- REMOTE
403+
tokio::spawn(r2l_fut);
399404
debug!(
400405
"created udp association for {} (bypassed) with {:?}",
401406
self.peer_addr,
@@ -444,12 +449,13 @@ where
444449
ShadowUdpSocket::connect_any_with_opts(&target_addr, self.context.connect_opts_ref()).await?;
445450
let socket: Arc<UdpSocket> = Arc::new(socket.into());
446451

447-
// CLIENT <- REMOTE
448-
let r2l_abortable = {
452+
let (r2l_fut, r2l_abortable) = {
449453
let assoc = self.clone();
450-
tokio::spawn(assoc.copy_bypassed_r2l(socket.clone()))
454+
future::abortable(assoc.copy_bypassed_r2l(socket.clone()))
451455
};
452456

457+
// CLIENT <- REMOTE
458+
tokio::spawn(r2l_fut);
453459
debug!(
454460
"created udp association for {} (bypassed) with {:?}",
455461
self.peer_addr,
@@ -509,12 +515,14 @@ where
509515
let socket = MonProxySocket::from_socket(socket, self.context.flow_stat());
510516
let socket = Arc::new(socket);
511517

512-
// CLIENT <- REMOTE
513-
let r2l_abortable = {
518+
let (r2l_fut, r2l_abortable) = {
514519
let assoc = self.clone();
515-
tokio::spawn(assoc.copy_proxied_r2l(socket.clone()))
520+
future::abortable(assoc.copy_proxied_r2l(socket.clone()))
516521
};
517522

523+
// CLIENT <- REMOTE
524+
tokio::spawn(r2l_fut);
525+
518526
debug!(
519527
"created udp association for {} <-> {} (proxied) with {:?}",
520528
self.peer_addr,

crates/shadowsocks-service/src/local/redir/udprelay/mod.rs

+6-4
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use std::{
88
};
99

1010
use async_trait::async_trait;
11+
use futures::future::{self, AbortHandle};
1112
use log::{error, info, trace, warn};
1213
use lru_time_cache::LruCache;
1314
use shadowsocks::{
@@ -16,7 +17,7 @@ use shadowsocks::{
1617
relay::{socks5::Address, udprelay::MAXIMUM_UDP_PAYLOAD_SIZE},
1718
ServerAddr,
1819
};
19-
use tokio::{sync::Mutex, task::JoinHandle};
20+
use tokio::sync::Mutex;
2021

2122
use crate::{
2223
config::RedirType,
@@ -40,7 +41,7 @@ const INBOUND_SOCKET_CACHE_CAPACITY: usize = 256;
4041

4142
struct UdpRedirInboundCache {
4243
cache: Arc<Mutex<LruCache<SocketAddr, Arc<UdpRedirSocket>>>>,
43-
watcher: JoinHandle<()>,
44+
watcher: AbortHandle,
4445
}
4546

4647
impl Drop for UdpRedirInboundCache {
@@ -56,15 +57,16 @@ impl UdpRedirInboundCache {
5657
INBOUND_SOCKET_CACHE_CAPACITY,
5758
)));
5859

59-
let watcher = {
60+
let (cleanup_fut, watcher) = {
6061
let cache = cache.clone();
61-
tokio::spawn(async move {
62+
future::abortable(async move {
6263
loop {
6364
tokio::time::sleep(INBOUND_SOCKET_CACHE_EXPIRATION).await;
6465
let _ = cache.lock().await.iter();
6566
}
6667
})
6768
};
69+
tokio::spawn(cleanup_fut);
6870

6971
UdpRedirInboundCache { cache, watcher }
7072
}

0 commit comments

Comments
 (0)