Skip to content

Commit 7fca23f

Browse files
authored
vmbus_client: use existing mesh channels to send revoke (#637)
In the vmbus client, instead of sending a revoke over the general notify channel (which is hard to multiplex between multiple users of the vmbus client), notify a user of a revoke by closing the per-channel notification channel. This is the last non-offer use of the notify channel. Change the notify channel to just an offer channel.
1 parent b6f9509 commit 7fca23f

File tree

5 files changed

+105
-112
lines changed

5 files changed

+105
-112
lines changed

Cargo.lock

+1
Original file line numberDiff line numberDiff line change
@@ -8026,6 +8026,7 @@ version = "0.0.0"
80268026
dependencies = [
80278027
"anyhow",
80288028
"futures",
8029+
"futures-concurrency",
80298030
"guid",
80308031
"inspect",
80318032
"mesh",

vm/devices/vmbus/vmbus_client/src/lib.rs

+24-35
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ impl VmbusClient {
9494
/// Creates a new instance with a receiver for incoming synic messages.
9595
pub fn new(
9696
synic: Arc<dyn SynicClient>,
97-
notify_send: mesh::Sender<ClientNotification>,
97+
offer_send: mesh::Sender<OfferInfo>,
9898
msg_source: impl VmbusMessageSource + 'static,
9999
spawner: &impl Spawn,
100100
) -> Self {
@@ -113,7 +113,7 @@ impl VmbusClient {
113113
inner,
114114
task_recv,
115115
running: false,
116-
notify_send,
116+
offer_send,
117117
msg_source,
118118
client_request_recv,
119119
state: ClientState::Disconnected,
@@ -287,12 +287,6 @@ pub struct OfferInfo {
287287
pub response_recv: mesh::Receiver<ChannelResponse>,
288288
}
289289

290-
#[derive(Debug)]
291-
pub enum ClientNotification {
292-
Offer(OfferInfo),
293-
Revoke(ChannelId),
294-
}
295-
296290
#[derive(Debug)]
297291
enum ClientRequest {
298292
InitiateContact(Rpc<InitiateContactRequest, Result<VersionInfo, ConnectError>>),
@@ -495,7 +489,7 @@ struct ClientTask<T: VmbusMessageSource> {
495489
running: bool,
496490
modify_request: Option<Rpc<ModifyConnectionRequest, ConnectionState>>,
497491
msg_source: T,
498-
notify_send: mesh::Sender<ClientNotification>,
492+
offer_send: mesh::Sender<OfferInfo>,
499493
task_recv: mesh::Receiver<TaskRequest>,
500494
client_request_recv: mesh::Receiver<ClientRequest>,
501495
}
@@ -707,7 +701,7 @@ impl<T: VmbusMessageSource> ClientTask<T> {
707701
if let ClientState::RequestingOffers(_, send) = &self.state {
708702
send.send(offer_info);
709703
} else {
710-
self.notify_send.send(ClientNotification::Offer(offer_info));
704+
self.offer_send.send(offer_info);
711705
}
712706
}
713707
}
@@ -733,6 +727,8 @@ impl<T: VmbusMessageSource> ClientTask<T> {
733727
.response_send
734728
.send(ChannelResponse::TeardownGpadl(gpadl_id));
735729
} else {
730+
// TODO: is this really necessary? The host should have
731+
// already unmapped all GPADLs. Remove if possible.
736732
send_message(
737733
self.inner.synic.as_ref(),
738734
&protocol::GpadlTeardown {
@@ -748,16 +744,20 @@ impl<T: VmbusMessageSource> ClientTask<T> {
748744
false
749745
});
750746

747+
// Drop the channel, which will close the response channel, which will
748+
// cause the client to know the channel has been revoked.
749+
//
750+
// TODO: this is wrong--client requests can still come in after this,
751+
// and they will fail to find the channel by channel ID and panic (or
752+
// worse, the channel ID will get reused). Either find and drop the
753+
// associated incoming request channel here, or keep this channel object
754+
// around until the client is done with it.
751755
self.inner.channels.remove(&rescind.channel_id);
752756

753757
// Tell the host we're not referencing the client ID anymore.
754758
self.inner.send(&protocol::RelIdReleased {
755759
channel_id: rescind.channel_id,
756760
});
757-
758-
// At this point the offer can be revoked from the relay.
759-
self.notify_send
760-
.send(ClientNotification::Revoke(rescind.channel_id));
761761
}
762762

763763
fn handle_offers_delivered(&mut self) {
@@ -1549,23 +1549,19 @@ mod tests {
15491549

15501550
impl VmbusMessageSource for TestMessageSource {}
15511551

1552-
fn test_init() -> (
1553-
Arc<TestServer>,
1554-
VmbusClient,
1555-
mesh::Receiver<ClientNotification>,
1556-
) {
1552+
fn test_init() -> (Arc<TestServer>, VmbusClient, mesh::Receiver<OfferInfo>) {
15571553
let pool = DefaultPool::new();
15581554
let driver = pool.driver();
15591555
let (msg_send, msg_recv) = mesh::channel();
15601556
let server = Arc::new(TestServer {
15611557
messages: Mutex::new(Vec::new()),
15621558
send: msg_send,
15631559
});
1564-
let (notify_send, notify_recv) = mesh::channel();
1560+
let (offer_send, offer_recv) = mesh::channel();
15651561

15661562
let mut client = VmbusClient::new(
15671563
Arc::new(server.clone()),
1568-
notify_send,
1564+
offer_send,
15691565
TestMessageSource { msg_recv },
15701566
&driver,
15711567
);
@@ -1574,7 +1570,7 @@ mod tests {
15741570
.spawn(move || pool.run())
15751571
.unwrap();
15761572

1577-
(server, client, notify_recv)
1573+
(server, client, offer_recv)
15781574
}
15791575

15801576
#[async_test]
@@ -1997,7 +1993,7 @@ mod tests {
19971993

19981994
#[async_test]
19991995
async fn test_hot_add_remove() {
2000-
let (server, mut client, mut notify_recv) = test_init();
1996+
let (server, mut client, mut offer_recv) = test_init();
20011997

20021998
server.connect(&mut client).await;
20031999
let offer = protocol::OfferChannel {
@@ -2017,9 +2013,7 @@ mod tests {
20172013
};
20182014

20192015
server.send(in_msg(MessageType::OFFER_CHANNEL, offer));
2020-
let ClientNotification::Offer(info) = notify_recv.next().await.unwrap() else {
2021-
panic!("invalid request")
2022-
};
2016+
let mut info = offer_recv.next().await.unwrap();
20232017

20242018
assert_eq!(offer, info.offer);
20252019

@@ -2037,8 +2031,7 @@ mod tests {
20372031
})
20382032
);
20392033

2040-
let request = notify_recv.next().await.unwrap();
2041-
assert!(matches!(request, ClientNotification::Revoke(ChannelId(5))));
2034+
assert!(info.response_recv.next().await.is_none());
20422035
}
20432036

20442037
#[async_test]
@@ -2144,7 +2137,7 @@ mod tests {
21442137

21452138
#[async_test]
21462139
async fn test_gpadl_with_revoke() {
2147-
let (server, mut client, mut notify_recv) = test_init();
2140+
let (server, mut client, _offer_recv) = test_init();
21482141
let mut channel = server.get_channel(&mut client).await;
21492142
let channel_id = ChannelId(0);
21502143
let gpadl_id = GpadlId(1);
@@ -2208,11 +2201,7 @@ mod tests {
22082201
OutgoingMessage::new(&protocol::RelIdReleased { channel_id })
22092202
);
22102203

2211-
let ClientNotification::Revoke(id) = notify_recv.next().await.unwrap() else {
2212-
panic!("invalid request")
2213-
};
2214-
2215-
assert_eq!(id, channel_id);
2204+
assert!(channel.response_recv.next().await.is_none());
22162205
}
22172206

22182207
#[async_test]
@@ -2250,7 +2239,7 @@ mod tests {
22502239

22512240
#[async_test]
22522241
async fn test_hvsock() {
2253-
let (server, mut client, _notify_recv) = test_init();
2242+
let (server, mut client, _offer_recv) = test_init();
22542243
server.connect(&mut client).await;
22552244
let request = HvsockConnectRequest {
22562245
service_id: Guid::new_random(),

0 commit comments

Comments
 (0)