Skip to content

Commit 8c9c1c3

Browse files
committed
Add unix domain socket for telemetry server
1 parent cd7061c commit 8c9c1c3

File tree

4 files changed

+141
-36
lines changed

4 files changed

+141
-36
lines changed

examples/http_server/example_conf.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,11 @@ telemetry:
5757
# Enables telemetry server
5858
enabled: true
5959
# Telemetry server address.
60+
# Can be either a TCP socket address (e.g., "127.0.0.1:8080")
61+
# or a Unix domain socket path (e.g., "/tmp/telemetry.sock") on Unix systems.
6062
addr: "127.0.0.1:0"
63+
# Example Unix socket configuration (uncomment to use):
64+
# addr: "/tmp/telemetry.sock"
6165
# HTTP endpoints configuration.
6266
endpoints:
6367
Example endpoint:

foundations/src/telemetry/driver.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ use std::task::{Context, Poll};
99

1010
feature_use!(cfg(feature = "telemetry-server"), {
1111
use super::server::TelemetryServerFuture;
12-
use std::net::SocketAddr;
1312
});
1413

1514
/// A future that drives async telemetry functionality and that is returned
@@ -21,7 +20,7 @@ feature_use!(cfg(feature = "telemetry-server"), {
2120
/// [security syscall-related]: `crate::security`
2221
pub struct TelemetryDriver {
2322
#[cfg(feature = "telemetry-server")]
24-
server_addr: Option<SocketAddr>,
23+
server_addr: Option<String>,
2524

2625
#[cfg(feature = "telemetry-server")]
2726
server_fut: Option<TelemetryServerFuture>,
@@ -36,7 +35,7 @@ impl TelemetryDriver {
3635
) -> Self {
3736
Self {
3837
#[cfg(feature = "telemetry-server")]
39-
server_addr: server_fut.as_ref().map(|fut| fut.local_addr()),
38+
server_addr: server_fut.as_ref().and_then(|fut| fut.local_addr().ok()),
4039

4140
#[cfg(feature = "telemetry-server")]
4241
server_fut,
@@ -49,8 +48,8 @@ impl TelemetryDriver {
4948
///
5049
/// Returns `None` if the server wasn't spawned.
5150
#[cfg(feature = "telemetry-server")]
52-
pub fn server_addr(&self) -> Option<SocketAddr> {
53-
self.server_addr
51+
pub fn server_addr(&self) -> Option<String> {
52+
self.server_addr.clone()
5453
}
5554

5655
/// Instructs the telemetry driver and server to perform an orderly shutdown when the given

foundations/src/telemetry/server/mod.rs

Lines changed: 83 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#[cfg(feature = "metrics")]
22
use super::metrics;
3-
use super::settings::TelemetrySettings;
3+
use super::settings::{TelemetryServerAddr, TelemetrySettings};
44
use crate::telemetry::log;
55
use crate::BootstrapResult;
66
use anyhow::Context as _;
@@ -14,18 +14,33 @@ use std::net::SocketAddr;
1414
use std::pin::Pin;
1515
use std::sync::Arc;
1616
use std::task::{Context, Poll};
17+
use tokio::io::{AsyncRead, AsyncWrite};
1718
use tokio::net::TcpListener;
19+
#[cfg(unix)]
20+
use tokio::net::{TcpStream, UnixListener, UnixStream};
1821
use tokio::sync::watch;
1922

2023
mod router;
2124

2225
use router::Router;
26+
27+
trait AsyncReadWrite: AsyncRead + AsyncWrite + Send + Unpin + 'static {}
28+
29+
impl AsyncReadWrite for TcpStream {}
30+
#[cfg(unix)]
31+
impl AsyncReadWrite for UnixStream {}
32+
33+
enum TelemetryListener {
34+
Tcp(TcpListener),
35+
#[cfg(unix)]
36+
Unix(UnixListener),
37+
}
2338
pub use router::{
2439
BoxError, TelemetryRouteHandler, TelemetryRouteHandlerFuture, TelemetryServerRoute,
2540
};
2641

2742
pub(super) struct TelemetryServerFuture {
28-
listener: TcpListener,
43+
listener: TelemetryListener,
2944
router: Router,
3045
}
3146

@@ -47,27 +62,52 @@ impl TelemetryServerFuture {
4762
.map_err(|err| anyhow::anyhow!(err))?;
4863
}
4964

50-
let addr = settings.server.addr;
51-
52-
#[cfg(feature = "settings")]
53-
let addr = SocketAddr::from(addr);
54-
55-
let router = Router::new(custom_routes, settings);
56-
57-
let listener = {
58-
let std_listener = std::net::TcpListener::from(
59-
bind_socket(addr).with_context(|| format!("binding to socket {addr:?}"))?,
60-
);
65+
let router = Router::new(custom_routes, settings.clone());
66+
67+
let listener = match &settings.server.addr {
68+
TelemetryServerAddr::Tcp(addr) => {
69+
#[cfg(feature = "settings")]
70+
let addr = SocketAddr::from(*addr);
71+
72+
let std_listener = std::net::TcpListener::from(
73+
bind_socket(addr).with_context(|| format!("binding to TCP socket {addr:?}"))?,
74+
);
75+
std_listener.set_nonblocking(true)?;
76+
let tokio_listener = tokio::net::TcpListener::from_std(std_listener)?;
77+
TelemetryListener::Tcp(tokio_listener)
78+
}
79+
#[cfg(unix)]
80+
TelemetryServerAddr::Unix(path) => {
81+
if let Some(parent) = path.parent() {
82+
if !parent.exists() {
83+
std::fs::create_dir_all(parent).with_context(|| {
84+
format!("creating parent directories for Unix socket {path:?}")
85+
})?;
86+
}
87+
}
6188

62-
std_listener.set_nonblocking(true)?;
89+
if path.exists() {
90+
std::fs::remove_file(path)
91+
.with_context(|| format!("removing existing Unix socket {path:?}"))?;
92+
}
6393

64-
tokio::net::TcpListener::from_std(std_listener)?
94+
let unix_listener = UnixListener::bind(path)
95+
.with_context(|| format!("binding to Unix socket {path:?}"))?;
96+
TelemetryListener::Unix(unix_listener)
97+
}
6598
};
6699

67100
Ok(Some(TelemetryServerFuture { listener, router }))
68101
}
69-
pub(super) fn local_addr(&self) -> SocketAddr {
70-
self.listener.local_addr().unwrap()
102+
pub(super) fn local_addr(&self) -> BootstrapResult<String> {
103+
match &self.listener {
104+
TelemetryListener::Tcp(listener) => Ok(listener.local_addr()?.to_string()),
105+
#[cfg(unix)]
106+
TelemetryListener::Unix(listener) => match listener.local_addr()?.as_pathname() {
107+
Some(path) => Ok(path.display().to_string()),
108+
None => Ok("<unnamed>".to_string()),
109+
},
110+
}
71111
}
72112

73113
// Adapted from Hyper 0.14 Server stuff and axum::serve::serve.
@@ -87,15 +127,21 @@ impl TelemetryServerFuture {
87127
let (close_tx, close_rx) = watch::channel(());
88128
let listener = self.listener;
89129

90-
pin_mut!(listener);
91-
92130
loop {
93131
let socket = tokio::select! {
94-
conn = listener.accept() => match conn {
132+
conn = async {
133+
match &listener {
134+
TelemetryListener::Tcp(listener) => listener.accept().await.map(|(conn, addr)| (Box::new(conn) as Box<dyn AsyncReadWrite>, addr.to_string())),
135+
#[cfg(unix)]
136+
TelemetryListener::Unix(listener) => listener.accept().await.map(|(conn, addr)| {
137+
let addr_str = addr.as_pathname().map(|p| p.display().to_string()).unwrap_or_else(|| "<unnamed>".to_string());
138+
(Box::new(conn) as Box<dyn AsyncReadWrite>, addr_str)
139+
}),
140+
}
141+
} => match conn {
95142
Ok((conn, _)) => TokioIo::new(conn),
96143
Err(e) => {
97144
log::warn!("failed to accept connection"; "error" => e);
98-
99145
continue;
100146
}
101147
},
@@ -140,13 +186,22 @@ impl Future for TelemetryServerFuture {
140186
let this = &mut *self;
141187

142188
loop {
143-
let socket = match ready!(Pin::new(&mut this.listener).poll_accept(cx)) {
144-
Ok((conn, _)) => TokioIo::new(conn),
145-
Err(e) => {
146-
log::warn!("failed to accept connection"; "error" => e);
147-
148-
continue;
149-
}
189+
let socket = match &mut this.listener {
190+
TelemetryListener::Tcp(listener) => match ready!(listener.poll_accept(cx)) {
191+
Ok((conn, _)) => TokioIo::new(Box::new(conn) as Box<dyn AsyncReadWrite>),
192+
Err(e) => {
193+
log::warn!("failed to accept connection"; "error" => e);
194+
continue;
195+
}
196+
},
197+
#[cfg(unix)]
198+
TelemetryListener::Unix(listener) => match ready!(listener.poll_accept(cx)) {
199+
Ok((conn, _)) => TokioIo::new(Box::new(conn) as Box<dyn AsyncReadWrite>),
200+
Err(e) => {
201+
log::warn!("failed to accept connection"; "error" => e);
202+
continue;
203+
}
204+
},
150205
};
151206

152207
let router = this.router.clone();

foundations/src/telemetry/settings/server.rs

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,53 @@ use crate::settings::settings;
55
use std::net::Ipv4Addr;
66
#[cfg(not(feature = "settings"))]
77
use std::net::SocketAddr;
8+
use std::path::PathBuf;
9+
10+
/// Telemetry server address - can be either TCP or Unix socket
11+
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
12+
#[serde(untagged)]
13+
pub enum TelemetryServerAddr {
14+
/// TCP socket address
15+
Tcp(SocketAddr),
16+
/// Unix domain socket path
17+
#[cfg(unix)]
18+
Unix(PathBuf),
19+
}
20+
21+
impl Default for TelemetryServerAddr {
22+
fn default() -> Self {
23+
let addr: std::net::SocketAddr = (Ipv4Addr::LOCALHOST, 0).into();
24+
25+
#[cfg(feature = "settings")]
26+
let addr = addr.into();
27+
28+
TelemetryServerAddr::Tcp(addr)
29+
}
30+
}
31+
32+
impl From<SocketAddr> for TelemetryServerAddr {
33+
fn from(addr: SocketAddr) -> Self {
34+
TelemetryServerAddr::Tcp(addr)
35+
}
36+
}
37+
38+
impl From<std::net::SocketAddr> for TelemetryServerAddr {
39+
fn from(addr: std::net::SocketAddr) -> Self {
40+
#[cfg(feature = "settings")]
41+
let addr = addr.into();
42+
TelemetryServerAddr::Tcp(addr)
43+
}
44+
}
45+
46+
#[cfg(unix)]
47+
impl From<PathBuf> for TelemetryServerAddr {
48+
fn from(path: PathBuf) -> Self {
49+
TelemetryServerAddr::Unix(path)
50+
}
51+
}
52+
53+
#[cfg(feature = "settings")]
54+
impl crate::settings::Settings for TelemetryServerAddr {}
855

956
/// Telemetry server settings.
1057
#[cfg_attr(feature = "settings", settings(crate_path = "crate"))]
@@ -16,7 +63,7 @@ pub struct TelemetryServerSettings {
1663

1764
/// Telemetry server address.
1865
#[serde(default = "TelemetryServerSettings::default_addr")]
19-
pub addr: SocketAddr,
66+
pub addr: TelemetryServerAddr,
2067
}
2168

2269
#[cfg(not(feature = "settings"))]
@@ -34,12 +81,12 @@ impl TelemetryServerSettings {
3481
true
3582
}
3683

37-
fn default_addr() -> SocketAddr {
84+
fn default_addr() -> TelemetryServerAddr {
3885
let addr: std::net::SocketAddr = (Ipv4Addr::LOCALHOST, 0).into();
3986

4087
#[cfg(feature = "settings")]
4188
let addr = addr.into();
4289

43-
addr
90+
TelemetryServerAddr::Tcp(addr)
4491
}
4592
}

0 commit comments

Comments
 (0)