Skip to content

Commit 0807f05

Browse files
committed
update rstun (supports arbitrary udp and tcp tunnels)
1 parent ff2c84e commit 0807f05

File tree

4 files changed

+90
-72
lines changed

4 files changed

+90
-72
lines changed

Cargo.lock

Lines changed: 26 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "omnip"
3-
version = "0.6.12"
3+
version = "0.7.0"
44
edition = "2021"
55

66
[lib]
@@ -28,7 +28,7 @@ lazy_static = "1.4"
2828
async-trait = "0.1"
2929
byte-pool = { git = "https://github.com/neevek/byte-pool" }
3030
# rstun = { path = "../rstun" }
31-
rstun = { git = "https://github.com/neevek/rstun", tag = "release/0.6.8" }
31+
rstun = { git = "https://github.com/neevek/rstun", tag = "release/0.7.0" }
3232
hyper = { version = "0.14", features = ["full"]}
3333
http = "0.2"
3434
http-body = "0.4"

src/quic/quic_client.rs

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::QuicClientConfig;
22
use anyhow::Result;
3+
use rstun::{TunnelConfig, TunnelMode, Upstream, UpstreamType};
34
use std::net::SocketAddr;
4-
use tokio::task::JoinHandle;
55

66
pub struct QuicClient {
77
client: rstun::Client,
@@ -19,15 +19,15 @@ impl QuicClient {
1919
}
2020
}
2121

22-
pub async fn start_tcp_server(&mut self) -> Result<SocketAddr> {
23-
Ok(self.client.start_tcp_server().await?.unwrap())
22+
pub async fn start_tcp_server(&mut self, addr: SocketAddr) -> Result<SocketAddr> {
23+
Ok(self.client.start_tcp_server(addr).await?.addr())
2424
}
2525

26-
pub async fn start_udp_server(&mut self) -> Result<SocketAddr> {
27-
Ok(self.client.start_udp_server().await?.unwrap())
26+
pub async fn start_udp_server(&mut self, addr: SocketAddr) -> Result<SocketAddr> {
27+
Ok(self.client.start_udp_server(addr).await?.addr())
2828
}
2929

30-
pub fn connect_and_serve_async(&self) -> JoinHandle<()> {
30+
pub fn connect_and_serve_async(&mut self) {
3131
self.client.connect_and_serve_async()
3232
}
3333

@@ -52,8 +52,31 @@ impl QuicClient {
5252
}
5353

5454
fn set_config(config: &mut rstun::ClientConfig, quic_client_config: &QuicClientConfig) {
55+
let mut tunnels = Vec::new();
56+
if quic_client_config.local_tcp_server_addr.is_some() {
57+
tunnels.push(TunnelConfig {
58+
mode: TunnelMode::Out,
59+
local_server_addr: quic_client_config.local_tcp_server_addr,
60+
upstream: Upstream {
61+
upstream_addr: None,
62+
upstream_type: UpstreamType::Tcp,
63+
},
64+
});
65+
}
66+
67+
if quic_client_config.local_udp_server_addr.is_some() {
68+
tunnels.push(TunnelConfig {
69+
mode: TunnelMode::Out,
70+
local_server_addr: quic_client_config.local_udp_server_addr,
71+
upstream: Upstream {
72+
upstream_addr: None,
73+
upstream_type: UpstreamType::Udp,
74+
},
75+
});
76+
}
77+
78+
config.tunnels = tunnels;
5579
config.server_addr = quic_client_config.server_addr.to_string();
56-
config.mode = rstun::TUNNEL_MODE_OUT;
5780
config.password = quic_client_config.common_cfg.password.clone();
5881
config.cert_path = quic_client_config.common_cfg.cert.clone();
5982
config.cipher = quic_client_config.common_cfg.cipher.clone();
@@ -62,18 +85,6 @@ impl QuicClient {
6285
config.udp_timeout_ms = quic_client_config.common_cfg.udp_timeout_ms;
6386
config.wait_before_retry_ms = quic_client_config.common_cfg.retry_interval_ms;
6487
config.workers = quic_client_config.common_cfg.workers;
65-
config.local_tcp_server_addr = quic_client_config.local_tcp_server_addr;
66-
config.local_udp_server_addr = quic_client_config.local_udp_server_addr;
67-
config.tcp_upstream = if quic_client_config.local_tcp_server_addr.is_some() {
68-
Some(rstun::Upstream::PeerDefault)
69-
} else {
70-
None
71-
};
72-
config.udp_upstream = if quic_client_config.local_udp_server_addr.is_some() {
73-
Some(rstun::Upstream::PeerDefault)
74-
} else {
75-
None
76-
};
7788
config.dot_servers = quic_client_config.dot_servers.clone();
7889
config.dns_servers = quic_client_config.name_servers.clone();
7990
}

src/server.rs

Lines changed: 32 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -182,31 +182,30 @@ impl Server {
182182
.clone()
183183
.map_or(false, |p| p == ProtoType::Tcp || p == ProtoType::Udp);
184184

185-
let mut quic_client_join_handle = None;
185+
let mut require_quic_client = false;
186186
if let Some(upstream_addr) = &cfg.upstream_addr {
187187
// connect to QUIC server if it is +quic protocols
188-
let require_quic_client = upstream_addr.is_quic_proto;
188+
require_quic_client = upstream_addr.is_quic_proto;
189189
if require_quic_client {
190190
// connecting to quic server, and it will set relevant upstream address
191-
let join_handle = self
192-
.start_quic_client(
193-
upstream_addr.net_addr.clone(),
194-
self.common_quic_config.clone(),
195-
)
196-
.await?;
191+
self.start_quic_client(
192+
upstream_addr.net_addr.clone(),
193+
self.common_quic_config.clone(),
194+
)
195+
.await?;
197196

198197
if is_tcp_or_udp_proto {
199198
info!(
200199
"start serving {} through quic client",
201200
server_proto.unwrap().format_as_string(false)
202201
);
203-
join_handle.await.ok();
202+
203+
// wait indefinitely here...
204+
std::future::pending::<()>().await;
205+
204206
// directly use the quic client's tcp server or udp server, and return early
205207
return Ok(());
206208
}
207-
208-
// self.tcp_upstream or self.udp_upstream is set accordingly when reaching here
209-
quic_client_join_handle = Some(join_handle);
210209
} else if upstream_addr.proto == Some(ProtoType::Udp) {
211210
// non-quic upstream can only use IP address instead of domain
212211
inner_state!(self, udp_upstream) = upstream_addr.net_addr.to_socket_addr();
@@ -253,10 +252,9 @@ impl Server {
253252
};
254253

255254
// join on the QUIC tunnel after the proxy server is started
256-
if let Some(quic_client_join_handle) = quic_client_join_handle {
257-
info!("join on the quic tunnel...",);
258-
quic_client_join_handle.await.ok();
259-
info!("quic tunnel quit");
255+
if require_quic_client {
256+
// wait indefinitely here...
257+
std::future::pending::<()>().await;
260258
} else if require_quic_server {
261259
let quic_server_config = QuicServerConfig {
262260
server_addr: orig_server_addr,
@@ -335,7 +333,7 @@ impl Server {
335333
&self,
336334
quic_server_addr: NetAddr,
337335
common_quic_config: CommonQuicConfig,
338-
) -> Result<JoinHandle<()>> {
336+
) -> Result<()> {
339337
// if we have to forward tcp/udp through quic tunnel, we can directly use the
340338
// quic client's tcp/udp entry without creating another layer of traffic relay
341339
let cfg = &self.config;
@@ -374,36 +372,23 @@ impl Server {
374372
});
375373
}
376374

377-
let (require_tcp, require_udp) = self.is_tcp_or_udp_server_required();
378-
379-
if require_tcp {
380-
let tcp_server_addr = client.start_tcp_server().await?;
375+
if let Some(addr) = tcp_server_addr {
376+
let tcp_server_addr = client.start_tcp_server(addr).await?;
381377
inner_state!(self, tcp_upstream) = Some(tcp_server_addr);
382378
info!("started quic tcp server: {tcp_server_addr}");
383379
}
384380

385-
if require_udp {
386-
let udp_server_addr = client.start_udp_server().await?;
381+
if let Some(addr) = udp_server_addr {
382+
let udp_server_addr = client.start_udp_server(addr).await?;
387383
inner_state!(self, udp_upstream) = Some(udp_server_addr);
388384
info!("started quic udp server: {udp_server_addr}");
389385
}
390386

391-
// will handover the handle to the caller, so we don't block here
392-
let join_handle = client.connect_and_serve_async();
387+
client.connect_and_serve_async();
393388

394389
inner_state!(self, quic_client) = Some(Arc::new(client));
395390

396-
Ok(join_handle)
397-
}
398-
399-
fn is_tcp_or_udp_server_required(&self) -> (bool, bool) {
400-
self.config
401-
.server_addr
402-
.proto
403-
.as_ref()
404-
.map_or((true, false), |p| {
405-
(*p != ProtoType::Udp, *p == ProtoType::Udp)
406-
})
391+
Ok(())
407392
}
408393

409394
async fn init_resolver(self: &mut Arc<Self>) {
@@ -671,13 +656,17 @@ impl Server {
671656
}
672657

673658
MatchResult::Proxy if upstream.is_some() => {
674-
outbound_type = match params.upstream_type.as_ref().unwrap() {
675-
ProtoType::Http => OutboundType::HttpProxy,
676-
ProtoType::Socks4 => OutboundType::SocksProxy(SocksVersion::V4),
677-
ProtoType::Socks5 => OutboundType::SocksProxy(SocksVersion::V5),
678-
ProtoType::Tcp | ProtoType::Udp => {
659+
outbound_type = match params.upstream_type.as_ref() {
660+
Some(ProtoType::Http) => OutboundType::HttpProxy,
661+
Some(ProtoType::Socks4) => OutboundType::SocksProxy(SocksVersion::V4),
662+
Some(ProtoType::Socks5) => OutboundType::SocksProxy(SocksVersion::V5),
663+
Some(ProtoType::Tcp | ProtoType::Udp) => {
679664
unreachable!("not valid proxy type")
680665
}
666+
None => {
667+
error!("protocol is required for upstream");
668+
return Err(ProxyError::InternalError);
669+
}
681670
};
682671

683672
debug!(
@@ -1263,13 +1252,8 @@ impl Api for Server {
12631252
base_common_quic_config.quic_timeout_ms = config.idle_timeout;
12641253
base_common_quic_config.retry_interval_ms = config.retry_interval;
12651254
let upstream_addr = NetAddr::from_str(config.upstream_addr.as_str())?;
1266-
let quic_client_join_handle = self
1267-
.start_quic_client(upstream_addr, base_common_quic_config)
1268-
.await;
1269-
1270-
tokio::spawn(async move { quic_client_join_handle.unwrap().await });
1271-
1272-
Ok(())
1255+
self.start_quic_client(upstream_addr, base_common_quic_config)
1256+
.await
12731257
}
12741258
}
12751259

0 commit comments

Comments
 (0)