Skip to content

Commit 60cf7ca

Browse files
Refactor BRP to allow for 3rd-party transports (#15438)
## Objective Closes #15408 (somewhat) ## Solution - Moved the existing HTTP transport to its own module with its own plugin (`RemoteHttpPlugin`) (disabled on WASM) - Swapped out the `smol` crate for the smaller crates it re-exports to make it easier to keep out non-wasm code (HTTP transport needs `async-io` which can't build on WASM) - Added a new public `BrpSender` resource holding the matching sender for the `BrpReceiver`' (formally `BrpMailbox`). This allows other crates to send `BrpMessage`'s to the "mailbox". ## Testing TODO --------- Co-authored-by: Matty <[email protected]>
1 parent e788e3b commit 60cf7ca

File tree

5 files changed

+285
-233
lines changed

5 files changed

+285
-233
lines changed

crates/bevy_remote/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,11 @@ hyper = { version = "1", features = ["server", "http1"] }
2727
serde = { version = "1", features = ["derive"] }
2828
serde_json = { version = "1" }
2929
http-body-util = "0.1"
30+
async-channel = "2"
3031

3132
# dependencies that will not compile on wasm
3233
[target.'cfg(not(target_family = "wasm"))'.dependencies]
33-
smol = "2"
34+
async-io = "2"
3435
smol-hyper = "0.1"
3536

3637
[lints]

crates/bevy_remote/src/http.rs

Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
//! The BRP transport using JSON-RPC over HTTP.
2+
//!
3+
//! Adding the [`RemoteHttpPlugin`] to your [`App`] causes Bevy to accept
4+
//! connections over HTTP (by default, on port 15702) while your app is running.
5+
//!
6+
//! Clients are expected to `POST` JSON requests to the root URL; see the `client`
7+
//! example for a trivial example of use.
8+
9+
#![cfg(not(target_family = "wasm"))]
10+
11+
use crate::{error_codes, BrpBatch, BrpError, BrpMessage, BrpRequest, BrpResponse, BrpSender};
12+
use anyhow::Result as AnyhowResult;
13+
use async_channel::Sender;
14+
use async_io::Async;
15+
use bevy_app::{App, Plugin, Startup};
16+
use bevy_ecs::system::{Res, Resource};
17+
use bevy_tasks::IoTaskPool;
18+
use core::net::{IpAddr, Ipv4Addr};
19+
use http_body_util::{BodyExt as _, Full};
20+
use hyper::{
21+
body::{Bytes, Incoming},
22+
server::conn::http1,
23+
service, Request, Response,
24+
};
25+
use serde_json::Value;
26+
use smol_hyper::rt::{FuturesIo, SmolTimer};
27+
use std::net::TcpListener;
28+
use std::net::TcpStream;
29+
30+
/// The default port that Bevy will listen on.
31+
///
32+
/// This value was chosen randomly.
33+
pub const DEFAULT_PORT: u16 = 15702;
34+
35+
/// The default host address that Bevy will use for its server.
36+
pub const DEFAULT_ADDR: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
37+
38+
/// Add this plugin to your [`App`] to allow remote connections over HTTP to inspect and modify entities.
39+
/// It requires the [`RemotePlugin`](super::RemotePlugin).
40+
///
41+
/// This BRP transport cannot be used when targeting WASM.
42+
///
43+
/// The defaults are:
44+
/// - [`DEFAULT_ADDR`] : 127.0.0.1.
45+
/// - [`DEFAULT_PORT`] : 15702.
46+
pub struct RemoteHttpPlugin {
47+
/// The address that Bevy will bind to.
48+
address: IpAddr,
49+
/// The port that Bevy will listen on.
50+
port: u16,
51+
}
52+
53+
impl Default for RemoteHttpPlugin {
54+
fn default() -> Self {
55+
Self {
56+
address: DEFAULT_ADDR,
57+
port: DEFAULT_PORT,
58+
}
59+
}
60+
}
61+
62+
impl Plugin for RemoteHttpPlugin {
63+
fn build(&self, app: &mut App) {
64+
app.insert_resource(HostAddress(self.address))
65+
.insert_resource(HostPort(self.port))
66+
.add_systems(Startup, start_http_server);
67+
}
68+
}
69+
70+
impl RemoteHttpPlugin {
71+
/// Set the IP address that the server will use.
72+
#[must_use]
73+
pub fn with_address(mut self, address: impl Into<IpAddr>) -> Self {
74+
self.address = address.into();
75+
self
76+
}
77+
78+
/// Set the remote port that the server will listen on.
79+
#[must_use]
80+
pub fn with_port(mut self, port: u16) -> Self {
81+
self.port = port;
82+
self
83+
}
84+
}
85+
86+
/// A resource containing the IP address that Bevy will host on.
87+
///
88+
/// Currently, changing this while the application is running has no effect; this merely
89+
/// reflects the IP address that is set during the setup of the [`RemoteHttpPlugin`].
90+
#[derive(Debug, Resource)]
91+
pub struct HostAddress(pub IpAddr);
92+
93+
/// A resource containing the port number that Bevy will listen on.
94+
///
95+
/// Currently, changing this while the application is running has no effect; this merely
96+
/// reflects the host that is set during the setup of the [`RemoteHttpPlugin`].
97+
#[derive(Debug, Resource)]
98+
pub struct HostPort(pub u16);
99+
100+
/// A system that starts up the Bevy Remote Protocol HTTP server.
101+
fn start_http_server(
102+
request_sender: Res<BrpSender>,
103+
address: Res<HostAddress>,
104+
remote_port: Res<HostPort>,
105+
) {
106+
IoTaskPool::get()
107+
.spawn(server_main(
108+
address.0,
109+
remote_port.0,
110+
request_sender.clone(),
111+
))
112+
.detach();
113+
}
114+
115+
/// The Bevy Remote Protocol server main loop.
116+
async fn server_main(
117+
address: IpAddr,
118+
port: u16,
119+
request_sender: Sender<BrpMessage>,
120+
) -> AnyhowResult<()> {
121+
listen(
122+
Async::<TcpListener>::bind((address, port))?,
123+
&request_sender,
124+
)
125+
.await
126+
}
127+
128+
async fn listen(
129+
listener: Async<TcpListener>,
130+
request_sender: &Sender<BrpMessage>,
131+
) -> AnyhowResult<()> {
132+
loop {
133+
let (client, _) = listener.accept().await?;
134+
135+
let request_sender = request_sender.clone();
136+
IoTaskPool::get()
137+
.spawn(async move {
138+
let _ = handle_client(client, request_sender).await;
139+
})
140+
.detach();
141+
}
142+
}
143+
144+
async fn handle_client(
145+
client: Async<TcpStream>,
146+
request_sender: Sender<BrpMessage>,
147+
) -> AnyhowResult<()> {
148+
http1::Builder::new()
149+
.timer(SmolTimer::new())
150+
.serve_connection(
151+
FuturesIo::new(client),
152+
service::service_fn(|request| process_request_batch(request, &request_sender)),
153+
)
154+
.await?;
155+
156+
Ok(())
157+
}
158+
159+
/// A helper function for the Bevy Remote Protocol server that handles a batch
160+
/// of requests coming from a client.
161+
async fn process_request_batch(
162+
request: Request<Incoming>,
163+
request_sender: &Sender<BrpMessage>,
164+
) -> AnyhowResult<Response<Full<Bytes>>> {
165+
let batch_bytes = request.into_body().collect().await?.to_bytes();
166+
let batch: Result<BrpBatch, _> = serde_json::from_slice(&batch_bytes);
167+
168+
let serialized = match batch {
169+
Ok(BrpBatch::Single(request)) => {
170+
serde_json::to_string(&process_single_request(request, request_sender).await?)?
171+
}
172+
Ok(BrpBatch::Batch(requests)) => {
173+
let mut responses = Vec::new();
174+
175+
for request in requests {
176+
responses.push(process_single_request(request, request_sender).await?);
177+
}
178+
179+
serde_json::to_string(&responses)?
180+
}
181+
Err(err) => {
182+
let err = BrpResponse::new(
183+
None,
184+
Err(BrpError {
185+
code: error_codes::INVALID_REQUEST,
186+
message: err.to_string(),
187+
data: None,
188+
}),
189+
);
190+
191+
serde_json::to_string(&err)?
192+
}
193+
};
194+
195+
Ok(Response::new(Full::new(Bytes::from(
196+
serialized.as_bytes().to_owned(),
197+
))))
198+
}
199+
200+
/// A helper function for the Bevy Remote Protocol server that processes a single
201+
/// request coming from a client.
202+
async fn process_single_request(
203+
request: Value,
204+
request_sender: &Sender<BrpMessage>,
205+
) -> AnyhowResult<BrpResponse> {
206+
// Reach in and get the request ID early so that we can report it even when parsing fails.
207+
let id = request.as_object().and_then(|map| map.get("id")).cloned();
208+
209+
let request: BrpRequest = match serde_json::from_value(request) {
210+
Ok(v) => v,
211+
Err(err) => {
212+
return Ok(BrpResponse::new(
213+
id,
214+
Err(BrpError {
215+
code: error_codes::INVALID_REQUEST,
216+
message: err.to_string(),
217+
data: None,
218+
}),
219+
));
220+
}
221+
};
222+
223+
if request.jsonrpc != "2.0" {
224+
return Ok(BrpResponse::new(
225+
id,
226+
Err(BrpError {
227+
code: error_codes::INVALID_REQUEST,
228+
message: String::from("JSON-RPC request requires `\"jsonrpc\": \"2.0\"`"),
229+
data: None,
230+
}),
231+
));
232+
}
233+
234+
let (result_sender, result_receiver) = async_channel::bounded(1);
235+
236+
let _ = request_sender
237+
.send(BrpMessage {
238+
method: request.method,
239+
params: request.params,
240+
sender: result_sender,
241+
})
242+
.await;
243+
244+
let result = result_receiver.recv().await?;
245+
Ok(BrpResponse::new(request.id, result))
246+
}

0 commit comments

Comments
 (0)