Skip to content

Commit 8f065a1

Browse files
committed
Port to smol async runtime
* Use smol instead of tokio * Use flume instead of futures::mpsc Signed-off-by: Teddy Astie <[email protected]>
1 parent 776edcf commit 8f065a1

File tree

25 files changed

+552
-131
lines changed

25 files changed

+552
-131
lines changed

Cargo.lock

Lines changed: 444 additions & 15 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

guest-metrics/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,6 @@ repository.workspace = true
66
categories.workspace = true
77

88
[dependencies]
9-
futures = "0.3"
9+
flume = "0.11.1"
1010
uuid = "1.11"
1111
os_info = "3"

guest-metrics/src/plugin.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
use std::future::Future;
22

3-
use futures::channel::mpsc;
4-
53
use crate::GuestMetric;
64

75
pub trait GuestAgentPlugin {
8-
fn run(self, channel: mpsc::Sender<GuestMetric>) -> impl Future<Output = ()> + Send;
6+
fn run(self, channel: flume::Sender<GuestMetric>) -> impl Future<Output = ()> + Send;
97
}
108

119
pub trait GuestAgentPublisher {
12-
fn run(self, channel: mpsc::Receiver<GuestMetric>) -> impl Future<Output = ()> + Send;
10+
fn run(self, channel: flume::Receiver<GuestMetric>) -> impl Future<Output = ()> + Send;
1311
}

providers/provider-memory/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ edition = "2021"
66
[dependencies]
77
guest-metrics = { path = "../../guest-metrics" }
88
futures = "0.3"
9-
tokio = { version = "1.25.0", features = ["time"] }
9+
flume = "0.11.1"
10+
smol = "2.0.2"
1011

1112
[target.'cfg(target_os = "windows")'.dependencies]
1213
windows = { version = "0.58", features = ["Win32_System_SystemInformation"] }

providers/provider-memory/src/lib.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::{io, time::Duration};
22

3-
use futures::{channel::mpsc, SinkExt};
3+
use futures::StreamExt;
44
use guest_metrics::{plugin::GuestAgentPlugin, MemoryInfo};
55

66
#[cfg(target_os = "freebsd")]
@@ -30,16 +30,16 @@ pub type PlatformMemorySource = windows::WindowsMemorySource;
3030
pub struct MemoryPlugin;
3131

3232
impl GuestAgentPlugin for MemoryPlugin {
33-
async fn run(self, mut channel: mpsc::Sender<guest_metrics::GuestMetric>) {
34-
let mut timer = tokio::time::interval(Duration::from_secs_f32(5.0));
33+
async fn run(self, channel: flume::Sender<guest_metrics::GuestMetric>) {
34+
let mut timer = smol::Timer::interval(Duration::from_secs_f32(5.0));
3535
let mut memory_source =
3636
PlatformMemorySource::new().expect("Unable to get memory information");
3737

3838
loop {
39-
timer.tick().await;
39+
timer.next().await;
4040

4141
if channel
42-
.send(guest_metrics::GuestMetric::Memory(MemoryInfo {
42+
.send_async(guest_metrics::GuestMetric::Memory(MemoryInfo {
4343
mem_free: memory_source.get_available_kb().unwrap(),
4444
mem_total: memory_source.get_total_kb().unwrap(),
4545
}))

providers/provider-netlink/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ edition = "2021"
77
futures = "0.3"
88
guest-metrics = { path = "../../guest-metrics" }
99
tokio = { version = "1", features = ["rt"] }
10+
smol = "2.0.2"
11+
flume = "0.11.1"
1012
uuid = "1.11"
1113
anyhow = "*"
1214
log = "*"

providers/provider-netlink/src/lib.rs

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,7 @@ use std::{collections::HashMap, io};
33
use guest_metrics::{plugin::GuestAgentPlugin, GuestMetric, NetEvent, NetEventOp, NetInterface};
44
use vif_detect::VifDetector;
55

6-
use futures::{
7-
channel::mpsc::{self, UnboundedReceiver},
8-
SinkExt, StreamExt,
9-
};
6+
use futures::{channel::mpsc::UnboundedReceiver, StreamExt};
107
use uuid::Uuid;
118

129
use netlink_packet_core::{
@@ -49,7 +46,7 @@ impl NetlinkConnection {
4946
pub struct NetlinkPlugin;
5047

5148
impl GuestAgentPlugin for NetlinkPlugin {
52-
async fn run(self, mut channel: mpsc::Sender<GuestMetric>) {
49+
async fn run(self, channel: flume::Sender<GuestMetric>) {
5350
let connection = NetlinkConnection::new().unwrap();
5451
let vif_identify = vif_detect::PlatformVifDetector::default();
5552
let mut interfaces = HashMap::new();
@@ -91,7 +88,7 @@ impl GuestAgentPlugin for NetlinkPlugin {
9188
while let Some(msg) = stream.next().await {
9289
if let NetlinkPayload::InnerMessage(inner_msg) = msg.payload {
9390
if let Err(e) =
94-
process_message(inner_msg, &mut channel, &vif_identify, &mut interfaces).await
91+
process_message(inner_msg, &channel, &vif_identify, &mut interfaces).await
9592
{
9693
log::error!("Unable to process netlink message: {e}");
9794
}
@@ -102,7 +99,7 @@ impl GuestAgentPlugin for NetlinkPlugin {
10299

103100
async fn process_message(
104101
inner_msg: RouteNetlinkMessage,
105-
channel: &mut mpsc::Sender<GuestMetric>,
102+
channel: &flume::Sender<GuestMetric>,
106103
vif_identify: &impl VifDetector,
107104
interfaces: &mut HashMap<u32, Uuid>,
108105
) -> anyhow::Result<()> {
@@ -145,7 +142,7 @@ async fn process_message(
145142

146143
interfaces.insert(link_message.header.index, uuid);
147144
channel
148-
.send(GuestMetric::AddIface(NetInterface {
145+
.send_async(GuestMetric::AddIface(NetInterface {
149146
uuid,
150147
index: link_message.header.index,
151148
name: ifname.clone(),
@@ -158,7 +155,7 @@ async fn process_message(
158155
return Ok(());
159156
};
160157

161-
channel.send(GuestMetric::RmIface(uuid)).await.ok();
158+
channel.send_async(GuestMetric::RmIface(uuid)).await.ok();
162159
}
163160
RouteNetlinkMessage::NewAddress(address_message)
164161
| RouteNetlinkMessage::GetAddress(address_message) => {
@@ -184,7 +181,7 @@ async fn process_message(
184181
};
185182

186183
channel
187-
.send(GuestMetric::Network(NetEvent {
184+
.send_async(GuestMetric::Network(NetEvent {
188185
iface_id,
189186
op: NetEventOp::AddIp(addr),
190187
}))
@@ -213,7 +210,7 @@ async fn process_message(
213210
};
214211

215212
channel
216-
.send(GuestMetric::Network(NetEvent {
213+
.send_async(GuestMetric::Network(NetEvent {
217214
iface_id,
218215
op: NetEventOp::RmIp(addr),
219216
}))

providers/provider-os/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ categories.workspace = true
77

88
[dependencies]
99
guest-metrics = { path = "../../guest-metrics" }
10-
futures = "0.3"
10+
smol = "2.0.2"
11+
flume = "0.11.1"
1112

1213
[target.'cfg(unix)'.dependencies]
1314
uname = "0.1.1"

providers/provider-os/src/lib.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use futures::{channel::mpsc, SinkExt};
21
use guest_metrics::{os_info, plugin::GuestAgentPlugin, GuestMetric, KernelInfo, OsInfo};
32
use std::io;
43

@@ -20,11 +19,11 @@ pub fn collect_kernel() -> io::Result<Option<KernelInfo>> {
2019
pub struct OsInfoPlugin;
2120

2221
impl GuestAgentPlugin for OsInfoPlugin {
23-
async fn run(self, mut channel: mpsc::Sender<guest_metrics::GuestMetric>) {
22+
async fn run(self, channel: flume::Sender<guest_metrics::GuestMetric>) {
2423
let kernel_info = collect_kernel().expect("Unable to fetch kernel information");
2524

2625
channel
27-
.send(GuestMetric::OperatingSystem(OsInfo {
26+
.send_async(GuestMetric::OperatingSystem(OsInfo {
2827
os_info: os_info::get(),
2928
kernel_info,
3029
}))

providers/provider-simple/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ edition = "2021"
77
futures = "0.3"
88
guest-metrics = { path = "../../guest-metrics" }
99
uuid = { version = "1.11", features = ["v4"] }
10-
tokio = { version = "1", features = ["time"] }
10+
smol = "2.0.2"
11+
flume = "0.11.1"
1112

1213
vif-detect = { path = "../vif-detect" }
1314
network-interface = "2.0"

0 commit comments

Comments
 (0)