Skip to content

Update axum #2713

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 14 additions & 37 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ anymap = "0.12"
arrayvec = "0.7.2"
async-stream = "0.3.6"
async-trait = "0.1.68"
axum = { version = "0.7", features = ["tracing"] }
axum-extra = { version = "0.9", features = ["typed-header"] }
axum = { version = "0.8.4", features = ["tracing", "ws"] }
axum-extra = { version = "0.10", features = ["typed-header"] }
backtrace = "0.3.66"
base64 = "0.21.2"
bigdecimal = "0.4.7"
Expand Down
1 change: 0 additions & 1 deletion crates/client-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ futures = "0.3"
bytes = "1"
tracing.workspace = true
bytestring = "1"
tokio-tungstenite.workspace = true
itoa.workspace = true
derive_more = "0.99.17"
uuid.workspace = true
Expand Down
2 changes: 0 additions & 2 deletions crates/client-api/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,6 @@ pub struct SpacetimeAuthHeader {
auth: Option<SpacetimeAuth>,
}

#[async_trait::async_trait]
impl<S: NodeDelegate + Send + Sync> axum::extract::FromRequestParts<S> for SpacetimeAuthHeader {
type Rejection = AuthorizationRejection;
async fn from_request_parts(parts: &mut request::Parts, state: &S) -> Result<Self, Self::Rejection> {
Expand Down Expand Up @@ -341,7 +340,6 @@ impl SpacetimeAuthHeader {

pub struct SpacetimeAuthRequired(pub SpacetimeAuth);

#[async_trait::async_trait]
impl<S: NodeDelegate + Send + Sync> axum::extract::FromRequestParts<S> for SpacetimeAuthRequired {
type Rejection = AuthorizationRejection;
async fn from_request_parts(parts: &mut request::Parts, state: &S) -> Result<Self, Self::Rejection> {
Expand Down
4 changes: 2 additions & 2 deletions crates/client-api/src/routes/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -778,14 +778,14 @@ where
.route("/names", self.names_put)
.route("/identity", self.identity_get)
.route("/subscribe", self.subscribe_get)
.route("/call/:reducer", self.call_reducer_post)
.route("/call/{reducer}", self.call_reducer_post)
.route("/schema", self.schema_get)
.route("/logs", self.logs_get)
.route("/sql", self.sql_post);

axum::Router::new()
.route("/", self.root_post)
.nest("/:name_or_identity", db_router)
.nest("/{name_or_identity}", db_router)
.route_layer(axum::middleware::from_fn_with_state(ctx, anon_auth_middleware::<S>))
}
}
2 changes: 1 addition & 1 deletion crates/client-api/src/routes/energy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ where
{
use axum::routing::get;
axum::Router::new().route(
"/:identity",
"/{identity}",
get(get_energy_balance::<S>)
.put(set_energy_balance::<S>)
.post(add_energy::<S>),
Expand Down
4 changes: 2 additions & 2 deletions crates/client-api/src/routes/identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,6 @@ where
.route("/", post(create_identity::<S>))
.route("/public-key", get(get_public_key::<S>))
.route("/websocket-token", post(create_websocket_token::<S>))
.route("/:identity/verify", get(validate_token))
.route("/:identity/databases", get(get_databases::<S>))
.route("/{identity}/verify", get(validate_token))
.route("/{identity}/databases", get(get_databases::<S>))
}
85 changes: 41 additions & 44 deletions crates/client-api/src/routes/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::mem;
use std::pin::{pin, Pin};
use std::time::Duration;

use axum::extract::ws;
use axum::extract::{Path, Query, State};
use axum::response::IntoResponse;
use axum::Extension;
Expand All @@ -24,12 +25,8 @@ use spacetimedb_client_api_messages::websocket::{self as ws_api, Compression};
use spacetimedb_lib::connection_id::{ConnectionId, ConnectionIdForUrl};
use std::time::Instant;
use tokio::sync::mpsc;
use tokio_tungstenite::tungstenite::Utf8Bytes;

use crate::auth::SpacetimeAuth;
use crate::util::websocket::{
CloseCode, CloseFrame, Message as WsMessage, WebSocketConfig, WebSocketStream, WebSocketUpgrade,
};
use crate::util::{NameOrIdentity, XForwardedFor};
use crate::{log_and_500, ControlStateDelegate, NodeDelegate};

Expand Down Expand Up @@ -68,7 +65,7 @@ pub async fn handle_websocket<S>(
}): Query<SubscribeQueryParams>,
forwarded_for: Option<TypedHeader<XForwardedFor>>,
Extension(auth): Extension<SpacetimeAuth>,
ws: WebSocketUpgrade,
ws: ws::WebSocketUpgrade,
) -> axum::response::Result<impl IntoResponse>
where
S: NodeDelegate + ControlStateDelegate,
Expand All @@ -91,8 +88,17 @@ where

let db_identity = name_or_identity.resolve(&ctx).await?;

let (res, ws_upgrade, protocol) =
ws.select_protocol([(BIN_PROTOCOL, Protocol::Binary), (TEXT_PROTOCOL, Protocol::Text)]);
let ws = ws.protocols([ws_api::BIN_PROTOCOL, ws_api::TEXT_PROTOCOL]);

let protocol = ws.selected_protocol().and_then(|proto| {
if proto == BIN_PROTOCOL {
Some(Protocol::Binary)
} else if proto == TEXT_PROTOCOL {
Some(Protocol::Text)
} else {
None
}
});

let protocol = protocol.ok_or((StatusCode::BAD_REQUEST, "no valid protocol selected"))?;
let client_config = ClientConfig {
Expand Down Expand Up @@ -125,20 +131,13 @@ where
name: ctx.client_actor_index().next_client_name(),
};

let ws_config = WebSocketConfig::default()
.max_message_size(Some(0x2000000))
.max_frame_size(None)
.accept_unmasked_frames(false);

tokio::spawn(async move {
let ws = match ws_upgrade.upgrade(ws_config).await {
Ok(ws) => ws,
Err(err) => {
log::error!("WebSocket init error: {}", err);
return;
}
};
let ws = ws
.max_message_size(0x2000000)
.max_frame_size(usize::MAX)
.accept_unmasked_frames(false)
.on_failed_upgrade(|err| log::error!("WebSocket init error: {}", err));

let res = ws.on_upgrade(move |ws| async move {
match forwarded_for {
Some(TypedHeader(XForwardedFor(ip))) => {
log::debug!("New client connected from ip {}", ip)
Expand Down Expand Up @@ -180,7 +179,7 @@ where

const LIVELINESS_TIMEOUT: Duration = Duration::from_secs(60);

async fn ws_client_actor(client: ClientConnection, ws: WebSocketStream, sendrx: mpsc::Receiver<SerializableMessage>) {
async fn ws_client_actor(client: ClientConnection, ws: ws::WebSocket, sendrx: mpsc::Receiver<SerializableMessage>) {
// ensure that even if this task gets cancelled, we always cleanup the connection
let mut client = scopeguard::guard(client, |client| {
tokio::spawn(client.disconnect());
Expand All @@ -201,7 +200,7 @@ async fn make_progress<Fut: Future>(fut: &mut Pin<&mut MaybeDone<Fut>>) {

async fn ws_client_actor_inner(
client: &mut ClientConnection,
mut ws: WebSocketStream,
mut ws: ws::WebSocket,
mut sendrx: mpsc::Receiver<SerializableMessage>,
) {
let mut liveness_check_interval = tokio::time::interval(LIVELINESS_TIMEOUT);
Expand Down Expand Up @@ -280,7 +279,7 @@ async fn ws_client_actor_inner(
let workload = msg.workload();
let num_rows = msg.num_rows();

let msg = datamsg_to_wsmsg(serialize(msg, client.config));
let msg = serialize(msg, client.config);

// These metrics should be updated together,
// or not at all.
Expand All @@ -295,7 +294,7 @@ async fn ws_client_actor_inner(
.observe(msg.len() as f64);
}
// feed() buffers the message, but does not necessarily send it
ws.feed(msg).await?;
ws.feed(datamsg_to_wsmsg(msg)).await?;
}
// now we flush all the messages to the socket
ws.flush().await
Expand Down Expand Up @@ -323,7 +322,7 @@ async fn ws_client_actor_inner(
// Send a close frame while continuing to poll the `handle_queue`,
// to avoid deadlocks or delays due to enqueued futures holding resources.
let close = also_poll(
ws.close(Some(CloseFrame { code: CloseCode::Away, reason: "module exited".into() })),
ws.send(ws::Message::Close(Some(ws::CloseFrame { code: ws::close_code::AWAY, reason: "module exited".into() }))),
make_progress(&mut current_message),
);
if let Err(e) = close.await {
Expand All @@ -341,7 +340,7 @@ async fn ws_client_actor_inner(
if mem::take(&mut got_pong) {
// Send a ping message while continuing to poll the `handle_queue`,
// to avoid deadlocks or delays due to enqueued futures holding resources.
if let Err(e) = also_poll(ws.send(WsMessage::Ping(Bytes::new())), make_progress(&mut current_message)).await {
if let Err(e) = also_poll(ws.send(ws::Message::Ping(Bytes::new())), make_progress(&mut current_message)).await {
log::warn!("error sending ping: {e:#}");
}
continue;
Expand Down Expand Up @@ -376,10 +375,10 @@ async fn ws_client_actor_inner(
}
log::debug!("Client caused error on text message: {}", e);
if let Err(e) = ws
.close(Some(CloseFrame {
code: CloseCode::Error,
.send(ws::Message::Close(Some(ws::CloseFrame {
code: ws::close_code::ERROR,
reason: format!("{e:#}").into(),
}))
})))
.await
{
log::warn!("error closing websocket: {e:#}")
Expand Down Expand Up @@ -419,34 +418,32 @@ enum ClientMessage {
Message(DataMessage),
Ping(Bytes),
Pong(Bytes),
Close(Option<CloseFrame>),
Close(Option<ws::CloseFrame>),
}
impl ClientMessage {
fn from_message(msg: WsMessage) -> Self {
fn from_message(msg: ws::Message) -> Self {
match msg {
WsMessage::Text(s) => Self::Message(DataMessage::Text(utf8bytes_to_bytestring(s))),
WsMessage::Binary(b) => Self::Message(DataMessage::Binary(b)),
WsMessage::Ping(b) => Self::Ping(b),
WsMessage::Pong(b) => Self::Pong(b),
WsMessage::Close(frame) => Self::Close(frame),
// WebSocket::read_message() never returns a raw Message::Frame
WsMessage::Frame(_) => unreachable!(),
ws::Message::Text(s) => Self::Message(DataMessage::Text(utf8bytes_to_bytestring(s))),
ws::Message::Binary(b) => Self::Message(DataMessage::Binary(b)),
ws::Message::Ping(b) => Self::Ping(b),
ws::Message::Pong(b) => Self::Pong(b),
ws::Message::Close(frame) => Self::Close(frame),
}
}
}

fn datamsg_to_wsmsg(msg: DataMessage) -> WsMessage {
fn datamsg_to_wsmsg(msg: DataMessage) -> ws::Message {
match msg {
DataMessage::Text(text) => WsMessage::Text(bytestring_to_utf8bytes(text)),
DataMessage::Binary(bin) => WsMessage::Binary(bin),
DataMessage::Text(text) => ws::Message::Text(bytestring_to_utf8bytes(text)),
DataMessage::Binary(bin) => ws::Message::Binary(bin),
}
}

fn utf8bytes_to_bytestring(s: Utf8Bytes) -> ByteString {
fn utf8bytes_to_bytestring(s: ws::Utf8Bytes) -> ByteString {
// SAFETY: `Utf8Bytes` and `ByteString` have the same invariant of UTF-8 validity
unsafe { ByteString::from_bytes_unchecked(Bytes::from(s)) }
}
fn bytestring_to_utf8bytes(s: ByteString) -> Utf8Bytes {
fn bytestring_to_utf8bytes(s: ByteString) -> ws::Utf8Bytes {
// SAFETY: `Utf8Bytes` and `ByteString` have the same invariant of UTF-8 validity
unsafe { Utf8Bytes::from_bytes_unchecked(s.into_bytes()) }
unsafe { ws::Utf8Bytes::try_from(s.into_bytes()).unwrap_unchecked() }
}
Loading
Loading